6
|
1 from commons.core.utils.FileUtils import FileUtils
|
|
2 from commons.core.seq.BioseqDB import BioseqDB
|
|
3 from commons.core.seq.Bioseq import Bioseq
|
|
4 from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter
|
|
5 from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile
|
|
6 from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper
|
|
7 from commons.core.LoggerFactory import LoggerFactory
|
|
8 import os
|
|
9 import logging
|
|
10 import unittest
|
|
11
|
|
class Test_Multifasta2SNPFileWriter(unittest.TestCase):
    """Unit tests for the file-writing methods of Multifasta2SNPFileWriter.

    Most tests follow the same pattern: write an "expected" file by hand,
    ask the writer to produce the "observed" file, then check that the
    observed file exists and is identical to the expected one.
    All files are created in the current working directory and removed
    again in tearDown.
    """

    def setUp(self):
        """Define observed/expected file names and build the objects under test."""
        self._obsSubSNPFile = "SubSNP.csv"
        self._expSubSNPFile = "ExpSubSNP.csv"

        self._obsAlleleFile = "Allele.csv"
        self._expAlleleFile = "ExpAllele.csv"

        self._obsIndividualFile = "Individual.csv"
        self._expIndividualFile = "ExpIndividual.csv"

        self._obsSequenceFSAFile = "Sequences.fsa"
        self._expSequenceFSAFile = "ExpSequences.fsa"

        self._obsSequenceCSVFile = "Sequences.csv"
        self._expSequenceCSVFile = "ExpSequences.csv"

        self._obsBatchFile = "Batch.txt"
        self._expBatchFile = "ExpBatch.txt"

        self._obsBatchLineFile = "BatchLine.csv"
        self._expBatchLineFile = "ExpBatchLine.csv"

        self._logFileName = "Test_Multifasta2SNPWriter.log"

        # Name handed to the sequence wrapper below; the multifasta file that
        # the tests actually write and read is self._inFileName.
        self._inputFileName = "multifasta.fsa"

        self._lSNPResult = []
        self._dAlleleResult = {}
        self._lIndividualResult = []
        self._refSeq = Bioseq()
        self._seqDb = BioseqDB()

        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb, self._logFile, self._inputFileName)
        self._lBatchLineResults = []

        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()

        self._inFileName = "multifasta.txt"
        self._taxon = "Arabidopsis thaliana"

    def tearDown(self):
        """Remove every input, log, observed and expected file a test may have created."""
        lFilesToRemove = [
            self._inFileName,
            "multifasta2SNP.log",
            self._logFileName,
            self._obsSubSNPFile,
            self._expSubSNPFile,
            self._obsAlleleFile,
            self._expAlleleFile,
            self._obsIndividualFile,
            self._expIndividualFile,
            self._obsSequenceFSAFile,
            self._expSequenceFSAFile,
            self._obsSequenceCSVFile,
            self._expSequenceCSVFile,
        ]
        for fileName in lFilesToRemove:
            if FileUtils.isRessourceExists(fileName):
                os.remove(fileName)

        # The batch files are removed through FileUtils.removeFilesByPattern,
        # exactly as the original clean-up did.
        lPatternsToRemove = [
            self._obsBatchFile,
            self._expBatchFile,
            self._obsBatchLineFile,
            self._expBatchLineFile,
        ]
        for pattern in lPatternsToRemove:
            if FileUtils.isRessourceExists(pattern):
                FileUtils.removeFilesByPattern(pattern)

    def test_writeSubSNPFileWithSubSNPList(self):
        """_writeSubSNPFile must produce the expected SubSNP CSV from a list of three sub-SNPs."""
        self._lSNPResult = self._buildSubSNPResults()

        self._writeExpSubSNPFile()
        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)

        self._assertObsFileMatchesExp(self._expSubSNPFile, self._obsSubSNPFile)

    def test_writeAlleleFileWithAlleleDict(self):
        """_writeAlleleFile must produce the expected Allele CSV from an allele dictionary."""
        self._dAlleleResult['A'] = 1
        self._dAlleleResult['C'] = 2
        self._dAlleleResult['T'] = 3

        self._writeExpAlleleFile()
        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)

        self._assertObsFileMatchesExp(self._expAlleleFile, self._obsAlleleFile)

    def test_writeIndividualFileWithIndivList(self):
        """_writeIndividualFile must produce the expected Individual CSV from two individuals."""
        self._lIndividualResult = self._buildIndividualResults()

        self._writeExpIndividualFile()

        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)

        self._assertObsFileMatchesExp(self._expIndividualFile, self._obsIndividualFile)

    def test_writeSequenceFilesWithSequenceWrapper(self):
        """_writeSequenceFiles must produce the expected FSA and CSV files for the reference sequence."""
        self._writeInputFile()
        self._writeExpSequenceFiles()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): test_write builds Multifasta2SNPFile(taxon, batchName, gene)
        # while this test passes (batchName, gene, taxon) -- confirm against the
        # constructor signature which order is intended. Left unchanged here.
        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        lRefseq = [self._lSequenceWrapper._iReferenceBioseq]
        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)

        self._assertObsFileMatchesExp(self._expSequenceFSAFile, self._obsSequenceFSAFile)
        self._assertObsFileMatchesExp(self._expSequenceCSVFile, self._obsSequenceCSVFile)

    def test_writeBatchFile(self):
        """_writeBatchFile must produce the expected Batch file from a single batch record."""
        self._dBatchResults = {'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}
        lBatchResults = [self._dBatchResults]
        self._writeExpBatchFile()
        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)
        self._assertObsFileMatchesExp(self._expBatchFile, self._obsBatchFile)

    def test_writeBatchLineFile(self):
        """_writeBatchLineFile must produce the expected BatchLine CSV from two records."""
        self._lBatchLineResults = self._buildBatchLineResults()
        self._writeExpBatchLineFile()
        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)
        self._assertObsFileMatchesExp(self._expBatchLineFile, self._obsBatchLineFile)

    def test_sortAlleleResultByAlleleNumber(self):
        """sortAlleleResultByAlleleNumber must return (allele, number) pairs ordered by number."""
        dAlleleResults = {'A': 3,
                          'G': 1,
                          'C': 2}

        lExpAlleleSortedList = [('G', 1),
                                ('C', 2),
                                ('A', 3)]

        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(lExpAlleleSortedList, lObsAlleleSortedList)

    def test_write(self):
        """write() must produce all seven output files from a populated Multifasta2SNPFile."""
        self._writeInputFile()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)

        # Inject the results directly so that only the writing step is exercised.
        multifasta2SNPFile._lSubSNPFileResults = self._buildSubSNPResults()
        multifasta2SNPFile._dAlleleFileResults['A'] = 1
        multifasta2SNPFile._dAlleleFileResults['C'] = 2
        multifasta2SNPFile._dAlleleFileResults['T'] = 3

        multifasta2SNPFile._lIndividualFileResults = self._buildIndividualResults()

        multifasta2SNPFile._lBatchFileResults = [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]

        multifasta2SNPFile._lBatchLineFileResults = self._buildBatchLineResults()

        self._writeExpSubSNPFile()
        self._writeExpAlleleFile()
        self._writeExpIndividualFile()
        self._writeExpSequenceFiles()
        self._writeExpBatchFile()
        self._writeExpBatchLineFile()

        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)

        self._assertObsFileMatchesExp(self._expSubSNPFile, self._obsSubSNPFile)
        self._assertObsFileMatchesExp(self._expAlleleFile, self._obsAlleleFile)
        self._assertObsFileMatchesExp(self._expIndividualFile, self._obsIndividualFile)
        self._assertObsFileMatchesExp(self._expSequenceFSAFile, self._obsSequenceFSAFile)
        self._assertObsFileMatchesExp(self._expSequenceCSVFile, self._obsSequenceCSVFile)
        self._assertObsFileMatchesExp(self._expBatchFile, self._obsBatchFile)
        self._assertObsFileMatchesExp(self._expBatchLineFile, self._obsBatchLineFile)

    def _assertObsFileMatchesExp(self, expFileName, obsFileName):
        """Assert that obsFileName exists and is identical to expFileName."""
        self.assertTrue(FileUtils.isRessourceExists(obsFileName))
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))

    @staticmethod
    def _writeLines(fileName, lLines):
        """Write the given strings, unchanged, to fileName (overwriting it).

        The context manager guarantees the handle is closed even if a write fails.
        """
        with open(fileName, "w") as fileHandle:
            fileHandle.writelines(lLines)

    @staticmethod
    def _buildSubSNPResults():
        """Return a fresh list of the three sub-SNP records shared by the tests."""
        return [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]

    @staticmethod
    def _buildIndividualResults():
        """Return a fresh list of the two individual records shared by the tests."""
        return [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]

    @staticmethod
    def _buildBatchLineResults():
        """Return a fresh list of the two batch-line records shared by the tests."""
        return [{'IndividualNumber': "1", 'BatchNumber': "1"},
                {'IndividualNumber': "2", 'BatchNumber': "1"}]

    def _writeExpSubSNPFile(self):
        """Write the expected SubSNP CSV (header plus three records)."""
        self._writeLines(self._expSubSNPFile, [
            "SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n",
            "SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n",
            "SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n",
            "SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n",
        ])

    def _writeExpAlleleFile(self):
        """Write the expected Allele CSV (header plus three alleles)."""
        self._writeLines(self._expAlleleFile, [
            "AlleleNumber;Value;Motif;NbCopy;Comment\n",
            "1;A;;;\n",
            "2;C;;;\n",
            "3;T;;;\n",
        ])

    def _writeExpIndividualFile(self):
        """Write the expected Individual CSV (header plus two individuals)."""
        self._writeLines(self._expIndividualFile, [
            "IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n",
            "1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
            "2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
        ])

    def _writeInputFile(self):
        """Write the multifasta input: one reference sequence followed by two lines."""
        self._writeLines(self._inFileName, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
            ">Line1\n",
            "CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n",
            ">Line2\n",
            # The last sequence deliberately has no trailing newline, as in the original fixture.
            "CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA",
        ])

    def _writeExpSequenceFiles(self):
        """Write the expected sequence FSA file and its companion CSV description."""
        self._writeLines(self._expSequenceFSAFile, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
        ])
        self._writeLines(self._expSequenceCSVFile, [
            "SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n",
            "Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n",
        ])

    def _writeExpBatchFile(self):
        """Write the expected Batch file: one "key: value" record terminated by "//"."""
        self._writeLines(self._expBatchFile, [
            "BatchNumber: 1\n",
            "BatchName: batch1\n",
            "GeneName: gene1\n",
            "Description: \n",
            "ContactNumber: \n",
            "ProtocolNumber: \n",
            "ThematicNumber: \n",
            "RefSeqName: Sequence de Reference\n",
            "AlignmentFileName: \n",
            "SeqName: \n",
            "//\n",
        ])

    def _writeExpBatchLineFile(self):
        """Write the expected BatchLine CSV (header plus two records)."""
        self._writeLines(self._expBatchLineFile, [
            "IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n",
            "1;;;1;\n",
            "2;;;1;\n",
        ])

|
|
# Run the whole test suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()