Mercurial > repos > yufei-luo > s_mart
comparison commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:ea3082881bf8 | 6:769e306b7933 |
---|---|
1 from commons.core.utils.FileUtils import FileUtils | |
2 from commons.core.seq.BioseqDB import BioseqDB | |
3 from commons.core.seq.Bioseq import Bioseq | |
4 from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter | |
5 from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile | |
6 from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper | |
7 from commons.core.LoggerFactory import LoggerFactory | |
8 import os | |
9 import logging | |
10 import unittest | |
11 | |
class Test_Multifasta2SNPFileWriter(unittest.TestCase):
    """Tests for Multifasta2SNPFileWriter.

    Each test writes an "expected" fixture file by hand, asks the writer to
    produce the matching "observed" file, and compares the two with
    FileUtils.are2FilesIdentical. All files are created in the current
    working directory and removed again in tearDown.
    """

    def setUp(self):
        """Define the observed/expected file names and build the writer under test."""
        self._obsSubSNPFile = "SubSNP.csv"
        self._expSubSNPFile = "ExpSubSNP.csv"

        self._obsAlleleFile = "Allele.csv"
        self._expAlleleFile = "ExpAllele.csv"

        self._obsIndividualFile = "Individual.csv"
        self._expIndividualFile = "ExpIndividual.csv"

        self._obsSequenceFSAFile = "Sequences.fsa"
        self._expSequenceFSAFile = "ExpSequences.fsa"

        self._obsSequenceCSVFile = "Sequences.csv"
        self._expSequenceCSVFile = "ExpSequences.csv"

        self._obsBatchFile = "Batch.txt"
        self._expBatchFile = "ExpBatch.txt"

        self._obsBatchLineFile = "BatchLine.csv"
        self._expBatchLineFile = "ExpBatchLine.csv"

        self._logFileName = "Test_Multifasta2SNPWriter.log"

        self._inputFileName = "multifasta.fsa"

        self._lSNPResult = []
        self._dAlleleResult = {}
        self._lIndividualResult = []
        self._refSeq = Bioseq()
        self._seqDb = BioseqDB()

        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb, self._logFile, self._inputFileName)
        self._lBatchLineResults = []

        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()

        self._inFileName = "multifasta.txt"
        self._taxon = "Arabidopsis thaliana"

    def tearDown(self):
        """Remove every file a test may have left behind, if it exists."""
        lFilesToRemove = [
            self._inFileName,
            "multifasta2SNP.log",
            self._logFileName,
            self._obsSubSNPFile,
            self._expSubSNPFile,
            self._obsAlleleFile,
            self._expAlleleFile,
            self._obsIndividualFile,
            self._expIndividualFile,
            self._obsSequenceFSAFile,
            self._expSequenceFSAFile,
            self._obsSequenceCSVFile,
            self._expSequenceCSVFile,
        ]
        for fileName in lFilesToRemove:
            if FileUtils.isRessourceExists(fileName):
                os.remove(fileName)

        # The batch files have always been removed through the pattern-based
        # helper rather than os.remove; that choice is kept as is.
        lPatternsToRemove = [
            self._obsBatchFile,
            self._expBatchFile,
            self._obsBatchLineFile,
            self._expBatchLineFile,
        ]
        for pattern in lPatternsToRemove:
            if FileUtils.isRessourceExists(pattern):
                FileUtils.removeFilesByPattern(pattern)

    def test_writeSubSNPFileWithSubSNPList(self):
        self._lSNPResult = self._buildSubSNPResults()

        self._writeExpSubSNPFile()
        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)

        self.assertTrue(FileUtils.isRessourceExists(self._obsSubSNPFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSubSNPFile, self._obsSubSNPFile))

    def test_writeAlleleFileWithAlleleDict(self):
        self._fillAlleleResults(self._dAlleleResult)

        self._writeExpAlleleFile()
        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)

        self.assertTrue(FileUtils.isRessourceExists(self._obsAlleleFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expAlleleFile, self._obsAlleleFile))

    def test_writeIndividualFileWithIndivList(self):
        self._lIndividualResult = self._buildIndividualResults()

        self._writeExpIndividualFile()

        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)

        self.assertTrue(FileUtils.isRessourceExists(self._obsIndividualFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expIndividualFile, self._obsIndividualFile))

    def test_writeSequenceFilesWithSequenceWrapper(self):
        self._writeInputFile()
        self._writeExpSequenceFiles()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): arguments are passed here as (batchName, gene, taxon)
        # whereas test_write passes (taxon, batchName, gene). One of the two
        # orders is presumably wrong -- confirm against
        # Multifasta2SNPFile.__init__ before changing either.
        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        lRefseq = [self._lSequenceWrapper._iReferenceBioseq]
        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)

        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceFSAFile, self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceCSVFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceCSVFile, self._obsSequenceCSVFile))

    def test_writeBatchFile(self):
        lBatchResults = self._buildBatchResults()
        self._writeExpBatchFile()
        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchFile, self._obsBatchFile))

    def test_writeBatchLineFile(self):
        self._lBatchLineResults = self._buildBatchLineResults()
        self._writeExpBatchLineFile()
        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchLineFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchLineFile, self._obsBatchLineFile))

    def test_sortAlleleResultByAlleleNumber(self):
        dAlleleResults = {'A': 3,
                          'G': 1,
                          'C': 2}

        lExpAlleleSortedList = [('G', 1),
                                ('C', 2),
                                ('A', 3)]

        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(lExpAlleleSortedList, lObsAlleleSortedList)

    def test_write(self):
        self._writeInputFile()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)

        # Overwrite the parsed results with fixed data so that the expected
        # files below do not depend on the parser.
        multifasta2SNPFile._lSubSNPFileResults = self._buildSubSNPResults()
        # Keys are set on the existing allele container instead of replacing it.
        self._fillAlleleResults(multifasta2SNPFile._dAlleleFileResults)
        multifasta2SNPFile._lIndividualFileResults = self._buildIndividualResults()
        multifasta2SNPFile._lBatchFileResults = self._buildBatchResults()
        multifasta2SNPFile._lBatchLineFileResults = self._buildBatchLineResults()

        self._writeExpSubSNPFile()
        self._writeExpAlleleFile()
        self._writeExpIndividualFile()
        self._writeExpSequenceFiles()
        self._writeExpBatchFile()
        self._writeExpBatchLineFile()

        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)

        lExpAndObsFiles = [
            (self._expSubSNPFile, self._obsSubSNPFile),
            (self._expAlleleFile, self._obsAlleleFile),
            (self._expIndividualFile, self._obsIndividualFile),
            (self._expSequenceFSAFile, self._obsSequenceFSAFile),
            (self._expSequenceCSVFile, self._obsSequenceCSVFile),
            (self._expBatchFile, self._obsBatchFile),
            (self._expBatchLineFile, self._obsBatchLineFile),
        ]
        for expFile, obsFile in lExpAndObsFiles:
            self.assertTrue(FileUtils.isRessourceExists(obsFile))
            self.assertTrue(FileUtils.are2FilesIdentical(expFile, obsFile))

    # ------------------------------------------------------------------
    # Shared input data (used by the per-file tests and by test_write)
    # ------------------------------------------------------------------

    @staticmethod
    def _buildSubSNPResults():
        """Return a fresh list of the three SubSNP records used as writer input."""
        return [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1},
                {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1},
                {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1}]

    @staticmethod
    def _fillAlleleResults(dAlleleResults):
        """Register alleles A, C and T with numbers 1, 2 and 3 in dAlleleResults."""
        dAlleleResults['A'] = 1
        dAlleleResults['C'] = 2
        dAlleleResults['T'] = 3

    @staticmethod
    def _buildIndividualResults():
        """Return a fresh list of the two individual records used as writer input."""
        return [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]

    @staticmethod
    def _buildBatchResults():
        """Return a fresh one-element list holding the batch record used as writer input."""
        return [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]

    @staticmethod
    def _buildBatchLineResults():
        """Return a fresh list of the two batch-line records used as writer input."""
        return [{'IndividualNumber': "1", 'BatchNumber': "1"},
                {'IndividualNumber': "2", 'BatchNumber': "1"}]

    # ------------------------------------------------------------------
    # Fixture files
    # ------------------------------------------------------------------

    @staticmethod
    def _writeFixture(fileName, lChunks):
        """Write the strings of lChunks, in order, to fileName.

        The file is opened in a with-block so that the handle is closed even
        when a write raises.
        """
        with open(fileName, "w") as fileHandle:
            for chunk in lChunks:
                fileHandle.write(chunk)

    def _writeExpSubSNPFile(self):
        """Write the expected SubSNP CSV file."""
        self._writeFixture(self._expSubSNPFile, [
            "SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n",
            "SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n",
            "SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n",
            "SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n",
        ])

    def _writeExpAlleleFile(self):
        """Write the expected allele CSV file."""
        self._writeFixture(self._expAlleleFile, [
            "AlleleNumber;Value;Motif;NbCopy;Comment\n",
            "1;A;;;\n",
            "2;C;;;\n",
            "3;T;;;\n",
        ])

    def _writeExpIndividualFile(self):
        """Write the expected individual CSV file."""
        self._writeFixture(self._expIndividualFile, [
            "IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n",
            "1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
            "2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
        ])

    def _writeInputFile(self):
        """Write the multifasta input: one reference sequence and two lines."""
        self._writeFixture(self._inFileName, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
            ">Line1\n",
            "CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n",
            ">Line2\n",
            # The last sequence has no trailing newline, as in the original fixture.
            "CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA",
        ])

    def _writeExpSequenceFiles(self):
        """Write the expected reference-sequence FASTA file and its CSV description."""
        self._writeFixture(self._expSequenceFSAFile, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
        ])
        self._writeFixture(self._expSequenceCSVFile, [
            "SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n",
            "Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n",
        ])

    def _writeExpBatchFile(self):
        """Write the expected batch description file."""
        self._writeFixture(self._expBatchFile, [
            "BatchNumber: 1\n",
            "BatchName: batch1\n",
            "GeneName: gene1\n",
            "Description: \n",
            "ContactNumber: \n",
            "ProtocolNumber: \n",
            "ThematicNumber: \n",
            "RefSeqName: Sequence de Reference\n",
            "AlignmentFileName: \n",
            "SeqName: \n",
            "//\n",
        ])

    def _writeExpBatchLineFile(self):
        """Write the expected batch-line CSV file."""
        self._writeFixture(self._expBatchLineFile, [
            "IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n",
            "1;;;1;\n",
            "2;;;1;\n",
        ])

# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()