Mercurial > repos > yufei-luo > s_mart
view commons/core/sql/DbMySql.py @ 9:1eb55963fe39
Updated CompareOverlappingSmall*.py
author | m-zytnicki |
---|---|
date | Thu, 14 Mar 2013 05:23:05 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
# Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # Exception hierarchy: # # StandardError # |__Warning # |__Error # |__InterfaceError # |__DatabaseError # |__DataError # |__OperationalError # |__IntegrityError # |__InternalError # |__ProgrammingError # |__NotSupportedError import os import sys import time import ConfigParser import MySQLdb from MySQLdb import InterfaceError from MySQLdb import OperationalError from MySQLdb import InternalError from MySQLdb import DatabaseError from commons.core.seq.Bioseq import Bioseq from commons.core.LoggerFactory import LoggerFactory from commons.core.checker.RepetException import RepetException from commons.core.sql.TablePathAdaptator import TablePathAdaptator from commons.core.sql.TableSetAdaptator import TableSetAdaptator LOG_DEPTH = "repet.commons" TABLE_SCHEMA_DESCRIPTOR = {"map": [("name", "varchar(255)"), ("chr", "varchar(255)"), ("start", "int"), ("end", "int")], "set": [("path", "int unsigned"), ("name", "varchar(255)"), ("chr", "varchar(255)"), ("start", "int"), ("end", "int")], "match": [("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("query_length", "int unsigned"), ("query_length_perc", "float"), ("match_length_perc", "float"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"), ("subject_end", "int unsigned"), ("subject_length", "int unsigned"), ("subject_length_perc", "float"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float"), ("path", "int unsigned")], "path": [("path", "int unsigned"), ("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"), ("subject_end", "int unsigned"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float")], "align": [("query_name", "varchar(255)"), ("query_start", "int"), ("query_end", "int"), ("subject_name", "varchar(255)"), ("subject_start", "int unsigned"), ("subject_end", "int unsigned"), ("E_value", "double"), ("score", "int unsigned"), ("identity", "float")], "seq": [("accession", "varchar(255)"), ("sequence", "longtext"), ("description", "varchar(255)"), ("length", "int unsigned")], "length": [("accession", "varchar(255)"), ("length", "int unsigned")], "jobs": [("jobid", "int unsigned"), ("jobname", "varchar(255)"), ("groupid", "varchar(255)"), ("launcher", "varchar(1024)"), ("queue", "varchar(255)"), ("resources", "varchar(255)"), ("status", "varchar(255)"), ("time", "datetime"), ("node", "varchar(255)")], "classif": [("seq_name", "varchar(255)"), ("length", "int unsigned"), ("strand", "char"), ("status", "varchar(255)"), ("class_classif", "varchar(255)"), ("order_classif", "varchar(255)"), ("completeness", "varchar(255)"), ("evidence", "text")], "pathstat": [("family", "varchar(255)"), ("maxLength", "int"), ("meanLength", "int"), ("covg", "int"), ("frags", "int"), ("fullLgthFrags", "int"), ("copies", "int"), ("fullLgthCopies", "int"), ("meanId", "varchar(255)"), ("sdId", "varchar(255)"), ("minId", "varchar(255)"), ("q25Id", "varchar(255)"), ("medId", "varchar(255)"), ("q75Id", "varchar(255)"), ("maxId", "varchar(255)"), ("meanLgth", "varchar(255)"), ("sdLgth", "varchar(255)"), ("minLgth", "varchar(255)"), ("q25Lgth", "varchar(255)"), ("medLgth", "varchar(255)"), ("q75Lgth", "varchar(255)"), ("maxLgth", "varchar(255)"), ("meanLgthPerc", "varchar(255)"), ("sdLgthPerc", "varchar(255)"), ("minLgthPerc", "varchar(255)"), ("q25LgthPerc", "varchar(255)"), ("medLgthPerc", "varchar(255)"), ("q75LgthPerc", "varchar(255)"), ("maxLgthPerc", "varchar(255)")], "info_tables":[("name", "varchar(255)"), ("file", "varchar(255)")] } TABLE_INDEX_DESCRIPTOR = {"map": [("iname", "name"), ("ichr", "chr"), ("istart", "start"), ("iend", "end"), ("icoord", "start, end")], "set": [("id", "path"), ("iname", "name"), ("ichr", "chr"), ("istart", "start"), ("iend", "end"), ("icoord", "start, end")], "match": [("id", "path"), ("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"), ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")], "path": [("id", "path"), ("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"), ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")], "align": [("qname", "query_name"), ("qstart", "query_start"), ("qend", "query_end"), ("sname", "subject_name"), ("sstart", "subject_start"), ("send", "subject_end"), ("qcoord", "query_start, query_end")], "seq": [("iacc", "accession"), ("idescr", "description")], "length": [("iacc", "accession"), ("ilength", "length")], "jobs": [("ijobid", "jobid"), ("ijobname", "jobname"), ("igroupid", "groupid"), ("istatus", "status")], "classif": [("iseq_name", "seq_name"), ("istatus", "status"), ("iclass", "class_classif"), ("iorder", "order_classif"), ("icomp", "completeness")], "pathstat": [], "info_tables": [] } TABLE_TYPE_SYNONYMS = {"tab": "match", "fasta": "seq", "fa": "seq", "fsa": "seq" } ## Handle connections to MySQL tables formatted for REPET # class DbMySql(object): ## Constructor # # @param user string db user name # @param host string db host name # @param passwd string db user password # @param dbname string database name # @param port integer database port # @param cfgFileName string configuration file name # # @note when a parameter is left blank, the constructor is able # to set attribute values from environment variables: REPET_HOST, # REPET_USER, REPET_PW, REPET_DB, REPET_PORT # def __init__(self, user = "", host = "", passwd = "", dbname = "", port = "", cfgFileName = "", verbosity = 1): self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), verbosity) if cfgFileName != "": self.setAttributesFromConfigFile(cfgFileName) elif host != "" and user != "" and passwd != "" and dbname != "": self.host = host self.user = user self.passwd = passwd self.dbname = dbname else: for envVar in ["REPET_HOST","REPET_USER","REPET_PW","REPET_DB"]: if os.environ.get( envVar ) == None: msg = "ERROR: can't find environment variable '%s'" % envVar self._log.error(msg) raise RepetException(msg) self.host = os.environ.get("REPET_HOST") self.user = os.environ.get("REPET_USER") self.passwd = os.environ.get("REPET_PW") self.dbname = os.environ.get("REPET_DB") if port != "" and cfgFileName == "": self.port = int(port) elif os.environ.get("REPET_PORT") != None: self.port = int(os.environ.get("REPET_PORT")) else: self.port = 3306 maxNbTry = 10 for i in xrange(1,maxNbTry+1): if not self.open(): time.sleep(2) if i == maxNbTry: msg = "ERROR: failed to connect to the MySQL database" self._log.error(msg) raise DatabaseError(msg) else: break self.cursor = self.db.cursor() self.execute("""use %s""" %(self.dbname)) ## Set the attributes from the configuration file # # @param configFileName string configuration file name # def setAttributesFromConfigFile(self, configFileName): config = ConfigParser.ConfigParser() config.readfp( open(configFileName) ) self.host = config.get("repet_env","repet_host") self.user = config.get("repet_env","repet_user") self.passwd = config.get("repet_env","repet_pw") self.dbname = config.get("repet_env","repet_db") self.port = int( config.get("repet_env","repet_port") ) ## Connect to the MySQL database # def open(self): try: if int(MySQLdb.get_client_info().split(".")[0]) >= 5: self.db = MySQLdb.connect( user = self.user, host = self.host,\ passwd = self.passwd, db = self.dbname, \ port = self.port, \ local_infile = 1 ) else: self.db = MySQLdb.connect( user = self.user, host = self.host,\ passwd = self.passwd, db = self.dbname, \ port = self.port ) except MySQLdb.Error, e: msg = "ERROR %d: %s" % (e.args[0], e.args[1]) self._log.error(msg) return False return True ## Execute a SQL query # # @param qry string SQL query to execute # @param params parameters of SQL query # def execute(self, qry, params = None, nbTry = 3, sleep = 5): if nbTry: self._log.debug("################START SQL DEBUG MODE################") self._log.debug("Current directory: %s" % os.getcwd()) self._log.debug("Host: %s" % self.host) self._log.debug("User: %s" % self.user) self._log.debug("Database: %s" % self.dbname) self._log.debug("SQL command: %s" % qry) self._log.debug("################STOP SQL DEBUG MODE################\n") try: if params == None: self.cursor.execute(qry) else: self.cursor.execute(qry, params) except (InterfaceError, OperationalError, InternalError) as iError: self._log.error("FAILED to execute query '%s': %s. %s retries left." % (qry, iError.args[1], nbTry - 1)) self._log.debug("WAIT %is to execute '%s'" % (sleep, qry)) time.sleep(sleep) try: self.close() except: pass self.open() self.cursor = self.db.cursor() self.execute(qry, params, nbTry - 1, sleep) else: msg = "ERROR: can't execute '%s' after several tries" % qry self._log.error(msg) raise DatabaseError(msg) ## Close the connection # def close( self ): self.db.close() ## Retrieve the results of a SQL query # def fetchall(self): return self.cursor.fetchall() ## Test if a table exists # # @param table string table name # @return boolean True if the table exists, False otherwise # def doesTableExist( self, table ): self.execute( """SHOW TABLES""" ) results = self.cursor.fetchall() if (table,) in results: return True return False ## Remove a table if it exists # # @param table string table name # def dropTable(self, table): if self.doesTableExist( table ): sqlCmd = "DROP TABLE %s" % table self.execute( sqlCmd ) sqlCmd = 'DELETE FROM info_tables WHERE name = "%s"' % table self.execute( sqlCmd ) ## Rename a table # # @param table string existing table name # @param newName string new table name # def renameTable( self, table, newName ): self.dropTable( newName ) self.execute( 'RENAME TABLE %s TO %s ;' % (table, newName) ) self.execute( 'UPDATE info_tables SET name="%s" WHERE name="%s";' % (newName, table) ) ## Duplicate a table # # @param tableName string source table name # @param newTableName string new table name # def copyTable(self, sourcetableName, newTableName): self.dropTable( newTableName ) sqlCmd = "CREATE TABLE %s LIKE %s;" % (newTableName, sourcetableName) self.execute( sqlCmd ) sqlCmd = "INSERT INTO %s SELECT * FROM %s;" % (newTableName, sourcetableName) self.execute( sqlCmd ) self._log.info("copying table data,", sourcetableName, "in", newTableName) self.updateInfoTable(newTableName, "") ## Give the rows number of the table # # @param tableName string table name # def getSize( self, tableName ): qry = "SELECT count(*) FROM %s;" % (tableName) self.execute(qry) res = self.fetchall() return int( res[0][0] ) def getTableType(self, tableName): qry = "SHOW COLUMNS FROM %s;" % (tableName) self.execute(qry) res = self.fetchall() fieldNames = [] for row in res: fieldNames.append(row[0]) for tableType, fieldInfos in TABLE_SCHEMA_DESCRIPTOR.items(): refFieldsNames = [name for name,type in fieldInfos] if refFieldsNames == fieldNames: return tableType return None ## Test if table is empty # # @param tableName string table name # @return boolean True if the table is empty, False otherwise # def isEmpty(self, tableName): return self.getSize(tableName) == 0 ## Record a new table in the 'info_table' table # # @param tableName string table name # @param info string information on the table origin # def updateInfoTable( self, tableName, info ): if not self.doesTableExist( "info_tables" ): sqlCmd = "CREATE TABLE info_tables ( name varchar(255), file varchar(255) )" self.execute( sqlCmd ) sqlCmd = 'INSERT INTO info_tables VALUES ("%s","%s")' % (tableName, info) self.execute( sqlCmd ) ## Get a list with the fields # def getFieldList( self, table ): lFields = [] sqlCmd = "DESCRIBE %s" % ( table ) self.execute( sqlCmd ) lResults = self.fetchall() for res in lResults: lFields.append( res[0] ) return lFields ## Check that the input file has as many fields than it is supposed to according to its format # # @note fields should be separated by tab # def checkDataFormatting( self, dataType, fileName ): dataType = dataType.lower() if dataType in ["fa", "fasta", "seq", "classif", "length", "jobs", "pathstat"]: return dDataType2NbFields = { "map": 4, "set": 5, "align": 9, "path": 10, "match": 15, "tab": 15 } fileHandler = open( fileName, "r" ) line = fileHandler.readline() if line != "": tokens = line.split("\t") if len(tokens) < dDataType2NbFields[ dataType ]: msg = "ERROR: '%s' file has less than %i fields" % ( dataType, dDataType2NbFields[ dataType ] ) self._log.error(msg) raise RepetException(msg) if len(tokens) > dDataType2NbFields[ dataType ]: msg = "ERROR: '%s' file has more than %i fields" % ( dataType, dDataType2NbFields[ dataType ] ) self._log.error(msg) raise RepetException(msg) fileHandler.close() def createIndex(self, tableName="", tableType=""): sqlCmd = "SHOW INDEX FROM %s;"% (tableName) self.execute(sqlCmd) res = self.fetchall() lIndex = [] for i in res: lIndex.append(i[2]) self._log.warning("existing indexes:", lIndex) for indexName, fieldNames in TABLE_INDEX_DESCRIPTOR.get(tableType): if not indexName in lIndex: sqlCmd = "CREATE INDEX %s ON %s ( %s );" % (indexName, tableName, fieldNames) self.execute(sqlCmd) ## Create a MySQL table of specified data type and load data # # @param tableName string name of the table to be created # @param fileName string name of the file containing the data to be loaded in the table # @param dataType string type of the data (map, set, align, path, match, seq, length, jobs) # @param overwrite boolean (default = False) # def createTable(self, tableName, dataType, fileName = "", overwrite = False): self._log.info("creating table '%s' from file '%s' of type '%s'..." % (tableName, fileName, dataType)) if fileName != "": self.checkDataFormatting(dataType, fileName) if overwrite: self.dropTable(tableName) tableType = dataType.lower() if TABLE_SCHEMA_DESCRIPTOR.get(tableType,None) is None and TABLE_TYPE_SYNONYMS.get(tableType,None) is None: msg = "ERROR: unknown type %s" % dataType self._log.error(msg) raise RepetException(msg) tableType = TABLE_TYPE_SYNONYMS.get(tableType,tableType) fields = [" ".join(fieldDescription) for fieldDescription in TABLE_SCHEMA_DESCRIPTOR.get(tableType)] sqlCmd = "CREATE TABLE %s (%s)" % (tableName, ",".join(fields)) self.execute(sqlCmd) self.createIndex(tableName, tableType) tmpFileName = "" if fileName: if tableType == "seq": tmpFileName = "%s.seq" % os.path.basename(fileName) self._convertFastaToSeq(fileName, tmpFileName) fileName = tmpFileName elif tableType == "length": tmpFileName = "%s.length" % os.path.basename(fileName) self._convertFastaToLength(fileName, tmpFileName) fileName = tmpFileName hasHeaderLine = tableType == "match" or tableType == "pathstat" self.loadDataFromFile(tableName, fileName, hasHeaderLine) if tmpFileName: os.remove(tmpFileName) if tableType == "path": self.changePathQueryCoordinatesToDirectStrand( tableName ) self.updateInfoTable(tableName, fileName) self._log.info("creating table '%s' done!" % tableName) ## Create a bin table for fast access # # @param pathTableName string path table name (input table) # @param idxTableName string bin path table name (output table) # @param overwrite boolean default = False # def createBinPathTable(self, pathTableName, overwrite = False): idxTableName = "%s_idx" % pathTableName # is an attribute in TableBinPathAdaptator if not self.doesTableExist(pathTableName): msg = "ERROR: '%s' doesn't exist => '%s' can't be created" % (pathTableName, idxTableName) self._log.error(msg) raise RepetException(msg) self._log.info("creating %s for fast access" % idxTableName) if overwrite: self.dropTable(idxTableName) sqlCmd = "CREATE TABLE %s ( path int unsigned, idx int unsigned, contig varchar(255), min int, max int, strand int unsigned)" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX id ON %s ( path );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX ibin ON %s ( idx );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX icontig ON %s ( contig );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX imin ON %s ( min );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX imax ON %s ( max );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX istrand ON %s ( strand );" % idxTableName self.execute(sqlCmd) tmpTableName = "%s_tmp" % pathTableName self._createPathTableAndGroupByIdAndOrderByStrand(pathTableName, tmpTableName) iTPA = TablePathAdaptator(self, tmpTableName) if not self.isEmpty(tmpTableName): tmpFileName = "%s.tmp%s" % (pathTableName, str(os.getpid())) with open(tmpFileName, "w") as f: lQueryNames = iTPA.getQueryList() for queryName in lQueryNames: lPaths = iTPA.getPathListFromQuery(queryName) for i in lPaths: idx = i.range_query.findIdx() max = i.range_query.getMax() min = i.range_query.getMin() strand = i.range_query.isOnDirectStrand() f.write("%d\t%d\t%s\t%d\t%d\t%d\n"%(i.id, idx, i.range_query.seqname, min, max, strand)) sqlCmd="LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (tmpFileName, idxTableName) self.execute(sqlCmd) self.updateInfoTable(idxTableName, "%s bin indexes" % pathTableName) os.remove(tmpFileName) self.dropTable(tmpTableName) ## This table summarize the Path list information according to the identifier numbers. The min and max value is taken # def _createPathTableAndGroupByIdAndOrderByStrand(self, pathTableName, outTableName): self.dropTable(outTableName) sqlcmd="CREATE TABLE %s SELECT path, query_name, min(query_start) AS query_start, max(query_end) AS query_end, subject_name, min(subject_start) AS subject_start, max(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start<query_end and subject_start<subject_end group by path;" % (outTableName, pathTableName) self.execute(sqlcmd) sqlcmd="INSERT INTO %s SELECT path, query_name, min(query_start) AS query_start, max(query_end) AS query_end, subject_name, max(subject_start) AS subject_start, min(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start<query_end and subject_start>subject_end group by path;" % (outTableName, pathTableName) self.execute(sqlcmd) sqlcmd="INSERT INTO %s SELECT path, query_name, max(query_start) AS query_start, min(query_end) AS query_end, subject_name, min(subject_start) AS subject_start, max(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start>query_end and subject_start<subject_end group by path;" % (outTableName, pathTableName) self.execute(sqlcmd) sqlcmd="INSERT INTO %s SELECT path, query_name, max(query_start) AS query_start, min(query_end) AS query_end, subject_name, max(subject_start) AS subject_start, min(subject_end) AS subject_end, min(e_value) AS e_value, sum(score) AS score, avg(identity) AS identity FROM %s WHERE query_start>query_end and subject_start>subject_end group by path;" % (outTableName, pathTableName) self.execute(sqlcmd) self.createIndex(outTableName, "path") ## Create a bin table for fast access # # @param setTableName string set table name (input table) # @param idxTableName string bin set table name (output table) # @param overwrite boolean default = False # def createBinSetTable(self, setTableName, overwrite = False): idxTableName = "%s_idx" % setTableName # is an attribute in TableBinSetAdaptator if not self.doesTableExist(setTableName): msg = "ERROR: '%s' doesn't exist => '%s' can't be created" % (setTableName, idxTableName) self._log.error(msg) raise RepetException(msg) self._log.info("creating %s for fast access" % idxTableName) if overwrite: self.dropTable(idxTableName) sqlCmd = "CREATE TABLE %s ( path int unsigned, bin float, contig varchar(255), min int, max int, strand int unsigned)" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX id ON %s ( path );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX ibin ON %s ( bin );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX icontig ON %s ( contig );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX imin ON %s ( min );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX imax ON %s ( max );" % idxTableName self.execute(sqlCmd) sqlCmd = "CREATE INDEX istrand ON %s ( strand );" % idxTableName self.execute(sqlCmd) tmpTableName = "%s_tmp" % setTableName self._createSetTableAndGroupByIdAndOrderByStrand(setTableName, tmpTableName) iTSA = TableSetAdaptator(self, tmpTableName) if not self.isEmpty(tmpTableName): tmpFileName = "%s.tmp%s" % (setTableName, str(os.getpid())) with open(tmpFileName, "w") as f: lSeqNames = iTSA.getSeqNameList() for seqName in lSeqNames: lSets = iTSA.getSetListFromSeqName(seqName) for i in lSets: bin = i.getBin() max = i.getMax() min = i.getMin() strand = i.isOnDirectStrand() f.write("%d\t%f\t%s\t%d\t%d\t%d\n"%(i.id, bin, i.seqname, min, max, strand)) sqlCmd="LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % (tmpFileName, idxTableName) self.execute(sqlCmd) self.updateInfoTable(idxTableName, "%s bin indexes" % setTableName) os.remove(tmpFileName) self.dropTable(tmpTableName) ## This table summarize the Set list information according to the identifier numbers. The min and max value is taken # def _createSetTableAndGroupByIdAndOrderByStrand(self, setTableName, outTableName): self.dropTable(outTableName) sqlcmd="CREATE TABLE %s SELECT path, name, chr, min(start) AS start, max(end) AS end FROM %s WHERE start<end group by path;" % (outTableName, setTableName) self.execute(sqlcmd) sqlcmd="INSERT INTO %s SELECT path, name, chr, max(start) AS start, min(end) AS end FROM %s WHERE start>end group by path;" % (outTableName, setTableName) self.execute(sqlcmd) self.createIndex(outTableName, "set") ## Load data from a file into a MySQL table # # @param tableName string table name # @param fileName string file name # @param escapeFirstLine boolean True to ignore the first line of file, False otherwise # def loadDataFromFile(self, tableName, fileName, escapeFirstLine = False): if fileName != "": sqlCmd = "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS ESCAPED BY '' " % ( fileName, tableName ) if escapeFirstLine == True: sqlCmd = "%s IGNORE 1 LINES" %(sqlCmd) self.execute( sqlCmd ) self._log.info("%i entries in the table %s" % (self.getSize(tableName), tableName)) ###################################################################################### #TODO: remove duplication with same methods in fastautils ## Convert a fasta file to a length file # # @param inFile string name of the input fasta file # @param outFile string name of the output file # def _convertFastaToLength(self, inFile, outFile = ""): if outFile == "": outFile = "%s.length" % inFile if inFile != "": with open(inFile, "r") as inFH: with open(outFile, "w") as outFH: bioseq = Bioseq() while True: bioseq.read(inFH) if bioseq.sequence == None: break seqLen = bioseq.getLength() outFH.write("%s\t%d\n" % (bioseq.header.split()[0], seqLen)) ## Convert a fasta file to a seq file # # @param inFile string name of the input fasta file # @param outFile string name of the output file # def _convertFastaToSeq(self, inFile, outFile = ""): if outFile == "": outFile = "%s.seq" % inFile if inFile != "": with open(inFile, "r") as inFH: with open(outFile, "w") as outFH: bioseq = Bioseq() while True: bioseq.read(inFH) if bioseq.sequence == None: break seqLen = bioseq.getLength() outFH.write("%s\t%s\t%s\t%d\n" % (bioseq.header.split()[0], \ bioseq.sequence, bioseq.header, seqLen)) ###################################################################################### ## Change the coordinates such that the query is on the direct strand. # # @param inTable string path table name to update # def changePathQueryCoordinatesToDirectStrand( self, inTable ): sqlCmd = "ALTER TABLE %s ADD COLUMN tmpid INT NOT NULL AUTO_INCREMENT PRIMARY KEY" % ( inTable ) self.execute( sqlCmd ) tmpTable = "%s_tmp" % ( inTable ) sqlCmd = "CREATE TABLE %s SELECT * FROM %s WHERE query_start > query_end" % ( tmpTable, inTable ) self.execute( sqlCmd ) sqlCmd = "UPDATE %s, %s" % ( inTable, tmpTable ) sqlCmd += " SET %s.query_start=%s.query_end," % ( inTable, tmpTable ) sqlCmd += " %s.query_end=%s.query_start," % ( inTable, tmpTable ) sqlCmd += " %s.subject_start=%s.subject_end," % ( inTable, tmpTable ) sqlCmd += " %s.subject_end=%s.subject_start" % ( inTable, tmpTable ) sqlCmd += " WHERE %s.tmpid=%s.tmpid" % ( inTable, tmpTable ) self.execute( sqlCmd ) sqlCmd = "ALTER TABLE %s DROP COLUMN tmpid" % ( inTable ) self.execute( sqlCmd ) self.dropTable( tmpTable ) ## Export data from a table in a file. # # @param tableName string table name # @param outFileName string output file name # @param keepFirstLine boolean if you want the first line (column name) in output file # @param param string sql parameters to select data expected # def exportDataToFile( self, tableName, outFileName="", keepFirstLine=False, param="" ): if outFileName == "": outFileName = tableName prg = "mysql" cmd = prg cmd += " -h %s" % ( self.host ) cmd += " -u %s" % ( self.user ) cmd += " -p\"%s\"" % ( self.passwd ) cmd += " --database=%s" % ( self.dbname ) cmd += " -e\"SELECT * FROM %s" % ( tableName ) if param != "": cmd += " %s" % ( param ) cmd += ";\"" cmd += " > " if keepFirstLine == False: cmd += "%s.tmp" % ( outFileName ) else: cmd += "%s" % ( outFileName ) log = os.system( cmd ) if log != 0: print "ERROR: mysql returned %i" % ( log ); sys.exit(1) if keepFirstLine == False: tmpFileName = "%s.tmp" % ( outFileName ) tmpFile = open( tmpFileName, "r" ) outFile = open( outFileName, "w" ) i = 0 for line in tmpFile: if i > 0: outFile.write( line ) i += 1 tmpFile.close() outFile.close() os.remove( tmpFileName ) ## Convert a Path table into an Align table # # @param inPathTable string name of the input Path table # @param outAlignTable string name of the output Align table # def convertPathTableIntoAlignTable( self, inPathTable, outAlignTable ): sqlCmd = "CREATE TABLE %s SELECT query_name,query_start,query_end,subject_name,subject_start,subject_end,E_value,score,identity FROM %s;" % ( outAlignTable, inPathTable ) self.execute( sqlCmd ) self.updateInfoTable( outAlignTable, "" ) ## Create a set table from a map table # # @param mapTableName string map table name # @param setTableName string new set table name # def convertMapTableIntoSetTable( self, mapTableName, setTableName ): sqlCmd = "CREATE TABLE %s (path int(10) unsigned auto_increment primary key) select name, chr, start, end from %s;" % (setTableName, mapTableName) self.execute(sqlCmd) self.createIndex(setTableName, "set") ## Convert an Align table into a Path table # # @param inAlignTable string name of the input Align table # @param outPathTable string name of the output Path table # def convertAlignTableIntoPathTable( self, inAlignTable, outPathTable ): self.createTable( outPathTable, "path", "", True ) sqlCmd = "SELECT * FROM %s" % ( inAlignTable ) self.execute( sqlCmd ) lResults = self.fetchall() rowIndex = 0 for res in lResults: rowIndex += 1 sqlCmd = "INSERT INTO %s" % ( outPathTable ) sqlCmd += " (path,query_name,query_start,query_end,subject_name,subject_start,subject_end,E_value,score,identity)" sqlCmd += " VALUES ( '%i'" % ( rowIndex ) for i in res: sqlCmd += ', "%s"' % ( i ) sqlCmd += " )" self.execute( sqlCmd ) self.updateInfoTable( outPathTable, "" ) ## Give a list of instances according to the SQL command # # @param SQLCmd string is a SQL command # @param methodGetInstance2Adapt a getter method name. With this method you choose the type of intances contained in lObjs. See example in Test_DbMySql.py. # @return lObjs list of instances # def getObjectListWithSQLCmd( self, SQLCmd, methodGetInstance2Adapt): self.execute( SQLCmd ) res = self.fetchall() lObjs = [] for t in res: iObj = methodGetInstance2Adapt() iObj.setFromTuple( t ) lObjs.append( iObj ) return lObjs ## Give a list of integer according to the SQL command # # @param sqlCmd string is a SQL command # @return lInteger integer list # def getIntegerListWithSQLCmd( self, sqlCmd ): self.execute(sqlCmd) res = self.fetchall() lInteger = [] for t in res: if t[0] != None: lInteger.append(int(t[0])) return lInteger ## Give a int according to the SQL command # # @param sqlCmd string is a SQL command # @return nb integer # def getIntegerWithSQLCmd( self, sqlCmd ): self.execute(sqlCmd) res = self.fetchall() nb = res[0][0] if nb == None: nb = 0 return nb ## Give a list of str according to the SQL command # # @param sqlCmd string is a SQL command # @return lString str list # def getStringListWithSQLCmd( self, sqlCmd ): self.execute(sqlCmd) res = self.fetchall() lString = [] for i in res: lString.append(i[0]) return lString #TODO: use API to add indexes ## Remove doublons in a given table # # @param table string name of a MySQL table # def removeDoublons( self, table ): tmpTable = "%s_%s" % ( table, time.strftime("%Y%m%d%H%M%S") ) sqlCmd = "CREATE TABLE %s SELECT DISTINCT * FROM %s" % ( tmpTable, table ) self.execute( sqlCmd ) self.dropTable( table ) self.renameTable(tmpTable, table) ## Get a list of table names from a pattern # # @note for instance pattern = 'MyProject_%' # def getTableListFromPattern( self, pattern ): if pattern == "*" or pattern == "%": sqlCmd = "SHOW TABLES" else: sqlCmd = "SHOW TABLES like '%s'" % ( pattern ) lTables = self.getStringListWithSQLCmd( sqlCmd ) return lTables