comparison smart_toolShed/commons/core/sql/DbMySql.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e0f8dcca02ed
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31 # Exception hierarchy:
32 #
33 # StandardError
34 # |__Warning
35 # |__Error
36 # |__InterfaceError
37 # |__DatabaseError
38 # |__DataError
39 # |__OperationalError
40 # |__IntegrityError
41 # |__InternalError
42 # |__ProgrammingError
43 # |__NotSupportedError
44
45 import os
46 import sys
47 import time
48 import ConfigParser
49 import MySQLdb
50 from MySQLdb import InterfaceError
51 from MySQLdb import OperationalError
52 from MySQLdb import InternalError
53 from MySQLdb import DatabaseError
54 from commons.core.seq.Bioseq import Bioseq
55 from commons.core.LoggerFactory import LoggerFactory
56 from commons.core.checker.RepetException import RepetException
57 from commons.core.sql.TablePathAdaptator import TablePathAdaptator
58 from commons.core.sql.TableSetAdaptator import TableSetAdaptator
59
# Prefix of the logger name; DbMySql.__init__ appends the class name to it.
LOG_DEPTH = "repet.commons"

# Column layout of each table type: table type -> ordered list of
# (column name, MySQL column type) pairs.
# createTable() joins each pair with a space to build its CREATE TABLE
# statement, and getTableType() compares a table's column names against
# these lists to recognise its type.
TABLE_SCHEMA_DESCRIPTOR = {"map": [("name", "varchar(255)"), ("chr", "varchar(255)"), ("start", "int"), ("end", "int")],
                           "set": [("path", "int unsigned"), ("name", "varchar(255)"), ("chr", "varchar(255)"), ("start", "int"), ("end", "int")],
                           "match": [("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("query_length", "int unsigned"), ("query_length_perc", "float"),
                                     ("match_length_perc", "float"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"), ("subject_end", "int unsigned"),
                                     ("subject_length", "int unsigned"), ("subject_length_perc", "float"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float"),
                                     ("path", "int unsigned")],
                           "path": [("path", "int unsigned"), ("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("subject_name", "varchar(255)"),
                                    ("subject_start", "int unsigned"), ("subject_end", "int unsigned"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float")],
                           "align": [("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"),
                                     ("subject_end", "int unsigned"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float")],
                           "seq": [("accession", "varchar(255)"), ("sequence", "longtext"), ("description", "varchar(255)"), ("length", "int unsigned")],
                           "length": [("accession", "varchar(255)"), ("length", "int unsigned")],
                           "jobs": [("jobid", "int unsigned"), ("jobname", "varchar(255)"), ("groupid", "varchar(255)"), ("launcher", "varchar(1024)"),
                                    ("queue", "varchar(255)"), ("resources", "varchar(255)"), ("status", "varchar(255)"), ("time", "datetime"), ("node", "varchar(255)")],
                           "classif": [("seq_name", "varchar(255)"), ("length", "int unsigned"), ("strand", "char"), ("status", "varchar(255)"), ("class_classif", "varchar(255)"),
                                       ("order_classif", "varchar(255)"), ("completeness", "varchar(255)"), ("evidence", "text")],
                           "pathstat": [("family", "varchar(255)"), ("maxLength", "int"), ("meanLength", "int"), ("covg", "int"), ("frags", "int"), ("fullLgthFrags", "int"), ("copies", "int"),
                                        ("fullLgthCopies", "int"), ("meanId", "varchar(255)"), ("sdId", "varchar(255)"), ("minId", "varchar(255)"), ("q25Id", "varchar(255)"), ("medId", "varchar(255)"),
                                        ("q75Id", "varchar(255)"), ("maxId", "varchar(255)"), ("meanLgth", "varchar(255)"), ("sdLgth", "varchar(255)"), ("minLgth", "varchar(255)"), ("q25Lgth", "varchar(255)"),
                                        ("medLgth", "varchar(255)"), ("q75Lgth", "varchar(255)"), ("maxLgth", "varchar(255)"), ("meanLgthPerc", "varchar(255)"), ("sdLgthPerc", "varchar(255)"),
                                        ("minLgthPerc", "varchar(255)"), ("q25LgthPerc", "varchar(255)"), ("medLgthPerc", "varchar(255)"), ("q75LgthPerc", "varchar(255)"), ("maxLgthPerc", "varchar(255)")],
                           "info_tables":[("name", "varchar(255)"), ("file", "varchar(255)")]
                           }

# Indexes of each table type: table type -> list of
# (index name, comma-separated indexed column(s)) pairs.
# createIndex() issues one CREATE INDEX per pair that is not already present.
TABLE_INDEX_DESCRIPTOR = {"map": [("iname", "name"), ("ichr", "chr"), ("istart", "start"), ("iend", "end"), ("icoord", "start, end")],
                          "set": [("id", "path"), ("iname", "name"), ("ichr", "chr"), ("istart", "start"), ("iend", "end"), ("icoord", "start, end")],
                          "match": [("id", "path"), ("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"),
                                    ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")],
                          "path": [("id", "path"), ("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"),
                                   ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")],
                          "align": [("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"),
                                    ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")],
                          "seq": [("iacc", "accession"), ("idescr", "description")],
                          "length": [("iacc", "accession"), ("ilength", "length")],
                          "jobs": [("ijobid", "jobid"), ("ijobname", "jobname"), ("igroupid", "groupid"), ("istatus", "status")],
                          "classif": [("iseq_name", "seq_name"), ("istatus", "status"), ("iclass", "class_classif"), ("iorder", "order_classif"), ("icomp", "completeness")],
                          "pathstat": [],
                          "info_tables": []
                          }

# Alternative data type names accepted by createTable(), mapped to the
# table type whose schema is actually used.
TABLE_TYPE_SYNONYMS = {"tab": "match",
                       "fasta": "seq",
                       "fa": "seq",
                       "fsa": "seq"
                       }
107
108 ## Handle connections to MySQL tables formatted for REPET
109 #
110 class DbMySql(object):
111
112 ## Constructor
113 #
114 # @param user string db user name
115 # @param host string db host name
116 # @param passwd string db user password
117 # @param dbname string database name
118 # @param port integer database port
119 # @param cfgFileName string configuration file name
120 #
121 # @note when a parameter is left blank, the constructor is able
122 # to set attribute values from environment variables: REPET_HOST,
123 # REPET_USER, REPET_PW, REPET_DB, REPET_PORT
124 #
def __init__(self, user = "", host = "", passwd = "", dbname = "", port = "", cfgFileName = "", verbosity = 1):
    """Resolve the connection settings, connect to MySQL and select the database.

    The settings are taken from the first source that applies:
    1. the configuration file, when cfgFileName is given;
    2. the host, user, passwd and dbname arguments, when all four are non-empty;
    3. the REPET_HOST, REPET_USER, REPET_PW and REPET_DB environment variables.

    Without a configuration file the port comes from the port argument,
    else from REPET_PORT, else it defaults to 3306.

    @param user string db user name
    @param host string db host name
    @param passwd string db user password
    @param dbname string database name
    @param port integer (or numeric string) database port
    @param cfgFileName string configuration file name
    @param verbosity logger verbosity, passed to LoggerFactory.createLogger
    @raise RepetException if a required environment variable is missing
    @raise DatabaseError if no connection could be opened after 10 attempts
    """
    self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), verbosity)
    if cfgFileName != "":
        self.setAttributesFromConfigFile(cfgFileName)

    elif host != "" and user != "" and passwd != "" and dbname != "":
        self.host = host
        self.user = user
        self.passwd = passwd
        self.dbname = dbname

    else:
        for envVar in ["REPET_HOST", "REPET_USER", "REPET_PW", "REPET_DB"]:
            if os.environ.get(envVar) is None:
                msg = "ERROR: can't find environment variable '%s'" % envVar
                self._log.error(msg)
                raise RepetException(msg)
        self.host = os.environ.get("REPET_HOST")
        self.user = os.environ.get("REPET_USER")
        self.passwd = os.environ.get("REPET_PW")
        self.dbname = os.environ.get("REPET_DB")

    # The configuration file already provides the port; resolving it again
    # here would silently replace that value by REPET_PORT or 3306.
    if cfgFileName == "":
        if port != "":
            self.port = int(port)
        elif os.environ.get("REPET_PORT") is not None:
            self.port = int(os.environ.get("REPET_PORT"))
        else:
            self.port = 3306

    maxNbTry = 10
    for i in range(1, maxNbTry + 1):
        if self.open():
            break
        if i == maxNbTry:
            msg = "ERROR: failed to connect to the MySQL database"
            self._log.error(msg)
            raise DatabaseError(msg)
        # wait a little before the next attempt
        time.sleep(2)

    self.cursor = self.db.cursor()
    self.execute("""use %s""" % (self.dbname))
167
168
169 ## Set the attributes from the configuration file
170 #
171 # @param configFileName string configuration file name
172 #
def setAttributesFromConfigFile(self, configFileName):
    """Set host, user, passwd, dbname and port from the 'repet_env' section of a configuration file.

    @param configFileName string configuration file name
    """
    config = ConfigParser.ConfigParser()
    # read_file() is the replacement of readfp() in newer ConfigParser
    # versions; use whichever one this interpreter provides.
    readConfig = getattr(config, "read_file", None) or config.readfp
    # the 'with' block guarantees the file handle is closed, even on a parse error
    with open(configFileName) as configFile:
        readConfig(configFile)
    section = "repet_env"
    self.host = config.get(section, "repet_host")
    self.user = config.get(section, "repet_user")
    self.passwd = config.get(section, "repet_pw")
    self.dbname = config.get(section, "repet_db")
    self.port = int(config.get(section, "repet_port"))
181
182
183 ## Connect to the MySQL database
184 #
def open(self):
    """Open the connection to the MySQL server and store it in self.db.

    @return boolean True if the connection was opened, False otherwise
            (the MySQL error is logged, not raised)
    """
    connectArgs = {"user": self.user, "host": self.host, "passwd": self.passwd,
                   "db": self.dbname, "port": self.port}
    try:
        # local_infile is only passed when the client library major version is >= 5
        if int(MySQLdb.get_client_info().split(".")[0]) >= 5:
            connectArgs["local_infile"] = 1
        self.db = MySQLdb.connect(**connectArgs)
    except MySQLdb.Error as e:
        # not every MySQLdb error carries an (error number, message) pair
        if len(e.args) >= 2:
            msg = "ERROR %s: %s" % (e.args[0], e.args[1])
        else:
            msg = "ERROR: %s" % e
        self._log.error(msg)
        return False

    return True
202
203
204 ## Execute a SQL query
205 #
206 # @param qry string SQL query to execute
207 # @param params parameters of SQL query
208 #
def execute(self, qry, params = None, nbTry = 3, sleep = 5):
    """Execute a SQL query on the current cursor, reconnecting and retrying on connection errors.

    @param qry string SQL query to execute
    @param params parameters of the SQL query (passed to the cursor when not None)
    @param nbTry integer number of attempts left
    @param sleep integer seconds to wait before a new attempt
    @raise DatabaseError when no attempt is left
    """
    if not nbTry:
        msg = "ERROR: can't execute '%s' after several tries" % qry
        self._log.error(msg)
        raise DatabaseError(msg)

    self._log.debug("################START SQL DEBUG MODE################")
    self._log.debug("Current directory: %s" % os.getcwd())
    self._log.debug("Host: %s" % self.host)
    self._log.debug("User: %s" % self.user)
    self._log.debug("Database: %s" % self.dbname)
    self._log.debug("SQL command: %s" % qry)
    self._log.debug("################STOP SQL DEBUG MODE################\n")

    try:
        if params is None:
            self.cursor.execute(qry)
        else:
            self.cursor.execute(qry, params)
    except (InterfaceError, OperationalError, InternalError) as iError:
        # some errors carry no (error number, message) pair: fall back on the error itself
        reason = iError.args[1] if len(iError.args) > 1 else iError
        self._log.error("FAILED to execute query '%s': %s. %s retries left." % (qry, reason, nbTry - 1))
        self._log.debug("WAIT %is to execute '%s'" % (sleep, qry))
        time.sleep(sleep)
        try:
            self.close()
        except Exception:
            # best effort: the connection may already be unusable
            pass
        # only take a new cursor when the reconnection worked; otherwise the
        # next attempt fails again and consumes one more try
        if self.open():
            self.cursor = self.db.cursor()
        self.execute(qry, params, nbTry - 1, sleep)
239
240 ## Close the connection
241 #
def close( self ):
    """Close the database connection held in self.db."""
    connection = self.db
    connection.close()
244
245
246 ## Retrieve the results of a SQL query
247 #
def fetchall(self):
    """Return every row still available on the current cursor."""
    rows = self.cursor.fetchall()
    return rows
250
251
252 ## Test if a table exists
253 #
254 # @param table string table name
255 # @return boolean True if the table exists, False otherwise
256 #
def doesTableExist( self, table ):
    """Tell whether a table is listed by SHOW TABLES.

    @param table string table name
    @return boolean True if the table exists, False otherwise
    """
    self.execute("""SHOW TABLES""")
    existingTables = self.cursor.fetchall()
    # each row is a one-element tuple holding a table name
    return (table,) in existingTables
263
264
265 ## Remove a table if it exists
266 #
267 # @param table string table name
268 #
def dropTable(self, table):
    """Remove a table if it exists, together with its record in 'info_tables'.

    Nothing is executed when the table does not exist.

    @param table string table name
    """
    if not self.doesTableExist(table):
        return
    self.execute("DROP TABLE %s" % table)
    # 'info_tables' is only created by updateInfoTable(): it may be missing
    # (or may be the very table just dropped), in which case the DELETE would fail
    if self.doesTableExist("info_tables"):
        self.execute('DELETE FROM info_tables WHERE name = "%s"' % table)
275
276
277 ## Rename a table
278 #
279 # @param table string existing table name
280 # @param newName string new table name
281 #
def renameTable( self, table, newName ):
    """Rename a table and update its name in 'info_tables'.

    A table already called newName is dropped first.

    @param table string existing table name
    @param newName string new table name
    """
    self.dropTable(newName)
    renameCmd = 'RENAME TABLE %s TO %s ;' % (table, newName)
    infoCmd = 'UPDATE info_tables SET name="%s" WHERE name="%s";' % (newName, table)
    for sqlCmd in (renameCmd, infoCmd):
        self.execute(sqlCmd)
286
287
288 ## Duplicate a table
289 #
290 # @param sourcetableName string source table name
291 # @param newTableName string new table name
292 #
def copyTable(self, sourcetableName, newTableName):
    """Duplicate a table (structure and rows) and record the copy in 'info_tables'.

    A table already called newTableName is dropped first.

    @param sourcetableName string source table name
    @param newTableName string new table name
    """
    self.dropTable(newTableName)
    sqlCmd = "CREATE TABLE %s LIKE %s;" % (newTableName, sourcetableName)
    self.execute(sqlCmd)
    sqlCmd = "INSERT INTO %s SELECT * FROM %s;" % (newTableName, sourcetableName)
    self.execute(sqlCmd)
    # the message is formatted here: extra positional arguments given to a
    # logger are %-format arguments, and the message had no placeholder for them
    self._log.info("copying table data, %s in %s" % (sourcetableName, newTableName))
    self.updateInfoTable(newTableName, "")
301
302
303 ## Give the rows number of the table
304 #
305 # @param tableName string table name
306 #
def getSize( self, tableName ):
    """Return the number of rows of a table.

    @param tableName string table name
    @return integer number of rows
    """
    self.execute("SELECT count(*) FROM %s;" % (tableName))
    firstRow = self.fetchall()[0]
    return int(firstRow[0])
312
313
def getTableType(self, tableName):
    """Return the table type whose schema has exactly the column names of the table.

    @param tableName string table name
    @return string a key of TABLE_SCHEMA_DESCRIPTOR, or None when no schema matches
    """
    self.execute("SHOW COLUMNS FROM %s;" % (tableName))
    # the first field of each row is the column name
    fieldNames = [row[0] for row in self.fetchall()]

    for tableType, fieldInfos in TABLE_SCHEMA_DESCRIPTOR.items():
        refFieldNames = [fieldName for fieldName, fieldType in fieldInfos]
        if refFieldNames == fieldNames:
            return tableType

    return None
329
330
331 ## Test if table is empty
332 #
333 # @param tableName string table name
334 # @return boolean True if the table is empty, False otherwise
335 #
def isEmpty(self, tableName):
    """Tell whether a table has no row.

    @param tableName string table name
    @return boolean True if the table is empty, False otherwise
    """
    nbRows = self.getSize(tableName)
    return nbRows == 0
338
339
340 ## Record a new table in the 'info_tables' table
341 #
342 # @param tableName string table name
343 # @param info string information on the table origin
344 #
def updateInfoTable( self, tableName, info ):
    """Record a table in 'info_tables', creating 'info_tables' first when it is missing.

    @param tableName string table name
    @param info string information on the table origin
    """
    if not self.doesTableExist("info_tables"):
        self.execute("CREATE TABLE info_tables ( name varchar(255), file varchar(255) )")
    insertCmd = 'INSERT INTO info_tables VALUES ("%s","%s")' % (tableName, info)
    self.execute(insertCmd)
351
352
353 ## Get a list with the fields
354 #
def getFieldList( self, table ):
    """Return the list of the column names of a table, as given by DESCRIBE.

    @param table string table name
    @return list of column names
    """
    self.execute("DESCRIBE %s" % (table))
    # the first field of each DESCRIBE row is the column name
    return [row[0] for row in self.fetchall()]
363
364
365 ## Check that the input file has as many fields as it is supposed to according to its format
366 #
367 # @note fields should be separated by tab
368 #
def checkDataFormatting( self, dataType, fileName ):
    """Check that the first line of a data file has the number of fields expected for its type.

    Only the first line is inspected, and an empty file is accepted.
    Types "fa", "fasta", "seq", "classif", "length", "jobs" and "pathstat"
    are not checked at all.

    @param dataType string data type (case-insensitive)
    @param fileName string name of the tab-delimited file to check
    @raise RepetException if the first line has too few or too many fields
    @note fields should be separated by tab
    """
    dataType = dataType.lower()
    if dataType in ["fa", "fasta", "seq", "classif", "length", "jobs", "pathstat"]:
        return
    dDataType2NbFields = { "map": 4, "set": 5, "align": 9, "path": 10, "match": 15, "tab": 15 }
    # the 'with' block closes the file even when an exception is raised
    with open(fileName, "r") as fileHandler:
        line = fileHandler.readline()
    if line == "":
        return
    nbFields = len(line.split("\t"))
    expectedNbFields = dDataType2NbFields[dataType]
    if nbFields < expectedNbFields:
        msg = "ERROR: '%s' file has less than %i fields" % (dataType, expectedNbFields)
        self._log.error(msg)
        raise RepetException(msg)
    if nbFields > expectedNbFields:
        msg = "ERROR: '%s' file has more than %i fields" % (dataType, expectedNbFields)
        self._log.error(msg)
        raise RepetException(msg)
387
388
def createIndex(self, tableName="", tableType=""):
    """Create on a table the indexes of its table type that it does not have yet.

    @param tableName string table name
    @param tableType string table type, a key of TABLE_INDEX_DESCRIPTOR
    """
    sqlCmd = "SHOW INDEX FROM %s;" % (tableName)
    self.execute(sqlCmd)
    # the third field of each SHOW INDEX row is the index name
    lIndex = [row[2] for row in self.fetchall()]
    # the message is formatted here: extra positional arguments given to a
    # logger are %-format arguments, and the message had no placeholder for them
    self._log.warning("existing indexes: %s" % lIndex)

    for indexName, fieldNames in TABLE_INDEX_DESCRIPTOR.get(tableType):
        if indexName not in lIndex:
            sqlCmd = "CREATE INDEX %s ON %s ( %s );" % (indexName, tableName, fieldNames)
            self.execute(sqlCmd)
402
403
404 ## Create a MySQL table of specified data type and load data
405 #
406 # @param tableName string name of the table to be created
407 # @param fileName string name of the file containing the data to be loaded in the table
408 # @param dataType string type of the data (map, set, align, path, match, seq, length, jobs)
409 # @param overwrite boolean (default = False)
410 #
def createTable(self, tableName, dataType, fileName = "", overwrite = False):
    """Create a MySQL table of the given data type, load the data file and record the table.

    For the "seq" and "length" table types the input file is first converted
    into a temporary tab-delimited file, which is removed afterwards.

    @param tableName string name of the table to be created
    @param dataType string type of the data: a key of TABLE_SCHEMA_DESCRIPTOR or of TABLE_TYPE_SYNONYMS (case-insensitive)
    @param fileName string name of the file containing the data to be loaded in the table
    @param overwrite boolean drop an existing table of the same name first (default = False)
    @raise RepetException if the data type is unknown or the file is badly formatted
    """
    self._log.info("creating table '%s' from file '%s' of type '%s'..." % (tableName, fileName, dataType))

    # The type is validated before anything else, so that an unknown type
    # can neither drop an existing table nor surface as a KeyError.
    tableType = dataType.lower()
    if TABLE_SCHEMA_DESCRIPTOR.get(tableType, None) is None and TABLE_TYPE_SYNONYMS.get(tableType, None) is None:
        msg = "ERROR: unknown type %s" % dataType
        self._log.error(msg)
        raise RepetException(msg)
    tableType = TABLE_TYPE_SYNONYMS.get(tableType, tableType)

    if fileName != "":
        self.checkDataFormatting(dataType, fileName)

    if overwrite:
        self.dropTable(tableName)

    fields = [" ".join(fieldDescription) for fieldDescription in TABLE_SCHEMA_DESCRIPTOR.get(tableType)]
    sqlCmd = "CREATE TABLE %s (%s)" % (tableName, ",".join(fields))
    self.execute(sqlCmd)
    self.createIndex(tableName, tableType)

    tmpFileName = ""
    try:
        if fileName:
            if tableType == "seq":
                tmpFileName = "%s.seq" % os.path.basename(fileName)
                self._convertFastaToSeq(fileName, tmpFileName)
                fileName = tmpFileName
            elif tableType == "length":
                tmpFileName = "%s.length" % os.path.basename(fileName)
                self._convertFastaToLength(fileName, tmpFileName)
                fileName = tmpFileName

        # the first line of these two file types is skipped when loading
        hasHeaderLine = tableType == "match" or tableType == "pathstat"
        self.loadDataFromFile(tableName, fileName, hasHeaderLine)
    finally:
        # remove the temporary file even when the conversion or the loading failed
        if tmpFileName and os.path.exists(tmpFileName):
            os.remove(tmpFileName)

    if tableType == "path":
        self.changePathQueryCoordinatesToDirectStrand( tableName )

    self.updateInfoTable(tableName, fileName)
    self._log.info("creating table '%s' done!" % tableName)
454
455
456 ## Create a bin table for fast access
457 #
458 # @param pathTableName string path table name (input table)
459 # @param idxTableName string bin path table name (output table)
460 # @param overwrite boolean default = False
461 #
def createBinPathTable(self, pathTableName, overwrite = False):
    """Create the bin table '<pathTableName>_idx' of a path table, for fast access.

    The path table is first summarized into the temporary table
    '<pathTableName>_tmp'; one line per path is then written in a temporary
    file which is loaded into the bin table.

    @param pathTableName string path table name (input table)
    @param overwrite boolean drop an existing bin table first (default = False)
    @raise RepetException if the path table does not exist
    """
    idxTableName = "%s_idx" % pathTableName # is an attribute in TableBinPathAdaptator
    if not self.doesTableExist(pathTableName):
        msg = "ERROR: '%s' doesn't exist => '%s' can't be created" % (pathTableName, idxTableName)
        self._log.error(msg)
        raise RepetException(msg)
    self._log.info("creating %s for fast access" % idxTableName)
    if overwrite:
        self.dropTable(idxTableName)

    self.execute("CREATE TABLE %s ( path int unsigned, idx int unsigned, contig varchar(255), min int, max int, strand int unsigned)" % idxTableName)

    # one index per column of the bin table
    lIndexes = (("id", "path"), ("ibin", "idx"), ("icontig", "contig"),
                ("imin", "min"), ("imax", "max"), ("istrand", "strand"))
    for indexName, columnName in lIndexes:
        self.execute("CREATE INDEX %s ON %s ( %s );" % (indexName, idxTableName, columnName))

    tmpTableName = "%s_tmp" % pathTableName
    self._createPathTableAndGroupByIdAndOrderByStrand(pathTableName, tmpTableName)
    iTPA = TablePathAdaptator(self, tmpTableName)
    if not self.isEmpty(tmpTableName):
        tmpFileName = "%s.tmp%s" % (pathTableName, str(os.getpid()))
        with open(tmpFileName, "w") as f:
            for queryName in iTPA.getQueryList():
                for iPath in iTPA.getPathListFromQuery(queryName):
                    idx = iPath.range_query.findIdx()
                    maxCoord = iPath.range_query.getMax()
                    minCoord = iPath.range_query.getMin()
                    strand = iPath.range_query.isOnDirectStrand()
                    f.write("%d\t%d\t%s\t%d\t%d\t%d\n"%(iPath.id, idx, iPath.range_query.seqname, minCoord, maxCoord, strand))
        # the file is closed at this point, hence fully written
        self.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (tmpFileName, idxTableName))
        self.updateInfoTable(idxTableName, "%s bin indexes" % pathTableName)
        os.remove(tmpFileName)
    self.dropTable(tmpTableName)
508
509
510 ## This table summarizes the Path list information according to the identifier numbers. The min and max values are taken
511 #
def _createPathTableAndGroupByIdAndOrderByStrand(self, pathTableName, outTableName):
    """Summarize a path table into outTableName, grouping the rows by path identifier.

    One statement is executed per combination of query and subject
    orientation; the first one creates the table and the others append to it.
    The output table is dropped first if it exists and is indexed as a
    "path" table at the end.

    @param pathTableName string path table name (input table)
    @param outTableName string name of the summary table (output table)
    """
    self.dropTable(outTableName)

    # each template takes (output table, input table)
    lTemplates = (
        "CREATE TABLE %s SELECT path, query_name, min(query_start) AS query_start, max(query_end) AS query_end, subject_name, min(subject_start) AS subject_start, max(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start<query_end and subject_start<subject_end group by path;",
        "INSERT INTO %s SELECT path, query_name, min(query_start) AS query_start, max(query_end) AS query_end, subject_name, max(subject_start) AS subject_start, min(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start<query_end and subject_start>subject_end group by path;",
        "INSERT INTO %s SELECT path, query_name, max(query_start) AS query_start, min(query_end) AS query_end, subject_name, min(subject_start) AS subject_start, max(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start>query_end and subject_start<subject_end group by path;",
        "INSERT INTO %s SELECT path, query_name, max(query_start) AS query_start, min(query_end) AS query_end, subject_name, max(subject_start) AS subject_start, min(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start>query_end and subject_start>subject_end group by path;",
    )
    for template in lTemplates:
        self.execute(template % (outTableName, pathTableName))

    self.createIndex(outTableName, "path")
528
529
530 ## Create a bin table for fast access
531 #
532 # @param setTableName string set table name (input table)
533 # @param idxTableName string bin set table name (output table)
534 # @param overwrite boolean default = False
535 #
def createBinSetTable(self, setTableName, overwrite = False):
    """Create the bin table '<setTableName>_idx' of a set table, for fast access.

    The set table is first summarized into the temporary table
    '<setTableName>_tmp'; one line per set is then written in a temporary
    file which is loaded into the bin table.

    @param setTableName string set table name (input table)
    @param overwrite boolean drop an existing bin table first (default = False)
    @raise RepetException if the set table does not exist
    """
    idxTableName = "%s_idx" % setTableName # is an attribute in TableBinSetAdaptator
    if not self.doesTableExist(setTableName):
        msg = "ERROR: '%s' doesn't exist => '%s' can't be created" % (setTableName, idxTableName)
        self._log.error(msg)
        raise RepetException(msg)
    self._log.info("creating %s for fast access" % idxTableName)
    if overwrite:
        self.dropTable(idxTableName)

    self.execute("CREATE TABLE %s ( path int unsigned, bin float, contig varchar(255), min int, max int, strand int unsigned)" % idxTableName)

    # one index per column of the bin table
    lIndexes = (("id", "path"), ("ibin", "bin"), ("icontig", "contig"),
                ("imin", "min"), ("imax", "max"), ("istrand", "strand"))
    for indexName, columnName in lIndexes:
        self.execute("CREATE INDEX %s ON %s ( %s );" % (indexName, idxTableName, columnName))

    tmpTableName = "%s_tmp" % setTableName
    self._createSetTableAndGroupByIdAndOrderByStrand(setTableName, tmpTableName)
    iTSA = TableSetAdaptator(self, tmpTableName)
    if not self.isEmpty(tmpTableName):
        tmpFileName = "%s.tmp%s" % (setTableName, str(os.getpid()))
        with open(tmpFileName, "w") as f:
            for seqName in iTSA.getSeqNameList():
                for iSet in iTSA.getSetListFromSeqName(seqName):
                    binValue = iSet.getBin()
                    maxCoord = iSet.getMax()
                    minCoord = iSet.getMin()
                    strand = iSet.isOnDirectStrand()
                    f.write("%d\t%f\t%s\t%d\t%d\t%d\n"%(iSet.id, binValue, iSet.seqname, minCoord, maxCoord, strand))
        # the file is closed at this point, hence fully written
        self.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (tmpFileName, idxTableName))
        self.updateInfoTable(idxTableName, "%s bin indexes" % setTableName)
        os.remove(tmpFileName)
    self.dropTable(tmpTableName)
582
583
584 ## This table summarizes the Set list information according to the identifier numbers. The min and max values are taken
585 #
def _createSetTableAndGroupByIdAndOrderByStrand(self, setTableName, outTableName):
    """Summarize a set table into outTableName, grouping the rows by path identifier.

    Rows with start<end create the table, rows with start>end are appended.
    The output table is dropped first if it exists and is indexed as a
    "set" table at the end.

    @param setTableName string set table name (input table)
    @param outTableName string name of the summary table (output table)
    """
    self.dropTable(outTableName)

    # each template takes (output table, input table)
    lTemplates = (
        "CREATE TABLE %s SELECT path, name, chr, min(start) AS start, max(end) AS end FROM %s WHERE start<end group by path;",
        "INSERT INTO %s SELECT path, name, chr, max(start) AS start, min(end) AS end FROM %s WHERE start>end group by path;",
    )
    for template in lTemplates:
        self.execute(template % (outTableName, setTableName))

    self.createIndex(outTableName, "set")
596
597
598 ## Load data from a file into a MySQL table
599 #
600 # @param tableName string table name
601 # @param fileName string file name
602 # @param escapeFirstLine boolean True to ignore the first line of file, False otherwise
603 #
def loadDataFromFile(self, tableName, fileName, escapeFirstLine = False):
    """Load a file into a table (when a file name is given) and log the size of the table.

    @param tableName string table name
    @param fileName string file name, "" to load nothing
    @param escapeFirstLine boolean True to ignore the first line of file, False otherwise
    """
    if fileName != "":
        lParts = ["LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (fileName, tableName)]
        if escapeFirstLine == True:
            lParts.append("IGNORE 1 LINES")
        self.execute(" ".join(lParts))

    nbEntries = self.getSize(tableName)
    self._log.info("%i entries in the table %s" % (nbEntries, tableName))
612
613 ######################################################################################
614 #TODO: remove duplication with same methods in fastautils
615 ## Convert a fasta file to a length file
616 #
617 # @param inFile string name of the input fasta file
618 # @param outFile string name of the output file
619 #
def _convertFastaToLength(self, inFile, outFile = ""):
    """Convert a fasta file into a length file: one line "<first word of the header><tab><length>" per sequence.

    Nothing is done when inFile is "".

    @param inFile string name of the input fasta file
    @param outFile string name of the output file (default: inFile + ".length")
    """
    if outFile == "":
        outFile = "%s.length" % inFile

    if inFile == "":
        return

    with open(inFile, "r") as inFH:
        with open(outFile, "w") as outFH:
            bioseq = Bioseq()
            bioseq.read(inFH)
            # the loop stops once read() leaves no sequence in the Bioseq
            while bioseq.sequence is not None:
                accession = bioseq.header.split()[0]
                outFH.write("%s\t%d\n" % (accession, bioseq.getLength()))
                bioseq.read(inFH)
634
635
636 ## Convert a fasta file to a seq file
637 #
638 # @param inFile string name of the input fasta file
639 # @param outFile string name of the output file
640 #
def _convertFastaToSeq(self, inFile, outFile = ""):
    """Convert a fasta file into a seq file: one line "<first word of the header><tab><sequence><tab><header><tab><length>" per sequence.

    Nothing is done when inFile is "".

    @param inFile string name of the input fasta file
    @param outFile string name of the output file (default: inFile + ".seq")
    """
    if outFile == "":
        outFile = "%s.seq" % inFile

    if inFile == "":
        return

    with open(inFile, "r") as inFH:
        with open(outFile, "w") as outFH:
            bioseq = Bioseq()
            bioseq.read(inFH)
            # the loop stops once read() leaves no sequence in the Bioseq
            while bioseq.sequence is not None:
                seqLen = bioseq.getLength()
                accession = bioseq.header.split()[0]
                outFH.write("%s\t%s\t%s\t%d\n" % (accession, bioseq.sequence, bioseq.header, seqLen))
                bioseq.read(inFH)
656
657 ######################################################################################
658
659 ## Change the coordinates such that the query is on the direct strand.
660 #
661 # @param inTable string path table name to update
662 #
def changePathQueryCoordinatesToDirectStrand( self, inTable ):
    """Swap start and end, for both query and subject, on the rows where query_start > query_end.

    A temporary auto-increment column 'tmpid' and a temporary table
    '<inTable>_tmp' are used to match the rows, and removed at the end.

    @param inTable string path table name to update
    """
    tmpTable = "%s_tmp" % ( inTable )
    dNames = {"in": inTable, "tmp": tmpTable}

    # temporary row identifier, used to join the table with the copy of its rows
    self.execute("ALTER TABLE %s ADD COLUMN tmpid INT NOT NULL AUTO_INCREMENT PRIMARY KEY" % ( inTable ))
    self.execute("CREATE TABLE %s SELECT * FROM %s WHERE query_start > query_end" % ( tmpTable, inTable ))

    updateCmd = ("UPDATE %(in)s, %(tmp)s"
                 " SET %(in)s.query_start=%(tmp)s.query_end,"
                 " %(in)s.query_end=%(tmp)s.query_start,"
                 " %(in)s.subject_start=%(tmp)s.subject_end,"
                 " %(in)s.subject_end=%(tmp)s.subject_start"
                 " WHERE %(in)s.tmpid=%(tmp)s.tmpid") % dNames
    self.execute(updateCmd)

    self.execute("ALTER TABLE %s DROP COLUMN tmpid" % ( inTable ))
    self.dropTable(tmpTable)
682
683
684 ## Export data from a table in a file.
685 #
686 # @param tableName string table name
687 # @param outFileName string output file name
688 # @param keepFirstLine boolean if you want the first line (column name) in output file
689 # @param param string sql parameters to select data expected
690 #
def exportDataToFile( self, tableName, outFileName="", keepFirstLine=False, param="" ):
    """Export the rows of a table into a file, by running the 'mysql' command-line client.

    On failure of the client an error is reported and the program exits
    with status 1.

    @param tableName string table name
    @param outFileName string output file name (default: the table name)
    @param keepFirstLine boolean if you want the first line (column name) in output file
    @param param string SQL clause appended after "SELECT * FROM <table>"
    """
    if outFileName == "":
        outFileName = tableName
    stripHeader = (keepFirstLine == False)
    tmpFileName = "%s.tmp" % ( outFileName )
    # when the first line has to be removed, the client writes into a
    # temporary file which is filtered afterwards
    dumpFileName = tmpFileName if stripHeader else outFileName

    # NOTE(review): the command goes through the shell and shows the password
    # on the command line; table name and param are inserted as given, so
    # callers must only pass trusted values.
    cmd = "mysql"
    cmd += " -h %s" % ( self.host )
    # use the port of the current connection rather than the client default
    cmd += " -P %i" % ( self.port )
    cmd += " -u %s" % ( self.user )
    cmd += " -p\"%s\"" % ( self.passwd )
    cmd += " --database=%s" % ( self.dbname )
    cmd += " -e\"SELECT * FROM %s" % ( tableName )
    if param != "":
        cmd += " %s" % ( param )
    cmd += ";\""
    cmd += " > %s" % ( dumpFileName )
    log = os.system( cmd )
    if log != 0:
        msg = "ERROR: mysql returned %i" % ( log )
        self._log.error(msg)
        print(msg)
        sys.exit(1)

    if stripHeader:
        try:
            with open( tmpFileName, "r" ) as tmpFile:
                with open( outFileName, "w" ) as outFile:
                    # skip the line of column names
                    next(tmpFile, None)
                    for line in tmpFile:
                        outFile.write( line )
        finally:
            if os.path.exists( tmpFileName ):
                os.remove( tmpFileName )
722
723
724 ## Convert a Path table into an Align table
725 #
726 # @param inPathTable string name of the input Path table
727 # @param outAlignTable string name of the output Align table
728 #
def convertPathTableIntoAlignTable( self, inPathTable, outAlignTable ):
    """Create an Align table from a Path table (all the columns but 'path') and record it in 'info_tables'.

    @param inPathTable string name of the input Path table
    @param outAlignTable string name of the output Align table
    """
    lColumns = ["query_name", "query_start", "query_end",
                "subject_name", "subject_start", "subject_end",
                "E_value", "score", "identity"]
    sqlCmd = "CREATE TABLE %s SELECT %s FROM %s;" % ( outAlignTable, ",".join(lColumns), inPathTable )
    self.execute(sqlCmd)
    self.updateInfoTable(outAlignTable, "")
733
734
735 ## Create a set table from a map table
736 #
737 # @param mapTableName string map table name
738 # @param setTableName string new set table name
739 #
def convertMapTableIntoSetTable( self, mapTableName, setTableName ):
    """Create a set table from a map table, with an auto-increment 'path' column, and index it as a "set" table.

    @param mapTableName string map table name
    @param setTableName string new set table name
    """
    template = "CREATE TABLE %s (path int(10) unsigned auto_increment primary key) select name, chr, start, end from %s;"
    self.execute(template % (setTableName, mapTableName))
    self.createIndex(setTableName, "set")
744
745
746 ## Convert an Align table into a Path table
747 #
748 # @param inAlignTable string name of the input Align table
749 # @param outPathTable string name of the output Path table
750 #
def convertAlignTableIntoPathTable( self, inAlignTable, outPathTable ):
    """Create a Path table from an Align table, using the rank of each row (from 1) as path identifier.

    The output table is (re)created, filled with one INSERT per row and
    recorded in 'info_tables'.

    @param inAlignTable string name of the input Align table
    @param outPathTable string name of the output Path table
    """
    self.createTable(outPathTable, "path", "", True)
    self.execute("SELECT * FROM %s" % ( inAlignTable ))
    insertHead = "INSERT INTO %s" % ( outPathTable )
    insertHead += " (path,query_name,query_start,query_end,subject_name,subject_start,subject_end,E_value,score,identity)"
    for rowIndex, res in enumerate(self.fetchall(), 1):
        # every field of the row is inserted as a double-quoted value
        quotedValues = "".join([', "%s"' % ( field ) for field in res])
        self.execute("%s VALUES ( '%i'%s )" % (insertHead, rowIndex, quotedValues))
    self.updateInfoTable(outPathTable, "")
767
768
769 ## Give a list of instances according to the SQL command
770 #
771 # @param SQLCmd string is a SQL command
772 # @param methodGetInstance2Adapt a getter method name. With this method you choose the type of instances contained in lObjs. See example in Test_DbMySql.py.
773 # @return lObjs list of instances
774 #
def getObjectListWithSQLCmd( self, SQLCmd, methodGetInstance2Adapt):
    """Execute a SQL command and build one object per returned row.

    @param SQLCmd string is a SQL command
    @param methodGetInstance2Adapt callable without argument returning a new instance providing setFromTuple()
    @return lObjs list of instances, each one set from a row
    """
    self.execute(SQLCmd)
    lObjs = []
    for row in self.fetchall():
        lObjs.append(methodGetInstance2Adapt())
        lObjs[-1].setFromTuple(row)
    return lObjs
784
785
786 ## Give a list of integer according to the SQL command
787 #
788 # @param sqlCmd string is a SQL command
789 # @return lInteger integer list
790 #
def getIntegerListWithSQLCmd( self, sqlCmd ):
    """Execute a SQL command and return the first field of each row as an integer.

    Rows whose first field is None are skipped.

    @param sqlCmd string is a SQL command
    @return lInteger integer list
    """
    self.execute(sqlCmd)
    return [int(row[0]) for row in self.fetchall() if row[0] is not None]
799
800
801 ## Give a int according to the SQL command
802 #
803 # @param sqlCmd string is a SQL command
804 # @return nb integer
805 #
def getIntegerWithSQLCmd( self, sqlCmd ):
    """Execute a SQL command and return the first field of the first row, or 0 when it is None.

    The value is returned as fetched, without conversion.

    @param sqlCmd string is a SQL command
    @return nb integer
    """
    self.execute(sqlCmd)
    nb = self.fetchall()[0][0]
    return 0 if nb is None else nb
813
814
815 ## Give a list of str according to the SQL command
816 #
817 # @param sqlCmd string is a SQL command
818 # @return lString str list
819 #
def getStringListWithSQLCmd( self, sqlCmd ):
    """Execute a SQL command and return the first field of each row.

    @param sqlCmd string is a SQL command
    @return lString str list
    """
    self.execute(sqlCmd)
    return [row[0] for row in self.fetchall()]
827
828 #TODO: use API to add indexes
829 ## Remove doublons in a given table
830 #
831 # @param table string name of a MySQL table
832 #
def removeDoublons( self, table ):
    """Remove the duplicated rows of a table.

    The distinct rows are copied into a table named '<table>_<timestamp>',
    which then replaces the original table.

    @param table string name of a MySQL table
    """
    timestamp = time.strftime("%Y%m%d%H%M%S")
    tmpTable = "%s_%s" % ( table, timestamp )
    self.execute("CREATE TABLE %s SELECT DISTINCT * FROM %s" % ( tmpTable, table ))
    self.dropTable(table)
    self.renameTable(tmpTable, table)
839
840
841 ## Get a list of table names from a pattern
842 #
843 # @note for instance pattern = 'MyProject_%'
844 #
def getTableListFromPattern( self, pattern ):
    """Return the names of the tables matching a SQL LIKE pattern.

    The patterns "*" and "%" list every table.

    @param pattern string pattern, for instance 'MyProject_%'
    @return list of table names
    """
    if pattern in ("*", "%"):
        sqlCmd = "SHOW TABLES"
    else:
        sqlCmd = "SHOW TABLES like '%s'" % ( pattern )
    return self.getStringListWithSQLCmd(sqlCmd)