Mercurial > repos > urgi-team > teiso
view TEisotools-1.0/commons/core/seq/Bioseq.py @ 6:20ec0d14798e draft
Uploaded
author | urgi-team |
---|---|
date | Wed, 20 Jul 2016 05:00:24 -0400 |
parents | |
children |
line wrap: on
line source
# Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. import re import sys import random import string import cStringIO from commons.core.coord.Map import Map from commons.core.checker.RepetException import RepetException DNA_ALPHABET_WITH_N = set(['A', 'T', 'G', 'C', 'N']) IUPAC = set(['A', 'T', 'G', 'C', 'U', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N']) ## Record a sequence with its header # class Bioseq(object): __slots__ = ("header", "sequence", '__dict__') ## constructor # # @param name the header of sequence # @param seq sequence (DNA, RNA, protein) # def __init__(self, name = "", seq = ""): self.header = name self.sequence = seq ## Equal operator # def __eq__(self, o): if type(o) is type(self) and self.header == o.header and self.sequence == o.sequence: return True return False ## Not equal operator # def __ne__(self, o): return not self.__eq__(o) ## overload __repr__ # def __repr__(self): return "%s;%s" % (self.header, self.sequence) ## set attribute header # # @param header a string # def setHeader(self, header): self.header = header ## get attribute header # # @return header def getHeader(self): return self.header ## set attribute sequence # # @param sequence a string # def setSequence(self, sequence): self.sequence = sequence def getSequence(self): return self.sequence ## reset # def reset(self): self.setHeader("") self.setSequence("") ## Test if bioseq is empty # def isEmpty(self): return self.header == "" and self.sequence == "" ## Reverse the sequence # def reverse(self): tmp = self.sequence self.sequence = tmp[::-1] ## Turn the sequence into its complement # Force upper case letters # @warning: old name in pyRepet.Bioseq realComplement # def complement(self): complement = "" self.upCase() for i in xrange(0, len(self.sequence), 1): if self.sequence[i] == "A": complement += "T" elif self.sequence[i] == "T": complement += "A" elif self.sequence[i] == "C": complement += "G" elif self.sequence[i] == "G": complement += "C" elif self.sequence[i] == "M": complement += "K" elif self.sequence[i] == "R": complement += "Y" elif self.sequence[i] == "W": complement += "W" elif self.sequence[i] == "S": complement += "S" elif self.sequence[i] == "Y": complement += "R" elif self.sequence[i] == "K": complement += "M" elif self.sequence[i] == "V": complement += "B" elif self.sequence[i] == "H": complement += "D" elif self.sequence[i] == "D": complement += "H" elif self.sequence[i] == "B": complement += "V" elif self.sequence[i] == "N": complement += "N" elif self.sequence[i] == "-": complement += "-" else: print "WARNING: unknown symbol '%s', replacing it by N" % (self.sequence[i]) complement += "N" self.sequence = complement ## Reverse and complement the sequence # # Force upper case letters # @warning: old name in pyRepet.Bioseq : complement # def reverseComplement(self): self.reverse() self.complement() ## Remove gap in the sequence # def cleanGap(self): self.sequence = self.sequence.replace("-", "") ## Copy current Bioseq Instance # # @return: a Bioseq instance, a copy of current sequence. # def copyBioseqInstance(self): seq = Bioseq() seq.sequence = self.sequence seq.header = self.header return seq ## Add phase information after the name of sequence in header # # @param phase integer representing phase (1, 2, 3, -1, -2, -3) # def setFrameInfoOnHeader(self, phase): if " " in self.header: name, desc = self.header.split(" ", 1) name = name + "_" + str(phase) self.header = name + " " + desc else: self.header = self.header + "_" + str(phase) ## Fill Bioseq attributes with fasta file # # @param faFileHandler file handler of a fasta file # def read(self, faFileHandler): line = faFileHandler.readline() if line == "": self.header = None self.sequence = None return while line == "\n": line = faFileHandler.readline() if line[0] == '>': self.header = string.rstrip(line[1:]) else: print "error, line is", string.rstrip(line) return line = " " seq = cStringIO.StringIO() while line: prev_pos = faFileHandler.tell() line = faFileHandler.readline() if line == "": break if line[0] == '>': faFileHandler.seek(prev_pos) break seq.write(string.rstrip(line)) self.sequence = seq.getvalue() ## Create a subsequence with a modified header # # @param s integer start a required subsequence # @param e integer end a required subsequence # # @return a Bioseq instance, a subsequence of current sequence # def subseq(self, s, e = 0): if e == 0 : e = len(self.sequence) if s > e : print "error: start must be < or = to end" return if s <= 0 : print "error: start must be > 0" return sub = Bioseq() sub.header = self.header + " fragment " + str(s) + ".." + str(e) sub.sequence = self.sequence[(s - 1):e] return sub ## Get the nucleotide or aminoacid at the given position # # @param pos integer nucleotide or aminoacid position # # @return a string # def getNtFromPosition(self, pos): result = None if not (pos < 1 or pos > self.getLength()): result = self.sequence[pos - 1] return result ## Print in stdout the Bioseq in fasta format with 60 characters lines # # @param l length of required sequence default is whole sequence # def view(self, l = 0): print '>' + self.header i = 0 if(l == 0): l = len(self.sequence) seq = self.sequence[0:l] while i < len(seq): print seq[i:i + 60] i = i + 60 ## Get length of sequence # # @param avoidN boolean don't count 'N' nucleotides # # @return length of current sequence # def getLength(self, countN = True): if countN: return len(self.sequence) else: return len(self.sequence) - self.countNt('N') ## Return the proportion of a specific character # # @param nt character that we want to count # def propNt(self, nt): return self.countNt(nt) / float(self.getLength()) ## Count occurrence of specific character # # @param nt character that we want to count # # @return nb of occurrences # def countNt(self, nt): return self.sequence.count(nt) ## Count occurrence of each nucleotide in current seq # # @return a dict, keys are nucleotides, values are nb of occurrences # def countAllNt(self): dNt2Count = {} for nt in ["A", "T", "G", "C", "N"]: dNt2Count[ nt ] = self.countNt(nt) return dNt2Count ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found # # @param size integer required length word # def occ_word(self, size): occ = {} if size == 0: return occ, 0 nbword = 0 srch = re.compile('[^ATGC]+') wordlist = self._createWordList(size) for i in wordlist: occ[i] = 0 lenseq = len(self.sequence) i = 0 while i < lenseq - size + 1: word = self.sequence[i:i + size].upper() m = srch.search(word) if m == None: occ[word] = occ[word] + 1 nbword = nbword + 1 i = i + 1 else: i = i + m.end(0) return occ, nbword ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size # # @param size integer required length word # def freq_word(self, size): dOcc, nbWords = self.occ_word(size) freq = {} for word in dOcc.keys(): freq[word] = float(dOcc[word]) / nbWords return freq ## Find ORF in each phase # # @return: a dict, keys are phases, values are stop codon positions. # def findORF (self): orf = {0:[], 1:[], 2:[]} length = len (self.sequence) for i in xrange(0, length): triplet = self.sequence[i:i + 3] if (triplet == "TAA" or triplet == "TAG" or triplet == "TGA"): phase = i % 3 orf[phase].append(i) return orf ## Convert the sequence into upper case # def upCase(self): self.sequence = self.sequence.upper() ## Convert the sequence into lower case # def lowCase(self): self.sequence = self.sequence.lower() ## Extract the cluster of the fragment (output from Grouper) # # @return cluster id (string) # def getClusterID(self): data = self.header.split() return data[0].split("Cl")[1] ## Extract the group of the sequence (output from Grouper) # # @return group id (string) # def getGroupID(self): data = self.header.split() return data[0].split("Gr")[1].split("Cl")[0] ## Get the header of the full sequence (output from Grouper) # # @example 'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203' # @return header (string) # def getHeaderFullSeq(self): data = self.header.split() return data[1] ## Get the strand of the fragment (output from Grouper) # # @return: strand (+ or -) # def getFragStrand(self): data = self.header.split() coord = data[3].split("..") if int(coord[0]) < int(coord[-1]): return "+" else: return "-" ## Get A, T, G, C or N from an IUPAC letter # IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N'] # # @return A, T, G, C or N # def getATGCNFromIUPAC(self, nt): subset = ["A", "T", "G", "C", "N"] if nt in subset: return nt elif nt == "U": return "T" elif nt == "R": return random.choice("AG") elif nt == "Y": return random.choice("CT") elif nt == "M": return random.choice("CA") elif nt == "K": return random.choice("TG") elif nt == "W": return random.choice("TA") elif nt == "S": return random.choice("CG") elif nt == "B": return random.choice("CTG") elif nt == "D": return random.choice("ATG") elif nt == "H": return random.choice("ATC") elif nt == "V": return random.choice("ACG") else: return "N" ## Get nucleotide from an IUPAC letter and a nucleotide # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S'] # Examples: # Y and C returns T # Y and T returns C # B and C throws RepetException # # @return A, T, G, C # def getATGCNFromIUPACandATGCN(self, IUPACCode, nt): if IUPACCode == "R": possibleNt = set(["A", "G"]) if nt not in possibleNt: raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) return (possibleNt - set(nt)).pop() elif IUPACCode == "Y": possibleNt = set(["C", "T"]) if nt not in possibleNt: raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) return (possibleNt - set(nt)).pop() elif IUPACCode == "M": possibleNt = set(["A", "C"]) if nt not in possibleNt: raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) return (possibleNt - set(nt)).pop() elif IUPACCode == "K": possibleNt = set(["T", "G"]) if nt not in possibleNt: raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) return (possibleNt - set(nt)).pop() elif IUPACCode == "W": possibleNt = set(["A", "T"]) if nt not in possibleNt: raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) return (possibleNt - set(nt)).pop() elif IUPACCode == "S": possibleNt = set(["C", "G"]) if nt not in possibleNt: raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt)) return (possibleNt - set(nt)).pop() else: raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt)) def getSeqWithOnlyATGCN(self): newSeq = "" for nt in self.sequence: newSeq += self.getATGCNFromIUPAC(nt) return newSeq ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents # def partialIUPAC(self): self.sequence = self.getSeqWithOnlyATGCN() ## Remove non Unix end-of-line symbols, if any # def checkEOF(self): symbol = "\r" # corresponds to '^M' from Windows if symbol in self.sequence: print "WARNING: Windows EOF removed in '%s'" % (self.header) sys.stdout.flush() newSeq = self.sequence.replace(symbol, "") self.sequence = newSeq ## Write Bioseq instance into a fasta file handler # # @param faFileHandler file handler of a fasta file # def write(self, faFileHandler): faFileHandler.write(">%s\n" % (self.header)) self.writeSeqInFasta(faFileHandler) ## Write only the sequence of Bioseq instance into a fasta file handler # # @param faFileHandler file handler of a fasta file # def writeSeqInFasta(self, faFileHandler): i = 0 while i < self.getLength(): faFileHandler.write("%s\n" % (self.sequence[i:i + 60])) i += 60 ## Append Bioseq instance to a fasta file # # @param faFile name of a fasta file as a string # @param mode 'write' or 'append' # def save(self, faFile, mode = "a"): faFileHandler = open(faFile, mode) self.write(faFileHandler) faFileHandler.close() ## Append Bioseq instance to a fasta file # # @param faFile name of a fasta file as a string # def appendBioseqInFile(self, faFile): self.save(faFile, "a") ## Write Bioseq instance into a fasta file handler # # @param faFileHandler file handler on a file with writing right # def writeABioseqInAFastaFile(self, faFileHandler): self.write(faFileHandler) ## Write Bioseq instance with other header into a fasta file handler # # @param faFileHandler file handler on a file with writing right # @param otherHeader a string representing a new header (without the > and the \n) # def writeWithOtherHeader(self, faFileHandler, otherHeader): self.header = otherHeader self.write(faFileHandler) ## Append Bioseq header and Bioseq sequence in a fasta file # # @param faFileHandler file handler on a file with writing right # @param otherHeader a string representing a new header (without the > and the \n) # def writeABioseqInAFastaFileWithOtherHeader(self, faFileHandler, otherHeader): self.writeWithOtherHeader(faFileHandler, otherHeader) ## get the list of Maps corresponding to seq without gap # # @warning This method was called getMap() in pyRepet.Bioseq # @return a list of Map object # def getLMapWhithoutGap(self): lMaps = [] countSite = 1 countSubseq = 1 inGap = False startMap = -1 endMap = -1 # initialize with the first site if self.sequence[0] == "-": inGap = True else: startMap = countSite # for each remaining site for site in self.sequence[1:]: countSite += 1 # if it is a gap if site == "-": # if this is the beginning of a gap, record the previous subsequence if inGap == False: inGap = True endMap = countSite - 1 lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap)) countSubseq += 1 # if it is NOT a gap if site != "-": # if it is the end of a gap, begin the next subsequence if inGap == True: inGap = False startMap = countSite # if it is the last site if countSite == self.getLength(): endMap = countSite lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap)) return lMaps ## get the percentage of GC # # @return a percentage # def getGCpercentage(self): tmpSeq = self.getSeqWithOnlyATGCN() nbGC = tmpSeq.count("G") + tmpSeq.count("C") return 100 * nbGC / float(self.getLength()) ## get the percentage of GC of a sequence without counting N in sequence length # # @return a percentage # def getGCpercentageInSequenceWithoutCountNInLength(self): tmpSeq = self.getSeqWithOnlyATGCN() nbGC = tmpSeq.count("G") + tmpSeq.count("C") return 100 * nbGC / float(self.getLength() - self.countNt("N")) ## get the 5 prime subsequence of a given length at the given position # # @param position integer # @param flankLength integer subsequence length # @return a sequence string # def get5PrimeFlank(self, position, flankLength): if(position == 1): return "" else: startOfFlank = 1 endOfFlank = position - 1 if((position - flankLength) > 0): startOfFlank = position - flankLength else: startOfFlank = 1 return self.subseq(startOfFlank, endOfFlank).sequence ## get the 3 prime subsequence of a given length at the given position # In the case of indels, the polymorphism length can be specified # # @param position integer # @param flankLength integer subsequence length # @param polymLength integer polymorphism length # @return a sequence string # def get3PrimeFlank(self, position, flankLength, polymLength = 1): if((position + polymLength) > len(self.sequence)): return "" else: startOfFlank = position + polymLength if((position + polymLength + flankLength) > len(self.sequence)): endOfFlank = len(self.sequence) else: endOfFlank = position + polymLength + flankLength - 1 return self.subseq(startOfFlank, endOfFlank).sequence def _createWordList(self, size, l = ['A', 'T', 'G', 'C']): if size == 1 : return l else: l2 = [] for i in l: for j in ['A', 'T', 'G', 'C']: l2.append(i + j) return self._createWordList(size - 1, l2) def removeSymbol(self, symbol): tmp = self.sequence.replace(symbol, "") self.sequence = tmp