view commons/launcher/tests/Test_LaunchBlastclust.py @ 31:0ab839023fe4

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 14:33:21 -0400
parents 94ab73e8a190
children
line wrap: on
line source

import unittest
import time
import os
from commons.launcher.LaunchBlastclust import LaunchBlastclust
from commons.core.utils.FileUtils import FileUtils

class Test_LaunchBlastclust( unittest.TestCase ):
    """Unit tests for the file-oriented helper methods of LaunchBlastclust.

    Each test writes its fixture files into the current working directory.
    Every file a test creates (or expects the launcher to create) is
    registered first and removed in tearDown(), so nothing is left behind
    whether the test passes, fails or raises.
    """

    # Nucleotide sequence shared by every FASTA record used in the fixtures.
    _SEQUENCE = "gaattgtttactta"

    def setUp(self):
        """Create a fresh launcher and a per-run unique id for file names."""
        self._iLaunchBlastclust = LaunchBlastclust()
        # NOTE(review): presumably asks the launcher to delete its own
        # intermediate files -- confirm against LaunchBlastclust.setClean().
        self._iLaunchBlastclust.setClean()
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._lFilesToRemove = []

    def tearDown(self):
        """Remove every registered file that still exists, then reset state."""
        for fileName in self._lFilesToRemove:
            # The launcher may already have removed some of them itself.
            if os.path.exists(fileName):
                os.remove(fileName)
        self._lFilesToRemove = None
        self._iLaunchBlastclust = None
        self._uniqId = None

    def _registerFile(self, fileName):
        """Schedule 'fileName' for removal in tearDown() and return it."""
        self._lFilesToRemove.append(fileName)
        return fileName

    def _writeFile(self, fileName, content):
        """Write 'content' to 'fileName', register it for cleanup, return the name."""
        # Register before writing so that a partially written file is removed too.
        self._registerFile(fileName)
        with open(fileName, "w") as handle:
            handle.write(content)
        return fileName

    def _toFasta(self, lHeaders):
        """Return FASTA text with one record per header, all using _SEQUENCE."""
        return "".join(">%s\n%s\n" % (header, self._SEQUENCE) for header in lHeaders)

    def _assertFilesIdentical(self, fileObs, fileExp):
        """Fail the test unless the observed file exists and equals the expected one."""
        self.assertTrue(os.path.exists(fileObs),
                        "observed file '%s' was not created" % fileObs)
        self.assertTrue(FileUtils.are2FilesIdentical(fileObs, fileExp),
                        "files '%s' and '%s' are different" % (fileObs, fileExp))

    def test_getClustersFromTxtFile(self):
        """Each line of the blastclust text file becomes one numbered cluster."""
        inFileName = self._writeFile("dummyInFile_%s"  % self._uniqId,
                                     "seq1 seq3 seq4 \n"
                                     "seq2 seq5 \n")
        dExp = {1:["seq1","seq3","seq4"], 2:["seq2","seq5"]}
        self._iLaunchBlastclust.setTmpFileName(inFileName)
        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()
        self.assertEqual(dObs, dExp)

    def test_getClusteringResultsInFasta_without_filtering(self):
        """All sequences are written with a 'BlastclustCluster<i>Mb<j>_' prefix."""
        inFileName = self._writeFile("dummyInFile_%s"  % self._uniqId,
                                     self._toFasta(["seq1", "seq2", "seq3", "seq4", "seq5"]))
        tmpFileName = self._writeFile("%s_blastclust.txt" % self._uniqId,
                                      "seq1 seq3 seq4 \n"
                                      "seq2 seq5 \n")
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        fileExp = self._writeFile("getClusteringResultsInFastaExpected.fa",
                                  self._toFasta(["BlastclustCluster1Mb1_seq1",
                                                 "BlastclustCluster1Mb2_seq3",
                                                 "BlastclustCluster1Mb3_seq4",
                                                 "BlastclustCluster2Mb1_seq2",
                                                 "BlastclustCluster2Mb2_seq5"]))
        fileObs = self._registerFile("%s_Blastclust.fa" % os.path.splitext(inFileName)[0])

        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getClusteringResultsInFasta_with_filtering(self):
        """With filtering enabled, single-sequence clusters are left out of the output."""
        inFileName = self._writeFile("dummyInFile_%s"  % self._uniqId,
                                     self._toFasta(["seq1", "seq2", "seq3", "seq4", "seq5"]))
        tmpFileName = self._writeFile("%s_blastclust.txt" % self._uniqId,
                                      "seq1 seq3 seq4 \n"
                                      "seq2\n"
                                      "seq5\n")
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        fileExp = self._writeFile("getClusteringResultsInFastaExpected.fa",
                                  self._toFasta(["BlastclustCluster1Mb1_seq1",
                                                 "BlastclustCluster1Mb2_seq3",
                                                 "BlastclustCluster1Mb3_seq4"]))
        fileObs = self._registerFile("%s_Blastclust.fa" % os.path.splitext(inFileName)[0])

        self._iLaunchBlastclust.setFilterUnclusteredSequences()
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getLinkInitNewHeaders(self):
        """The '<input>.shortHlink' file is read into a {short header: initial header} dict."""
        self._writeFile("dummyInput_%s.shortHlink" % self._uniqId,
                        "seq1\tHeader1\t1\t5193\n"
                        "seq2\tHeader2\t1\t5193\n"
                        "seq3\tHeader3\t1\t5193\n"
                        "seq4\tHeader4\t1\t5193\n")

        self._iLaunchBlastclust.setInputFileName("dummyInput_%s" % self._uniqId)
        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()
        dExp = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        self.assertEqual(dObs, dExp)

    def test_retrieveInitHeaders(self):
        """Short headers in the clustered FASTA are replaced by the initial headers."""
        dIn = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        inFileName = "dummyInFile_%s"  % self._uniqId
        outFilePrefix = self._uniqId

        tmpFileName = self._writeFile("%s_blastclust.txt" % outFilePrefix,
                                      "seq1 seq3 seq4\n"
                                      "seq2\n")
        # Registered for cleanup in case the launcher does not remove it itself.
        self._writeFile("%s.shortH_Blastclust.fa"  % inFileName,
                        self._toFasta(["BlastclustCluster1Mb1_seq1",
                                       "BlastclustCluster1Mb2_seq3",
                                       "BlastclustCluster1Mb3_seq4",
                                       "BlastclustCluster2Mb1_seq2"]))
        fileExp = self._writeFile("retrieveInitHeadersExpected.fa",
                                  self._toFasta(["BlastclustCluster1Mb1_Header1",
                                                 "BlastclustCluster1Mb2_Header3",
                                                 "BlastclustCluster1Mb3_Header4",
                                                 "BlastclustCluster2Mb1_Header2"]))
        fileObs = self._registerFile("%s_Blastclust.fa" % outFilePrefix)

        self._iLaunchBlastclust.setInputFileName(inFileName)
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
        self._iLaunchBlastclust.retrieveInitHeaders(dIn)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_filterUnclusteredSequences(self):
        """Clusters holding a single sequence are dropped from the dictionary."""
        dClusterId2SeqHeaders = {1: ["seq1","seq2"], 2: ["seq3"]}
        dExp = {1: ["seq1","seq2"]}
        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(dClusterId2SeqHeaders)
        self.assertEqual(dObs, dExp)

    def test_blastclustToMap(self):
        """FASTA headers 'name_chunk (dbseq-nr N) [start,end]' become map lines."""
        # The last record deliberately has no trailing newline, as in the
        # original fixture.
        inFileName = self._writeFile("dummyBlastclustOut_%s.fa"  % self._uniqId,
                                     ">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n"
                                     "gaattgtttactta\n"
                                     ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n"
                                     "gaattgtttactta\n"
                                     ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n"
                                     "gaattgtttactta\n"
                                     ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n"
                                     "gaattgtttactta")
        fileExp = self._writeFile("blastclustToMapExpected.map",
                                  "BlastclustCluster1Mb1\tchunk1\t1\t14\n"
                                  "BlastclustCluster1Mb2\tchunk1\t30\t44\n"
                                  "BlastclustCluster2Mb1\tchunk2\t100\t114\n"
                                  "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n")
        fileObs = self._registerFile("%s.map" % os.path.splitext(inFileName)[0])

        self._iLaunchBlastclust.blastclustToMap(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

# Run the whole test case when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()