diff TEisotools-1.0/commons/core/utils/ClassifUtils.py @ 6:20ec0d14798e draft
Uploaded
| author | urgi-team |
| --- | --- |
| date | Wed, 20 Jul 2016 05:00:24 -0400 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TEisotools-1.0/commons/core/utils/ClassifUtils.py	Wed Jul 20 05:00:24 2016 -0400
@@ -0,0 +1,311 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software. You can use,
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info".
+#
+# As a counterpart to the access to the source code and rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty and the software's author, the holder of the
+# economic rights, and the successive licensors have only limited
+# liability.
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading, using, modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean that it is complicated to manipulate, and that also
+# therefore means that it is reserved for developers and experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or
+# data to be ensured and, more generally, to use and operate it in the
+# same conditions as regards security.
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+import os
+import json
+from collections import OrderedDict
+from commons.tools.RenameHeaderClassif import RenameHeaderClassif
+
+class ClassifUtils(object):
+
+    @staticmethod
+    def _formatProfilesResultsAsDict(lProfilesResults):
+        if len(lProfilesResults) == 0:
+            return OrderedDict()
+
+        dResults = OrderedDict()
+
+        for refNameAndCoverage in lProfilesResults:
+            refName, coverage = refNameAndCoverage.split(": ")
+
+            coverage = coverage.split("%(")
+            coverageOnSubject = float(coverage.pop(1).replace("%)", ""))
+            coverage = float(coverage.pop(0))
+
+            profilesResult = OrderedDict()
+            profilesResult["cov"] = coverage
+            profilesResult["covOnSubject"] = coverageOnSubject
+            dResults[refName] = profilesResult
+        return dResults
+
+    @staticmethod
+    def _formatCodingFeaturesAsDict(lineOfEvidence, dCoding):
+        codingEvidences = lineOfEvidence.split("; ")
+
+        for codingTypeData in codingEvidences:
+            codingTypeData = codingTypeData.split(": ")
+            codingType = codingTypeData.pop(0)
+
+            codingTypeData = ": ".join(codingTypeData)
+            codingTypeData = codingTypeData.split(", ")
+
+            if codingType == "TE_BLRtx":
+                if not dCoding.has_key("TE_BLRtx"):
+                    dCoding["TE_BLRtx"] = OrderedDict()
+                for refNameAndCoverage in codingTypeData:
+                    blrtxResult = OrderedDict()
+                    refName, coverage = refNameAndCoverage.rsplit(": ", 1)
+                    coverage = float(coverage.replace("%", ""))
+                    blrtxResult["cov"] = coverage
+                    dCoding["TE_BLRtx"][refName] = blrtxResult
+
+            if codingType == "TE_BLRx":
+                if not dCoding.has_key("TE_BLRx"):
+                    dCoding["TE_BLRx"] = OrderedDict()
+                for refNameAndCoverage in codingTypeData:
+                    blrxResult = OrderedDict()
+                    refName, coverage = refNameAndCoverage.rsplit(": ", 1)
+                    coverage = float(coverage.replace("%", ""))
+                    blrxResult["cov"] = coverage
+                    dCoding["TE_BLRx"][refName] = blrxResult
+
+            if codingType == "profiles":
+                dCoding["profiles"] = ClassifUtils._formatProfilesResultsAsDict(codingTypeData)
+
+            if codingType == "Other_profiles":
+                dCoding["Other_profiles"] = ClassifUtils._formatProfilesResultsAsDict(codingTypeData)
+
+            if codingType == "rDNA_BLRn":
+                dCoding["rDNA_BLRn"] = OrderedDict()
+                codingTypeData = ", ".join(codingTypeData)
+                try:
+                    refName, coverage = codingTypeData.rsplit(": ", 1)
+                    coverage = float(coverage.replace("%", ""))
+                except ValueError:
+                    refName = codingTypeData
+                    coverage = -1.0
+
+                dCoding["rDNA_BLRn"]["name"] = refName
+                dCoding["rDNA_BLRn"]["cov"] = coverage
+
+            if codingType == "HG_BLRn":
+                dCoding["HG_BLRn"] = OrderedDict()
+                refName, coverage = codingTypeData[0].rsplit(": ", 1)
+                coverage = float(coverage.replace("%", ""))
+
+                dCoding["HG_BLRn"]["name"] = refName
+                dCoding["HG_BLRn"]["cov"] = coverage
+
+    @staticmethod
+    def _formatStructFeaturesAsDict(lineOfEvidence, dStruct):
+        structEvidences = lineOfEvidence.split("; ")
+        for structTypeData in structEvidences:
+
+            structTypeData = structTypeData.split(": ")
+            structType = structTypeData.pop(0)
+
+            structTypeData = ": ".join(structTypeData)
+            structTypeData = structTypeData.split(", ")
+
+            if structType == "TElength":
+                dStruct["TElength"] = structTypeData.pop()
+
+            if structType == "TermRepeats":
+                dStruct["TermRepeats"] = OrderedDict()
+                for refNameAndLength in structTypeData:
+                    refName, length = refNameAndLength.rsplit(": ", 1)
+                    dStruct["TermRepeats"][refName] = int(length)
+
+            if structType == "ORF":
+                if not dStruct.has_key("ORF"):
+                    dStruct["ORF"] = structTypeData
+
+            if structType in ["SSR", "SSRtrf"]:
+                if not dStruct.has_key(structType):
+                    dStruct[structType] = structTypeData
+
+            if "SSRCoverage" in structType:
+                dummy, cov = structType.split("=")
+                dStruct["SSRCoverage"] = float(cov)
+
+            if structType == "polyAtail":
+                dStruct["polyAtail"] = True
+
+            if structType == "helitronExtremities":
+                structTypeData = ", ".join(structTypeData)
+                structTypeData = structTypeData.split("), ")
+                dStruct["helitronExtremities"] = OrderedDict()
+                for helitronData in structTypeData:
+                    helName, helData = helitronData.split(": (")
+                    helData = helData.replace(")", "")
+                    eValue, start, end = helData.split(", ")
+
+                    helitronExtResult = OrderedDict()
+                    helitronExtResult["start"] = int(start)
+                    helitronExtResult["end"] = int(end)
+                    helitronExtResult["eValue"] = float(eValue)
+                    dStruct["helitronExtremities"][helName] = helitronExtResult
+
+    @staticmethod
+    def _formatOtherFeaturesAsDict(lineOfEvidence, dOther):
+        if lineOfEvidence != "":
+            ClassifUtils._formatCodingFeaturesAsDict(lineOfEvidence, dOther)
+            ClassifUtils._formatStructFeaturesAsDict(lineOfEvidence, dOther)
+
+    @staticmethod
+    def getClassifLineAsDict(line):
+        dClassif = OrderedDict()
+        iRenameHeaderClassif = RenameHeaderClassif()
+        lClassifItem = line.split("\t")
+        if len(lClassifItem) != 8:
+            msg = "Can't parse line: \"%s\"\n" % line.strip()
+            print("WARNING - ClassifUtils - %s" % msg)
+            return dClassif
+
+        teClass = lClassifItem[4]
+        teOrder = lClassifItem[5]
+        # TODO: recompute wicker code like this or force the user to provide a classif file as input with the wicker code already added
+        wCode = iRenameHeaderClassif._decisionRuleForWickerCode(teClass, teOrder)
+
+        dClassif["name"] = lClassifItem[0]
+        dClassif["wCode"] = wCode
+        dClassif["length"] = int(lClassifItem[1])
+        dClassif["strand"] = lClassifItem[2]
+        dClassif["chimeric"] = False if lClassifItem[3] == "ok" else True
+
+        dClassif["class"] = teClass
+        dClassif["order"] = teOrder
+
+        if lClassifItem[6] == "complete":
+            dClassif["complete"] = True
+        elif lClassifItem[6] == "incomplete":
+            dClassif["complete"] = False
+        else:
+            dClassif["complete"] = None
+
+        allFields = lClassifItem[7].split("; ")
+
+        CI = allFields.pop(0)
+        CI = CI.split("=")[-1]
+        if CI != "NA":
+            try:
+                CI = int(CI)
+            except ValueError as e:
+                print("Couldn't convert %s to int: %s" % (CI, e))
+        dClassif["CI"] = CI
+
+        dClassif["coding"] = OrderedDict()
+        dClassif["struct"] = OrderedDict()
+        dClassif["other"] = OrderedDict()
+
+        allFields = "; ".join(allFields)
+        codingField = ""
+        structField = ""
+        otherField = ""
+
+        codingStart = allFields.find("coding=(")
+        if codingStart != -1:
+            pCount = 1
+            trueStart = codingStart + len("coding=(")
+            end = trueStart
+            for char in allFields[trueStart:]:
+                if char == "(":
+                    pCount += 1
+                if char == ")":
+                    pCount -= 1
+                if pCount == 0:
+                    break
+                end += 1
+            if pCount == 0:
+                codingField = allFields[trueStart:end]
+
+        structStart = allFields.find("struct=(")
+        if structStart != -1:
+            pCount = 1
+            trueStart = structStart + len("struct=(")
+            end = trueStart
+            for char in allFields[trueStart:]:
+                if char == "(":
+                    pCount += 1
+                if char == ")":
+                    pCount -= 1
+                if pCount == 0:
+                    break
+                end += 1
+            structField = allFields[trueStart:end]
+
+        otherStart = allFields.find("other=(")
+        if otherStart != -1:
+            pCount = 1
+            trueStart = otherStart + len("other=(")
+            end = trueStart
+            for char in allFields[trueStart:]:
+                if char == "(":
+                    pCount += 1
+                if char == ")":
+                    pCount -= 1
+                if pCount == 0:
+                    break
+                end += 1
+            otherField = allFields[trueStart:end]
+
+        if codingField != "":
+            ClassifUtils._formatCodingFeaturesAsDict(codingField, dClassif["coding"])
+        if structField != "":
+            ClassifUtils._formatStructFeaturesAsDict(structField, dClassif["struct"])
+        if otherField != "":
+            ClassifUtils._formatOtherFeaturesAsDict(otherField, dClassif["other"])
+
+        return dClassif
+
+    ## Retrieve the classification information of a classif file
+    #
+    # @param fileName Name of the classif file
+    # @return A dict containing the classification info
+    #
+    @staticmethod
+    def getClassifInfosAsDict(fileName):
+        dConsensusInfo = OrderedDict()
+
+        ext = os.path.splitext(fileName)[1]
+        if ext != ".classif":
+            msg = "Input file must be a classif file from TEdenovo\n"
+            print("ERROR - ClassifUtils - %s" % msg)
+            exit(1)
+
+        with open(fileName, "r") as classifFile:
+            for line in classifFile:
+                seqName = line.split("\t")[0]
+                dConsensusInfo[seqName] = ClassifUtils.getClassifLineAsDict(line)
+
+        return dConsensusInfo
+
+    ## Convert a classif file to JSON format
+    #
+    # @param fileName Name of the classif file
+    # @param outFileName Name of the output JSON file (optional)
+    #
+    @staticmethod
+    def convertClassifToJson(fileName, outFileName = ""):
+        dConsensusInfo = ClassifUtils.getClassifInfosAsDict(fileName)
+        if outFileName == "":
+            outFileName = "%s_classif.json" % (os.path.basename(fileName).rsplit(".", 1)[0])
+        with open(outFileName, 'w') as outFile:
+            json.dump(dConsensusInfo, outFile)
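
For readers who want to try the module, here is a minimal usage sketch; it is not part of the changeset above. It assumes the TEisotools-1.0 directory is on PYTHONPATH so that the commons package resolves, and the .classif file name used below is purely hypothetical. Only calls defined in the diff (getClassifInfosAsDict and convertClassifToJson) are used.

```python
# Minimal usage sketch, assuming TEisotools-1.0 is on PYTHONPATH and that
# "DmelChr4_denovoLibTEs.classif" is a hypothetical TEdenovo classif file.
from commons.core.utils.ClassifUtils import ClassifUtils

classifFileName = "DmelChr4_denovoLibTEs.classif"  # hypothetical input file

# Parse the whole file into an OrderedDict keyed by consensus name.
dInfo = ClassifUtils.getClassifInfosAsDict(classifFileName)
for seqName, dClassif in dInfo.items():
    print("%s: class=%s order=%s CI=%s" % (seqName,
                                           dClassif.get("class"),
                                           dClassif.get("order"),
                                           dClassif.get("CI")))

# Or dump the same structure to DmelChr4_denovoLibTEs_classif.json in one call.
ClassifUtils.convertClassifToJson(classifFileName)
```

Note that the module targets Python 2 (it relies on dict.has_key), so the sketch sticks to constructs that behave the same under Python 2.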