view commons/core/seq/Bioseq.py @ 36:44d5973c188c

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 15:02:29 -0400
parents
children
line wrap: on
line source

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use, 
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info". 
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability. 
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or 
# data to be ensured and,  more generally, to use and operate it in the 
# same conditions as regards security. 
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.


import sys
import string
import re
import random
import cStringIO
from commons.core.coord.Map import Map
from commons.core.checker.RepetException import RepetException

DNA_ALPHABET_WITH_N = set( ['A','T','G','C','N'] )
IUPAC = set(['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N'])


## Record a sequence with its header
#
class Bioseq( object ):
    
    header = ""
    sequence = ""
    
    ## constructor
    #
    # @param name the header of sequence
    # @param seq sequence (DNA, RNA, protein)
    #
    def __init__( self, name="", seq="" ):
        self.header = name
        self.sequence = seq
        
        
    ## Equal operator
    #        
    def __eq__( self, o ):
        if self.header==o.header and self.sequence==o.sequence:
            return True
        return False
    
    
    ## overload __repr__
    #
    def __repr__( self ):
        return "%s;%s" % ( self.header, self.sequence )
    
    
    ## set attribute header
    #
    # @param header a string
    #
    def setHeader( self, header ):
        self.header = header
        
        
    ## get attribute header
    #
    # @return header
    def getHeader(self):
        return self.header
    
    
    ## set attribute sequence
    #
    # @param sequence a string
    #
    def setSequence( self, sequence ):
        self.sequence = sequence
        
        
    def getSequence(self):
        return self.sequence
        
    ## reset
    #
    def reset( self ):
        self.setHeader( "" )
        self.setSequence( "" )
        
        
    ## Test if bioseq is empty
    #
    def isEmpty( self ):
        return self.header == "" and self.sequence == ""
    
    
    ## Reverse the sequence
    #
    def reverse( self ):
        tmp = self.sequence
        self.sequence = tmp[::-1]
        
        
    ## Turn the sequence into its complement
    #  Force upper case letters
    #  @warning: old name in pyRepet.Bioseq realComplement
    #
    def complement( self ):
        complement = ""
        self.upCase()
        for i in xrange(0,len(self.sequence),1):
            if self.sequence[i] == "A":
                complement += "T"
            elif self.sequence[i] == "T":
                complement += "A"
            elif self.sequence[i] == "C":
                complement += "G"
            elif self.sequence[i] == "G":
                complement += "C"
            elif self.sequence[i] == "M":
                complement += "K"
            elif self.sequence[i] == "R":
                complement += "Y"
            elif self.sequence[i] == "W":
                complement += "W"
            elif self.sequence[i] == "S":
                complement += "S"
            elif self.sequence[i] == "Y":
                complement += "R"
            elif self.sequence[i] == "K":
                complement += "M"
            elif self.sequence[i] == "V":
                complement += "B"
            elif self.sequence[i] == "H":
                complement += "D"
            elif self.sequence[i] == "D":
                complement += "H"
            elif self.sequence[i] == "B":
                complement += "V"
            elif self.sequence[i] == "N":
                complement += "N"
            elif self.sequence[i] == "-":
                complement += "-"
            else:
                print "WARNING: unknown symbol '%s', replacing it by N" % ( self.sequence[i] )
                complement += "N"
        self.sequence = complement
        
        
    ## Reverse and complement the sequence
    #
    #  Force upper case letters
    #  @warning: old name in pyRepet.Bioseq : complement  
    # 
    def reverseComplement( self ):
        self.reverse()
        self.complement()
        
        
    ## Remove gap in the sequence
    #
    def cleanGap(self):
        self.sequence = self.sequence.replace("-","")
        
        
    ## Copy current Bioseq Instance
    #
    # @return: a Bioseq instance, a copy of current sequence.
    #
    def copyBioseqInstance(self):
        seq = Bioseq()
        seq.sequence = self.sequence
        seq.header = self.header
        return seq
    
    
    ## Add phase information after the name of sequence in header
    #
    # @param phase integer representing phase (1, 2, 3, -1, -2, -3)
    #
    def setFrameInfoOnHeader(self, phase):
        if " " in self.header:
            name, desc = self.header.split(" ", 1)
            name = name + "_" + str(phase)
            self.header = name + " " + desc
        else:
            self.header = self.header + "_" + str(phase)
            
            
    ## Fill Bioseq attributes with fasta file
    #  
    # @param faFileHandler file handler of a fasta file
    #
    def read( self, faFileHandler ):
        line = faFileHandler.readline()
        if line == "":
            self.header = None
            self.sequence = None
            return
        while line == "\n":
            line = faFileHandler.readline()
        if line[0] == '>':
            self.header = string.rstrip(line[1:])
        else:
            print "error, line is",string.rstrip(line)
            return
        line = " "
        seq = cStringIO.StringIO()
        while line:
            prev_pos = faFileHandler.tell()
            line = faFileHandler.readline()
            if line == "":
                break
            if line[0] == '>':
                faFileHandler.seek( prev_pos )
                break
            seq.write( string.rstrip(line) )
        self.sequence = seq.getvalue()
        
        
    ## Create a subsequence with a modified header
    #  
    # @param s integer start a required subsequence
    # @param e integer end a required subsequence
    #
    # @return a Bioseq instance, a subsequence of current sequence
    #       
    def subseq( self, s, e=0 ):
        if e == 0 :
            e=len( self.sequence )
        if s > e :
            print "error: start must be < or = to end"
            return
        if s <= 0 :
            print "error: start must be > 0"
            return
        sub = Bioseq()
        sub.header = self.header + " fragment " + str(s) + ".." + str(e)
        sub.sequence = self.sequence[(s-1):e]
        return sub
        
        
    ## Get the nucleotide or aminoacid at the given position
    #  
    # @param pos integer nucleotide or aminoacid position
    #
    # @return a string
    #       
    def getNtFromPosition(self, pos):
        result = None
        if not (pos < 1 or pos > self.getLength()):
            result = self.sequence[pos - 1]
        return result
    
    
    ## Print in stdout the Bioseq in fasta format with 60 characters lines
    #  
    # @param l length of required sequence default is whole sequence
    # 
    def view(self,l=0):
        print '>'+self.header
        i=0
        if(l==0):
            l=len(self.sequence)
        seq=self.sequence[0:l]
            
        while i<len(seq):
            print seq[i:i+60]
            i=i+60        
            
            
    ## Get length of sequence
    #
    # @param avoidN boolean don't count 'N' nucleotides
    #
    # @return length of current sequence
    # 
    def getLength( self, countN = True ):
        if countN:
            return len(self.sequence)
        else:
            return len(self.sequence) - self.countNt('N')
            
            
    ## Return the proportion of a specific character
    # 
    # @param nt character that we want to count
    #
    def propNt( self, nt ):
        return self.countNt( nt ) / float( self.getLength() )
    
    
    ## Count occurrence of specific character
    # 
    # @param nt character that we want to count
    #
    # @return nb of occurrences
    #
    def countNt( self, nt ):
        return self.sequence.count( nt )
    
    
    ## Count occurrence of each nucleotide in current seq 
    #
    # @return a dict, keys are nucleotides, values are nb of occurrences
    #
    def countAllNt( self ):
        dNt2Count = {}
        for nt in ["A","T","G","C","N"]:
            dNt2Count[ nt ] = self.countNt( nt )
        return dNt2Count
    
    
    ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found
    #
    # @param size integer required length word
    #
    def occ_word( self, size ):
        occ = {}
        if size == 0:
            return occ,0
        nbword = 0
        srch = re.compile('[^ATGC]+')
        wordlist = self._createWordList( size )
        for i in wordlist:
            occ[i] = 0
        lenseq = len(self.sequence)
        i = 0
        while i < lenseq-size+1:
            word = self.sequence[i:i+size].upper()
            m = srch.search(word)
            if m == None:
                occ[word] = occ[word]+1
                nbword = nbword + 1
                i = i + 1
            else:
                i = i + m.end(0)
        return occ, nbword
    
    
    ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size
    #
    # @param size integer required length word
    #
    def freq_word( self, size ):
        dOcc, nbWords = self.occ_word( size )
        freq = {}
        for word in dOcc.keys():
            freq[word] = float(dOcc[word]) / nbWords
        return freq
    
    
    ## Find ORF in each phase
    #
    # @return: a dict, keys are phases, values are stop codon positions.
    #
    def findORF (self):
        orf = {0:[],1:[],2:[]}
        length = len (self.sequence)
        for i in xrange(0,length):
            triplet = self.sequence[i:i+3] 
            if ( triplet == "TAA" or triplet == "TAG" or triplet == "TGA"):
                phase = i % 3
                orf[phase].append(i)
        return orf
    
    
    ## Convert the sequence into upper case
    #
    def upCase( self ):
        self.sequence = self.sequence.upper()
        
        
    ## Convert the sequence into lower case
    #
    def lowCase( self ):
        self.sequence = self.sequence.lower()
        
        
    ## Extract the cluster of the fragment (output from Grouper)
    #
    # @return cluster id (string)
    #    
    def getClusterID( self ):
        data = self.header.split()
        return data[0].split("Cl")[1]
    
    
    ## Extract the group of the sequence (output from Grouper)
    #
    # @return group id (string)
    #
    def getGroupID( self ):
        data = self.header.split()
        return data[0].split("Gr")[1].split("Cl")[0]
    
    
    ## Get the header of the full sequence (output from Grouper)
    #
    # @example  'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203' 
    # @return header (string)
    #
    def getHeaderFullSeq( self ):
        data = self.header.split()
        return data[1]
    
    
    ## Get the strand of the fragment (output from Grouper)
    #
    # @return: strand (+ or -)
    #
    def getFragStrand( self ):
        data = self.header.split()
        coord = data[3].split("..")
        if int(coord[0]) < int(coord[-1]):
            return "+"
        else:
            return "-"
        
        
    ## Get A, T, G, C or N from an IUPAC letter
    #  IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N']
    #
    # @return A, T, G, C or N
    #
    def getATGCNFromIUPAC( self, nt ):
        subset = ["A","T","G","C","N"]

        if nt in subset:
            return nt
        elif nt == "U":
            return "T"
        elif nt == "R":
            return random.choice( "AG" )
        elif nt == "Y":
            return random.choice( "CT" )
        elif nt == "M":
            return random.choice( "CA" )
        elif nt == "K":
            return random.choice( "TG" )
        elif nt == "W":
            return random.choice( "TA" )
        elif nt == "S":
            return random.choice( "CG" )
        elif nt == "B":
            return random.choice( "CTG" )
        elif nt == "D":
            return random.choice( "ATG" )
        elif nt == "H":
            return random.choice( "ATC" )
        elif nt == "V":
            return random.choice( "ACG" )
        else:
            return "N"
    
    ## Get nucleotide from an IUPAC letter and a nucleotide
    # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S']
    # Examples:
    # Y and C returns T
    # Y and T returns C
    # B and C throws RepetException
    #
    # @return A, T, G, C
    #
    def getATGCNFromIUPACandATGCN(self, IUPACCode, nt):
        if IUPACCode == "R":
            possibleNt = set(["A", "G"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()
        
        elif IUPACCode == "Y":
            possibleNt = set(["C", "T"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()
        
        elif IUPACCode == "M":
            possibleNt = set(["A", "C"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()
        
        elif IUPACCode == "K":
            possibleNt = set(["T", "G"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()
        
        elif IUPACCode == "W":
            possibleNt = set(["A", "T"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()
        
        elif IUPACCode == "S":
            possibleNt = set(["C", "G"])
            if nt not in possibleNt:
                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
            return (possibleNt - set(nt)).pop()

        else:
            raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt))
    
    def getSeqWithOnlyATGCN( self ):
        newSeq = ""
        for nt in self.sequence:
            newSeq += self.getATGCNFromIUPAC( nt )
        return newSeq
    
    
    ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents
    #
    def partialIUPAC( self ):
        self.sequence = self.getSeqWithOnlyATGCN()
    
    
    ## Remove non Unix end-of-line symbols, if any
    #
    def checkEOF( self ):
        symbol = "\r"   # corresponds to '^M' from Windows
        if symbol in self.sequence:
            print "WARNING: Windows EOF removed in '%s'" % ( self.header )
            sys.stdout.flush()
            newSeq = self.sequence.replace( symbol, "" )
            self.sequence = newSeq
            
            
    ## Write Bioseq instance into a fasta file handler
    #
    # @param faFileHandler file handler of a fasta file
    # 
    def write( self, faFileHandler ):
        faFileHandler.write( ">%s\n" % ( self.header ) )
        self.writeSeqInFasta( faFileHandler )
        
        
    ## Write only the sequence of Bioseq instance into a fasta file handler
    #
    # @param faFileHandler file handler of a fasta file
    #
    def writeSeqInFasta( self, faFileHandler ):
        i = 0
        while i < self.getLength():
            faFileHandler.write( "%s\n" % ( self.sequence[i:i+60] ) )
            i += 60
            
            
    ## Append Bioseq instance to a fasta file
    #
    # @param faFile name of a fasta file as a string
    # @param mode 'write' or 'append'
    #
    def save( self, faFile, mode="a" ):
        faFileHandler = open( faFile, mode )
        self.write( faFileHandler )
        faFileHandler.close()
        
        
    ## Append Bioseq instance to a fasta file
    #  
    # @param faFile name of a fasta file as a string
    #
    def appendBioseqInFile( self, faFile ):
        self.save( faFile, "a" )
        
        
    ## Write Bioseq instance into a fasta file handler
    #  
    # @param faFileHandler file handler on a file with writing right
    #
    def writeABioseqInAFastaFile( self, faFileHandler ):
        self.write( faFileHandler )
        
        
    ## Write Bioseq instance with other header into a fasta file handler
    #
    # @param faFileHandler file handler on a file with writing right
    # @param otherHeader a string representing a new header (without the > and the \n)
    #
    def writeWithOtherHeader( self, faFileHandler, otherHeader ):
        self.header = otherHeader
        self.write( faFileHandler )
        
        
    ## Append Bioseq header and Bioseq sequence in a fasta file
    #  
    # @param faFileHandler file handler on a file with writing right
    # @param otherHeader a string representing a new header (without the > and the \n)
    #
    def writeABioseqInAFastaFileWithOtherHeader( self, faFileHandler, otherHeader ):
        self.writeWithOtherHeader( faFileHandler, otherHeader )
        
        
    ## get the list of Maps corresponding to seq without gap
    #
    # @warning This method was called getMap() in pyRepet.Bioseq
    # @return a list of Map object 
    # 
    def getLMapWhithoutGap( self ):
        lMaps = []
        countSite = 1
        countSubseq = 1
        inGap = False
        startMap = -1
        endMap = -1

        # initialize with the first site
        if self.sequence[0] == "-":
            inGap = True
        else:
            startMap = countSite

        # for each remaining site
        for site in self.sequence[1:]:
            countSite += 1

            # if it is a gap
            if site == "-":

                # if this is the beginning of a gap, record the previous subsequence
                if inGap == False:
                    inGap = True
                    endMap = countSite - 1
                    lMaps.append( Map( "%s_subSeq%i" % (self.header,countSubseq), self.header, startMap, endMap ) )
                    countSubseq += 1

            # if it is NOT a gap
            if site != "-":

                # if it is the end of a gap, begin the next subsequence
                if inGap == True:
                    inGap = False
                    startMap = countSite

                # if it is the last site
                if countSite == self.getLength():
                    endMap = countSite
                    lMaps.append( Map( "%s_subSeq%i" % (self.header,countSubseq), self.header, startMap, endMap ) )

        return lMaps
    
    
    ## get the percentage of GC
    #
    # @return a percentage
    # 
    def getGCpercentage( self ):
        tmpSeq = self.getSeqWithOnlyATGCN()
        nbGC = tmpSeq.count( "G" ) + tmpSeq.count( "C" )
        return 100 * nbGC / float( self.getLength() )
    
    ## get the percentage of GC of a sequence without counting N in sequence length
    #
    # @return a percentage
    # 
    def getGCpercentageInSequenceWithoutCountNInLength(self):
        tmpSeq = self.getSeqWithOnlyATGCN()
        nbGC = tmpSeq.count( "G" ) + tmpSeq.count( "C" )
        return 100 * nbGC / float( self.getLength() - self.countNt("N") )
    
    ## get the 5 prime subsequence of a given length at the given position 
    #
    # @param position integer
    # @param flankLength integer subsequence length
    # @return a sequence string
    # 
    def get5PrimeFlank(self, position, flankLength):
        if(position == 1):
            return ""
        else:
            startOfFlank = 1
            endOfFlank = position -1
        
            if((position - flankLength) > 0):
                startOfFlank = position - flankLength
            else:
                startOfFlank = 1
            
            return self.subseq(startOfFlank, endOfFlank).sequence
            
            
    ## get the 3 prime subsequence of a given length at the given position 
    #  In the case of indels, the polymorphism length can be specified
    #
    # @param position integer
    # @param flankLength integer subsequence length
    # @param polymLength integer polymorphism length
    # @return a sequence string
    # 
    def get3PrimeFlank(self, position, flankLength, polymLength = 1):
        if((position + polymLength) > len( self.sequence )):
            return ""
        else:
            startOfFlank = position + polymLength
         
            if((position+polymLength+flankLength) > len( self.sequence )):
                endOfFlank =  len( self.sequence )
            else:
                endOfFlank =  position+polymLength+flankLength-1
        
            return self.subseq(startOfFlank, endOfFlank).sequence
    
    
    def _createWordList(self,size,l=['A','T','G','C']):
        if size == 1 :
            return l
        else:
            l2 = []
            for i in l:
                for j in ['A','T','G','C']:
                    l2.append( i + j )
        return self._createWordList(size-1,l2)
    
    
    def removeSymbol( self, symbol ):
        tmp = self.sequence.replace( symbol, "" )
        self.sequence = tmp