view smart_toolShed/commons/core/parsing/test/Test_VarscanFile.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line source

from commons.core.parsing.VarscanFile import VarscanFile
from commons.core.parsing.VarscanHit import VarscanHit
import unittest
import os
from commons.core.parsing.VarscanHit_WithTag import VarscanHit_WithTag
from commons.core.parsing.VarscanHit_v2_2_8 import VarscanHit_v2_2_8
from commons.core.parsing.VarscanHit_v2_2_8_WithTag import VarscanHit_v2_2_8_WithTag
from commons.core.checker.CheckerException import CheckerException

class Test_VarscanFile(unittest.TestCase):

    def test_parse_fileWithHeader(self):
        varscanFileName = "file.varscan"
        self._writeVarscanFile(varscanFileName)
        
        varscanHit1 = VarscanHit()
        varscanHit1.setChrom('C02HBa0291P19_LR48')
        varscanHit1.setPosition('32')
        varscanHit1.setRef('C')
        varscanHit1.setVar('T')
        
        varscanHit2 = VarscanHit()
        varscanHit2.setChrom('C02HBa0291P19_LR48')
        varscanHit2.setPosition('34')
        varscanHit2.setRef('A')
        varscanHit2.setVar('T')
        expVarscanHitsList = [varscanHit1, varscanHit2]
        
        iVarscanFile = VarscanFile(varscanFileName)
        iVarscanFile.parse()
        obsVarscanHitsList = iVarscanFile.getVarscanHitsList()
        os.remove(varscanFileName)
        
        self.assertEquals(expVarscanHitsList, obsVarscanHitsList)   

    def test_parse_FileWithoutHeader(self):
        varscanFileName = "file.varscan"
        self._writeVarscanFileWithoutHeader(varscanFileName)
        
        varscanHit1 = VarscanHit()
        varscanHit1.setChrom('C02HBa0291P19_LR48')
        varscanHit1.setPosition('32')
        varscanHit1.setRef('C')
        varscanHit1.setVar('T')
        
        varscanHit2 = VarscanHit()
        varscanHit2.setChrom('C02HBa0291P19_LR48')
        varscanHit2.setPosition('34')
        varscanHit2.setRef('A')
        varscanHit2.setVar('T')
        expVarscanHitsList = [varscanHit1, varscanHit2]
        
        iVarscanFile = VarscanFile(varscanFileName)
        iVarscanFile.parse()
        obsVarscanHitsList = iVarscanFile.getVarscanHitsList()
        obsTypeOfVarscanFile = iVarscanFile.getTypeOfVarscanFile()
        expTypeOfVarscanFile = "Varscan_2_2"
        
        self.assertEquals(expVarscanHitsList, obsVarscanHitsList) 
        self.assertEquals(expTypeOfVarscanFile, obsTypeOfVarscanFile) 
        os.remove(varscanFileName)
        
    def test_parse_VarscanFileWithTag(self):
        inputFileName = "%s/commons/core/parsing/test/varscan.tab" % os.environ["REPET_PATH"]
        self._writeVarscanFileWithTag(inputFileName)
        launcher = VarscanFile(inputFileName)
        launcher.parse()
        obsListOfVarscanHits = launcher.getListOfVarscanHits() 
        
        varscanHit1 = VarscanHit_WithTag()
        varscanHit1.setChrom('C02HBa0291P19_LR48')
        varscanHit1.setPosition('32')
        varscanHit1.setRef('C')
        varscanHit1.setVar('T')
        varscanHit1.setTag('EspeceA')
        
        varscanHit2 = VarscanHit_WithTag()
        varscanHit2.setChrom('C02HBa0291P19_LR48')
        varscanHit2.setPosition('34')
        varscanHit2.setRef('A')
        varscanHit2.setVar('T')
        varscanHit2.setTag('EspeceA')
        expVarscanHitsList = [varscanHit1, varscanHit2] 
        
        obsTypeOfVarscanFile = launcher.getTypeOfVarscanFile()
        expTypeOfVarscanFile = "Varscan_2_2_WithTag"
        
        self.assertEquals(expVarscanHitsList, obsListOfVarscanHits) 
        self.assertEquals(expTypeOfVarscanFile, obsTypeOfVarscanFile) 
        os.remove(inputFileName)
        
    def test_parse_VarscanFile_v2_2_8(self):
        inputFileName = "%s/commons/core/parsing/test/varscan.tab" % os.environ["REPET_PATH"]
        self._writeVarscanFile_v2_2_8(inputFileName)
        launcher = VarscanFile(inputFileName)
        launcher.parse()
        obsListOfVarscanHits = launcher.getListOfVarscanHits() 
        
        varscanHit1 = VarscanHit_v2_2_8()
        varscanHit1.setChrom('C11HBa0064J13_LR285')
        varscanHit1.setPosition('3227')
        varscanHit1.setRef('G')
        varscanHit1.setVar('A')
        varscanHit1.setCns('A')
        
        varscanHit2 = VarscanHit_v2_2_8()
        varscanHit2.setChrom('C11HBa0064J13_LR285')
        varscanHit2.setPosition('3230')
        varscanHit2.setRef('G')
        varscanHit2.setVar('T')
        varscanHit2.setCns('T')
        expVarscanHitsList = [varscanHit1, varscanHit2] 
        
        obsTypeOfVarscanFile = launcher.getTypeOfVarscanFile()
        expTypeOfVarscanFile = "Varscan_2_2_8"
        
        self.assertEquals(expVarscanHitsList, obsListOfVarscanHits) 
        self.assertEquals(expTypeOfVarscanFile, obsTypeOfVarscanFile) 
        os.remove(inputFileName)
    
    def test_parse_other(self):
        inputFileName = "%s/commons/core/parsing/test/varscan.tab" % os.environ["REPET_PATH"]
        self._writeOther(inputFileName)
        launcher = VarscanFile(inputFileName)
        try:
            launcher.parse()
        except CheckerException, e:
            checkerExceptionInstance = e
        expMessage = "Warning: this line (l.1) is not a valid varscan line !"
        obsMessage = checkerExceptionInstance.msg
        os.remove(inputFileName)
        self.assertEquals(expMessage, obsMessage) 
        
    def test__eq__notEqual(self):
        lVarscanHits1 = [VarscanHit("chrom", "12", "T", "G"), VarscanHit("chrom", "14", "T", "G")]
        iVarscanFile1 = VarscanFile()
        iVarscanFile1.setVarscanHitsList(lVarscanHits1)
        lVarscanHits2 = [VarscanHit("chrom", "12", "T", "G"), VarscanHit("chrom", "14", "T", "C")]
        iVarscanFile2 = VarscanFile()
        iVarscanFile2.setVarscanHitsList(lVarscanHits2)

        self.assertNotEquals(iVarscanFile1, iVarscanFile2)
        
    def test__eq__equal(self):
        lVarscanHits1 = [VarscanHit("chrom", "12", "T", "G"), VarscanHit("chrom", "14", "T", "G")]
        iVarscanFile1 = VarscanFile()
        iVarscanFile1.setVarscanHitsList(lVarscanHits1)
        lVarscanHits2 = [VarscanHit("chrom", "12", "T", "G"), VarscanHit("chrom", "14", "T", "G")]
        iVarscanFile2 = VarscanFile()
        iVarscanFile2.setVarscanHitsList(lVarscanHits2)

        self.assertEquals(iVarscanFile1, iVarscanFile2)
        
    def test_selectTypeOfVarscanHitObject_noVarscanHit(self):
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("")
        try:
            launcher.selectTypeOfVarscanHitObject()
        except CheckerException, e:
            checkerExceptionInstance = e
        expMessage = "Error: no varscan object found !"
        obsMessage = checkerExceptionInstance.msg
        self.assertEquals(expMessage, obsMessage) 
        
    def test_selectTypeOfVarscanHitObject_VarscanHit(self):
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2")
        obsVarscanHit = launcher.selectTypeOfVarscanHitObject()
        expVarscanHit = VarscanHit()
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_selectTypeOfVarscanHitObject_VarscanHitWithTag(self):
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2_WithTag")
        obsVarscanHit = launcher.selectTypeOfVarscanHitObject()
        expVarscanHit = VarscanHit_WithTag()
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_selectTypeOfVarscanHitObject_noVarscanHit_2_2_8(self):
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2_8")
        obsVarscanHit = launcher.selectTypeOfVarscanHitObject()
        expVarscanHit = VarscanHit_v2_2_8()
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_selectTypeOfVarscanHitObject_noVarscanHit_2_2_8_WithTag(self):
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2_8_WithTag")
        obsVarscanHit = launcher.selectTypeOfVarscanHitObject()
        expVarscanHit = VarscanHit_v2_2_8_WithTag()
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_createVarscanObjectFromLine_VarscanHit(self):
        line = "C02HBa0291P19_LR48\t32\tC\tT\t1\t2\t66,67%\t1\t1\t37\t35\t0.3999999999999999\n"
        nbLine = 1
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2")
        obsVarscanHit = launcher.createVarscanObjectFromLine(line, nbLine)
        expVarscanHit = VarscanHit()
        expVarscanHit.setChrom('C02HBa0291P19_LR48')
        expVarscanHit.setPosition('32')
        expVarscanHit.setRef('C')
        expVarscanHit.setVar('T')
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_createVarscanObjectFromLine_VarscanHitWithTag(self):
        line = "C02HBa0291P19_LR48\t32\tC\tT\t1\t2\t66,67%\t1\t1\t37\t35\t0.3999999999999999\tEspeceA\n"
        nbLine = 1
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2_WithTag")
        obsVarscanHit = launcher.createVarscanObjectFromLine(line, nbLine)
        expVarscanHit = VarscanHit_WithTag()
        expVarscanHit.setChrom('C02HBa0291P19_LR48')
        expVarscanHit.setPosition('32')
        expVarscanHit.setRef('C')
        expVarscanHit.setVar('T')
        expVarscanHit.setTag('EspeceA')
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_createVarscanObjectFromLine_VarscanHit_v2_2_8(self):
        line = "C11HBa0064J13_LR285\t3227\tG\tA\t0\t1\t100%\t0\t1\t0\t54\t0.98\t0\t1\t0\t0\t1\t0\tA\n"
        nbLine = 1
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2_8")
        obsVarscanHit = launcher.createVarscanObjectFromLine(line, nbLine)
        expVarscanHit = VarscanHit_v2_2_8()
        expVarscanHit.setChrom('C11HBa0064J13_LR285')
        expVarscanHit.setPosition('3227')
        expVarscanHit.setRef('G')
        expVarscanHit.setVar('A')
        expVarscanHit.setCns('A')
        self.assertEquals(expVarscanHit, obsVarscanHit)
        
    def test_createVarscanObjectFromLine_VarscanHit_v2_2_8_WithTag(self):
        line = "C11HBa0064J13_LR285\t3227\tG\tA\t0\t1\t100%\t0\t1\t0\t54\t0.98\t0\t1\t0\t0\t1\t0\tA\tEspeceA\n"
        nbLine = 1
        launcher = VarscanFile()
        launcher.setTypeOfVarscanFile("Varscan_2_2_8_WithTag")
        obsVarscanHit = launcher.createVarscanObjectFromLine(line, nbLine)
        expVarscanHit = VarscanHit_v2_2_8_WithTag()
        expVarscanHit.setChrom('C11HBa0064J13_LR285')
        expVarscanHit.setPosition('3227')
        expVarscanHit.setRef('G')
        expVarscanHit.setVar('A')
        expVarscanHit.setCns('A')
        expVarscanHit.setTag('EspeceA')
        self.assertEquals(expVarscanHit, obsVarscanHit)
    
    def _writeVarscanFile(self, varscanFileName):
        varscanFile = open(varscanFileName, 'w')
        varscanFile.write("Chrom\tPosition\tRef\tVar\tReads1\tReads2\tVarFreq\tStrands1\tStrands2\tQual1\tQual2\tPvalue\n")
        varscanFile.write("C02HBa0291P19_LR48\t32\tC\tT\t1\t2\t66,67%\t1\t1\t37\t35\t0.3999999999999999\n")
        varscanFile.write("C02HBa0291P19_LR48\t34\tA\tT\t1\t2\t66,67%\t1\t1\t40\t34\t0.3999999999999999\n")
        varscanFile.close()

    def _writeVarscanFileWithoutHeader(self, varscanFileName):
        varscanFile = open(varscanFileName, 'w')
        varscanFile.write("C02HBa0291P19_LR48\t32\tC\tT\t1\t2\t66,67%\t1\t1\t37\t35\t0.3999999999999999\n")
        varscanFile.write("C02HBa0291P19_LR48\t34\tA\tT\t1\t2\t66,67%\t1\t1\t40\t34\t0.3999999999999999\n")
        varscanFile.close()
    
    def _writeVarscanFileWithTag(self, varscanFileName):
        varscanFile = open(varscanFileName, 'w')
        varscanFile.write("Chrom\tPosition\tRef\tVar\tReads1\tReads2\tVarFreq\tStrands1\tStrands2\tQual1\tQual2\tPvalue\n")
        varscanFile.write("C02HBa0291P19_LR48\t32\tC\tT\t1\t2\t66,67%\t1\t1\t37\t35\t0.3999999999999999\tEspeceA\n")
        varscanFile.write("C02HBa0291P19_LR48\t34\tA\tT\t1\t2\t66,67%\t1\t1\t40\t34\t0.3999999999999999\tEspeceA\n")
        varscanFile.close()
    
    def _writeVarscanFile_v2_2_8(self, varscanFileName):
        varscanFile = open(varscanFileName, 'w')
        varscanFile.write("Chrom\tPosition\tRef\tCons\tReads1\tReads2\tVarFreq\tStrands1\tStrands2\tQual1\tQual2\tPvalue\tMapQual1\tMapQual2\tReads1Plus\tReads1Minus\tReads2Plus\tReads2Minus\tVarAllele\n")
        varscanFile.write("C11HBa0064J13_LR285\t3227\tG\tA\t0\t1\t100%\t0\t1\t0\t54\t0.98\t0\t1\t0\t0\t1\t0\tA\n")
        varscanFile.write("C11HBa0064J13_LR285\t3230\tG\tT\t0\t1\t100%\t0\t1\t0\t54\t0.98\t0\t1\t0\t0\t1\t0\tT\n")
        varscanFile.close()
    
    def _writeOther(self, fileName):
        file = open(fileName, 'w')
        file.write('##gff-version 3\n')
        file.write('chr16\tBlatToGff\tBES\t21686950\t21687294\t.\t+\t.\tID=MRRE1H001H13FM1;Name=MRRE1H001H13FM1;bes_start=21686950;bes_end=21687294;bes_size=22053297\n')
        file.write('chr16\tBlatToGff\tBES\t21736364\t21737069\t.\t+\t.\tID=machin1;Name=machin1;bes_start=21736364;bes_end=21737069;bes_size=22053297\n')
        file.write('chr11\tBlatToGff\tBES\t3725876\t3726473\t.\t+\t.\tID=MRRE1H032F08FM1;Name=MRRE1H032F08FM1;bes_start=3725876;bes_end=3726473;bes_size=19818926\n')
        file.write('chr11\tBlatToGff\tBES\t3794984\t3795627\t.\t+\t.\tID=machin2;Name=machin2;bes_start=3794984;bes_end=3795627;bes_size=19818926\n')
        file.write('chr18\tBlatToGff\tBES\t12067347\t12067719\t.\t+\t.\tID=machin3;Name=machin3;bes_start=12067347;bes_end=12067719;bes_size=29360087\n')
        file.close()
        
if __name__ == "__main__":
    unittest.main()