Mercurial > repos > yufei-luo > s_mart
diff commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py @ 31:0ab839023fe4
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 14:33:21 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line diff
# Reconstructed from the changeset diff of
# commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py
# (new file, Tue Apr 30 14:33:21 2013 -0400).
from commons.core.utils.FileUtils import FileUtils
from commons.core.seq.BioseqDB import BioseqDB
from commons.core.seq.Bioseq import Bioseq
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile
from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper
from commons.core.LoggerFactory import LoggerFactory
import os
import logging
import unittest


class Test_Multifasta2SNPFileWriter(unittest.TestCase):
    """Unit tests for Multifasta2SNPFileWriter.

    Each test writes an "expected" file by hand, asks the writer to produce
    the matching "observed" file, and checks that both files are identical.
    All file names are relative paths; tearDown() removes whatever was created.
    """

    def setUp(self):
        """Define observed/expected file names and build the objects under test."""
        self._obsSubSNPFile = "SubSNP.csv"
        self._expSubSNPFile = "ExpSubSNP.csv"

        self._obsAlleleFile = "Allele.csv"
        self._expAlleleFile = "ExpAllele.csv"

        self._obsIndividualFile = "Individual.csv"
        self._expIndividualFile = "ExpIndividual.csv"

        self._obsSequenceFSAFile = "Sequences.fsa"
        self._expSequenceFSAFile = "ExpSequences.fsa"

        self._obsSequenceCSVFile = "Sequences.csv"
        self._expSequenceCSVFile = "ExpSequences.csv"

        self._obsBatchFile = "Batch.txt"
        self._expBatchFile = "ExpBatch.txt"

        self._obsBatchLineFile = "BatchLine.csv"
        self._expBatchLineFile = "ExpBatchLine.csv"

        self._logFileName = "Test_Multifasta2SNPWriter.log"

        self._inputFileName = "multifasta.fsa"

        self._lSNPResult = []
        self._dAlleleResult = {}
        self._lIndividualResult = []
        self._refSeq = Bioseq()
        self._seqDb = BioseqDB()

        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb, self._logFile, self._inputFileName)
        self._lBatchLineResults = []

        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()

        self._inFileName = "multifasta.txt"
        self._taxon = "Arabidopsis thaliana"

    def tearDown(self):
        """Remove every input, log, observed and expected file a test may have left."""
        lFilesToRemove = (
            self._inFileName,
            "multifasta2SNP.log",
            self._logFileName,
            self._obsSubSNPFile,
            self._expSubSNPFile,
            self._obsAlleleFile,
            self._expAlleleFile,
            self._obsIndividualFile,
            self._expIndividualFile,
            self._obsSequenceFSAFile,
            self._expSequenceFSAFile,
            self._obsSequenceCSVFile,
            self._expSequenceCSVFile,
        )
        for fileName in lFilesToRemove:
            if FileUtils.isRessourceExists(fileName):
                os.remove(fileName)

        # The batch files were historically cleaned with removeFilesByPattern
        # rather than os.remove; that choice is kept unchanged here.
        lPatternsToRemove = (
            self._obsBatchFile,
            self._expBatchFile,
            self._obsBatchLineFile,
            self._expBatchLineFile,
        )
        for pattern in lPatternsToRemove:
            if FileUtils.isRessourceExists(pattern):
                FileUtils.removeFilesByPattern(pattern)

    def test_writeSubSNPFileWithSubSNPList(self):
        """_writeSubSNPFile() must serialise a list of three SubSNP dicts."""
        self._lSNPResult = [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                            {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                            {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]

        self._writeExpSubSNPFile()
        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)

        self.assertTrue(FileUtils.isRessourceExists(self._obsSubSNPFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSubSNPFile, self._obsSubSNPFile))

    def test_writeAlleleFileWithAlleleDict(self):
        """_writeAlleleFile() must serialise an allele -> number dictionary."""
        self._dAlleleResult['A'] = 1
        self._dAlleleResult['C'] = 2
        self._dAlleleResult['T'] = 3

        self._writeExpAlleleFile()
        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)

        self.assertTrue(FileUtils.isRessourceExists(self._obsAlleleFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expAlleleFile, self._obsAlleleFile))

    def test_writeIndividualFileWithIndivList(self):
        """_writeIndividualFile() must serialise a list of two individual dicts."""
        self._lIndividualResult = [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                                   {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]

        self._writeExpIndividualFile()

        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)

        self.assertTrue(FileUtils.isRessourceExists(self._obsIndividualFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expIndividualFile, self._obsIndividualFile))

    def test_writeSequenceFilesWithSequenceWrapper(self):
        """_writeSequenceFiles() must emit the reference sequence as FSA and CSV."""
        self._writeInputFile()
        self._writeExpSequenceFiles()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        # NOTE(review): test_write builds Multifasta2SNPFile(taxon, batchName, gene)
        # whereas this test passes (batchName, gene, taxon); one of the two
        # orders is presumably wrong -- confirm against the constructor.
        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        lRefseq = []
        lRefseq.append(self._lSequenceWrapper._iReferenceBioseq)
        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)

        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceFSAFile, self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceCSVFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceCSVFile, self._obsSequenceCSVFile))

    def test_writeBatchFile(self):
        """_writeBatchFile() must serialise a one-element list of batch dicts."""
        self._dBatchResults = {'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}
        lBatchResults = []
        lBatchResults.append(self._dBatchResults)
        self._writeExpBatchFile()
        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchFile, self._obsBatchFile))

    def test_writeBatchLineFile(self):
        """_writeBatchLineFile() must serialise a list of two batch-line dicts."""
        self._lBatchLineResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                   {'IndividualNumber': "2", 'BatchNumber': "1"}]
        self._writeExpBatchLineFile()
        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchLineFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchLineFile, self._obsBatchLineFile))

    def test_sortAlleleResultByAlleleNumber(self):
        """sortAlleleResultByAlleleNumber() must return (allele, number) pairs ordered by number."""
        dAlleleResults = {'A': 3,
                          'G': 1,
                          'C': 2}

        lExpAlleleSortedList = [('G', 1),
                                ('C', 2),
                                ('A', 3)]

        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(lExpAlleleSortedList, lObsAlleleSortedList)

    def test_write(self):
        """write() must produce all seven observed files, each identical to its expected file."""
        self._writeInputFile()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)

        # Inject the results directly so that only the writer is exercised.
        multifasta2SNPFile._lSubSNPFileResults = [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                                                  {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                                                  {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]
        multifasta2SNPFile._dAlleleFileResults['A'] = 1
        multifasta2SNPFile._dAlleleFileResults['C'] = 2
        multifasta2SNPFile._dAlleleFileResults['T'] = 3

        multifasta2SNPFile._lIndividualFileResults = [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                                                      {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]

        multifasta2SNPFile._lBatchFileResults = [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]

        multifasta2SNPFile._lBatchLineFileResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                                     {'IndividualNumber': "2", 'BatchNumber': "1"}]

        self._writeExpSubSNPFile()
        self._writeExpAlleleFile()
        self._writeExpIndividualFile()
        self._writeExpSequenceFiles()
        self._writeExpBatchFile()
        self._writeExpBatchLineFile()

        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)

        lExpAndObsFiles = (
            (self._expSubSNPFile, self._obsSubSNPFile),
            (self._expAlleleFile, self._obsAlleleFile),
            (self._expIndividualFile, self._obsIndividualFile),
            (self._expSequenceFSAFile, self._obsSequenceFSAFile),
            (self._expSequenceCSVFile, self._obsSequenceCSVFile),
            (self._expBatchFile, self._obsBatchFile),
            (self._expBatchLineFile, self._obsBatchLineFile),
        )
        for expFile, obsFile in lExpAndObsFiles:
            self.assertTrue(FileUtils.isRessourceExists(obsFile))
            self.assertTrue(FileUtils.are2FilesIdentical(expFile, obsFile))

    def _writeLinesToFile(self, fileName, lLines):
        """Write the strings of lLines, unchanged, to fileName (overwriting it).

        The file is opened in a ``with`` block so the handle is closed even if
        a write fails.
        """
        with open(fileName, "w") as fileHandle:
            fileHandle.writelines(lLines)

    def _writeExpSubSNPFile(self):
        """Write the expected SubSNP CSV file (header plus three records)."""
        self._writeLinesToFile(self._expSubSNPFile, [
            "SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n",
            "SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n",
            "SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n",
            "SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n",
        ])

    def _writeExpAlleleFile(self):
        """Write the expected Allele CSV file (header plus three records)."""
        self._writeLinesToFile(self._expAlleleFile, [
            "AlleleNumber;Value;Motif;NbCopy;Comment\n",
            "1;A;;;\n",
            "2;C;;;\n",
            "3;T;;;\n",
        ])

    def _writeExpIndividualFile(self):
        """Write the expected Individual CSV file (header plus two records)."""
        self._writeLinesToFile(self._expIndividualFile, [
            "IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n",
            "1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
            "2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n",
        ])

    def _writeInputFile(self):
        """Write the multifasta input: one reference sequence followed by two lines."""
        self._writeLinesToFile(self._inFileName, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
            ">Line1\n",
            "CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n",
            ">Line2\n",
            # The last record deliberately has no trailing newline.
            "CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA",
        ])

    def _writeExpSequenceFiles(self):
        """Write the expected reference-sequence files (FSA and CSV)."""
        self._writeLinesToFile(self._expSequenceFSAFile, [
            ">Sequence_de_Reference\n",
            "CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n",
        ])
        self._writeLinesToFile(self._expSequenceCSVFile, [
            "SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n",
            "Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n",
        ])

    def _writeExpBatchFile(self):
        """Write the expected Batch text file (one "key: value" block ended by "//")."""
        self._writeLinesToFile(self._expBatchFile, [
            "BatchNumber: 1\n",
            "BatchName: batch1\n",
            "GeneName: gene1\n",
            "Description: \n",
            "ContactNumber: \n",
            "ProtocolNumber: \n",
            "ThematicNumber: \n",
            "RefSeqName: Sequence de Reference\n",
            "AlignmentFileName: \n",
            "SeqName: \n",
            "//\n",
        ])

    def _writeExpBatchLineFile(self):
        """Write the expected BatchLine CSV file (header plus two records)."""
        self._writeLinesToFile(self._expBatchLineFile, [
            "IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n",
            "1;;;1;\n",
            "2;;;1;\n",
        ])


if __name__ == "__main__":
    unittest.main()