diff commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,292 @@
+from commons.core.utils.FileUtils import FileUtils
+from commons.core.seq.BioseqDB import BioseqDB
+from commons.core.seq.Bioseq import Bioseq
+from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter
+from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile
+from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper
+from commons.core.LoggerFactory import LoggerFactory
+import os
+import logging
+import unittest
+
+class Test_Multifasta2SNPFileWriter(unittest.TestCase):
+    """Unit tests for Multifasta2SNPFileWriter.
+
+    Each test writes an expected ("Exp*") file by hand, lets the writer
+    produce the observed file, and compares the two with
+    FileUtils.are2FilesIdentical. All files live in the current working
+    directory and are removed in tearDown.
+    """
+
+    def setUp(self):
+        self._obsSubSNPFile = "SubSNP.csv"
+        self._expSubSNPFile = "ExpSubSNP.csv"
+        
+        self._obsAlleleFile = "Allele.csv"
+        self._expAlleleFile = "ExpAllele.csv"
+        
+        self._obsIndividualFile = "Individual.csv"
+        self._expIndividualFile = "ExpIndividual.csv"
+        
+        self._obsSequenceFSAFile = "Sequences.fsa"
+        self._expSequenceFSAFile = "ExpSequences.fsa"
+        
+        self._obsSequenceCSVFile = "Sequences.csv"
+        self._expSequenceCSVFile = "ExpSequences.csv"
+        
+        self._obsBatchFile = "Batch.txt"
+        self._expBatchFile = "ExpBatch.txt"
+        
+        self._obsBatchLineFile = "BatchLine.csv"
+        self._expBatchLineFile = "ExpBatchLine.csv"
+        
+        self._logFileName = "Test_Multifasta2SNPWriter.log"
+        
+        self._inputFileName = "multifasta.fsa"
+        
+        self._lSNPResult = []
+        self._dAlleleResult = {}
+        self._lIndividualResult = []
+        self._refSeq = Bioseq()
+        self._seqDb= BioseqDB()
+        
+        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
+        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb,  self._logFile, self._inputFileName)
+        self._lBatchLineResults = []
+        
+        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()
+        
+        self._inFileName = "multifasta.txt"
+        self._taxon = "Arabidopsis thaliana"
+
+    def tearDown(self):
+        if FileUtils.isRessourceExists(self._inFileName):
+            os.remove(self._inFileName)
+        if FileUtils.isRessourceExists("multifasta2SNP.log"):
+            os.remove("multifasta2SNP.log")
+        if FileUtils.isRessourceExists("Test_Multifasta2SNPWriter.log"):
+            os.remove("Test_Multifasta2SNPWriter.log")
+            
+        if FileUtils.isRessourceExists(self._obsSubSNPFile):
+            os.remove(self._obsSubSNPFile)
+        if FileUtils.isRessourceExists(self._expSubSNPFile):
+            os.remove(self._expSubSNPFile)
+            
+        if FileUtils.isRessourceExists(self._obsAlleleFile):
+            os.remove(self._obsAlleleFile)
+        if FileUtils.isRessourceExists(self._expAlleleFile):
+            os.remove(self._expAlleleFile)
+            
+        if FileUtils.isRessourceExists(self._obsIndividualFile):
+            os.remove(self._obsIndividualFile)
+        if FileUtils.isRessourceExists(self._expIndividualFile):
+            os.remove(self._expIndividualFile)
+            
+        if FileUtils.isRessourceExists(self._obsSequenceFSAFile):
+            os.remove(self._obsSequenceFSAFile)
+        if FileUtils.isRessourceExists(self._expSequenceFSAFile):
+            os.remove(self._expSequenceFSAFile)
+            
+        if FileUtils.isRessourceExists(self._obsSequenceCSVFile):
+            os.remove(self._obsSequenceCSVFile)
+        if FileUtils.isRessourceExists(self._expSequenceCSVFile):
+            os.remove(self._expSequenceCSVFile)
+
+        if FileUtils.isRessourceExists(self._obsBatchFile):
+            FileUtils.removeFilesByPattern(self._obsBatchFile)
+        if FileUtils.isRessourceExists(self._expBatchFile):
+            FileUtils.removeFilesByPattern(self._expBatchFile)
+        
+        if FileUtils.isRessourceExists(self._obsBatchLineFile):
+            FileUtils.removeFilesByPattern(self._obsBatchLineFile)
+        if FileUtils.isRessourceExists(self._expBatchLineFile):
+            FileUtils.removeFilesByPattern(self._expBatchLineFile)
+        
+    def test_writeSubSNPFileWithSubSNPList(self):
+        self._lSNPResult = [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}, 
+                            {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
+                            {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]
+        
+        self._writeExpSubSNPFile()
+        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)
+        
+        self.assertTrue(FileUtils.isRessourceExists(self._obsSubSNPFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expSubSNPFile, self._obsSubSNPFile))
+    
+    def test_writeAlleleFileWithAlleleDict(self):
+        self._dAlleleResult['A'] = 1
+        self._dAlleleResult['C'] = 2
+        self._dAlleleResult['T'] = 3
+                        
+        self._writeExpAlleleFile()
+        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)
+        
+        self.assertTrue(FileUtils.isRessourceExists(self._obsAlleleFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expAlleleFile, self._obsAlleleFile))
+        
+    def test_writeIndividualFileWithIndivList(self):
+        self._lIndividualResult = [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
+                                   {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]
+              
+        self._writeExpIndividualFile()
+        
+        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)
+        
+        self.assertTrue(FileUtils.isRessourceExists(self._obsIndividualFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expIndividualFile, self._obsIndividualFile))
+    
+    def test_writeSequenceFilesWithSequenceWrapper(self):        
+        self._writeInputFile()
+        self._writeExpSequenceFiles()
+        batchName = "batch1"
+        taxon = "Arabidopsis thaliana"
+        gene = "methyltransferase"
+        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
+        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
+        lRefseq = []
+        lRefseq.append(self._lSequenceWrapper._iReferenceBioseq)
+        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)
+
+        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceFSAFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceFSAFile, self._obsSequenceFSAFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceCSVFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceCSVFile, self._obsSequenceCSVFile))        
+    
+    def test_writeBatchFile(self):        
+        self._dBatchResults = {'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}
+        lBatchResults = []
+        lBatchResults.append(self._dBatchResults)
+        self._writeExpBatchFile()
+        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)
+        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchFile, self._obsBatchFile))
+        
+    def test_writeBatchLineFile(self):        
+        self._lBatchLineResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
+                                   {'IndividualNumber': "2", 'BatchNumber': "1"}]
+        self._writeExpBatchLineFile()
+        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)
+        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchLineFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchLineFile, self._obsBatchLineFile))
+        
+    def test_sortAlleleResultByAlleleNumber(self):
+        dAlleleResults = {'A': 3,
+                          'G': 1,
+                          'C': 2}
+        
+        lExpAlleleSortedList = [('G', 1),
+                                ('C', 2),
+                                ('A', 3)]        
+        
+        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)
+        self.assertEquals(lExpAlleleSortedList, lObsAlleleSortedList)
+        
+    def test_write(self):
+        
+        self._writeInputFile()
+        batchName = "batch1"
+        taxon = "Arabidopsis thaliana"
+        gene = "methyltransferase"
+        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
+        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
+        
+        
+        multifasta2SNPFile._lSubSNPFileResults = [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}, 
+                            {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
+                            {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]
+        multifasta2SNPFile._dAlleleFileResults['A'] = 1
+        multifasta2SNPFile._dAlleleFileResults['C'] = 2
+        multifasta2SNPFile._dAlleleFileResults['T'] = 3
+        
+        multifasta2SNPFile._lIndividualFileResults = [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
+                                   {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]
+        
+        multifasta2SNPFile._lBatchFileResults = [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]
+        
+        multifasta2SNPFile._lBatchLineFileResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
+                                   {'IndividualNumber': "2", 'BatchNumber': "1"}] 
+        
+        
+        self._writeExpSubSNPFile()
+        self._writeExpAlleleFile()
+        self._writeExpIndividualFile()
+        self._writeExpSequenceFiles()
+        self._writeExpBatchFile()
+        self._writeExpBatchLineFile()
+        
+        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)
+        self.assertTrue(FileUtils.isRessourceExists(self._obsSubSNPFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expSubSNPFile, self._obsSubSNPFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsAlleleFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expAlleleFile, self._obsAlleleFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsIndividualFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expIndividualFile, self._obsIndividualFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceFSAFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceFSAFile, self._obsSequenceFSAFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceCSVFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceCSVFile, self._obsSequenceCSVFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchFile, self._obsBatchFile))
+        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchLineFile))
+        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchLineFile, self._obsBatchLineFile))      
+    
+    def _writeExpSubSNPFile(self):
+        expFile = open(self._expSubSNPFile, "w")
+        expFile.write("SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n")
+        expFile.write("SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n")
+        expFile.write("SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n")
+        expFile.write("SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n")
+        expFile.close()
+        
+    def _writeExpAlleleFile(self):
+        expFile = open(self._expAlleleFile, "w")
+        expFile.write("AlleleNumber;Value;Motif;NbCopy;Comment\n")
+        expFile.write("1;A;;;\n")
+        expFile.write("2;C;;;\n")
+        expFile.write("3;T;;;\n")
+        expFile.close()        
+        
+        
+    def _writeExpIndividualFile(self):
+        expFile = open(self._expIndividualFile, "w")
+        expFile.write("IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n")
+        expFile.write("1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n")
+        expFile.write("2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n")
+        expFile.close()        
+
+    def _writeInputFile(self):
+        inFileHandle = open(self._inFileName, "w")
+        inFileHandle.write(">Sequence_de_Reference\n")
+        inFileHandle.write("CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n")
+        inFileHandle.write(">Line1\n")
+        inFileHandle.write("CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n")
+        inFileHandle.write(">Line2\n")
+        inFileHandle.write("CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA")
+        inFileHandle.close()
+        
+    def _writeExpSequenceFiles(self):
+        SequenceFSAFileHandle = open(self._expSequenceFSAFile, "w")
+        SequenceFSAFileHandle.write(">Sequence_de_Reference\n")
+        SequenceFSAFileHandle.write("CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n")
+        SequenceFSAFileHandle.close()
+        SequenceCSVFileHandle = open(self._expSequenceCSVFile, "w")
+        SequenceCSVFileHandle.write("SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n")
+        SequenceCSVFileHandle.write("Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n")
+        SequenceCSVFileHandle.close()
+        
+    def _writeExpBatchFile(self):
+        BatchFileHandle = open(self._expBatchFile, "w")
+        BatchFileHandle.write("BatchNumber: 1\n")
+        BatchFileHandle.write("BatchName: batch1\n")
+        BatchFileHandle.write("GeneName: gene1\n")
+        BatchFileHandle.write("Description: \n")
+        BatchFileHandle.write("ContactNumber: \n")
+        BatchFileHandle.write("ProtocolNumber: \n")
+        BatchFileHandle.write("ThematicNumber: \n")
+        BatchFileHandle.write("RefSeqName: Sequence de Reference\n")
+        BatchFileHandle.write("AlignmentFileName: \n")
+        BatchFileHandle.write("SeqName: \n")
+        BatchFileHandle.write("//\n")
+        BatchFileHandle.close()
+        
+    def _writeExpBatchLineFile(self):
+        BatchLineFileHandle = open(self._expBatchLineFile, "w")
+        BatchLineFileHandle.write("IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n")
+        BatchLineFileHandle.write("1;;;1;\n")
+        BatchLineFileHandle.write("2;;;1;\n")
+        BatchLineFileHandle.close()
+# Run every test of this module when the file is executed as a script.
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file