Mercurial > repos > cpt > cpt_phageqc_annotations
comparison cpt_phageqc_annotation/cpt.py @ 0:c3140b08d703 draft default tip
Uploaded
author | cpt |
---|---|
date | Fri, 17 Jun 2022 13:00:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c3140b08d703 |
---|---|
1 #!/usr/bin/env python | |
2 import regex as re | |
3 from Bio.Seq import Seq, reverse_complement, translate | |
4 from Bio.SeqRecord import SeqRecord | |
5 from Bio import SeqIO | |
6 from Bio.Data import CodonTable | |
7 import logging | |
8 | |
9 logging.basicConfig() | |
10 log = logging.getLogger() | |
11 | |
12 PHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*phage (?P<phage>.*)$") | |
13 BACTERIOPHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*bacteriophage (?P<phage>.*)$") | |
14 STARTS_WITH_PHAGE = re.compile( | |
15 "^(bacterio|vibrio|Bacterio|Vibrio|)?[Pp]hage (?P<phage>.*)$" | |
16 ) | |
17 NEW_STYLE_NAMES = re.compile("(?P<phage>v[A-Z]_[A-Z][a-z]{2}_.*)") | |
18 | |
19 | |
20 def phage_name_parser(name): | |
21 host = None | |
22 phage = None | |
23 name = name.replace(", complete genome.", "") | |
24 name = name.replace(", complete genome", "") | |
25 | |
26 m = BACTERIOPHAGE_IN_MIDDLE.match(name) | |
27 if m: | |
28 host = m.group("host") | |
29 phage = m.group("phage") | |
30 return (host, phage) | |
31 | |
32 m = PHAGE_IN_MIDDLE.match(name) | |
33 if m: | |
34 host = m.group("host") | |
35 phage = m.group("phage") | |
36 return (host, phage) | |
37 | |
38 m = STARTS_WITH_PHAGE.match(name) | |
39 if m: | |
40 phage = m.group("phage") | |
41 return (host, phage) | |
42 | |
43 m = NEW_STYLE_NAMES.match(name) | |
44 if m: | |
45 phage = m.group("phage") | |
46 return (host, phage) | |
47 | |
48 return (host, phage) | |
49 | |
50 | |
51 class OrfFinder(object): | |
52 def __init__(self, table, ftype, ends, min_len, strand): | |
53 self.table = table | |
54 self.table_obj = CodonTable.ambiguous_generic_by_id[table] | |
55 self.ends = ends | |
56 self.ftype = ftype | |
57 self.min_len = min_len | |
58 self.starts = sorted(self.table_obj.start_codons) | |
59 self.stops = sorted(self.table_obj.stop_codons) | |
60 self.re_starts = re.compile("|".join(self.starts)) | |
61 self.re_stops = re.compile("|".join(self.stops)) | |
62 self.strand = strand | |
63 | |
64 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3): | |
65 seq_format = "fasta" | |
66 log.debug("Genetic code table %i" % self.table) | |
67 log.debug("Minimum length %i aa" % self.min_len) | |
68 | |
69 out_count = 0 | |
70 | |
71 out_gff3.write("##gff-version 3\n") | |
72 | |
73 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)): | |
74 for i, (f_start, f_end, f_strand, n, t) in enumerate( | |
75 self.get_all_peptides(str(record.seq).upper()) | |
76 ): | |
77 out_count += 1 | |
78 | |
79 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % ( | |
80 len(t), | |
81 len(n), | |
82 f_start, | |
83 f_end, | |
84 f_strand, | |
85 record.description, | |
86 ) | |
87 fid = record.id + "|%s%i" % (self.ftype, i + 1) | |
88 | |
89 r = SeqRecord(Seq(n), id=fid, name="", description=descr) | |
90 t = SeqRecord(Seq(t), id=fid, name="", description=descr) | |
91 | |
92 SeqIO.write(r, out_nuc, "fasta") | |
93 SeqIO.write(t, out_prot, "fasta") | |
94 | |
95 nice_strand = "+" if f_strand == +1 else "-" | |
96 | |
97 out_bed.write( | |
98 "\t".join( | |
99 map(str, [record.id, f_start, f_end, fid, 0, nice_strand]) | |
100 ) | |
101 + "\n" | |
102 ) | |
103 | |
104 out_gff3.write( | |
105 "\t".join( | |
106 map( | |
107 str, | |
108 [ | |
109 record.id, | |
110 "getOrfsOrCds", | |
111 "CDS", | |
112 f_start + 1, | |
113 f_end, | |
114 ".", | |
115 nice_strand, | |
116 0, | |
117 "ID=%s.%s.%s" % (self.ftype, idx, i + 1), | |
118 ], | |
119 ) | |
120 ) | |
121 + "\n" | |
122 ) | |
123 log.info("Found %i %ss", out_count, self.ftype) | |
124 | |
125 def start_chop_and_trans(self, s, strict=True): | |
126 """Returns offset, trimmed nuc, protein.""" | |
127 if strict: | |
128 assert s[-3:] in self.stops, s | |
129 assert len(s) % 3 == 0 | |
130 for match in self.re_starts.finditer(s, overlapped=True): | |
131 # Must check the start is in frame | |
132 start = match.start() | |
133 if start % 3 == 0: | |
134 n = s[start:] | |
135 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n)) | |
136 if strict: | |
137 t = translate(n, self.table) | |
138 else: | |
139 # Use when missing stop codon, | |
140 t = "M" + translate(n[3:], self.table, to_stop=True) | |
141 yield start, n, t # Edited by CPT to be a generator | |
142 | |
143 def break_up_frame(self, s): | |
144 """Returns offset, nuc, protein.""" | |
145 start = 0 | |
146 for match in self.re_stops.finditer(s, overlapped=True): | |
147 index = match.start() + 3 | |
148 if index % 3 != 0: | |
149 continue | |
150 n = s[start:index] | |
151 for (offset, n, t) in self.start_chop_and_trans(n): | |
152 if n and len(t) >= self.min_len: | |
153 yield start + offset, n, t | |
154 start = index | |
155 | |
156 def putative_genes_in_sequence(self, nuc_seq): | |
157 """Returns start, end, strand, nucleotides, protein. | |
158 Co-ordinates are Python style zero-based. | |
159 """ | |
160 nuc_seq = nuc_seq.upper() | |
161 # TODO - Refactor to use a generator function (in start order) | |
162 # rather than making a list and sorting? | |
163 answer = [] | |
164 full_len = len(nuc_seq) | |
165 | |
166 for frame in range(0, 3): | |
167 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
168 start = frame + offset # zero based | |
169 answer.append((start, start + len(n), +1, n, t)) | |
170 | |
171 rc = reverse_complement(nuc_seq) | |
172 for frame in range(0, 3): | |
173 for offset, n, t in self.break_up_frame(rc[frame:]): | |
174 start = full_len - frame - offset # zero based | |
175 answer.append((start, start - len(n), -1, n, t)) | |
176 answer.sort() | |
177 return answer | |
178 | |
179 def get_all_peptides(self, nuc_seq): | |
180 """Returns start, end, strand, nucleotides, protein. | |
181 | |
182 Co-ordinates are Python style zero-based. | |
183 """ | |
184 # Refactored into generator by CPT | |
185 full_len = len(nuc_seq) | |
186 if self.strand != "reverse": | |
187 for frame in range(0, 3): | |
188 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
189 start = frame + offset # zero based | |
190 yield (start, start + len(n), +1, n, t) | |
191 if self.strand != "forward": | |
192 rc = reverse_complement(nuc_seq) | |
193 for frame in range(0, 3): | |
194 for offset, n, t in self.break_up_frame(rc[frame:]): | |
195 start = full_len - frame - offset # zero based | |
196 yield (start - len(n), start, -1, n, t) | |
197 | |
198 | |
199 class MGAFinder(object): | |
200 def __init__(self, table, ftype, ends, min_len): | |
201 self.table = table | |
202 self.table_obj = CodonTable.ambiguous_generic_by_id[table] | |
203 self.ends = ends | |
204 self.ftype = ftype | |
205 self.min_len = min_len | |
206 self.starts = sorted(self.table_obj.start_codons) | |
207 self.stops = sorted(self.table_obj.stop_codons) | |
208 self.re_starts = re.compile("|".join(self.starts)) | |
209 self.re_stops = re.compile("|".join(self.stops)) | |
210 | |
211 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3): | |
212 seq_format = "fasta" | |
213 log.debug("Genetic code table %i" % self.table) | |
214 log.debug("Minimum length %i aa" % self.min_len) | |
215 | |
216 out_count = 0 | |
217 | |
218 out_gff3.write("##gff-version 3\n") | |
219 | |
220 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)): | |
221 for i, (f_start, f_end, f_strand, n, t) in enumerate( | |
222 self.get_all_peptides(str(record.seq).upper()) | |
223 ): | |
224 out_count += 1 | |
225 | |
226 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % ( | |
227 len(t), | |
228 len(n), | |
229 f_start, | |
230 f_end, | |
231 f_strand, | |
232 record.description, | |
233 ) | |
234 fid = record.id + "|%s%i" % (self.ftype, i + 1) | |
235 | |
236 r = SeqRecord(Seq(n), id=fid, name="", description=descr) | |
237 t = SeqRecord(Seq(t), id=fid, name="", description=descr) | |
238 | |
239 SeqIO.write(r, out_nuc, "fasta") | |
240 SeqIO.write(t, out_prot, "fasta") | |
241 | |
242 nice_strand = "+" if f_strand == +1 else "-" | |
243 | |
244 out_bed.write( | |
245 "\t".join( | |
246 map(str, [record.id, f_start, f_end, fid, 0, nice_strand]) | |
247 ) | |
248 + "\n" | |
249 ) | |
250 | |
251 out_gff3.write( | |
252 "\t".join( | |
253 map( | |
254 str, | |
255 [ | |
256 record.id, | |
257 "getOrfsOrCds", | |
258 "CDS", | |
259 f_start + 1, | |
260 f_end, | |
261 ".", | |
262 nice_strand, | |
263 0, | |
264 "ID=%s.%s.%s" % (self.ftype, idx, i + 1), | |
265 ], | |
266 ) | |
267 ) | |
268 + "\n" | |
269 ) | |
270 log.info("Found %i %ss", out_count, self.ftype) | |
271 | |
272 def start_chop_and_trans(self, s, strict=True): | |
273 """Returns offset, trimmed nuc, protein.""" | |
274 if strict: | |
275 assert s[-3:] in self.stops, s | |
276 assert len(s) % 3 == 0 | |
277 for match in self.re_starts.finditer(s, overlapped=True): | |
278 # Must check the start is in frame | |
279 start = match.start() | |
280 if start % 3 == 0: | |
281 n = s[start:] | |
282 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n)) | |
283 if strict: | |
284 t = translate(n, self.table) | |
285 else: | |
286 # Use when missing stop codon, | |
287 t = "M" + translate(n[3:], self.table, to_stop=True) | |
288 yield start, n, t | |
289 | |
290 def break_up_frame(self, s): | |
291 """Returns offset, nuc, protein.""" | |
292 start = 0 | |
293 for match in self.re_stops.finditer(s, overlapped=True): | |
294 index = match.start() + 3 | |
295 if index % 3 != 0: | |
296 continue | |
297 n = s[start:index] | |
298 for (offset, n, t) in self.start_chop_and_trans(n): | |
299 if n and len(t) >= self.min_len: | |
300 yield start + offset, n, t | |
301 start = index | |
302 | |
303 def putative_genes_in_sequence(self, nuc_seq): | |
304 """Returns start, end, strand, nucleotides, protein. | |
305 Co-ordinates are Python style zero-based. | |
306 """ | |
307 nuc_seq = nuc_seq.upper() | |
308 # TODO - Refactor to use a generator function (in start order) | |
309 # rather than making a list and sorting? | |
310 answer = [] | |
311 full_len = len(nuc_seq) | |
312 | |
313 for frame in range(0, 3): | |
314 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
315 start = frame + offset # zero based | |
316 answer.append((start, start + len(n), +1, n, t)) | |
317 | |
318 rc = reverse_complement(nuc_seq) | |
319 for frame in range(0, 3): | |
320 for offset, n, t in self.break_up_frame(rc[frame:]): | |
321 start = full_len - frame - offset # zero based | |
322 answer.append((start, start - len(n), -1, n, t)) | |
323 answer.sort() | |
324 return answer | |
325 | |
326 def get_all_peptides(self, nuc_seq): | |
327 """Returns start, end, strand, nucleotides, protein. | |
328 | |
329 Co-ordinates are Python style zero-based. | |
330 """ | |
331 # Refactored into generator by CPT | |
332 | |
333 full_len = len(nuc_seq) | |
334 for frame in range(0, 3): | |
335 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
336 start = frame + offset # zero based | |
337 yield (start, start + len(n), +1, n, t) | |
338 rc = reverse_complement(nuc_seq) | |
339 for frame in range(0, 3): | |
340 for offset, n, t in self.break_up_frame(rc[frame:]): | |
341 start = full_len - frame - offset # zero based | |
342 yield (start - len(n), start, -1, n, t) |