Mercurial > repos > urgi-team > teiso
diff TEisotools-1.1.a/commons/core/seq/BioseqDB.py @ 16:836ce3d9d47a draft default tip
Uploaded
author | urgi-team |
---|---|
date | Thu, 21 Jul 2016 07:42:47 -0400 |
parents | 255c852351c5 |
children |
line wrap: on
line diff
--- a/TEisotools-1.1.a/commons/core/seq/BioseqDB.py Thu Jul 21 07:36:44 2016 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,494 +0,0 @@ -# Copyright INRA (Institut National de la Recherche Agronomique) -# http://www.inra.fr -# http://urgi.versailles.inra.fr -# -# This software is governed by the CeCILL license under French law and -# abiding by the rules of distribution of free software. You can use, -# modify and/ or redistribute the software under the terms of the CeCILL -# license as circulated by CEA, CNRS and INRIA at the following URL -# "http://www.cecill.info". -# -# As a counterpart to the access to the source code and rights to copy, -# modify and redistribute granted by the license, users are provided only -# with a limited warranty and the software's author, the holder of the -# economic rights, and the successive licensors have only limited -# liability. -# -# In this respect, the user's attention is drawn to the risks associated -# with loading, using, modifying and/or developing or reproducing the -# software by the user in light of its specific status of free software, -# that may mean that it is complicated to manipulate, and that also -# therefore means that it is reserved for developers and experienced -# professionals having in-depth computer knowledge. Users are therefore -# encouraged to load and test the software's suitability as regards their -# requirements in conditions enabling the security of their systems and/or -# data to be ensured and, more generally, to use and operate it in the -# same conditions as regards security. -# -# The fact that you are presently reading this means that you have had -# knowledge of the CeCILL license and that you accept its terms. - - -import sys -import re -from commons.core.seq.Bioseq import Bioseq -from commons.core.stat.Stat import Stat - - -## Handle a collection of a Bioseq (header-sequence) -# -class BioseqDB( object ): - - def __init__( self, name="" ): - self.idx = {} - self.idx_renamed = {} - self.db = [] - self.name = name - if name != "": - faFile = open( name ) - self.read( faFile ) - faFile.close() - self.mean_seq_lgth = None - self.stat = Stat() - - - ## Equal operator - # - def __eq__( self, o ): - if type(o) is type(self): - selfSize = self.getSize() - if selfSize != o.getSize(): - return False - nbEqualInstances = 0 - for i in self.db: - atLeastOneIsEqual = False - for j in o.db: - if i == j: - atLeastOneIsEqual = True - continue - if atLeastOneIsEqual: - nbEqualInstances += 1 - if nbEqualInstances == selfSize: - return True - return False - - ## Not equal operator - # - def __ne__(self, o): - return not self.__eq__(o) - - ## Change the name of the BioseqDB - # - # @param name the BioseqDB name - # - def setName(self, name): - self.name = name - - - ## Record each sequence of the input file as a list of Bioseq instances - # - # @param faFileHandler handler of a fasta file - # - def read( self, faFileHandler ): - while True: - seq = Bioseq() - seq.read( faFileHandler ) - if seq.sequence == None: - break - self.add( seq ) - - - ## Write all Bioseq of BioseqDB in a formatted fasta file (60 character long) - # - # @param faFileHandler file handler of a fasta file - # - def write( self, faFileHandler ): - for bs in self.db: - bs.writeABioseqInAFastaFile( faFileHandler ) - - - ## Write all Bioseq of BioseqDB in a formatted fasta file (60 character long) - # - # @param outFaFileName file name of fasta file - # @param mode 'write' or 'append' - # - def save( self, outFaFileName, mode="w" ): - outFaFile = open( outFaFileName, mode ) - self.write( outFaFile ) - outFaFile.close() - - - ## Read a formatted fasta file and load it in the BioseqDB instance - # - # @param inFaFileName file name of fasta file - # - def load(self, inFaFileName): - fichier = open(inFaFileName) - self.read(fichier) - fichier.close() - - - ## Reverse each sequence of the collection - # - def reverse( self ): - for bs in self.db: - bs.reverse() - - - ## Turn each sequence into its complement - # - def complement( self ): - for bs in self.db: - bs.complement() - - - ## Reverse and complement each sequence - # - def reverseComplement( self ): - for bs in self.db: - bs.reverseComplement() - - - ## Set the collection from a list of Bioseq instances - # - def setData( self, lBioseqs ): - for i in lBioseqs: - self.add( i ) - - - ## Initialization of each attribute of the collection - # - def reset( self ): - self.db = [] - self.idx = {} - self.name = None - self.mean_seq_lgth = None - self.stat.reset() - - - ## Remove all the gap of the sequences of the collection - # - def cleanGap(self): - for iBioSeq in self.db: - iBioSeq.cleanGap() - - - ## Add a Bioseq instance and update the attributes - # - # @param bs a Bioseq instance - # - def add( self, bs ): - if self.idx.has_key( bs.header ): - sys.stderr.write( "ERROR: two sequences with same header '%s'\n" % ( bs.header ) ) - sys.exit(1) - self.db.append( bs ) - self.idx[ bs.header ] = len(self.db) - 1 - self.idx_renamed[ bs.header.replace("::","-").replace(":","-").replace(",","-").replace(" ","_") ] = len(self.db) - 1 - - - ## Give the Bioseq instance corresponding to specified index - # - # @return a Bioseq instance - # - def __getitem__(self,index): - if index < len(self.db): - return self.db[index] - - - ## Give the number of sequences in the bank - # - # @return an integer - # - def getSize( self ): - return len( self.db ) - - - ## Give the cumulative sequence length in the bank - # - # @return an integer - # - def getLength( self ): - cumLength = 0 - for iBioseq in self.db: - cumLength += iBioseq.getLength() - - return cumLength - - - ## Return the length of a given sequence via its header - # - # @return an integer - # - def getSeqLength( self, header ): - return self.fetch(header).getLength() - - - ## Return a list with the sequence headers - # - def getHeaderList( self ): - lHeaders = [] - for bs in self.db: - lHeaders.append( bs.header ) - return lHeaders - - - ## Return a list with the sequences - # - def getSequencesList( self ): - lSeqs = [] - for bs in self.db: - lSeqs.append( bs.getSequence() ) - return lSeqs - - - ## Give the Bioseq instance of the BioseqDB specified by its header - # - # @warning name of this method not appropriate getBioseqByHeader is proposed - # @param header string - # @return a Bioseq instance - # - def fetch( self, header ): - idx = self.idx.get(header,None) - if idx is not None: - return self.db[idx] - else: - idx = self.idx_renamed.get(header,None) - if idx is not None: - return self.db[idx] - else: - raise Exception("Header: "+header+" not found") - - - ## Get a list of Bioseq instances based on a list of headers - # - # @param lHeader list - # @return a list of Bioseq instances - # - def fetchList( self, lheader ): - result = [] - for headerName in lheader: - result.append(self.fetch( headerName )) - return result - - - ## Sort self on its Bioseq size, possibly by decreasing length - # - # @param reverse boolean - # - def sortByLength(self, reverse = False): - self.db.sort(key = lambda iBS: iBS.getLength(), reverse = reverse) - - - ## Give the Bioseq instance of the BioseqDB specified by its renamed header - # In renamed header "::", ":", "," character are been replaced by "-" and " " by "_" - # - # @param renamedHeader string - # @return a Bioseq instance - # - def getBioseqByRenamedHeader( self, renamedHeader ): - return self.db[self.idx_renamed[renamedHeader]] - - - ## Count the number of times the given nucleotide is present in the bank. - # - # @param nt character (nt or aa) - # @return an integer - # - def countNt( self, nt ): - total = 0 - for iBioseq in self.db: - total+= iBioseq.countNt( nt ) - return total - - - ## Count the number of times each nucleotide (A,T,G,C,N) is present in the bank. - # - # @return a dictionary with nucleotide as key and an integer as values - # - def countAllNt( self ): - dNt2Count = {} - for nt in ["A","T","G","C","N"]: - dNt2Count[ nt ] = self.countNt( nt ) - return dNt2Count - - - ## Extract a sub BioseqDB of specified size which beginning at specified start - # - # @param start integer index of first included Bioseq - # @param size integer size of expected BioseqDB - # @return a BioseqDB - # - def extractPart(self, start, size): - iShorterBioseqDB = BioseqDB() - for iBioseq in self.db[start:(start + size)]: - iShorterBioseqDB.add(iBioseq) - return iShorterBioseqDB - - - ## Extract a sub BioseqDB with the specified number of best length Bioseq - # - # @param numBioseq integer the number of Bioseq searched - # @return a BioseqDB - # - def bestLength(self, numBioseq): - length_list = [] - numseq = 0 - for each_seq in self.db: - if each_seq.sequence == None: - l=0 - else: - l = each_seq.getLength() - length_list.append(l) - numseq = numseq + 1 - - length_list.sort() - size = len(length_list) - if numBioseq < size: - len_min = length_list[size-numBioseq] - else: - len_min = length_list[0] - - numseq = 0 - nbsave = 0 - bestSeqs = BioseqDB() - bestSeqs.setName(self.name) - for each_seq in self.db: - if each_seq.sequence == None: - l=0 - else : - l = each_seq.getLength() - numseq = numseq + 1 - if l >= len_min: - bestSeqs.add(each_seq) - nbsave = nbsave + 1 - if nbsave == numBioseq : - break - return bestSeqs - - - ## Extract a sub BioseqDB from a file with Bioseq header containing the specified pattern - # - # @param pattern regular expression of wished Bioseq header - # @param inFileName name of fasta file in which we want extract the BioseqDB - # - def extractPatternOfFile(self, pattern, inFileName): - if pattern=="" : - return - srch=re.compile(pattern) - file_db=open(inFileName) - numseq=0 - nbsave=0 - while 1: - seq=Bioseq() - seq.read(file_db) - if seq.sequence==None: - break - numseq+=1 - m=srch.search(seq.header) - if m: - self.add(seq) - nbsave+=1 - file_db.close() - - - ## Extract a sub BioseqDB from the instance with all Bioseq header containing the specified pattern - # - # @param pattern regular expression of wished Bioseq header - # - # @return a BioseqDB - # - def getByPattern(self,pattern): - if pattern=="" : - return - iBioseqDB=BioseqDB() - srch=re.compile(pattern) - for iBioseq in self.db: - if srch.search(iBioseq.header): - iBioseqDB.add(iBioseq) - return iBioseqDB - - - ## Extract a sub BioseqDB from the instance with all Bioseq header not containing the specified pattern - # - # @param pattern regular expression of not wished Bioseq header - # - # @return a BioseqDB - # - def getDiffFromPattern(self,pattern): - if pattern=="" : - return - iBioseqDB=BioseqDB() - srch=re.compile(pattern) - for iBioseq in self.db: - if not srch.search(iBioseq.header): - iBioseqDB.add(iBioseq) - return iBioseqDB - - #TODO: to run several times to remove all concerned sequences when big data. How to fix it ? - ## Remove from the instance all Bioseq which header contains the specified pattern - # - # @param pattern regular expression of not wished Bioseq header - # - def rmByPattern(self,pattern): - if pattern=="" : - return - srch=re.compile(pattern) - for seq in self.db: - if srch.search(seq.header): - self.db.remove(seq) - - - ## Copy a part from another BioseqDB in the BioseqDB if Bioseq have got header containing the specified pattern - # - # @warning this method is called extractPattern in pyRepet.seq.BioseqDB - # - # @param pattern regular expression of wished Bioseq header - # @param sourceBioseqDB the BioseqDB from which we want extract Bioseq - # - def addBioseqFromABioseqDBIfHeaderContainPattern(self, pattern, sourceBioseqDB): - if pattern=="" : - return - srch=re.compile(pattern) - for seq in sourceBioseqDB.db: - m=srch.search(seq.header) - if m: - self.add(seq) - - - ## Up-case the sequence characters in all sequences - # - def upCase( self ): - for bs in self.db: - bs.upCase() - - - ## Split each gapped Bioseq in a list and store all in a dictionary - # - # @return a dict, keys are bioseq headers, values are list of Map instances - # - def getDictOfLMapsWithoutGaps( self ): - dSeq2Maps = {} - - for bs in self.db: - dSeq2Maps[ bs.header ] = bs.getLMapWhithoutGap() - - return dSeq2Maps - - ## Give the list of the sequence length in the bank - # - # @return an list - # - def getListOfSequencesLength( self ): - lLength = [] - for iBioseq in self.db: - lLength.append(iBioseq.getLength()) - - return lLength - - ## Return sequence length for a list of sequence header - # - def getSeqLengthByListOfName( self, lHeaderName ): - lseqLength=[] - for headerName in lHeaderName: - lseqLength.append(self.getSeqLength( headerName )) - return lseqLength