Mercurial > repos > yufei-luo > s_mart
comparison smart_toolShed/commons/core/sql/test/Test_TableBinSetAdaptator.py @ 0:e0f8dcca02ed
Uploaded the S-MART tool, a toolbox for managing RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e0f8dcca02ed |
---|---|
1 import unittest | |
2 import os | |
3 import time | |
4 from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator | |
5 from commons.core.coord.Set import Set | |
6 from commons.core.sql.DbFactory import DbFactory | |
7 | |
class Test_TableBinSetAdaptator(unittest.TestCase):
    """Tests for TableBinSetAdaptator, run against the database obtained
    from DbFactory.createInstance().

    setUp() loads three rows into a uniquely named "set" table. Each test
    then calls createBinSetTable() itself before exercising the adaptator,
    and reads the companion table named "<set table>_idx" (presumably the
    table created by that call -- confirm against DbFactory).

    Expected rows are written with plain integers: they compare equal to the
    long integers a Python 2 database driver may return, and stay valid
    syntax on Python 3.
    """

    # Lines written to the fixture file loaded by setUp().
    _INITIAL_FILE_LINES = (
        "1\tseq1\tchr1\t1900\t3900\n",
        "2\tseq2\tchr1\t2\t9\n",
        "3\tseq3\tchr1\t8\t13\n",
    )

    # Rows expected in the set table right after setUp().
    _INITIAL_SET_ROWS = (
        (1, 'seq1', 'chr1', 1900, 3900),
        (2, 'seq2', 'chr1', 2, 9),
        (3, 'seq3', 'chr1', 8, 13),
    )

    # Rows expected in the bin table once it has been built from the
    # initial set table.
    _INITIAL_BIN_ROWS = (
        (1, 10000.0, 'chr1', 1900, 3900, 1),
        (2, 1000.0, 'chr1', 2, 9, 1),
        (3, 1000.0, 'chr1', 8, 13, 1),
    )

    def setUp(self):
        """Create the fixture file, the set table and the adaptator."""
        # Timestamp + pid keeps table and file names unique per run.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._iDb = DbFactory.createInstance()
        radicalTableName = "dummySetTable"
        self._tableName = "%s_%s" % (radicalTableName, self._uniqId)
        self._tableName_bin = "%s_idx" % self._tableName
        self._setFileName = "dummySetFile_%s" % self._uniqId
        self._writeSetFile(self._INITIAL_FILE_LINES)
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName)

    def tearDown(self):
        """Drop both tables, close the connection and delete the fixture file.

        The nested try/finally ensures the connection is closed and the file
        removed even when dropping a table raises.
        """
        try:
            self._iDb.dropTable(self._tableName)
            self._iDb.dropTable(self._tableName_bin)
        finally:
            try:
                self._iDb.close()
            finally:
                if os.path.exists(self._setFileName):
                    os.remove(self._setFileName)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _writeSetFile(self, lLines):
        """(Re)write the fixture file with the given lines.

        An empty sequence leaves an empty file.
        """
        with open(self._setFileName, "w") as setF:
            setF.writelines(lLines)

    def _rebuildSetTable(self, lLines):
        """Drop both tables and recreate the set table from new file lines."""
        self._iDb.dropTable(self._tableName)
        self._iDb.dropTable(self._tableName_bin)
        self._writeSetFile(lLines)
        self._iDb.createTable(self._tableName, "set", self._setFileName)

    def _fetchAllRows(self, tableName):
        """Return every row of the given table, as handed back by the cursor."""
        sqlCmd = "SELECT * FROM %s" % (tableName)
        self._iDb.execute(sqlCmd)
        return self._iDb.cursor.fetchall()

    def _assertTablesContain(self, expTupleInBinTable, expTupleInSetTable):
        """Check the bin table first, then the set table, against expected rows."""
        self.assertEqual(expTupleInBinTable, self._fetchAllRows(self._tableName_bin))
        self.assertEqual(expTupleInSetTable, self._fetchAllRows(self._tableName))

    # ------------------------------------------------------------------
    # insertion of a single set
    # ------------------------------------------------------------------

    def test_insASetInSetAndBinTable(self):
        """A single inserted set is appended to both tables."""
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + ((1, 1000.0, 'seq1', 1, 2, 0),)
        expTupleInSetTable = self._INITIAL_SET_ROWS + ((1, 'set1', 'seq1', 2, 1),)
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insASetInSetAndBinTable_delayedCase(self):
        """Same expectation when True is passed as the second argument."""
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, True)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + ((1, 1000.0, 'seq1', 1, 2, 0),)
        expTupleInSetTable = self._INITIAL_SET_ROWS + ((1, 'set1', 'seq1', 2, 1),)
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    # ------------------------------------------------------------------
    # deletion
    # ------------------------------------------------------------------

    def test_deleteFromIdFromSetAndBinTable(self):
        """Deleting id 2 removes its row from both tables."""
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_deleteFromListIdFromSetAndBinTable(self):
        """Deleting ids 1 and 2 leaves only the row with id 3."""
        lSetToRemove = [1, 2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove)
        expTupleInBinTable = ((3, 1000.0, 'chr1', 8, 13, 1),)
        expTupleInSetTable = ((3, 'seq3', 'chr1', 8, 13),)
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)
        # The fixture file is removed by tearDown(); no explicit removal here.

    # ------------------------------------------------------------------
    # joining two sets
    # ------------------------------------------------------------------

    def test_joinTwoSetsFromSetAndBinTable(self):
        """Joining ids 1 and 2 stores both rows under id 1 and returns 1."""
        id1 = 1
        id2 = 2
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
        expNewId = 1
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (1, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (1, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)
        self.assertEqual(expNewId, obsNewId)

    def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self):
        """Passing the ids as (2, 1) gives the same result as (1, 2)."""
        id1 = 2
        id2 = 1
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
        expNewId = 1
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (1, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (1, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)
        self.assertEqual(expNewId, obsNewId)

    # ------------------------------------------------------------------
    # identifiers
    # ------------------------------------------------------------------

    def test_getNewId(self):
        """With ids 1-3 present, the next id is 4."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.getNewId()
        expNewId = 4
        self.assertEqual(expNewId, obsNewId)

    def test_getNewId_empty_table(self):
        """With an empty set table, the next id is 1."""
        self._rebuildSetTable(())
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.getNewId()
        expNewId = 1
        self.assertEqual(expNewId, obsNewId)

    # ------------------------------------------------------------------
    # coordinate queries
    # ------------------------------------------------------------------

    def test_getSetListFromQueryCoord(self):
        """Querying chr1:10-4000 returns sets 1 and 3, but not set 2 (2-9)."""
        start = 10
        end = 4000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
        iSet2 = Set(3, "seq3", "chr1", 8, 13)
        expLSet = [iSet1, iSet2]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListFromQueryCoord_return_empty_list(self):
        """Querying chr1:4000-40000 returns an empty list."""
        start = 4000
        end = 40000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
        expLSet = []
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord(self):
        """Only set 1 (1900-3900) is returned for chr1:10-4000; set 3 (8-13) is not."""
        start = 10
        end = 4000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
        expLSet = [iSet1]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self):
        """Querying chr1:4000-40000 returns an empty list."""
        start = 4000
        end = 40000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
        expLSet = []
        self.assertEqual(expLSet, obsLSet)

    # ------------------------------------------------------------------
    # listings
    # ------------------------------------------------------------------

    def test_getIdList(self):
        """The id list of the initial table is [1, 2, 3]."""
        expLId = [1, 2, 3]
        self._iDb.createBinSetTable(self._tableName, True)
        obsLId = self._iTableBinSetAdaptator.getIdList()
        self.assertEqual(expLId, obsLId)

    def test_getSeqNameList(self):
        """With rows on chr2 and chr1, the expected list is ["chr1", "chr2"]."""
        self._rebuildSetTable((
            "1\tseq1\tchr2\t1900\t3900\n",
            "2\tseq2\tchr1\t2\t9\n",
            "3\tseq3\tchr1\t8\t13\n",
        ))
        self._iDb.createBinSetTable(self._tableName, True)
        expLSeqName = ["chr1", "chr2"]
        obsLSeqName = self._iTableBinSetAdaptator.getSeqNameList()
        self.assertEqual(expLSeqName, obsLSeqName)

    # ------------------------------------------------------------------
    # insertion of a list of sets
    # ------------------------------------------------------------------

    def test_insertListInSetAndBinTable(self):
        """Both inserted sets are expected to be stored under the new id 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (4, 1000.0, 'chr1', 100, 390, 1),
            (4, 1000.0, 'chr1', 1, 13, 1),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (4, 'seq4', 'chr1', 100, 390),
            (4, 'seq5', 'chr1', 1, 13),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndMergeAllSets(self):
        """After the merge, rows 2 and 3 are gone; seq5 has id 5 and seq4 id 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (5, 1000.0, 'chr1', 1, 13, 1),
            (4, 1000.0, 'chr1', 100, 390, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (5, 'seq5', 'chr1', 1, 13),
            (4, 'seq4', 'chr1', 100, 390),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps(self):
        """seq4 (100-390) is added with id 4; seq5 (1-13) is not added."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (4, 1000.0, 'chr1', 100, 390, 1),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (4, 'seq4', 'chr1', 100, 390),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self):
        """Both sets are added, with ids 4 and 5."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 50, 65)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = self._INITIAL_BIN_ROWS + (
            (4, 1000.0, 'chr1', 100, 390, 1),
            (5, 1000.0, 'chr1', 50, 65, 1),
        )
        expTupleInSetTable = self._INITIAL_SET_ROWS + (
            (4, 'seq4', 'chr1', 100, 390),
            (5, 'seq5', 'chr1', 50, 65),
        )
        self._assertTablesContain(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self):
        """Neither set (1-5, 8-13) is added; both tables keep their initial rows."""
        iSet1 = Set(1, "seq4", "chr1", 1, 5)
        iSet2 = Set(2, "seq5", "chr1", 8, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        self._assertTablesContain(self._INITIAL_BIN_ROWS, self._INITIAL_SET_ROWS)
288 | |
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the tests in this module.
    unittest.main()