view commons/tools/AnnotationStats.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
line wrap: on
line source

#!/usr/bin/env python

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use, 
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info". 
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability. 
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or 
# data to be ensured and,  more generally, to use and operate it in the 
# same conditions as regards security. 
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

##@file
# Give summary information on a TE annotation table.
# options:
#     -h: this help
#     -t: analysis type (default = 1, 1: per transposable element (TE), 2: per cluster, 3: per classification, 4: with map input file)
#     -p: name of the table (_path) or file (.path) with the annotated TE copies
#     -s: name of the table (_seq) or file (.fasta or .fa) with the TE reference sequences
#     -g: length of the genome (in bp)
#     -m: name of the file with the group and the corresponding TE names (format = 'map')
#     -o: name of the output file (default = pathTableName + '_stats.txt')
#     -C: name of the configuration file to access MySQL (e.g. 'TEannot.cfg')
#     -c: remove map files and blastclust file (if analysis type is 2 or 3)
#     -I: identity coverage threshold (default = 0)
#     -L: length coverage threshold (default=0.8)
#     -v: verbosity level (default = 0)

from commons.core.LoggerFactory import LoggerFactory
from commons.core.stat.Stat import Stat
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.TablePathAdaptator import TablePathAdaptator
from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator
from commons.tools.getCumulLengthFromTEannot import getCumulLengthFromTEannot

LOG_DEPTH = "repet.tools"

