# view commons/core/sql/test/Test_TableBinSetAdaptator.py @ 31:0ab839023fe4
#
# Uploaded
# author m-zytnicki
# date Tue, 30 Apr 2013 14:33:21 -0400
# parents 769e306b7933
# children
# line wrap: on
# line source

import unittest
import os
import time
from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator
from commons.core.coord.Set import Set
from commons.core.sql.DbFactory import DbFactory

class Test_TableBinSetAdaptator(unittest.TestCase):
    """Database-backed tests for TableBinSetAdaptator.

    Every test works on its own uniquely named "set" table, loaded in setUp
    from a small tab-separated file, plus the companion "<table>_idx" table
    that each test builds with createBinSetTable(). After calling the
    adaptator, the tests read both tables back with "SELECT *" and compare
    the rows, in the order returned, with the expected tuples.

    Expected integers are written without the Python 2 "L" suffix: they
    still compare equal to the long values the original tests expected, and
    the module stays parseable by Python 3.
    """

    # Lines loaded into the set table by setUp.
    _DEFAULT_SET_LINES = (
        "1\tseq1\tchr1\t1900\t3900\n",
        "2\tseq2\tchr1\t2\t9\n",
        "3\tseq3\tchr1\t8\t13\n",
    )

    # Rows expected in the set table for the default lines above.
    _SET_ROW_1 = (1, 'seq1', 'chr1', 1900, 3900)
    _SET_ROW_2 = (2, 'seq2', 'chr1', 2, 9)
    _SET_ROW_3 = (3, 'seq3', 'chr1', 8, 13)

    # Rows expected in the "_idx" table for the default lines above.
    _BIN_ROW_1 = (1, 10000.0, 'chr1', 1900, 3900, 1)
    _BIN_ROW_2 = (2, 1000.0, 'chr1', 2, 9, 1)
    _BIN_ROW_3 = (3, 1000.0, 'chr1', 8, 13, 1)

    def setUp(self):
        """Create the uniquely named set file and set table, then the adaptator."""
        # Timestamp + pid keeps table and file names distinct between runs.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._iDb = DbFactory.createInstance()
        self._tableName = "%s_%s" % ("dummySetTable", self._uniqId)
        self._tableName_bin = "%s_idx" % self._tableName
        self._setFileName = "dummySetFile_%s" % self._uniqId
        self._writeSetFile(self._DEFAULT_SET_LINES)
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName)

    def tearDown(self):
        """Drop both tables, close the connection and delete the set file.

        The nested try/finally blocks make sure the connection is closed and
        the file removed even when dropping a table raises.
        """
        try:
            self._iDb.dropTable(self._tableName)
            self._iDb.dropTable(self._tableName_bin)
        finally:
            try:
                self._iDb.close()
            finally:
                if os.path.exists(self._setFileName):
                    os.remove(self._setFileName)

    def _writeSetFile(self, lLines):
        """(Re)write the set file with the given lines; an empty list gives an empty file."""
        with open(self._setFileName, "w") as setF:
            setF.writelines(lLines)

    def _rebuildTables(self, lLines):
        """Drop both tables and recreate them from a set file holding lLines."""
        self._iDb.dropTable(self._tableName)
        self._iDb.dropTable(self._tableName_bin)
        self._writeSetFile(lLines)
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iDb.createBinSetTable(self._tableName, True)

    def _fetchAllRows(self, tableName):
        """Return every row of tableName as given by the cursor's fetchall()."""
        self._iDb.execute("SELECT * FROM %s" % tableName)
        return self._iDb.cursor.fetchall()

    def _assertTablesContent(self, expTupleInBinTable, expTupleInSetTable):
        """Check the "_idx" table first, then the set table, against the expected rows."""
        self.assertEqual(expTupleInBinTable, self._fetchAllRows(self._tableName_bin))
        self.assertEqual(expTupleInSetTable, self._fetchAllRows(self._tableName))

    def test_insASetInSetAndBinTable(self):
        """Inserting one Set appends a row to both tables."""
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_2, self._BIN_ROW_3, (1, 1000.0, 'seq1', 1, 2, 0)),
            (self._SET_ROW_1, self._SET_ROW_2, self._SET_ROW_3, (1, 'set1', 'seq1', 2, 1)),
        )

    def test_insASetInSetAndBinTable_delayedCase(self):
        """Same insertion with the second argument set to True gives the same rows."""
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, True)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_2, self._BIN_ROW_3, (1, 1000.0, 'seq1', 1, 2, 0)),
            (self._SET_ROW_1, self._SET_ROW_2, self._SET_ROW_3, (1, 'set1', 'seq1', 2, 1)),
        )

    def test_deleteFromIdFromSetAndBinTable(self):
        """Deleting id 2 removes its row from both tables."""
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_3),
            (self._SET_ROW_1, self._SET_ROW_3),
        )

    def test_deleteFromListIdFromSetAndBinTable(self):
        """Deleting ids 1 and 2 leaves only id 3 in both tables."""
        lSetToRemove = [1, 2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove)
        # The set file is removed by tearDown, as for every other test.
        self._assertTablesContent(
            (self._BIN_ROW_3,),
            (self._SET_ROW_3,),
        )

    def test_joinTwoSetsFromSetAndBinTable(self):
        """Joining ids 1 and 2 renumbers the id-2 row to 1 and returns 1."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(1, 2)
        self._assertTablesContent(
            (self._BIN_ROW_1, (1, 1000.0, 'chr1', 2, 9, 1), self._BIN_ROW_3),
            (self._SET_ROW_1, (1, 'seq2', 'chr1', 2, 9), self._SET_ROW_3),
        )
        self.assertEqual(1, obsNewId)

    def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self):
        """Joining with the ids passed as (2, 1) gives the same rows and id."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(2, 1)
        self._assertTablesContent(
            (self._BIN_ROW_1, (1, 1000.0, 'chr1', 2, 9, 1), self._BIN_ROW_3),
            (self._SET_ROW_1, (1, 'seq2', 'chr1', 2, 9), self._SET_ROW_3),
        )
        self.assertEqual(1, obsNewId)

    def test_getNewId(self):
        """With ids 1..3 loaded, the next id is 4."""
        self._iDb.createBinSetTable(self._tableName, True)
        self.assertEqual(4, self._iTableBinSetAdaptator.getNewId())

    def test_getNewId_empty_table(self):
        """With tables built from an empty set file, the next id is 1."""
        self._rebuildTables([])
        self.assertEqual(1, self._iTableBinSetAdaptator.getNewId())

    def test_getSetListFromQueryCoord(self):
        """Querying chr1:10-4000 returns the sets with ids 1 and 3, in that order."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord('chr1', 10, 4000)
        expLSet = [
            Set(1, "seq1", "chr1", 1900, 3900),
            Set(3, "seq3", "chr1", 8, 13),
        ]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListFromQueryCoord_return_empty_list(self):
        """Querying chr1:4000-40000 returns an empty list."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord('chr1', 4000, 40000)
        self.assertEqual([], obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord(self):
        """Only the set with id 1 is strictly included in chr1:10-4000."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord('chr1', 10, 4000)
        self.assertEqual([Set(1, "seq1", "chr1", 1900, 3900)], obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self):
        """No set is strictly included in chr1:4000-40000."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord('chr1', 4000, 40000)
        self.assertEqual([], obsLSet)

    def test_getIdList(self):
        """The id list of the default table is [1, 2, 3]."""
        self._iDb.createBinSetTable(self._tableName, True)
        self.assertEqual([1, 2, 3], self._iTableBinSetAdaptator.getIdList())

    def test_getSeqNameList(self):
        """With one row on chr2 and two on chr1, the result is ["chr1", "chr2"]."""
        self._rebuildTables([
            "1\tseq1\tchr2\t1900\t3900\n",
            "2\tseq2\tchr1\t2\t9\n",
            "3\tseq3\tchr1\t8\t13\n",
        ])
        self.assertEqual(["chr1", "chr2"], self._iTableBinSetAdaptator.getSeqNameList())

    def test_insertListInSetAndBinTable(self):
        """Both inserted sets are stored under the same new id 4."""
        lSet = [
            Set(1, "seq4", "chr1", 100, 390),
            Set(2, "seq5", "chr1", 1, 13),
        ]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_2, self._BIN_ROW_3,
             (4, 1000.0, 'chr1', 100, 390, 1), (4, 1000.0, 'chr1', 1, 13, 1)),
            (self._SET_ROW_1, self._SET_ROW_2, self._SET_ROW_3,
             (4, 'seq4', 'chr1', 100, 390), (4, 'seq5', 'chr1', 1, 13)),
        )

    def test_insertListInSetAndBinTableAndMergeAllSets(self):
        """Rows for ids 2 and 3 give way to one (5, seq5, 1-13) row; seq4 is stored as id 4."""
        lSet = [
            Set(1, "seq4", "chr1", 100, 390),
            Set(2, "seq5", "chr1", 1, 13),
        ]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet)
        self._assertTablesContent(
            (self._BIN_ROW_1, (5, 1000.0, 'chr1', 1, 13, 1), (4, 1000.0, 'chr1', 100, 390, 1)),
            (self._SET_ROW_1, (5, 'seq5', 'chr1', 1, 13), (4, 'seq4', 'chr1', 100, 390)),
        )

    def test_insertListInSetAndBinTableAndRemoveOverlaps(self):
        """Only seq4 (100-390) is added, as id 4; seq5 (1-13) is not stored."""
        lSet = [
            Set(1, "seq4", "chr1", 100, 390),
            Set(2, "seq5", "chr1", 1, 13),
        ]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_2, self._BIN_ROW_3,
             (4, 1000.0, 'chr1', 100, 390, 1)),
            (self._SET_ROW_1, self._SET_ROW_2, self._SET_ROW_3,
             (4, 'seq4', 'chr1', 100, 390)),
        )

    def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self):
        """Both inserted sets are added, under the new ids 4 and 5."""
        lSet = [
            Set(1, "seq4", "chr1", 100, 390),
            Set(2, "seq5", "chr1", 50, 65),
        ]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_2, self._BIN_ROW_3,
             (4, 1000.0, 'chr1', 100, 390, 1), (5, 1000.0, 'chr1', 50, 65, 1)),
            (self._SET_ROW_1, self._SET_ROW_2, self._SET_ROW_3,
             (4, 'seq4', 'chr1', 100, 390), (5, 'seq5', 'chr1', 50, 65)),
        )

    def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self):
        """Neither inserted set is added; both tables keep their three rows."""
        lSet = [
            Set(1, "seq4", "chr1", 1, 5),
            Set(2, "seq5", "chr1", 8, 13),
        ]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        self._assertTablesContent(
            (self._BIN_ROW_1, self._BIN_ROW_2, self._BIN_ROW_3),
            (self._SET_ROW_1, self._SET_ROW_2, self._SET_ROW_3),
        )
                          
# Run every test of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()