view smart_toolShed/commons/core/parsing/test/Test_Multifasta2SNPFileWriter.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox that manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line source

from commons.core.utils.FileUtils import FileUtils
from commons.core.seq.BioseqDB import BioseqDB
from commons.core.seq.Bioseq import Bioseq
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFileWriter
from commons.core.parsing.Multifasta2SNPFile import Multifasta2SNPFile
from commons.core.parsing.Multifasta2SNPFile import ReferenceBioseqAndLinesBioseqDBWrapper
from commons.core.LoggerFactory import LoggerFactory
import os
import logging
import unittest

class Test_Multifasta2SNPFileWriter(unittest.TestCase):

    def setUp(self):
        self._obsSubSNPFile = "SubSNP.csv"
        self._expSubSNPFile = "ExpSubSNP.csv"
        
        self._obsAlleleFile = "Allele.csv"
        self._expAlleleFile = "ExpAllele.csv"
        
        self._obsIndividualFile = "Individual.csv"
        self._expIndividualFile = "ExpIndividual.csv"
        
        self._obsSequenceFSAFile = "Sequences.fsa"
        self._expSequenceFSAFile = "ExpSequences.fsa"
        
        self._obsSequenceCSVFile = "Sequences.csv"
        self._expSequenceCSVFile = "ExpSequences.csv"
        
        self._obsBatchFile = "Batch.txt"
        self._expBatchFile = "ExpBatch.txt"
        
        self._obsBatchLineFile = "BatchLine.csv"
        self._expBatchLineFile = "ExpBatchLine.csv"
        
        self._logFileName = "Test_Multifasta2SNPWriter.log"
        
        self._inputFileName = "multifasta.fsa"
        
        self._lSNPResult = []
        self._dAlleleResult = {}
        self._lIndividualResult = []
        self._refSeq = Bioseq()
        self._seqDb= BioseqDB()
        
        self._logFile = LoggerFactory.createLogger(self._logFileName, logging.INFO, "%(asctime)s %(levelname)s: %(message)s")
        self._lSequenceWrapper = ReferenceBioseqAndLinesBioseqDBWrapper(self._refSeq, self._seqDb,  self._logFile, self._inputFileName)
        self._lBatchLineResults = []
        
        self._Multifasta2SNPFileWriter = Multifasta2SNPFileWriter()
        
        self._inFileName = "multifasta.txt"
        self._taxon = "Arabidopsis thaliana"

    def tearDown(self):
        if FileUtils.isRessourceExists(self._inFileName):
            os.remove(self._inFileName)
        if FileUtils.isRessourceExists("multifasta2SNP.log"):
            os.remove("multifasta2SNP.log")
        if FileUtils.isRessourceExists("Test_Multifasta2SNPWriter.log"):
            os.remove("Test_Multifasta2SNPWriter.log")
            
        if FileUtils.isRessourceExists(self._obsSubSNPFile):
            os.remove(self._obsSubSNPFile)
        if FileUtils.isRessourceExists(self._expSubSNPFile):
            os.remove(self._expSubSNPFile)
            
        if FileUtils.isRessourceExists(self._obsAlleleFile):
            os.remove(self._obsAlleleFile)
        if FileUtils.isRessourceExists(self._expAlleleFile):
            os.remove(self._expAlleleFile)
            
        if FileUtils.isRessourceExists(self._obsIndividualFile):
            os.remove(self._obsIndividualFile)
        if FileUtils.isRessourceExists(self._expIndividualFile):
            os.remove(self._expIndividualFile)
            
        if FileUtils.isRessourceExists(self._obsSequenceFSAFile):
            os.remove(self._obsSequenceFSAFile)
        if FileUtils.isRessourceExists(self._expSequenceFSAFile):
            os.remove(self._expSequenceFSAFile)
            
        if FileUtils.isRessourceExists(self._obsSequenceCSVFile):
            os.remove(self._obsSequenceCSVFile)
        if FileUtils.isRessourceExists(self._expSequenceCSVFile):
            os.remove(self._expSequenceCSVFile)

        if FileUtils.isRessourceExists(self._obsBatchFile):
            FileUtils.removeFilesByPattern(self._obsBatchFile)
        if FileUtils.isRessourceExists(self._expBatchFile):
            FileUtils.removeFilesByPattern(self._expBatchFile)
        
        if FileUtils.isRessourceExists(self._obsBatchLineFile):
            FileUtils.removeFilesByPattern(self._obsBatchLineFile)
        if FileUtils.isRessourceExists(self._expBatchLineFile):
            FileUtils.removeFilesByPattern(self._expBatchLineFile)
        
    def test_writeSubSNPFileWithSubSNPList(self):
        self._lSNPResult = [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}, 
                            {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                            {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]
        
        self._writeExpSubSNPFile()
        self._Multifasta2SNPFileWriter._writeSubSNPFile(self._obsSubSNPFile, self._lSNPResult)
        
        self.assertTrue(FileUtils.isRessourceExists(self._obsSubSNPFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSubSNPFile, self._obsSubSNPFile))
    
    def test_writeAlleleFileWithAlleleDict(self):
        self._dAlleleResult['A'] = 1
        self._dAlleleResult['C'] = 2
        self._dAlleleResult['T'] = 3
                        
        self._writeExpAlleleFile()
        self._Multifasta2SNPFileWriter._writeAlleleFile(self._obsAlleleFile, self._dAlleleResult)
        
        self.assertTrue(FileUtils.isRessourceExists(self._obsAlleleFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expAlleleFile, self._obsAlleleFile))
        
    def test_writeIndividualFileWithIndivList(self):
        self._lIndividualResult = [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                                   {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]
              
        self._writeExpIndividualFile()
        
        self._Multifasta2SNPFileWriter._writeIndividualFile(self._obsIndividualFile, self._lIndividualResult)
        
        self.assertTrue(FileUtils.isRessourceExists(self._obsIndividualFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expIndividualFile, self._obsIndividualFile))
    
    def test_writeSequenceFilesWithSequenceWrapper(self):        
        self._writeInputFile()
        self._writeExpSequenceFiles()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        multifasta2SNPFile = Multifasta2SNPFile(batchName, gene, taxon)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        lRefseq = []
        lRefseq.append(self._lSequenceWrapper._iReferenceBioseq)
        self._Multifasta2SNPFileWriter._writeSequenceFiles(self._obsSequenceFSAFile, self._obsSequenceCSVFile, lRefseq, taxon)

        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceFSAFile, self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceCSVFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceCSVFile, self._obsSequenceCSVFile))        
    
    def test_writeBatchFile(self):        
        self._dBatchResults = {'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}
        lBatchResults = []
        lBatchResults.append(self._dBatchResults)
        self._writeExpBatchFile()
        self._Multifasta2SNPFileWriter._writeBatchFile(self._obsBatchFile, lBatchResults)
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchFile, self._obsBatchFile))
        
    def test_writeBatchLineFile(self):        
        self._lBatchLineResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                   {'IndividualNumber': "2", 'BatchNumber': "1"}]
        self._writeExpBatchLineFile()
        self._Multifasta2SNPFileWriter._writeBatchLineFile(self._obsBatchLineFile, self._lBatchLineResults)
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchLineFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchLineFile, self._obsBatchLineFile))
        
    def test_sortAlleleResultByAlleleNumber(self):
        dAlleleResults = {'A': 3,
                          'G': 1,
                          'C': 2}
        
        lExpAlleleSortedList = [('G', 1),
                                ('C', 2),
                                ('A', 3)]        
        
        lObsAlleleSortedList = self._Multifasta2SNPFileWriter.sortAlleleResultByAlleleNumber(dAlleleResults)
        self.assertEquals(lExpAlleleSortedList, lObsAlleleSortedList)
        
    def test_write(self):
        
        self._writeInputFile()
        batchName = "batch1"
        taxon = "Arabidopsis thaliana"
        gene = "methyltransferase"
        multifasta2SNPFile = Multifasta2SNPFile(taxon, batchName, gene)
        self._lSequenceWrapper = multifasta2SNPFile.createWrapperFromFile(self._inFileName)
        
        
        multifasta2SNPFile._lSubSNPFileResults = [{'subSNPName': "SubSNP1", '5flank': "A", '3flank': "T", 'position': 1, 'lineName': "1", 'allele': 1, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}, 
                            {'subSNPName': "SubSNP2", '5flank': "T", '3flank': "A", 'position': 10, 'lineName': "1", 'allele': 2, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1},
                            {'subSNPName': "SubSNP3", '5flank': "T", '3flank': "A", 'position': 20, 'lineName': "2", 'allele': 3, 'batchNumber': 1, 'confidenceValue' : "A", 'type' : "SNP", 'length': 1}]
        multifasta2SNPFile._dAlleleFileResults['A'] = 1
        multifasta2SNPFile._dAlleleFileResults['C'] = 2
        multifasta2SNPFile._dAlleleFileResults['T'] = 3
        
        multifasta2SNPFile._lIndividualFileResults = [{'individualNumber': 1, 'individualName': "Individual1", 'scientificName': "Arabidopsis thaliana"},
                                   {'individualNumber': 2, 'individualName': "Individual2", 'scientificName': "Arabidopsis thaliana"}]
        
        multifasta2SNPFile._lBatchFileResults = [{'BatchNumber': "1", 'BatchName': "batch1", 'GeneName': "gene1", 'RefSeqName': "Sequence de Reference"}]
        
        multifasta2SNPFile._lBatchLineFileResults = [{'IndividualNumber': "1", 'BatchNumber': "1"},
                                   {'IndividualNumber': "2", 'BatchNumber': "1"}] 
        
        
        self._writeExpSubSNPFile()
        self._writeExpAlleleFile()
        self._writeExpIndividualFile()
        self._writeExpSequenceFiles()
        self._writeExpBatchFile()
        self._writeExpBatchLineFile()
        
        self._Multifasta2SNPFileWriter.write(multifasta2SNPFile)
        self.assertTrue(FileUtils.isRessourceExists(self._obsSubSNPFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSubSNPFile, self._obsSubSNPFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsAlleleFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expAlleleFile, self._obsAlleleFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsIndividualFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expIndividualFile, self._obsIndividualFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceFSAFile, self._obsSequenceFSAFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsSequenceCSVFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expSequenceCSVFile, self._obsSequenceCSVFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchFile, self._obsBatchFile))
        self.assertTrue(FileUtils.isRessourceExists(self._obsBatchLineFile))
        self.assertTrue(FileUtils.are2FilesIdentical(self._expBatchLineFile, self._obsBatchLineFile))      
    
    def _writeExpSubSNPFile(self):
        expFile = open(self._expSubSNPFile, "w")
        expFile.write("SubSNPName;ConfidenceValue;Type;Position;5flank;3flank;Length;BatchNumber;IndividualNumber;PrimerType;PrimerNumber;Forward_or_Reverse;AlleleNumber\n")
        expFile.write("SubSNP1;A;SNP;1;A;T;1;1;1;Sequence;;;1\n")
        expFile.write("SubSNP2;A;SNP;10;T;A;1;1;1;Sequence;;;2\n")
        expFile.write("SubSNP3;A;SNP;20;T;A;1;1;2;Sequence;;;3\n")
        expFile.close()
        
    def _writeExpAlleleFile(self):
        expFile = open(self._expAlleleFile, "w")
        expFile.write("AlleleNumber;Value;Motif;NbCopy;Comment\n")
        expFile.write("1;A;;;\n")
        expFile.write("2;C;;;\n")
        expFile.write("3;T;;;\n")
        expFile.close()        
        
        
    def _writeExpIndividualFile(self):
        expFile = open(self._expIndividualFile, "w")
        expFile.write("IndividualNumber;IndividualName;Description;AberrAneuploide;FractionLength;DeletionLineSynthesis;UrlEarImage;TypeLine;ChromNumber;ArmChrom;DeletionBin;ScientificName;local_germplasm_name;submitter_code;local_institute;donor_institute;donor_acc_id\n")
        expFile.write("1;Individual1;;;;;;;;;;Arabidopsis thaliana;;;;;\n")
        expFile.write("2;Individual2;;;;;;;;;;Arabidopsis thaliana;;;;;\n")
        expFile.close()        

    def _writeInputFile(self):
        inFileHandle = open(self._inFileName, "w")
        inFileHandle.write(">Sequence_de_Reference\n")
        inFileHandle.write("CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n")
        inFileHandle.write(">Line1\n")
        inFileHandle.write("CCTTAGCCATTGCTTGGTGACTATGAAGGCAGTAGGCAAACCTCCACAATC\n")
        inFileHandle.write(">Line2\n")
        inFileHandle.write("CCTAAGCCATTGCTTGGTGACTATCAAGGCAGTAGCCAAACCTCCACAATA")
        inFileHandle.close()
        
    def _writeExpSequenceFiles(self):
        SequenceFSAFileHandle = open(self._expSequenceFSAFile, "w")
        SequenceFSAFileHandle.write(">Sequence_de_Reference\n")
        SequenceFSAFileHandle.write("CCTAAGCCATTGCTTGGTGATTATGAAGGCAGTAGTCAAACCTCCACAATC\n")
        SequenceFSAFileHandle.close()
        SequenceCSVFileHandle = open(self._expSequenceCSVFile, "w")
        SequenceCSVFileHandle.write("SequenceName;SeqType;BankName;BankVersion;ACNumber;Locus;ScientificName\n")
        SequenceCSVFileHandle.write("Sequence_de_Reference;Reference;;;;;Arabidopsis thaliana\n")
        SequenceCSVFileHandle.close()
        
    def _writeExpBatchFile(self):
        BatchFileHandle = open(self._expBatchFile, "w")
        BatchFileHandle.write("BatchNumber: 1\n")
        BatchFileHandle.write("BatchName: batch1\n")
        BatchFileHandle.write("GeneName: gene1\n")
        BatchFileHandle.write("Description: \n")
        BatchFileHandle.write("ContactNumber: \n")
        BatchFileHandle.write("ProtocolNumber: \n")
        BatchFileHandle.write("ThematicNumber: \n")
        BatchFileHandle.write("RefSeqName: Sequence de Reference\n")
        BatchFileHandle.write("AlignmentFileName: \n")
        BatchFileHandle.write("SeqName: \n")
        BatchFileHandle.write("//\n")
        BatchFileHandle.close()
        
    def _writeExpBatchLineFile(self):
        BatchLineFileHandle = open(self._expBatchLineFile, "w")
        BatchLineFileHandle.write("IndividualNumber;Pos5;Pos3;BatchNumber;Sequence\n")
        BatchLineFileHandle.write("1;;;1;\n")
        BatchLineFileHandle.write("2;;;1;\n")
        BatchLineFileHandle.close()
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()