Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/test/Test_TableBinSetAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/test/Test_TableBinSetAdaptator.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,290 @@ +import unittest +import os +import time +from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator +from commons.core.coord.Set import Set +from commons.core.sql.DbFactory import DbFactory + +class Test_TableBinSetAdaptator(unittest.TestCase): + + def setUp(self): + self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S") , os.getpid()) + self._iDb = DbFactory.createInstance() + radicalTableName = "dummySetTable" + self._tableName = "%s_%s" % (radicalTableName, self._uniqId) + self._tableName_bin = "%s_idx" % self._tableName + self._setFileName = "dummySetFile_%s" % self._uniqId + setF = open( self._setFileName, "w" ) + setF.write("1\tseq1\tchr1\t1900\t3900\n") + setF.write("2\tseq2\tchr1\t2\t9\n") + setF.write("3\tseq3\tchr1\t8\t13\n") + setF.close() + self._iDb.createTable(self._tableName, "set", self._setFileName) + self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName) + + def tearDown(self): + self._iDb.dropTable( self._tableName ) + self._iDb.dropTable( self._tableName_bin ) + self._iDb.close() + if os.path.exists(self._setFileName): + os.remove(self._setFileName) + + def test_insASetInSetAndBinTable(self): + iSet = Set(1, "set1", "seq1", 2, 1) + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (1L, 1000.0, 'seq1', 1L, 2L, 0L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (1L, 'set1', 'seq1', 2L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_insASetInSetAndBinTable_delayedCase(self): + iSet = Set(1, "set1", "seq1", 2, 1) + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, True) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (1L, 1000.0, 'seq1', 1L, 2L, 0L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (1L, 'set1', 'seq1', 2L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_deleteFromIdFromSetAndBinTable(self): + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (3L, 'seq3', 'chr1', 8L, 13L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_deleteFromListIdFromSetAndBinTable(self): + lSetToRemove = [1,2] + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove) + expTupleInBinTable = ((3L, 1000.0, 'chr1', 8L, 13L, 1L),) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((3L, 'seq3', 'chr1', 8L, 13L),) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + os.remove(self._setFileName) + + def test_joinTwoSetsFromSetAndBinTable(self): + id1 = 1 + id2 = 2 + self._iDb.createBinSetTable(self._tableName, True) + obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (1L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L)) + expNewId = 1 + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (1L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + self.assertEquals(expNewId, obsNewId) + + def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self): + id1 = 2 + id2 = 1 + self._iDb.createBinSetTable(self._tableName, True) + obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (1L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L)) + expNewId = 1 + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (1L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + self.assertEquals(expNewId, obsNewId) + + def test_getNewId(self): + self._iDb.createBinSetTable(self._tableName, True) + obsNewId = self._iTableBinSetAdaptator.getNewId() + expNewId = 4 + self.assertEquals(expNewId, obsNewId) + + def test_getNewId_empty_table(self): + self._iDb.dropTable( self._tableName ) + self._iDb.dropTable( self._tableName_bin ) + setF = open( self._setFileName, "w" ) + setF.close() + self._iDb.createTable( self._tableName, "set", self._setFileName ) + self._iDb.createBinSetTable(self._tableName, True) + obsNewId = self._iTableBinSetAdaptator.getNewId() + expNewId = 1 + self.assertEquals(expNewId, obsNewId) + + def test_getSetListFromQueryCoord(self): + start = 10 + end = 4000 + seqName = 'chr1' + self._iDb.createBinSetTable(self._tableName, True) + obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end) + iSet1 = Set(1, "seq1", "chr1", 1900, 3900) + iSet2 = Set(3, "seq3", "chr1", 8, 13) + expLSet = [iSet1, iSet2] + self.assertEquals(expLSet, obsLSet) + + def test_getSetListFromQueryCoord_return_empty_list(self): + start = 4000 + end = 40000 + seqName = 'chr1' + self._iDb.createBinSetTable(self._tableName, True) + obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end) + expLSet = [] + self.assertEquals(expLSet, obsLSet) + + def test_getSetListStrictlyIncludedInQueryCoord(self): + start = 10 + end = 4000 + seqName = 'chr1' + self._iDb.createBinSetTable(self._tableName, True) + obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end) + iSet1 = Set(1, "seq1", "chr1", 1900, 3900) + expLSet = [iSet1] + self.assertEquals(expLSet, obsLSet) + + def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self): + start = 4000 + end = 40000 + seqName = 'chr1' + self._iDb.createBinSetTable(self._tableName, True) + obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end) + expLSet = [] + self.assertEquals(expLSet, obsLSet) + + def test_getIdList(self): + expLId = [1,2,3] + self._iDb.createBinSetTable(self._tableName, True) + obsLId = self._iTableBinSetAdaptator.getIdList() + self.assertEquals(expLId, obsLId) + + def test_getSeqNameList(self): + self._iDb.dropTable( self._tableName ) + self._iDb.dropTable( self._tableName_bin ) + setF = open( self._setFileName, "w" ) + setF.write("1\tseq1\tchr2\t1900\t3900\n") + setF.write("2\tseq2\tchr1\t2\t9\n") + setF.write("3\tseq3\tchr1\t8\t13\n") + setF.close() + self._iDb.createTable( self._tableName, "set", self._setFileName ) + self._iDb.createBinSetTable(self._tableName, True) + expLSeqName = ["chr1", "chr2"] + obsLSeqName = self._iTableBinSetAdaptator.getSeqNameList() + self.assertEquals(expLSeqName, obsLSeqName) + + def test_insertListInSetAndBinTable(self): + iSet1 = Set(1, "seq4", "chr1", 100, 390) + iSet2 = Set(2, "seq5", "chr1", 1, 13) + lSet = [iSet1, iSet2] + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L), (4L, 1000.0, 'chr1', 1L, 13L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L), (4L, 'seq5', 'chr1', 1L, 13L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_insertListInSetAndBinTableAndMergeAllSets(self): + iSet1 = Set(1, "seq4", "chr1", 100, 390) + iSet2 = Set(2, "seq5", "chr1", 1, 13) + lSet = [iSet1, iSet2] + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (5L, 1000.0, 'chr1', 1L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (5L, 'seq5', 'chr1', 1L, 13L), (4L, 'seq4', 'chr1', 100L, 390L) ) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_insertListInSetAndBinTableAndRemoveOverlaps(self): + iSet1 = Set(1, "seq4", "chr1", 100, 390) + iSet2 = Set(2, "seq5", "chr1", 1, 13) + lSet = [iSet1, iSet2] + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self): + iSet1 = Set(1, "seq4", "chr1", 100, 390) + iSet2 = Set(2, "seq5", "chr1", 50, 65) + lSet = [iSet1, iSet2] + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L), (5L, 1000.0, 'chr1', 50L, 65L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L), (5L, 'seq5', 'chr1', 50L, 65L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + + def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self): + iSet1 = Set(1, "seq4", "chr1", 1, 5) + iSet2 = Set(2, "seq5", "chr1", 8, 13) + lSet = [iSet1, iSet2] + self._iDb.createBinSetTable(self._tableName, True) + self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet) + expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin ) + self._iDb.execute( sqlCmd ) + obsTupleInBinTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInBinTable, obsTupleInBinTable) + expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L)) + sqlCmd = "SELECT * FROM %s" % ( self._tableName ) + self._iDb.execute( sqlCmd ) + obsTupleInSetTable = self._iDb.cursor.fetchall() + self.assertEquals(expTupleInSetTable, obsTupleInSetTable) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file