Mercurial > repos > yufei-luo > s_mart
view commons/tools/AlignTEOnGenomeAccordingToAnnotation.py @ 31:0ab839023fe4
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 14:33:21 -0400 |
parents | 94ab73e8a190 |
children |
line wrap: on
line source
#!/usr/bin/env python # Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. import re import os from commons.core.LoggerFactory import LoggerFactory from commons.core.utils.RepetOptionParser import RepetOptionParser from commons.core.checker.ConfigChecker import ConfigRules, ConfigChecker import subprocess from commons.core.seq.AlignedBioseqDB import AlignedBioseqDB from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator from commons.core.coord.PathUtils import PathUtils from commons.core.coord.SetUtils import SetUtils from commons.core.sql.TablePathAdaptator import TablePathAdaptator from commons.core.coord.Set import Set from commons.core.sql.DbFactory import DbFactory ## Align a TE on genome according to annotation LOG_DEPTH = "repet.tools" class AlignTEOnGenomeAccordingToAnnotation(object): def __init__(self, pathTableName = "", queryTableName = "", subjectTableName = "", mergeSamePathId = False, outTableName = "", matchPenality=10, mism=8, gapo=16, gape=4, gapl=20, configFileName = "", doClean = False, verbosity = 0): self._pathTableName = pathTableName self._queryTableName = queryTableName self._subjectTableName = subjectTableName self._mergeSamePathId = mergeSamePathId self.setOutTableName(outTableName) self._matchPenality = matchPenality self._mismatch = mism self._gapOpening = gapo self._gapExtend = gape self._gapLength = gapl self._configFileName = configFileName self._doClean = doClean self._verbosity = verbosity self._iDb = None self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity) def setAttributesFromCmdLine(self): description = "Align a TE on genome according to annotation." epilog = "\nExample 1: launch without verbosity and keep temporary files.\n" epilog += "\t$ python AlignTEOnGenomeAccordingToAnnotation.py -p DmelChr4_chr_allTEs_nr_noSSR_join_path -q DmelChr4_chr_seq -s DmelChr4_refTEs_seq -v 0" epilog += "\n" parser = RepetOptionParser(description = description, epilog = epilog) parser.add_option("-p", "--path", dest = "pathTableName", action = "store", type = "string", help = "path table name [compulsory] [format: path]", default = "") parser.add_option("-q", "--query", dest = "queryTableName", action = "store", type = "string", help = "query table name [compulsory] [format: seq]", default = "") parser.add_option("-s", "--subject", dest = "subjectTableName", action = "store", type = "string", help = "subject table name [compulsory] [format: seq]", default = "") parser.add_option("-m", "--merge", dest = "mergeSamePathId", action = "store_true", help = "merge joined matchs [optional] [default: False]", default = False) parser.add_option("-o", "--out", dest = "outTableName", action = "store", type = "string", help = "output table name [default: <pathTableName>_align]", default = "") #TODO: add options for : matchPenality=10, mism=8, gapo=16, gape=4, gapl=20 parser.add_option("-C", "--config", dest = "configFileName", action = "store", type = "string", help = "configuration file name (e.g. TEannot.cfg)", default = "") #NOTE: doClean unused # parser.add_option("-c", "--clean", dest = "doClean", action = "store_true", help = "clean temporary files [optional] [default: False]", default = False) parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [optional] [default: 1]", default = 1) options = parser.parse_args()[0] self._setAttributesFromOptions(options) def _setAttributesFromOptions(self, options): self.setPathTableName(options.pathTableName) self.setQueryTableName(options.queryTableName) self.setSubjectTableName(options.subjectTableName) self.setMergeSamePathId(options.mergeSamePathId) self.setOutTableName(options.outTableName) self.setConfigFileName(options.configFileName) # self.setDoClean(options.doClean) self.setVerbosity(options.verbosity) def _checkConfig(self): iConfigRules = ConfigRules() iConfigRules.addRuleSection(section="", mandatory=True) iConfigRules.addRuleOption(section="", option ="fasta_name", mandatory=True, type="string") iConfigRules.addRuleOption(section="", option ="clean", mandatory=True, type="bool") iConfigChecker = ConfigChecker(self._configFileName, iConfigRules) iConfig = iConfigChecker.getConfig() self._setAttributesFromConfig(iConfig) def _setAttributesFromConfig(self, iConfig): self.setOutTableName(self._outTableName) self.setDoClean(iConfig.get("", "clean")) def setPathTableName(self, pathTableName): self._pathTableName = pathTableName def setQueryTableName(self, queryTableName): self._queryTableName = queryTableName def setSubjectTableName(self, subjectTableName): self._subjectTableName = subjectTableName def setMergeSamePathId(self, mergeSamePathId): self._mergeSamePathId = mergeSamePathId def setOutTableName(self, outTableName): if outTableName == "": self._outTableName = "%s_align" % self._pathTableName else: self._outTableName = outTableName def setConfigFileName(self, configFileName): self._configFileName = configFileName def setDoClean(self, doClean): self._doClean = doClean def setVerbosity(self, verbosity): self._verbosity = verbosity def setDbInstance(self, iDb): self._iDb = iDb def _checkOptions(self): if self._pathTableName == "" or not self._iDb.doesTableExist(self._pathTableName): self._logAndRaise("ERROR: Missing path table") if self._queryTableName == "" or not self._iDb.doesTableExist(self._queryTableName): self._logAndRaise("ERROR: Missing query table") if self._subjectTableName == "" or not self._iDb.doesTableExist(self._subjectTableName): self._logAndRaise("ERROR: Missing subject table") def _logAndRaise(self, errorMsg): self._log.error(errorMsg) raise Exception(errorMsg) def alignBioseqWithNWalign(self, iBioseq1, iBioseq2): fastaFileName1 = "seqtoalign1.tmp" iBioseq1.save(fastaFileName1) fastaFileName2 = "seqtoalign2.tmp" iBioseq2.save(fastaFileName2) alignBioseqDBFileName = "aligned.tmp" cmd = "NWalign" cmd += " %s" % fastaFileName1 cmd += " %s" % fastaFileName2 cmd += " -m %s" % self._matchPenality cmd += " -d %s" % self._mismatch cmd += " -g %s" % self._gapOpening cmd += " -e %s" % self._gapExtend cmd += " -l %s" % self._gapLength cmd += " -D" cmd += " -o %s" % alignBioseqDBFileName process = subprocess.Popen(cmd, shell = True) self._log.debug("Running : %s" % cmd) process.communicate() if process.returncode != 0: self._logAndRaise("ERROR when launching '%s'" % cmd) iAlignedBioseqDB = AlignedBioseqDB() iAlignedBioseqDB.load(alignBioseqDBFileName) os.remove(fastaFileName1) os.remove(fastaFileName2) os.remove(alignBioseqDBFileName) return iAlignedBioseqDB def alignSeqAccordingToPathAndBuildAlignedSeqTable(self): if self._iDb.doesTableExist(self._outTableName): self._logAndRaise("ERROR: out table %s already exists" % self._outTableName) #TODO: create alignedSeq table in DbMySql... sqlCmd="CREATE TABLE %s (path int unsigned, query_aligned_seq longtext, subject_aligned_seq longtext, score int unsigned, identity float unsigned)" % self._outTableName self._iDb.execute(sqlCmd) iQueryTSA = TableSeqAdaptator(self._iDb, self._queryTableName) iSubjectTSA = TableSeqAdaptator(self._iDb, self._subjectTableName) iTPA = TablePathAdaptator(self._iDb, self._pathTableName) if self._mergeSamePathId: lPathId = iTPA.getIdList() pathNb = len(lPathId) count = 0 for pathNum in lPathId: self._log.debug(count,"/",pathNb,"=>path",pathNum,"...") lPaths = iTPA.getPathListFromId(pathNum) #TODO: getSetListFromQueries() call getSubjectAsSetOfQuery() => "reverse complement" the query (coordinates are inversed, so getBioseq() will take the reverse-comp.) : is it correct ? lQuerySets = PathUtils.getSetListFromQueries(lPaths) #NOTE: merge sets if overlap lQueryMergedSets = SetUtils.mergeSetsInList(lQuerySets) #TODO: getBioseqFromSetList() build a sequence that does not exist : is it correct ? iQueryBioseq = iQueryTSA.getBioseqFromSetList(lQueryMergedSets) lSubjectSets = PathUtils.getSetListFromSubjects(lPaths) #TODO: no merge for subjects : is it correct ? matcher allow overlap on query and not on subject ? iSubjectBioseq = iSubjectTSA.getBioseqFromSetList(lSubjectSets) iAlignedBioseqDB = self.alignBioseqWithNWalign(iQueryBioseq, iSubjectBioseq) self._insertAlignedBioseqDBWithScoreAndIdentityInTable(pathNum, iAlignedBioseqDB) else: lPathId = iTPA.getIdList() pathNb = len(lPathId) count = 0 for pathNum in lPathId: self._log.debug(count,"/",pathNb,"=>path",pathNum,"...") lPaths = iTPA.getPathListFromId(pathNum) queryName = lPaths[0].getQueryName() subjectName = lPaths[0].getSubjectName() lQueryStart = [] lQueryEnd = [] lSubjectStart = [] lSubjectEnd = [] isReversed = not lPaths[0].isSubjectOnDirectStrand() for iPath in lPaths: lQueryStart.append(iPath.getQueryStart()) lQueryEnd.append(iPath.getQueryEnd()) lSubjectStart.append(iPath.getSubjectStart()) lSubjectEnd.append(iPath.getSubjectEnd()) queryStart = min(lQueryStart) queryEnd = max(lQueryEnd) if isReversed: subjectStart = max(lSubjectStart) subjectEnd = min(lSubjectEnd) else: subjectStart = min(lSubjectStart) subjectEnd = max(lSubjectEnd) lQuerySets = [Set(pathNum,subjectName, queryName,queryStart,queryEnd)] lSubjectSets = [Set(pathNum,queryName, subjectName,subjectStart,subjectEnd)] iQueryBioseq = iQueryTSA.getBioseqFromSetList(lQuerySets) iSubjectBioseq = iSubjectTSA.getBioseqFromSetList(lSubjectSets) iAlignedBioseqDB = self.alignBioseqWithNWalign(iQueryBioseq, iSubjectBioseq) self._insertAlignedBioseqDBWithScoreAndIdentityInTable(pathNum, iAlignedBioseqDB) def run(self): LoggerFactory.setLevel(self._log, self._verbosity) if self._configFileName: self._checkConfig() self._iDb = DbFactory.createInstance() self._checkOptions() self._log.info("START AlignTEOnGenomeAccordingToAnnotation") self._log.debug("path table name: %s" % self._pathTableName) self._log.debug("query table name: %s" % self._queryTableName) self._log.debug("subject table name: %s" % self._subjectTableName) self.alignSeqAccordingToPathAndBuildAlignedSeqTable() self._iDb.close() self._log.info("END AlignTEOnGenomeAccordingToAnnotation") def _insertAlignedBioseqDBWithScoreAndIdentityInTable(self, pathNum, iAlignedBioseqDB): scoreWithEndLine = re.split("Score=", iAlignedBioseqDB.db[0].header)[1] score = int(scoreWithEndLine.split()[0]) identity = re.split("Identity=", scoreWithEndLine)[1] if identity == "nan": identity = "0.0" identity = float(identity)*100.0 #TODO: create TableAlignedSeqAdaptator (to use insert...) sqlCmd = 'INSERT INTO %s VALUES (%d,"%s","%s", %d,%f);' % (self._outTableName, pathNum, iAlignedBioseqDB.db[0].sequence, iAlignedBioseqDB.db[1].sequence, score, identity) self._iDb.execute(sqlCmd) self._log.debug("header:", iAlignedBioseqDB.db[0].header) self._log.debug("path", pathNum, "Score=", score, "Identity=", identity, "ok") self._log.debug(iAlignedBioseqDB.db[0].sequence[:80]) self._log.debug(iAlignedBioseqDB.db[1].sequence[:80]) if __name__ == "__main__": iLaunch = AlignTEOnGenomeAccordingToAnnotation() iLaunch.setAttributesFromCmdLine() iLaunch.run()