comparison commons/launcher/tests/Test_LaunchBlastclust.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 import unittest
2 import time
3 import os
4 from commons.launcher.LaunchBlastclust import LaunchBlastclust
5 from commons.core.utils.FileUtils import FileUtils
6
class Test_LaunchBlastclust( unittest.TestCase ):
    """Unit tests for the file-based helper methods of LaunchBlastclust.

    Each test writes small fixture files in the current working directory,
    calls one method of LaunchBlastclust and compares the result (a dict or
    an output file) with the expected value.

    Every file a test creates or expects to be created is registered via
    _registerForRemoval() and deleted in tearDown(), so fixtures are cleaned
    up whether the test passes or fails.
    """

    # Nucleotide sequence shared by all the FASTA fixtures below.
    _SEQUENCE = "gaattgtttactta"

    def setUp(self):
        """Create the object under test, a unique file-name suffix and the cleanup list."""
        self._iLaunchBlastclust = LaunchBlastclust()
        self._iLaunchBlastclust.setClean()
        # Timestamp + PID keeps fixture names distinct between concurrent runs.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._lFilesToRemove = []

    def tearDown(self):
        """Delete every registered file that still exists and reset the fixtures."""
        for fileName in self._lFilesToRemove:
            # NOTE(review): setClean() is enabled in setUp, so the object under
            # test may already have removed some intermediate files itself --
            # hence the existence check rather than an unconditional remove.
            if os.path.exists(fileName):
                os.remove(fileName)
        self._lFilesToRemove = None
        self._iLaunchBlastclust = None
        self._uniqId = None

    def _registerForRemoval(self, fileName):
        """Schedule 'fileName' for deletion in tearDown() and return it unchanged."""
        self._lFilesToRemove.append(fileName)
        return fileName

    def _writeFile(self, fileName, lLines):
        """Write the strings of 'lLines' verbatim to 'fileName' and register it for removal."""
        self._registerForRemoval(fileName)
        with open(fileName, "w") as fileHandler:
            fileHandler.writelines(lLines)

    def _writeFasta(self, fileName, lHeaders):
        """Write one FASTA record (header line + _SEQUENCE line) per header in 'lHeaders'."""
        lLines = []
        for header in lHeaders:
            lLines.append(">%s\n" % header)
            lLines.append("%s\n" % self._SEQUENCE)
        self._writeFile(fileName, lLines)

    def _assertFilesIdentical(self, fileObs, fileExp):
        """Fail the test if FileUtils reports that the two files differ."""
        self.assertTrue(FileUtils.are2FilesIdentical(fileObs, fileExp),
                        "file '%s' differs from expected file '%s'" % (fileObs, fileExp))

    def test_getClustersFromTxtFile(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName, ["seq1 seq3 seq4 \n",
                                     "seq2 seq5 \n"])
        dExp = {1:["seq1","seq3","seq4"], 2:["seq2","seq5"]}

        self._iLaunchBlastclust.setTmpFileName(inFileName)
        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()

        self.assertEqual(dObs, dExp)

    def test_getClusteringResultsInFasta_without_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFasta(inFileName, ["seq1", "seq2", "seq3", "seq4", "seq5"])

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n",
                                      "seq2 seq5 \n"])
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFasta(fileExp, ["BlastclustCluster1Mb1_seq1",
                                   "BlastclustCluster1Mb2_seq3",
                                   "BlastclustCluster1Mb3_seq4",
                                   "BlastclustCluster2Mb1_seq2",
                                   "BlastclustCluster2Mb2_seq5"])

        # Registered before the call so the output is removed even on failure.
        fileObs = self._registerForRemoval("%s_Blastclust.fa" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getClusteringResultsInFasta_with_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFasta(inFileName, ["seq1", "seq2", "seq3", "seq4", "seq5"])

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n",
                                      "seq2\n",
                                      "seq5\n"])
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        # Only the three-member cluster is expected in the output.
        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFasta(fileExp, ["BlastclustCluster1Mb1_seq1",
                                   "BlastclustCluster1Mb2_seq3",
                                   "BlastclustCluster1Mb3_seq4"])

        fileObs = self._registerForRemoval("%s_Blastclust.fa" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.setFilterUnclusteredSequences()
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getLinkInitNewHeaders(self):
        inFileName = "dummyInput_%s.shortHlink" % self._uniqId
        self._writeFile(inFileName, ["seq1\tHeader1\t1\t5193\n",
                                     "seq2\tHeader2\t1\t5193\n",
                                     "seq3\tHeader3\t1\t5193\n",
                                     "seq4\tHeader4\t1\t5193\n"])
        dExp = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        # The input name is given without the ".shortHlink" suffix of the fixture.
        self._iLaunchBlastclust.setInputFileName("dummyInput_%s" % self._uniqId)
        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()

        self.assertEqual(dObs, dExp)

    def test_retrieveInitHeaders(self):
        dIn = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        inFileName = "dummyInFile_%s" % self._uniqId
        outFilePrefix = self._uniqId

        tmpFileName = "%s_blastclust.txt" % outFilePrefix
        self._writeFile(tmpFileName, ["seq1 seq3 seq4\n",
                                      "seq2\n"])

        shortHFile = "%s.shortH_Blastclust.fa" % inFileName
        self._writeFasta(shortHFile, ["BlastclustCluster1Mb1_seq1",
                                      "BlastclustCluster1Mb2_seq3",
                                      "BlastclustCluster1Mb3_seq4",
                                      "BlastclustCluster2Mb1_seq2"])

        fileExp = "retrieveInitHeadersExpected.fa"
        self._writeFasta(fileExp, ["BlastclustCluster1Mb1_Header1",
                                   "BlastclustCluster1Mb2_Header3",
                                   "BlastclustCluster1Mb3_Header4",
                                   "BlastclustCluster2Mb1_Header2"])

        fileObs = self._registerForRemoval("%s_Blastclust.fa" % outFilePrefix)
        self._iLaunchBlastclust.setInputFileName(inFileName)
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
        self._iLaunchBlastclust.retrieveInitHeaders(dIn)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_filterUnclusteredSequences(self):
        dClusterId2SeqHeaders = {1: ["seq1","seq2"], 2: ["seq3"]}
        dExp = {1: ["seq1","seq2"]}
        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(dClusterId2SeqHeaders)
        self.assertEqual(dObs, dExp)

    def test_blastclustToMap(self):
        inFileName = "dummyBlastclustOut_%s.fa" % self._uniqId
        # Written line by line: the last sequence line has no trailing newline.
        self._writeFile(inFileName, [">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n",
                                     "gaattgtttactta"])

        fileExp = "blastclustToMapExpected.map"
        self._writeFile(fileExp, ["BlastclustCluster1Mb1\tchunk1\t1\t14\n",
                                  "BlastclustCluster1Mb2\tchunk1\t30\t44\n",
                                  "BlastclustCluster2Mb1\tchunk2\t100\t114\n",
                                  "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n"])

        fileObs = self._registerForRemoval("%s.map" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.blastclustToMap(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)
235
# Run every test of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()