Mercurial > repos > yufei-luo > s_mart
view commons/tools/AnnotationStats.py @ 19:9bcfa7936eec
Deleted selected files
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:23:29 -0400 |
parents | 94ab73e8a190 |
children |
line wrap: on
line source
#!/usr/bin/env python # Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. ##@file # Give summary information on a TE annotation table. # options: # -h: this help # -t: analysis type (default = 1, 1: per transposable element (TE), 2: per cluster, 3: per classification, 4: with map input file) # -p: name of the table (_path) or file (.path) with the annotated TE copies # -s: name of the table (_seq) or file (.fasta or .fa) with the TE reference sequences # -g: length of the genome (in bp) # -m: name of the file with the group and the corresponding TE names (format = 'map') # -o: name of the output file (default = pathTableName + '_stats.txt') # -C: name of the configuration file to access MySQL (e.g. 'TEannot.cfg') # -c: remove map files and blastclust file (if analysis type is 2 or 3) # -I: identity coverage threshold (default = 0) # -L: length coverage threshold (default=0.8) # -v: verbosity level (default = 0) from commons.core.LoggerFactory import LoggerFactory from commons.core.stat.Stat import Stat from commons.core.sql.DbFactory import DbFactory from commons.core.sql.TablePathAdaptator import TablePathAdaptator from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator from commons.tools.getCumulLengthFromTEannot import getCumulLengthFromTEannot LOG_DEPTH = "repet.tools" #TODO: use templating engine instead of raw strings for AnnotationStatsWriter class AnnotationStats( object ): def __init__(self, analysisName="TE", clusterFileName="",seqTableName="", pathTableName="", genomeLength=0, statsFileName="", globalStatsFileName="", verbosity=3): self._analysisName = analysisName self._clusterFileName = clusterFileName self._seqTableName = seqTableName self._pathTableName = pathTableName self._genomeLength = genomeLength self._statsFileName = statsFileName self._globalStatsFileName = globalStatsFileName self._iDb = None self._iTablePathAdaptator = None self._iTableSeqAdaptator = None self._save = False self._clean = False self._verbosity = verbosity self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity) def _logAndRaise(self, errorMsg): self._log.error(errorMsg) raise Exception(errorMsg) def setCoverageThreshold( self, lengthThresh ): self._coverageThreshold = float(lengthThresh) def setIdentityThreshold( self, identityThresh ): self._identityThreshold = int(identityThresh) def setAnalyseType(self, analyseType): self._analyseType = str(analyseType) def setPathTableName(self, pathTableName): self._pathTableName = pathTableName def setDBInstance(self, iDb): self._iDb = iDb def setTablePathAdaptator(self, iTablePathAdaptator): self._iTablePathAdaptator = iTablePathAdaptator def setTableSeqAdaptator(self, iTableSeqAdaptator): self._iTableSeqAdaptator = iTableSeqAdaptator ## Get the coverage of TE copies for a given family (using 'mapOp') # # @param consensus string name of a TE family ('subject_name' in the 'path' table) # @return cumulCoverage integer cumulative coverage # def getCumulCoverage( self, consensus = "" ): gclft = getCumulLengthFromTEannot() gclft.setInputTable( self._pathTableName ) gclft.setTErefseq( consensus ) gclft.setClean() gclft._db = self._iDb gclft._tpA = self._iTablePathAdaptator mapFileName = gclft.getAllSubjectsAsMapOfQueries() mergeFileName = gclft.mergeRanges( mapFileName ) cumulCoverage = gclft.getCumulLength( mergeFileName ) #self._iTablePathAdaptator.getCumulPathLength_from_subject( consensus ) return cumulCoverage ## Get the number of full-lengths (95% <= L =< 105%) # # @param consensusLength integer # @param lLengths list of integers # @return fullLengthConsensusNb integer # def getNbFullLengths( self, consensusLength, lLengths ): fullLengthConsensusNb = 0 for i in lLengths: if i / float(consensusLength ) >= 0.95 and i / float(consensusLength ) <= 1.05: fullLengthConsensusNb += 1 return fullLengthConsensusNb def getStatPerTE(self, consensusName): dConsensusStats = {} lLengthPerFragment = self._iTablePathAdaptator.getPathLengthListFromSubject(consensusName) lLengthPerCopy = self._iTablePathAdaptator.getChainLengthListFromSubject(consensusName) lIdentityPerCopy = self._iTablePathAdaptator.getChainIdentityListFromSubject(consensusName) dConsensusStats["length"] = self._iTableSeqAdaptator.getSeqLengthFromAccession(consensusName) dConsensusStats["cumulCoverage"] = self.getCumulCoverage(consensusName) dConsensusStats["nbFragments"] = len(lLengthPerFragment) dConsensusStats["nbFullLengthFragments"] = self.getNbFullLengths(dConsensusStats["length"], lLengthPerFragment) dConsensusStats["nbCopies"] = len(lLengthPerCopy) dConsensusStats["nbFullLengthCopies"] = self.getNbFullLengths(dConsensusStats["length"], lLengthPerCopy) dConsensusStats["statsIdentityPerChain"] = Stat() dConsensusStats["statsLengthPerChain"] = Stat() dConsensusStats["statsLengthPerChainPerc"] = Stat() self._statsForIdentityAndLength(dConsensusStats, lLengthPerCopy, lIdentityPerCopy) return dConsensusStats def getStatPerCluster(self, lConsensusNames): dConsensusClusterStats = {} lLengthPerFragment = [] lLengthPerCopy = [] cumulCoverageLength = 0 for consensusName in lConsensusNames: cumulCoverageLength += self.getCumulCoverage(consensusName) lLengthPerFragment.extend(self._iTablePathAdaptator.getPathLengthListFromSubject(consensusName)) lLengthPerCopy.extend(self._iTablePathAdaptator.getChainLengthListFromSubject(consensusName)) dConsensusClusterStats["cumulCoverage"] = cumulCoverageLength dConsensusClusterStats["nbFragments"] = len(lLengthPerFragment) dConsensusClusterStats["nbCopies"] = len(lLengthPerCopy) return dConsensusClusterStats def getClusterListFromFile(self): lClusters = [] with open(self._clusterFileName) as fCluster: for line in fCluster: lConsensusNames = line.rstrip().split("\t") lClusters.append(lConsensusNames) return lClusters def run(self): LoggerFactory.setLevel(self._log, self._verbosity) self._iDb = DbFactory.createInstance() self._iTablePathAdaptator = TablePathAdaptator(self._iDb, self._pathTableName) self._iTableSeqAdaptator = TableSeqAdaptator(self._iDb, self._seqTableName) iASW = AnnotationStatsWriter() if self._analysisName == "TE": with open(self._statsFileName, "w") as fStats: string = "%s\tlength\tcovg\tfrags\tfullLgthFrags\tcopies\tfullLgthCopies\tmeanId\tmeanLgth\tmeanLgthPerc\n" % self._analysisName fStats.write(string) lNamesTErefseq = self._iTableSeqAdaptator.getAccessionsList() lDistinctSubjects = self._iTablePathAdaptator.getSubjectList() totalCumulCoverage = self.getCumulCoverage() with open(self._globalStatsFileName, "w") as fG: fG.write("%s\n" % iASW.printResume(lNamesTErefseq, lDistinctSubjects, totalCumulCoverage, self._genomeLength)) for consensusName in lNamesTErefseq: self._log.debug("processing '%s'..." % consensusName) dStatForOneConsensus = self.getStatPerTE(consensusName) iASW.addCalculsOfOneTE(dStatForOneConsensus) fStats.write("%s\n" % iASW.getStatAsString(consensusName, dStatForOneConsensus)) fG.write(iASW.printStatsForAllTEs(len(lNamesTErefseq))) elif self._analysisName == "Cluster": lClusters = self.getClusterListFromFile() lClusters.sort(key=lambda k: len(k), reverse=True) with open(self._statsFileName, "w") as fStats: string = "%s\tcovg\tfrags\tcopies\n" % self._analysisName #TODO: add fullLgthFrags and fullLgthCopies ? Is addition of previous results significant ? fStats.write(string) for index, lConsensus in enumerate(lClusters): self._log.debug("processing '%s'..." % lConsensus) dStatForOneCluster = self.getStatPerCluster(lConsensus) fStats.write("%s\n" % iASW.getStatAsStringForCluster(str(index + 1), dStatForOneCluster)) if self._save: outTableName = "%s_statsPer%s" % (self._pathTableName, self._analysisName) self._iDb.createTable(outTableName, "pathstat", self._statsFileName) self._iDb.close() self._log.info("END %s" % type(self).__name__) def _statsForIdentityAndLength(self, dStat, lLengthPerCopy, lIdentityPerCopy): for i in lIdentityPerCopy: dStat["statsIdentityPerChain"].add(i) lLengthPercPerCopy = [] for i in lLengthPerCopy: dStat["statsLengthPerChain"].add(i) lperc = 100 * i / float(dStat["length"]) lLengthPercPerCopy.append(lperc) dStat["statsLengthPerChainPerc"].add(lperc) class AnnotationStatsWriter(object): def __init__(self): self._dAllTErefseqs = { "sumCumulCoverage": 0, "totalNbFragments": 0, "totalNbFullLengthFragments": 0, "totalNbCopies": 0, "totalNbFullLengthCopies": 0, "nbFamWithFullLengthFragments": 0, "nbFamWithOneFullLengthFragment": 0, "nbFamWithTwoFullLengthFragments": 0, "nbFamWithThreeFullLengthFragments": 0, "nbFamWithMoreThanThreeFullLengthFragments": 0, "nbFamWithFullLengthCopies": 0, "nbFamWithOneFullLengthCopy": 0, "nbFamWithTwoFullLengthCopies": 0, "nbFamWithThreeFullLengthCopies": 0, "nbFamWithMoreThanThreeFullLengthCopies": 0, "statsAllCopiesMedIdentity": Stat(), "statsAllCopiesMedLengthPerc": Stat() } def getAllTEsRefSeqDict(self): return self._dAllTErefseqs def getStatAsString( self, name, d ): """ Return a string with all data properly formatted. """ string = "" string += "%s" % name string += "\t%i" % d["length"] string += "\t%i" % d["cumulCoverage"] string += "\t%i" % d["nbFragments"] string += "\t%i" % d["nbFullLengthFragments"] string += "\t%i" % d["nbCopies"] string += "\t%i" % d["nbFullLengthCopies"] if d["statsIdentityPerChain"].getValuesNumber() != 0: string += "\t%.2f" % d["statsIdentityPerChain"].mean() else: string += "\tNA" if d["statsLengthPerChain"].getValuesNumber() != 0: string += "\t%.2f" % d["statsLengthPerChain"].mean() else: string += "\tNA" if d["statsLengthPerChainPerc"].getValuesNumber() != 0: string += "\t%.2f" % d["statsLengthPerChainPerc"].mean() else: string += "\tNA" return string def getStatAsStringForCluster( self, name, d ): """ Return a string with all data properly formatted. """ string = "" string += "%s" % name string += "\t%i" % d["cumulCoverage"] string += "\t%i" % d["nbFragments"] string += "\t%i" % d["nbCopies"] return string def addCalculsOfOneTE(self, dOneTErefseq): self._dAllTErefseqs[ "sumCumulCoverage" ] += dOneTErefseq[ "cumulCoverage" ] self._dAllTErefseqs[ "totalNbFragments" ] += dOneTErefseq[ "nbFragments" ] self._dAllTErefseqs[ "totalNbFullLengthFragments" ] += dOneTErefseq[ "nbFullLengthFragments" ] if dOneTErefseq[ "nbFullLengthFragments" ] > 0: self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ] += 1 if dOneTErefseq[ "nbFullLengthFragments" ] == 1: self._dAllTErefseqs[ "nbFamWithOneFullLengthFragment" ] += 1 elif dOneTErefseq[ "nbFullLengthFragments" ] == 2: self._dAllTErefseqs[ "nbFamWithTwoFullLengthFragments" ] += 1 elif dOneTErefseq[ "nbFullLengthFragments" ] == 3: self._dAllTErefseqs[ "nbFamWithThreeFullLengthFragments" ] += 1 elif dOneTErefseq[ "nbFullLengthFragments" ] > 3: self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthFragments" ] += 1 self._dAllTErefseqs[ "totalNbCopies" ] += dOneTErefseq[ "nbCopies" ] self._dAllTErefseqs[ "totalNbFullLengthCopies" ] += dOneTErefseq[ "nbFullLengthCopies" ] if dOneTErefseq[ "nbFullLengthCopies" ] > 0: self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ] += 1 if dOneTErefseq[ "nbFullLengthCopies" ] == 1: self._dAllTErefseqs[ "nbFamWithOneFullLengthCopy" ] += 1 elif dOneTErefseq[ "nbFullLengthCopies" ] == 2: self._dAllTErefseqs[ "nbFamWithTwoFullLengthCopies" ] += 1 elif dOneTErefseq[ "nbFullLengthCopies" ] == 3: self._dAllTErefseqs[ "nbFamWithThreeFullLengthCopies" ] += 1 elif dOneTErefseq[ "nbFullLengthCopies" ] > 3: self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthCopies" ] += 1 if dOneTErefseq[ "statsIdentityPerChain" ].getValuesNumber() != 0: self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].add( dOneTErefseq[ "statsIdentityPerChain" ].median() ) if dOneTErefseq[ "statsLengthPerChainPerc" ].getValuesNumber() != 0: self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].add( dOneTErefseq[ "statsLengthPerChainPerc" ].median() ) def printStatsForAllTEs(self, TEnb): # statString += "(sum of cumulative coverages: %i bp)" % ( self._dAllTErefseqs[ "sumCumulCoverage" ] ) statString = "total nb of TE fragments: %i\n" % ( self._dAllTErefseqs[ "totalNbFragments" ] ) if self._dAllTErefseqs[ "totalNbFragments" ] != 0: statString += "total nb full-length fragments: %i (%.2f%%)\n" % \ ( self._dAllTErefseqs[ "totalNbFullLengthFragments" ], \ 100*self._dAllTErefseqs[ "totalNbFullLengthFragments" ] / float(self._dAllTErefseqs[ "totalNbFragments" ]) ) statString += "total nb of TE copies: %i\n" % ( self._dAllTErefseqs[ "totalNbCopies" ] ) statString += "total nb full-length copies: %i (%.2f%%)\n" % \ ( self._dAllTErefseqs[ "totalNbFullLengthCopies" ], \ 100*self._dAllTErefseqs[ "totalNbFullLengthCopies" ] / float(self._dAllTErefseqs[ "totalNbCopies" ]) ) statString += "families with full-length fragments: %i (%.2f%%)\n" % \ ( self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ], \ 100*self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ] / float(TEnb) ) statString += " with only one full-length fragment: %i\n" % ( self._dAllTErefseqs[ "nbFamWithOneFullLengthFragment" ] ) statString += " with only two full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithTwoFullLengthFragments" ] ) statString += " with only three full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithThreeFullLengthFragments" ] ) statString += " with more than three full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthFragments" ] ) statString += "families with full-length copies: %i (%.2f%%)\n" % \ ( self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ], \ 100*self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ] / float(TEnb) ) statString += " with only one full-length copy: %i\n" % ( self._dAllTErefseqs[ "nbFamWithOneFullLengthCopy" ] ) statString += " with only two full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithTwoFullLengthCopies" ] ) statString += " with only three full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithThreeFullLengthCopies" ] ) statString += " with more than three full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthCopies" ] ) statString += "mean of median identity of all families: %.2f +- %.2f\n" % \ ( self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].mean(), \ self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].sd() ) statString += "mean of median length percentage of all families: %.2f +- %.2f\n" % \ ( self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].mean(), \ self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].sd() ) return statString def printResume(self, lNamesTErefseq, lDistinctSubjects, totalCumulCoverage, genomeLength): statString = "nb of sequences: %i\n" % len(lNamesTErefseq) statString += "nb of matched sequences: %i\n" % len(lDistinctSubjects) statString += "cumulative coverage: %i bp\n" % totalCumulCoverage statString += "coverage percentage: %.2f%%\n" % ( 100 * totalCumulCoverage / float(genomeLength) ) # statString += "processing the %i TE families..." % len(lNamesTErefseq) return statString