comparison TEisotools-1.0/commons/core/seq/Bioseq.py @ 6:20ec0d14798e draft

Uploaded
author urgi-team
date Wed, 20 Jul 2016 05:00:24 -0400
parents
children
comparison
equal deleted inserted replaced
5:4093a2fb58be 6:20ec0d14798e
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import re
33 import sys
34 import random
35 import string
36 import cStringIO
37 from commons.core.coord.Map import Map
38 from commons.core.checker.RepetException import RepetException
39
40 DNA_ALPHABET_WITH_N = set(['A', 'T', 'G', 'C', 'N'])
41 IUPAC = set(['A', 'T', 'G', 'C', 'U', 'R', 'Y', 'M', 'K', 'W', 'S', 'B', 'D', 'H', 'V', 'N'])
42
43
44 ## Record a sequence with its header
45 #
46 class Bioseq(object):
47
48 __slots__ = ("header", "sequence", '__dict__')
49
50 ## constructor
51 #
52 # @param name the header of sequence
53 # @param seq sequence (DNA, RNA, protein)
54 #
55 def __init__(self, name = "", seq = ""):
56 self.header = name
57 self.sequence = seq
58
59
60 ## Equal operator
61 #
62 def __eq__(self, o):
63 if type(o) is type(self) and self.header == o.header and self.sequence == o.sequence:
64 return True
65 return False
66
67 ## Not equal operator
68 #
69 def __ne__(self, o):
70 return not self.__eq__(o)
71
72 ## overload __repr__
73 #
74 def __repr__(self):
75 return "%s;%s" % (self.header, self.sequence)
76
77
78 ## set attribute header
79 #
80 # @param header a string
81 #
82 def setHeader(self, header):
83 self.header = header
84
85
86 ## get attribute header
87 #
88 # @return header
89 def getHeader(self):
90 return self.header
91
92
93 ## set attribute sequence
94 #
95 # @param sequence a string
96 #
97 def setSequence(self, sequence):
98 self.sequence = sequence
99
100
101 def getSequence(self):
102 return self.sequence
103
104 ## reset
105 #
106 def reset(self):
107 self.setHeader("")
108 self.setSequence("")
109
110
111 ## Test if bioseq is empty
112 #
113 def isEmpty(self):
114 return self.header == "" and self.sequence == ""
115
116
117 ## Reverse the sequence
118 #
119 def reverse(self):
120 tmp = self.sequence
121 self.sequence = tmp[::-1]
122
123
124 ## Turn the sequence into its complement
125 # Force upper case letters
126 # @warning: old name in pyRepet.Bioseq realComplement
127 #
128 def complement(self):
129 complement = ""
130 self.upCase()
131 for i in xrange(0, len(self.sequence), 1):
132 if self.sequence[i] == "A":
133 complement += "T"
134 elif self.sequence[i] == "T":
135 complement += "A"
136 elif self.sequence[i] == "C":
137 complement += "G"
138 elif self.sequence[i] == "G":
139 complement += "C"
140 elif self.sequence[i] == "M":
141 complement += "K"
142 elif self.sequence[i] == "R":
143 complement += "Y"
144 elif self.sequence[i] == "W":
145 complement += "W"
146 elif self.sequence[i] == "S":
147 complement += "S"
148 elif self.sequence[i] == "Y":
149 complement += "R"
150 elif self.sequence[i] == "K":
151 complement += "M"
152 elif self.sequence[i] == "V":
153 complement += "B"
154 elif self.sequence[i] == "H":
155 complement += "D"
156 elif self.sequence[i] == "D":
157 complement += "H"
158 elif self.sequence[i] == "B":
159 complement += "V"
160 elif self.sequence[i] == "N":
161 complement += "N"
162 elif self.sequence[i] == "-":
163 complement += "-"
164 else:
165 print "WARNING: unknown symbol '%s', replacing it by N" % (self.sequence[i])
166 complement += "N"
167 self.sequence = complement
168
169
170 ## Reverse and complement the sequence
171 #
172 # Force upper case letters
173 # @warning: old name in pyRepet.Bioseq : complement
174 #
175 def reverseComplement(self):
176 self.reverse()
177 self.complement()
178
179
180 ## Remove gap in the sequence
181 #
182 def cleanGap(self):
183 self.sequence = self.sequence.replace("-", "")
184
185
186 ## Copy current Bioseq Instance
187 #
188 # @return: a Bioseq instance, a copy of current sequence.
189 #
190 def copyBioseqInstance(self):
191 seq = Bioseq()
192 seq.sequence = self.sequence
193 seq.header = self.header
194 return seq
195
196
197 ## Add phase information after the name of sequence in header
198 #
199 # @param phase integer representing phase (1, 2, 3, -1, -2, -3)
200 #
201 def setFrameInfoOnHeader(self, phase):
202 if " " in self.header:
203 name, desc = self.header.split(" ", 1)
204 name = name + "_" + str(phase)
205 self.header = name + " " + desc
206 else:
207 self.header = self.header + "_" + str(phase)
208
209
210 ## Fill Bioseq attributes with fasta file
211 #
212 # @param faFileHandler file handler of a fasta file
213 #
214 def read(self, faFileHandler):
215 line = faFileHandler.readline()
216 if line == "":
217 self.header = None
218 self.sequence = None
219 return
220 while line == "\n":
221 line = faFileHandler.readline()
222 if line[0] == '>':
223 self.header = string.rstrip(line[1:])
224 else:
225 print "error, line is", string.rstrip(line)
226 return
227 line = " "
228 seq = cStringIO.StringIO()
229 while line:
230 prev_pos = faFileHandler.tell()
231 line = faFileHandler.readline()
232 if line == "":
233 break
234 if line[0] == '>':
235 faFileHandler.seek(prev_pos)
236 break
237 seq.write(string.rstrip(line))
238 self.sequence = seq.getvalue()
239
240
241 ## Create a subsequence with a modified header
242 #
243 # @param s integer start a required subsequence
244 # @param e integer end a required subsequence
245 #
246 # @return a Bioseq instance, a subsequence of current sequence
247 #
248 def subseq(self, s, e = 0):
249 if e == 0 :
250 e = len(self.sequence)
251 if s > e :
252 print "error: start must be < or = to end"
253 return
254 if s <= 0 :
255 print "error: start must be > 0"
256 return
257 sub = Bioseq()
258 sub.header = self.header + " fragment " + str(s) + ".." + str(e)
259 sub.sequence = self.sequence[(s - 1):e]
260 return sub
261
262
263 ## Get the nucleotide or aminoacid at the given position
264 #
265 # @param pos integer nucleotide or aminoacid position
266 #
267 # @return a string
268 #
269 def getNtFromPosition(self, pos):
270 result = None
271 if not (pos < 1 or pos > self.getLength()):
272 result = self.sequence[pos - 1]
273 return result
274
275
276 ## Print in stdout the Bioseq in fasta format with 60 characters lines
277 #
278 # @param l length of required sequence default is whole sequence
279 #
280 def view(self, l = 0):
281 print '>' + self.header
282 i = 0
283 if(l == 0):
284 l = len(self.sequence)
285 seq = self.sequence[0:l]
286
287 while i < len(seq):
288 print seq[i:i + 60]
289 i = i + 60
290
291
292 ## Get length of sequence
293 #
294 # @param avoidN boolean don't count 'N' nucleotides
295 #
296 # @return length of current sequence
297 #
298 def getLength(self, countN = True):
299 if countN:
300 return len(self.sequence)
301 else:
302 return len(self.sequence) - self.countNt('N')
303
304
305 ## Return the proportion of a specific character
306 #
307 # @param nt character that we want to count
308 #
309 def propNt(self, nt):
310 return self.countNt(nt) / float(self.getLength())
311
312
313 ## Count occurrence of specific character
314 #
315 # @param nt character that we want to count
316 #
317 # @return nb of occurrences
318 #
319 def countNt(self, nt):
320 return self.sequence.count(nt)
321
322
323 ## Count occurrence of each nucleotide in current seq
324 #
325 # @return a dict, keys are nucleotides, values are nb of occurrences
326 #
327 def countAllNt(self):
328 dNt2Count = {}
329 for nt in ["A", "T", "G", "C", "N"]:
330 dNt2Count[ nt ] = self.countNt(nt)
331 return dNt2Count
332
333
334 ## Return a dict with the number of occurrences for each combination of ATGC of specified size and number of word found
335 #
336 # @param size integer required length word
337 #
338 def occ_word(self, size):
339 occ = {}
340 if size == 0:
341 return occ, 0
342 nbword = 0
343 srch = re.compile('[^ATGC]+')
344 wordlist = self._createWordList(size)
345 for i in wordlist:
346 occ[i] = 0
347 lenseq = len(self.sequence)
348 i = 0
349 while i < lenseq - size + 1:
350 word = self.sequence[i:i + size].upper()
351 m = srch.search(word)
352 if m == None:
353 occ[word] = occ[word] + 1
354 nbword = nbword + 1
355 i = i + 1
356 else:
357 i = i + m.end(0)
358 return occ, nbword
359
360
361 ## Return a dictionary with the frequency of occurs for each combination of ATGC of specified size
362 #
363 # @param size integer required length word
364 #
365 def freq_word(self, size):
366 dOcc, nbWords = self.occ_word(size)
367 freq = {}
368 for word in dOcc.keys():
369 freq[word] = float(dOcc[word]) / nbWords
370 return freq
371
372
373 ## Find ORF in each phase
374 #
375 # @return: a dict, keys are phases, values are stop codon positions.
376 #
377 def findORF (self):
378 orf = {0:[], 1:[], 2:[]}
379 length = len (self.sequence)
380 for i in xrange(0, length):
381 triplet = self.sequence[i:i + 3]
382 if (triplet == "TAA" or triplet == "TAG" or triplet == "TGA"):
383 phase = i % 3
384 orf[phase].append(i)
385 return orf
386
387
388 ## Convert the sequence into upper case
389 #
390 def upCase(self):
391 self.sequence = self.sequence.upper()
392
393
394 ## Convert the sequence into lower case
395 #
396 def lowCase(self):
397 self.sequence = self.sequence.lower()
398
399
400 ## Extract the cluster of the fragment (output from Grouper)
401 #
402 # @return cluster id (string)
403 #
404 def getClusterID(self):
405 data = self.header.split()
406 return data[0].split("Cl")[1]
407
408
409 ## Extract the group of the sequence (output from Grouper)
410 #
411 # @return group id (string)
412 #
413 def getGroupID(self):
414 data = self.header.split()
415 return data[0].split("Gr")[1].split("Cl")[0]
416
417
418 ## Get the header of the full sequence (output from Grouper)
419 #
420 # @example 'Dmel_Grouper_3091_Malign_3:LARD' from '>MbS1566Gr81Cl81 Dmel_Grouper_3091_Malign_3:LARD {Fragment} 1..5203'
421 # @return header (string)
422 #
423 def getHeaderFullSeq(self):
424 data = self.header.split()
425 return data[1]
426
427
428 ## Get the strand of the fragment (output from Grouper)
429 #
430 # @return: strand (+ or -)
431 #
432 def getFragStrand(self):
433 data = self.header.split()
434 coord = data[3].split("..")
435 if int(coord[0]) < int(coord[-1]):
436 return "+"
437 else:
438 return "-"
439
440
441 ## Get A, T, G, C or N from an IUPAC letter
442 # IUPAC = ['A','T','G','C','U','R','Y','M','K','W','S','B','D','H','V','N']
443 #
444 # @return A, T, G, C or N
445 #
446 def getATGCNFromIUPAC(self, nt):
447 subset = ["A", "T", "G", "C", "N"]
448
449 if nt in subset:
450 return nt
451 elif nt == "U":
452 return "T"
453 elif nt == "R":
454 return random.choice("AG")
455 elif nt == "Y":
456 return random.choice("CT")
457 elif nt == "M":
458 return random.choice("CA")
459 elif nt == "K":
460 return random.choice("TG")
461 elif nt == "W":
462 return random.choice("TA")
463 elif nt == "S":
464 return random.choice("CG")
465 elif nt == "B":
466 return random.choice("CTG")
467 elif nt == "D":
468 return random.choice("ATG")
469 elif nt == "H":
470 return random.choice("ATC")
471 elif nt == "V":
472 return random.choice("ACG")
473 else:
474 return "N"
475
476 ## Get nucleotide from an IUPAC letter and a nucleotide
477 # Works only for IUPAC code with two possibilities ['R','Y','M','K','W','S']
478 # Examples:
479 # Y and C returns T
480 # Y and T returns C
481 # B and C throws RepetException
482 #
483 # @return A, T, G, C
484 #
485 def getATGCNFromIUPACandATGCN(self, IUPACCode, nt):
486 if IUPACCode == "R":
487 possibleNt = set(["A", "G"])
488 if nt not in possibleNt:
489 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
490 return (possibleNt - set(nt)).pop()
491
492 elif IUPACCode == "Y":
493 possibleNt = set(["C", "T"])
494 if nt not in possibleNt:
495 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
496 return (possibleNt - set(nt)).pop()
497
498 elif IUPACCode == "M":
499 possibleNt = set(["A", "C"])
500 if nt not in possibleNt:
501 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
502 return (possibleNt - set(nt)).pop()
503
504 elif IUPACCode == "K":
505 possibleNt = set(["T", "G"])
506 if nt not in possibleNt:
507 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
508 return (possibleNt - set(nt)).pop()
509
510 elif IUPACCode == "W":
511 possibleNt = set(["A", "T"])
512 if nt not in possibleNt:
513 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
514 return (possibleNt - set(nt)).pop()
515
516 elif IUPACCode == "S":
517 possibleNt = set(["C", "G"])
518 if nt not in possibleNt:
519 raise RepetException("IUPAC code '%s' and nucleotide '%s' are not compatible" % (IUPACCode, nt))
520 return (possibleNt - set(nt)).pop()
521
522 else:
523 raise RepetException("Can't retrieve the third nucleotide from IUPAC code '%s' and nucleotide '%s'" % (IUPACCode, nt))
524
525 def getSeqWithOnlyATGCN(self):
526 newSeq = ""
527 for nt in self.sequence:
528 newSeq += self.getATGCNFromIUPAC(nt)
529 return newSeq
530
531
532 ## Replace any symbol not in (A,T,G,C,N) by another nucleotide it represents
533 #
534 def partialIUPAC(self):
535 self.sequence = self.getSeqWithOnlyATGCN()
536
537
538 ## Remove non Unix end-of-line symbols, if any
539 #
540 def checkEOF(self):
541 symbol = "\r" # corresponds to '^M' from Windows
542 if symbol in self.sequence:
543 print "WARNING: Windows EOF removed in '%s'" % (self.header)
544 sys.stdout.flush()
545 newSeq = self.sequence.replace(symbol, "")
546 self.sequence = newSeq
547
548
549 ## Write Bioseq instance into a fasta file handler
550 #
551 # @param faFileHandler file handler of a fasta file
552 #
553 def write(self, faFileHandler):
554 faFileHandler.write(">%s\n" % (self.header))
555 self.writeSeqInFasta(faFileHandler)
556
557
558 ## Write only the sequence of Bioseq instance into a fasta file handler
559 #
560 # @param faFileHandler file handler of a fasta file
561 #
562 def writeSeqInFasta(self, faFileHandler):
563 i = 0
564 while i < self.getLength():
565 faFileHandler.write("%s\n" % (self.sequence[i:i + 60]))
566 i += 60
567
568
569 ## Append Bioseq instance to a fasta file
570 #
571 # @param faFile name of a fasta file as a string
572 # @param mode 'write' or 'append'
573 #
574 def save(self, faFile, mode = "a"):
575 faFileHandler = open(faFile, mode)
576 self.write(faFileHandler)
577 faFileHandler.close()
578
579
580 ## Append Bioseq instance to a fasta file
581 #
582 # @param faFile name of a fasta file as a string
583 #
584 def appendBioseqInFile(self, faFile):
585 self.save(faFile, "a")
586
587
588 ## Write Bioseq instance into a fasta file handler
589 #
590 # @param faFileHandler file handler on a file with writing right
591 #
592 def writeABioseqInAFastaFile(self, faFileHandler):
593 self.write(faFileHandler)
594
595
596 ## Write Bioseq instance with other header into a fasta file handler
597 #
598 # @param faFileHandler file handler on a file with writing right
599 # @param otherHeader a string representing a new header (without the > and the \n)
600 #
601 def writeWithOtherHeader(self, faFileHandler, otherHeader):
602 self.header = otherHeader
603 self.write(faFileHandler)
604
605
606 ## Append Bioseq header and Bioseq sequence in a fasta file
607 #
608 # @param faFileHandler file handler on a file with writing right
609 # @param otherHeader a string representing a new header (without the > and the \n)
610 #
611 def writeABioseqInAFastaFileWithOtherHeader(self, faFileHandler, otherHeader):
612 self.writeWithOtherHeader(faFileHandler, otherHeader)
613
614
615 ## get the list of Maps corresponding to seq without gap
616 #
617 # @warning This method was called getMap() in pyRepet.Bioseq
618 # @return a list of Map object
619 #
620 def getLMapWhithoutGap(self):
621 lMaps = []
622 countSite = 1
623 countSubseq = 1
624 inGap = False
625 startMap = -1
626 endMap = -1
627
628 # initialize with the first site
629 if self.sequence[0] == "-":
630 inGap = True
631 else:
632 startMap = countSite
633
634 # for each remaining site
635 for site in self.sequence[1:]:
636 countSite += 1
637
638 # if it is a gap
639 if site == "-":
640
641 # if this is the beginning of a gap, record the previous subsequence
642 if inGap == False:
643 inGap = True
644 endMap = countSite - 1
645 lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap))
646 countSubseq += 1
647
648 # if it is NOT a gap
649 if site != "-":
650
651 # if it is the end of a gap, begin the next subsequence
652 if inGap == True:
653 inGap = False
654 startMap = countSite
655
656 # if it is the last site
657 if countSite == self.getLength():
658 endMap = countSite
659 lMaps.append(Map("%s_subSeq%i" % (self.header, countSubseq), self.header, startMap, endMap))
660
661 return lMaps
662
663
664 ## get the percentage of GC
665 #
666 # @return a percentage
667 #
668 def getGCpercentage(self):
669 tmpSeq = self.getSeqWithOnlyATGCN()
670 nbGC = tmpSeq.count("G") + tmpSeq.count("C")
671 return 100 * nbGC / float(self.getLength())
672
673 ## get the percentage of GC of a sequence without counting N in sequence length
674 #
675 # @return a percentage
676 #
677 def getGCpercentageInSequenceWithoutCountNInLength(self):
678 tmpSeq = self.getSeqWithOnlyATGCN()
679 nbGC = tmpSeq.count("G") + tmpSeq.count("C")
680 return 100 * nbGC / float(self.getLength() - self.countNt("N"))
681
682 ## get the 5 prime subsequence of a given length at the given position
683 #
684 # @param position integer
685 # @param flankLength integer subsequence length
686 # @return a sequence string
687 #
688 def get5PrimeFlank(self, position, flankLength):
689 if(position == 1):
690 return ""
691 else:
692 startOfFlank = 1
693 endOfFlank = position - 1
694
695 if((position - flankLength) > 0):
696 startOfFlank = position - flankLength
697 else:
698 startOfFlank = 1
699
700 return self.subseq(startOfFlank, endOfFlank).sequence
701
702
703 ## get the 3 prime subsequence of a given length at the given position
704 # In the case of indels, the polymorphism length can be specified
705 #
706 # @param position integer
707 # @param flankLength integer subsequence length
708 # @param polymLength integer polymorphism length
709 # @return a sequence string
710 #
711 def get3PrimeFlank(self, position, flankLength, polymLength = 1):
712 if((position + polymLength) > len(self.sequence)):
713 return ""
714 else:
715 startOfFlank = position + polymLength
716
717 if((position + polymLength + flankLength) > len(self.sequence)):
718 endOfFlank = len(self.sequence)
719 else:
720 endOfFlank = position + polymLength + flankLength - 1
721
722 return self.subseq(startOfFlank, endOfFlank).sequence
723
724
725 def _createWordList(self, size, l = ['A', 'T', 'G', 'C']):
726 if size == 1 :
727 return l
728 else:
729 l2 = []
730 for i in l:
731 for j in ['A', 'T', 'G', 'C']:
732 l2.append(i + j)
733 return self._createWordList(size - 1, l2)
734
735
736 def removeSymbol(self, symbol):
737 tmp = self.sequence.replace(symbol, "")
738 self.sequence = tmp