comparison smart_toolShed/commons/core/sql/test/Test_TableBinSetAdaptator.py @ 0:e0f8dcca02ed

Uploaded the S-MART tool, a toolbox for managing RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e0f8dcca02ed
1 import unittest
2 import os
3 import time
4 from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator
5 from commons.core.coord.Set import Set
6 from commons.core.sql.DbFactory import DbFactory
7
class Test_TableBinSetAdaptator(unittest.TestCase):
    """Database-backed tests for ``TableBinSetAdaptator``.

    Each test runs against a freshly created "set" table loaded from a small
    tab-separated fixture file, and against the companion ``<table>_idx``
    table built by ``createBinSetTable``.  Table and file names embed a
    timestamp and the process id.

    Expected rows are written with plain integer literals: they compare equal
    to the long integers a Python 2 database driver may return, and stay
    valid syntax on Python 3.
    """

    # Lines of the fixture file loaded in setUp.  Columns presumably are
    # id, name, sequence name, start, end (same order as the Set(...) calls
    # used in the tests) -- confirm against the "set" table format.
    _FIXTURE_ROWS = (
        "1\tseq1\tchr1\t1900\t3900\n",
        "2\tseq2\tchr1\t2\t9\n",
        "3\tseq3\tchr1\t8\t13\n",
    )

    # Content expected in the set table right after setUp.
    _INITIAL_SET_ROWS = (
        (1, 'seq1', 'chr1', 1900, 3900),
        (2, 'seq2', 'chr1', 2, 9),
        (3, 'seq3', 'chr1', 8, 13),
    )

    # Content expected in the bin table right after createBinSetTable.
    _INITIAL_BIN_ROWS = (
        (1, 10000.0, 'chr1', 1900, 3900, 1),
        (2, 1000.0, 'chr1', 2, 9, 1),
        (3, 1000.0, 'chr1', 8, 13, 1),
    )

    def setUp(self):
        """Create the fixture file, load it into a new set table and build the adaptator."""
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._iDb = DbFactory.createInstance()
        radicalTableName = "dummySetTable"
        self._tableName = "%s_%s" % (radicalTableName, self._uniqId)
        self._tableName_bin = "%s_idx" % self._tableName
        self._setFileName = "dummySetFile_%s" % self._uniqId
        self._writeSetFile(self._FIXTURE_ROWS)
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName)

    def tearDown(self):
        """Drop both tables, close the connection and delete the fixture file.

        The nested ``try/finally`` blocks make sure the connection is closed
        and the file is removed even when dropping a table raises.
        """
        try:
            self._iDb.dropTable(self._tableName)
            self._iDb.dropTable(self._tableName_bin)
        finally:
            try:
                self._iDb.close()
            finally:
                if os.path.exists(self._setFileName):
                    os.remove(self._setFileName)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _writeSetFile(self, lRows):
        """(Re)write the fixture file with the given lines; an empty sequence truncates it."""
        with open(self._setFileName, "w") as setF:
            setF.writelines(lRows)

    def _rebuildSetTable(self, lRows):
        """Drop both tables and recreate the set table from a fixture file holding lRows."""
        self._iDb.dropTable(self._tableName)
        self._iDb.dropTable(self._tableName_bin)
        self._writeSetFile(lRows)
        self._iDb.createTable(self._tableName, "set", self._setFileName)

    def _fetchAllRows(self, tableName):
        """Return every row of tableName as fetched by the database cursor."""
        sqlCmd = "SELECT * FROM %s" % tableName
        self._iDb.execute(sqlCmd)
        return self._iDb.cursor.fetchall()

    def _assertTablesContain(self, expTupleInBinTable, expTupleInSetTable):
        """Check the bin table first, then the set table, against the expected rows.

        No ORDER BY is used, so the comparison relies on the order in which
        the database returns the rows.
        """
        self.assertEqual(expTupleInBinTable, self._fetchAllRows(self._tableName_bin))
        self.assertEqual(expTupleInSetTable, self._fetchAllRows(self._tableName))

    def _checkJoinOfSets1And2(self, id1, id2):
        """Join sets id1 and id2 and check that both end up under identifier 1."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
        expNewId = 1
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (1, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (1, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)
        self.assertEqual(expNewId, obsNewId)

    def _checkInsertionOfSet1(self, *extraArgs):
        """Insert Set(1, "set1", "seq1", 2, 1) and check the row appended to each table.

        extraArgs are forwarded after the set to insASetInSetAndBinTable.
        """
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, *extraArgs)
        # The set goes from 2 down to 1: the bin table is expected to hold the
        # ordered coordinates (1, 2) with 0 in its last column.
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (1, 1000.0, 'seq1', 1, 2, 0),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (1, 'set1', 'seq1', 2, 1),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    # ------------------------------------------------------------------
    # insertion of a single set
    # ------------------------------------------------------------------

    def test_insASetInSetAndBinTable(self):
        """A single inserted set is appended to both the set and the bin table."""
        self._checkInsertionOfSet1()

    def test_insASetInSetAndBinTable_delayedCase(self):
        """Same expectation when True is passed as second argument of the insertion."""
        self._checkInsertionOfSet1(True)

    # ------------------------------------------------------------------
    # deletion
    # ------------------------------------------------------------------

    def test_deleteFromIdFromSetAndBinTable(self):
        """Deleting identifier 2 leaves rows 1 and 3 in both tables."""
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_deleteFromListIdFromSetAndBinTable(self):
        """Deleting identifiers [1, 2] leaves only row 3 in both tables."""
        lSetToRemove = [1, 2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove)
        expTupleInBinTable = ((3, 1000.0, 'chr1', 8, 13, 1),)
        expTupleInSetTable = ((3, 'seq3', 'chr1', 8, 13),)
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)
        # The fixture file is removed by tearDown; no explicit removal needed here.

    # ------------------------------------------------------------------
    # join
    # ------------------------------------------------------------------

    def test_joinTwoSetsFromSetAndBinTable(self):
        """Joining sets 1 and 2 stores both under identifier 1, which is returned."""
        self._checkJoinOfSets1And2(1, 2)

    def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self):
        """The join gives the same result when the identifiers are passed as (2, 1)."""
        self._checkJoinOfSets1And2(2, 1)

    # ------------------------------------------------------------------
    # identifiers and names
    # ------------------------------------------------------------------

    def test_getNewId(self):
        """With identifiers 1 to 3 in the table, the new identifier is 4."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.getNewId()
        expNewId = 4
        self.assertEqual(expNewId, obsNewId)

    def test_getNewId_empty_table(self):
        """With an empty set table, the new identifier is 1."""
        self._rebuildSetTable(())
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.getNewId()
        expNewId = 1
        self.assertEqual(expNewId, obsNewId)

    def test_getIdList(self):
        """getIdList returns the three fixture identifiers."""
        expLId = [1, 2, 3]
        self._iDb.createBinSetTable(self._tableName, True)
        obsLId = self._iTableBinSetAdaptator.getIdList()
        self.assertEqual(expLId, obsLId)

    def test_getSeqNameList(self):
        """getSeqNameList returns ["chr1", "chr2"] for a table holding both sequences."""
        self._rebuildSetTable((
            "1\tseq1\tchr2\t1900\t3900\n",
            "2\tseq2\tchr1\t2\t9\n",
            "3\tseq3\tchr1\t8\t13\n",
        ))
        self._iDb.createBinSetTable(self._tableName, True)
        expLSeqName = ["chr1", "chr2"]
        obsLSeqName = self._iTableBinSetAdaptator.getSeqNameList()
        self.assertEqual(expLSeqName, obsLSeqName)

    # ------------------------------------------------------------------
    # coordinate queries
    # ------------------------------------------------------------------

    def test_getSetListFromQueryCoord(self):
        """Query chr1:10-4000 returns sets 1 (1900-3900) and 3 (8-13)."""
        start = 10
        end = 4000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
        iSet2 = Set(3, "seq3", "chr1", 8, 13)
        expLSet = [iSet1, iSet2]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListFromQueryCoord_return_empty_list(self):
        """Query chr1:4000-40000 matches no fixture set."""
        start = 4000
        end = 40000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
        expLSet = []
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord(self):
        """Only set 1 (1900-3900) lies strictly inside chr1:10-4000; set 3 starts at 8."""
        start = 10
        end = 4000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
        expLSet = [iSet1]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self):
        """No fixture set is included in chr1:4000-40000."""
        start = 4000
        end = 40000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
        expLSet = []
        self.assertEqual(expLSet, obsLSet)

    # ------------------------------------------------------------------
    # insertion of a list of sets
    # ------------------------------------------------------------------

    def test_insertListInSetAndBinTable(self):
        """Both sets of the list are appended, each stored under identifier 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (4, 1000.0, 'chr1', 100, 390, 1),
            (4, 1000.0, 'chr1', 1, 13, 1),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (4, 'seq4', 'chr1', 100, 390),
            (4, 'seq5', 'chr1', 1, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndMergeAllSets(self):
        """After the merge, rows 2 and 3 are gone; seq5 (1-13) is id 5 and seq4 is id 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (5, 1000.0, 'chr1', 1, 13, 1),
            (4, 1000.0, 'chr1', 100, 390, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (5, 'seq5', 'chr1', 1, 13),
            (4, 'seq4', 'chr1', 100, 390),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps(self):
        """seq5 (1-13) overlaps stored sets and is dropped; only seq4 is added as id 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (4, 1000.0, 'chr1', 100, 390, 1),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (4, 'seq4', 'chr1', 100, 390),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self):
        """Neither new set overlaps a stored one: both are added, as ids 4 and 5."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 50, 65)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (4, 1000.0, 'chr1', 100, 390, 1),
            (5, 1000.0, 'chr1', 50, 65, 1),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (4, 'seq4', 'chr1', 100, 390),
            (5, 'seq5', 'chr1', 50, 65),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self):
        """Both new sets overlap stored ones: the two tables stay unchanged."""
        iSet1 = Set(1, "seq4", "chr1", 1, 5)
        iSet2 = Set(2, "seq5", "chr1", 8, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        self._assertTablesContain(self._INITIAL_BIN_ROWS, self._INITIAL_SET_ROWS)

# Script entry point: run every test case defined in this module.
if __name__ == "__main__":
    unittest.main()