diff commons/core/seq/Bioseq.py @ 36:44d5973c188c

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 15:02:29 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/seq/Bioseq.py	Tue Apr 30 15:02:29 2013 -0400
@@ -0,0 +1,735 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use, 
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info". 
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability. 
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or 
+# data to be ensured and,  more generally, to use and operate it in the 
+# same conditions as regards security. 
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import sys
+import string
+import re
+import random
+import cStringIO
+from commons.core.coord.Map import Map
+from commons.core.checker.RepetException import RepetException
+
+DNA_ALPHABET_WITH_N = set( ['A','T','G','C','N'] )
+IUPAC = set(['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N'])
+
+
+## Record a sequence with its header
+#
+class Bioseq( object ):
+    
+    header = ""
+    sequence = ""
+    
+    ## constructor
+    #
+    # @param name the header of sequence
+    # @param seq sequence (DNA, RNA, protein)
+    #
+    def __init__( self, name="", seq="" ):
+        self.header = name
+        self.sequence = seq
+        
+        
+    ## Equal operator
+    #        
+    def __eq__( self, o ):
+        if self.header==o.header and self.sequence==o.sequence:
+            return True
+        return False
+    
+    
+    ## overload __repr__
+    #
+    def __repr__( self ):
+        return "%s;%s" % ( self.header, self.sequence )
+    
+    
+    ## set attribute header
+    #
+    # @param header a string
+    #
+    def setHeader( self, header ):
+        self.header = header
+        
+        
+    ## get attribute header
+    #
+    # @return header
+    def getHeader(self):
+        return self.header
+    
+    
+    ## set attribute sequence
+    #
+    # @param sequence a string
+    #
+    def setSequence( self, sequence ):
+        self.sequence = sequence
+        
+        
+    def getSequence(self):
+        return self.sequence
+        
+    ## reset
+    #
+    def reset( self ):
+        self.setHeader( "" )
+        self.setSequence( "" )
+        
+        
+    ## Test if bioseq is empty
+    #
+    def isEmpty( self ):
+        return self.header == "" and self.sequence == ""
+    
+    
+    ## Reverse the sequence
+    #
+    def reverse( self ):
+        tmp = self.sequence
+        self.sequence = tmp[::-1]
+        
+        
+    ## Turn the sequence into its complement
+    #  Force upper case letters
+    #  @warning: old name in pyRepet.Bioseq realComplement
+    #
+    def complement( self ):
+        complement = ""
+        self.upCase()
+        for i in xrange(0,len(self.sequence),1):
+            if self.sequence[i] == "A":
+                complement += "T"
+            elif self.sequence[i] == "T":
+                complement += "A"
+            elif self.sequence[i] == "C":
+                complement += "G"
+            elif self.sequence[i] == "G":
+                complement += "C"
+            elif self.sequence[i] == "M":
+                complement += "K"
+            elif self.sequence[i] == "R":
+                complement += "Y"
+            elif self.sequence[i] == "W":
+                complement += "W"
+            elif self.sequence[i] == "S":
+                complement += "S"
+            elif self.sequence[i] == "Y":
+                complement += "R"
+            elif self.sequence[i] == "K":
+                complement += "M"
+            elif self.sequence[i] == "V":
+                complement += "B"
+            elif self.sequence[i] == "H":
+                complement += "D"
+            elif self.sequence[i] == "D":
+                complement += "H"
+            elif self.sequence[i] == "B":
+                complement += "V"
+            elif self.sequence[i] == "N":
+                complement += "N"
+            elif self.sequence[i] == "-":
+                complement += "-"
+            else:
+                print "WARNING: unknown symbol '%s', replacing it by N" % ( self.sequence[i] )
+                complement += "N"
+        self.sequence = complement
+        
+        
+    ## Reverse and complement the sequence
+    #
+    #  Force upper case letters
+    #  @warning: old name in pyRepet.Bioseq : complement  
+    # 
+    def reverseComplement( self ):
+        self.reverse()
+        self.complement()
+        
+        
+    ## Remove gap in the sequence
+    #
+    def cleanGap(self):
+        self.sequence = self.sequence.replace("-","")
+        
+        
+    ## Copy current Bioseq Instance
+    #
+    # @return: a Bioseq instance, a copy of current sequence.
+    #
+    def copyBioseqInstance(self):
+        seq = Bioseq()
+        seq.sequence = self.sequence
+        seq.header = self.header
+        return seq
+    
+    
+    ## Add phase information after the name of sequence in header
+    #
+    # @param phase integer representing phase (1, 2, 3, -1, -2, -3)
+    #
+    def setFrameInfoOnHeader(self, phase):
+        if " " in self.header:
+            name, desc = self.header.split(" ", 1)
+            name = name + "_" + str(phase)
+            self.header = name + " " + desc
+        else:
+            self.header = self.header + "_" + str(phase)
+            
+            
+    ## Fill Bioseq attributes with fasta file
+    #  
+    # @param faFileHandler file handler of a fasta file
+    #
+    def read( self, faFileHandler ):
+        line = faFileHandler.readline()
+        if line == "":
+            self.header = None
+            self.sequence = None
+            return
+        while line == "\n":
+            line = faFileHandler.readline()
+        if line[0] == '>':
+            self.header = string.rstrip(line[1:])
+        else:
+            print "error, line is",string.rstrip(line)
+            return
+        line = " "
+        seq = cStringIO.StringIO()
+        while line:
+            prev_pos = faFileHandler.tell()
+            line = faFileHandler.readline()
+            if line == "":
+                break
+            if line[0] == '>':
+                faFileHandler.seek( prev_pos )
+                break
+            seq.write( string.rstrip(line) )
+        self.sequence = seq.getvalue()
+        
+        
+    ## Create a subsequence with a modified header
+    #  
+    # @param s integer start a required subsequence
+    # @param e integer end a required subsequence
+    #
+    # @return a Bioseq instance, a subsequence of current sequence
+    #       
+    def subseq( self, s, e=0 ):
+        if e == 0 :
+            e=len( self.sequence )
+        if s > e :
+            print "error: start must be < or = to end"
+            return
+        if s <= 0 :
+            print "error: start must be > 0"
+            return
+        sub = Bioseq()
+        sub.header = self.header + " fragment " + str(s) + ".." + str(e)
+        sub.sequence = self.sequence[(s-1):e]
+        return sub
+        
+        
+    ## Get the nucleotide or aminoacid at the given position
+    #  
+    # @param pos integer nucleotide or aminoacid position
+    #
+    # @return a string
+    #       
+    def getNtFromPosition(self, pos):
+        result = None
+        if not (pos < 1 or pos > self.getLength()):
+            result = self.sequence[pos - 1]
+        return result
+    
+    
+    ## Print in stdout the Bioseq in fasta format with 60 characters lines
+    #  
+    # @param l length of required sequence default is whole sequence
+    # 
+    def view(self,l=0):
+        print '>'+self.header
+        i=0
+        if(l==0):
+            l=len(self.sequence)
+        seq=self.sequence[0:l]
+            
+        while i<len(seq):
+            print seq[i:i+60]
+            i=i+60        
+            
+            
+    ## Get length of sequence
+    #
+    # @param avoidN boolean don't count 'N' nucleotides
+    #
+    # @return length of current sequence
+    # 
+    def getLength( self, countN = True ):
+        if countN:
+            return len(self.sequence)
+        else:
+            return len(self.sequence) - self.countNt('N')
+            
+            
+    ## Return the proportion of a specific character
+    # 
+    # @param nt character that we want to count
+    #
+    def propNt( self, nt ):
+        return self.countNt( nt ) / float( self.getLength() )
+    
+    
+    ## Count occurrence of specific character
+    # 
+    # @param nt character that we want to count
+    #
+    # @return nb of occurrences
+    #
+    def countNt( self, nt ):
+        return self.sequence.count( nt )
+    
+    
+    ## Count occurrence of each nucleotide in current seq 
+    #
+    # @return a dict, keys are nucleotides, values are nb of occurrences
+    #
+    def countAllNt( self ):
+        dNt2Count = {}
+        for nt in ["A","T","G","C","N"]:
+            dNt2Count[ nt ] = self.countNt( nt )
+        return dNt2Count
+    
+    
+    ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found
+    #
+    # @param size integer required length word
+    #
+    def occ_word( self, size ):
+        occ = {}
+        if size == 0:
+            return occ,0
+        nbword = 0
+        srch = re.compile('[^ATGC]+')
+        wordlist = self._createWordList( size )
+        for i in wordlist:
+            occ[i] = 0
+        lenseq = len(self.sequence)
+        i = 0
+        while i < lenseq-size+1:
+            word = self.sequence[i:i+size].upper()
+            m = srch.search(word)
+            if m == None:
+                occ[word] = occ[word]+1
+                nbword = nbword + 1
+                i = i + 1
+            else:
+                i = i + m.end(0)
+        return occ, nbword
+    
+    
+    ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size
+    #
+    # @param size integer required length word
+    #
+    def freq_word( self, size ):
+        dOcc, nbWords = self.occ_word( size )
+        freq = {}
+        for word in dOcc.keys():
+            freq[word] = float(dOcc[word]) / nbWords
+        return freq
+    
+    
+    ## Find ORF in each phase
+    #
+    # @return: a dict, keys are phases, values are stop codon positions.
+    #
+    def findORF (self):
+        orf = {0:[],1:[],2:[]}
+        length = len (self.sequence)
+        for i in xrange(0,length):
+            triplet = self.sequence[i:i+3] 
+            if ( triplet == "TAA" or triplet == "TAG" or triplet == "TGA"):
+                phase = i % 3
+                orf[phase].append(i)
+        return orf
+    
+    
+    ## Convert the sequence into upper case
+    #
+    def upCase( self ):
+        self.sequence = self.sequence.upper()
+        
+        
+    ## Convert the sequence into lower case
+    #
+    def lowCase( self ):
+        self.sequence = self.sequence.lower()
+        
+        
+    ## Extract the cluster of the fragment (output from Grouper)
+    #
+    # @return cluster id (string)
+    #    
+    def getClusterID( self ):
+        data = self.header.split()
+        return data[0].split("Cl")[1]
+    
+    
+    ## Extract the group of the sequence (output from Grouper)
+    #
+    # @return group id (string)
+    #
+    def getGroupID( self ):
+        data = self.header.split()
+        return data[0].split("Gr")[1].split("Cl")[0]
+    
+    
+    ## Get the header of the full sequence (output from Grouper)
+    #
+    # @example  'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203' 
+    # @return header (string)
+    #
+    def getHeaderFullSeq( self ):
+        data = self.header.split()
+        return data[1]
+    
+    
+    ## Get the strand of the fragment (output from Grouper)
+    #
+    # @return: strand (+ or -)
+    #
+    def getFragStrand( self ):
+        data = self.header.split()
+        coord = data[3].split("..")
+        if int(coord[0]) < int(coord[-1]):
+            return "+"
+        else:
+            return "-"
+        
+        
+    ## Get A, T, G, C or N from an IUPAC letter
+    #  IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N']
+    #
+    # @return A, T, G, C or N
+    #
+    def getATGCNFromIUPAC( self, nt ):
+        subset = ["A","T","G","C","N"]
+
+        if nt in subset:
+            return nt
+        elif nt == "U":
+            return "T"
+        elif nt == "R":
+            return random.choice( "AG" )
+        elif nt == "Y":
+            return random.choice( "CT" )
+        elif nt == "M":
+            return random.choice( "CA" )
+        elif nt == "K":
+            return random.choice( "TG" )
+        elif nt == "W":
+            return random.choice( "TA" )
+        elif nt == "S":
+            return random.choice( "CG" )
+        elif nt == "B":
+            return random.choice( "CTG" )
+        elif nt == "D":
+            return random.choice( "ATG" )
+        elif nt == "H":
+            return random.choice( "ATC" )
+        elif nt == "V":
+            return random.choice( "ACG" )
+        else:
+            return "N"
+    
+    ## Get nucleotide from an IUPAC letter and a nucleotide
+    # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S']
+    # Examples:
+    # Y and C returns T
+    # Y and T returns C
+    # B and C throws RepetException
+    #
+    # @return A, T, G, C
+    #
+    def getATGCNFromIUPACandATGCN(self, IUPACCode, nt):
+        if IUPACCode == "R":
+            possibleNt = set(["A", "G"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+        
+        elif IUPACCode == "Y":
+            possibleNt = set(["C", "T"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+        
+        elif IUPACCode == "M":
+            possibleNt = set(["A", "C"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+        
+        elif IUPACCode == "K":
+            possibleNt = set(["T", "G"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+        
+        elif IUPACCode == "W":
+            possibleNt = set(["A", "T"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+        
+        elif IUPACCode == "S":
+            possibleNt = set(["C", "G"])
+            if nt not in possibleNt:
+                raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
+            return (possibleNt - set(nt)).pop()
+
+        else:
+            raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt))
+    
+    def getSeqWithOnlyATGCN( self ):
+        newSeq = ""
+        for nt in self.sequence:
+            newSeq += self.getATGCNFromIUPAC( nt )
+        return newSeq
+    
+    
+    ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents
+    #
+    def partialIUPAC( self ):
+        self.sequence = self.getSeqWithOnlyATGCN()
+    
+    
+    ## Remove non Unix end-of-line symbols, if any
+    #
+    def checkEOF( self ):
+        symbol = "\r"   # corresponds to '^M' from Windows
+        if symbol in self.sequence:
+            print "WARNING: Windows EOF removed in '%s'" % ( self.header )
+            sys.stdout.flush()
+            newSeq = self.sequence.replace( symbol, "" )
+            self.sequence = newSeq
+            
+            
+    ## Write Bioseq instance into a fasta file handler
+    #
+    # @param faFileHandler file handler of a fasta file
+    # 
+    def write( self, faFileHandler ):
+        faFileHandler.write( ">%s\n" % ( self.header ) )
+        self.writeSeqInFasta( faFileHandler )
+        
+        
+    ## Write only the sequence of Bioseq instance into a fasta file handler
+    #
+    # @param faFileHandler file handler of a fasta file
+    #
+    def writeSeqInFasta( self, faFileHandler ):
+        i = 0
+        while i < self.getLength():
+            faFileHandler.write( "%s\n" % ( self.sequence[i:i+60] ) )
+            i += 60
+            
+            
+    ## Append Bioseq instance to a fasta file
+    #
+    # @param faFile name of a fasta file as a string
+    # @param mode 'write' or 'append'
+    #
+    def save( self, faFile, mode="a" ):
+        faFileHandler = open( faFile, mode )
+        self.write( faFileHandler )
+        faFileHandler.close()
+        
+        
+    ## Append Bioseq instance to a fasta file
+    #  
+    # @param faFile name of a fasta file as a string
+    #
+    def appendBioseqInFile( self, faFile ):
+        self.save( faFile, "a" )
+        
+        
+    ## Write Bioseq instance into a fasta file handler
+    #  
+    # @param faFileHandler file handler on a file with writing right
+    #
+    def writeABioseqInAFastaFile( self, faFileHandler ):
+        self.write( faFileHandler )
+        
+        
+    ## Write Bioseq instance with other header into a fasta file handler
+    #
+    # @param faFileHandler file handler on a file with writing right
+    # @param otherHeader a string representing a new header (without the > and the \n)
+    #
+    def writeWithOtherHeader( self, faFileHandler, otherHeader ):
+        self.header = otherHeader
+        self.write( faFileHandler )
+        
+        
+    ## Append Bioseq header and Bioseq sequence in a fasta file
+    #  
+    # @param faFileHandler file handler on a file with writing right
+    # @param otherHeader a string representing a new header (without the > and the \n)
+    #
+    def writeABioseqInAFastaFileWithOtherHeader( self, faFileHandler, otherHeader ):
+        self.writeWithOtherHeader( faFileHandler, otherHeader )
+        
+        
+    ## get the list of Maps corresponding to seq without gap
+    #
+    # @warning This method was called getMap() in pyRepet.Bioseq
+    # @return a list of Map object 
+    # 
+    def getLMapWhithoutGap( self ):
+        lMaps = []
+        countSite = 1
+        countSubseq = 1
+        inGap = False
+        startMap = -1
+        endMap = -1
+
+        # initialize with the first site
+        if self.sequence[0] == "-":
+            inGap = True
+        else:
+            startMap = countSite
+
+        # for each remaining site
+        for site in self.sequence[1:]:
+            countSite += 1
+
+            # if it is a gap
+            if site == "-":
+
+                # if this is the beginning of a gap, record the previous subsequence
+                if inGap == False:
+                    inGap = True
+                    endMap = countSite - 1
+                    lMaps.append( Map( "%s_subSeq%i" % (self.header,countSubseq), self.header, startMap, endMap ) )
+                    countSubseq += 1
+
+            # if it is NOT a gap
+            if site != "-":
+
+                # if it is the end of a gap, begin the next subsequence
+                if inGap == True:
+                    inGap = False
+                    startMap = countSite
+
+                # if it is the last site
+                if countSite == self.getLength():
+                    endMap = countSite
+                    lMaps.append( Map( "%s_subSeq%i" % (self.header,countSubseq), self.header, startMap, endMap ) )
+
+        return lMaps
+    
+    
+    ## get the percentage of GC
+    #
+    # @return a percentage
+    # 
+    def getGCpercentage( self ):
+        tmpSeq = self.getSeqWithOnlyATGCN()
+        nbGC = tmpSeq.count( "G" ) + tmpSeq.count( "C" )
+        return 100 * nbGC / float( self.getLength() )
+    
+    ## get the percentage of GC of a sequence without counting N in sequence length
+    #
+    # @return a percentage
+    # 
+    def getGCpercentageInSequenceWithoutCountNInLength(self):
+        tmpSeq = self.getSeqWithOnlyATGCN()
+        nbGC = tmpSeq.count( "G" ) + tmpSeq.count( "C" )
+        return 100 * nbGC / float( self.getLength() - self.countNt("N") )
+    
+    ## get the 5 prime subsequence of a given length at the given position 
+    #
+    # @param position integer
+    # @param flankLength integer subsequence length
+    # @return a sequence string
+    # 
+    def get5PrimeFlank(self, position, flankLength):
+        if(position == 1):
+            return ""
+        else:
+            startOfFlank = 1
+            endOfFlank = position -1
+        
+            if((position - flankLength) > 0):
+                startOfFlank = position - flankLength
+            else:
+                startOfFlank = 1
+            
+            return self.subseq(startOfFlank, endOfFlank).sequence
+            
+            
+    ## get the 3 prime subsequence of a given length at the given position 
+    #  In the case of indels, the polymorphism length can be specified
+    #
+    # @param position integer
+    # @param flankLength integer subsequence length
+    # @param polymLength integer polymorphism length
+    # @return a sequence string
+    # 
+    def get3PrimeFlank(self, position, flankLength, polymLength = 1):
+        if((position + polymLength) > len( self.sequence )):
+            return ""
+        else:
+            startOfFlank = position + polymLength
+         
+            if((position+polymLength+flankLength) > len( self.sequence )):
+                endOfFlank =  len( self.sequence )
+            else:
+                endOfFlank =  position+polymLength+flankLength-1
+        
+            return self.subseq(startOfFlank, endOfFlank).sequence
+    
+    
+    def _createWordList(self,size,l=['A','T','G','C']):
+        if size == 1 :
+            return l
+        else:
+            l2 = []
+            for i in l:
+                for j in ['A','T','G','C']:
+                    l2.append( i + j )
+        return self._createWordList(size-1,l2)
+    
+    
+    def removeSymbol( self, symbol ):
+        tmp = self.sequence.replace( symbol, "" )
+        self.sequence = tmp