#TODO: use templating engine instead of raw strings for AnnotationStatsWriter
class AnnotationStats( object ):

    def __init__(self, analysisName="TE", clusterFileName="",seqTableName="", pathTableName="", genomeLength=0, statsFileName="", globalStatsFileName="", verbosity=3):
        self._analysisName = analysisName
        self._clusterFileName = clusterFileName
        self._seqTableName = seqTableName
        self._pathTableName = pathTableName
        self._genomeLength = genomeLength
        self._statsFileName = statsFileName
        self._globalStatsFileName = globalStatsFileName
        self._iDb = None
        self._iTablePathAdaptator = None
        self._iTableSeqAdaptator = None
        self._save = False
        self._clean = False
        self._verbosity = verbosity
        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)
        
    def _logAndRaise(self, errorMsg):
        self._log.error(errorMsg)
        raise Exception(errorMsg)
        
    def setCoverageThreshold( self, lengthThresh ):
        self._coverageThreshold = float(lengthThresh)
                
    def setIdentityThreshold( self, identityThresh ):
        self._identityThreshold = int(identityThresh) 
            
    def setAnalyseType(self, analyseType):
        self._analyseType = str(analyseType)
            
    def setPathTableName(self, pathTableName):
        self._pathTableName = pathTableName
        
    def setDBInstance(self, iDb):
        self._iDb = iDb
        
    def setTablePathAdaptator(self, iTablePathAdaptator):
        self._iTablePathAdaptator = iTablePathAdaptator
        
    def setTableSeqAdaptator(self, iTableSeqAdaptator):
        self._iTableSeqAdaptator = iTableSeqAdaptator
    
    ## Get the coverage of TE copies for a given family (using 'mapOp')
    #
    # @param consensus string name of a TE family ('subject_name' in the 'path' table)
    # @return cumulCoverage integer cumulative coverage
    #
    def getCumulCoverage( self, consensus = "" ):
        gclft = getCumulLengthFromTEannot()
        gclft.setInputTable( self._pathTableName )
        gclft.setTErefseq( consensus )
        gclft.setClean()
        gclft._db = self._iDb
        gclft._tpA = self._iTablePathAdaptator
        mapFileName = gclft.getAllSubjectsAsMapOfQueries()
        mergeFileName = gclft.mergeRanges( mapFileName )
        cumulCoverage = gclft.getCumulLength( mergeFileName ) #self._iTablePathAdaptator.getCumulPathLength_from_subject( consensus )
        return cumulCoverage
    
    ## Get the number of full-lengths (95% <= L =< 105%)
    #
    # @param consensusLength integer
    # @param lLengths list of integers
    # @return fullLengthConsensusNb integer
    #
    def getNbFullLengths( self, consensusLength, lLengths ):
        fullLengthConsensusNb = 0
        for i in lLengths:
            if i / float(consensusLength ) >= 0.95 and i / float(consensusLength ) <= 1.05:
                fullLengthConsensusNb += 1
        return fullLengthConsensusNb
    
    def getStatPerTE(self, consensusName):
        dConsensusStats = {}
        lLengthPerFragment = self._iTablePathAdaptator.getPathLengthListFromSubject(consensusName)
        lLengthPerCopy = self._iTablePathAdaptator.getChainLengthListFromSubject(consensusName)
        lIdentityPerCopy = self._iTablePathAdaptator.getChainIdentityListFromSubject(consensusName)
        dConsensusStats["length"] = self._iTableSeqAdaptator.getSeqLengthFromAccession(consensusName)
        dConsensusStats["cumulCoverage"] = self.getCumulCoverage(consensusName)
        dConsensusStats["nbFragments"] = len(lLengthPerFragment)
        dConsensusStats["nbFullLengthFragments"] = self.getNbFullLengths(dConsensusStats["length"], lLengthPerFragment)
        dConsensusStats["nbCopies"] = len(lLengthPerCopy)
        dConsensusStats["nbFullLengthCopies"] = self.getNbFullLengths(dConsensusStats["length"], lLengthPerCopy)
        dConsensusStats["statsIdentityPerChain"] = Stat()
        dConsensusStats["statsLengthPerChain"] = Stat()
        dConsensusStats["statsLengthPerChainPerc"] = Stat()
        self._statsForIdentityAndLength(dConsensusStats, lLengthPerCopy, lIdentityPerCopy)
        return dConsensusStats
    
    def getStatPerCluster(self, lConsensusNames):
        dConsensusClusterStats = {}
        lLengthPerFragment = []
        lLengthPerCopy = []
        cumulCoverageLength = 0
        for consensusName in lConsensusNames:
            cumulCoverageLength += self.getCumulCoverage(consensusName)
            lLengthPerFragment.extend(self._iTablePathAdaptator.getPathLengthListFromSubject(consensusName))
            lLengthPerCopy.extend(self._iTablePathAdaptator.getChainLengthListFromSubject(consensusName))
        dConsensusClusterStats["cumulCoverage"] = cumulCoverageLength
        dConsensusClusterStats["nbFragments"] = len(lLengthPerFragment)
        dConsensusClusterStats["nbCopies"] = len(lLengthPerCopy)
        return dConsensusClusterStats
        
    def getClusterListFromFile(self):
        lClusters = []
        with open(self._clusterFileName) as fCluster:
            for line in fCluster:
                lConsensusNames = line.rstrip().split("\t")
                lClusters.append(lConsensusNames)
        return lClusters
        
    def run(self):
        LoggerFactory.setLevel(self._log, self._verbosity)
        self._iDb = DbFactory.createInstance()
        self._iTablePathAdaptator = TablePathAdaptator(self._iDb, self._pathTableName)
        self._iTableSeqAdaptator = TableSeqAdaptator(self._iDb, self._seqTableName)
        
        iASW = AnnotationStatsWriter()
        if self._analysisName == "TE":
            with open(self._statsFileName, "w") as fStats:
                string = "%s\tlength\tcovg\tfrags\tfullLgthFrags\tcopies\tfullLgthCopies\tmeanId\tmeanLgth\tmeanLgthPerc\n" % self._analysisName
                fStats.write(string)
                
                lNamesTErefseq = self._iTableSeqAdaptator.getAccessionsList()
                lDistinctSubjects = self._iTablePathAdaptator.getSubjectList()
                totalCumulCoverage = self.getCumulCoverage()
                
                with open(self._globalStatsFileName, "w") as fG:
                    fG.write("%s\n" % iASW.printResume(lNamesTErefseq, lDistinctSubjects, totalCumulCoverage, self._genomeLength))
                    for consensusName in lNamesTErefseq:
                        self._log.debug("processing '%s'..." % consensusName)
                        dStatForOneConsensus = self.getStatPerTE(consensusName)
                        iASW.addCalculsOfOneTE(dStatForOneConsensus)
                        fStats.write("%s\n" % iASW.getStatAsString(consensusName, dStatForOneConsensus))
                    fG.write(iASW.printStatsForAllTEs(len(lNamesTErefseq)))
                    
        elif self._analysisName == "Cluster":
            lClusters = self.getClusterListFromFile()
            lClusters.sort(key=lambda k: len(k), reverse=True)
            with open(self._statsFileName, "w") as fStats:
                string = "%s\tcovg\tfrags\tcopies\n" % self._analysisName
                #TODO: add fullLgthFrags and fullLgthCopies ? Is addition of previous results significant ?
                fStats.write(string)
                for index, lConsensus in enumerate(lClusters):
                    self._log.debug("processing '%s'..." % lConsensus)
                    dStatForOneCluster = self.getStatPerCluster(lConsensus)
                    fStats.write("%s\n" % iASW.getStatAsStringForCluster(str(index + 1), dStatForOneCluster))
        
        if self._save:
            outTableName = "%s_statsPer%s" % (self._pathTableName, self._analysisName)
            self._iDb.createTable(outTableName, "pathstat", self._statsFileName)
            
        self._iDb.close()
        self._log.info("END %s" % type(self).__name__)

    def _statsForIdentityAndLength(self, dStat, lLengthPerCopy, lIdentityPerCopy):
        for i in lIdentityPerCopy:
            dStat["statsIdentityPerChain"].add(i)
        lLengthPercPerCopy = []
        for i in lLengthPerCopy:
            dStat["statsLengthPerChain"].add(i)
            lperc = 100 * i / float(dStat["length"])
            lLengthPercPerCopy.append(lperc)
            dStat["statsLengthPerChainPerc"].add(lperc)

