view commons/pyRepetUnit/convCoord/test/Test_PathChunkConnector.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
line wrap: on
line source

import unittest
import time
import os
from commons.core.sql.TablePathAdaptator import TablePathAdaptator
from commons.pyRepetUnit.convCoord.PathChunkConnector import PathChunkConnector
from commons.core.sql.DbMySql import DbMySql


class Test_PathChunkConnector( unittest.TestCase ):
    
    def setUp(self):
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )
        configF = open(self._configFileName, "w" )
        configF.write( "[repet_env]\n" )
        configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) )
        configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) )
        configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) )
        configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) )
        configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) )
        configF.close()
        self._db = DbMySql( cfgFileName=self._configFileName )
        self._table = "dummyPathTable_%s" % ( self._uniqId )
        self._tpA = TablePathAdaptator( self._db, self._table )
        self._mapDict = {'chunk1': ('dmel_chr4', 1, 100000),'chunk2': ('dmel_chr4', 90001, 190000),'chunk3': ('dmel_chr4', 180001, 280000), }
        self._mapFileName = "map_file.map"
        
    def tearDown(self):
        self._db.close()
        
    def testTwoQueryOverlapsOnPlusStrand (self):
        lines = [
                 "1\tdmel_chr4\t95535\t95570\tsbj2\t125423\t125467\t7e-15\t82\t97.78\n",
                 "2\tdmel_chr4\t95545\t95576\tsbj2\t125457\t133465\t2e-38\t83\t65\n"
                ]
        
        expectedList =[("dmel_chr4", 95535, 95576, "sbj2", 125423, 133465, 2e-38, 83,97.78)]
        self._templateTest(lines, expectedList)
        
    def testTwoQueryOverlapsOnReverseStrand (self):
        lines = [
                 "1\tdmel_chr4\t95535\t95570\tsbj2\t125467\t125423\t7e-15\t82\t97.78\n",
                 "2\tdmel_chr4\t95545\t95576\tsbj2\t133465\t125457\t2e-38\t83\t65\n"
                ]

        expectedList =[("dmel_chr4", 95535, 95576, "sbj2", 133465, 125423, 2e-38, 83,97.78)]
        self._templateTest(lines, expectedList)
        
    def testTwoQueryOverlapsOnDifferentStrands (self):
        lines = [                   
                 "1\tdmel_chr4\t95535\t95570\tsbj2\t125423\t125467\t7e-15\t82\t97.78\n",
                 "2\tdmel_chr4\t95545\t95576\tsbj2\t133465\t125457\t2e-38\t83\t65\n"
                ]
        expectedList =[
                       ("dmel_chr4", 95535, 95570, "sbj2", 125423, 125467, 7e-15, 82,97.78),
                       ("dmel_chr4", 95545, 95576, "sbj2", 133465, 125457, 2e-38, 83,65)
                       ]
        self._templateTest(lines, expectedList)
        
    def _templateTest(self, datas2TestList, expectedList ):
        pathFileName = "dummyPathFile_%s" % ( self._uniqId )
        _MockPathFile(pathFileName, datas2TestList)
        

        self._db.createTable( self._table, "path", pathFileName )

        _MockMapFile(self._mapFileName)
        chunkConnector = PathChunkConnector(self._mapFileName, self._db, self._table, 0)
        chunkConnector.run()
       
        sql_cmd = 'select * from %s' % (self._table)
        self._db.execute(sql_cmd)
        res = self._db.fetchall()
        
        for i in xrange(len(expectedList)):
            resultTuple = res[i]
            expectedTuple = expectedList[i]
            self._assertExpectedTupleEqualsObsTuple(expectedTuple, resultTuple)
        
        self._db.dropTable( self._table )
        os.remove(pathFileName)
        os.remove(self._mapFileName)
        
    def _assertExpectedTupleEqualsObsTuple(self, expectedTuple, resultTuple):
        self.assertEquals(expectedTuple[0], resultTuple[1])
        self.assertEquals(expectedTuple[1], resultTuple[2])
        self.assertEquals(expectedTuple[2], resultTuple[3])
        self.assertEquals(expectedTuple[3], resultTuple[4])
        self.assertEquals(expectedTuple[4], resultTuple[5])
        self.assertEquals(expectedTuple[5], resultTuple[6])
        self.assertEquals(expectedTuple[6], resultTuple[7])
        self.assertEquals(expectedTuple[7], resultTuple[8])
        self.assertEquals(expectedTuple[8], resultTuple[9])
        

class _MockPathFile:
    
    def __init__(self, fileName, lines):
        path = open(fileName, "w");
        for line in lines:
            path.write(line)
        path.close
        
        
class _MockMapFile:
    
    def __init__ (self, fileName):
        map = open(fileName, "w")
        line1 = "chunk1" + '\t'   + "dmel_chr4" + '\t' +   "1" + '\t'+   "100000" + "\n"
        line2 = "chunk2" + '\t'   + "dmel_chr4" + '\t' +   "90001" + '\t'+   "190000" + "\n"
        line3 = "chunk3" + '\t'   + "dmel_chr4" + '\t' +   "180001" + '\t' + "280000" + "\n"  
        map.write(line1)
        map.write(line2)
        map.write(line3)
        map.close
        
        
test_suite = unittest.TestSuite()
test_suite.addTest( unittest.makeSuite( Test_PathChunkConnector ) )
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run( test_suite )