comparison commons/core/sql/test/Test_TableSetAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import unittest
33 import time
34 import os
35 from commons.core.sql.TableSetAdaptator import TableSetAdaptator
36 from commons.core.sql.DbMySql import DbMySql
37 from commons.core.coord.Set import Set
38
39
class Test_TableSetAdaptator(unittest.TestCase):
    """Integration tests for TableSetAdaptator run against a MySQL database.

    Connection settings are read from the REPET_HOST, REPET_USER, REPET_PW,
    REPET_DB and REPET_PORT environment variables and written to a throw-away
    configuration file. Each test works on its own uniquely named table,
    which tearDown() drops.

    Test methods are described with comments rather than docstrings so that
    the output of the verbose test runner is left unchanged.
    """

    def setUp(self):
        """Write the temporary config file, connect, and build the adaptator."""
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._configFileName = "dummyConfigFile_%s" % (self._uniqId)

        # Read the environment before touching the disk: a missing variable
        # then raises KeyError without leaving a half-written file behind.
        settings = ["[repet_env]\n"]
        for option in ("host", "user", "pw", "db", "port"):
            envName = "REPET_%s" % option.upper()
            settings.append("repet_%s: %s\n" % (option, os.environ[envName]))

        with open(self._configFileName, "w") as configF:
            configF.writelines(settings)

        try:
            self._iDb = DbMySql(cfgFileName=self._configFileName)
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # config file has to be removed here.
            os.remove(self._configFileName)
            raise
        self._table = "dummySetTable_%s" % (self._uniqId)
        self._tSetA = TableSetAdaptator(self._iDb, self._table)

    def tearDown(self):
        """Drop the test table, close the connection, delete the config file.

        The nested try/finally blocks make every cleanup step run even when
        an earlier one raises.
        """
        try:
            try:
                self._iDb.dropTable(self._table)
            finally:
                self._iDb.close()
        finally:
            os.remove(self._configFileName)
            self._uniqId = None
            self._table = None
            self._tSetA = None
            self._configFileName = ""

    def _buildSet(self, line):
        """Return a Set populated from *line* through Set.setFromString()."""
        iSet = Set()
        iSet.setFromString(line)
        return iSet

    def _fetchAllRows(self):
        """Return all rows of the test table as fetched by the DB cursor."""
        self._iDb.execute("SELECT * FROM %s" % (self._table))
        return self._iDb.cursor.fetchall()

    def test_insert(self):
        # A Set filled attribute by attribute is stored as one matching row.
        set2Insert = Set()
        set2Insert.id = 1
        set2Insert.name = "name1"
        set2Insert.seqname = "name2"
        set2Insert.start = 1
        set2Insert.end = 50
        self._iDb.createTable(self._table, "set", "")
        self._tSetA.insert(set2Insert, False)
        expTsetTuple = ((1, "name1", "name2", 1, 50),)
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTsetTuple, obsTsetTuples)

    def test_insertList (self):
        # insertList() stores every Set of the list, in list order.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        expTsetTuple = ((1, "name1", "desc1", 1, 120), (2, "name2", "desc2", 1, 20))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTsetTuple, obsTsetTuples)

    def test_getIdList(self):
        # The id of the only inserted Set comes first in the id list.
        set2Insert = Set()
        set2Insert.id = 1
        set2Insert.name = "name1"
        set2Insert.seqname = "name2"
        set2Insert.start = 1
        set2Insert.end = 50
        self._iDb.createTable(self._table, "set", "")
        self._tSetA.insert(set2Insert)
        lObsId = self._tSetA.getIdList()
        self.assertEqual(set2Insert.id, lObsId[0])

    def test_getSeqNameList(self):
        # Two of the three rows share "desc2"; it is expected only once.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
            self._buildSet("2\tname2\tdesc2\t1\t50\n"),
        ]
        for iSet in lset:
            self._tSetA.insert(iSet)
        lExp = ["desc1", "desc2"]
        lObs = self._tSetA.getSeqNameList()
        self.assertEqual(lObs, lExp)

    def test_getSetListFromSeqName(self):
        # Only the Sets whose sequence name is "desc2" are returned.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
            self._buildSet("3\tname2\tdesc2\t1\t50\n"),
        ]
        for iSet in lset:
            self._tSetA.insert(iSet)
        explSet = [Set(2, "name2", "desc2", 1, 20), Set(3, "name2", "desc2", 1, 50)]
        obslSet = self._tSetA.getSetListFromSeqName("desc2")
        self.assertEqual(explSet, obslSet)

    def test_getSetListFromId(self):
        # Only the Set with id 2 is returned.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        explSet = [Set(2, "name2", "desc2", 1, 20)]
        obslSet = self._tSetA.getSetListFromId(2)
        self.assertEqual(explSet, obslSet)

    def test_getSetListFromIdList(self):
        # Every Set whose id appears in the list is returned.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        explSet = [Set(1, "name1", "desc1", 1, 120), Set(2, "name2", "desc2", 1, 20)]
        lId = [1, 2]
        obslSet = self._tSetA.getSetListFromIdList(lId)
        self.assertEqual(explSet, obslSet)

    def test_getSetListFromIdList_emptyList(self):
        # An empty id list yields an empty result, even on a filled table.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        explSet = []
        lId = []
        obslSet = self._tSetA.getSetListFromIdList(lId)
        self.assertEqual(explSet, obslSet)

    def test_getSetListOverlappingCoord(self):
        # Both Sets lie on "desc2"; only 1..120 overlaps the range 30..70.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc2\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        explSet = [Set(1, "name1", "desc2", 1, 120)]
        obslSet = self._tSetA.getSetListOverlappingCoord("desc2", 30, 70)
        self.assertEqual(explSet, obslSet)

    def test_deleteFromId(self):
        # Deleting id 1 leaves the rows with ids 2 and 3.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
            self._buildSet("3\tname2\tdesc3\t1\t50\n"),
        ]
        for iSet in lset:
            self._tSetA.insert(iSet)
        self._tSetA.deleteFromId(1)
        expTSetTuples = ((2, "name2", "desc2", 1, 20), (3, "name2", "desc3", 1, 50))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTSetTuples, obsTsetTuples)

    def test_deleteFromIdList(self):
        # Deleting ids 1 and 2 leaves only the row with id 3.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
            self._buildSet("3\tname2\tdesc3\t1\t50\n"),
        ]
        for iSet in lset:
            self._tSetA.insert(iSet)
        lId2del = [1, 2]
        self._tSetA.deleteFromIdList(lId2del)
        expTSetTuples = ((3, "name2", "desc3", 1, 50),)
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTSetTuples, obsTsetTuples)

    def test_deleteFromIdListWithEmptyList(self):
        # Deleting with an empty id list leaves all three rows in place.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
            self._buildSet("3\tname2\tdesc3\t1\t50\n"),
        ]
        for iSet in lset:
            self._tSetA.insert(iSet)
        lId2del = []
        self._tSetA.deleteFromIdList(lId2del)
        expTSetTuples = (
            (1, 'name1', 'desc1', 1, 120),
            (2, 'name2', 'desc2', 1, 20),
            (3, 'name2', 'desc3', 1, 50),
        )
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual(expTSetTuples, obsTsetTuples)

    def test_joinTwoSets(self):
        # Joining ids 5 and 2 is expected to leave both rows with id 2.
        self._iDb.createTable(self._table, "set", "")
        idSet1 = 5
        idSet2 = 2
        lset = [
            self._buildSet("5\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        self._tSetA.joinTwoSets(idSet1, idSet2)
        expTSetTuples = ((2, "name1", "desc1", 1, 120), (2, "name2", "desc2", 1, 20))
        obsTSetTuples = self._fetchAllRows()
        # The table is dropped by tearDown(); no explicit drop is needed here.
        self.assertEqual(expTSetTuples, obsTSetTuples)

    def test_joinTwoSetsWhereId1InfId2(self):
        # Same rows as test_joinTwoSets, ids passed in the opposite order
        # (2 then 5); both rows are still expected to end up with id 2.
        self._iDb.createTable(self._table, "set", "")
        idSet1 = 2
        idSet2 = 5
        lset = [
            self._buildSet("5\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        self._tSetA.joinTwoSets(idSet1, idSet2)
        expTSetTuples = ((2, "name1", "desc1", 1, 120), (2, "name2", "desc2", 1, 20))
        obsTSetTuples = self._fetchAllRows()
        # The table is dropped by tearDown(); no explicit drop is needed here.
        self.assertEqual(expTSetTuples, obsTSetTuples)

    def test_getNewId(self):
        # With ids 1, 2, 5 and 8 stored, the new id is expected to be 9.
        self._iDb.createTable(self._table, "set", "")
        lset = [
            self._buildSet("1\tname1\tdesc1\t1\t120\n"),
            self._buildSet("2\tname2\tdesc2\t1\t20\n"),
            self._buildSet("5\tname1\tdesc1\t1\t120\n"),
            self._buildSet("8\tname2\tdesc2\t1\t20\n"),
        ]
        self._tSetA.insertList(lset)
        expId = 9
        obsId = self._tSetA.getNewId()
        # The table is dropped by tearDown(); no explicit drop is needed here.
        self.assertEqual(expId, obsId)

    def test_getNewId_set_null(self):
        # With only a default-constructed Set stored, the new id is
        # expected to be 1.
        self._iDb.createTable(self._table, "set", "")
        lset = [Set()]
        self._tSetA.insertList(lset)
        expId = 1
        obsId = self._tSetA.getNewId()
        # The table is dropped by tearDown(); no explicit drop is needed here.
        self.assertEqual(expId, obsId)

    def test_getListOfAllSets(self):
        # All inserted Sets come back, in insertion order.
        self._iDb.createTable(self._table, "set")
        s1 = self._buildSet("1\tchr1\tTE3\t1\t10\n")
        s2a = self._buildSet("2\tchr1\tTE2\t2\t9\n")
        s2b = self._buildSet("2\tchr1\tTE2\t12\t19\n")
        lSets = [s1, s2a, s2b]
        self._tSetA.insertList(lSets)
        expLSets = [s1, s2a, s2b]
        obsLSets = self._tSetA.getListOfAllSets()
        self.assertEqual(expLSets, obsLSets)

    def test_getListOfAllSets_empty_table(self):
        # An empty table yields an empty list.
        self._iDb.createTable(self._table, "set")
        expList = []
        obsList = self._tSetA.getListOfAllSets()
        self.assertEqual(expList, obsList)
326
# Module-level suite holding every test of Test_TableSetAdaptator. The name
# "test_suite" is kept unchanged in case other modules import it.
# TestLoader.loadTestsFromTestCase() replaces the deprecated
# unittest.makeSuite() and collects the same "test*" methods.
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test_TableSetAdaptator))

if __name__ == "__main__":
    # Run verbosely when the module is executed as a script.
    unittest.TextTestRunner(verbosity=2).run(test_suite)