6
|
1 import unittest
|
|
2 import os
|
|
3 import time
|
|
4 from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator
|
|
5 from commons.core.coord.Set import Set
|
|
6 from commons.core.sql.DbFactory import DbFactory
|
|
7
|
|
8 class Test_TableBinSetAdaptator(unittest.TestCase):
|
|
9
|
|
10 def setUp(self):
|
|
11 self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S") , os.getpid())
|
|
12 self._iDb = DbFactory.createInstance()
|
|
13 radicalTableName = "dummySetTable"
|
|
14 self._tableName = "%s_%s" % (radicalTableName, self._uniqId)
|
|
15 self._tableName_bin = "%s_idx" % self._tableName
|
|
16 self._setFileName = "dummySetFile_%s" % self._uniqId
|
|
17 setF = open( self._setFileName, "w" )
|
|
18 setF.write("1\tseq1\tchr1\t1900\t3900\n")
|
|
19 setF.write("2\tseq2\tchr1\t2\t9\n")
|
|
20 setF.write("3\tseq3\tchr1\t8\t13\n")
|
|
21 setF.close()
|
|
22 self._iDb.createTable(self._tableName, "set", self._setFileName)
|
|
23 self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName)
|
|
24
|
|
25 def tearDown(self):
|
|
26 self._iDb.dropTable( self._tableName )
|
|
27 self._iDb.dropTable( self._tableName_bin )
|
|
28 self._iDb.close()
|
|
29 if os.path.exists(self._setFileName):
|
|
30 os.remove(self._setFileName)
|
|
31
|
|
32 def test_insASetInSetAndBinTable(self):
|
|
33 iSet = Set(1, "set1", "seq1", 2, 1)
|
|
34 self._iDb.createBinSetTable(self._tableName, True)
|
|
35 self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet)
|
|
36 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (1L, 1000.0, 'seq1', 1L, 2L, 0L))
|
|
37 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
38 self._iDb.execute( sqlCmd )
|
|
39 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
40 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
41 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (1L, 'set1', 'seq1', 2L, 1L))
|
|
42 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
43 self._iDb.execute( sqlCmd )
|
|
44 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
45 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
46
|
|
47 def test_insASetInSetAndBinTable_delayedCase(self):
|
|
48 iSet = Set(1, "set1", "seq1", 2, 1)
|
|
49 self._iDb.createBinSetTable(self._tableName, True)
|
|
50 self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, True)
|
|
51 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (1L, 1000.0, 'seq1', 1L, 2L, 0L))
|
|
52 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
53 self._iDb.execute( sqlCmd )
|
|
54 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
55 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
56 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (1L, 'set1', 'seq1', 2L, 1L))
|
|
57 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
58 self._iDb.execute( sqlCmd )
|
|
59 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
60 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
61
|
|
62 def test_deleteFromIdFromSetAndBinTable(self):
|
|
63 self._iDb.createBinSetTable(self._tableName, True)
|
|
64 self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2)
|
|
65 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
|
|
66 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
67 self._iDb.execute( sqlCmd )
|
|
68 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
69 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
70 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (3L, 'seq3', 'chr1', 8L, 13L))
|
|
71 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
72 self._iDb.execute( sqlCmd )
|
|
73 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
74 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
75
|
|
76 def test_deleteFromListIdFromSetAndBinTable(self):
|
|
77 lSetToRemove = [1,2]
|
|
78 self._iDb.createBinSetTable(self._tableName, True)
|
|
79 self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove)
|
|
80 expTupleInBinTable = ((3L, 1000.0, 'chr1', 8L, 13L, 1L),)
|
|
81 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
82 self._iDb.execute( sqlCmd )
|
|
83 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
84 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
85 expTupleInSetTable = ((3L, 'seq3', 'chr1', 8L, 13L),)
|
|
86 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
87 self._iDb.execute( sqlCmd )
|
|
88 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
89 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
90 os.remove(self._setFileName)
|
|
91
|
|
92 def test_joinTwoSetsFromSetAndBinTable(self):
|
|
93 id1 = 1
|
|
94 id2 = 2
|
|
95 self._iDb.createBinSetTable(self._tableName, True)
|
|
96 obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
|
|
97 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (1L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
|
|
98 expNewId = 1
|
|
99 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
100 self._iDb.execute( sqlCmd )
|
|
101 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
102 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
103 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (1L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L))
|
|
104 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
105 self._iDb.execute( sqlCmd )
|
|
106 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
107 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
108 self.assertEquals(expNewId, obsNewId)
|
|
109
|
|
110 def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self):
|
|
111 id1 = 2
|
|
112 id2 = 1
|
|
113 self._iDb.createBinSetTable(self._tableName, True)
|
|
114 obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
|
|
115 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (1L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
|
|
116 expNewId = 1
|
|
117 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
118 self._iDb.execute( sqlCmd )
|
|
119 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
120 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
121 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (1L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L))
|
|
122 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
123 self._iDb.execute( sqlCmd )
|
|
124 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
125 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
126 self.assertEquals(expNewId, obsNewId)
|
|
127
|
|
128 def test_getNewId(self):
|
|
129 self._iDb.createBinSetTable(self._tableName, True)
|
|
130 obsNewId = self._iTableBinSetAdaptator.getNewId()
|
|
131 expNewId = 4
|
|
132 self.assertEquals(expNewId, obsNewId)
|
|
133
|
|
134 def test_getNewId_empty_table(self):
|
|
135 self._iDb.dropTable( self._tableName )
|
|
136 self._iDb.dropTable( self._tableName_bin )
|
|
137 setF = open( self._setFileName, "w" )
|
|
138 setF.close()
|
|
139 self._iDb.createTable( self._tableName, "set", self._setFileName )
|
|
140 self._iDb.createBinSetTable(self._tableName, True)
|
|
141 obsNewId = self._iTableBinSetAdaptator.getNewId()
|
|
142 expNewId = 1
|
|
143 self.assertEquals(expNewId, obsNewId)
|
|
144
|
|
145 def test_getSetListFromQueryCoord(self):
|
|
146 start = 10
|
|
147 end = 4000
|
|
148 seqName = 'chr1'
|
|
149 self._iDb.createBinSetTable(self._tableName, True)
|
|
150 obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
|
|
151 iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
|
|
152 iSet2 = Set(3, "seq3", "chr1", 8, 13)
|
|
153 expLSet = [iSet1, iSet2]
|
|
154 self.assertEquals(expLSet, obsLSet)
|
|
155
|
|
156 def test_getSetListFromQueryCoord_return_empty_list(self):
|
|
157 start = 4000
|
|
158 end = 40000
|
|
159 seqName = 'chr1'
|
|
160 self._iDb.createBinSetTable(self._tableName, True)
|
|
161 obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
|
|
162 expLSet = []
|
|
163 self.assertEquals(expLSet, obsLSet)
|
|
164
|
|
165 def test_getSetListStrictlyIncludedInQueryCoord(self):
|
|
166 start = 10
|
|
167 end = 4000
|
|
168 seqName = 'chr1'
|
|
169 self._iDb.createBinSetTable(self._tableName, True)
|
|
170 obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
|
|
171 iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
|
|
172 expLSet = [iSet1]
|
|
173 self.assertEquals(expLSet, obsLSet)
|
|
174
|
|
175 def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self):
|
|
176 start = 4000
|
|
177 end = 40000
|
|
178 seqName = 'chr1'
|
|
179 self._iDb.createBinSetTable(self._tableName, True)
|
|
180 obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
|
|
181 expLSet = []
|
|
182 self.assertEquals(expLSet, obsLSet)
|
|
183
|
|
184 def test_getIdList(self):
|
|
185 expLId = [1,2,3]
|
|
186 self._iDb.createBinSetTable(self._tableName, True)
|
|
187 obsLId = self._iTableBinSetAdaptator.getIdList()
|
|
188 self.assertEquals(expLId, obsLId)
|
|
189
|
|
190 def test_getSeqNameList(self):
|
|
191 self._iDb.dropTable( self._tableName )
|
|
192 self._iDb.dropTable( self._tableName_bin )
|
|
193 setF = open( self._setFileName, "w" )
|
|
194 setF.write("1\tseq1\tchr2\t1900\t3900\n")
|
|
195 setF.write("2\tseq2\tchr1\t2\t9\n")
|
|
196 setF.write("3\tseq3\tchr1\t8\t13\n")
|
|
197 setF.close()
|
|
198 self._iDb.createTable( self._tableName, "set", self._setFileName )
|
|
199 self._iDb.createBinSetTable(self._tableName, True)
|
|
200 expLSeqName = ["chr1", "chr2"]
|
|
201 obsLSeqName = self._iTableBinSetAdaptator.getSeqNameList()
|
|
202 self.assertEquals(expLSeqName, obsLSeqName)
|
|
203
|
|
204 def test_insertListInSetAndBinTable(self):
|
|
205 iSet1 = Set(1, "seq4", "chr1", 100, 390)
|
|
206 iSet2 = Set(2, "seq5", "chr1", 1, 13)
|
|
207 lSet = [iSet1, iSet2]
|
|
208 self._iDb.createBinSetTable(self._tableName, True)
|
|
209 self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet)
|
|
210 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L), (4L, 1000.0, 'chr1', 1L, 13L, 1L))
|
|
211 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
212 self._iDb.execute( sqlCmd )
|
|
213 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
214 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
215 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L), (4L, 'seq5', 'chr1', 1L, 13L))
|
|
216 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
217 self._iDb.execute( sqlCmd )
|
|
218 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
219 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
220
|
|
221 def test_insertListInSetAndBinTableAndMergeAllSets(self):
|
|
222 iSet1 = Set(1, "seq4", "chr1", 100, 390)
|
|
223 iSet2 = Set(2, "seq5", "chr1", 1, 13)
|
|
224 lSet = [iSet1, iSet2]
|
|
225 self._iDb.createBinSetTable(self._tableName, True)
|
|
226 self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet)
|
|
227 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (5L, 1000.0, 'chr1', 1L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L))
|
|
228 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
229 self._iDb.execute( sqlCmd )
|
|
230 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
231 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
232 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (5L, 'seq5', 'chr1', 1L, 13L), (4L, 'seq4', 'chr1', 100L, 390L) )
|
|
233 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
234 self._iDb.execute( sqlCmd )
|
|
235 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
236 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
237
|
|
238 def test_insertListInSetAndBinTableAndRemoveOverlaps(self):
|
|
239 iSet1 = Set(1, "seq4", "chr1", 100, 390)
|
|
240 iSet2 = Set(2, "seq5", "chr1", 1, 13)
|
|
241 lSet = [iSet1, iSet2]
|
|
242 self._iDb.createBinSetTable(self._tableName, True)
|
|
243 self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
|
|
244 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L))
|
|
245 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
246 self._iDb.execute( sqlCmd )
|
|
247 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
248 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
249 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L))
|
|
250 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
251 self._iDb.execute( sqlCmd )
|
|
252 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
253 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
254
|
|
255 def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self):
|
|
256 iSet1 = Set(1, "seq4", "chr1", 100, 390)
|
|
257 iSet2 = Set(2, "seq5", "chr1", 50, 65)
|
|
258 lSet = [iSet1, iSet2]
|
|
259 self._iDb.createBinSetTable(self._tableName, True)
|
|
260 self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
|
|
261 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L), (4L, 1000.0, 'chr1', 100L, 390L, 1L), (5L, 1000.0, 'chr1', 50L, 65L, 1L))
|
|
262 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
263 self._iDb.execute( sqlCmd )
|
|
264 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
265 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
266 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L), (4L, 'seq4', 'chr1', 100L, 390L), (5L, 'seq5', 'chr1', 50L, 65L))
|
|
267 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
268 self._iDb.execute( sqlCmd )
|
|
269 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
270 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
271
|
|
272 def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self):
|
|
273 iSet1 = Set(1, "seq4", "chr1", 1, 5)
|
|
274 iSet2 = Set(2, "seq5", "chr1", 8, 13)
|
|
275 lSet = [iSet1, iSet2]
|
|
276 self._iDb.createBinSetTable(self._tableName, True)
|
|
277 self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
|
|
278 expTupleInBinTable = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
|
|
279 sqlCmd = "SELECT * FROM %s" % ( self._tableName_bin )
|
|
280 self._iDb.execute( sqlCmd )
|
|
281 obsTupleInBinTable = self._iDb.cursor.fetchall()
|
|
282 self.assertEquals(expTupleInBinTable, obsTupleInBinTable)
|
|
283 expTupleInSetTable = ((1L, 'seq1', 'chr1', 1900L, 3900L), (2L, 'seq2', 'chr1', 2L, 9L), (3L, 'seq3', 'chr1', 8L, 13L))
|
|
284 sqlCmd = "SELECT * FROM %s" % ( self._tableName )
|
|
285 self._iDb.execute( sqlCmd )
|
|
286 obsTupleInSetTable = self._iDb.cursor.fetchall()
|
|
287 self.assertEquals(expTupleInSetTable, obsTupleInSetTable)
|
|
288
|
|
289 if __name__ == "__main__":
|
|
290 unittest.main() |