class AnnotationStatsWriter(object):

    def __init__(self):
        self._dAllTErefseqs = { "sumCumulCoverage": 0,
                         "totalNbFragments": 0,
                         "totalNbFullLengthFragments": 0,
                         "totalNbCopies": 0,
                         "totalNbFullLengthCopies": 0,
                         "nbFamWithFullLengthFragments": 0,
                         "nbFamWithOneFullLengthFragment": 0,
                         "nbFamWithTwoFullLengthFragments": 0,
                         "nbFamWithThreeFullLengthFragments": 0,
                         "nbFamWithMoreThanThreeFullLengthFragments": 0,
                         "nbFamWithFullLengthCopies": 0,
                         "nbFamWithOneFullLengthCopy": 0,
                         "nbFamWithTwoFullLengthCopies": 0,
                         "nbFamWithThreeFullLengthCopies": 0,
                         "nbFamWithMoreThanThreeFullLengthCopies": 0,
                         "statsAllCopiesMedIdentity": Stat(),
                         "statsAllCopiesMedLengthPerc": Stat()
                         }
        
    def getAllTEsRefSeqDict(self):
        return self._dAllTErefseqs
        
    def getStatAsString( self, name, d ):
        """
        Return a string with all data properly formatted.
        """
        string = ""
        string += "%s" % name
        string += "\t%i" % d["length"]
        string += "\t%i" % d["cumulCoverage"]
        string += "\t%i" % d["nbFragments"]
        string += "\t%i" % d["nbFullLengthFragments"]
        string += "\t%i" % d["nbCopies"]
        string += "\t%i" % d["nbFullLengthCopies"]
        
        if d["statsIdentityPerChain"].getValuesNumber() != 0:
            string += "\t%.2f" % d["statsIdentityPerChain"].mean()
        else:
            string += "\tNA"
            
        if d["statsLengthPerChain"].getValuesNumber() != 0:
            string += "\t%.2f" % d["statsLengthPerChain"].mean()
        else:
            string += "\tNA"
                
        if d["statsLengthPerChainPerc"].getValuesNumber() != 0:
            string += "\t%.2f" % d["statsLengthPerChainPerc"].mean()
        else:
            string += "\tNA"
                
        return string
    
    def getStatAsStringForCluster( self, name, d ):
        """
        Return a string with all data properly formatted.
        """
        string = ""
        string += "%s" % name
        string += "\t%i" % d["cumulCoverage"]
        string += "\t%i" % d["nbFragments"]
        string += "\t%i" % d["nbCopies"]
            
        return string
    
    def addCalculsOfOneTE(self, dOneTErefseq):
        self._dAllTErefseqs[ "sumCumulCoverage" ] += dOneTErefseq[ "cumulCoverage" ]
        
        self._dAllTErefseqs[ "totalNbFragments" ] += dOneTErefseq[ "nbFragments" ]
        self._dAllTErefseqs[ "totalNbFullLengthFragments" ] += dOneTErefseq[ "nbFullLengthFragments" ]
        if dOneTErefseq[ "nbFullLengthFragments" ] > 0:
            self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ] += 1
        if dOneTErefseq[ "nbFullLengthFragments" ] == 1:
            self._dAllTErefseqs[ "nbFamWithOneFullLengthFragment" ] += 1
        elif dOneTErefseq[ "nbFullLengthFragments" ] == 2:
            self._dAllTErefseqs[ "nbFamWithTwoFullLengthFragments" ] += 1
        elif dOneTErefseq[ "nbFullLengthFragments" ] == 3:
            self._dAllTErefseqs[ "nbFamWithThreeFullLengthFragments" ] += 1
        elif dOneTErefseq[ "nbFullLengthFragments" ] > 3:
            self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthFragments" ] += 1
        
        self._dAllTErefseqs[ "totalNbCopies" ] += dOneTErefseq[ "nbCopies" ]
        self._dAllTErefseqs[ "totalNbFullLengthCopies" ] += dOneTErefseq[ "nbFullLengthCopies" ]
        if dOneTErefseq[ "nbFullLengthCopies" ] > 0:
            self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ] += 1
        if dOneTErefseq[ "nbFullLengthCopies" ] == 1:
            self._dAllTErefseqs[ "nbFamWithOneFullLengthCopy" ] += 1
        elif dOneTErefseq[ "nbFullLengthCopies" ] == 2:
            self._dAllTErefseqs[ "nbFamWithTwoFullLengthCopies" ] += 1
        elif dOneTErefseq[ "nbFullLengthCopies" ] == 3:
            self._dAllTErefseqs[ "nbFamWithThreeFullLengthCopies" ] += 1
        elif dOneTErefseq[ "nbFullLengthCopies" ] > 3:
            self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthCopies" ] += 1
        
        if dOneTErefseq[ "statsIdentityPerChain" ].getValuesNumber() != 0:
            self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].add( dOneTErefseq[ "statsIdentityPerChain" ].median() )
        
        if dOneTErefseq[ "statsLengthPerChainPerc" ].getValuesNumber() != 0:
            self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].add( dOneTErefseq[ "statsLengthPerChainPerc" ].median() )

    def printStatsForAllTEs(self, TEnb):
