view commons/core/coord/test/Test_F_ConvCoord.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line source

from commons.core.utils.FileUtils import FileUtils
from commons.core.sql.DbFactory import DbFactory
from commons.core.coord.ConvCoord import ConvCoord
import time
import subprocess
import os
import unittest

class Test_F_ConvCoord(unittest.TestCase):
    """Functional tests for ConvCoord.

    Four tests launch the ``ConvCoord.py`` script through a shell and compare
    what it produces (a file, or a table exported to a file) with an expected
    file; ``test_run`` drives the ConvCoord class directly.

    Requirements visible from the tests themselves:
      * a database connection obtained via ``DbFactory.createInstance()``;
      * ``ConvCoord.py`` resolvable by the shell (it is invoked by bare name);
      * REPET_HOST, REPET_USER, REPET_PW, REPET_DB and REPET_PORT
        (test_run_as_script_alignFile_query) and REPET_DATA (test_run).

    Every test releases its fixtures in a ``finally`` clause so that a failed
    assertion does not leave files, tables or symlinks behind.
    """

    def setUp( self ):
        """Create the object under test, unique fixture names and the database handle."""
        self._i = ConvCoord()
        # Timestamp + pid keeps file and table names distinct between runs.
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
        self._inData = "dummyInData_%s" % ( self._uniqId )
        self._mapData = "dummyMapData_%s" % ( self._uniqId )
        self._expData = "dummyExpData_%s" % ( self._uniqId )
        self._obsData = "dummyObsData_%s" % ( self._uniqId )
        self._iDb = DbFactory.createInstance()
        self._i._iDb = self._iDb

    def tearDown( self ):
        """Close the database connection opened in setUp."""
        self._iDb.close()

    def test_run_as_script_alignFile_query( self ):
        """Script run on an 'align' file, converting query coordinates only, with an explicit config file."""
        configFile = "%s/dummyConfigFile_%s" % ( os.getcwd(), self._uniqId )
        try:
            with open( configFile, "w" ) as configF:
                configF.write( "[repet_env]\n" )
                configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) )
                configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) )
                configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) )
                configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) )
                configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) )
            self._writeMapFile( self._mapData )

            linesToProcess = [
                "chunk1\t21\t37\tTE1\t1\t27\t8e-58\t30\t97.800000\n",  # hit within the 1st chunk
                "chunk1\t92\t99\tTE1\t1\t8\t8e-58\t11\t97.800000\n",   # hit included within the chunk overlap, on the 1st chunk
                "chunk2\t2\t9\tTE1\t1\t8\t8e-58\t11\t97.800000\n",     # hit included within the chunk overlap, on the 2nd chunk
                "chunk2\t51\t58\tTE1\t1\t8\t8e-58\t11\t97.800000\n",   # hit inside the 2nd chunk
                "chunk2\t51\t70\tTE1\t8\t1\t8e-58\t11\t97.800000\n",   # subject on reverse strand
            ]
            FileUtils.writeLineListInFile( self._inData, linesToProcess )

            # The two overlap hits of the input appear only once here.
            refLines = [
                "chromosome1\t21\t37\tTE1\t1\t27\t8e-58\t30\t97.800000\n",
                "chromosome1\t92\t99\tTE1\t1\t8\t8e-58\t11\t97.800000\n",
                "chromosome1\t141\t148\tTE1\t1\t8\t8e-58\t11\t97.800000\n",
                "chromosome1\t141\t160\tTE1\t8\t1\t8e-58\t11\t97.800000\n",
            ]
            FileUtils.writeLineListInFile( self._expData, refLines )

            self._runScript( [ ( "-i", self._inData ),
                               ( "-f", "align" ),
                               ( "-c", "q" ),
                               ( "-m", self._mapData ),
                               ( "-o", self._obsData ),
                               ( "-C", configFile ) ] )

            self.assertTrue( FileUtils.are2FilesIdentical( self._expData, self._obsData ) )
        finally:
            self._removeFiles( self._inData, configFile, self._mapData,
                               self._expData, self._obsData )

    def test_run_as_script_alignFile_queryAndSubject( self ):
        """Script run on an 'align' file, converting both query and subject coordinates."""
        try:
            self._writeMapFile( self._mapData )
            linesToProcess = [
                "chunk1\t21\t37\tchunk3\t1\t27\t8e-58\t30\t97.800000\n",  # hit within the 1st chunk
                "chunk1\t92\t99\tchunk2\t2\t9\t8e-58\t11\t97.800000\n",   # hit included within the chunk overlap, on the 1st chunk
                "chunk2\t51\t58\tchunk1\t1\t8\t8e-58\t11\t97.800000\n",   # hit inside the 2nd chunk
                # The comma after the next entry was missing originally, which
                # fused it with the following one into a single list element.
                "chunk2\t51\t70\tchunk1\t8\t1\t8e-58\t11\t97.800000\n",   # subject on reverse strand
                "chunk2\t51\t70\tchunk1\t8\t1\t8e-58\t11\t97.800000\n",   # duplicate of previous line
            ]
            FileUtils.writeLineListInFile( self._inData, linesToProcess )

            # The duplicated input line is expected only once in the output.
            refLines = [
                "chromosome1\t21\t37\tchromosome2\t1\t27\t8e-58\t30\t97.800000\n",
                "chromosome1\t92\t99\tchromosome1\t92\t99\t8e-58\t11\t97.800000\n",
                "chromosome1\t141\t148\tchromosome1\t1\t8\t8e-58\t11\t97.800000\n",
                "chromosome1\t141\t160\tchromosome1\t8\t1\t8e-58\t11\t97.800000\n",
            ]
            FileUtils.writeLineListInFile( self._expData, refLines )

            self._runScript( [ ( "-i", self._inData ),
                               ( "-f", "align" ),
                               ( "-c", "qs" ),
                               ( "-m", self._mapData ),
                               ( "-o", self._obsData ) ] )

            self.assertTrue( FileUtils.are2FilesIdentical( self._expData, self._obsData ) )
        finally:
            self._removeFiles( self._inData, self._expData, self._obsData, self._mapData )
            # NOTE(review): presumably the script loads the map file into a
            # table of the same name; confirm against ConvCoord.py.
            self._iDb.dropTable( self._mapData )

    def test_run_as_script_pathTable_query( self ):
        """Script run on a 'path' table, converting query coordinates only."""
        try:
            self._createMapTable()
            self._createPathTable()

            # Path 3 of the input is absent here; presumably it is dropped as
            # a duplicate of path 2 once chunk overlaps are merged.
            refLines = [
                "1\tchromosome1\t21\t37\tTE1\t1\t27\t8e-58\t30\t97.8\n",
                "2\tchromosome1\t92\t99\tTE1\t1\t8\t8e-58\t11\t97.8\n",
                "4\tchromosome1\t141\t148\tTE1\t1\t8\t8e-58\t11\t97.8\n",
                "5\tchromosome1\t141\t160\tTE1\t8\t1\t8e-58\t11\t97.8\n",
            ]
            FileUtils.writeLineListInFile( self._expData, refLines )

            self._runScript( [ ( "-i", self._inData ),
                               ( "-f", "path" ),
                               ( "-c", "q" ),
                               ( "-m", self._mapData ),
                               ( "-o", self._obsData ) ] )

            # The result is a table: dump it to a file of the same name to compare.
            self._iDb.exportDataToFile( self._obsData, self._obsData )
            self.assertTrue( FileUtils.are2FilesIdentical( self._expData, self._obsData ) )
        finally:
            self._removeFiles( self._obsData, self._expData, self._mapData, self._inData )
            self._dropFixtureTables()

    def test_run_as_script_pathTable_query_noMergeChunkOverlaps( self ):
        """Same as test_run_as_script_pathTable_query but with '-M no': path 3 is kept."""
        try:
            self._createMapTable()
            self._createPathTable()

            refLines = [
                "1\tchromosome1\t21\t37\tTE1\t1\t27\t8e-58\t30\t97.8\n",
                "2\tchromosome1\t92\t99\tTE1\t1\t8\t8e-58\t11\t97.8\n",
                "3\tchromosome1\t92\t99\tTE1\t1\t8\t8e-58\t11\t97.8\n",   # hit included within the chunk overlap, on the 2nd chunk
                "4\tchromosome1\t141\t148\tTE1\t1\t8\t8e-58\t11\t97.8\n",
                "5\tchromosome1\t141\t160\tTE1\t8\t1\t8e-58\t11\t97.8\n",
            ]
            FileUtils.writeLineListInFile( self._expData, refLines )

            self._runScript( [ ( "-i", self._inData ),
                               ( "-f", "path" ),
                               ( "-c", "q" ),
                               ( "-m", self._mapData ),
                               ( "-M", "no" ),
                               ( "-o", self._obsData ) ] )

            # The result is a table: dump it to a file of the same name to compare.
            self._iDb.exportDataToFile( self._obsData, self._obsData )
            self.assertTrue( FileUtils.are2FilesIdentical( self._expData, self._obsData ) )
        finally:
            self._removeFiles( self._obsData, self._expData, self._mapData, self._inData )
            self._dropFixtureTables()

    def test_run(self):
        """Drive ConvCoord through its API on the Drosophila chromosome 4 data set under $REPET_DATA/Tools."""
        inFileName = "DmelChr4_chk.align.not_over.filtered"
        dataDir = "%s/Tools" % os.environ["REPET_DATA"]
        expFileName = "%s/DmelChr4_chr.align.not_over.filtered" % dataDir
        obsFileName = "obs.align"
        # The input is made visible in the current directory through a symlink.
        os.symlink( "%s/%s" % ( dataDir, inFileName ), inFileName )
        try:
            iConvCoord = ConvCoord()
            iConvCoord.setInputData( inFileName )
            iConvCoord.setMapData( "%s/DmelChr4_chunks.map" % dataDir )
            iConvCoord.setCoordinatesToConvert( "qs" )
            iConvCoord.setMergeChunkOverlaps( False )
            iConvCoord.setOutputData( obsFileName )
            iConvCoord.run()

            self.assertTrue( FileUtils.are2FilesIdentical( expFileName, obsFileName ) )
        finally:
            # A leftover symlink would make os.symlink fail on the next run.
            self._removeFiles( inFileName, obsFileName )

    def _writeMapFile( self, mapFile ):
        """Write the three-line chunk map fixture into the file 'mapFile'.

        chunk1 and chunk2 are both on chromosome1 (1-100 and 91-190, hence
        sharing positions 91-100); chunk3 is on chromosome2 (1-100).
        """
        with open( mapFile, "w" ) as mapF:
            mapF.write( "chunk1\tchromosome1\t1\t100\n" )
            mapF.write( "chunk2\tchromosome1\t91\t190\n" )
            mapF.write( "chunk3\tchromosome2\t1\t100\n" )

    def _createMapTable( self ):
        """Load the chunk map fixture into a 'map' table named self._mapData, then delete the file."""
        self._writeMapFile( self._mapData )
        self._iDb.createTable( self._mapData, "map", self._mapData, True )
        os.remove( self._mapData )

    def _createPathTable( self ):
        """Load the five-path input fixture into a 'path' table named self._inData, then delete the file."""
        linesToProcess = [
            "1\tchunk1\t21\t37\tTE1\t1\t27\t8e-58\t30\t97.800000\n",  # hit within the 1st chunk
            "2\tchunk1\t92\t99\tTE1\t1\t8\t8e-58\t11\t97.800000\n",   # hit included within the chunk overlap, on the 1st chunk
            "3\tchunk2\t2\t9\tTE1\t1\t8\t8e-58\t11\t97.800000\n",     # hit included within the chunk overlap, on the 2nd chunk
            "4\tchunk2\t51\t58\tTE1\t1\t8\t8e-58\t11\t97.800000\n",   # hit inside the 2nd chunk
            "5\tchunk2\t51\t70\tTE1\t8\t1\t8e-58\t11\t97.800000\n",   # subject on reverse strand
        ]
        FileUtils.writeLineListInFile( self._inData, linesToProcess )
        self._iDb.createTable( self._inData, "path", self._inData, True )
        os.remove( self._inData )

    def _runScript( self, lOptions ):
        """Run 'ConvCoord.py' through the shell and wait for it to finish.

        lOptions is a list of (flag, value) pairs appended to the command
        line in the given order. The exit status is not checked; the tests
        judge the run by the output it produced.
        """
        cmd = "ConvCoord.py"
        for flag, value in lOptions:
            cmd += " %s %s" % ( flag, value )
        process = subprocess.Popen( cmd, shell = True )
        process.communicate()

    def _removeFiles( self, *lPaths ):
        """Delete each given path if it is present; absent paths are skipped.

        Skipping absent paths keeps a cleanup error from hiding the failure
        that prevented the file from being created. lexists is used so that a
        dangling symlink is removed too.
        """
        for path in lPaths:
            if os.path.lexists( path ):
                os.remove( path )

    def _dropFixtureTables( self ):
        """Drop the tables named after the four fixture names used by the path-table tests."""
        self._iDb.dropTable( self._mapData )
        self._iDb.dropTable( self._inData )
        self._iDb.dropTable( self._expData )
        self._iDb.dropTable( self._obsData )

# Run every test of this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()