diff TEisotools-1.0/commons/core/seq/Bioseq.py @ 6:20ec0d14798e draft

Uploaded
author urgi-team
date Wed, 20 Jul 2016 05:00:24 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/TEisotools-1.0/commons/core/seq/Bioseq.py	Wed Jul 20 05:00:24 2016 -0400
@@ -0,0 +1,738 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use,
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info".
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability.
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or
+# data to be ensured and,  more generally, to use and operate it in the
+# same conditions as regards security.
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import re
+import sys
+import random
+import string
+import cStringIO
+from commons.core.coord.Map import Map
+from commons.core.checker.RepetException import RepetException
+
+DNA_ALPHABET_WITH_N = set(['A', 'T', 'G', 'C', 'N'])
+IUPAC = set(['A', 'T', 'G', 'C', 'U', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N'])
+
+
+## Record a sequence with its header
+#
+class Bioseq(object):
+
+    __slots__ = ("header", "sequence", '__dict__')
+
+    ## constructor
+    #
+    # @param name the header of sequence
+    # @param seq sequence (DNA, RNA, protein)
+    #
+    def __init__(self, name = "", seq = ""):
+        self.header = name
+        self.sequence = seq
+
+
+    ## Equal operator
+    #
+    def __eq__(self, o):
+        if type(o) is type(self) and self.header == o.header and self.sequence == o.sequence:
+            return True
+        return False
+
+    ## Not equal operator
+    #
+    def __ne__(self, o):
+        return not self.__eq__(o)
+
+    ## overload __repr__
+    #
+    def __repr__(self):
+        return "%s;%s" % (self.header, self.sequence)
+
+
+    ## set attribute header
+    #
+    # @param header a string
+    #
+    def setHeader(self, header):
+        self.header = header
+
+
+    ## get attribute header
+    #
+    # @return header
+    def getHeader(self):
+        return self.header
+
+
+    ## set attribute sequence
+    #
+    # @param sequence a string
+    #
+    def setSequence(self, sequence):
+        self.sequence = sequence
+
+
+    def getSequence(self):
+        return self.sequence
+
+    ## reset
+    #
+    def reset(self):
+        self.setHeader("")
+        self.setSequence("")
+
+
+    ## Test if bioseq is empty
+    #
+    def isEmpty(self):
+        return self.header == "" and self.sequence == ""
+
+
+    ## Reverse the sequence
+    #
+    def reverse(self):
+        tmp = self.sequence
+        self.sequence = tmp[::-1]
+
+
+    ## Turn the sequence into its complement
+    #  Force upper case letters
+    #  @warning: old name in pyRepet.Bioseq realComplement
+    #
+    def complement(self):
+        complement = ""
+        self.upCase()
+        for i in xrange(0, len(self.sequence), 1):
+            if self.sequence[i] == "A":
+                complement += "T"
+            elif self.sequence[i] == "T":
+                complement += "A"
+            elif self.sequence[i] == "C":
+                complement += "G"
+            elif self.sequence[i] == "G":
+                complement += "C"
+            elif self.sequence[i] == "M":
+                complement += "K"
+            elif self.sequence[i] == "R":
+                complement += "Y"
+            elif self.sequence[i] == "W":
+                complement += "W"
+            elif self.sequence[i] == "S":
+                complement += "S"
+            elif self.sequence[i] == "Y":
+                complement += "R"
+            elif self.sequence[i] == "K":
+                complement += "M"
+            elif self.sequence[i] == "V":
+                complement += "B"
+            elif self.sequence[i] == "H":
+                complement += "D"
+            elif self.sequence[i] == "D":
+                complement += "H"
+            elif self.sequence[i] == "B":
+                complement += "V"
+            elif self.sequence[i] == "N":
+                complement += "N"
+            elif self.sequence[i] == "-":
+                complement += "-"
+            else:
+                print "WARNING: unknown symbol '%s', replacing it by N" % (self.sequence[i])
+                complement += "N"
+        self.sequence = complement
+
+
+    ## Reverse and complement the sequence
+    #
+    #  Force upper case letters
+    #  @warning: old name in pyRepet.Bioseq : complement
+    #
+    def reverseComplement(self):
+        self.reverse()
+        self.complement()
+
+
+    ## Remove gap in the sequence
+    #
+    def cleanGap(self):
+        self.sequence = self.sequence.replace("-", "")
+
+
+    ## Copy current Bioseq Instance
+    #
+    # @return: a Bioseq instance, a copy of current sequence.
+    #
+    def copyBioseqInstance(self):
+        seq = Bioseq()
+        seq.sequence = self.sequence
+        seq.header = self.header
+        return seq
+
+
+    ## Add phase information after the name of sequence in header
+    #
+    # @param phase integer representing phase (1, 2, 3, -1, -2, -3)
+    #
+    def setFrameInfoOnHeader(self, phase):
+        if " " in self.header:
+            name, desc = self.header.split(" ", 1)
+            name = name + "_" + str(phase)
+            self.header = name + " " + desc
+        else:
+            self.header = self.header + "_" + str(phase)
+
+
+    ## Fill Bioseq attributes with fasta file
+    #
+    # @param faFileHandler file handler of a fasta file
+    #
+    def read(self, faFileHandler):
+        line = faFileHandler.readline()
+        if line == "":
+            self.header = None
+            self.sequence = None
+            return
+        while line == "\n":
+            line = faFileHandler.readline()
+        if line[0] == '>':
+            self.header = string.rstrip(line[1:])
+        else:
+            print "error, line is", string.rstrip(line)
+            return
+        line = " "
+        seq = cStringIO.StringIO()
+        while line:
+            prev_pos = faFileHandler.tell()
+            line = faFileHandler.readline()
+            if line == "":
+                break
+            if line[0] == '>':
+                faFileHandler.seek(prev_pos)
+                break
+            seq.write(string.rstrip(line))
+        self.sequence = seq.getvalue()
+
+
+    ## Create a subsequence with a modified header
+    #
+    # @param s integer start a required subsequence
+    # @param e integer end a required subsequence
+    #
+    # @return a Bioseq instance, a subsequence of current sequence
+    #
+    def subseq(self, s, e = 0):
+        if e == 0 :
+            e = len(self.sequence)
+        if s > e :
+            print "error: start must be < or = to end"
+            return
+        if s <= 0 :
+            print "error: start must be > 0"
+            return
+        sub = Bioseq()
+        sub.header = self.header + " fragment " + str(s) + ".." + str(e)
+        sub.sequence = self.sequence[(s - 1):e]
+        return sub
+
+
+    ## Get the nucleotide or aminoacid at the given position
+    #
+    # @param pos integer nucleotide or aminoacid position
+    #
+    # @return a string
+    #
+    def getNtFromPosition(self, pos):
+        result = None
+        if not (pos < 1 or pos > self.getLength()):
+            result = self.sequence[pos - 1]
+        return result
+
+
+    ## Print in stdout the Bioseq in fasta format with 60 characters lines
+    #
+    # @param l length of required sequence default is whole sequence
+    #
+    def view(self, l = 0):
+        print '>' + self.header
+        i = 0
+        if(l == 0):
+            l = len(self.sequence)
+        seq = self.sequence[0:l]
+
+        while i < len(seq):
+            print seq[i:i + 60]
+            i = i + 60
+
+
+    ## Get length of sequence
+    #
+    # @param avoidN boolean don't count 'N' nucleotides
+    #
+    # @return length of current sequence
+    #
+    def getLength(self, countN = True):
+        if countN:
+            return len(self.sequence)
+        else:
+            return len(self.sequence) - self.countNt('N')
+
+
+    ## Return the proportion of a specific character
+    #
+    # @param nt character that we want to count
+    #
+    def propNt(self, nt):
+        return self.countNt(nt) / float(self.getLength())
+
+
+    ## Count occurrence of specific character
+    #
+    # @param nt character that we want to count
+    #
+    # @return nb of occurrences
+    #
+    def countNt(self, nt):
+        return self.sequence.count(nt)
+
+
+    ## Count occurrence of each nucleotide in current seq
+    #
+    # @return a dict, keys are nucleotides, values are nb of occurrences
+    #
+    def countAllNt(self):
+        dNt2Count = {}
+        for nt in ["A", "T", "G", "C", "N"]:
+            dNt2Count[ nt ] = self.countNt(nt)
+        return dNt2Count
+
+
+    ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found
+    #
+    # @param size integer required length word
+    #
+    def occ_word(self, size):
+        occ = {}
+        if size == 0:
+            return occ, 0
+        nbword = 0
+        srch = re.compile('[^ATGC]+')
+        wordlist = self._createWordList(size)
+        for i in wordlist:
+            occ[i] = 0
+        lenseq = len(self.sequence)
+        i = 0
+        while i < lenseq - size + 1:
+            word = self.sequence[i:i + size].upper()
+            m = srch.search(word)
+            if m == None:
+                occ[word] = occ[word] + 1
+                nbword = nbword + 1
+                i = i + 1
+            else:
+                i = i + m.end(0)
+        return occ, nbword
+
+
+    ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size
+    #
+    # @param size integer required length word
+    #
+    def freq_word(self, size):
+        dOcc, nbWords = self.occ_word(size)
+        freq = {}
+        for word in dOcc.keys():
+            freq[word] = float(dOcc[word]) / nbWords
+        return freq
+
+
+    ## Find ORF in each phase
+    #
+    # @return: a dict, keys are phases, values are stop codon positions.
+    #
+    def findORF (self):
+        orf = {0:[], 1:[], 2:[]}
+        length = len (self.sequence)
+        for i in xrange(0, length):
+            triplet = self.sequence[i:i + 3]
+            if (triplet == "TAA" or triplet == "TAG" or triplet == "TGA"):
+                phase = i % 3
+                orf[phase].append(i)
+        return orf
+
+
+    ## Convert the sequence into upper case
+    #
+    def upCase(self):
+        self.sequence = self.sequence.upper()
+
+
+    ## Convert the sequence into lower case
+    #
+    def lowCase(self):
+        self.sequence = self.sequence.lower()
+
+
+    ## Extract the cluster of the fragment (output from Grouper)
+    #
+    # @return cluster id (string)
+    #
+    def getClusterID(self):
+        data = self.header.split()
+        return data[0].split("Cl")[1]
+
+
+    ## Extract the group of the sequence (output from Grouper)
+    #
+    # @return group id (string)
+    #
+    def getGroupID(self):
+        data = self.header.split()
+        return data[0].split("Gr")[1].split("Cl")[0]
+
+
+    ## Get the header of the full sequence (output from Grouper)
+    #
+    # @example  'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203'
+    # @return header (string)
+    #
+    def getHeaderFullSeq(self):
+        data = self.header.split()
+        return data[1]
+
+
+    ## Get the strand of the fragment (output from Grouper)
+    #
+    # @return: strand (+ or -)
+    #
+    def getFragStrand(self):
+        data = self.header.split()
+        coord = data[3].split("..")
+        if int(coord[0]) < int(coord[-1]):
+            return "+"
+        else:
+            return "-"
+
+
+    ## Get A, T, G, C or N from an IUPAC letter
+    #  IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N']
+    #
+    # @return A, T, G, C or N
+    #
+    def getATGCNFromIUPAC(self, nt):
+        subset = ["A", "T", "G", "C", "N"]
+
+        if nt in subset:
+            return nt
+        elif nt == "U":
+            return "T"
+        elif nt == "R":
+            return random.choice("AG")
+        elif nt == "Y":
+            return random.choice("CT")
+        elif nt == "M":
+            return random.choice("CA")
+        elif nt == "K":
+            return random.choice("TG")
+        elif nt == "W":
+            return random.choice("TA")
+        elif nt == "S":
+            return random.choice("CG")
+        elif nt == "B":
+            return random.choice("CTG")
+        elif nt == "D":
+            return random.choice("ATG")
+        elif nt == "H":
+            return random.choice("ATC")
+        elif nt == "V":
+            return random.choice("ACG")
+        else:
+            return "N"
+
+    ## Get nucleotide from an IUPAC letter and a nucleotide
+    # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S']
+    # Examples:
+    # Y and C returns T
+    # Y and T returns C
+    # B and C throws RepetException
+    #
+    # @return A, T, G, C
+    #
+    def getATGCNFromIUPACandATGCN(self, IUPACCode, nt):
+        if IUPACCode == "R":
+            possibleNt = set(["A", "G"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        elif IUPACCode == "Y":
+            possibleNt = set(["C", "T"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        elif IUPACCode == "M":
+            possibleNt = set(["A", "C"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        elif IUPACCode == "K":
+            possibleNt = set(["T", "G"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        elif IUPACCode == "W":
+            possibleNt = set(["A", "T"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        elif IUPACCode == "S":
+            possibleNt = set(["C", "G"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        else:
+            raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt))
+
+    def getSeqWithOnlyATGCN(self):
+        newSeq = ""
+        for nt in self.sequence:
+            newSeq += self.getATGCNFromIUPAC(nt)
+        return newSeq
+
+
+    ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents
+    #
+    def partialIUPAC(self):
+        self.sequence = self.getSeqWithOnlyATGCN()
+
+
+    ## Remove non Unix end-of-line symbols, if any
+    #
+    def checkEOF(self):
+        symbol = "\r"   # corresponds to '^M' from Windows
+        if symbol in self.sequence:
+            print "WARNING: Windows EOF removed in '%s'" % (self.header)
+            sys.stdout.flush()
+            newSeq = self.sequence.replace(symbol, "")
+            self.sequence = newSeq
+
+
+    ## Write Bioseq instance into a fasta file handler
+    #
+    # @param faFileHandler file handler of a fasta file
+    #
+    def write(self, faFileHandler):
+        faFileHandler.write(">%s\n" % (self.header))
+        self.writeSeqInFasta(faFileHandler)
+
+
+    ## Write only the sequence of Bioseq instance into a fasta file handler
+    #
+    # @param faFileHandler file handler of a fasta file
+    #
+    def writeSeqInFasta(self, faFileHandler):
+        i = 0
+        while i < self.getLength():
+            faFileHandler.write("%s\n" % (self.sequence[i:i + 60]))
+            i += 60
+
+
+    ## Append Bioseq instance to a fasta file
+    #
+    # @param faFile name of a fasta file as a string
+    # @param mode 'write' or 'append'
+    #
+    def save(self, faFile, mode = "a"):
+        faFileHandler = open(faFile, mode)
+        self.write(faFileHandler)
+        faFileHandler.close()
+
+
+    ## Append Bioseq instance to a fasta file
+    #
+    # @param faFile name of a fasta file as a string
+    #
+    def appendBioseqInFile(self, faFile):
+        self.save(faFile, "a")
+
+
+    ## Write Bioseq instance into a fasta file handler
+    #
+    # @param faFileHandler file handler on a file with writing right
+    #
+    def writeABioseqInAFastaFile(self, faFileHandler):
+        self.write(faFileHandler)
+
+
+    ## Write Bioseq instance with other header into a fasta file handler
+    #
+    # @param faFileHandler file handler on a file with writing right
+    # @param otherHeader a string representing a new header (without the > and the \n)
+    #
+    def writeWithOtherHeader(self, faFileHandler, otherHeader):
+        self.header = otherHeader
+        self.write(faFileHandler)
+
+
+    ## Append Bioseq header and Bioseq sequence in a fasta file
+    #
+    # @param faFileHandler file handler on a file with writing right
+    # @param otherHeader a string representing a new header (without the > and the \n)
+    #
+    def writeABioseqInAFastaFileWithOtherHeader(self, faFileHandler, otherHeader):
+        self.writeWithOtherHeader(faFileHandler, otherHeader)
+
+
+    ## get the list of Maps corresponding to seq without gap
+    #
+    # @warning This method was called getMap() in pyRepet.Bioseq
+    # @return a list of Map object
+    #
+    def getLMapWhithoutGap(self):
+        lMaps = []
+        countSite = 1
+        countSubseq = 1
+        inGap = False
+        startMap = -1
+        endMap = -1
+
+        # initialize with the first site
+        if self.sequence[0] == "-":
+            inGap = True
+        else:
+            startMap = countSite
+
+        # for each remaining site
+        for site in self.sequence[1:]:
+            countSite += 1
+
+            # if it is a gap
+            if site == "-":
+
+                # if this is the beginning of a gap, record the previous subsequence
+                if inGap == False:
+                    inGap = True
+                    endMap = countSite - 1
+                    lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap))
+                    countSubseq += 1
+
+            # if it is NOT a gap
+            if site != "-":
+
+                # if it is the end of a gap, begin the next subsequence
+                if inGap == True:
+                    inGap = False
+                    startMap = countSite
+
+                # if it is the last site
+                if countSite == self.getLength():
+                    endMap = countSite
+                    lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap))
+
+        return lMaps
+
+
+    ## get the percentage of GC
+    #
+    # @return a percentage
+    #
+    def getGCpercentage(self):
+        tmpSeq = self.getSeqWithOnlyATGCN()
+        nbGC = tmpSeq.count("G") + tmpSeq.count("C")
+        return 100 * nbGC / float(self.getLength())
+
+    ## get the percentage of GC of a sequence without counting N in sequence length
+    #
+    # @return a percentage
+    #
+    def getGCpercentageInSequenceWithoutCountNInLength(self):
+        tmpSeq = self.getSeqWithOnlyATGCN()
+        nbGC = tmpSeq.count("G") + tmpSeq.count("C")
+        return 100 * nbGC / float(self.getLength() - self.countNt("N"))
+
+    ## get the 5 prime subsequence of a given length at the given position
+    #
+    # @param position integer
+    # @param flankLength integer subsequence length
+    # @return a sequence string
+    #
+    def get5PrimeFlank(self, position, flankLength):
+        if(position == 1):
+            return ""
+        else:
+            startOfFlank = 1
+            endOfFlank = position - 1
+
+            if((position - flankLength) > 0):
+                startOfFlank = position - flankLength
+            else:
+                startOfFlank = 1
+
+            return self.subseq(startOfFlank, endOfFlank).sequence
+
+
+    ## get the 3 prime subsequence of a given length at the given position
+    #  In the case of indels, the polymorphism length can be specified
+    #
+    # @param position integer
+    # @param flankLength integer subsequence length
+    # @param polymLength integer polymorphism length
+    # @return a sequence string
+    #
+    def get3PrimeFlank(self, position, flankLength, polymLength = 1):
+        if((position + polymLength) > len(self.sequence)):
+            return ""
+        else:
+            startOfFlank = position + polymLength
+
+            if((position + polymLength + flankLength) > len(self.sequence)):
+                endOfFlank = len(self.sequence)
+            else:
+                endOfFlank = position + polymLength + flankLength - 1
+
+            return self.subseq(startOfFlank, endOfFlank).sequence
+
+
+    def _createWordList(self, size, l = ['A', 'T', 'G', 'C']):
+        if size == 1 :
+            return l
+        else:
+            l2 = []
+            for i in l:
+                for j in ['A', 'T', 'G', 'C']:
+                    l2.append(i + j)
+        return self._createWordList(size - 1, l2)
+
+
+    def removeSymbol(self, symbol):
+        tmp = self.sequence.replace(symbol, "")
+        self.sequence = tmp