#        statString += "(sum of cumulative coverages: %i bp)" % ( self._dAllTErefseqs[ "sumCumulCoverage" ] )
        statString = "total nb of TE fragments: %i\n" % ( self._dAllTErefseqs[ "totalNbFragments" ] )
        
        if self._dAllTErefseqs[ "totalNbFragments" ] != 0:
            
            statString += "total nb full-length fragments: %i (%.2f%%)\n" % \
            ( self._dAllTErefseqs[ "totalNbFullLengthFragments" ], \
              100*self._dAllTErefseqs[ "totalNbFullLengthFragments" ] / float(self._dAllTErefseqs[ "totalNbFragments" ]) )
            
            statString += "total nb of TE copies: %i\n" % ( self._dAllTErefseqs[ "totalNbCopies" ] )
            
            statString += "total nb full-length copies: %i (%.2f%%)\n" % \
            ( self._dAllTErefseqs[ "totalNbFullLengthCopies" ], \
              100*self._dAllTErefseqs[ "totalNbFullLengthCopies" ] / float(self._dAllTErefseqs[ "totalNbCopies" ]) )
            
            statString += "families with full-length fragments: %i (%.2f%%)\n" % \
            ( self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ], \
              100*self._dAllTErefseqs[ "nbFamWithFullLengthFragments" ] / float(TEnb) )
            statString += " with only one full-length fragment: %i\n" % ( self._dAllTErefseqs[ "nbFamWithOneFullLengthFragment" ] )
            statString += " with only two full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithTwoFullLengthFragments" ] )
            statString += " with only three full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithThreeFullLengthFragments" ] )
            statString += " with more than three full-length fragments: %i\n" % ( self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthFragments" ] )
            
            statString += "families with full-length copies: %i (%.2f%%)\n" % \
            ( self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ], \
              100*self._dAllTErefseqs[ "nbFamWithFullLengthCopies" ] / float(TEnb) )
            statString += " with only one full-length copy: %i\n" % ( self._dAllTErefseqs[ "nbFamWithOneFullLengthCopy" ] )
            statString += " with only two full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithTwoFullLengthCopies" ] )
            statString += " with only three full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithThreeFullLengthCopies" ] )
            statString += " with more than three full-length copies: %i\n" % ( self._dAllTErefseqs[ "nbFamWithMoreThanThreeFullLengthCopies" ] )
            
            statString += "mean of median identity of all families: %.2f +- %.2f\n" % \
            ( self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].mean(), \
              self._dAllTErefseqs[ "statsAllCopiesMedIdentity" ].sd() )
            
            statString += "mean of median length percentage of all families: %.2f +- %.2f\n" % \
            ( self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].mean(), \
              self._dAllTErefseqs[ "statsAllCopiesMedLengthPerc" ].sd() )
        return statString
            
    def printResume(self, lNamesTErefseq, lDistinctSubjects, totalCumulCoverage, genomeLength):
        statString = "nb of sequences: %i\n" % len(lNamesTErefseq)
        statString += "nb of matched sequences: %i\n" % len(lDistinctSubjects)
        statString += "cumulative coverage: %i bp\n" % totalCumulCoverage
        statString += "coverage percentage: %.2f%%\n" % ( 100 * totalCumulCoverage / float(genomeLength) )
#            statString += "processing the %i TE families..." % len(lNamesTErefseq)
        return statString