comparison commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 from commons.core.utils.FileUtils import FileUtils
2 from commons.core.seq.BioseqDB import BioseqDB
3 from commons.core.seq.Bioseq import Bioseq
4 from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter
5 from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile
6 from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper
7 from commons.core.LoggerFactory import LoggerFactory
8 import os
9 import logging
10 import unittest
11
class Test_Multifasta2SNPFileWriter(unittest.TestCase):
    """Unit tests for Multifasta2SNPFileWriter.

    Each test hand-writes an "Exp*" file holding the expected content, asks
    the writer to produce the matching observed file, and then checks that
    both files are identical. All file names are relative, so the files are
    created in the current working directory; tearDown() deletes them again.
    """

    def setUp(self):
        """Define the file names and the empty fixtures shared by the tests."""
        # Observed (written by the code under test) / expected (written by
        # the test itself) file name pairs.
        self._obsSubSNPFile = "SubSNP.csv"
        self._expSubSNPFile = "ExpSubSNP.csv"

        self._obsAlleleFile = "Allele.csv"
        self._expAlleleFile = "ExpAllele.csv"

        self._obsIndividualFile = "Individual.csv"
        self._expIndividualFile = "ExpIndividual.csv"

        self._obsSequenceFSAFile = "Sequences.fsa"
        self._expSequenceFSAFile = "ExpSequences.fsa"

        self._obsSequenceCSVFile = "Sequences.csv"
        self._expSequenceCSVFile = "ExpSequences.csv"

        self._obsBatchFile = "Batch.txt"
        self._expBatchFile = "ExpBatch.txt"

        self._obsBatchLineFile = "BatchLine.csv"
        self._expBatchLineFile = "ExpBatchLine.csv"

        self._logFileName = "Test_Multifasta2SNPWriter.log"

        self._inputFileName = "multifasta.fsa"

        self._lSNPResult = []
        self._dAlleleResult = {}
        self._lIndividualResult = []
        self._refSeq = Bioseq()
        self._seqDb = BioseqDB()

        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb, self._logFile, self._inputFileName)
        self._lBatchLineResults = []

        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()

        # Name of the multifasta input written by _writeInputFile().
        self._inFileName = "multifasta.txt"
        self._taxon = "Arabidopsis thaliana"

    def tearDown(self):
        """Delete every file a test may have left behind."""
        lFilesToRemove = [
            self._inFileName,
            "multifasta2SNP.log",
            self._logFileName,
            self._obsSubSNPFile,
            self._expSubSNPFile,
            self._obsAlleleFile,
            self._expAlleleFile,
            self._obsIndividualFile,
            self._expIndividualFile,
            self._obsSequenceFSAFile,
            self._expSequenceFSAFile,
            self._obsSequenceCSVFile,
            self._expSequenceCSVFile,
        ]
        for fileName in lFilesToRemove:
            if FileUtils.isRessourceExists(fileName):
                os.remove(fileName)

        # The batch files have always been removed through
        # FileUtils.removeFilesByPattern rather than os.remove; that call is
        # kept as is. NOTE(review): presumably it expands a glob pattern, in
        # which case os.remove would be equivalent here -- confirm.
        lPatternsToRemove = [
            self._obsBatchFile,
            self._expBatchFile,
            self._obsBatchLineFile,
            self._expBatchLineFile,
        ]
        for pattern in lPatternsToRemove:
            if FileUtils.isRessourceExists(pattern):
                FileUtils.removeFilesByPattern(pattern)

    def test_writeSubSNPFileWithSubSNPList(self):
        """_writeSubSNPFile() writes one line per SubSNP dictionary."""
        self._lSNPResult = self._buildSubSNPResults()

        self._writeExpSubSNPFile()
        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)

        self._assertObsFileMatchesExp(self._expSubSNPFile, self._obsSubSNPFile)

    def test_writeAlleleFileWithAlleleDict(self):
        """_writeAlleleFile() writes the allele dictionary as a CSV file."""
        self._dAlleleResult['A'] = 1
        self._dAlleleResult['C'] = 2
        self._dAlleleResult['T'] = 3

        self._writeExpAlleleFile()
        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)

        self._assertObsFileMatchesExp(self._expAlleleFile, self._obsAlleleFile)

    def test_writeIndividualFileWithIndivList(self):
        """_writeIndividualFile() writes one line per individual dictionary."""
        self._lIndividualResult = self._buildIndividualResults()

        self._writeExpIndividualFile()

        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)

        self._assertObsFileMatchesExp(self._expIndividualFile, self._obsIndividualFile)

    def test_writeSequenceFilesWithSequenceWrapper(self):
        """_writeSequenceFiles() writes the reference sequence as FSA and CSV."""
        self._writeInputFile()
        self._writeExpSequenceFiles()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): test_write() builds Multifasta2SNPFile with the
        # arguments in the order (taxon, batchName, gene); one of the two
        # orders is presumably wrong -- confirm against the constructor.
        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        lRefseq = [self._lSequenceWrapper._iReferenceBioseq]
        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)

        self._assertObsFileMatchesExp(self._expSequenceFSAFile, self._obsSequenceFSAFile)
        self._assertObsFileMatchesExp(self._expSequenceCSVFile, self._obsSequenceCSVFile)

    def test_writeBatchFile(self):
        """_writeBatchFile() writes one "key: value" record per batch."""
        self._dBatchResults = {'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}
        lBatchResults = [self._dBatchResults]
        self._writeExpBatchFile()
        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)
        self._assertObsFileMatchesExp(self._expBatchFile, self._obsBatchFile)

    def test_writeBatchLineFile(self):
        """_writeBatchLineFile() writes one line per batch-line dictionary."""
        self._lBatchLineResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                   {'IndividualNumber': "2", 'BatchNumber': "1"}]
        self._writeExpBatchLineFile()
        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)
        self._assertObsFileMatchesExp(self._expBatchLineFile, self._obsBatchLineFile)

    def test_sortAlleleResultByAlleleNumber(self):
        """sortAlleleResultByAlleleNumber() returns (allele, number) pairs ordered by number."""
        dAlleleResults = {'A': 3,
                          'G': 1,
                          'C': 2}

        lExpAlleleSortedList = [('G', 1),
                                ('C', 2),
                                ('A', 3)]

        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)
        # assertEqual instead of assertEquals: the latter is a deprecated
        # alias that no longer exists in Python 3.12+.
        self.assertEqual(lExpAlleleSortedList, lObsAlleleSortedList)

    def test_write(self):
        """write() produces all output files from a populated Multifasta2SNPFile.

        No file name is passed to write(); the test expects the output to
        appear under the observed names defined in setUp().
        """
        self._writeInputFile()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): test_writeSequenceFilesWithSequenceWrapper() passes
        # (batchName, gene, taxon) instead -- confirm which order is right.
        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)

        # Inject the results directly instead of running the parser.
        multifasta2SNPFile._lSubSNPFileResults = self._buildSubSNPResults()
        multifasta2SNPFile._dAlleleFileResults['A'] = 1
        multifasta2SNPFile._dAlleleFileResults['C'] = 2
        multifasta2SNPFile._dAlleleFileResults['T'] = 3

        multifasta2SNPFile._lIndividualFileResults = self._buildIndividualResults()

        multifasta2SNPFile._lBatchFileResults = [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]

        multifasta2SNPFile._lBatchLineFileResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                                     {'IndividualNumber': "2", 'BatchNumber': "1"}]

        self._writeExpSubSNPFile()
        self._writeExpAlleleFile()
        self._writeExpIndividualFile()
        self._writeExpSequenceFiles()
        self._writeExpBatchFile()
        self._writeExpBatchLineFile()

        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)

        self._assertObsFileMatchesExp(self._expSubSNPFile, self._obsSubSNPFile)
        self._assertObsFileMatchesExp(self._expAlleleFile, self._obsAlleleFile)
        self._assertObsFileMatchesExp(self._expIndividualFile, self._obsIndividualFile)
        self._assertObsFileMatchesExp(self._expSequenceFSAFile, self._obsSequenceFSAFile)
        self._assertObsFileMatchesExp(self._expSequenceCSVFile, self._obsSequenceCSVFile)
        self._assertObsFileMatchesExp(self._expBatchFile, self._obsBatchFile)
        self._assertObsFileMatchesExp(self._expBatchLineFile, self._obsBatchLineFile)

    def _assertObsFileMatchesExp(self, expFileName, obsFileName):
        """Assert that obsFileName exists and is identical to expFileName."""
        self.assertTrue(FileUtils.isRessourceExists(obsFileName))
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))

    @staticmethod
    def _writeFile(fileName, lLines):
        """Create (or overwrite) fileName with the given strings, written verbatim."""
        # 'with' closes the handle even when a write raises.
        with open(fileName, "w") as fileHandle:
            fileHandle.writelines(lLines)

    @staticmethod
    def _buildSubSNPResults():
        """Return a fresh list of the three SubSNP dictionaries used as input."""
        return [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1},
                {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1},
                {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue': "A", 'type': "SNP", 'length': 1}]

    @staticmethod
    def _buildIndividualResults():
        """Return a fresh list of the two individual dictionaries used as input."""
        return [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]

    def _writeExpSubSNPFile(self):
        """Write the expected SubSNP CSV file (header plus three SubSNP lines)."""
        self._writeFile(self._expSubSNPFile, [
            "SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n",
            "SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n",
            "SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n",
            "SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n",
        ])

    def _writeExpAlleleFile(self):
        """Write the expected allele CSV file (header plus three allele lines)."""
        self._writeFile(self._expAlleleFile, [
            "AlleleNumber;Value;Motif;NbCopy;Comment\n",
            "1;A;;;\n",
            "2;C;;;\n",
            "3;T;;;\n",
        ])

    def _writeExpIndividualFile(self):
        """Write the expected individual CSV file (header plus two individuals)."""
        self._writeFile(self._expIndividualFile, [
            "IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n",
            "1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
            "2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
        ])

    def _writeInputFile(self):
        """Write the multifasta input: one reference sequence and two lines."""
        self._writeFile(self._inFileName, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
            ">Line1\n",
            "CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n",
            ">Line2\n",
            # The last sequence deliberately has no trailing newline.
            "CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA",
        ])

    def _writeExpSequenceFiles(self):
        """Write the expected sequence files: the FSA file and its CSV description."""
        self._writeFile(self._expSequenceFSAFile, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
        ])
        self._writeFile(self._expSequenceCSVFile, [
            "SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n",
            "Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n",
        ])

    def _writeExpBatchFile(self):
        """Write the expected batch file: one "key: value" record closed by "//"."""
        self._writeFile(self._expBatchFile, [
            "BatchNumber: 1\n",
            "BatchName: batch1\n",
            "GeneName: gene1\n",
            "Description: \n",
            "ContactNumber: \n",
            "ProtocolNumber: \n",
            "ThematicNumber: \n",
            "RefSeqName: Sequence de Reference\n",
            "AlignmentFileName: \n",
            "SeqName: \n",
            "//\n",
        ])

    def _writeExpBatchLineFile(self):
        """Write the expected batch-line CSV file (header plus two lines)."""
        self._writeFile(self._expBatchLineFile, [
            "IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n",
            "1;;;1;\n",
            "2;;;1;\n",
        ])

# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()