Mercurial > repos > yufei-luo > s_mart
view SMART/Java/Python/structure/Transcript.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children | 94ab73e8a190 |
line wrap: on
line source
# # Copyright INRA-URGI 2009-2010 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # import sys from SMART.Java.Python.structure.Interval import Interval from SMART.Java.Python.structure.Sequence import Sequence class Transcript(Interval): """ A class that models an transcript, considered as a specialized interval (the bounds of the transcript) that contains exons (also represented as intervals) @ivar exons: a list of exons (intervals) @type exons: list of L{Interval{Interval}} """ def __init__(self, transcript = None, verbosity = 0): """ Constructor @param transcript: transcript to be copied @type transcript: class L{Transcript<Transcript>} @param verbosity: verbosity @type verbosity: int """ super(Transcript, self).__init__(None, verbosity) self.exons = [] self.introns = None if transcript != None: self.copy(transcript) def copy(self, transcript): """ Copy method @param transcript: transcript to be copied @type transcript: class L{Transcript<Transcript>} or L{Interval<Interval>} """ super(Transcript, self).copy(transcript) if transcript.__class__.__name__ == "Transcript": exons = transcript.getExons() if len(exons) > 1: for exon in exons: exonCopy = Interval(exon) self.addExon(exonCopy) def setDirection(self, direction): """ Set the direction of the interval Possibly parse different formats Impact all exons @param direction: direction of the transcript (+ / -) @type direction: int or string """ super(Transcript, self).setDirection(direction) for exon in self.exons: exon.setDirection(direction) def setChromosome(self, chromosome): """ Set the chromosome @param chromosome: chromosome on which the transcript is @type chromosome: string """ super(Transcript, self).setChromosome(chromosome) for exon in self.exons: exon.setChromosome(chromosome) def addExon(self, exon): """ Add an exon to the list of exons @param exon: a new exon @type exon: class L{Interval<Interval>} """ if not self.exons and not exon.overlapWith(self): firstExon = Interval() firstExon.setStart(self.getStart()) firstExon.setEnd(self.getEnd()) firstExon.setDirection(self.getDirection()) firstExon.setChromosome(self.getChromosome()) self.exons.append(firstExon) newExon = Interval(exon) newExon.setDirection(self.getDirection()) self.exons.append(newExon) if newExon.getStart() < self.getStart(): self.setStart(newExon.getStart()) if newExon.getEnd() > self.getEnd(): self.setEnd(newExon.getEnd()) def setStart(self, start): """ Set the new start, move the first exon accordingly (if exists) @param start: the new start @type start: int """ super(Transcript, self).setStart(start) if self.exons: self.sortExonsIncreasing() self.exons[0].setStart(start) def setEnd(self, end): """ Set the new end, move the last exon accordingly (if exists) @param end: the new end @type end: int """ super(Transcript, self).setEnd(end) if self.exons: self.sortExonsIncreasing() self.exons[-1].setEnd(end) def reverse(self): """ Reverse the strand of the transcript """ super(Transcript, self).reverse() for exon in self.exons: exon.reverse() def getUniqueName(self): """ Try to give a unique name by possibly adding occurrence """ if "nbOccurrences" in self.tags and "occurrence" in self.tags and self.tags["nbOccurrences"] != 1: return "%s-%d" % (self.name, self.tags["occurrence"]) return self.name def getNbExons(self): """ Get the number of exons """ return max(1, len(self.exons)) def getExon(self, i): """ Get a specific exon @param i: the rank of the exon @type i: int """ if len(self.exons) == 0: if i != 0: raise Exception("Cannot get exon #%i while there is no exon in the transcript" % (i)) return self return self.exons[i] def getExons(self): """ Get all the exons """ if len(self.exons) == 0: return [Interval(self)] return self.exons def getIntrons(self): """ Get all the introns Compute introns on the fly """ if self.introns != None: return self.introns self.sortExons() self.introns = [] exonStart = self.getExon(0) for cpt, exonEnd in enumerate(self.exons[1:]): intron = Interval() intron.setName("%s_intron%d" % (self.getName(), cpt+1)) intron.setChromosome(self.getChromosome()) intron.setDirection(self.getDirection()) if self.getDirection() == 1: intron.setEnd(exonEnd.getStart() - 1) intron.setStart(exonStart.getEnd() + 1) else: intron.setStart(exonEnd.getEnd() + 1) intron.setEnd(exonStart.getStart() - 1) intron.setDirection(self.getDirection()) if intron.getSize() > 0: self.introns.append(intron) exonStart = exonEnd intron.setSize(intron.getEnd() - intron.getStart() + 1) return self.introns def getSize(self): """ Get the size of the transcript (i.e. the number of nucleotides) Compute size on the fly """ if len(self.exons) == 0: return self.getSizeWithIntrons() size = 0 for exon in self.exons: size += exon.getSize() return size def getSizeWithIntrons(self): """ Get the size of the interval (i.e. distance from start to end) """ return super(Transcript, self).getSize() def overlapWithExon(self, transcript, nbNucleotides = 1): """ Check if the exons of this transcript overlap with the exons of another transcript @param transcript: transcript to be compared to @type transcript: class L{Transcript<Transcript>} @param nbNucleotides: minimum number of nucleotides to declare and overlap @type nbNucleotides: int """ if not self.overlapWith(transcript, nbNucleotides): return False for thisExon in self.getExons(): for thatExon in transcript.getExons(): if thisExon.overlapWith(thatExon, nbNucleotides): return True return False def include(self, transcript): """ Whether this transcript includes the other one @param transcript: object to be compared to @type transcript: class L{Transcript<Transcript>} """ if not super(Transcript, self).include(transcript): return False for thatExon in transcript.getExons(): for thisExon in self.getExons(): if thisExon.include(thatExon): break else: return False return True def merge(self, transcript, normalization = False): """ Merge with another transcript Merge exons if they overlap, otherwise add exons @param transcript: transcript to be merged to @type transcript: class L{Transcript<Transcript>} @param normalization: whether the sum of the merge should be normalized wrt the number of mappings of each elements @type normalization: boolean """ if self.getChromosome() != transcript.getChromosome() or self.getDirection() != transcript.getDirection(): raise Exception("Cannot merge '%s' with '%s'!" % (self, transcript)) theseExons = self.getExons() thoseExons = transcript.getExons() for thatExon in thoseExons: toBeRemoved = [] for thisIndex, thisExon in enumerate(theseExons): if thisExon.overlapWith(thatExon): thatExon.merge(thisExon) toBeRemoved.append(thisIndex) theseExons.append(thatExon) for thisIndex in reversed(toBeRemoved): del theseExons[thisIndex] self.removeExons() self.setStart(min(self.getStart(), transcript.getStart())) self.setEnd(max(self.getEnd(), transcript.getEnd())) if len(theseExons) > 1: for thisExon in theseExons: self.addExon(thisExon) self.setName("%s--%s" % (self.getUniqueName(), transcript.getUniqueName())) super(Transcript, self).merge(transcript, normalization) def getDifference(self, transcript, sameStrand = False): """ Get the difference between this cluster and another one @param transcript: object to be compared to @type transcript: class L{Transcript<Transcript>} @param sameStrand: do the comparison iff the transcripts are on the same strand @type sameStrand: boolean @return: a transcript """ newTranscript = Transcript() newTranscript.copy(self) if self.getChromosome() != transcript.getChromosome(): return newTranscript if not self.overlapWith(transcript): return newTranscript if sameStrand and self.getDirection() != transcript.getDirection(): return newTranscript newTranscript.removeExons() if transcript.getEnd() > newTranscript.getStart(): newTranscript.setStart(transcript.getEnd() + 1) if transcript.getStart() < newTranscript.getEnd(): newTranscript.setEnd(transcript.getStart() + 1) theseExons = [] for exon in self.getExons(): exonCopy = Interval() exonCopy.copy(exon) theseExons.append(exonCopy) for thatExon in transcript.getExons(): newExons = [] for thisExon in theseExons: newExons.extend(thisExon.getDifference(thatExon)) theseExons = newExons if not theseExons: return None newStart, newEnd = theseExons[0].getStart(), theseExons[0].getEnd() for thisExon in theseExons[1:]: newStart = min(newStart, thisExon.getStart()) newEnd = max(newEnd, thisExon.getEnd()) newTranscript.setEnd(newEnd) newTranscript.setStart(newStart) newTranscript.exons = theseExons return newTranscript def getSqlVariables(cls): """ Get the properties of the object that should be saved in a database """ variables = Interval.getSqlVariables() variables.append("exons") return variables getSqlVariables = classmethod(getSqlVariables) def setSqlValues(self, array): """ Set the values of the properties of this object as given by a results line of a SQL query @param array: the values to be copied @type array: a list """ super(Transcript, self).setSqlValues(array) mergedExons = array[8] if not mergedExons: return for exonCount, splittedExon in enumerate(mergedExons.split(",")): start, end = splittedExon.split("-") exon = Interval() exon.setChromosome(self.getChromosome()) exon.setDirection(self.getDirection()) exon.setName("%s_exon%d" % (self.getName(), exonCount+1)) exon.setStart(int(start)) exon.setEnd(int(end)) self.addExon(exon) def getSqlValues(self): """ Get the values of the properties that should be saved in a database """ values = super(Transcript, self).getSqlValues() values["size"] = self.getSize() if self.getNbExons() == 1: values["exons"] = "" else: values["exons"] = ",".join(["%d-%d" % (exon.getStart(), exon.getEnd()) for exon in self.getExons()]) return values def getSqlTypes(cls): """ Get the types of the properties that should be saved in a database """ types = Interval.getSqlTypes() types["exons"] = "varchar" return types getSqlTypes = classmethod(getSqlTypes) def getSqlSizes(cls): """ Get the sizes of the properties that should be saved in a database """ sizes = Interval.getSqlSizes() sizes["exons"] = 10000 return sizes getSqlSizes = classmethod(getSqlSizes) def sortExons(self): """ Sort the exons Increasing order if transcript is on strand "+", decreasing otherwise """ self.sortExonsIncreasing() if self.getDirection() == -1: exons = self.getExons() exons.reverse() self.exons = exons def sortExonsIncreasing(self): """ Sort the exons Increasing order """ exons = self.getExons() sortedExons = [] while len(exons) > 0: minExon = exons[0] for index in range(1, len(exons)): if minExon.getStart() > exons[index].getStart(): minExon = exons[index] sortedExons.append(minExon) exons.remove(minExon) self.exons = sortedExons def extendStart(self, size): """ Extend the transcript by the 5' end @param size: the size to be extended @type size: int """ if len(self.exons) != 0: self.sortExons() if self.getDirection() == 1: self.exons[0].setStart(max(0, self.exons[0].getStart() - size)) else: self.exons[0].setEnd(self.exons[0].getEnd() + size) super(Transcript, self).extendStart(size) self.bin = None def extendEnd(self, size): """ Extend the transcript by the 3' end @param size: the size to be extended @type size: int """ if len(self.exons) != 0: self.sortExons() if self.getDirection() == 1: self.exons[-1].setEnd(self.exons[-1].getEnd() + size) else: self.exons[-1].setStart(max(0, self.exons[-1].getStart() - size)) super(Transcript, self).extendEnd(size) self.bin = None def extendExons(self, size): """ Extend all the exons @param size: the size to be extended @type size: int """ if len(self.exons) != 0: self.sortExons() exons = [] previousExon = None for exon in self.exons: exon.extendStart(size) exon.extendEnd(size) exon.setDirection(self.getDirection()) if previousExon != None and previousExon.overlapWith(exon): previousExon.merge(exon) else: if previousExon != None: exons.append(previousExon) previousExon = exon exons.append(previousExon) self.exons = exons super(Transcript, self).extendStart(size) super(Transcript, self).extendEnd(size) self.bin = None def restrictStart(self, size = 1): """ Restrict the transcript by some nucleotides, start from its start position Remove the exons @param size: the size to be restricted to @type size: int """ newExons = [] if self.getDirection() == 1: for exon in self.exons: if exon.getStart() <= self.getStart() + size - 1: if exon.getEnd() > self.getStart() + size - 1: exon.setEnd(self.getStart() + size - 1) newExons.append(exon) else: for exon in self.exons: if exon.getEnd() >= self.getEnd() - size + 1: if exon.getStart() < self.getEnd() - size + 1: exon.setStart(self.getEnd() - size + 1) newExons.append(exon) super(Transcript, self).restrictStart(size) self.exons = newExons def restrictEnd(self, size = 1): """ Restrict the transcript by some nucleotides, end from its end position Remove the exons @param size: the size to be restricted to @type size: int """ newExons = [] if self.getDirection() == 1: for exon in self.exons: if exon.getEnd() >= self.getEnd() - size + 1: if exon.getStart() < self.getEnd() - size + 1: exon.setStart(self.getEnd() - size + 1) newExons.append(exon) else: for exon in self.exons: if exon.getEnd() >= self.getEnd() - size + 1: if exon.getStart() < self.getEnd() - size + 1: exon.setEnd(self.getEnd() - size + 1) newExons.append(exon) super(Transcript, self).restrictEnd(size) self.exons = newExons def removeExons(self): """ Remove the exons and transforms the current transcript into a mere interval """ self.exons = [] self.bin = None def printGtf(self, title): """ Export this transcript using GTF2.2 format @param title: the title of the transcripts @type title: string @return: a string """ transcriptId = self.getUniqueName() geneId = "%s_gene" % (transcriptId) direction = "+" if self.getDirection() == -1: direction = "-" self.sortExonsIncreasing() string = "" for i, exon in enumerate(self.getExons()): exonCopy = Interval() exonCopy.copy(exon) if "ID" in exonCopy.getTagValues(): del exonCopy.tags["ID"] feature = "exon" if "feature" in exonCopy.getTagNames(): feature = exonCopy.getTagValue("feature") del exonCopy.tags["feature"] score = "." if "score" in exonCopy.getTagNames(): score = "%d" % (int(exonCopy.getTagValue("score"))) del exonCopy.tags["score"] if "Parent" in exonCopy.getTagNames(): del exonCopy.tags["Parent"] exonCopy.setName("%s_part%d" % (self.getName(), i+1)) comment = exonCopy.getTagValues("; ", " ", "\"") string += "%s\t%s\t%s\t%d\t%d\t%s\t%s\t.\ttranscript_id \"%s\"; gene_id \"%s\"; %s\n" % (exonCopy.getChromosome(), title, feature, exonCopy.getStart(), exonCopy.getEnd(), score, direction, transcriptId, geneId, comment) return string def printGff2(self, title): """ Export this transcript using GFF2 format @param title: the title of the transcripts @type title: string @return: a string """ direction = "+" if self.getDirection() == -1: direction = "-" self.sortExonsIncreasing() comment = self.getTagValues() if comment != None: comment = ";%s" % (comment) score = "." if "score" in self.getTagNames(): score = "%d" % (int(self.getTagValue("score"))) feature = "transcript" if "feature" in self.getTagNames(): feature = self.getTagValue("feature") string = "%s\t%s\t%s\t%d\t%d\t%s\t%s\t.\tGENE %s%s\n" % (self.getChromosome(), title, feature, self.getStart(), self.getEnd(), score, direction, self.name, comment) for exon in self.getExons(): if "score" in exon.getTagNames(): score = "%d" % (int(self.getTagValue("score"))) string += "%s\t%s\t_exon\t%d\t%d\t%s\t%s\t.\tGENE %s\n" % (self.getChromosome(), title, exon.getStart(), exon.getEnd(), score, direction, self.name) return string def printGff3(self, title): """ Export this transcript using GFF3 format @param title: the title of the transcripts @type title: string @return: a string """ direction = "+" if self.getDirection() == -1: direction = "-" self.sortExonsIncreasing() if "ID" not in self.getTagValues(): self.setTagValue("ID", self.getUniqueName()) feature = "transcript" tags = self.tags if "feature" in self.getTagNames(): feature = self.getTagValue("feature") del self.tags["feature"] score = "." if "score" in self.getTagNames(): score = "%d" % (int(self.getTagValue("score"))) del self.tags["score"] comment = self.getTagValues(";", "=") string = "%s\t%s\t%s\t%d\t%d\t%s\t%s\t.\t%s\n" % (self.getChromosome(), title, feature, self.getStart(), self.getEnd(), score, direction, comment) if len(self.exons) > 1: for i, exon in enumerate(self.getExons()): if "score" in exon.getTagNames(): score = "%d" % (int(exon.getTagValue("score"))) string += "%s\t%s\texon\t%d\t%d\t%s\t%s\t.\tID=%s-exon%d;Name=%s-exon%d;Parent=%s\n" % (self.getChromosome(), title, exon.getStart(), exon.getEnd(), score, direction, self.getTagValue("ID"), i+1, self.name, i+1, self.getTagValue("ID")) self.tags = tags return string def printEmbl(self): """ Export this transcript using EMBL format @return: a string """ if len(self.exons) <= 1: position = "%d..%d" % (self.getStart(), self.getEnd()) else: positions = [] for exon in self.getExons(): positions.append("%d..%d" % (self.getStart(), self.getEnd())) position = ",".join(positions) position = "join(%s)" % (position) if self.getDirection() == -1: position = "complement(%s)" % (position) feature = "misc_feature" if "feature" in self.getTagNames(): if not self.getTagValue("feature").startswith("S-MART"): feature = self.getTagValue("feature") string = "FT %s %s\n" % (feature, position) if "Name" in self.getTagNames(): string += "FT /label=\"%s\"\n" % (self.getTagValue("Name")) return string def printBed(self): """ Export this transcript using BED format @return: a string """ name = self.name if "nbOccurrences" in self.getTagNames() and self.getTagValue("nbOccurrences") != 1 and self.getTagValue("occurrences"): name = "%s-%d" % (name, self.getTagValue("occurrence")) comment = self.getTagValues(";", "=") sizes = [] starts = [] direction = "+" if self.getDirection() == -1: direction = "-" self.sortExonsIncreasing() for exon in self.getExons(): sizes.append("%d" % (exon.getSize())) starts.append("%d" % (exon.getStart() - self.getStart())) return "%s\t%d\t%d\t%s\t1000\t%s\t%d\t%d\t0\t%d\t%s,\t%s,\n" % (self.getChromosome(), self.getStart(), self.getEnd()+1, name, direction, self.getStart(), self.getEnd()+1, self.getNbExons(), ",".join(sizes), ",".join(starts)) def printSam(self): """ Export this transcript using SAM format @return: a string """ name = self.name flag = 0 if self.getDirection() == 1 else 0x10 chromosome = self.getChromosome() genomeStart = self.getStart() quality = 255 mate = "*" mateGenomeStart = 0 gapSize = 0 sequence = "*" qualityString = "*" tags = "NM:i:0" lastExonEnd = None self.sortExonsIncreasing() exon = self.getExons()[0] cigar = "%dM" % (self.getExons()[0].getSize()) lastExonEnd = exon.getEnd() for i, exon in enumerate(self.getExons()): if i == 0: continue cigar += "%dN" % (exon.getStart() - lastExonEnd - 1) cigar += "%dM" % (exon.getSize()) return "%s\t%d\t%s\t%d\t%d\t%s\t%s\t%d\t%d\t%s\t%s\t%s\n" % (name, flag, chromosome, genomeStart, quality, cigar, mate, mateGenomeStart, gapSize, sequence, qualityString, tags) def printUcsc(self): """ Export this transcript using UCSC BED format @return: a string """ if self.getChromosome().find("Het") != -1: return "" name = self.name comment = self.getTagValues(";", "") sizes = [] starts = [] direction = "+" if self.getDirection() == -1: direction = "-" self.sortExonsIncreasing() for exon in self.getExons(): sizes.append("%d" % (exon.getSize())) starts.append("%d" % (exon.getStart() - self.getStart())) return "%s\t%d\t%d\t%s\t1000\t%s\t%d\t%d\t0\t%d\t%s,\t%s,\n" % (self.getChromosome().replace("arm_", "chr"), self.getStart(), self.getEnd()+1, name, direction, self.getStart(), self.getEnd()+1, self.getNbExons(), ",".join(sizes), ",".join(starts)) def printGBrowseReference(self): """ Export this transcript using GBrowse format (1st line only) @return: a string """ return "reference = %s\n" % (self.getChromosome()) def printGBrowseLine(self): """ Export this transcript using GBrowse format (2nd line only) @return: a string """ self.sortExons() coordinates = [] for exon in self.getExons(): coordinates.append(exon.printCoordinates()) coordinatesString = ",".join(coordinates) comment = self.getTagValues(";", "=") if comment: comment = "\t\"%s\"" % (comment) return "User_data\t%s\t%s%s\n" % (self.name, coordinatesString, comment) def printGBrowse(self): """ Export this transcript using GBrowse format @return: a string """ return "%s%s" % (self.printGBrowseReference(), self.printGBrowseLine()) def printCsv(self): """ Export this transcript using CSV format @return: a string """ self.sortExons() string = "%s,%d,%d,\"%s\"," % (self.getChromosome(), self.getStart(), self.getEnd(), "+" if self.getDirection() == 1 else "-") if len(self.getExons()) == 1: string += "None" else: for exon in self.getExons(): string += "%d-%d " % (exon.getStart(), exon.getEnd()) for tag in sorted(self.tags.keys()): string += ",%s=%s" % (tag, str(self.tags[tag])) string += "\n" return string def extractSequence(self, parser): """ Get the sequence corresponding to this transcript @param parser: a parser to a FASTA file @type parser: class L{SequenceListParser<SequenceListParser>} @return: an instance of L{Sequence<Sequence>} """ self.sortExons() name = self.name if "ID" in self.getTagNames() and self.getTagValue("ID") != self.name: name += ":%s" % (self.getTagValue("ID")) sequence = Sequence(name) for exon in self.getExons(): sequence.concatenate(exon.extractSequence(parser)) return sequence def extractWigData(self, parser): """ Get some wig data corresponding to this transcript @param parser: a parser to a wig file @type parser: class L{WigParser<WigParser>} @return: a sequence of float """ self.sortExons() if parser.strands: strands = (-1, 1) values = dict([(strand, []) for strand in strands]) for exon in self.getExons(): theseValues = exon.extractWigData(parser) if self.getDirection() == -1: for strand in strands: theseValues[strand].reverse() for strand in strands: values[strand].extend(theseValues[strand]) if self.getDirection() == -1: for strand in strands: values[strand].reverse() return values else: values = [] for exon in self.getExons(): theseValues = exon.extractWigData(parser) #if self.getDirection() == -1: # theseValues.reverse() values.extend(theseValues) #if self.getDirection() == -1: # values.reverse() return values