Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/sql/DbMySql.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/sql/DbMySql.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,851 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +# Exception hierarchy: +# +# StandardError +# |__Warning +# |__Error +# |__InterfaceError +# |__DatabaseError +# |__DataError +# |__OperationalError +# |__IntegrityError +# |__InternalError +# |__ProgrammingError +# |__NotSupportedError + +import os +import sys +import time +import ConfigParser +import MySQLdb +from MySQLdb import InterfaceError +from MySQLdb import OperationalError +from MySQLdb import InternalError +from MySQLdb import DatabaseError +from commons.core.seq.Bioseq import Bioseq +from commons.core.LoggerFactory import LoggerFactory +from commons.core.checker.RepetException import RepetException +from commons.core.sql.TablePathAdaptator import TablePathAdaptator +from commons.core.sql.TableSetAdaptator import TableSetAdaptator + +LOG_DEPTH = "repet.commons" + +TABLE_SCHEMA_DESCRIPTOR = {"map": [("name", "varchar(255)"), ("chr", "varchar(255)"), ("start", "int"), ("end", "int")], + "set": [("path", "int unsigned"), ("name", "varchar(255)"), ("chr", "varchar(255)"), ("start", "int"), ("end", "int")], + "match": [("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("query_length", "int unsigned"), ("query_length_perc", "float"), + ("match_length_perc", "float"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"), ("subject_end", "int unsigned"), + ("subject_length", "int unsigned"), ("subject_length_perc", "float"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float"), + ("path", "int unsigned")], + "path": [("path", "int unsigned"), ("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("subject_name", "varchar(255)"), + ("subject_start", "int unsigned"), ("subject_end", "int unsigned"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float")], + "align": [("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"), + ("subject_end", "int unsigned"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float")], + "seq": [("accession", "varchar(255)"), ("sequence", "longtext"), ("description", "varchar(255)"), ("length", "int unsigned")], + "length": [("accession", "varchar(255)"), ("length", "int unsigned")], + "jobs": [("jobid", "int unsigned"), ("jobname", "varchar(255)"), ("groupid", "varchar(255)"), ("launcher", "varchar(1024)"), + ("queue", "varchar(255)"), ("resources", "varchar(255)"), ("status", "varchar(255)"), ("time", "datetime"), ("node", "varchar(255)")], + "classif": [("seq_name", "varchar(255)"), ("length", "int unsigned"), ("strand", "char"), ("status", "varchar(255)"), ("class_classif", "varchar(255)"), + ("order_classif", "varchar(255)"), ("completeness", "varchar(255)"), ("evidence", "text")], + "pathstat": [("family", "varchar(255)"), ("maxLength", "int"), ("meanLength", "int"), ("covg", "int"), ("frags", "int"), ("fullLgthFrags", "int"), ("copies", "int"), + ("fullLgthCopies", "int"), ("meanId", "varchar(255)"), ("sdId", "varchar(255)"), ("minId", "varchar(255)"), ("q25Id", "varchar(255)"), ("medId", "varchar(255)"), + ("q75Id", "varchar(255)"), ("maxId", "varchar(255)"), ("meanLgth", "varchar(255)"), ("sdLgth", "varchar(255)"), ("minLgth", "varchar(255)"), ("q25Lgth", "varchar(255)"), + ("medLgth", "varchar(255)"), ("q75Lgth", "varchar(255)"), ("maxLgth", "varchar(255)"), ("meanLgthPerc", "varchar(255)"), ("sdLgthPerc", "varchar(255)"), + ("minLgthPerc", "varchar(255)"), ("q25LgthPerc", "varchar(255)"), ("medLgthPerc", "varchar(255)"), ("q75LgthPerc", "varchar(255)"), ("maxLgthPerc", "varchar(255)")], + "info_tables":[("name", "varchar(255)"), ("file", "varchar(255)")] + } + +TABLE_INDEX_DESCRIPTOR = {"map": [("iname", "name"), ("ichr", "chr"), ("istart", "start"), ("iend", "end"), ("icoord", "start, end")], + "set": [("id", "path"), ("iname", "name"), ("ichr", "chr"), ("istart", "start"), ("iend", "end"), ("icoord", "start, end")], + "match": [("id", "path"), ("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"), + ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")], + "path": [("id", "path"), ("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"), + ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")], + "align": [("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"), + ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")], + "seq": [("iacc", "accession"), ("idescr", "description")], + "length": [("iacc", "accession"), ("ilength", "length")], + "jobs": [("ijobid", "jobid"), ("ijobname", "jobname"), ("igroupid", "groupid"), ("istatus", "status")], + "classif": [("iseq_name", "seq_name"), ("istatus", "status"), ("iclass", "class_classif"), ("iorder", "order_classif"), ("icomp", "completeness")], + "pathstat": [], + "info_tables": [] + } + +TABLE_TYPE_SYNONYMS = {"tab": "match", + "fasta": "seq", + "fa": "seq", + "fsa": "seq" + } + +## Handle connections to MySQL tables formatted for REPET +# +class DbMySql(object): + + ## Constructor + # + # @param user string db user name + # @param host string db host name + # @param passwd string db user password + # @param dbname string database name + # @param port integer database port + # @param cfgFileName string configuration file name + # + # @note when a parameter is left blank, the constructor is able + # to set attribute values from environment variables: REPET_HOST, + # REPET_USER, REPET_PW, REPET_DB, REPET_PORT + # + def __init__(self, user = "", host = "", passwd = "", dbname = "", port = "", cfgFileName = "", verbosity = 1): + self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), verbosity) + if cfgFileName != "": + self.setAttributesFromConfigFile(cfgFileName) + + elif host != "" and user != "" and passwd != "" and dbname != "": + self.host = host + self.user = user + self.passwd = passwd + self.dbname = dbname + + else: + for envVar in ["REPET_HOST","REPET_USER","REPET_PW","REPET_DB"]: + if os.environ.get( envVar ) == None: + msg = "ERROR: can't find environment variable '%s'" % envVar + self._log.error(msg) + raise RepetException(msg) + self.host = os.environ.get("REPET_HOST") + self.user = os.environ.get("REPET_USER") + self.passwd = os.environ.get("REPET_PW") + self.dbname = os.environ.get("REPET_DB") + + if port != "" and cfgFileName == "": + self.port = int(port) + elif os.environ.get("REPET_PORT") != None: + self.port = int(os.environ.get("REPET_PORT")) + else: + self.port = 3306 + + maxNbTry = 10 + for i in xrange(1,maxNbTry+1): + if not self.open(): + time.sleep(2) + if i == maxNbTry: + msg = "ERROR: failed to connect to the MySQL database" + self._log.error(msg) + raise DatabaseError(msg) + else: + break + + self.cursor = self.db.cursor() + self.execute("""use %s""" %(self.dbname)) + + + ## Set the attributes from the configuration file + # + # @param configFileName string configuration file name + # + def setAttributesFromConfigFile(self, configFileName): + config = ConfigParser.ConfigParser() + config.readfp( open(configFileName) ) + self.host = config.get("repet_env","repet_host") + self.user = config.get("repet_env","repet_user") + self.passwd = config.get("repet_env","repet_pw") + self.dbname = config.get("repet_env","repet_db") + self.port = int( config.get("repet_env","repet_port") ) + + + ## Connect to the MySQL database + # + def open(self): + try: + if int(MySQLdb.get_client_info().split(".")[0]) >= 5: + self.db = MySQLdb.connect( user = self.user, host = self.host,\ + passwd = self.passwd, db = self.dbname, \ + port = self.port, \ + local_infile = 1 ) + else: + self.db = MySQLdb.connect( user = self.user, host = self.host,\ + passwd = self.passwd, db = self.dbname, \ + port = self.port ) + except MySQLdb.Error, e: + msg = "ERROR %d: %s" % (e.args[0], e.args[1]) + self._log.error(msg) + return False + + return True + + + ## Execute a SQL query + # + # @param qry string SQL query to execute + # @param params parameters of SQL query + # + def execute(self, qry, params = None, nbTry = 3, sleep = 5): + if nbTry: + self._log.debug("################START SQL DEBUG MODE################") + self._log.debug("Current directory: %s" % os.getcwd()) + self._log.debug("Host: %s" % self.host) + self._log.debug("User: %s" % self.user) + self._log.debug("Database: %s" % self.dbname) + self._log.debug("SQL command: %s" % qry) + self._log.debug("################STOP SQL DEBUG MODE################\n") + + try: + if params == None: + self.cursor.execute(qry) + else: + self.cursor.execute(qry, params) + except (InterfaceError, OperationalError, InternalError) as iError: + self._log.error("FAILED to execute query '%s': %s. %s retries left." % (qry, iError.args[1], nbTry - 1)) + self._log.debug("WAIT %is to execute '%s'" % (sleep, qry)) + time.sleep(sleep) + try: + self.close() + except: + pass + self.open() + self.cursor = self.db.cursor() + self.execute(qry, params, nbTry - 1, sleep) + else: + msg = "ERROR: can't execute '%s' after several tries" % qry + self._log.error(msg) + raise DatabaseError(msg) + + ## Close the connection + # + def close( self ): + self.db.close() + + + ## Retrieve the results of a SQL query + # + def fetchall(self): + return self.cursor.fetchall() + + + ## Test if a table exists + # + # @param table string table name + # @return boolean True if the table exists, False otherwise + # + def doesTableExist( self, table ): + self.execute( """SHOW TABLES""" ) + results = self.cursor.fetchall() + if (table,) in results: + return True + return False + + + ## Remove a table if it exists + # + # @param table string table name + # + def dropTable(self, table): + if self.doesTableExist( table ): + sqlCmd = "DROP TABLE %s" % table + self.execute( sqlCmd ) + sqlCmd = 'DELETE FROM info_tables WHERE name = "%s"' % table + self.execute( sqlCmd ) + + + ## Rename a table + # + # @param table string existing table name + # @param newName string new table name + # + def renameTable( self, table, newName ): + self.dropTable( newName ) + self.execute( 'RENAME TABLE %s TO %s ;' % (table, newName) ) + self.execute( 'UPDATE info_tables SET name="%s" WHERE name="%s";' % (newName, table) ) + + + ## Duplicate a table + # + # @param tableName string source table name + # @param newTableName string new table name + # + def copyTable(self, sourcetableName, newTableName): + self.dropTable( newTableName ) + sqlCmd = "CREATE TABLE %s LIKE %s;" % (newTableName, sourcetableName) + self.execute( sqlCmd ) + sqlCmd = "INSERT INTO %s SELECT * FROM %s;" % (newTableName, sourcetableName) + self.execute( sqlCmd ) + self._log.info("copying table data,", sourcetableName, "in", newTableName) + self.updateInfoTable(newTableName, "") + + + ## Give the rows number of the table + # + # @param tableName string table name + # + def getSize( self, tableName ): + qry = "SELECT count(*) FROM %s;" % (tableName) + self.execute(qry) + res = self.fetchall() + return int( res[0][0] ) + + + def getTableType(self, tableName): + qry = "SHOW COLUMNS FROM %s;" % (tableName) + self.execute(qry) + res = self.fetchall() + + fieldNames = [] + for row in res: + fieldNames.append(row[0]) + + for tableType, fieldInfos in TABLE_SCHEMA_DESCRIPTOR.items(): + refFieldsNames = [name for name,type in fieldInfos] + if refFieldsNames == fieldNames: + return tableType + + return None + + + ## Test if table is empty + # + # @param tableName string table name + # @return boolean True if the table is empty, False otherwise + # + def isEmpty(self, tableName): + return self.getSize(tableName) == 0 + + + ## Record a new table in the 'info_table' table + # + # @param tableName string table name + # @param info string information on the table origin + # + def updateInfoTable( self, tableName, info ): + if not self.doesTableExist( "info_tables" ): + sqlCmd = "CREATE TABLE info_tables ( name varchar(255), file varchar(255) )" + self.execute( sqlCmd ) + sqlCmd = 'INSERT INTO info_tables VALUES ("%s","%s")' % (tableName, info) + self.execute( sqlCmd ) + + + ## Get a list with the fields + # + def getFieldList( self, table ): + lFields = [] + sqlCmd = "DESCRIBE %s" % ( table ) + self.execute( sqlCmd ) + lResults = self.fetchall() + for res in lResults: + lFields.append( res[0] ) + return lFields + + + ## Check that the input file has as many fields than it is supposed to according to its format + # + # @note fields should be separated by tab + # + def checkDataFormatting( self, dataType, fileName ): + dataType = dataType.lower() + if dataType in ["fa", "fasta", "seq", "classif", "length", "jobs", "pathstat"]: + return + dDataType2NbFields = { "map": 4, "set": 5, "align": 9, "path": 10, "match": 15, "tab": 15 } + fileHandler = open( fileName, "r" ) + line = fileHandler.readline() + if line != "": + tokens = line.split("\t") + if len(tokens) < dDataType2NbFields[ dataType ]: + msg = "ERROR: '%s' file has less than %i fields" % ( dataType, dDataType2NbFields[ dataType ] ) + self._log.error(msg) + raise RepetException(msg) + if len(tokens) > dDataType2NbFields[ dataType ]: + msg = "ERROR: '%s' file has more than %i fields" % ( dataType, dDataType2NbFields[ dataType ] ) + self._log.error(msg) + raise RepetException(msg) + fileHandler.close() + + + def createIndex(self, tableName="", tableType=""): + sqlCmd = "SHOW INDEX FROM %s;"% (tableName) + self.execute(sqlCmd) + res = self.fetchall() + lIndex = [] + for i in res: + lIndex.append(i[2]) + self._log.warning("existing indexes:", lIndex) + + for indexName, fieldNames in TABLE_INDEX_DESCRIPTOR.get(tableType): + if not indexName in lIndex: + sqlCmd = "CREATE INDEX %s ON %s ( %s );" % (indexName, tableName, fieldNames) + self.execute(sqlCmd) + + + ## Create a MySQL table of specified data type and load data + # + # @param tableName string name of the table to be created + # @param fileName string name of the file containing the data to be loaded in the table + # @param dataType string type of the data (map, set, align, path, match, seq, length, jobs) + # @param overwrite boolean (default = False) + # + def createTable(self, tableName, dataType, fileName = "", overwrite = False): + self._log.info("creating table '%s' from file '%s' of type '%s'..." % (tableName, fileName, dataType)) + + if fileName != "": + self.checkDataFormatting(dataType, fileName) + + if overwrite: + self.dropTable(tableName) + + tableType = dataType.lower() + if TABLE_SCHEMA_DESCRIPTOR.get(tableType,None) is None and TABLE_TYPE_SYNONYMS.get(tableType,None) is None: + msg = "ERROR: unknown type %s" % dataType + self._log.error(msg) + raise RepetException(msg) + + tableType = TABLE_TYPE_SYNONYMS.get(tableType,tableType) + + fields = [" ".join(fieldDescription) for fieldDescription in TABLE_SCHEMA_DESCRIPTOR.get(tableType)] + sqlCmd = "CREATE TABLE %s (%s)" % (tableName, ",".join(fields)) + self.execute(sqlCmd) + self.createIndex(tableName, tableType) + + tmpFileName = "" + if fileName: + if tableType == "seq": + tmpFileName = "%s.seq" % os.path.basename(fileName) + self._convertFastaToSeq(fileName, tmpFileName) + fileName = tmpFileName + elif tableType == "length": + tmpFileName = "%s.length" % os.path.basename(fileName) + self._convertFastaToLength(fileName, tmpFileName) + fileName = tmpFileName + + hasHeaderLine = tableType == "match" or tableType == "pathstat" + self.loadDataFromFile(tableName, fileName, hasHeaderLine) + if tmpFileName: + os.remove(tmpFileName) + + if tableType == "path": + self.changePathQueryCoordinatesToDirectStrand( tableName ) + + self.updateInfoTable(tableName, fileName) + self._log.info("creating table '%s' done!" % tableName) + + + ## Create a bin table for fast access + # + # @param pathTableName string path table name (input table) + # @param idxTableName string bin path table name (output table) + # @param overwrite boolean default = False + # + def createBinPathTable(self, pathTableName, overwrite = False): + idxTableName = "%s_idx" % pathTableName # is an attribute in TableBinPathAdaptator + if not self.doesTableExist(pathTableName): + msg = "ERROR: '%s' doesn't exist => '%s' can't be created" % (pathTableName, idxTableName) + self._log.error(msg) + raise RepetException(msg) + self._log.info("creating %s for fast access" % idxTableName) + if overwrite: + self.dropTable(idxTableName) + + sqlCmd = "CREATE TABLE %s ( path int unsigned, idx int unsigned, contig varchar(255), min int, max int, strand int unsigned)" % idxTableName + self.execute(sqlCmd) + + sqlCmd = "CREATE INDEX id ON %s ( path );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX ibin ON %s ( idx );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX icontig ON %s ( contig );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX imin ON %s ( min );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX imax ON %s ( max );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX istrand ON %s ( strand );" % idxTableName + self.execute(sqlCmd) + + tmpTableName = "%s_tmp" % pathTableName + self._createPathTableAndGroupByIdAndOrderByStrand(pathTableName, tmpTableName) + iTPA = TablePathAdaptator(self, tmpTableName) + if not self.isEmpty(tmpTableName): + tmpFileName = "%s.tmp%s" % (pathTableName, str(os.getpid())) + with open(tmpFileName, "w") as f: + lQueryNames = iTPA.getQueryList() + for queryName in lQueryNames: + lPaths = iTPA.getPathListFromQuery(queryName) + for i in lPaths: + idx = i.range_query.findIdx() + max = i.range_query.getMax() + min = i.range_query.getMin() + strand = i.range_query.isOnDirectStrand() + f.write("%d\t%d\t%s\t%d\t%d\t%d\n"%(i.id, idx, i.range_query.seqname, min, max, strand)) + sqlCmd="LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (tmpFileName, idxTableName) + self.execute(sqlCmd) + self.updateInfoTable(idxTableName, "%s bin indexes" % pathTableName) + os.remove(tmpFileName) + self.dropTable(tmpTableName) + + + ## This table summarize the Path list information according to the identifier numbers. The min and max value is taken + # + def _createPathTableAndGroupByIdAndOrderByStrand(self, pathTableName, outTableName): + self.dropTable(outTableName) + + sqlcmd="CREATE TABLE %s SELECT path, query_name, min(query_start) AS query_start, max(query_end) AS query_end, subject_name, min(subject_start) AS subject_start, max(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start<query_end and subject_start<subject_end group by path;" % (outTableName, pathTableName) + self.execute(sqlcmd) + + sqlcmd="INSERT INTO %s SELECT path, query_name, min(query_start) AS query_start, max(query_end) AS query_end, subject_name, max(subject_start) AS subject_start, min(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start<query_end and subject_start>subject_end group by path;" % (outTableName, pathTableName) + self.execute(sqlcmd) + + sqlcmd="INSERT INTO %s SELECT path, query_name, max(query_start) AS query_start, min(query_end) AS query_end, subject_name, min(subject_start) AS subject_start, max(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start>query_end and subject_start<subject_end group by path;" % (outTableName, pathTableName) + self.execute(sqlcmd) + + sqlcmd="INSERT INTO %s SELECT path, query_name, max(query_start) AS query_start, min(query_end) AS query_end, subject_name, max(subject_start) AS subject_start, min(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start>query_end and subject_start>subject_end group by path;" % (outTableName, pathTableName) + self.execute(sqlcmd) + + self.createIndex(outTableName, "path") + + + ## Create a bin table for fast access + # + # @param setTableName string set table name (input table) + # @param idxTableName string bin set table name (output table) + # @param overwrite boolean default = False + # + def createBinSetTable(self, setTableName, overwrite = False): + idxTableName = "%s_idx" % setTableName # is an attribute in TableBinSetAdaptator + if not self.doesTableExist(setTableName): + msg = "ERROR: '%s' doesn't exist => '%s' can't be created" % (setTableName, idxTableName) + self._log.error(msg) + raise RepetException(msg) + self._log.info("creating %s for fast access" % idxTableName) + if overwrite: + self.dropTable(idxTableName) + + sqlCmd = "CREATE TABLE %s ( path int unsigned, bin float, contig varchar(255), min int, max int, strand int unsigned)" % idxTableName + self.execute(sqlCmd) + + sqlCmd = "CREATE INDEX id ON %s ( path );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX ibin ON %s ( bin );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX icontig ON %s ( contig );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX imin ON %s ( min );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX imax ON %s ( max );" % idxTableName + self.execute(sqlCmd) + sqlCmd = "CREATE INDEX istrand ON %s ( strand );" % idxTableName + self.execute(sqlCmd) + + tmpTableName = "%s_tmp" % setTableName + self._createSetTableAndGroupByIdAndOrderByStrand(setTableName, tmpTableName) + iTSA = TableSetAdaptator(self, tmpTableName) + if not self.isEmpty(tmpTableName): + tmpFileName = "%s.tmp%s" % (setTableName, str(os.getpid())) + with open(tmpFileName, "w") as f: + lSeqNames = iTSA.getSeqNameList() + for seqName in lSeqNames: + lSets = iTSA.getSetListFromSeqName(seqName) + for i in lSets: + bin = i.getBin() + max = i.getMax() + min = i.getMin() + strand = i.isOnDirectStrand() + f.write("%d\t%f\t%s\t%d\t%d\t%d\n"%(i.id, bin, i.seqname, min, max, strand)) + sqlCmd="LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (tmpFileName, idxTableName) + self.execute(sqlCmd) + self.updateInfoTable(idxTableName, "%s bin indexes" % setTableName) + os.remove(tmpFileName) + self.dropTable(tmpTableName) + + + ## This table summarize the Set list information according to the identifier numbers. The min and max value is taken + # + def _createSetTableAndGroupByIdAndOrderByStrand(self, setTableName, outTableName): + self.dropTable(outTableName) + + sqlcmd="CREATE TABLE %s SELECT path, name, chr, min(start) AS start, max(end) AS end FROM %s WHERE start<end group by path;" % (outTableName, setTableName) + self.execute(sqlcmd) + + sqlcmd="INSERT INTO %s SELECT path, name, chr, max(start) AS start, min(end) AS end FROM %s WHERE start>end group by path;" % (outTableName, setTableName) + self.execute(sqlcmd) + + self.createIndex(outTableName, "set") + + + ## Load data from a file into a MySQL table + # + # @param tableName string table name + # @param fileName string file name + # @param escapeFirstLine boolean True to ignore the first line of file, False otherwise + # + def loadDataFromFile(self, tableName, fileName, escapeFirstLine = False): + if fileName != "": + sqlCmd = "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % ( fileName, tableName ) + if escapeFirstLine == True: + sqlCmd = "%s IGNORE 1 LINES" %(sqlCmd) + self.execute( sqlCmd ) + + self._log.info("%i entries in the table %s" % (self.getSize(tableName), tableName)) + +###################################################################################### +#TODO: remove duplication with same methods in fastautils + ## Convert a fasta file to a length file + # + # @param inFile string name of the input fasta file + # @param outFile string name of the output file + # + def _convertFastaToLength(self, inFile, outFile = ""): + if outFile == "": + outFile = "%s.length" % inFile + + if inFile != "": + with open(inFile, "r") as inFH: + with open(outFile, "w") as outFH: + bioseq = Bioseq() + while True: + bioseq.read(inFH) + if bioseq.sequence == None: + break + seqLen = bioseq.getLength() + outFH.write("%s\t%d\n" % (bioseq.header.split()[0], seqLen)) + + + ## Convert a fasta file to a seq file + # + # @param inFile string name of the input fasta file + # @param outFile string name of the output file + # + def _convertFastaToSeq(self, inFile, outFile = ""): + if outFile == "": + outFile = "%s.seq" % inFile + + if inFile != "": + with open(inFile, "r") as inFH: + with open(outFile, "w") as outFH: + bioseq = Bioseq() + while True: + bioseq.read(inFH) + if bioseq.sequence == None: + break + seqLen = bioseq.getLength() + outFH.write("%s\t%s\t%s\t%d\n" % (bioseq.header.split()[0], \ + bioseq.sequence, bioseq.header, seqLen)) + +###################################################################################### + + ## Change the coordinates such that the query is on the direct strand. + # + # @param inTable string path table name to update + # + def changePathQueryCoordinatesToDirectStrand( self, inTable ): + sqlCmd = "ALTER TABLE %s ADD COLUMN tmpid INT NOT NULL AUTO_INCREMENT PRIMARY KEY" % ( inTable ) + self.execute( sqlCmd ) + + tmpTable = "%s_tmp" % ( inTable ) + sqlCmd = "CREATE TABLE %s SELECT * FROM %s WHERE query_start > query_end" % ( tmpTable, inTable ) + self.execute( sqlCmd ) + + sqlCmd = "UPDATE %s, %s" % ( inTable, tmpTable ) + sqlCmd += " SET %s.query_start=%s.query_end," % ( inTable, tmpTable ) + sqlCmd += " %s.query_end=%s.query_start," % ( inTable, tmpTable ) + sqlCmd += " %s.subject_start=%s.subject_end," % ( inTable, tmpTable ) + sqlCmd += " %s.subject_end=%s.subject_start" % ( inTable, tmpTable ) + sqlCmd += " WHERE %s.tmpid=%s.tmpid" % ( inTable, tmpTable ) + self.execute( sqlCmd ) + + sqlCmd = "ALTER TABLE %s DROP COLUMN tmpid" % ( inTable ) + self.execute( sqlCmd ) + self.dropTable( tmpTable ) + + + ## Export data from a table in a file. + # + # @param tableName string table name + # @param outFileName string output file name + # @param keepFirstLine boolean if you want the first line (column name) in output file + # @param param string sql parameters to select data expected + # + def exportDataToFile( self, tableName, outFileName="", keepFirstLine=False, param="" ): + if outFileName == "": outFileName = tableName + prg = "mysql" + cmd = prg + cmd += " -h %s" % ( self.host ) + cmd += " -u %s" % ( self.user ) + cmd += " -p\"%s\"" % ( self.passwd ) + cmd += " --database=%s" % ( self.dbname ) + cmd += " -e\"SELECT * FROM %s" % ( tableName ) + if param != "": cmd += " %s" % ( param ) + cmd += ";\"" + cmd += " > " + if keepFirstLine == False: + cmd += "%s.tmp" % ( outFileName ) + else: + cmd += "%s" % ( outFileName ) + log = os.system( cmd ) + if log != 0: print "ERROR: mysql returned %i" % ( log ); sys.exit(1) + + if keepFirstLine == False: + tmpFileName = "%s.tmp" % ( outFileName ) + tmpFile = open( tmpFileName, "r" ) + outFile = open( outFileName, "w" ) + i = 0 + for line in tmpFile: + if i > 0: + outFile.write( line ) + i += 1 + tmpFile.close() + outFile.close() + os.remove( tmpFileName ) + + + ## Convert a Path table into an Align table + # + # @param inPathTable string name of the input Path table + # @param outAlignTable string name of the output Align table + # + def convertPathTableIntoAlignTable( self, inPathTable, outAlignTable ): + sqlCmd = "CREATE TABLE %s SELECT query_name,query_start,query_end,subject_name,subject_start,subject_end,E_value,score,identity FROM %s;" % ( outAlignTable, inPathTable ) + self.execute( sqlCmd ) + self.updateInfoTable( outAlignTable, "" ) + + + ## Create a set table from a map table + # + # @param mapTableName string map table name + # @param setTableName string new set table name + # + def convertMapTableIntoSetTable( self, mapTableName, setTableName ): + sqlCmd = "CREATE TABLE %s (path int(10) unsigned auto_increment primary key) select name, chr, start, end from %s;" % (setTableName, mapTableName) + self.execute(sqlCmd) + self.createIndex(setTableName, "set") + + + ## Convert an Align table into a Path table + # + # @param inAlignTable string name of the input Align table + # @param outPathTable string name of the output Path table + # + def convertAlignTableIntoPathTable( self, inAlignTable, outPathTable ): + self.createTable( outPathTable, "path", "", True ) + sqlCmd = "SELECT * FROM %s" % ( inAlignTable ) + self.execute( sqlCmd ) + lResults = self.fetchall() + rowIndex = 0 + for res in lResults: + rowIndex += 1 + sqlCmd = "INSERT INTO %s" % ( outPathTable ) + sqlCmd += " (path,query_name,query_start,query_end,subject_name,subject_start,subject_end,E_value,score,identity)" + sqlCmd += " VALUES ( '%i'" % ( rowIndex ) + for i in res: + sqlCmd += ', "%s"' % ( i ) + sqlCmd += " )" + self.execute( sqlCmd ) + self.updateInfoTable( outPathTable, "" ) + + + ## Give a list of instances according to the SQL command + # + # @param SQLCmd string is a SQL command + # @param methodGetInstance2Adapt a getter method name. With this method you choose the type of intances contained in lObjs. See example in Test_DbMySql.py. + # @return lObjs list of instances + # + def getObjectListWithSQLCmd( self, SQLCmd, methodGetInstance2Adapt): + self.execute( SQLCmd ) + res = self.fetchall() + lObjs = [] + for t in res: + iObj = methodGetInstance2Adapt() + iObj.setFromTuple( t ) + lObjs.append( iObj ) + return lObjs + + + ## Give a list of integer according to the SQL command + # + # @param sqlCmd string is a SQL command + # @return lInteger integer list + # + def getIntegerListWithSQLCmd( self, sqlCmd ): + self.execute(sqlCmd) + res = self.fetchall() + lInteger = [] + for t in res: + if t[0] != None: + lInteger.append(int(t[0])) + return lInteger + + + ## Give a int according to the SQL command + # + # @param sqlCmd string is a SQL command + # @return nb integer + # + def getIntegerWithSQLCmd( self, sqlCmd ): + self.execute(sqlCmd) + res = self.fetchall() + nb = res[0][0] + if nb == None: + nb = 0 + return nb + + + ## Give a list of str according to the SQL command + # + # @param sqlCmd string is a SQL command + # @return lString str list + # + def getStringListWithSQLCmd( self, sqlCmd ): + self.execute(sqlCmd) + res = self.fetchall() + lString = [] + for i in res: + lString.append(i[0]) + return lString + +#TODO: use API to add indexes + ## Remove doublons in a given table + # + # @param table string name of a MySQL table + # + def removeDoublons( self, table ): + tmpTable = "%s_%s" % ( table, time.strftime("%Y%m%d%H%M%S") ) + sqlCmd = "CREATE TABLE %s SELECT DISTINCT * FROM %s" % ( tmpTable, table ) + self.execute( sqlCmd ) + self.dropTable( table ) + self.renameTable(tmpTable, table) + + + ## Get a list of table names from a pattern + # + # @note for instance pattern = 'MyProject_%' + # + def getTableListFromPattern( self, pattern ): + if pattern == "*" or pattern == "%": + sqlCmd = "SHOW TABLES" + else: + sqlCmd = "SHOW TABLES like '%s'" % ( pattern ) + lTables = self.getStringListWithSQLCmd( sqlCmd ) + return lTables