diff commons/core/sql/test/Test_TableBinSetAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/sql/test/Test_TableBinSetAdaptator.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,290 @@
+import unittest
+import os
+import time
+from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator
+from commons.core.coord.Set import Set
+from commons.core.sql.DbFactory import DbFactory
+
+class Test_TableBinSetAdaptator(unittest.TestCase):
+
+    def setUp(self):
+        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S") , os.getpid())
+        self._iDb = DbFactory.createInstance()
+        radicalTableName = "dummySetTable"
+        self._tableName = "%s_%s" % (radicalTableName, self._uniqId)
+        self._tableName_bin = "%s_idx" % self._tableName
+        self._setFileName = "dummySetFile_%s" % self._uniqId
+        setF = open( self._setFileName, "w" )
+        setF.write("1\tseq1\tchr1\t1900\t3900\n")
+        setF.write("2\tseq2\tchr1\t2\t9\n")
+        setF.write("3\tseq3\tchr1\t8\t13\n")
+        setF.close()
+        self._iDb.createTable(self._tableName, "set", self._setFileName)
+        self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName)
+       
+    def tearDown(self):
+        self._iDb.dropTable( self._tableName )
+        self._iDb.dropTable( self._tableName_bin )
+        self._iDb.close()
+        if os.path.exists(self._setFileName):
+            os.remove(self._setFileName)
+        
+    def test_insASetInSetAndBinTable(self):
+        iSet = Set(1, "set1", "seq1", 2, 1)
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (1L, 1000.0, 'seq1', 1L, 2L, 0L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (1L, 'set1', 'seq1', 2L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+        
+    def test_insASetInSetAndBinTable_delayedCase(self):
+        iSet = Set(1, "set1", "seq1", 2, 1)
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, True)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (1L, 1000.0, 'seq1', 1L, 2L, 0L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (1L, 'set1', 'seq1', 2L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+        
+    def test_deleteFromIdFromSetAndBinTable(self):
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (3L, 'seq3', 'chr1', 8L, 13L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+
+    def test_deleteFromListIdFromSetAndBinTable(self):
+        lSetToRemove = [1,2]
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove)
+        expTupleInBinTable = ((3L, 1000.0, 'chr1', 8L, 13L, 1L),)
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((3L, 'seq3', 'chr1', 8L, 13L),)
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+        os.remove(self._setFileName)
+
+    def test_joinTwoSetsFromSetAndBinTable(self):
+        id1 = 1
+        id2 = 2
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (1L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
+        expNewId = 1
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (1L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+        self.assertEquals(expNewId, obsNewId)
+        
+    def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self):
+        id1 = 2
+        id2 = 1
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (1L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
+        expNewId = 1
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (1L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+        self.assertEquals(expNewId, obsNewId)
+        
+    def test_getNewId(self):   
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsNewId = self._iTableBinSetAdaptator.getNewId()
+        expNewId = 4
+        self.assertEquals(expNewId, obsNewId)
+        
+    def test_getNewId_empty_table(self):
+        self._iDb.dropTable( self._tableName )
+        self._iDb.dropTable( self._tableName_bin )
+        setF = open( self._setFileName, "w" )
+        setF.close()
+        self._iDb.createTable( self._tableName, "set", self._setFileName )
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsNewId = self._iTableBinSetAdaptator.getNewId()
+        expNewId = 1
+        self.assertEquals(expNewId, obsNewId)
+        
+    def test_getSetListFromQueryCoord(self):
+        start = 10
+        end = 4000
+        seqName = 'chr1'
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
+        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
+        iSet2 = Set(3, "seq3", "chr1", 8, 13)
+        expLSet = [iSet1, iSet2]
+        self.assertEquals(expLSet, obsLSet)
+        
+    def test_getSetListFromQueryCoord_return_empty_list(self):
+        start = 4000
+        end = 40000
+        seqName = 'chr1'
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
+        expLSet = []
+        self.assertEquals(expLSet, obsLSet)
+        
+    def test_getSetListStrictlyIncludedInQueryCoord(self):
+        start = 10
+        end = 4000
+        seqName = 'chr1'
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
+        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
+        expLSet = [iSet1]
+        self.assertEquals(expLSet, obsLSet)
+        
+    def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self):
+        start = 4000
+        end = 40000
+        seqName = 'chr1'
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
+        expLSet = []
+        self.assertEquals(expLSet, obsLSet)
+        
+    def test_getIdList(self):
+        expLId = [1,2,3]
+        self._iDb.createBinSetTable(self._tableName, True)
+        obsLId = self._iTableBinSetAdaptator.getIdList()
+        self.assertEquals(expLId, obsLId)
+        
+    def test_getSeqNameList(self):
+        self._iDb.dropTable( self._tableName )
+        self._iDb.dropTable( self._tableName_bin )
+        setF = open( self._setFileName, "w" )
+        setF.write("1\tseq1\tchr2\t1900\t3900\n")
+        setF.write("2\tseq2\tchr1\t2\t9\n")
+        setF.write("3\tseq3\tchr1\t8\t13\n")
+        setF.close()
+        self._iDb.createTable( self._tableName, "set", self._setFileName )
+        self._iDb.createBinSetTable(self._tableName, True)
+        expLSeqName = ["chr1", "chr2"]
+        obsLSeqName = self._iTableBinSetAdaptator.getSeqNameList()
+        self.assertEquals(expLSeqName, obsLSeqName)
+        
+    def test_insertListInSetAndBinTable(self):
+        iSet1 = Set(1, "seq4", "chr1", 100, 390)
+        iSet2 = Set(2, "seq5", "chr1", 1, 13)
+        lSet = [iSet1, iSet2]
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L), (4L, 1000.0, 'chr1', 1L, 13L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L), (4L, 'seq5', 'chr1', 1L, 13L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+        
+    def test_insertListInSetAndBinTableAndMergeAllSets(self):
+        iSet1 = Set(1, "seq4", "chr1", 100, 390)
+        iSet2 = Set(2, "seq5", "chr1", 1, 13)
+        lSet = [iSet1, iSet2]
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (5L, 1000.0, 'chr1', 1L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (5L, 'seq5', 'chr1', 1L, 13L), (4L, 'seq4', 'chr1', 100L, 390L) )
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+
+    def test_insertListInSetAndBinTableAndRemoveOverlaps(self):
+        iSet1 = Set(1, "seq4", "chr1", 100, 390)
+        iSet2 = Set(2, "seq5", "chr1", 1, 13)
+        lSet = [iSet1, iSet2]
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+
+    def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self):
+        iSet1 = Set(1, "seq4", "chr1", 100, 390)
+        iSet2 = Set(2, "seq5", "chr1", 50, 65)
+        lSet = [iSet1, iSet2]
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L), (5L, 1000.0, 'chr1', 50L, 65L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L), (5L, 'seq5', 'chr1', 50L, 65L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+
+    def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self):
+        iSet1 = Set(1, "seq4", "chr1", 1, 5)
+        iSet2 = Set(2, "seq5", "chr1", 8, 13)
+        lSet = [iSet1, iSet2]
+        self._iDb.createBinSetTable(self._tableName, True)
+        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
+        expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
+        self._iDb.execute( sqlCmd )
+        obsTupleInBinTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
+        expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L))
+        sqlCmd = "SELECT * FROM %s" % ( self._tableName )
+        self._iDb.execute( sqlCmd )
+        obsTupleInSetTable = self._iDb.cursor.fetchall()
+        self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
+                          
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file