view TEisotools-1.0/commons/core/seq/Bioseq.py @ 6:20ec0d14798e draft

Uploaded
author urgi-team
date Wed, 20 Jul 2016 05:00:24 -0400
parents
children
line wrap: on
line source

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.


import re
import sys
import random
import string
import cStringIO
from commons.core.coord.Map import Map
from commons.core.checker.RepetException import RepetException

DNA_ALPHABET_WITH_N = set(['A', 'T', 'G', 'C', 'N'])
IUPAC = set(['A', 'T', 'G', 'C', 'U', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N'])


## Record a sequence with its header
#
class Bioseq(object):

    __slots__ = ("header", "sequence", '__dict__')

    ## constructor
    #
    # @param name the header of sequence
    # @param seq sequence (DNA, RNA, protein)
    #
    def __init__(self, name = "", seq = ""):
        self.header = name
        self.sequence = seq


    ## Equal operator
    #
    def __eq__(self, o):
        if type(o) is type(self) and self.header == o.header and self.sequence == o.sequence:
            return True
        return False

    ## Not equal operator
    #
    def __ne__(self, o):
        return not self.__eq__(o)

    ## overload __repr__
    #
    def __repr__(self):
        return "%s;%s" % (self.header, self.sequence)


    ## set attribute header
    #
    # @param header a string
    #
    def setHeader(self, header):
        self.header = header


    ## get attribute header
    #
    # @return header
    def getHeader(self):
        return self.header


    ## set attribute sequence
    #
    # @param sequence a string
    #
    def setSequence(self, sequence):
        self.sequence = sequence


    def getSequence(self):
        return self.sequence

    ## reset
    #
    def reset(self):
        self.setHeader("")
        self.setSequence("")


    ## Test if bioseq is empty
    #
    def isEmpty(self):
        return self.header == "" and self.sequence == ""


    ## Reverse the sequence
    #
    def reverse(self):
        tmp = self.sequence
        self.sequence = tmp[::-1]


    ## Turn the sequence into its complement
    #  Force upper case letters
    #  @warning: old name in pyRepet.Bioseq realComplement
    #
    def complement(self):
        complement = ""
        self.upCase()
        for i in xrange(0, len(self.sequence), 1):
            if self.sequence[i] == "A":
                complement += "T"
            elif self.sequence[i] == "T":
                complement += "A"
            elif self.sequence[i] == "C":
                complement += "G"
            elif self.sequence[i] == "G":
                complement += "C"
            elif self.sequence[i] == "M":
                complement += "K"
            elif self.sequence[i] == "R":
                complement += "Y"
            elif self.sequence[i] == "W":
                complement += "W"
            elif self.sequence[i] == "S":
                complement += "S"
            elif self.sequence[i] == "Y":
                complement += "R"
            elif self.sequence[i] == "K":
                complement += "M"
            elif self.sequence[i] == "V":
                complement += "B"
            elif self.sequence[i] == "H":
                complement += "D"
            elif self.sequence[i] == "D":
                complement += "H"
            elif self.sequence[i] == "B":
                complement += "V"
            elif self.sequence[i] == "N":
                complement += "N"
            elif self.sequence[i] == "-":
                complement += "-"
            else:
                print "WARNING: unknown symbol '%s', replacing it by N" % (self.sequence[i])
                complement += "N"
        self.sequence = complement


    ## Reverse and complement the sequence
    #
    #  Force upper case letters
    #  @warning: old name in pyRepet.Bioseq : complement
    #
    def reverseComplement(self):
        self.reverse()
        self.complement()


    ## Remove gap in the sequence
    #
    def cleanGap(self):
        self.sequence = self.sequence.replace("-", "")


    ## Copy current Bioseq Instance
    #
    # @return: a Bioseq instance, a copy of current sequence.
    #
    def copyBioseqInstance(self):
        seq = Bioseq()
        seq.sequence = self.sequence
        seq.header = self.header
        return seq


    ## Add phase information after the name of sequence in header
    #
    # @param phase integer representing phase (1, 2, 3, -1, -2, -3)
    #
    def setFrameInfoOnHeader(self, phase):
        if " " in self.header:
            name, desc = self.header.split(" ", 1)
            name = name + "_" + str(phase)
            self.header = name + " " + desc
        else:
            self.header = self.header + "_" + str(phase)


    ## Fill Bioseq attributes with fasta file
    #
    # @param faFileHandler file handler of a fasta file
    #
    def read(self, faFileHandler):
        line = faFileHandler.readline()
        if line == "":
            self.header = None
            self.sequence = None
            return
        while line == "\n":
            line = faFileHandler.readline()
        if line[0] == '>':
            self.header = string.rstrip(line[1:])
        else:
            print "error, line is", string.rstrip(line)
            return
        line = " "
        seq = cStringIO.StringIO()
        while line:
            prev_pos = faFileHandler.tell()
            line = faFileHandler.readline()
            if line == "":
                break
            if line[0] == '>':
                faFileHandler.seek(prev_pos)
                break
            seq.write(string.rstrip(line))
        self.sequence = seq.getvalue()


    ## Create a subsequence with a modified header
    #
    # @param s integer start a required subsequence
    # @param e integer end a required subsequence
    #
    # @return a Bioseq instance, a subsequence of current sequence
    #
    def subseq(self, s, e = 0):
        if e == 0 :
            e = len(self.sequence)
        if s > e :
            print "error: start must be < or = to end"
            return
        if s <= 0 :
            print "error: start must be > 0"
            return
        sub = Bioseq()
        sub.header = self.header + " fragment " + str(s) + ".." + str(e)
        sub.sequence = self.sequence[(s - 1):e]
        return sub


    ## Get the nucleotide or aminoacid at the given position
    #
    # @param pos integer nucleotide or aminoacid position
    #
    # @return a string
    #
    def getNtFromPosition(self, pos):
        result = None
        if not (pos < 1 or pos > self.getLength()):
            result = self.sequence[pos - 1]
        return result


    ## Print in stdout the Bioseq in fasta format with 60 characters lines
    #
    # @param l length of required sequence default is whole sequence
    #
    def view(self, l = 0):
        print '>' + self.header
        i = 0
        if(l == 0):
            l = len(self.sequence)
        seq = self.sequence[0:l]

        while i < len(seq):
            print seq[i:i + 60]
            i = i + 60


    ## Get length of sequence
    #
    # @param avoidN boolean don't count 'N' nucleotides
    #
    # @return length of current sequence
    #
    def getLength(self, countN = True):
        if countN:
            return len(self.sequence)
        else:
            return len(self.sequence) - self.countNt('N')


    ## Return the proportion of a specific character
    #
    # @param nt character that we want to count
    #
    def propNt(self, nt):
        return self.countNt(nt) / float(self.getLength())


    ## Count occurrence of specific character
    #
    # @param nt character that we want to count
    #
    # @return nb of occurrences
    #
    def countNt(self, nt):
        return self.sequence.count(nt)


    ## Count occurrence of each nucleotide in current seq
    #
    # @return a dict, keys are nucleotides, values are nb of occurrences
    #
    def countAllNt(self):
        dNt2Count = {}
        for nt in ["A", "T", "G", "C", "N"]:
            dNt2Count[ nt ] = self.countNt(nt)
        return dNt2Count


    ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found
    #
    # @param size integer required length word
    #
    def occ_word(self, size):
        occ = {}
        if size == 0:
            return occ, 0
        nbword = 0
        srch = re.compile('[^ATGC]+')
        wordlist = self._createWordList(size)
        for i in wordlist:
            occ[i] = 0
        lenseq = len(self.sequence)
        i = 0
        while i < lenseq - size + 1:
            word = self.sequence[i:i + size].upper()
            m = srch.search(word)
            if m == None:
                occ[word] = occ[word] + 1
                nbword = nbword + 1
                i = i + 1
            else:
                i = i + m.end(0)
        return occ, nbword


    ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size
    #
    # @param size integer required length word
    #
    def freq_word(self, size):
        dOcc, nbWords = self.occ_word(size)
        freq = {}
        for word in dOcc.keys():
            freq[word] = float(dOcc[word]) / nbWords
        return freq


    ## Find ORF in each phase
    #
    # @return: a dict, keys are phases, values are stop codon positions.
    #
    def findORF (self):
        orf = {0:[], 1:[], 2:[]}
        length = len (self.sequence)
        for i in xrange(0, length):
            triplet = self.sequence[i:i + 3]
            if (triplet == "TAA" or triplet == "TAG" or triplet == "TGA"):
                phase = i % 3
                orf[phase].append(i)
        return orf


    ## Convert the sequence into upper case
    #
    def upCase(self):
        self.sequence = self.sequence.upper()


    ## Convert the sequence into lower case
    #
    def lowCase(self):
        self.sequence = self.sequence.lower()


    ## Extract the cluster of the fragment (output from Grouper)
    #
    # @return cluster id (string)
    #
    def getClusterID(self):
        data = self.header.split()
        return data[0].split("Cl")[1]


    ## Extract the group of the sequence (output from Grouper)
    #
    # @return group id (string)
    #
    def getGroupID(self):
        data = self.header.split()
        return data[0].split("Gr")[1].split("Cl")[0]


    ## Get the header of the full sequence (output from Grouper)
    #
    # @example  'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203'
    # @return header (string)
    #
    def getHeaderFullSeq(self):
        data = self.header.split()
        return data[1]


    ## Get the strand of the fragment (output from Grouper)
    #
    # @return: strand (+ or -)
    #
    def getFragStrand(self):
        data = self.header.split()
        coord = data[3].split("..")
        if int(coord[0]) < int(coord[-1]):
            return "+"
        else:
            return "-"


    ## Get A, T, G, C or N from an IUPAC letter
    #  IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N']
    #
    # @return A, T, G, C or N
    #
    def getATGCNFromIUPAC(self, nt):
        subset = ["A", "T", "G", "C", "N"]

        if nt in subset:
            return nt
        elif nt == "U":
            return "T"
        elif nt == "R":
            return random.choice("AG")
        elif nt == "Y":
            return random.choice("CT")
        elif nt == "M":
            return random.choice("CA")
        elif nt == "K":
            return random.choice("TG")
        elif nt == "W":
            return random.choice("TA")
        elif nt == "S":
            return random.choice("CG")
        elif nt == "B":
            return random.choice("CTG")
        elif nt == "D":
            return random.choice("ATG")
        elif nt == "H":
            return random.choice("ATC")
        elif nt == "V":
            return random.choice("ACG")
        else:
            return "N"

    ## Get nucleotide from an IUPAC letter and a nucleotide
    # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S']
    # Examples:
    # Y and C returns T
    # Y and T returns C
    # B and C throws RepetException
    #
    # @return A, T, G, C
    #
    def getATGCNFromIUPACandATGCN(self, IUPACCode, nt):
        if IUPACCode == "R":
            possibleNt = set(["A", "G"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        elif IUPACCode == "Y":
            possibleNt = set(["C", "T"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        elif IUPACCode == "M":
            possibleNt = set(["A", "C"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        elif IUPACCode == "K":
            possibleNt = set(["T", "G"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        elif IUPACCode == "W":
            possibleNt = set(["A", "T"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        elif IUPACCode == "S":
            possibleNt = set(["C", "G"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        else:
            raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt))

    def getSeqWithOnlyATGCN(self):
        newSeq = ""
        for nt in self.sequence:
            newSeq += self.getATGCNFromIUPAC(nt)
        return newSeq


    ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents
    #
    def partialIUPAC(self):
        self.sequence = self.getSeqWithOnlyATGCN()


    ## Remove non Unix end-of-line symbols, if any
    #
    def checkEOF(self):
        symbol = "\r"   # corresponds to '^M' from Windows
        if symbol in self.sequence:
            print "WARNING: Windows EOF removed in '%s'" % (self.header)
            sys.stdout.flush()
            newSeq = self.sequence.replace(symbol, "")
            self.sequence = newSeq


    ## Write Bioseq instance into a fasta file handler
    #
    # @param faFileHandler file handler of a fasta file
    #
    def write(self, faFileHandler):
        faFileHandler.write(">%s\n" % (self.header))
        self.writeSeqInFasta(faFileHandler)


    ## Write only the sequence of Bioseq instance into a fasta file handler
    #
    # @param faFileHandler file handler of a fasta file
    #
    def writeSeqInFasta(self, faFileHandler):
        i = 0
        while i < self.getLength():
            faFileHandler.write("%s\n" % (self.sequence[i:i + 60]))
            i += 60


    ## Append Bioseq instance to a fasta file
    #
    # @param faFile name of a fasta file as a string
    # @param mode 'write' or 'append'
    #
    def save(self, faFile, mode = "a"):
        faFileHandler = open(faFile, mode)
        self.write(faFileHandler)
        faFileHandler.close()


    ## Append Bioseq instance to a fasta file
    #
    # @param faFile name of a fasta file as a string
    #
    def appendBioseqInFile(self, faFile):
        self.save(faFile, "a")


    ## Write Bioseq instance into a fasta file handler
    #
    # @param faFileHandler file handler on a file with writing right
    #
    def writeABioseqInAFastaFile(self, faFileHandler):
        self.write(faFileHandler)


    ## Write Bioseq instance with other header into a fasta file handler
    #
    # @param faFileHandler file handler on a file with writing right
    # @param otherHeader a string representing a new header (without the > and the \n)
    #
    def writeWithOtherHeader(self, faFileHandler, otherHeader):
        self.header = otherHeader
        self.write(faFileHandler)


    ## Append Bioseq header and Bioseq sequence in a fasta file
    #
    # @param faFileHandler file handler on a file with writing right
    # @param otherHeader a string representing a new header (without the > and the \n)
    #
    def writeABioseqInAFastaFileWithOtherHeader(self, faFileHandler, otherHeader):
        self.writeWithOtherHeader(faFileHandler, otherHeader)


    ## get the list of Maps corresponding to seq without gap
    #
    # @warning This method was called getMap() in pyRepet.Bioseq
    # @return a list of Map object
    #
    def getLMapWhithoutGap(self):
        lMaps = []
        countSite = 1
        countSubseq = 1
        inGap = False
        startMap = -1
        endMap = -1

        # initialize with the first site
        if self.sequence[0] == "-":
            inGap = True
        else:
            startMap = countSite

        # for each remaining site
        for site in self.sequence[1:]:
            countSite += 1

            # if it is a gap
            if site == "-":

                # if this is the beginning of a gap, record the previous subsequence
                if inGap == False:
                    inGap = True
                    endMap = countSite - 1
                    lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap))
                    countSubseq += 1

            # if it is NOT a gap
            if site != "-":

                # if it is the end of a gap, begin the next subsequence
                if inGap == True:
                    inGap = False
                    startMap = countSite

                # if it is the last site
                if countSite == self.getLength():
                    endMap = countSite
                    lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap))

        return lMaps


    ## get the percentage of GC
    #
    # @return a percentage
    #
    def getGCpercentage(self):
        tmpSeq = self.getSeqWithOnlyATGCN()
        nbGC = tmpSeq.count("G") + tmpSeq.count("C")
        return 100 * nbGC / float(self.getLength())

    ## get the percentage of GC of a sequence without counting N in sequence length
    #
    # @return a percentage
    #
    def getGCpercentageInSequenceWithoutCountNInLength(self):
        tmpSeq = self.getSeqWithOnlyATGCN()
        nbGC = tmpSeq.count("G") + tmpSeq.count("C")
        return 100 * nbGC / float(self.getLength() - self.countNt("N"))

    ## get the 5 prime subsequence of a given length at the given position
    #
    # @param position integer
    # @param flankLength integer subsequence length
    # @return a sequence string
    #
    def get5PrimeFlank(self, position, flankLength):
        if(position == 1):
            return ""
        else:
            startOfFlank = 1
            endOfFlank = position - 1

            if((position - flankLength) > 0):
                startOfFlank = position - flankLength
            else:
                startOfFlank = 1

            return self.subseq(startOfFlank, endOfFlank).sequence


    ## get the 3 prime subsequence of a given length at the given position
    #  In the case of indels, the polymorphism length can be specified
    #
    # @param position integer
    # @param flankLength integer subsequence length
    # @param polymLength integer polymorphism length
    # @return a sequence string
    #
    def get3PrimeFlank(self, position, flankLength, polymLength = 1):
        if((position + polymLength) > len(self.sequence)):
            return ""
        else:
            startOfFlank = position + polymLength

            if((position + polymLength + flankLength) > len(self.sequence)):
                endOfFlank = len(self.sequence)
            else:
                endOfFlank = position + polymLength + flankLength - 1

            return self.subseq(startOfFlank, endOfFlank).sequence


    def _createWordList(self, size, l = ['A', 'T', 'G', 'C']):
        if size == 1 :
            return l
        else:
            l2 = []
            for i in l:
                for j in ['A', 'T', 'G', 'C']:
                    l2.append(i + j)
        return self._createWordList(size - 1, l2)


    def removeSymbol(self, symbol):
        tmp = self.sequence.replace(symbol, "")
        self.sequence = tmp