# view smart_toolShed/commons/core/coord/test/Test_Align.py @ 0:e0f8dcca02ed
#
# Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
# author yufei-luo
# date Thu, 17 Jan 2013 10:52:14 -0500
# parents
# children
# line wrap: on
# line source

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use, 
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info". 
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability. 
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or 
# data to be ensured and,  more generally, to use and operate it in the 
# same conditions as regards security. 
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.


import unittest
import os
import time
from commons.core.coord.Align import Align
from commons.core.coord.Map import Map
from commons.core.utils.FileUtils import FileUtils
from commons.core.coord.Range import Range


class Test_Align( unittest.TestCase ):
    """Unit tests for commons.core.coord.Align.Align.

    Tests that need files on disk obtain their names from _newTmpFile(),
    which appends the per-test unique id and registers the file so that
    tearDown() removes it, even when the test fails before its last line.
    """

    def setUp(self):
        self._align = Align()
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S"), os.getpid() )
        # Names of the scratch files created by the current test.
        self._tmpFiles = []

    def tearDown(self):
        self._align = None
        # tearDown runs whether or not the test passed, so scratch files
        # never outlive the test that created them.
        for fileName in self._tmpFiles:
            if os.path.exists( fileName ):
                os.remove( fileName )
        self._tmpFiles = []

    # ------------------------------------------------------------------
    # private helpers (not collected as tests: names do not start with "test")
    # ------------------------------------------------------------------

    def _newTmpFile(self, prefix):
        # Return "<prefix>_<uniqId>" and register it for removal in tearDown.
        fileName = "%s_%s" % ( prefix, self._uniqId )
        self._tmpFiles.append( fileName )
        return fileName

    def _alignFromString(self, line):
        # Build a fresh Align from one tab-delimited line.
        alignInstance = Align()
        alignInstance.setFromString(line)
        return alignInstance

    def _checkMergeOfLines(self, line1, line2, expLine):
        # Merge the alignment described by line2 into the one described by
        # line1 and compare the merged alignment with the one from expLine.
        obsAlign = self._alignFromString(line1)
        expAlign = self._alignFromString(expLine)
        obsAlign.merge(self._alignFromString(line2))
        self.assertEqual(expAlign, obsAlign)

    def _chr1Te2Align(self, qStart, qEnd, sStart, sEnd, eValue=0, score=30, identity=90.2):
        # Build an Align of query "chr1" against subject "TE2" directly from
        # Range objects, i.e. with the coordinates exactly as given.
        return Align(Range("chr1", qStart, qEnd), Range("TE2", sStart, sEnd), eValue, score, identity)

    def _checkBin(self, queryStart, queryEnd, expBin):
        # Set self._align on chr1:queryStart..queryEnd and compare getBin().
        alignTuple = ("chr1", queryStart, queryEnd, "TE2", "11", "17", "1e-20", "30", "90.2")
        self._align.setFromTuple(alignTuple)
        self.assertEqual(expBin, self._align.getBin())

    # ------------------------------------------------------------------
    # isEmpty
    # ------------------------------------------------------------------

    def test_isEmpty_True(self):
        # A freshly constructed Align is expected to be empty.
        self.assertTrue(Align().isEmpty())

    def test_isEmpty_True_query_is_empty(self):
        # Empty query name with -1 coordinates.
        alignInstance = self._alignFromString("\t-1\t-1\tTE2\t3\t10\t1e-20\t30\t90.2\n")

        self.assertTrue(alignInstance.isEmpty())

    def test_isEmpty_True_subject_is_empty(self):
        # Empty subject name with -1 coordinates.
        alignInstance = self._alignFromString("chr1\t2\t20\t\t-1\t-1\t1e-20\t30\t90.2\n")

        self.assertTrue(alignInstance.isEmpty())

    def test_isEmpty_False(self):
        alignInstance = self._alignFromString("chr1\t2\t20\tTE2\t3\t10\t1e-20\t30\t90.2\n")

        self.assertFalse(alignInstance.isEmpty())

    # ------------------------------------------------------------------
    # read / write
    # ------------------------------------------------------------------

    def test_read(self):
        # Reading a one-line file must return 1 and fill the instance with
        # the same content as setFromString() on that line.
        line = "chr2\t1\t10\tTE3\t11\t17\t1e-20\t30\t90.2\n"
        expReturn = 1
        expAlignInstance = self._alignFromString(line)

        alignFile = self._newTmpFile("dummyMockAlignFile")
        with open(alignFile, "w") as alignFileHandle:
            alignFileHandle.write(line)

        obsAlignInstance = Align()
        with open(alignFile, "r") as alignFileHandle:
            obsReturn = obsAlignInstance.read(alignFileHandle)

        self.assertEqual(expAlignInstance, obsAlignInstance)
        self.assertEqual(expReturn, obsReturn)

    def test_read_empty_file(self):
        # Reading from an empty file must return 0.
        expReturn = 0

        alignFile = self._newTmpFile("dummyMockAlignFile")
        open(alignFile, "w").close()

        obsAlignInstance = Align()
        # The handle is closed before the file is removed in tearDown.
        with open(alignFile, "r") as alignFileHandle:
            obsReturn = obsAlignInstance.read(alignFileHandle)

        self.assertEqual(expReturn, obsReturn)

    def test_write (self):
        # The expected line shows the e-value written as "0" and the
        # identity written with six decimals.
        expAlignFile = self._newTmpFile("expMockAlignFile")
        with open(expAlignFile, "w") as expAlignFileHandle:
            expAlignFileHandle.write("chr1\t1\t10\tTE2\t3\t10\t0\t30\t90.200000\n")

        obsAlignInstance = Align()
        obsAlignInstance.setFromTuple(("chr1", 1, 10, "TE2", 3, 10, 0.0, 30, 90.2))
        obsAlignFile = self._newTmpFile("obsAlignFile")
        with open(obsAlignFile, "w") as obsAlignFileHandle:
            obsAlignInstance.write(obsAlignFileHandle)

        self.assertTrue( FileUtils.are2FilesIdentical( expAlignFile, obsAlignFile ) )

    # ------------------------------------------------------------------
    # merge
    # ------------------------------------------------------------------

    def test_merge (self):
        # merge() is expected to return None when the query names differ ...
        alignInstanceChr1 = self._alignFromString("chr1\t1\t10\tTE2\t3\t10\t1e-20\t30\t90.2\n")
        alignInstanceChr2 = self._alignFromString("chr2\t1\t10\tTE2\t11\t17\t1e-20\t30\t90.2\n")

        self.assertEqual(None, alignInstanceChr1.merge(alignInstanceChr2))

        # ... and also when the subject names differ.
        alignInstanceTE2 = self._alignFromString("chr1\t1\t10\tTE2\t3\t10\t1e-20\t30\t90.2\n")
        alignInstanceTE3 = self._alignFromString("chr1\t1\t10\tTE3\t11\t17\t1e-20\t30\t90.2\n")

        self.assertEqual(None, alignInstanceTE2.merge(alignInstanceTE3))

    def test_merge_plus_strand1 (self):
        # The second alignment extends the first one on the left.
        self._checkMergeOfLines(
            "chr1\t2\t20\tTE2\t3\t10\t1e-20\t30\t90.2\n",
            "chr1\t1\t10\tTE2\t1\t9\t1e-20\t30\t90.2\n",
            "chr1\t1\t20\tTE2\t1\t10\t1e-20\t30\t90.2\n" )

    def test_merge_plus_strand2 (self ):
        # The second alignment extends the first one on the right.
        self._checkMergeOfLines(
            "chr1\t2\t20\tTE2\t3\t10\t1e-20\t30\t90.2\n",
            "chr1\t4\t30\tTE2\t4\t12\t1e-20\t30\t90.2\n",
            "chr1\t2\t30\tTE2\t3\t12\t1e-20\t30\t90.2\n" )

    def test_merge_plus_strand3 (self ):
        # The second alignment contains the first one.
        self._checkMergeOfLines(
            "chr1\t2\t10\tTE2\t3\t10\t1e-20\t30\t90.2\n",
            "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n",
            "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n" )

    def test_merge_plus_strand4 (self ):
        # The first alignment contains the second one.
        self._checkMergeOfLines(
            "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n",
            "chr1\t2\t10\tTE2\t2\t10\t1e-20\t30\t90.2\n",
            "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n" )

    def test_merge__neg_strand1 (self):
        # Same layout as test_merge_plus_strand1, but the receiving alignment
        # has its query and subject ranges given end-first.
        obsAlign = self._chr1Te2Align(20, 2, 10, 3)
        expAlign = self._chr1Te2Align(20, 1, 10, 1)

        obsAlign.merge(self._chr1Te2Align(1, 10, 1, 9))

        self.assertEqual(expAlign, obsAlign)

    def test_merge__neg_strand2 (self):
        obsAlign = self._chr1Te2Align(20, 2, 10, 3)
        expAlign = self._chr1Te2Align(30, 2, 12, 3)

        obsAlign.merge(self._chr1Te2Align(4, 30, 4, 12))

        self.assertEqual(expAlign, obsAlign)

    def test_merge_neg_strand3 (self ):
        obsAlign = self._chr1Te2Align(10, 2, 10, 3)
        expAlign = self._chr1Te2Align(20, 1, 12, 1)

        obsAlign.merge(self._chr1Te2Align(1, 20, 1, 12))

        self.assertEqual(expAlign, obsAlign)

    def test_merge_neg_strand4 (self ):
        obsAlign = self._chr1Te2Align(20, 1, 12, 1)
        expAlign = self._chr1Te2Align(20, 1, 12, 1)

        obsAlign.merge(self._chr1Te2Align(2, 10, 2, 10))

        self.assertEqual(expAlign, obsAlign)

    def test_merge_id_score_identity_eValue (self):
        # After the merge the receiver is expected to keep its own id (1)
        # while carrying e-value 0, score 30 and identity 90.3, i.e. the
        # values of the merged-in alignment in this data set.
        obsAlign = self._chr1Te2Align(20, 1, 12, 1, eValue=0.05, score=20, identity=90.2)
        obsAlign.id = 1
        otherAlign = self._chr1Te2Align(2, 10, 2, 10, eValue=0, score=30, identity=90.3)
        otherAlign.id = 2

        expAlign = self._chr1Te2Align(20, 1, 12, 1, eValue=0, score=30, identity=90.3)
        expAlign.id = 1

        obsAlign.merge(otherAlign)

        self.assertEqual(expAlign, obsAlign)
        self.assertEqual(expAlign.id, obsAlign.id)

    # ------------------------------------------------------------------
    # setters
    # ------------------------------------------------------------------

    def test_setFromTuple_QryRev(self):
        # The query is given end-first (100, 1); the expected result has the
        # query stored as 1..100 and the subject flipped to 300..201.
        self._align.setFromTuple( ( "qry1", 100, 1, "sbj1", 201, 300, 0.0, 135, 97.2 ) )
        self.assertEqual( self._align.range_query.seqname, "qry1" )
        self.assertEqual( self._align.range_query.start, 1 )
        self.assertEqual( self._align.range_query.end, 100 )
        self.assertEqual( self._align.range_subject.seqname, "sbj1" )
        self.assertEqual( self._align.range_subject.start, 300 )
        self.assertEqual( self._align.range_subject.end, 201 )
        self.assertEqual( self._align.e_value, 0.0 )
        self.assertEqual( self._align.score, 135 )
        self.assertEqual( self._align.identity, 97.2 )

    def test_setFromTuple_identityAsFloat(self):
        # Every field is passed as a string except the identity (float 0.0).
        self._align.setFromTuple( ( "qry1", "301", "600", "sbj1", "1", "300", "0.0", "135", 0.0) )
        self.assertEqual( self._align.range_query.seqname, "qry1" )
        self.assertEqual( self._align.range_query.start, 301 )
        self.assertEqual( self._align.range_query.end, 600 )
        self.assertEqual( self._align.range_subject.seqname, "sbj1" )
        self.assertEqual( self._align.range_subject.start, 1 )
        self.assertEqual( self._align.range_subject.end, 300 )
        self.assertEqual( self._align.e_value, float("0.0") )
        self.assertEqual( self._align.score, float("135") )
        self.assertEqual( self._align.identity, 0.0 )

    def test_setFromString(self):
        line = "chr1\t1\t10\tTE2\t11\t17\t1e-20\t30\t90.2\n"
        self._align.setFromString( line )
        self.assertEqual( self._align.range_query.seqname, "chr1" )
        self.assertEqual( self._align.range_query.start, 1 )
        self.assertEqual( self._align.range_query.end, 10 )
        self.assertEqual( self._align.range_subject.seqname, "TE2" )
        self.assertEqual( self._align.range_subject.start, 11 )
        self.assertEqual( self._align.range_subject.end, 17 )
        self.assertEqual( self._align.e_value, float("1e-20") )
        self.assertEqual( self._align.score, float("30") )
        self.assertEqual( float(self._align.identity), float("90.2") )

    # ------------------------------------------------------------------
    # comparison
    # ------------------------------------------------------------------

    def test__eq__(self):
        self._align.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" )
        o = Align()
        o.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" )
        self.assertEqual( self._align,  o )
        # Each variant below changes exactly one aspect of the reference
        # line: query name, subject name, query coordinates, subject
        # coordinates, then score.
        o.setFromString( "chromosome1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" )
        self.assertNotEqual( self._align, o )
        o.setFromString( "chr1\t1\t6\ttranspElem2\t11\t16\t1e-20\t30\t90.2\n" )
        self.assertNotEqual( self._align, o )
        o.setFromString( "chr1\t100\t600\tTE2\t11\t16\t1e-20\t30\t90.2\n" )
        self.assertNotEqual( self._align, o )
        o.setFromString( "chr1\t1\t6\tTE2\t1100\t1600\t1e-20\t30\t90.2\n" )
        self.assertNotEqual( self._align, o )
        o.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30000\t90.2\n" )
        self.assertNotEqual( self._align, o )

    # ------------------------------------------------------------------
    # conversions to Map
    # ------------------------------------------------------------------

    def test_getSubjectAsMapOfQuery_direct(self):
        exp = Map( name="TE2", seqname="chr1", start=1, end=6 )
        self._align.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" )
        obs = self._align.getSubjectAsMapOfQuery()
        self.assertEqual( obs, exp )

    def test_getSubjectAsMapOfQuery_reverse(self):
        # Subject given end-first (16, 11): the expected map is 6..1 on chr1.
        exp = Map( name="TE2", seqname="chr1", start=6, end=1 )
        self._align.setFromString( "chr1\t1\t6\tTE2\t16\t11\t1e-20\t30\t90.2\n" )
        obs = self._align.getSubjectAsMapOfQuery()
        self.assertEqual( obs, exp )

    def test_writeSubjectAsMapOfQuery( self ):
        self._align.setFromTuple( ( "chr3", "250", "151", "seq5", "1", "100", "1e-32", "147", "87.9" ) )
        expFile = self._newTmpFile( "dummyExpFile" )
        with open( expFile, "w" ) as expFileHandler:
            expFileHandler.write( "seq5\tchr3\t250\t151\n" )
        obsFile = self._newTmpFile( "dummyObsFile" )
        with open( obsFile, "w" ) as obsFileHandler:
            self._align.writeSubjectAsMapOfQuery( obsFileHandler )
        self.assertTrue( FileUtils.are2FilesIdentical( expFile, obsFile ) )

    def test_areQrySbjOnOppositeStrands(self):
        self._align.setFromTuple( ( "qry1", "1", "100", "sbj1", "1", "100", "0.0", "135", "95.7" ) )
        obs = self._align.areQrySbjOnOppositeStrands()
        self.assertFalse( obs )
        # Query given end-first while the subject is given start-first.
        self._align.setFromTuple( ( "qry1", "600", "301", "sbj1", "1", "300", "0.0", "135", "95.7" ) )
        obs = self._align.areQrySbjOnOppositeStrands()
        self.assertTrue( obs )

    def test_reverse(self):
        line = "chr1\t1\t10\tTE2\t11\t17\t1e-20\t30\t90.2"
        expLine = "chr1\t10\t1\tTE2\t17\t11\t1e-20\t30\t90.200000"

        obsAlignInstance = self._alignFromString(line)
        obsAlignInstance.reverse()
        obsLine = obsAlignInstance.toString()

        self.assertEqual(expLine, obsLine)

    def test_getMapsOfQueryAndSubject(self):
        self._align.setFromTuple( ( "qry1", "1", "100", "sbj1", "1", "100", "0.0", "135", "95.7" ) )

        # Both expected maps carry the name "repet".
        expMapQuery = Map()
        expMapQuery.setFromTuple( ( "repet", "qry1", "1", "100" ) )
        expMapSubject = Map()
        expMapSubject.setFromTuple( ( "repet", "sbj1", "1", "100" ) )

        obsMapQuery, obsMapSubject = self._align.getMapsOfQueryAndSubject()

        self.assertEqual( expMapQuery, obsMapQuery )
        self.assertEqual( expMapSubject, obsMapSubject )

    # ------------------------------------------------------------------
    # getBin: the expected values below are 1000.0 for levels 1 to 3 and
    # 100000000.0 for both levels 8 and 9.
    # ------------------------------------------------------------------

    def test_getBin_bin_level_9(self):
        self._checkBin("190000000", "390000000", 100000000.0)

    def test_getBin_bin_level_8(self):
        self._checkBin("19000000", "39000000", 100000000.0)

    def test_getBin_bin_level_7(self):
        self._checkBin("1900000", "3900000", 10000000.0)

    def test_getBin_bin_level_6(self):
        self._checkBin("190000", "390000", 1000000.0)

    def test_getBin_bin_level_5(self):
        self._checkBin("19000", "39000", 100000.0)

    def test_getBin_bin_level_4(self):
        self._checkBin("1900", "3900", 10000.0)

    def test_getBin_bin_level_3(self):
        self._checkBin("190", "390", 1000.0)

    def test_getBin_bin_level_2(self):
        self._checkBin("19", "39", 1000.0)

    def test_getBin_bin_level_1(self):
        self._checkBin("1", "3", 1000.0)

    # ------------------------------------------------------------------
    # switchQuerySubject
    # ------------------------------------------------------------------

    def test_switchQuerySubject_directS( self ):
        alignTuple = ("chr1","1","3","TE2","11","17","1e-20","30","90.2")
        self._align.setFromTuple( alignTuple )
        exp = Align( Range("TE2","11","17"), Range("chr1","1","3"), "1e-20", "30", "90.2" )
        self._align.switchQuerySubject()
        self.assertEqual( exp, self._align )

    def test_switchQuerySubject_reverseS( self ):
        # Subject given end-first (17, 11): after the switch the expected new
        # query is TE2 11..17 and the expected new subject is chr1 3..1.
        alignTuple = ("chr1","1","3","TE2","17","11","1e-20","30","90.2")
        self._align.setFromTuple( alignTuple )
        exp = Align( Range("TE2","11","17"), Range("chr1","3","1"), "1e-20", "30", "90.2" )
        self._align.switchQuerySubject()
        self.assertEqual( exp, self._align )

    # ------------------------------------------------------------------
    # GFF export
    # ------------------------------------------------------------------

    def test_toStringAsGff( self ):
        self._align.setFromString( "chr1\t1\t10\tTE3\t11\t17\t1e-20\t30\t85.2\n" )
        exp = "chr1\tREPET\tmatch\t1\t10\t1e-20\t+\t.\tID=23;Target=TE3 11 17"
        obs = self._align.toStringAsGff( ID="23" )
        self.assertEqual( obs, exp )
        
# Module-level suite holding every test of Test_Align; it is built at import
# time so that other modules can pick it up by name.
# TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which is
# deprecated and no longer available in recent Python versions.
test_suite = unittest.TestSuite()
test_suite.addTest( unittest.TestLoader().loadTestsFromTestCase( Test_Align ) )
if __name__ == "__main__":
    # Run the suite with one line of output per test.
    unittest.TextTestRunner(verbosity=2).run( test_suite )