Mercurial > repos > urgi-team > teiso
diff TEisotools-1.0/commons/core/seq/Bioseq.py @ 6:20ec0d14798e draft
Uploaded
author | urgi-team |
---|---|
date | Wed, 20 Jul 2016 05:00:24 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/TEisotools-1.0/commons/core/seq/Bioseq.py Wed Jul 20 05:00:24 2016 -0400 @@ -0,0 +1,738 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import re +import sys +import random +import string +import cStringIO +from commons.core.coord.Map import Map +from commons.core.checker.RepetException import RepetException + +DNA_ALPHABET_WITH_N = set(['A', 'T', 'G', 'C', 'N']) +IUPAC = set(['A', 'T', 'G', 'C', 'U', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N']) + + +## Record a sequence with its header +# +class Bioseq(object): + + __slots__ = ("header", "sequence", '__dict__') + + ## constructor + # + # @param name the header of sequence + # @param seq sequence (DNA, RNA, protein) + # + def __init__(self, name = "", seq = ""): + self.header = name + self.sequence = seq + + + ## Equal operator + # + def __eq__(self, o): + if type(o) is type(self) and self.header == o.header and self.sequence == o.sequence: + return True + return False + + ## Not equal operator + # + def __ne__(self, o): + return not self.__eq__(o) + + ## overload __repr__ + # + def __repr__(self): + return "%s;%s" % (self.header, self.sequence) + + + ## set attribute header + # + # @param header a string + # + def setHeader(self, header): + self.header = header + + + ## get attribute header + # + # @return header + def getHeader(self): + return self.header + + + ## set attribute sequence + # + # @param sequence a string + # + def setSequence(self, sequence): + self.sequence = sequence + + + def getSequence(self): + return self.sequence + + ## reset + # + def reset(self): + self.setHeader("") + self.setSequence("") + + + ## Test if bioseq is empty + # + def isEmpty(self): + return self.header == "" and self.sequence == "" + + + ## Reverse the sequence + # + def reverse(self): + tmp = self.sequence + self.sequence = tmp[::-1] + + + ## Turn the sequence into its complement + # Force upper case letters + # @warning: old name in pyRepet.Bioseq realComplement + # + def complement(self): + complement = "" + self.upCase() + for i in xrange(0, len(self.sequence), 1): + if self.sequence[i] == "A": + complement += "T" + elif self.sequence[i] == "T": + complement += "A" + elif self.sequence[i] == "C": + complement += "G" + elif self.sequence[i] == "G": + complement += "C" + elif self.sequence[i] == "M": + complement += "K" + elif self.sequence[i] == "R": + complement += "Y" + elif self.sequence[i] == "W": + complement += "W" + elif self.sequence[i] == "S": + complement += "S" + elif self.sequence[i] == "Y": + complement += "R" + elif self.sequence[i] == "K": + complement += "M" + elif self.sequence[i] == "V": + complement += "B" + elif self.sequence[i] == "H": + complement += "D" + elif self.sequence[i] == "D": + complement += "H" + elif self.sequence[i] == "B": + complement += "V" + elif self.sequence[i] == "N": + complement += "N" + elif self.sequence[i] == "-": + complement += "-" + else: + print "WARNING: unknown symbol '%s', replacing it by N" % (self.sequence[i]) + complement += "N" + self.sequence = complement + + + ## Reverse and complement the sequence + # + # Force upper case letters + # @warning: old name in pyRepet.Bioseq : complement + # + def reverseComplement(self): + self.reverse() + self.complement() + + + ## Remove gap in the sequence + # + def cleanGap(self): + self.sequence = self.sequence.replace("-", "") + + + ## Copy current Bioseq Instance + # + # @return: a Bioseq instance, a copy of current sequence. + # + def copyBioseqInstance(self): + seq = Bioseq() + seq.sequence = self.sequence + seq.header = self.header + return seq + + + ## Add phase information after the name of sequence in header + # + # @param phase integer representing phase (1, 2, 3, -1, -2, -3) + # + def setFrameInfoOnHeader(self, phase): + if " " in self.header: + name, desc = self.header.split(" ", 1) + name = name + "_" + str(phase) + self.header = name + " " + desc + else: + self.header = self.header + "_" + str(phase) + + + ## Fill Bioseq attributes with fasta file + # + # @param faFileHandler file handler of a fasta file + # + def read(self, faFileHandler): + line = faFileHandler.readline() + if line == "": + self.header = None + self.sequence = None + return + while line == "\n": + line = faFileHandler.readline() + if line[0] == '>': + self.header = string.rstrip(line[1:]) + else: + print "error, line is", string.rstrip(line) + return + line = " " + seq = cStringIO.StringIO() + while line: + prev_pos = faFileHandler.tell() + line = faFileHandler.readline() + if line == "": + break + if line[0] == '>': + faFileHandler.seek(prev_pos) + break + seq.write(string.rstrip(line)) + self.sequence = seq.getvalue() + + + ## Create a subsequence with a modified header + # + # @param s integer start a required subsequence + # @param e integer end a required subsequence + # + # @return a Bioseq instance, a subsequence of current sequence + # + def subseq(self, s, e = 0): + if e == 0 : + e = len(self.sequence) + if s > e : + print "error: start must be < or = to end" + return + if s <= 0 : + print "error: start must be > 0" + return + sub = Bioseq() + sub.header = self.header + " fragment " + str(s) + ".." + str(e) + sub.sequence = self.sequence[(s - 1):e] + return sub + + + ## Get the nucleotide or aminoacid at the given position + # + # @param pos integer nucleotide or aminoacid position + # + # @return a string + # + def getNtFromPosition(self, pos): + result = None + if not (pos < 1 or pos > self.getLength()): + result = self.sequence[pos - 1] + return result + + + ## Print in stdout the Bioseq in fasta format with 60 characters lines + # + # @param l length of required sequence default is whole sequence + # + def view(self, l = 0): + print '>' + self.header + i = 0 + if(l == 0): + l = len(self.sequence) + seq = self.sequence[0:l] + + while i < len(seq): + print seq[i:i + 60] + i = i + 60 + + + ## Get length of sequence + # + # @param avoidN boolean don't count 'N' nucleotides + # + # @return length of current sequence + # + def getLength(self, countN = True): + if countN: + return len(self.sequence) + else: + return len(self.sequence) - self.countNt('N') + + + ## Return the proportion of a specific character + # + # @param nt character that we want to count + # + def propNt(self, nt): + return self.countNt(nt) / float(self.getLength()) + + + ## Count occurrence of specific character + # + # @param nt character that we want to count + # + # @return nb of occurrences + # + def countNt(self, nt): + return self.sequence.count(nt) + + + ## Count occurrence of each nucleotide in current seq + # + # @return a dict, keys are nucleotides, values are nb of occurrences + # + def countAllNt(self): + dNt2Count = {} + for nt in ["A", "T", "G", "C", "N"]: + dNt2Count[ nt ] = self.countNt(nt) + return dNt2Count + + + ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found + # + # @param size integer required length word + # + def occ_word(self, size): + occ = {} + if size == 0: + return occ, 0 + nbword = 0 + srch = re.compile('[^ATGC]+') + wordlist = self._createWordList(size) + for i in wordlist: + occ[i] = 0 + lenseq = len(self.sequence) + i = 0 + while i < lenseq - size + 1: + word = self.sequence[i:i + size].upper() + m = srch.search(word) + if m == None: + occ[word] = occ[word] + 1 + nbword = nbword + 1 + i = i + 1 + else: + i = i + m.end(0) + return occ, nbword + + + ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size + # + # @param size integer required length word + # + def freq_word(self, size): + dOcc, nbWords = self.occ_word(size) + freq = {} + for word in dOcc.keys(): + freq[word] = float(dOcc[word]) / nbWords + return freq + + + ## Find ORF in each phase + # + # @return: a dict, keys are phases, values are stop codon positions. + # + def findORF (self): + orf = {0:[], 1:[], 2:[]} + length = len (self.sequence) + for i in xrange(0, length): + triplet = self.sequence[i:i + 3] + if (triplet == "TAA" or triplet == "TAG" or triplet == "TGA"): + phase = i % 3 + orf[phase].append(i) + return orf + + + ## Convert the sequence into upper case + # + def upCase(self): + self.sequence = self.sequence.upper() + + + ## Convert the sequence into lower case + # + def lowCase(self): + self.sequence = self.sequence.lower() + + + ## Extract the cluster of the fragment (output from Grouper) + # + # @return cluster id (string) + # + def getClusterID(self): + data = self.header.split() + return data[0].split("Cl")[1] + + + ## Extract the group of the sequence (output from Grouper) + # + # @return group id (string) + # + def getGroupID(self): + data = self.header.split() + return data[0].split("Gr")[1].split("Cl")[0] + + + ## Get the header of the full sequence (output from Grouper) + # + # @example 'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203' + # @return header (string) + # + def getHeaderFullSeq(self): + data = self.header.split() + return data[1] + + + ## Get the strand of the fragment (output from Grouper) + # + # @return: strand (+ or -) + # + def getFragStrand(self): + data = self.header.split() + coord = data[3].split("..") + if int(coord[0]) < int(coord[-1]): + return "+" + else: + return "-" + + + ## Get A, T, G, C or N from an IUPAC letter + # IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N'] + # + # @return A, T, G, C or N + # + def getATGCNFromIUPAC(self, nt): + subset = ["A", "T", "G", "C", "N"] + + if nt in subset: + return nt + elif nt == "U": + return "T" + elif nt == "R": + return random.choice("AG") + elif nt == "Y": + return random.choice("CT") + elif nt == "M": + return random.choice("CA") + elif nt == "K": + return random.choice("TG") + elif nt == "W": + return random.choice("TA") + elif nt == "S": + return random.choice("CG") + elif nt == "B": + return random.choice("CTG") + elif nt == "D": + return random.choice("ATG") + elif nt == "H": + return random.choice("ATC") + elif nt == "V": + return random.choice("ACG") + else: + return "N" + + ## Get nucleotide from an IUPAC letter and a nucleotide + # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S'] + # Examples: + # Y and C returns T + # Y and T returns C + # B and C throws RepetException + # + # @return A, T, G, C + # + def getATGCNFromIUPACandATGCN(self, IUPACCode, nt): + if IUPACCode == "R": + possibleNt = set(["A", "G"]) + if nt not in possibleNt: + raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) + return (possibleNt - set(nt)).pop() + + elif IUPACCode == "Y": + possibleNt = set(["C", "T"]) + if nt not in possibleNt: + raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) + return (possibleNt - set(nt)).pop() + + elif IUPACCode == "M": + possibleNt = set(["A", "C"]) + if nt not in possibleNt: + raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) + return (possibleNt - set(nt)).pop() + + elif IUPACCode == "K": + possibleNt = set(["T", "G"]) + if nt not in possibleNt: + raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) + return (possibleNt - set(nt)).pop() + + elif IUPACCode == "W": + possibleNt = set(["A", "T"]) + if nt not in possibleNt: + raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) + return (possibleNt - set(nt)).pop() + + elif IUPACCode == "S": + possibleNt = set(["C", "G"]) + if nt not in possibleNt: + raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) + return (possibleNt - set(nt)).pop() + + else: + raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt)) + + def getSeqWithOnlyATGCN(self): + newSeq = "" + for nt in self.sequence: + newSeq += self.getATGCNFromIUPAC(nt) + return newSeq + + + ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents + # + def partialIUPAC(self): + self.sequence = self.getSeqWithOnlyATGCN() + + + ## Remove non Unix end-of-line symbols, if any + # + def checkEOF(self): + symbol = "\r" # corresponds to '^M' from Windows + if symbol in self.sequence: + print "WARNING: Windows EOF removed in '%s'" % (self.header) + sys.stdout.flush() + newSeq = self.sequence.replace(symbol, "") + self.sequence = newSeq + + + ## Write Bioseq instance into a fasta file handler + # + # @param faFileHandler file handler of a fasta file + # + def write(self, faFileHandler): + faFileHandler.write(">%s\n" % (self.header)) + self.writeSeqInFasta(faFileHandler) + + + ## Write only the sequence of Bioseq instance into a fasta file handler + # + # @param faFileHandler file handler of a fasta file + # + def writeSeqInFasta(self, faFileHandler): + i = 0 + while i < self.getLength(): + faFileHandler.write("%s\n" % (self.sequence[i:i + 60])) + i += 60 + + + ## Append Bioseq instance to a fasta file + # + # @param faFile name of a fasta file as a string + # @param mode 'write' or 'append' + # + def save(self, faFile, mode = "a"): + faFileHandler = open(faFile, mode) + self.write(faFileHandler) + faFileHandler.close() + + + ## Append Bioseq instance to a fasta file + # + # @param faFile name of a fasta file as a string + # + def appendBioseqInFile(self, faFile): + self.save(faFile, "a") + + + ## Write Bioseq instance into a fasta file handler + # + # @param faFileHandler file handler on a file with writing right + # + def writeABioseqInAFastaFile(self, faFileHandler): + self.write(faFileHandler) + + + ## Write Bioseq instance with other header into a fasta file handler + # + # @param faFileHandler file handler on a file with writing right + # @param otherHeader a string representing a new header (without the > and the \n) + # + def writeWithOtherHeader(self, faFileHandler, otherHeader): + self.header = otherHeader + self.write(faFileHandler) + + + ## Append Bioseq header and Bioseq sequence in a fasta file + # + # @param faFileHandler file handler on a file with writing right + # @param otherHeader a string representing a new header (without the > and the \n) + # + def writeABioseqInAFastaFileWithOtherHeader(self, faFileHandler, otherHeader): + self.writeWithOtherHeader(faFileHandler, otherHeader) + + + ## get the list of Maps corresponding to seq without gap + # + # @warning This method was called getMap() in pyRepet.Bioseq + # @return a list of Map object + # + def getLMapWhithoutGap(self): + lMaps = [] + countSite = 1 + countSubseq = 1 + inGap = False + startMap = -1 + endMap = -1 + + # initialize with the first site + if self.sequence[0] == "-": + inGap = True + else: + startMap = countSite + + # for each remaining site + for site in self.sequence[1:]: + countSite += 1 + + # if it is a gap + if site == "-": + + # if this is the beginning of a gap, record the previous subsequence + if inGap == False: + inGap = True + endMap = countSite - 1 + lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap)) + countSubseq += 1 + + # if it is NOT a gap + if site != "-": + + # if it is the end of a gap, begin the next subsequence + if inGap == True: + inGap = False + startMap = countSite + + # if it is the last site + if countSite == self.getLength(): + endMap = countSite + lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap)) + + return lMaps + + + ## get the percentage of GC + # + # @return a percentage + # + def getGCpercentage(self): + tmpSeq = self.getSeqWithOnlyATGCN() + nbGC = tmpSeq.count("G") + tmpSeq.count("C") + return 100 * nbGC / float(self.getLength()) + + ## get the percentage of GC of a sequence without counting N in sequence length + # + # @return a percentage + # + def getGCpercentageInSequenceWithoutCountNInLength(self): + tmpSeq = self.getSeqWithOnlyATGCN() + nbGC = tmpSeq.count("G") + tmpSeq.count("C") + return 100 * nbGC / float(self.getLength() - self.countNt("N")) + + ## get the 5 prime subsequence of a given length at the given position + # + # @param position integer + # @param flankLength integer subsequence length + # @return a sequence string + # + def get5PrimeFlank(self, position, flankLength): + if(position == 1): + return "" + else: + startOfFlank = 1 + endOfFlank = position - 1 + + if((position - flankLength) > 0): + startOfFlank = position - flankLength + else: + startOfFlank = 1 + + return self.subseq(startOfFlank, endOfFlank).sequence + + + ## get the 3 prime subsequence of a given length at the given position + # In the case of indels, the polymorphism length can be specified + # + # @param position integer + # @param flankLength integer subsequence length + # @param polymLength integer polymorphism length + # @return a sequence string + # + def get3PrimeFlank(self, position, flankLength, polymLength = 1): + if((position + polymLength) > len(self.sequence)): + return "" + else: + startOfFlank = position + polymLength + + if((position + polymLength + flankLength) > len(self.sequence)): + endOfFlank = len(self.sequence) + else: + endOfFlank = position + polymLength + flankLength - 1 + + return self.subseq(startOfFlank, endOfFlank).sequence + + + def _createWordList(self, size, l = ['A', 'T', 'G', 'C']): + if size == 1 : + return l + else: + l2 = [] + for i in l: + for j in ['A', 'T', 'G', 'C']: + l2.append(i + j) + return self._createWordList(size - 1, l2) + + + def removeSymbol(self, symbol): + tmp = self.sequence.replace(symbol, "") + self.sequence = tmp