comparison commons/core/sql/test/Test_DbMySql.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31 import unittest
32 import time
33 import os
34 from MySQLdb import ProgrammingError
35 from commons.core.sql.DbMySql import DbMySql
36 from commons.core.sql.DbMySql import TABLE_SCHEMA_DESCRIPTOR
37 from commons.core.sql.DbMySql import TABLE_TYPE_SYNONYMS
38 from commons.core.utils.FileUtils import FileUtils
39 from commons.core.coord.Path import Path
40
41 class Test_DbMySql( unittest.TestCase ):
42
43 def setUp( self ):
44 self._iDb = DbMySql( )
45 self._uniqId = "%s" % time.strftime("%Y%m%d%H%M%S")
46
47 def tearDown( self ):
48 if self._iDb.db.open:
49 self._iDb.close()
50 self._iDb = None
51
52 def test_execute_syntax_error(self):
53 expErrorMsg = "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'CHAUD TABLES' at line 1"
54 obsErrorMsg = ""
55 sqlCmd = "CHAUD TABLES"
56 try:
57 self._iDb.execute(sqlCmd)
58 except ProgrammingError as excep:
59 obsErrorMsg = excep.args[1]
60
61 self.assertEquals(expErrorMsg, obsErrorMsg)
62
63 def test_execute_with_1_retry(self):
64 tableName = "dummyTable%s" % self._uniqId
65 sqlCmd = "CREATE TABLE %s (dummyColumn varchar(255))" % tableName
66 self._iDb.close()
67 self._iDb.execute(sqlCmd)
68 self.assertTrue(self._iDb.doesTableExist(tableName))
69 self._iDb.dropTable(tableName)
70
71 def test_setAttributesFromConfigFile(self):
72 expHost = "dummyHost"
73 expUser = "dummyUser"
74 expPw = "dummyPw"
75 expDb = "dummyDb"
76 expPort = 1000
77
78 configFileName = "dummyConfigFileName.cfg"
79 f = open( configFileName, "w" )
80 f.write("[repet_env]\n")
81 f.write("repet_host: " + expHost + "\n")
82 f.write("repet_user: " + expUser + "\n")
83 f.write("repet_pw: " + expPw + "\n")
84 f.write("repet_db: " + expDb + "\n")
85 f.write("repet_port: " + str(expPort) + "\n")
86 f.close()
87
88 self._iDb.setAttributesFromConfigFile(configFileName)
89
90 obsHost = self._iDb.host
91 obsUser = self._iDb.user
92 obsPw = self._iDb.passwd
93 obsDb = self._iDb.dbname
94 obsPort = self._iDb.port
95
96 os.remove(configFileName)
97
98 self.assertEquals( expHost, obsHost )
99 self.assertEquals( expUser, obsUser )
100 self.assertEquals( expPw, obsPw )
101 self.assertEquals( expDb, obsDb )
102 self.assertEquals( expPort, obsPort )
103
104 def test_open_True(self):
105 self._iDb.close()
106 self.assertTrue(self._iDb.open())
107 self.assertEquals(1, self._iDb.db.open)
108 self._iDb.close()
109 self.assertEquals(0, self._iDb.db.open)
110
111 def test_open_False(self):
112 self._iDb.close()
113 self._iDb.user = "dummyUser"
114 self.assertFalse( self._iDb.open() )
115
116 def test_doesTableExist_True(self):
117 tableName = "dummyTable" + self._uniqId
118 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName )
119 self._iDb.execute( sqlCmd )
120 self.assertTrue( self._iDb.doesTableExist(tableName) )
121 self._iDb.dropTable(tableName)
122
123 def test_doesTableExist_False(self):
124 tableName = "dummyTable" + self._uniqId
125 self.assertFalse( self._iDb.doesTableExist(tableName) )
126
127 def test_dropTable(self):
128 tableName = "dummyTable" + self._uniqId
129 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName )
130 self._iDb.execute( sqlCmd )
131 self._iDb.dropTable(tableName)
132 self.assertFalse( self._iDb.doesTableExist(tableName) )
133
134 def test_renameTable(self):
135 tableName = "dummyTable" + self._uniqId
136 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName )
137 self._iDb.execute( sqlCmd )
138 self._iDb.updateInfoTable( tableName, "" )
139 newTableName = "newDummyTable"
140
141 self._iDb.renameTable(tableName, newTableName)
142
143 self.assertFalse( self._iDb.doesTableExist(tableName) )
144 self.assertTrue( self._iDb.doesTableExist(newTableName) )
145
146 expTuple = (('newDummyTable', ''),)
147 sqlCmd = 'SELECT * FROM info_tables WHERE name = "%s"' % ( newTableName )
148 self._iDb.execute( sqlCmd )
149 obsTuple = self._iDb.cursor.fetchall()
150 self.assertEquals( expTuple, obsTuple)
151
152 expTuple = ()
153 sqlCmd = 'SELECT * FROM info_tables WHERE name = "%s"' % ( tableName )
154 self._iDb.execute( sqlCmd )
155 obsTuple = self._iDb.cursor.fetchall()
156 self.assertEquals( expTuple, obsTuple)
157
158 self._iDb.dropTable(newTableName)
159
160 def test_copyTable(self):
161 tableName = "dummyTable" + self._uniqId
162 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) );" % ( tableName )
163 self._iDb.execute( sqlCmd )
164 sqlCmd = "CREATE INDEX idummyColumn ON %s ( dummyColumn );" % (tableName)
165 self._iDb.execute( sqlCmd )
166
167 newTableName = "newDummyTable"
168
169 self._iDb.copyTable(tableName, newTableName)
170
171 self.assertTrue( self._iDb.doesTableExist(tableName) )
172 self.assertTrue( self._iDb.doesTableExist(newTableName) )
173
174 expTuple = (('newDummyTable', ''),)
175 sqlCmd = 'SELECT * FROM info_tables WHERE name = "%s";' % ( newTableName )
176 self._iDb.execute( sqlCmd )
177 obsTuple = self._iDb.cursor.fetchall()
178
179 self.assertEquals( expTuple, obsTuple)
180
181 expTuple = (('newDummyTable', 1L, 'idummyColumn', 1L, 'dummyColumn', 'A', None, None, None, 'YES', 'BTREE', ''),)
182 sqlCmd = "SHOW INDEX FROM %s;" % ( newTableName )
183 self._iDb.execute( sqlCmd )
184 obsTuple = self._iDb.cursor.fetchall()
185 self.assertEquals( expTuple, obsTuple)
186
187 self._iDb.dropTable(tableName)
188 self._iDb.dropTable(newTableName)
189
190 def test_getTableType(self):
191 lTypesToTest = TABLE_SCHEMA_DESCRIPTOR.keys()
192 for tableType in lTypesToTest:
193 tableName = "dummy%sTable%s" % (tableType, self._uniqId)
194 self._iDb.createTable(tableName, tableType)
195
196 obsType = self._iDb.getTableType(tableName)
197 self.assertEquals(tableType, obsType)
198
199 self._iDb.dropTable(tableName)
200
201 def test_getSize_empty_table(self):
202 tableName = "dummyPathTable" + self._uniqId
203 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
204 self._iDb.execute( sqlCmd )
205
206 pathFileName = "dummyPathFile.txt"
207 pathF = open( pathFileName, "w" )
208 pathF.write( "")
209 pathF.close()
210 self._iDb.loadDataFromFile(tableName, pathFileName, False)
211 expSize = 0
212 obsSize = self._iDb.getSize(tableName)
213
214 self._iDb.dropTable(tableName)
215 os.remove(pathFileName)
216
217 self.assertEquals( expSize, obsSize )
218
219 def test_getSize_two_rows(self):
220 tableName = "dummyPathTable" + self._uniqId
221 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
222 self._iDb.execute( sqlCmd )
223
224 pathFileName = "dummyPathFile.txt"
225 pathF = open( pathFileName, "w" )
226 pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
227 pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
228 pathF.close()
229 self._iDb.loadDataFromFile(tableName, pathFileName, False)
230 expSize = 2
231 obsSize = self._iDb.getSize(tableName)
232
233 self._iDb.dropTable(tableName)
234 os.remove(pathFileName)
235
236 self.assertEquals( expSize, obsSize )
237
238 def test_isEmpty_True(self):
239 tableName = "dummyTable" + self._uniqId
240 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName )
241 self._iDb.execute( sqlCmd )
242
243 fileName = "dummyTableFile.txt"
244 f = open( fileName, "w" )
245 f.write( "" )
246 f.close()
247 self._iDb.loadDataFromFile(tableName, fileName, False)
248
249 self.assertTrue( self._iDb.isEmpty(tableName) )
250
251 self._iDb.dropTable(tableName)
252 os.remove(fileName)
253
254 def test_isEmpty_False(self):
255 tableName = "dummyTable" + self._uniqId
256 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % tableName
257 self._iDb.execute( sqlCmd )
258
259 fileName = "dummyTableFile.txt"
260 f = open( fileName, "w" )
261 f.write( "test" )
262 f.close()
263 self._iDb.loadDataFromFile(tableName, fileName, False)
264
265 self.assertFalse( self._iDb.isEmpty(tableName) )
266
267 self._iDb.dropTable(tableName)
268 os.remove(fileName)
269
270 def test_updateInfoTable(self):
271 tableName = "dummyTable" + self._uniqId
272 info = "Table_for_test"
273
274 self._iDb.updateInfoTable(tableName, info)
275
276 sqlCmd = 'SELECT file FROM info_tables WHERE name = "%s"' % ( tableName )
277 self._iDb.execute( sqlCmd )
278 results = self._iDb.cursor.fetchall()
279 obsResult = False
280 if (info,) in results:
281 obsResult = True
282 sqlCmd = 'DELETE FROM info_tables WHERE name = "%s"' % ( tableName )
283 self._iDb.execute( sqlCmd )
284
285 self.assertTrue( obsResult )
286
287 def test_loadDataFromFile_with_empty_file(self):
288 tableName = "dummyPathTable1" + self._uniqId
289 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
290 self._iDb.execute( sqlCmd )
291
292 pathFileName = "dummyPathFile.txt"
293 pathF = open( pathFileName, "w" )
294 pathF.write( "" )
295 pathF.close()
296 expTPathTuples = ()
297
298 self._iDb.loadDataFromFile(tableName, pathFileName, False)
299
300 sqlCmd = "SELECT * FROM %s" % ( tableName )
301 self._iDb.execute( sqlCmd )
302 obsTPathTuples = self._iDb.cursor.fetchall()
303
304 self._iDb.dropTable(tableName)
305 os.remove(pathFileName)
306
307 self.assertEquals( expTPathTuples, obsTPathTuples )
308
309 def test_loadDataFromFile_with_first_line(self):
310 tableName = "dummyPathTable2" + self._uniqId
311 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
312 self._iDb.execute( sqlCmd )
313
314 pathFileName = "dummyPathFile.txt"
315 pathF = open( pathFileName, "w" )
316 pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
317 pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
318 pathF.close()
319
320 expPathTuple1 = (1L, 'qry', 1L, 100L, 'sbj', 1L, 100L, 1e-123, 136L, 98.4)
321 expPathTuple2 = (2L, 'qry', 500L, 401L, 'sbj', 1L, 100L, 1e-152, 161L, 98.7)
322 expTPathTuples = (expPathTuple1, expPathTuple2)
323
324 self._iDb.loadDataFromFile(tableName, pathFileName, False)
325
326 sqlCmd = "SELECT * FROM %s" % ( tableName )
327 self._iDb.execute( sqlCmd )
328 obsTPathTuples = self._iDb.cursor.fetchall()
329
330 self._iDb.dropTable(tableName)
331 os.remove(pathFileName)
332
333 self.assertEquals( expTPathTuples, obsTPathTuples )
334
335 def test_loadDataFromFile_without_first_line(self):
336 tableName = "dummyPathTable3" + self._uniqId
337 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
338 self._iDb.execute( sqlCmd )
339
340 pathFileName = "dummyPathFile.txt"
341 pathF = open( pathFileName, "w" )
342 pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
343 pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
344 pathF.close()
345
346 expPathTuple = (2L, 'qry', 500L, 401L, 'sbj', 1L, 100L, 1e-152, 161L, 98.7)
347 expTPathTuples = (expPathTuple,)
348
349 self._iDb.loadDataFromFile(tableName, pathFileName, True)
350
351 sqlCmd = "SELECT * FROM %s" % ( tableName )
352 self._iDb.execute( sqlCmd )
353 obsTPathTuples = self._iDb.cursor.fetchall()
354
355 self._iDb.dropTable(tableName)
356 os.remove(pathFileName)
357
358 self.assertEquals( expTPathTuples, obsTPathTuples )
359
360 def test_createIndex_Map(self):
361 tableName = "dummyMapTable" + self._uniqId
362 sqlCmd = "CREATE TABLE %s ( name varchar(255), chr varchar(255), start int, end int)" % ( tableName )
363 self._iDb.execute( sqlCmd )
364 expLIndex = ["iname", "ichr", "istart", "iend", "icoord", "icoord"]
365
366 self._iDb.createIndex(tableName, "map")
367
368 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
369 self._iDb.execute( sqlCmd )
370 results = self._iDb.cursor.fetchall()
371
372 for index in expLIndex[:-1]:
373 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
374 self._iDb.execute( sqlCmd )
375 self._iDb.dropTable(tableName)
376
377 obsLIndex = []
378 for tuple in results:
379 obsLIndex.append(tuple[2])
380
381 self.assertEquals( expLIndex, obsLIndex)
382
383 def test_createIndex_Map_coord_index_already_exist(self):
384 tableName = "dummyMapTable" + self._uniqId
385 sqlCmd = "CREATE TABLE %s ( name varchar(255), chr varchar(255), start int, end int)" % ( tableName )
386 self._iDb.execute( sqlCmd )
387 sqlCmd = "CREATE INDEX icoord ON %s ( start,end );" % (tableName)
388 self._iDb.execute( sqlCmd )
389 expLIndex = ["icoord", "icoord", "iname", "ichr", "istart", "iend"]
390
391 self._iDb.createIndex(tableName, "map")
392
393 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
394 self._iDb.execute( sqlCmd )
395 results = self._iDb.cursor.fetchall()
396
397 for index in expLIndex[1:]:
398 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
399 self._iDb.execute( sqlCmd )
400 self._iDb.dropTable(tableName)
401
402 obsLIndex = []
403 for tuple in results:
404 obsLIndex.append(tuple[2])
405
406 self.assertEquals( expLIndex, obsLIndex)
407
408 def test_createTable_Map( self ):
409 tableName = "dummyMapTable" + self._uniqId
410 mapFileName = "dummyMapFile.txt"
411 mapF = open( mapFileName, "w" )
412 mapF.write( "map1\tseq1\t20\t50\n" )
413 mapF.write( "map2\tseq2\t700\t760\n" )
414 mapF.close()
415
416 expMapTuple1 = ("map1", "seq1", 20L, 50L)
417 expMapTuple2 = ("map2", "seq2", 700L, 760L)
418 expTMapTuples = (expMapTuple1, expMapTuple2)
419
420 self._iDb.createTable(tableName, 'map', mapFileName)
421
422 sqlCmd = "SELECT * FROM %s" % ( tableName )
423 self._iDb.execute( sqlCmd )
424 obsTMapTuples = self._iDb.cursor.fetchall()
425
426 self._iDb.dropTable(tableName)
427 os.remove(mapFileName)
428
429 self.assertEquals( expTMapTuples, obsTMapTuples )
430
431 def test_createIndex_Match(self):
432 tableName = "dummyMatchTable" + self._uniqId
433 sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int, query_length int unsigned, query_length_perc float, match_length_perc float, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, subject_length int unsigned, subject_length_perc float, E_value double, score int unsigned, identity float, path int unsigned)" % ( tableName )
434 self._iDb.execute( sqlCmd )
435 expLIndex = ["id", "qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"]
436
437 self._iDb.createIndex(tableName, "match")
438
439 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
440 self._iDb.execute( sqlCmd )
441 results = self._iDb.cursor.fetchall()
442
443 obsLIndex = []
444 for tuple in results:
445 obsLIndex.append(tuple[2])
446
447 self._iDb.dropTable(tableName)
448 self.assertEquals( expLIndex, obsLIndex)
449
450 def test_createIndex_Match_all_index_already_exist(self):
451 tableName = "dummyMatchTable" + self._uniqId
452 sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int, query_length int unsigned, query_length_perc float, match_length_perc float, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, subject_length int unsigned, subject_length_perc float, E_value double, score int unsigned, identity float, path int unsigned)" % ( tableName )
453 self._iDb.execute( sqlCmd )
454 sqlCmd = "CREATE UNIQUE INDEX id ON %s ( path );" % (tableName)
455 self._iDb.execute( sqlCmd )
456 sqlCmd = "CREATE INDEX qname ON %s ( query_name(10) );" % (tableName)
457 self._iDb.execute( sqlCmd )
458 sqlCmd = "CREATE INDEX qstart ON %s ( query_start );" % (tableName)
459 self._iDb.execute( sqlCmd )
460 sqlCmd = "CREATE INDEX qend ON %s ( query_end );" % (tableName)
461 self._iDb.execute( sqlCmd )
462 sqlCmd = "CREATE INDEX sname ON %s ( subject_name(10) );" % (tableName)
463 self._iDb.execute( sqlCmd )
464 sqlCmd = "CREATE INDEX sstart ON %s ( subject_start );" % (tableName)
465 self._iDb.execute( sqlCmd )
466 sqlCmd = "CREATE INDEX send ON %s ( subject_end );" % (tableName)
467 self._iDb.execute( sqlCmd )
468 sqlCmd = "CREATE INDEX qcoord ON %s ( query_start,query_end );" % (tableName)
469 self._iDb.execute( sqlCmd )
470 expLIndex = ["id", "qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"]
471
472 self._iDb.createIndex(tableName, "match")
473
474 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
475 self._iDb.execute( sqlCmd )
476 results = self._iDb.cursor.fetchall()
477
478 for index in expLIndex[:-1]:
479 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
480 self._iDb.execute( sqlCmd )
481 self._iDb.dropTable(tableName)
482
483 obsLIndex = []
484 for tuple in results:
485 obsLIndex.append(tuple[2])
486
487 self.assertEquals( expLIndex, obsLIndex)
488
489 def test_createTable_match( self ):
490 tableName = "dummyMatchTable" + self._uniqId
491 matchFileName = "dummyMatchFile.txt"
492 matchF = open( matchFileName, "w" )
493 matchF.write( "qry1\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" )
494 matchF.write( "qry2\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" )
495 matchF.close()
496
497 expMatchTuple = ("qry2", 700L, 760L, 60L, 100.0, 100.0, "sbj2", 500L, 560L, 60L, 100.0, 1e-123, 136L, 98.4, 2L)
498 expTMatchTuples = (expMatchTuple,)
499
500 self._iDb.createTable(tableName, "match", matchFileName)
501 sqlCmd = "SELECT * FROM %s" % ( tableName )
502 self._iDb.execute( sqlCmd )
503 obsTMatchTuples = self._iDb.cursor.fetchall()
504
505 self._iDb.dropTable(tableName)
506 os.remove(matchFileName)
507
508 self.assertEquals( expTMatchTuples, obsTMatchTuples )
509
510 def test_createIndex_Path(self):
511 tableName = "dummyPathTable" + self._uniqId
512 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
513 self._iDb.execute( sqlCmd )
514 expLIndex = ["id", "qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"]
515
516 self._iDb.createIndex(tableName, "path")
517
518 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
519 self._iDb.execute( sqlCmd )
520 results = self._iDb.cursor.fetchall()
521
522 for index in expLIndex[:-1]:
523 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
524 self._iDb.execute( sqlCmd )
525 self._iDb.dropTable(tableName)
526
527 obsLIndex = []
528 for tuple in results:
529 obsLIndex.append(tuple[2])
530
531 self.assertEquals( expLIndex, obsLIndex)
532
533 def test_createIndex_Path_id_and_send_index_already_exist(self):
534 tableName = "dummyPathTable" + self._uniqId
535 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName )
536 self._iDb.execute( sqlCmd )
537 sqlCmd = "CREATE INDEX id ON %s ( path );" % (tableName)
538 self._iDb.execute( sqlCmd )
539 sqlCmd = "CREATE INDEX send ON %s ( subject_end );" % (tableName)
540 self._iDb.execute( sqlCmd )
541 expLIndex = ["id", "send", "qname", "qstart", "qend", "sname", "sstart", "qcoord", "qcoord"]
542
543 self._iDb.createIndex(tableName, "path")
544
545 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
546 self._iDb.execute( sqlCmd )
547 results = self._iDb.cursor.fetchall()
548
549 for index in expLIndex[:-1]:
550 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
551 self._iDb.execute( sqlCmd )
552 self._iDb.dropTable(tableName)
553
554 obsLIndex = []
555 for tuple in results:
556 obsLIndex.append(tuple[2])
557
558 self.assertEquals( expLIndex, obsLIndex)
559
560 def test_createTable_path( self ):
561 tableName = "dummyPathTable" + self._uniqId
562 pathFileName = "dummyPathFile.txt"
563 pathF = open( pathFileName, "w" )
564 pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
565 pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
566 pathF.close()
567
568 expPathTuple1 = (1L, "qry", 1L, 100L, "sbj", 1L, 100L, 1e-123, 136L, 98.4)
569 expPathTuple2 = (2L, "qry", 401L, 500L, "sbj", 100L, 1L, 1e-152, 161L, 98.7) # change coordinates
570 expTPathTuples = (expPathTuple1, expPathTuple2)
571
572 self._iDb.createTable( tableName, "path", pathFileName)
573
574 sqlCmd = "SELECT * FROM %s" % ( tableName )
575 self._iDb.execute( sqlCmd )
576 obsTPathTuples = self._iDb.cursor.fetchall()
577
578 self._iDb.dropTable(tableName)
579 os.remove(pathFileName)
580
581 self.assertEquals( expTPathTuples, obsTPathTuples )
582
583 def test_createIndex_align(self):
584 tableName = "dummyAlignTable" + self._uniqId
585 sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int,subject_name varchar(255), subject_start int unsigned, subject_end int unsigned,E_value double, score int unsigned, identity float)" % ( tableName )
586 self._iDb.execute( sqlCmd )
587 expLIndex = ["qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"]
588
589 self._iDb.createIndex(tableName, "align")
590
591 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
592 self._iDb.execute( sqlCmd )
593 results = self._iDb.cursor.fetchall()
594
595 for index in expLIndex[:-1]:
596 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
597 self._iDb.execute( sqlCmd )
598 self._iDb.dropTable(tableName)
599
600 obsLIndex = []
601 for tuple in results:
602 obsLIndex.append(tuple[2])
603
604 self.assertEquals( expLIndex, obsLIndex)
605
606 def test_createIndex_align_qstart_index_already_exist(self):
607 tableName = "dummyAlignTable" + self._uniqId
608 sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int,subject_name varchar(255), subject_start int unsigned, subject_end int unsigned,E_value double, score int unsigned, identity float)" % ( tableName )
609 self._iDb.execute( sqlCmd )
610 sqlCmd = "CREATE INDEX qstart ON %s ( query_start );" % (tableName)
611 self._iDb.execute( sqlCmd )
612 expLIndex = ["qstart", "qname", "qend", "sname", "sstart", "send", "qcoord", "qcoord"]
613
614 self._iDb.createIndex(tableName, "align")
615
616 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
617 self._iDb.execute( sqlCmd )
618 results = self._iDb.cursor.fetchall()
619
620 for index in expLIndex[:-1]:
621 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
622 self._iDb.execute( sqlCmd )
623 self._iDb.dropTable(tableName)
624
625 obsLIndex = []
626 for tuple in results:
627 obsLIndex.append(tuple[2])
628
629 self.assertEquals( expLIndex, obsLIndex)
630
631 def test_createTable_align( self ):
632 tableName = "dummyAlignTable" + self._uniqId
633 alignFileName = "dummyAlignFile.txt"
634 alignF = open( alignFileName, "w" )
635 alignF.write( "query1\t1\t100\tsubject1\t1\t150\t0.5\t15\t35\n" )
636 alignF.write( "query2\t1\t100\tsubject2\t1\t150\t0.5\t15\t35\n" )
637 alignF.close()
638
639 expAlignTuple1 = ("query1", 1L, 100L, "subject1", 1L, 150L, 0.5, 15L, 35)
640 expAlignTuple2 = ("query2", 1L, 100L, "subject2", 1L, 150L, 0.5, 15L, 35)
641 expTAlignTuples = (expAlignTuple1, expAlignTuple2)
642
643 self._iDb.createTable( tableName, "align", alignFileName )
644
645 sqlCmd = "SELECT * FROM %s" % ( tableName )
646 self._iDb.execute( sqlCmd )
647 obsTAlignTuples = self._iDb.cursor.fetchall()
648
649 self._iDb.dropTable(tableName)
650 os.remove(alignFileName)
651
652 self.assertEquals( expTAlignTuples, obsTAlignTuples )
653
654 def test_createIndex_set(self):
655 tableName = "dummySetTable" + self._uniqId
656 sqlCmd = "CREATE TABLE %s ( path int unsigned, name varchar(255), chr varchar(255), start int, end int)" % ( tableName )
657 self._iDb.execute( sqlCmd )
658 expLIndex = ["id", "iname", "ichr", "istart", "iend", "icoord", "icoord"]
659
660 self._iDb.createIndex(tableName, "set")
661
662 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
663 self._iDb.execute( sqlCmd )
664 results = self._iDb.cursor.fetchall()
665
666 for index in expLIndex[:-1]:
667 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
668 self._iDb.execute( sqlCmd )
669 self._iDb.dropTable(tableName)
670
671 obsLIndex = []
672 for tuple in results:
673 obsLIndex.append(tuple[2])
674
675 self.assertEquals( expLIndex, obsLIndex)
676
677 def test_createIndex_set_id_index_already_exist(self):
678 tableName = "dummySetTable" + self._uniqId
679 sqlCmd = "CREATE TABLE %s ( path int unsigned, name varchar(255), chr varchar(255), start int, end int)" % ( tableName )
680 self._iDb.execute( sqlCmd )
681 sqlCmd = "CREATE INDEX id ON %s ( path );" % (tableName)
682 self._iDb.execute( sqlCmd )
683 expLIndex = ["id", "iname", "ichr", "istart", "iend", "icoord", "icoord"]
684
685 self._iDb.createIndex(tableName, 'set')
686
687 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
688 self._iDb.execute( sqlCmd )
689 results = self._iDb.cursor.fetchall()
690
691 for index in expLIndex[:-1]:
692 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
693 self._iDb.execute( sqlCmd )
694 self._iDb.dropTable(tableName)
695
696 obsLIndex = []
697 for tuple in results:
698 obsLIndex.append(tuple[2])
699
700 self.assertEquals( expLIndex, obsLIndex)
701
702 def test_createTable_set( self ):
703 tableName = "dummySetTable" + self._uniqId
704 setFileName = "dummySetFile.txt"
705 setF = open( setFileName, "w" )
706 setF.write( "15\tset1\tchr1\t1\t100\n" )
707 setF.write( "15\tset2\tchr2\t1\t100\n" )
708 setF.close()
709
710 expSetTuple1 = (15L, "set1", "chr1", 1L, 100L)
711 expSetTuple2 = (15L, "set2", "chr2", 1L, 100L)
712 expTSetTuples = (expSetTuple1, expSetTuple2)
713
714 self._iDb.createTable( tableName, 'set', setFileName )
715
716 sqlCmd = "SELECT * FROM %s" % ( tableName )
717 self._iDb.execute( sqlCmd )
718 obsTSetTuples = self._iDb.cursor.fetchall()
719
720 self._iDb.dropTable(tableName)
721 os.remove(setFileName)
722
723 self.assertEquals( expTSetTuples, obsTSetTuples )
724
725 def test_convertMapTableIntoSetTable( self ):
726 mapTableName = "dummyMapTable" + self._uniqId
727 mapFileName = "dummyMapFile.txt"
728 with open(mapFileName, "w") as mapFH:
729 mapFH.write("map1\tchr1\t1\t100\n")
730 mapFH.write("map2\tchr2\t1\t100\n")
731
732 self._iDb.createTable(mapTableName, 'map', mapFileName)
733
734 expSetTuple1 = (1, "map1", "chr1", 1, 100)
735 expSetTuple2 = (2, "map2", "chr2", 1, 100)
736 expTSetTuples = (expSetTuple1, expSetTuple2)
737
738 setTableName = "dummySetTable" + self._uniqId
739 self._iDb.convertMapTableIntoSetTable(mapTableName, setTableName)
740
741 sqlCmd = "SELECT * FROM %s" % setTableName
742 self._iDb.execute(sqlCmd)
743 obsTSetTuples = self._iDb.cursor.fetchall()
744
745 self._iDb.dropTable(mapTableName)
746 self._iDb.dropTable(setTableName)
747 os.remove(mapFileName)
748
749 self.assertEquals( expTSetTuples, obsTSetTuples )
750
751 def test_createIndex_seq(self):
752 tableName = "dummySeqTable" + self._uniqId
753 sqlCmd = "CREATE TABLE %s ( accession varchar(255), sequence longtext, description varchar(255), length int unsigned)" % ( tableName )
754 self._iDb.execute( sqlCmd )
755 expLIndex = ["iacc", "idescr"]
756
757 self._iDb.createIndex(tableName,'seq')
758
759 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
760 self._iDb.execute( sqlCmd )
761 results = self._iDb.cursor.fetchall()
762
763 for index in expLIndex:
764 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
765 self._iDb.execute( sqlCmd )
766 self._iDb.dropTable(tableName)
767
768 obsLIndex = []
769 for tuple in results:
770 obsLIndex.append(tuple[2])
771
772 self.assertEquals(expLIndex, obsLIndex)
773
774 def test_createIndex_seq_idescr_index_already_exist(self):
775 tableName = "dummySeqTable" + self._uniqId
776 sqlCmd = "CREATE TABLE %s ( accession varchar(255), sequence longtext, description varchar(255), length int unsigned);" % ( tableName )
777 self._iDb.execute( sqlCmd )
778 sqlCmd = "CREATE INDEX idescr ON %s ( description(10) );" % ( tableName )
779 self._iDb.execute( sqlCmd )
780 expLIndex = ["idescr", "iacc"]
781
782 self._iDb.createIndex(tableName,'seq')
783
784 sqlCmd = "SHOW INDEX FROM %s" % ( tableName )
785 self._iDb.execute( sqlCmd )
786 results = self._iDb.cursor.fetchall()
787
788 for index in expLIndex:
789 sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName )
790 self._iDb.execute( sqlCmd )
791 self._iDb.dropTable(tableName)
792
793 obsLIndex = []
794 for tuple in results:
795 obsLIndex.append(tuple[2])
796
797 self.assertEquals(expLIndex, obsLIndex)
798
799 def test_createTable_seq( self ):
800 tableName = "dummySeqTable" + self._uniqId
801 seqFileName = "dummySeqFile.txt"
802 seqF = open( seqFileName, "w" )
803 seqF.write( ">acc1 seq1\n" )
804 seqF.write( "ATACTTCGCTAGCTCGC\n" )
805 seqF.write( ">acc2 seq2\n" )
806 seqF.write( "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC\n" )
807 seqF.close()
808
809 expSeqTuple1 = ("acc1", "ATACTTCGCTAGCTCGC", "acc1 seq1", 17L)
810 expSeqTuple2 = ("acc2", "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC", "acc2 seq2", 68L)
811 expTSeqTuples = (expSeqTuple1, expSeqTuple2)
812
813 self._iDb.createTable( tableName,'seq', seqFileName )
814
815 sqlCmd = "SELECT * FROM %s" % ( tableName )
816 self._iDb.execute( sqlCmd )
817 obsTSeqTuples = self._iDb.cursor.fetchall()
818
819 self._iDb.dropTable(tableName)
820 os.remove(seqFileName)
821
822 self.assertEquals( expTSeqTuples, obsTSeqTuples )
823
824 def test_createIndex_job(self):
825 tableName = "dummyTable%s" % self._uniqId
826 sqlCmd = "CREATE TABLE %s" % tableName
827 sqlCmd += " ( jobid INT UNSIGNED"
828 sqlCmd += ", jobname VARCHAR(255)"
829 sqlCmd += ", groupid VARCHAR(255)"
830 sqlCmd += ", command TEXT"
831 sqlCmd += ", launcher VARCHAR(1024)"
832 sqlCmd += ", queue VARCHAR(255)"
833 sqlCmd += ", status VARCHAR(255)"
834 sqlCmd += ", time DATETIME"
835 sqlCmd += ", node VARCHAR(255) )"
836 self._iDb.execute(sqlCmd)
837 expLIndex = ["ijobid", "ijobname", "igroupid", "istatus"]
838
839 self._iDb.createIndex(tableName, 'jobs')
840
841 sqlCmd = "SHOW INDEX FROM %s" % tableName
842 self._iDb.execute(sqlCmd)
843 results = self._iDb.cursor.fetchall()
844
845 obsLIndex = []
846 for tuple in results:
847 obsLIndex.append(tuple[2])
848
849 for index in obsLIndex:
850 sqlCmd = "DROP INDEX %s ON %s" % (index, tableName)
851 self._iDb.execute(sqlCmd)
852 self._iDb.dropTable(tableName)
853
854 self.assertEquals(expLIndex, obsLIndex)
855
856 def test_createTable_job( self ):
857 tableName = "dummyTable%s" % self._uniqId
858 expTuples = ()
859
860 self._iDb.createTable(tableName,'jobs')
861
862 sqlCmd = "SELECT * FROM %s" % tableName
863 self._iDb.execute(sqlCmd)
864 obsTuples = self._iDb.cursor.fetchall()
865 self._iDb.dropTable(tableName)
866
867 self.assertEquals(expTuples, obsTuples)
868
869 def test_createIndex_length(self):
870 tableName = "dummyTable%s" % self._uniqId
871 sqlCmd = "CREATE TABLE %s (accession varchar(255), length int unsigned)" % tableName
872 self._iDb.execute(sqlCmd)
873 expLIndex = ["iacc", "ilength"]
874
875 self._iDb.createIndex(tableName,'length')
876
877 sqlCmd = "SHOW INDEX FROM %s" % tableName
878 self._iDb.execute(sqlCmd)
879 results = self._iDb.cursor.fetchall()
880
881 obsLIndex = []
882 for tuple in results:
883 obsLIndex.append(tuple[2])
884
885 for index in obsLIndex:
886 sqlCmd = "DROP INDEX %s ON %s" % (index, tableName)
887 self._iDb.execute(sqlCmd)
888 self._iDb.dropTable(tableName)
889
890 self.assertEquals(expLIndex, obsLIndex)
891
892 def test_createTable_length( self ):
893 tableName = "dummyLengthTable%s" % self._uniqId
894 seqFileName = "dummyFile.fa"
895 seqF = open( seqFileName, "w" )
896 seqF.write(">acc1 seq1\n")
897 seqF.write("ATACTTCGCTAGCTCGC\n")
898 seqF.write(">acc2 seq2\n")
899 seqF.write("ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC\n")
900 seqF.close()
901
902 expTuple1 = ("acc1", 17)
903 expTuple2 = ("acc2", 68)
904 expTTuples = (expTuple1, expTuple2)
905
906 self._iDb.createTable(tableName, "length", seqFileName)
907
908 sqlCmd = "SELECT * FROM %s" % tableName
909 self._iDb.execute(sqlCmd)
910 obsTTuples = self._iDb.cursor.fetchall()
911
912 self._iDb.dropTable(tableName)
913 os.remove(seqFileName)
914
915 self.assertEquals(expTTuples, obsTTuples)
916
917 def test_createTable_with_overwrite_Map( self ):
918 tableName = "dummyMapTable" + self._uniqId
919 sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName )
920 self._iDb.execute( sqlCmd )
921
922 fileName = "dummyMapFile.txt"
923 mapF = open( fileName, "w" )
924 mapF.write( "map1\tseq1\t20\t50\n" )
925 mapF.write( "map2\tseq2\t700\t760\n" )
926 mapF.close()
927
928 expMapTuple1 = ("map1", "seq1", 20L, 50L)
929 expMapTuple2 = ("map2", "seq2", 700L, 760L)
930 expTMapTuples = (expMapTuple1, expMapTuple2)
931
932 self._iDb.createTable(tableName, "Map", fileName, True)
933
934 sqlCmd = "SELECT * FROM %s" % ( tableName )
935 self._iDb.execute( sqlCmd )
936 obsTMapTuples = self._iDb.cursor.fetchall()
937
938 self._iDb.dropTable(tableName)
939 os.remove(fileName)
940
941 self.assertEquals( expTMapTuples, obsTMapTuples )
942
943 def test_createTable_without_overwrite_Align( self ):
944 tableName = "dummyAlignTable" + self._uniqId
945 alignFileName = "dummyAlignFile.txt"
946 alignF = open( alignFileName, "w" )
947 alignF.write( "query1\t1\t100\tsubject1\t1\t150\t0.5\t15\t35\n" )
948 alignF.write( "query2\t1\t100\tsubject2\t1\t150\t0.5\t15\t35\n" )
949 alignF.close()
950
951 expAlignTuple1 = ("query1", 1L, 100L, "subject1", 1L, 150L, 0.5, 15L, 35)
952 expAlignTuple2 = ("query2", 1L, 100L, "subject2", 1L, 150L, 0.5, 15L, 35)
953 expTAlignTuples = (expAlignTuple1, expAlignTuple2)
954
955 self._iDb.createTable(tableName, "align", alignFileName, False)
956
957 sqlCmd = "SELECT * FROM %s" % ( tableName )
958 self._iDb.execute( sqlCmd )
959 obsTAlignTuples = self._iDb.cursor.fetchall()
960
961 self._iDb.dropTable(tableName)
962 os.remove(alignFileName)
963
964 self.assertEquals( expTAlignTuples, obsTAlignTuples )
965
966 def test_createTable_without_overwrite_Match( self ):
967 tableName = "dummyMatchTable" + self._uniqId
968 matchFileName = "dummyMatchFile.txt"
969 matchF = open( matchFileName, "w" )
970 matchF.write( "qry1\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" )
971 matchF.write( "qry2\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" )
972 matchF.close()
973
974 expMatchTuple = ("qry2", 700L, 760L, 60L, 100.0, 100.0, "sbj2", 500L, 560L, 60L, 100.0, 1e-123, 136L, 98.4, 2L)
975 expTMatchTuples = (expMatchTuple,)
976
977 self._iDb.createTable(tableName, "tab", matchFileName, False)
978
979 sqlCmd = "SELECT * FROM %s" % ( tableName )
980 self._iDb.execute( sqlCmd )
981 obsTMatchTuples = self._iDb.cursor.fetchall()
982
983 self._iDb.dropTable(tableName)
984 os.remove(matchFileName)
985
986 self.assertEquals( expTMatchTuples, obsTMatchTuples )
987
988 def test_createTable_without_overwrite_Path( self ):
989 tableName = "dummyPathTable" + self._uniqId
990 pathFileName = "dummyPathFile.txt"
991 pathF = open( pathFileName, "w" )
992 pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
993 pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
994 pathF.close()
995
996 expPathTuple1 = (1L, "qry", 1L, 100L, "sbj", 1L, 100L, 1e-123, 136L, 98.4)
997 expPathTuple2 = (2L, "qry", 401L, 500L, "sbj", 100L, 1L, 1e-152, 161L, 98.7) # change coordinates
998 expTPathTuples = (expPathTuple1, expPathTuple2)
999
1000 self._iDb.createTable(tableName, "Path", pathFileName, False)
1001
1002 sqlCmd = "SELECT * FROM %s" % ( tableName )
1003 self._iDb.execute( sqlCmd )
1004 obsTPathTuples = self._iDb.cursor.fetchall()
1005
1006 self._iDb.dropTable(tableName)
1007 os.remove(pathFileName)
1008
1009 self.assertEquals( expTPathTuples, obsTPathTuples )
1010
1011 def test_createTable_without_overwrite_Set( self ):
1012 tableName = "dummySetTable" + self._uniqId
1013 setFileName = "dummySetFile.txt"
1014 setF = open( setFileName, "w" )
1015 setF.write( "15\tset1\tchr1\t1\t100\n" )
1016 setF.write( "15\tset2\tchr2\t1\t100\n" )
1017 setF.close()
1018
1019 expSetTuple1 = (15L, "set1", "chr1", 1L, 100L)
1020 expSetTuple2 = (15L, "set2", "chr2", 1L, 100L)
1021 expTSetTuples = (expSetTuple1, expSetTuple2)
1022
1023 self._iDb.createTable(tableName, "Set", setFileName, False)
1024
1025 sqlCmd = "SELECT * FROM %s" % ( tableName )
1026 self._iDb.execute( sqlCmd )
1027 obsTSetTuples = self._iDb.cursor.fetchall()
1028
1029 self._iDb.dropTable(tableName)
1030 os.remove(setFileName)
1031
1032 self.assertEquals( expTSetTuples, obsTSetTuples )
1033
1034 def test_createTable_without_overwrite_Seq( self ):
1035 tableName = "dummySeqTable" + self._uniqId
1036 seqFileName = "dummySeqFile.txt"
1037 seqF = open( seqFileName, "w" )
1038 seqF.write( ">acc1 seq1\n" )
1039 seqF.write( "ATACTTCGCTAGCTCGC\n" )
1040 seqF.write( ">acc2 seq2\n" )
1041 seqF.write( "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC\n" )
1042 seqF.close()
1043
1044 expSeqTuple1 = ("acc1", "ATACTTCGCTAGCTCGC", "acc1 seq1", 17L)
1045 expSeqTuple2 = ("acc2", "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC", "acc2 seq2", 68L)
1046 expTSeqTuples = (expSeqTuple1, expSeqTuple2)
1047
1048 self._iDb.createTable(tableName, "fasta", seqFileName, False)
1049
1050 sqlCmd = "SELECT * FROM %s" % ( tableName )
1051 self._iDb.execute( sqlCmd )
1052 obsTSeqTuples = self._iDb.cursor.fetchall()
1053
1054 self._iDb.dropTable(tableName)
1055 os.remove(seqFileName)
1056
1057 self.assertEquals( expTSeqTuples, obsTSeqTuples )
1058
1059 def test_createTable_with_overwrite_Classif( self ):
1060 tableName = "dummyClassifTable" + self._uniqId
1061 classifFileName = "dummyClassifFile.txt"
1062 with open( classifFileName, "w" ) as f:
1063 f.write("RIX-incomp-chim_DmelCaf1_2_0-B-G1000-Map3\t3508\t-\tPotentialChimeric\tI\tLINE\tincomplete\tCI=36; coding=(TE_BLRtx: DMCR1A:ClassI:LINE:Jockey: 14.16%); struct=(TElength: >700bps)\n")
1064 f.write("RLX-incomp_DmelCaf1_2_0-B-G1019-Map3\t4131\t+\tok\tI\tLTR\tincomplete\tCI=28; coding=(TE_BLRtx: ROO_I:ClassI:LTR:Bel-Pao: 43.27%, ROO_LTR:ClassI:LTR:Bel-Pao: 100.00%; TE_BLRx: BEL-6_DWil-I_2p:ClassI:LTR:Bel-Pao: 69.84%); struct=(TElength: >4000bps); other=(HG_BLRn: FBtr0087866_Dmel_r4.3: 4.72%; SSRCoverage=0.15<0.75)\n")
1065
1066 self._iDb.createTable(tableName, "Classif", classifFileName, True)
1067
1068 self.assertTrue(self._iDb.getSize(tableName) == 2)
1069 self._iDb.dropTable(tableName)
1070 os.remove(classifFileName)
1071
1072 def test_createTable_no_file( self ):
1073 lTypesToTest = TABLE_SCHEMA_DESCRIPTOR.keys()
1074 lTypesToTest.extend(TABLE_TYPE_SYNONYMS)
1075 for tableType in lTypesToTest:
1076 tableName = "dummy%sTable%s" % (tableType, self._uniqId)
1077 self._iDb.createTable(tableName, tableType)
1078
1079 self.assertTrue(self._iDb.doesTableExist(tableName))
1080 self.assertTrue(self._iDb.isEmpty(tableName))
1081
1082 self._iDb.dropTable(tableName)
1083
1084 def test_changePathQueryCoordinatesToDirectStrand(self):
1085 tableName = "dummyPathTable" + self._uniqId
1086 pathFileName = "dummyPathFile.txt"
1087 pathF = open( pathFileName, "w" )
1088 pathF.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1089 pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1090 pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1091 pathF.close()
1092
1093 expPathTuple1 = (1L, "qry", 1L, 100L, "sbj", 100L, 1L, 1e-123, 136L, 98.4)
1094 expPathTuple2 = (2L, "qry", 401L, 500L, "sbj", 100L, 1L, 1e-152, 161L, 98.7)
1095 expPathTuple3 = (3L, "qry", 5L, 401L, "sbj", 1L, 100L, 1e-152, 161L, 98.7)
1096 expTPathTuples = (expPathTuple1, expPathTuple2, expPathTuple3)
1097
1098 sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % tableName
1099 self._iDb.execute( sqlCmd )
1100
1101 self._iDb.loadDataFromFile(tableName, pathFileName, False)
1102 self._iDb.changePathQueryCoordinatesToDirectStrand(tableName)
1103
1104 sqlCmd = "SELECT * FROM %s" % ( tableName )
1105 self._iDb.execute( sqlCmd )
1106 obsTPathTuples = self._iDb.cursor.fetchall()
1107
1108 self._iDb.dropTable(tableName)
1109 os.remove(pathFileName)
1110
1111 self.assertEquals( expTPathTuples, obsTPathTuples )
1112
1113 def test_exportDataToFile(self):
1114 tableName = "dummyPathTable" + self._uniqId
1115 expFileName = "dummyPathFile.txt"
1116 pathF = open( expFileName, "w" )
1117 pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1118 pathF.write( "2\tqry\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1119 pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1120 pathF.close()
1121 self._iDb.createTable(tableName, "Path", expFileName, False)
1122 obsFileName = "DummyObsFileName"
1123
1124 self._iDb.exportDataToFile(tableName, obsFileName)
1125
1126 self.assertTrue(FileUtils.isRessourceExists(obsFileName))
1127 self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
1128
1129 self._iDb.dropTable(tableName)
1130 os.remove(expFileName)
1131 os.remove(obsFileName)
1132
1133 def test_exportDataToFile_keepFirstLineTrue(self):
1134 tableName = "dummyPathTable" + self._uniqId
1135 pathFileName = "dummyPathFile.txt"
1136 pathF = open( pathFileName, "w" )
1137 pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1138 pathF.write( "2\tqry\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1139 pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1140 pathF.close()
1141
1142 expFileName = "expPathFile.txt"
1143 pathF = open( expFileName, "w" )
1144 pathF.write("path\tquery_name\tquery_start\tquery_end\tsubject_name\tsubject_start\tsubject_end\tE_value\tscore\tidentity\n")
1145 pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1146 pathF.write( "2\tqry\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1147 pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1148 pathF.close()
1149
1150 self._iDb.createTable(tableName, "Path", pathFileName, False)
1151 obsFileName = "DummyObsFileName"
1152
1153 self._iDb.exportDataToFile(tableName, obsFileName, True)
1154
1155 self.assertTrue(FileUtils.isRessourceExists(obsFileName))
1156 self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
1157
1158 self._iDb.dropTable(tableName)
1159 os.remove(expFileName)
1160 os.remove(obsFileName)
1161 os.remove(pathFileName)
1162
1163 def test_exportDataToFile_with_keepFirstLineTrue_and_param(self):
1164 tableName = "dummyPathTable" + self._uniqId
1165 pathFileName = "dummyPathFile.txt"
1166 pathF = open( pathFileName, "w" )
1167 pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1168 pathF.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1169 pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1170 pathF.close()
1171
1172 expFileName = "expPathFile.txt"
1173 pathF = open( expFileName, "w" )
1174 pathF.write("path\tquery_name\tquery_start\tquery_end\tsubject_name\tsubject_start\tsubject_end\tE_value\tscore\tidentity\n")
1175 pathF.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1176 pathF.close()
1177
1178 self._iDb.createTable(tableName, "Path", pathFileName, False)
1179 obsFileName = "DummyObsFileName"
1180
1181 self._iDb.exportDataToFile(tableName, obsFileName, True, "where query_name = 'qry2'")
1182
1183 self.assertTrue(FileUtils.isRessourceExists(obsFileName))
1184 self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
1185
1186 self._iDb.dropTable(tableName)
1187 os.remove(expFileName)
1188 os.remove(obsFileName)
1189 os.remove(pathFileName)
1190
1191
1192 def test_convertPathTableIntoAlignTable( self ):
1193 inPathTable = "dummyInPathTable_%s" % ( self._uniqId )
1194 inPathFile = "dummyInPathFile_%s" % ( self._uniqId )
1195 inPathFileHandler = open( inPathFile, "w" )
1196 inPathFileHandler.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1197 inPathFileHandler.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1198 inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1199 inPathFileHandler.close()
1200 self._iDb.createTable( inPathTable, "path", inPathFile, True )
1201
1202 expAlignFile = "dummyExpAlignFile_%s" % ( self._uniqId )
1203 expAlignFileHandler = open( expAlignFile, "w" )
1204 expAlignFileHandler.write( "qry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1205 expAlignFileHandler.write( "qry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1206 expAlignFileHandler.write( "qry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1207 expAlignFileHandler.close()
1208 obsAlignTable = "dummyObsAlignTable_%s" % ( self._uniqId )
1209
1210 self._iDb.convertPathTableIntoAlignTable( inPathTable, obsAlignTable )
1211
1212 obsAlignFile = "dummyObsAlignFile_%s" % ( self._uniqId )
1213 self._iDb.exportDataToFile( obsAlignTable, obsAlignFile, False )
1214 self.assertTrue( FileUtils.are2FilesIdentical( expAlignFile, obsAlignFile ) )
1215
1216 for f in [ inPathFile, expAlignFile, obsAlignFile ]:
1217 os.remove( f )
1218 for t in [ inPathTable, obsAlignTable ]:
1219 self._iDb.dropTable( t )
1220
1221 def test_convertAlignTableIntoPathTable( self ):
1222 inAlignTable = "dummyInPathTable_%s" % ( self._uniqId )
1223 inAlignFile = "dummyInPathFile_%s" % ( self._uniqId )
1224 inAlignFileHandler = open( inAlignFile, "w" )
1225 inAlignFileHandler.write( "qry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1226 inAlignFileHandler.write( "qry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1227 inAlignFileHandler.write( "qry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1228 inAlignFileHandler.close()
1229 self._iDb.createTable( inAlignTable, "align", inAlignFile, True )
1230
1231 expPathFile = "dummyExpPathFile_%s" % ( self._uniqId )
1232 expPathFileHandler = open( expPathFile, "w" )
1233 expPathFileHandler.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" )
1234 expPathFileHandler.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" )
1235 expPathFileHandler.write( "3\tqry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1236 expPathFileHandler.close()
1237 obsPathTable = "dummyObsPathTable_%s" % ( self._uniqId )
1238
1239 self._iDb.convertAlignTableIntoPathTable( inAlignTable, obsPathTable )
1240
1241 obsPathFile = "dummyObsAlignFile_%s" % ( self._uniqId )
1242 self._iDb.exportDataToFile( obsPathTable, obsPathFile, False )
1243 self.assertTrue( FileUtils.are2FilesIdentical( expPathFile, obsPathFile ) )
1244
1245 for f in [ inAlignFile, expPathFile, obsPathFile ]:
1246 os.remove( f )
1247 for t in [ inAlignTable, obsPathTable ]:
1248 self._iDb.dropTable( t )
1249
1250 def test_convertAlignTableIntoPathTable_with_single_quote( self ):
1251 inAlignTable = "dummyInPathTable_%s" % ( self._uniqId )
1252 inAlignFile = "dummyInPathFile_%s" % ( self._uniqId )
1253 inAlignFileHandler = open( inAlignFile, "w" )
1254 inAlignFileHandler.write( "qry\t1\t100\t'sbj\t100\t1\t1e-123\t136\t98.4\n" )
1255 inAlignFileHandler.write( "qry2\t401\t500\tsbj'\t100\t1\t1e-152\t161\t98.7\n" )
1256 inAlignFileHandler.write( "qry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1257 inAlignFileHandler.close()
1258 self._iDb.createTable( inAlignTable, "align", inAlignFile, True )
1259
1260 expPathFile = "dummyExpPathFile_%s" % ( self._uniqId )
1261 expPathFileHandler = open( expPathFile, "w" )
1262 expPathFileHandler.write( "1\tqry\t1\t100\t'sbj\t100\t1\t1e-123\t136\t98.4\n" )
1263 expPathFileHandler.write( "2\tqry2\t401\t500\tsbj'\t100\t1\t1e-152\t161\t98.7\n" )
1264 expPathFileHandler.write( "3\tqry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1265 expPathFileHandler.close()
1266 obsPathTable = "dummyObsPathTable_%s" % ( self._uniqId )
1267
1268 self._iDb.convertAlignTableIntoPathTable( inAlignTable, obsPathTable )
1269
1270 obsPathFile = "dummyObsAlignFile_%s" % ( self._uniqId )
1271 self._iDb.exportDataToFile( obsPathTable, obsPathFile, False )
1272 self.assertTrue( FileUtils.are2FilesIdentical( expPathFile, obsPathFile ) )
1273
1274 for f in [ inAlignFile, expPathFile, obsPathFile ]:
1275 os.remove( f )
1276 for t in [ inAlignTable, obsPathTable ]:
1277 self._iDb.dropTable( t )
1278
1279 def test_getObjectListWithSQLCmd(self):
1280 inPathTable = "dummyInPathTable_%s" % ( self._uniqId )
1281 inPathFile = "dummyInPathFile_%s" % ( self._uniqId )
1282 inPathFileHandler = open( inPathFile, "w" )
1283 inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1284 inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1285 inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1286 inPathFileHandler.close()
1287 self._iDb.createTable( inPathTable, "path", inPathFile, True )
1288
1289 path1 = Path()
1290 path1.setFromTuple((1, "qry", 1, 100, "sbj", 100, 1, 1e-123, 136, 98.4))
1291 path2 = Path()
1292 path2.setFromTuple((2, "qry", 401, 500, "sbj", 100, 1, 1e-152, 161, 98.7))
1293 path3 = Path()
1294 path3.setFromTuple((3, "qry", 5, 401, "sbj", 1, 100, 1e-152, 161, 98.7))
1295 expLPath = [path1, path2, path3]
1296 sqlCmd = "SELECT * FROM %s;" % (inPathTable)
1297 obsLPath = self._iDb.getObjectListWithSQLCmd(sqlCmd, self._getInstanceToAdapt)
1298
1299 os.remove( inPathFile )
1300 self._iDb.dropTable( inPathTable )
1301
1302 self.assertEquals(expLPath, obsLPath)
1303
1304 def test_getIntegerListWithSQLCmd(self):
1305 inPathTable = "dummyInPathTable_%s" % ( self._uniqId )
1306 inPathFile = "dummyInPathFile_%s" % ( self._uniqId )
1307 inPathFileHandler = open( inPathFile, "w" )
1308 inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1309 inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1310 inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1311 inPathFileHandler.close()
1312 self._iDb.createTable( inPathTable, "path", inPathFile, True )
1313
1314 expLPath = [1, 2, 3]
1315 sqlCmd = "SELECT * FROM %s;" % (inPathTable)
1316 obsLPath = self._iDb.getIntegerListWithSQLCmd(sqlCmd)
1317
1318 os.remove( inPathFile )
1319 self._iDb.dropTable( inPathTable )
1320
1321 self.assertEquals(expLPath, obsLPath)
1322
1323 def test_getIntegerWithSQLCmd(self):
1324 inPathTable = "dummyInPathTable_%s" % ( self._uniqId )
1325 inPathFile = "dummyInPathFile_%s" % ( self._uniqId )
1326 inPathFileHandler = open( inPathFile, "w" )
1327 inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1328 inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1329 inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1330 inPathFileHandler.close()
1331 self._iDb.createTable( inPathTable, "path", inPathFile, True )
1332
1333 expId = 1
1334 sqlCmd = "SELECT path FROM %s where path='%d';" % (inPathTable, 1)
1335 obsId = self._iDb.getIntegerWithSQLCmd(sqlCmd)
1336
1337 os.remove( inPathFile )
1338 self._iDb.dropTable( inPathTable )
1339
1340 self.assertEquals(expId, obsId)
1341
1342 def test_getStringListWithSQLCmd(self):
1343 inPathTable = "dummyInPathTable_%s" % ( self._uniqId )
1344 inPathFile = "dummyInPathFile_%s" % ( self._uniqId )
1345 inPathFileHandler = open( inPathFile, "w" )
1346 inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1347 inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1348 inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1349 inPathFileHandler.close()
1350 self._iDb.createTable( inPathTable, "path", inPathFile, True )
1351
1352 expLString = ["qry","qry","qry"]
1353 sqlCmd = "SELECT query_name FROM %s;" % (inPathTable)
1354 obsLString = self._iDb.getStringListWithSQLCmd(sqlCmd)
1355
1356 os.remove( inPathFile )
1357 self._iDb.dropTable( inPathTable )
1358
1359 self.assertEquals(expLString, obsLString)
1360
1361 def test_removeDoublons( self ):
1362 inPathTable = "dummyInPathTable_%s" % ( self._uniqId )
1363 inPathFile = "dummyInPathFile_%s" % ( self._uniqId )
1364 inPathFileHandler = open( inPathFile, "w" )
1365 inPathFileHandler.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1366 inPathFileHandler.write( "2\tqry\t401\t500\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1367 inPathFileHandler.write( "2\tqry\t401\t500\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1368 inPathFileHandler.close()
1369 self._iDb.createTable( inPathTable, "path", inPathFile, True )
1370
1371 expFile = "dummyExpFile_%s" % ( self._uniqId )
1372 expFileHandler = open( expFile, "w" )
1373 expFileHandler.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" )
1374 expFileHandler.write( "2\tqry\t401\t500\tsbj\t1\t100\t1e-152\t161\t98.7\n" )
1375 expFileHandler.close()
1376
1377 self._iDb.removeDoublons( inPathTable )
1378
1379 obsFile = "dummyObsFile_%s" % ( self._uniqId )
1380 self._iDb.exportDataToFile(inPathTable, obsFile)
1381
1382 self.assertTrue( FileUtils.are2FilesIdentical( expFile, obsFile ) )
1383
1384 self._iDb.dropTable( inPathTable )
1385 for f in [ inPathFile, expFile, obsFile ]:
1386 os.remove( f )
1387
1388 def test_getTableListFromPattern_oneTable( self ):
1389 inTable = "dummyInTable_%s" % ( self._uniqId )
1390 self._iDb.createTable( inTable, "path", "", True )
1391 exp = [ inTable ]
1392 obs = self._iDb.getTableListFromPattern( "%s%%" % inTable )
1393 self.assertEqual( exp, obs )
1394 self._iDb.dropTable( inTable )
1395
1396 def test_getTableListFromPattern_twoTables( self ):
1397 inTable1 = "dummyInTable1_%s" % ( self._uniqId )
1398 inTable2 = "dummyInTable2_%s" % ( self._uniqId )
1399 inTable3 = "dummyTotoTable3_%s" % ( self._uniqId )
1400 for table in [ inTable1, inTable2, inTable3 ]:
1401 self._iDb.createTable( table, "path", "", True )
1402 exp = [ inTable1, inTable2 ]
1403 obs = self._iDb.getTableListFromPattern( "dummyInTable%%_%s" % self._uniqId )
1404 self.assertEqual( exp, obs )
1405 for table in [ inTable1, inTable2, inTable3 ]:
1406 self._iDb.dropTable( table )
1407
1408 def test_createPathStatTable(self):
1409 statsFileName = "DmelCaf1_statsPerClassif.txt"
1410 f = open (statsFileName, "w")
1411 f.write("family\tmaxLength\tmeanLength\tcovg\tfrags\tfullLgthFrags\tcopies\tfullLgthCopies\tmeanId\tsdId\tminId\tq25Id\tmedId\tq75Id\tmaxId\tmeanLgth\tsdLgth\tminLgth\tq25Lgth\tmedLgth\tq75Lgth\tmaxLgth\tmeanLgthPerc\tsdLgthPerc\tminLgthPerc\tq25LgthPerc\tmedLgthPerc\tq75LgthPerc\tmaxLgthPerc\n")
1412 f.write("Helitron\t2367\t2367\t138367\t852\t0\t803\t0\t81.20\t4.24\t68.55\t78.32\t81.03\t83.49\t100.00\t172.46\t184.92\t21\t70.00\t129.00\t216.00\t2202\t7.29\t7.81\t0.89\t2.96\t5.45\t9.13\t93.03\n")
1413 f.write("LINE\t7688\t7688\t3769377\t8358\t10\t6329\t10\t85.52\t8.02\t62.80\t79.27\t83.33\t92.88\t100.00\t597.97\t980.29\t21\t117.00\t256.00\t537.00\t7726\t7.78\t12.75\t0.27\t1.52\t3.33\t6.98\t100.49\n")
1414 f.write("LTR\t13754\t13754\t9146587\t20749\t0\t17868\t1\t82.69\t7.39\t58.76\t77.81\t80.82\t85.67\t100.00\t519.75\t1217.12\t20\t105.00\t183.50\t336.00\t13738\t3.78\t8.85\t0.15\t0.76\t1.33\t2.44\t99.88\n")
1415 f.write("MITE\t378\t378\t2890\t10\t3\t9\t3\t98.78\t1.20\t95.80\t98.64\t99.18\t99.46\t99.73\t325.33\t47.86\t253\t290.00\t333.00\t362.00\t390\t86.07\t12.66\t66.93\t76.72\t88.10\t95.77\t103.17\n")
1416 f.write("NoCat\t9999\t9999\t384076\t1297\t1\t1219\t1\t82.60\t6.73\t61.20\t78.37\t81.41\t85.29\t100.00\t323.01\t686.85\t21\t64.00\t139.00\t280.00\t10000\t3.23\t6.87\t0.21\t0.64\t1.39\t2.80\t100.01\n")
1417 f.write("SSR\t680\t680\t325152\t2340\t24\t2290\t28\t79.07\t3.60\t69.19\t76.64\t79.02\t81.10\t97.83\t221.64\t139.84\t21\t121.00\t183.00\t285.00\t799\t32.59\t20.57\t3.09\t17.79\t26.91\t41.91\t117.50\n")
1418 f.write("TIR\t2532\t2532\t700173\t2503\t5\t2160\t5\t84.70\t7.43\t64.03\t79.46\t82.77\t90.09\t100.00\t326.54\t405.94\t21\t90.00\t187.00\t342.00\t2758\t12.90\t16.03\t0.83\t3.55\t7.39\t13.51\t108.93\n")
1419 f.write("confused\t19419\t19419\t1299224\t3903\t0\t3311\t0\t82.30\t6.34\t63.20\t78.17\t80.81\t84.58\t100.00\t408.22\t989.57\t21\t113.00\t207.00\t339.00\t17966\t2.10\t5.10\t0.11\t0.58\t1.07\t1.75\t92.52\n")
1420 f.close()
1421 tableName = "dummyDmelCaf1_chr_allTEs_nr_noSSR_join_path_statsPerClassif"
1422 self._iDb.createTable(tableName, "pathstat", statsFileName)
1423
1424 self.assertTrue(self._iDb.doesTableExist(tableName))
1425
1426 expSize = 8
1427 obsSize = self._iDb.getSize(tableName)
1428 self.assertEquals(expSize, obsSize)
1429
1430 expColumnNb = 29
1431 sqlCmd = "DESC %s;" % tableName
1432 self._iDb.execute(sqlCmd)
1433 res = self._iDb.fetchall()
1434 obsColumnNb = len(res)
1435 self.assertEquals(expColumnNb, obsColumnNb)
1436
1437 self._iDb.dropTable(tableName)
1438 os.remove(statsFileName)
1439
1440 def test_createJobTable_is_table_created(self):
1441 tableName = "dummyJobTable" + self._uniqId
1442 self._iDb.createTable(tableName, "jobs")
1443 self.assertTrue(self._iDb.doesTableExist(tableName))
1444 self._iDb.dropTable(tableName)
1445
1446 def test_createClassifTable(self):
1447 tableName = "dummyClassifTable"
1448 self._iDb.dropTable(tableName)
1449 fileName = "test.classif"
1450
1451 with open(fileName, "w") as f:
1452 f.write("RIX-incomp-chim_DmelCaf1_2_0-B-G1000-Map3\t3508\t-\tPotentialChimeric\tI\tLINE\tincomplete\tCI=36; coding=(TE_BLRtx: DMCR1A:ClassI:LINE:Jockey: 14.16%, FW3_DM:ClassI:LINE:Jockey: 15.07%; TE_BLRx: CR1-1_DWil_2p:ClassI:LINE:Jockey: 18.98%, FW2_DM-ORF1p:ClassI:LINE:Jockey: 22.36%, Jockey-1_DYa_1p:ClassI:LINE:Jockey: 11.86%); struct=(TElength: >700bps); other=(TE_BLRx: Gypsy7-I_Dmoj_1p:ClassI:LTR:Gypsy: 12.58%; HG_BLRn: FBtr0089196_Dmel_r4.3: 11.74%; SSRCoverage=0.12<0.75)\n")
1453 f.write("RLX-incomp_DmelCaf1_2_0-B-G1019-Map3\t4131\t+\tok\tI\tLTR\tincomplete\tCI=28; coding=(TE_BLRtx: ROO_I:ClassI:LTR:Bel-Pao: 43.27%, ROO_LTR:ClassI:LTR:Bel-Pao: 100.00%; TE_BLRx: BEL-6_DWil-I_2p:ClassI:LTR:Bel-Pao: 69.84%); struct=(TElength: >4000bps); other=(HG_BLRn: FBtr0087866_Dmel_r4.3: 4.72%; SSRCoverage=0.15<0.75)\n")
1454 f.write("RLX-incomp_DmelCaf1_2_0-B-G1025-Map3\t6534\t-\tok\tI\tLTR\tincomplete\tCI=28; coding=(TE_BLRtx: Gypsy2-I_Dmoj:ClassI:LTR:Gypsy: 11.82%, MDG3_DM:ClassI:LTR:Gypsy: 17.43%, STALKER2_LTR:ClassI:LTR:Gypsy: 14.62%, STALKER4_LTR:ClassI:LTR:Gypsy: 57.21%; TE_BLRx: Gypsy-16_DWil-I_1p:ClassI:LTR:Gypsy: 32.19%; profiles: PF00665.18_rve_INT_32.0: 68.64%); struct=(TElength: >4000bps); other=(HG_BLRn: FBtr0070036_Dmel_r4.3: 3.73%; TermRepeats: non-termLTR: 1701; SSRCoverage=0.14<0.75)\n")
1455
1456 self._iDb.createTable(tableName, "classif", fileName)
1457 self.assertTrue(self._iDb.doesTableExist(tableName))
1458
1459 expColumnNb = 8
1460 sqlCmd = "DESC %s;" % tableName
1461 self._iDb.execute(sqlCmd)
1462 res = self._iDb.fetchall()
1463 obsColumnNb = len(res)
1464 self.assertEquals(expColumnNb, obsColumnNb)
1465
1466 expSize = 3
1467 obsSize = self._iDb.getSize(tableName)
1468 self.assertEquals(expSize, obsSize)
1469
1470 expLIndex = ["iseq_name", "istatus", "iclass", "iorder", "icomp"]
1471 sqlCmd = "SHOW INDEX FROM %s" % tableName
1472 self._iDb.execute(sqlCmd)
1473 res = self._iDb.cursor.fetchall()
1474 obsLIndex = []
1475 for tuple in res:
1476 obsLIndex.append(tuple[2])
1477 self.assertEquals(expLIndex, obsLIndex)
1478
1479 self._iDb.dropTable(tableName)
1480 os.remove(fileName)
1481
1482 def test_createClassifIndex(self):
1483 tableName = "dummyclassifTable%s" % self._uniqId
1484 sqlCmd = "CREATE TABLE %s (seq_name varchar(255), length int unsigned, strand char, status varchar(255), class_classif varchar(255), order_classif varchar(255), completeness varchar(255), evidences text);" % tableName
1485 self._iDb.execute(sqlCmd)
1486 expLIndex = ["iseq_name", "istatus", "iclass", "iorder", "icomp"]
1487
1488 self._iDb.createIndex(tableName, "classif")
1489
1490 sqlCmd = "SHOW INDEX FROM %s" % tableName
1491 self._iDb.execute(sqlCmd)
1492 res = self._iDb.cursor.fetchall()
1493
1494 obsLIndex = []
1495 for tuple in res:
1496 obsLIndex.append(tuple[2])
1497 self.assertEquals(expLIndex, obsLIndex)
1498 self._iDb.dropTable(tableName)
1499
1500 def test_createBinPathTable(self):
1501 pathFileName = "dummy.path"
1502 with open(pathFileName, "w") as pathF:
1503 pathF.write("1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n")
1504 pathF.write("2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n")
1505
1506 expPathTuple1 = (1, 1000000, "qry", 1, 100, 1)
1507 expPathTuple2 = (2, 1000000, "qry", 401, 500, 1) # change coordinates
1508 expTPathTuples = (expPathTuple1, expPathTuple2)
1509
1510 pathTableName = "dummy_path"
1511 idxTableName = "dummy_path_idx"
1512 self._iDb.createTable(pathTableName, "path", pathFileName)
1513 self._iDb.createBinPathTable(pathTableName, True)
1514
1515 sqlCmd = "SELECT * FROM %s" % idxTableName
1516 self._iDb.execute(sqlCmd)
1517 obsTPathTuples = self._iDb.fetchall()
1518
1519 self._iDb.dropTable(pathTableName)
1520 self._iDb.dropTable(idxTableName)
1521 os.remove(pathFileName)
1522
1523 self.assertEquals(expTPathTuples, obsTPathTuples)
1524
1525 def test_createBinSetTable(self):
1526 setFileName = "dummy.set"
1527 with open(setFileName, "w") as setF:
1528 setF.write("1\tseq1\tchr1\t1900\t3900\n")
1529 setF.write("2\tseq2\tchr1\t2\t9\n")
1530 setF.write("3\tseq3\tchr1\t8\t13\n")
1531
1532 expTuple = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L))
1533
1534 setTableName = "dummy_set"
1535 idxTableName = "dummy_set_idx"
1536 self._iDb.createTable(setTableName, "set", setFileName)
1537 self._iDb.createBinSetTable(setTableName, True)
1538
1539 sqlCmd = "SELECT * FROM %s" % idxTableName
1540 self._iDb.execute(sqlCmd)
1541 obsTuple = self._iDb.fetchall()
1542
1543 self._iDb.dropTable(setTableName)
1544 self._iDb.dropTable(idxTableName)
1545 os.remove(setFileName)
1546
1547 self.assertEquals(expTuple, obsTuple)
1548
1549 def _getInstanceToAdapt(self):
1550 iPath = Path()
1551 return iPath
1552
1553 if __name__ == "__main__":
1554 unittest.main()