view TEisotools-1.0/commons/core/utils/ClassifUtils.py @ 6:20ec0d14798e draft

Uploaded
author urgi-team
date Wed, 20 Jul 2016 05:00:24 -0400
parents
children
line wrap: on
line source

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

import os
import json
from collections import OrderedDict
from commons.tools.RenameHeaderClassif import RenameHeaderClassif

class ClassifUtils(object):
    
    @staticmethod
    def _formatProfilesResultsAsDict(lProfilesResults):
        if len(lProfilesResults) == 0:
            return OrderedDict()

        dResults = OrderedDict()

        for refNameAndCoverage in lProfilesResults:
            refName, coverage = refNameAndCoverage.split(": ")

            coverage = coverage.split("%(")
            coverageOnSubject = float(coverage.pop(1).replace("%)", ""))
            coverage = float(coverage.pop(0))

            profilesResult = OrderedDict()
            profilesResult["cov"] = coverage
            profilesResult["covOnSubject"] = coverageOnSubject
            dResults[refName] = profilesResult
        return dResults
    
    @staticmethod    
    def _formatCodingFeaturesAsDict(lineOfEvidence, dCoding):
        codingEvidences = lineOfEvidence.split("; ")

        for codingTypeData in codingEvidences:
            codingTypeData = codingTypeData.split(": ")
            codingType = codingTypeData.pop(0)

            codingTypeData = ": ".join(codingTypeData)
            codingTypeData = codingTypeData.split(", ")

            if codingType == "TE_BLRtx":
                if not dCoding.has_key("TE_BLRtx"):
                    dCoding["TE_BLRtx"] = OrderedDict()
                for refNameAndCoverage in codingTypeData:
                    blrtxResult = OrderedDict()
                    refName, coverage = refNameAndCoverage.rsplit(": ", 1)
                    coverage = float(coverage.replace("%", ""))
                    blrtxResult["cov"] = coverage
                    dCoding["TE_BLRtx"][refName] = blrtxResult

            if codingType == "TE_BLRx":
                if not dCoding.has_key("TE_BLRx"):
                    dCoding["TE_BLRx"] = OrderedDict()
                for refNameAndCoverage in codingTypeData:
                    blrxResult = OrderedDict()
                    refName, coverage = refNameAndCoverage.rsplit(": ", 1)
                    coverage = float(coverage.replace("%", ""))
                    blrxResult["cov"] = coverage
                    dCoding["TE_BLRx"][refName] = blrxResult

            if codingType == "profiles":
                dCoding["profiles"] = ClassifUtils._formatProfilesResultsAsDict(codingTypeData)

            if codingType == "Other_profiles":
                dCoding["Other_profiles"] = ClassifUtils._formatProfilesResultsAsDict(codingTypeData)

            if codingType == "rDNA_BLRn":
                dCoding["rDNA_BLRn"] = OrderedDict()
                codingTypeData = ", ".join(codingTypeData)
                try:
                    refName, coverage = codingTypeData.rsplit(": ", 1)
                    coverage = float(coverage.replace("%", ""))
                except ValueError:
                    refName = codingTypeData
                    coverage = -1.0

                dCoding["rDNA_BLRn"]["name"] = refName
                dCoding["rDNA_BLRn"]["cov"] = coverage

            if codingType == "HG_BLRn":
                dCoding["HG_BLRn"] = OrderedDict()
                refName, coverage = codingTypeData[0].rsplit(": ", 1)
                coverage = float(coverage.replace("%", ""))

                dCoding["HG_BLRn"]["name"] = refName
                dCoding["HG_BLRn"]["cov"] = coverage
    
    @staticmethod
    def _formatStructFeaturesAsDict(lineOfEvidence, dStruct):
        structEvidences = lineOfEvidence.split("; ")
        for structTypeData in structEvidences:

            structTypeData = structTypeData.split(": ")
            structType = structTypeData.pop(0)

            structTypeData = ": ".join(structTypeData)
            structTypeData = structTypeData.split(", ")

            if structType == "TElength":
                dStruct["TElength"] = structTypeData.pop()

            if structType == "TermRepeats":
                dStruct["TermRepeats"] = OrderedDict()
                for refNameAndLength in structTypeData:
                    refName, length = refNameAndLength.rsplit(": ", 1)
                    dStruct["TermRepeats"][refName] = int(length)

            if structType == "ORF":
                if not dStruct.has_key("ORF"):
                    dStruct["ORF"] = structTypeData

            if structType in ["SSR", "SSRtrf"]:
                if not dStruct.has_key(structType):
                    dStruct[structType] = structTypeData

            if "SSRCoverage" in structType :
                dummy, cov = structType.split("=")
                dStruct["SSRCoverage"] = float(cov)

            if structType == "polyAtail":
                dStruct["polyAtail"] = True

            if structType == "helitronExtremities":
                structTypeData = ", ".join(structTypeData)
                structTypeData = structTypeData.split("), ")
                dStruct["helitronExtremities"] = OrderedDict()
                for helitronData in structTypeData:
                    helName, helData = helitronData.split(": (")
                    helData = helData.replace(")", "")
                    eValue, start, end = helData.split(", ")

                    helitronExtResult = OrderedDict()
                    helitronExtResult["start"] = int(start)
                    helitronExtResult["end"] = int(end)
                    helitronExtResult["eValue"] = float(eValue)
                    dStruct["helitronExtremities"][helName] = helitronExtResult
    
    @staticmethod
    def _formatOtherFeaturesAsDict(lineOfEvidence, dOther):
        if lineOfEvidence != "":
            ClassifUtils._formatCodingFeaturesAsDict(lineOfEvidence, dOther)
            ClassifUtils._formatStructFeaturesAsDict(lineOfEvidence, dOther)
    
    @staticmethod
    def getClassifLineAsDict(line):
        dClassif = OrderedDict()
        iRenameHeaderClassif = RenameHeaderClassif()
        lClassifItem = line.split("\t")
        if len(lClassifItem) != 8:
            msg = "Can't parse line: \"%s\"\n" % line.strip()
            print("WARNING - ClassifUtils - %s" % msg)
            return dClassif
        
        teClass = lClassifItem[4]
        teOrder = lClassifItem[5]
        # TODO: recompute wicker code like this or force the user to provide a classif file as input with the wicker code already added
        wCode = iRenameHeaderClassif._decisionRuleForWickerCode(teClass, teOrder)
        
        dClassif["name"] = lClassifItem[0]
        dClassif["wCode"] = wCode
        dClassif["length"] = int(lClassifItem[1])
        dClassif["strand"] = lClassifItem[2]
        dClassif["chimeric"] = False if lClassifItem[3] == "ok" else True
        
        dClassif["class"] = teClass
        dClassif["order"] = teOrder
        
        if(lClassifItem[6] == "complete"):
            dClassif["complete"] = True
        elif(lClassifItem[6] == "incomplete"):
            dClassif["complete"] = False
        else:
            dClassif["complete"] = None

        allFields = lClassifItem[7].split("; ")
        
        CI = allFields.pop(0)
        CI = CI.split("=")[-1]
        if CI != "NA":
            try:
                CI = int(CI)
            except ValueError as e:
                print "Couldn't convert %s to int : %s" % (CI, e)
        dClassif["CI"] = CI
        
        dClassif["coding"] = OrderedDict()
        dClassif["struct"] = OrderedDict()
        dClassif["other"] = OrderedDict()
        
        allFields = "; ".join(allFields)
        codingField = ""
        structField = ""
        otherField = ""
        
        codingStart = allFields.find("coding=(")
        if codingStart != -1:
            pCount = 1
            trueStart = codingStart + len("coding=(")
            end = trueStart
            for char in allFields[trueStart:]:
                if char == "(":
                    pCount += 1
                if char == ")":
                    pCount -= 1
                if pCount == 0:
                    break;
                end += 1
            if pCount == 0:
                codingField = allFields[trueStart:end]
        
        structStart = allFields.find("struct=(")
        if structStart != -1:
            pCount = 1
            trueStart = structStart + len("struct=(")
            end = trueStart
            for char in allFields[trueStart:]:
                if char == "(":
                    pCount += 1
                if char == ")":
                    pCount -= 1
                if pCount == 0:
                    break;
                end += 1
            structField = allFields[trueStart:end]
            
        otherStart = allFields.find("other=(")
        if otherStart != -1:
            pCount = 1
            trueStart = otherStart + len("other=(")
            end = trueStart
            for char in allFields[trueStart:]:
                if char == "(":
                    pCount += 1
                if char == ")":
                    pCount -= 1
                if pCount == 0:
                    break;
                end += 1
            otherField = allFields[trueStart:end]
        
        if codingField != "":
            ClassifUtils._formatCodingFeaturesAsDict(codingField, dClassif["coding"])
        if structField != "":
            ClassifUtils._formatStructFeaturesAsDict(structField, dClassif["struct"])
        if otherField != "":
            ClassifUtils._formatOtherFeaturesAsDict(otherField, dClassif["other"])

        return dClassif
    
    ## Retrieve the classification informations of a classif file
    # 
    # @param fileName Name of the classif file
    # @return A dict containing the classification infos
    #
    @staticmethod
    def getClassifInfosAsDict(fileName):
        dConsensusInfo = OrderedDict()

        ext = os.path.splitext(fileName)[1]
        if  ext != ".classif":
            msg = "Input file must be a classif file from TEdenovo\n"
            print("ERROR - ClassifUtils - %s" % msg)
            exit(1)
        
        with open(fileName, "r") as classifFile:
            for line in classifFile:
                seqName = line.split("\t")[0]
                dConsensusInfo[seqName] = ClassifUtils.getClassifLineAsDict(line)
        
        return dConsensusInfo
    
    ## Convert a classif file to JSON format
    # 
    # @param fileName Name of the classif file
    # @param outFileName Name of the output JSON file (optional)
    #  
    @staticmethod
    def convertClassifToJson(fileName, outFileName = ""):
        dConsensusInfo = ClassifUtils.getClassifInfosAsDict(fileName)
        if outFileName == "":
            outFileName = "%s_classif.json" % (os.path.basename(fileName).rsplit(".", 1)[0])
        with open(outFileName, 'w') as outFile:
            json.dump(dConsensusInfo, outFile)