6
|
# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
|
|
30
|
|
31
|
|
32 import unittest
|
|
33 import time
|
|
34 import os
|
|
35 from commons.core.sql.TableSetAdaptator import TableSetAdaptator
|
|
36 from commons.core.sql.DbMySql import DbMySql
|
|
37 from commons.core.coord.Set import Set
|
|
38
|
|
39
|
|
class Test_TableSetAdaptator(unittest.TestCase):
    """Tests of TableSetAdaptator run against a real database table.

    Each test works on its own uniquely named table.  The connection
    settings are read from the REPET_HOST, REPET_USER, REPET_PW, REPET_DB
    and REPET_PORT environment variables and handed to DbMySql through a
    temporary configuration file.

    Expected rows are written with plain integer literals: in Python 2 an
    int compares equal to the corresponding long, and the ``L`` suffix is a
    syntax error in Python 3.
    """

    def setUp(self):
        """Write the temporary config file, connect, and build the adaptator."""
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._configFileName = "dummyConfigFile_%s" % (self._uniqId)

        # Resolve every environment variable *before* creating the file so
        # that a missing variable (KeyError) does not leave a stray file or
        # an unclosed handle behind.
        lLines = ["[repet_env]\n"]
        for option, envVar in (("repet_host", "REPET_HOST"),
                               ("repet_user", "REPET_USER"),
                               ("repet_pw", "REPET_PW"),
                               ("repet_db", "REPET_DB"),
                               ("repet_port", "REPET_PORT")):
            lLines.append("%s: %s\n" % (option, os.environ[envVar]))
        with open(self._configFileName, "w") as configF:
            configF.writelines(lLines)

        # tearDown() is not called when setUp() fails, so remove the config
        # file here if the connection cannot be established.
        try:
            self._iDb = DbMySql(cfgFileName=self._configFileName)
        except Exception:
            os.remove(self._configFileName)
            raise
        self._table = "dummySetTable_%s" % (self._uniqId)
        self._tSetA = TableSetAdaptator(self._iDb, self._table)

    def tearDown(self):
        """Drop the test table, close the connection, delete the config file.

        The nested try/finally blocks make sure the connection is closed and
        the config file is removed even if dropping the table raises.
        """
        try:
            try:
                self._iDb.dropTable(self._table)
            finally:
                self._iDb.close()
        finally:
            os.remove(self._configFileName)
            self._uniqId = None
            self._table = None
            self._tSetA = None
            self._configFileName = ""

    def _makeSets(self, *lLines):
        """Return one Set per given string, each filled via setFromString()."""
        lSets = []
        for line in lLines:
            iSet = Set()
            iSet.setFromString(line)
            lSets.append(iSet)
        return lSets

    def _fetchAllRows(self):
        """Run ``SELECT *`` on the test table and return the fetched rows."""
        self._iDb.execute("SELECT * FROM %s" % (self._table))
        return self._iDb.cursor.fetchall()

    def test_insert(self):
        """insert() of one Set yields one row holding its five fields."""
        set2Insert = Set()
        set2Insert.id = 1
        set2Insert.name = "name1"
        set2Insert.seqname = "name2"
        set2Insert.start = 1
        set2Insert.end = 50
        self._iDb.createTable(self._table, "set", "")
        self._tSetA.insert(set2Insert, False)
        expTsetTuple = ((1, "name1", "name2", 1, 50),)
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTsetTuple, obsTsetTuples)

    def test_insertList(self):
        """insertList() of two Sets yields the two corresponding rows."""
        self._iDb.createTable(self._table, "set", "")
        lset = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        expTsetTuple = ((1, "name1", "desc1", 1, 120),
                        (2, "name2", "desc2", 1, 20))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTsetTuple, obsTsetTuples)

    def test_getIdList(self):
        """getIdList() returns the id of the single inserted Set first."""
        set2Insert = Set()
        set2Insert.id = 1
        set2Insert.name = "name1"
        set2Insert.seqname = "name2"
        set2Insert.start = 1
        set2Insert.end = 50
        self._iDb.createTable(self._table, "set", "")
        self._tSetA.insert(set2Insert)
        lIds = self._tSetA.getIdList()
        self.assertEqual(set2Insert.id, lIds[0])

    def test_getSeqNameList(self):
        """getSeqNameList() lists each sequence name once (desc2 is inserted twice)."""
        self._iDb.createTable(self._table, "set", "")
        lSets = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                               "2\tname2\tdesc2\t1\t20\n",
                               "2\tname2\tdesc2\t1\t50\n")
        for iSet in lSets:
            self._tSetA.insert(iSet)
        lExp = ["desc1", "desc2"]
        lObs = self._tSetA.getSeqNameList()
        self.assertEqual(lObs, lExp)

    def test_getSetListFromSeqName(self):
        """getSetListFromSeqName("desc2") returns only the Sets on desc2."""
        self._iDb.createTable(self._table, "set", "")
        lSets = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                               "2\tname2\tdesc2\t1\t20\n",
                               "3\tname2\tdesc2\t1\t50\n")
        for iSet in lSets:
            self._tSetA.insert(iSet)
        explSet = [Set(2, "name2", "desc2", 1, 20),
                   Set(3, "name2", "desc2", 1, 50)]
        obslSet = self._tSetA.getSetListFromSeqName("desc2")
        self.assertEqual(explSet, obslSet)

    def test_getSetListFromId(self):
        """getSetListFromId(2) returns only the Set whose id is 2."""
        self._iDb.createTable(self._table, "set", "")
        lset = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        explSet = [Set(2, "name2", "desc2", 1, 20)]
        obslSet = self._tSetA.getSetListFromId(2)
        self.assertEqual(explSet, obslSet)

    def test_getSetListFromIdList(self):
        """getSetListFromIdList([1, 2]) returns the Sets of both ids."""
        self._iDb.createTable(self._table, "set", "")
        lset = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        explSet = [Set(1, "name1", "desc1", 1, 120),
                   Set(2, "name2", "desc2", 1, 20)]
        lId = [1, 2]
        obslSet = self._tSetA.getSetListFromIdList(lId)
        self.assertEqual(explSet, obslSet)

    def test_getSetListFromIdList_emptyList(self):
        """getSetListFromIdList([]) returns an empty list on a filled table."""
        self._iDb.createTable(self._table, "set", "")
        lset = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        explSet = []
        lId = []
        obslSet = self._tSetA.getSetListFromIdList(lId)
        self.assertEqual(explSet, obslSet)

    def test_getSetListOverlappingCoord(self):
        """Querying desc2:30-70 returns the 1-120 Set but not the 1-20 one."""
        self._iDb.createTable(self._table, "set", "")
        lset = self._makeSets("1\tname1\tdesc2\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        explSet = [Set(1, "name1", "desc2", 1, 120)]
        obslSet = self._tSetA.getSetListOverlappingCoord("desc2", 30, 70)
        self.assertEqual(explSet, obslSet)

    def test_deleteFromId(self):
        """deleteFromId(1) removes the row of id 1 and keeps the others."""
        self._iDb.createTable(self._table, "set", "")
        lSets = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                               "2\tname2\tdesc2\t1\t20\n",
                               "3\tname2\tdesc3\t1\t50\n")
        for iSet in lSets:
            self._tSetA.insert(iSet)
        self._tSetA.deleteFromId(1)
        expTSetTuples = ((2, "name2", "desc2", 1, 20),
                         (3, "name2", "desc3", 1, 50))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTSetTuples, obsTsetTuples)

    def test_deleteFromIdList(self):
        """deleteFromIdList([1, 2]) leaves only the row of id 3."""
        self._iDb.createTable(self._table, "set", "")
        lSets = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                               "2\tname2\tdesc2\t1\t20\n",
                               "3\tname2\tdesc3\t1\t50\n")
        for iSet in lSets:
            self._tSetA.insert(iSet)
        lId2del = [1, 2]
        self._tSetA.deleteFromIdList(lId2del)
        expTSetTuples = ((3, "name2", "desc3", 1, 50),)
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTSetTuples, obsTsetTuples)

    def test_deleteFromIdListWithEmptyList(self):
        """deleteFromIdList([]) leaves all three rows in place."""
        self._iDb.createTable(self._table, "set", "")
        lSets = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                               "2\tname2\tdesc2\t1\t20\n",
                               "3\tname2\tdesc3\t1\t50\n")
        for iSet in lSets:
            self._tSetA.insert(iSet)
        lId2del = []
        self._tSetA.deleteFromIdList(lId2del)
        expTSetTuples = ((1, "name1", "desc1", 1, 120),
                         (2, "name2", "desc2", 1, 20),
                         (3, "name2", "desc3", 1, 50))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTSetTuples, obsTsetTuples)

    def test_joinTwoSets(self):
        """joinTwoSets(5, 2) leaves both rows with id 2."""
        self._iDb.createTable(self._table, "set", "")
        idSet1 = 5
        idSet2 = 2
        lset = self._makeSets("5\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        self._tSetA.joinTwoSets(idSet1, idSet2)
        expTSetTuples = ((2, "name1", "desc1", 1, 120),
                         (2, "name2", "desc2", 1, 20))
        obsTSetTuples = self._fetchAllRows()
        # The table itself is dropped by tearDown(), as for every other test.
        self.assertEqual(expTSetTuples, obsTSetTuples)

    def test_joinTwoSetsWhereId1InfId2(self):
        """joinTwoSets(2, 5) also leaves both rows with id 2."""
        self._iDb.createTable(self._table, "set", "")
        idSet1 = 2
        idSet2 = 5
        lset = self._makeSets("5\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        self._tSetA.joinTwoSets(idSet1, idSet2)
        expTSetTuples = ((2, "name1", "desc1", 1, 120),
                         (2, "name2", "desc2", 1, 20))
        obsTSetTuples = self._fetchAllRows()
        # The table itself is dropped by tearDown(), as for every other test.
        self.assertEqual(expTSetTuples, obsTSetTuples)

    def test_getNewId(self):
        """getNewId() returns 9 when the table holds the ids 1, 2, 5 and 8."""
        self._iDb.createTable(self._table, "set", "")
        lset = self._makeSets("1\tname1\tdesc1\t1\t120\n",
                              "2\tname2\tdesc2\t1\t20\n",
                              "5\tname1\tdesc1\t1\t120\n",
                              "8\tname2\tdesc2\t1\t20\n")
        self._tSetA.insertList(lset)
        expId = 9
        obsId = self._tSetA.getNewId()
        self.assertEqual(expId, obsId)

    def test_getNewId_set_null(self):
        """getNewId() returns 1 when only a default-constructed Set was inserted."""
        self._iDb.createTable(self._table, "set", "")
        lset = [Set()]
        self._tSetA.insertList(lset)
        expId = 1
        obsId = self._tSetA.getNewId()
        self.assertEqual(expId, obsId)

    def test_getListOfAllSets(self):
        """getListOfAllSets() returns every inserted Set, in insertion order here."""
        self._iDb.createTable(self._table, "set")
        s1, s2a, s2b = self._makeSets("1\tchr1\tTE3\t1\t10\n",
                                      "2\tchr1\tTE2\t2\t9\n",
                                      "2\tchr1\tTE2\t12\t19\n")
        lSets = [s1, s2a, s2b]
        self._tSetA.insertList(lSets)
        expLSets = [s1, s2a, s2b]
        obsLSets = self._tSetA.getListOfAllSets()
        self.assertEqual(expLSets, obsLSets)

    def test_getListOfAllSets_empty_table(self):
        """getListOfAllSets() returns an empty list for an empty table."""
        self._iDb.createTable(self._table, "set")
        expList = []
        obsList = self._tSetA.getListOfAllSets()
        self.assertEqual(expList, obsList)
|
|
326
|
|
# Suite holding every test method of Test_TableSetAdaptator.
# loadTestsFromTestCase() replaces unittest.makeSuite(), which is deprecated
# since Python 3.11 and removed in 3.13; the loader API also exists in Python 2.
test_suite = unittest.TestSuite()
test_suite.addTest(
    unittest.defaultTestLoader.loadTestsFromTestCase(Test_TableSetAdaptator)
)

if __name__ == "__main__":
    # Run the suite verbosely when the module is executed as a script.
    unittest.TextTestRunner(verbosity=2).run(test_suite)
|