0
|
1 ## @file utilities.py
|
|
2 #
|
|
3 # Gather here utility methods for phageterm. Used in both CPU and GPU version.
|
|
4 #from string import maketrans
|
|
5 import re
|
|
6 import random
|
|
7 import sys
|
|
8
|
|
9 import numpy as np
|
|
10 import datetime
|
|
11
|
|
# Translation table mapping each nucleotide to its complement (N stays N).
# The table constructor lives in a different place on Python 2 and Python 3.
if sys.version_info >= (3,):
    TRANSTAB = str.maketrans("ACGTN", "TGCAN")
else:
    import string
    TRANSTAB = string.maketrans("ACGTN", "TGCAN")
|
|
17
|
|
def checkReportTitle(report_title):
    """Normalise report title (take out any special char).

    Only ASCII digits, ASCII letters, '-' and '_' are kept; every other
    character is dropped.

    @param report_title Title requested by the user (string).
    @return The cleaned title truncated to 20 characters. When fewer than
            two characters survive the cleaning, a default title of the form
            "Analysis_<month><day>_<hour><minute>" (numbers are not
            zero-padded) built from the current local time is returned.
    """
    # Code points of 0-9, A-Z, a-z, '-' and '_'.
    allowed_codes = (set(range(48, 58)) | set(range(65, 91))
                     | set(range(97, 123)) | {45, 95})
    title_norm = "".join(ch for ch in report_title if ord(ch) in allowed_codes)
    if len(title_norm) > 1:
        return title_norm[:20]

    # Fallback: the original code returned the undefined name `default`
    # here, which raised NameError instead of yielding the default title.
    right_now = datetime.datetime.now()
    default_title = "Analysis_"
    default_title += str(right_now.month)
    default_title += str(right_now.day)
    default_title += "_"
    default_title += str(right_now.hour)
    default_title += str(right_now.minute)
    return default_title
|
|
36
|
|
37 ### SEQUENCE manipulation function
|
|
def changeCase(seq):
    """Return the sequence string converted to UPPER CASE.

    @param seq Sequence string (any case).
    @return The result of seq.upper().
    """
    upper_seq = seq.upper()
    return upper_seq
|
|
41
|
|
42
|
|
def reverseComplement(seq, transtab=TRANSTAB):
    """Reverse Complement a sequence.

    The sequence is upper-cased, each character is translated through
    transtab, and the result is reversed. Characters that are absent from
    the table are left unchanged by the translation.

    @param seq Sequence string (any case).
    @param transtab Translation table used for the complement. Defaults to
                    the module-level TRANSTAB (A<->T, C<->G, N->N), which is
                    built for the running Python version; the previous
                    default called str.maketrans at definition time, which
                    does not exist on Python 2.
    @return The reverse-complemented, upper-case sequence.
    """
    return changeCase(seq).translate(transtab)[::-1]
|
|
46
|
|
def longest_common_substring(read, refseq):
    """Longest common substring between two strings.

    Dynamic programming over the lengths of common suffixes. Only the
    previous row of the table is needed to fill the current one, so two
    rows are kept instead of the full len(read) x len(refseq) matrix.

    @param read First string; the returned substring is sliced from it.
    @param refseq Second string.
    @return The longest substring of read that also occurs in refseq. When
            several substrings share the maximal length, the one ending
            first in read is returned. Returns an empty slice of read when
            there is no common character.
    """
    row_width = 1 + len(refseq)
    best_len, best_end = 0, 0
    prev_row = [0] * row_width
    for x, read_char in enumerate(read, 1):
        cur_row = [0] * row_width
        for y, ref_char in enumerate(refseq, 1):
            if read_char == ref_char:
                run = prev_row[y - 1] + 1
                cur_row[y] = run
                # Strict comparison keeps the first maximal match found.
                if run > best_len:
                    best_len, best_end = run, x
        prev_row = cur_row
    return read[best_end - best_len: best_end]
|
|
61
|
|
def hybridCoverage(read, sequence, hybrid_coverage, start, end):
    """Return hybrid coverage.

    Computes the longest common substring between read and the window
    sequence[start:end], then adds 1 to hybrid_coverage for as many
    positions as that substring is long, counting from start and never
    going past the end of sequence.

    @param read Read string.
    @param sequence Reference sequence string.
    @param hybrid_coverage Indexable coverage counters, updated in place.
    @param start Start of the window in sequence (also the first position
                 incremented).
    @param end End of the window in sequence (exclusive).
    @return The same hybrid_coverage object, after the update.
    """
    matched = longest_common_substring(read, sequence[start:end])
    stop = min(len(sequence), start + len(matched))
    pos = start
    while pos < stop:
        hybrid_coverage[pos] += 1
        pos += 1
    return hybrid_coverage
|
|
68
|
|
## Determines if readPart maps against sequence.
#
# @param readPart A part of a read (seed characters usually)
# @param sequence (a contig)
# It chooses randomly one mapping position amongst all mappings found.
# It returns 2 numbers: the start and stop position of the chosen mapping location.
def applyCoverage(readPart, sequence):
    """Return a random match of a read onto the sequence.

    readPart is handed to re.finditer as the pattern, so the candidate
    locations are the non-overlapping matches it reports.

    @return (start, end) of one match picked with random.choice, or
            (-1, -1) when readPart is not found in sequence.
    """
    hits = [(found.start(), found.end())
            for found in re.finditer(readPart, sequence)]
    if not hits:
        return -1, -1
    return random.choice(hits)
|
|
85
|
|
def correctEdge(coverage, edge):
    """Correction of the Edge coverage.

    Builds a new two-row integer array initialised with the values of
    coverage, then for every row adds the last `edge` values of that row
    onto positions edge..2*edge-1, and the first `edge` values onto
    positions End-2*edge..End-edge-1 (End being the row length).
    The input coverage is only read, never modified.

    @param coverage Indexable of rows of coverage values; the result only
                    has room for two rows.
    @param edge Number of positions at each extremity to fold back.
    @return The corrected coverage as a numpy array.
    """
    n_pos = len(coverage[0])
    corrected = np.array([n_pos * [0], n_pos * [0]])
    shift = n_pos - edge
    for row_idx, source in enumerate(coverage):
        # Start from a copy of the raw coverage of this row.
        for col in range(len(source)):
            corrected[row_idx][col] = source[col]
        # Fold the trailing edge values back after the leading edge.
        for head in range(edge):
            corrected[row_idx][head + edge] += source[head + shift]
        # Fold the leading edge values back before the trailing edge.
        for tail in range(shift, n_pos):
            corrected[row_idx][tail - edge] += source[tail - shift]
    return corrected
|
|
100
|
|
# utility class for storing results of decisionProcess function
class DecisionProcessOutput:
    """Plain container for the values passed to its constructor.

    Each constructor argument is kept as an attribute of the same name.
    Previously the constructor discarded all of its arguments, so nothing
    was stored on the instance.
    """

    def __init__(self, Redundant, Permuted, P_class, P_type, P_seqcoh, P_concat,
                 P_orient, P_left, P_right, Mu_like):
        self.Redundant = Redundant
        self.Permuted = Permuted
        self.P_class = P_class
        self.P_type = P_type
        self.P_seqcoh = P_seqcoh
        self.P_concat = P_concat
        self.P_orient = P_orient
        self.P_left = P_left
        self.P_right = P_right
        self.Mu_like = Mu_like
|
|
106
|