diff commons/launcher/tests/Test_LaunchBlastclust.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/launcher/tests/Test_LaunchBlastclust.py	Mon Apr 29 03:20:15 2013 -0400
@@ -0,0 +1,237 @@
+import unittest
+import time
+import os
+from commons.launcher.LaunchBlastclust import LaunchBlastclust
+from commons.core.utils.FileUtils import FileUtils
+
+class Test_LaunchBlastclust(unittest.TestCase):
+    """Unit tests for the file parsing/writing methods of LaunchBlastclust.
+
+    Every test writes its fixture files into the current working directory.
+    Fixtures and observed output files are registered with addCleanup(), so
+    they are removed whether the test passes, fails or raises.
+    """
+
+    def setUp(self):
+        """Build a launcher (with setClean() called) and a unique name suffix."""
+        self._iLaunchBlastclust = LaunchBlastclust()
+        self._iLaunchBlastclust.setClean()
+        # Timestamp + PID keeps fixture names distinct between concurrent runs.
+        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
+
+    def tearDown(self):
+        """Drop the references created in setUp()."""
+        self._iLaunchBlastclust = None
+        self._uniqId = None
+
+    @staticmethod
+    def _removeIfExists(fileName):
+        """Delete fileName; do nothing when it is already gone."""
+        if os.path.exists(fileName):
+            os.remove(fileName)
+
+    def _writeFile(self, fileName, lLines):
+        """Write the strings of lLines verbatim to fileName.
+
+        The file is scheduled for removal at the end of the test. The cleanup
+        is registered before writing so a partially written file is removed
+        as well.
+        """
+        self.addCleanup(self._removeIfExists, fileName)
+        with open(fileName, "w") as fileHandler:
+            fileHandler.writelines(lLines)
+
+    def _writeFasta(self, fileName, lHeaders):
+        """Write one FASTA record per header, all with the same dummy sequence."""
+        lLines = []
+        for header in lHeaders:
+            lLines.append(">%s\n" % header)
+            lLines.append("gaattgtttactta\n")
+        self._writeFile(fileName, lLines)
+
+    def _assertFilesIdentical(self, fileObs, fileExp):
+        """Fail the test unless FileUtils reports both files as identical.
+
+        The observed file is scheduled for removal at the end of the test.
+        """
+        self.addCleanup(self._removeIfExists, fileObs)
+        self.assertTrue(
+            FileUtils.are2FilesIdentical(fileObs, fileExp),
+            "Files are different: '%s' (observed) vs '%s' (expected)"
+            % (fileObs, fileExp))
+
+    def test_getClustersFromTxtFile(self):
+        """Each line of the blastclust text file yields one cluster, numbered from 1."""
+        inFileName = "dummyInFile_%s" % self._uniqId
+        self._writeFile(inFileName, ["seq1 seq3 seq4 \n", "seq2 seq5 \n"])
+        dExp = {1: ["seq1", "seq3", "seq4"], 2: ["seq2", "seq5"]}
+
+        self._iLaunchBlastclust.setTmpFileName(inFileName)
+        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()
+
+        self.assertEqual(dObs, dExp)
+
+    def test_getClusteringResultsInFasta_without_filtering(self):
+        """All sequences are written, with headers prefixed by cluster and member ids."""
+        inFileName = "dummyInFile_%s" % self._uniqId
+        self._writeFasta(inFileName, ["seq1", "seq2", "seq3", "seq4", "seq5"])
+
+        tmpFileName = "%s_blastclust.txt" % self._uniqId
+        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n", "seq2 seq5 \n"])
+        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
+
+        fileExp = "getClusteringResultsInFastaExpected.fa"
+        self._writeFasta(fileExp, ["BlastclustCluster1Mb1_seq1",
+                                   "BlastclustCluster1Mb2_seq3",
+                                   "BlastclustCluster1Mb3_seq4",
+                                   "BlastclustCluster2Mb1_seq2",
+                                   "BlastclustCluster2Mb2_seq5"])
+
+        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)
+        fileObs = "%s_Blastclust.fa" % os.path.splitext(inFileName)[0]
+
+        self._assertFilesIdentical(fileObs, fileExp)
+
+    def test_getClusteringResultsInFasta_with_filtering(self):
+        """With filtering enabled, single-sequence clusters are left out of the output."""
+        inFileName = "dummyInFile_%s" % self._uniqId
+        self._writeFasta(inFileName, ["seq1", "seq2", "seq3", "seq4", "seq5"])
+
+        tmpFileName = "%s_blastclust.txt" % self._uniqId
+        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n", "seq2\n", "seq5\n"])
+        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
+
+        fileExp = "getClusteringResultsInFastaExpected.fa"
+        self._writeFasta(fileExp, ["BlastclustCluster1Mb1_seq1",
+                                   "BlastclustCluster1Mb2_seq3",
+                                   "BlastclustCluster1Mb3_seq4"])
+
+        self._iLaunchBlastclust.setFilterUnclusteredSequences()
+        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)
+        fileObs = "%s_Blastclust.fa" % os.path.splitext(inFileName)[0]
+
+        self._assertFilesIdentical(fileObs, fileExp)
+
+    def test_getLinkInitNewHeaders(self):
+        """The '.shortHlink' file maps column 1 (short header) to column 2 (initial header)."""
+        inFileName = "dummyInput_%s.shortHlink" % self._uniqId
+        self._writeFile(inFileName, ["seq1\tHeader1\t1\t5193\n",
+                                     "seq2\tHeader2\t1\t5193\n",
+                                     "seq3\tHeader3\t1\t5193\n",
+                                     "seq4\tHeader4\t1\t5193\n"])
+
+        self._iLaunchBlastclust.setInputFileName("dummyInput_%s" % self._uniqId)
+        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()
+        dExp = {"seq1": "Header1", "seq2": "Header2",
+                "seq3": "Header3", "seq4": "Header4"}
+
+        self.assertEqual(dObs, dExp)
+
+    def test_retrieveInitHeaders(self):
+        """Short headers in the clustered FASTA are replaced by the initial headers."""
+        dIn = {"seq1": "Header1", "seq2": "Header2",
+               "seq3": "Header3", "seq4": "Header4"}
+
+        inFileName = "dummyInFile_%s" % self._uniqId
+        outFilePrefix = self._uniqId
+
+        tmpFileName = "%s_blastclust.txt" % outFilePrefix
+        self._writeFile(tmpFileName, ["seq1 seq3 seq4\n", "seq2\n"])
+
+        # Whether the launcher deletes this file itself is not visible here;
+        # the cleanup registered by _writeFasta tolerates both cases.
+        shortHFile = "%s.shortH_Blastclust.fa" % inFileName
+        self._writeFasta(shortHFile, ["BlastclustCluster1Mb1_seq1",
+                                      "BlastclustCluster1Mb2_seq3",
+                                      "BlastclustCluster1Mb3_seq4",
+                                      "BlastclustCluster2Mb1_seq2"])
+
+        fileExp = "retrieveInitHeadersExpected.fa"
+        self._writeFasta(fileExp, ["BlastclustCluster1Mb1_Header1",
+                                   "BlastclustCluster1Mb2_Header3",
+                                   "BlastclustCluster1Mb3_Header4",
+                                   "BlastclustCluster2Mb1_Header2"])
+
+        self._iLaunchBlastclust.setInputFileName(inFileName)
+        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
+        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
+        self._iLaunchBlastclust.retrieveInitHeaders(dIn)
+        fileObs = "%s_Blastclust.fa" % outFilePrefix
+
+        self._assertFilesIdentical(fileObs, fileExp)
+
+    def test_filterUnclusteredSequences(self):
+        """Clusters holding a single sequence are removed from the dictionary."""
+        dClusterId2SeqHeaders = {1: ["seq1", "seq2"], 2: ["seq3"]}
+        dExp = {1: ["seq1", "seq2"]}
+        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(dClusterId2SeqHeaders)
+        self.assertEqual(dObs, dExp)
+
+    def test_blastclustToMap(self):
+        """FASTA headers are converted to tab-separated map lines (name, chunk, start, end)."""
+        inFileName = "dummyBlastclustOut_%s.fa" % self._uniqId
+        self._writeFile(inFileName, [
+            ">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n",
+            "gaattgtttactta\n",
+            ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n",
+            "gaattgtttactta\n",
+            ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n",
+            "gaattgtttactta\n",
+            ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n",
+            # The last sequence line deliberately has no trailing newline.
+            "gaattgtttactta",
+        ])
+
+        fileExp = "blastclustToMapExpected.map"
+        self._writeFile(fileExp, ["BlastclustCluster1Mb1\tchunk1\t1\t14\n",
+                                  "BlastclustCluster1Mb2\tchunk1\t30\t44\n",
+                                  "BlastclustCluster2Mb1\tchunk2\t100\t114\n",
+                                  "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n"])
+
+        self._iLaunchBlastclust.blastclustToMap(inFileName)
+        fileObs = "%s.map" % os.path.splitext(inFileName)[0]
+
+        self._assertFilesIdentical(fileObs, fileExp)
+
+# Run every test of this module when the file is executed as a script.
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file