Mercurial > repos > cpt > cpt_putative_isp
comparison cpt.py @ 1:4e02e6e9e77d draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
| author | cpt |
|---|---|
| date | Mon, 05 Jun 2023 02:51:35 +0000 |
| parents | |
| children | 08499fbf8697 |
comparison
equal
deleted
inserted
replaced
| 0:10d13d0c37d3 | 1:4e02e6e9e77d |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 from Bio.Seq import Seq, reverse_complement, translate | |
| 3 from Bio.SeqRecord import SeqRecord | |
| 4 from Bio import SeqIO | |
| 5 from Bio.Data import CodonTable | |
| 6 import logging | |
| 7 | |
| 8 logging.basicConfig() | |
| 9 log = logging.getLogger() | |
| 10 | |
| 11 PHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*phage (?P<phage>.*)$") | |
| 12 BACTERIOPHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*bacteriophage (?P<phage>.*)$") | |
| 13 STARTS_WITH_PHAGE = re.compile( | |
| 14 "^(bacterio|vibrio|Bacterio|Vibrio|)?[Pp]hage (?P<phage>.*)$" | |
| 15 ) | |
| 16 NEW_STYLE_NAMES = re.compile("(?P<phage>v[A-Z]_[A-Z][a-z]{2}_.*)") | |
| 17 | |
| 18 | |
| 19 def phage_name_parser(name): | |
| 20 host = None | |
| 21 phage = None | |
| 22 name = name.replace(", complete genome.", "") | |
| 23 name = name.replace(", complete genome", "") | |
| 24 | |
| 25 m = BACTERIOPHAGE_IN_MIDDLE.match(name) | |
| 26 if m: | |
| 27 host = m.group("host") | |
| 28 phage = m.group("phage") | |
| 29 return (host, phage) | |
| 30 | |
| 31 m = PHAGE_IN_MIDDLE.match(name) | |
| 32 if m: | |
| 33 host = m.group("host") | |
| 34 phage = m.group("phage") | |
| 35 return (host, phage) | |
| 36 | |
| 37 m = STARTS_WITH_PHAGE.match(name) | |
| 38 if m: | |
| 39 phage = m.group("phage") | |
| 40 return (host, phage) | |
| 41 | |
| 42 m = NEW_STYLE_NAMES.match(name) | |
| 43 if m: | |
| 44 phage = m.group("phage") | |
| 45 return (host, phage) | |
| 46 | |
| 47 return (host, phage) | |
| 48 | |
| 49 | |
| 50 class OrfFinder(object): | |
| 51 def __init__(self, table, ftype, ends, min_len, strand): | |
| 52 self.table = table | |
| 53 self.table_obj = CodonTable.ambiguous_generic_by_id[table] | |
| 54 self.ends = ends | |
| 55 self.ftype = ftype | |
| 56 self.min_len = min_len | |
| 57 self.starts = sorted(self.table_obj.start_codons) | |
| 58 self.stops = sorted(self.table_obj.stop_codons) | |
| 59 self.re_starts = re.compile("|".join(self.starts)) | |
| 60 self.re_stops = re.compile("|".join(self.stops)) | |
| 61 self.strand = strand | |
| 62 | |
| 63 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3): | |
| 64 seq_format = "fasta" | |
| 65 log.debug("Genetic code table %i" % self.table) | |
| 66 log.debug("Minimum length %i aa" % self.min_len) | |
| 67 | |
| 68 out_count = 0 | |
| 69 | |
| 70 out_gff3.write("##gff-version 3\n") | |
| 71 | |
| 72 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)): | |
| 73 for i, (f_start, f_end, f_strand, n, t) in enumerate( | |
| 74 self.get_all_peptides(str(record.seq).upper()) | |
| 75 ): | |
| 76 out_count += 1 | |
| 77 | |
| 78 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % ( | |
| 79 len(t), | |
| 80 len(n), | |
| 81 f_start, | |
| 82 f_end, | |
| 83 f_strand, | |
| 84 record.description, | |
| 85 ) | |
| 86 fid = record.id + "|%s%i" % (self.ftype, i + 1) | |
| 87 | |
| 88 r = SeqRecord(Seq(n), id=fid, name="", description=descr) | |
| 89 t = SeqRecord(Seq(t), id=fid, name="", description=descr) | |
| 90 | |
| 91 SeqIO.write(r, out_nuc, "fasta") | |
| 92 SeqIO.write(t, out_prot, "fasta") | |
| 93 | |
| 94 nice_strand = "+" if f_strand == +1 else "-" | |
| 95 | |
| 96 out_bed.write( | |
| 97 "\t".join( | |
| 98 map(str, [record.id, f_start, f_end, fid, 0, nice_strand]) | |
| 99 ) | |
| 100 + "\n" | |
| 101 ) | |
| 102 | |
| 103 out_gff3.write( | |
| 104 "\t".join( | |
| 105 map( | |
| 106 str, | |
| 107 [ | |
| 108 record.id, | |
| 109 "getOrfsOrCds", | |
| 110 "CDS", | |
| 111 f_start + 1, | |
| 112 f_end, | |
| 113 ".", | |
| 114 nice_strand, | |
| 115 0, | |
| 116 "ID=%s.%s.%s" % (self.ftype, idx, i + 1), | |
| 117 ], | |
| 118 ) | |
| 119 ) | |
| 120 + "\n" | |
| 121 ) | |
| 122 log.info("Found %i %ss", out_count, self.ftype) | |
| 123 | |
| 124 def start_chop_and_trans(self, s, strict=True): | |
| 125 """Returns offset, trimmed nuc, protein.""" | |
| 126 if strict: | |
| 127 assert s[-3:] in self.stops, s | |
| 128 assert len(s) % 3 == 0 | |
| 129 for match in self.re_starts.finditer(s, overlapped=True): | |
| 130 # Must check the start is in frame | |
| 131 start = match.start() | |
| 132 if start % 3 == 0: | |
| 133 n = s[start:] | |
| 134 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n)) | |
| 135 if strict: | |
| 136 t = translate(n, self.table) | |
| 137 else: | |
| 138 # Use when missing stop codon, | |
| 139 t = "M" + translate(n[3:], self.table, to_stop=True) | |
| 140 yield start, n, t # Edited by CPT to be a generator | |
| 141 | |
| 142 def break_up_frame(self, s): | |
| 143 """Returns offset, nuc, protein.""" | |
| 144 start = 0 | |
| 145 for match in self.re_stops.finditer(s, overlapped=True): | |
| 146 index = match.start() + 3 | |
| 147 if index % 3 != 0: | |
| 148 continue | |
| 149 n = s[start:index] | |
| 150 for (offset, n, t) in self.start_chop_and_trans(n): | |
| 151 if n and len(t) >= self.min_len: | |
| 152 yield start + offset, n, t | |
| 153 start = index | |
| 154 | |
| 155 def putative_genes_in_sequence(self, nuc_seq): | |
| 156 """Returns start, end, strand, nucleotides, protein. | |
| 157 Co-ordinates are Python style zero-based. | |
| 158 """ | |
| 159 nuc_seq = nuc_seq.upper() | |
| 160 # TODO - Refactor to use a generator function (in start order) | |
| 161 # rather than making a list and sorting? | |
| 162 answer = [] | |
| 163 full_len = len(nuc_seq) | |
| 164 | |
| 165 for frame in range(0, 3): | |
| 166 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 167 start = frame + offset # zero based | |
| 168 answer.append((start, start + len(n), +1, n, t)) | |
| 169 | |
| 170 rc = reverse_complement(nuc_seq) | |
| 171 for frame in range(0, 3): | |
| 172 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 173 start = full_len - frame - offset # zero based | |
| 174 answer.append((start, start - len(n), -1, n, t)) | |
| 175 answer.sort() | |
| 176 return answer | |
| 177 | |
| 178 def get_all_peptides(self, nuc_seq): | |
| 179 """Returns start, end, strand, nucleotides, protein. | |
| 180 | |
| 181 Co-ordinates are Python style zero-based. | |
| 182 """ | |
| 183 # Refactored into generator by CPT | |
| 184 full_len = len(nuc_seq) | |
| 185 if self.strand != "reverse": | |
| 186 for frame in range(0, 3): | |
| 187 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 188 start = frame + offset # zero based | |
| 189 yield (start, start + len(n), +1, n, t) | |
| 190 if self.strand != "forward": | |
| 191 rc = reverse_complement(nuc_seq) | |
| 192 for frame in range(0, 3): | |
| 193 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 194 start = full_len - frame - offset # zero based | |
| 195 yield (start - len(n), start, -1, n, t) | |
| 196 | |
| 197 | |
| 198 class MGAFinder(object): | |
| 199 def __init__(self, table, ftype, ends, min_len): | |
| 200 self.table = table | |
| 201 self.table_obj = CodonTable.ambiguous_generic_by_id[table] | |
| 202 self.ends = ends | |
| 203 self.ftype = ftype | |
| 204 self.min_len = min_len | |
| 205 self.starts = sorted(self.table_obj.start_codons) | |
| 206 self.stops = sorted(self.table_obj.stop_codons) | |
| 207 self.re_starts = re.compile("|".join(self.starts)) | |
| 208 self.re_stops = re.compile("|".join(self.stops)) | |
| 209 | |
| 210 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3): | |
| 211 seq_format = "fasta" | |
| 212 log.debug("Genetic code table %i" % self.table) | |
| 213 log.debug("Minimum length %i aa" % self.min_len) | |
| 214 | |
| 215 out_count = 0 | |
| 216 | |
| 217 out_gff3.write("##gff-version 3\n") | |
| 218 | |
| 219 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)): | |
| 220 for i, (f_start, f_end, f_strand, n, t) in enumerate( | |
| 221 self.get_all_peptides(str(record.seq).upper()) | |
| 222 ): | |
| 223 out_count += 1 | |
| 224 | |
| 225 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % ( | |
| 226 len(t), | |
| 227 len(n), | |
| 228 f_start, | |
| 229 f_end, | |
| 230 f_strand, | |
| 231 record.description, | |
| 232 ) | |
| 233 fid = record.id + "|%s%i" % (self.ftype, i + 1) | |
| 234 | |
| 235 r = SeqRecord(Seq(n), id=fid, name="", description=descr) | |
| 236 t = SeqRecord(Seq(t), id=fid, name="", description=descr) | |
| 237 | |
| 238 SeqIO.write(r, out_nuc, "fasta") | |
| 239 SeqIO.write(t, out_prot, "fasta") | |
| 240 | |
| 241 nice_strand = "+" if f_strand == +1 else "-" | |
| 242 | |
| 243 out_bed.write( | |
| 244 "\t".join( | |
| 245 map(str, [record.id, f_start, f_end, fid, 0, nice_strand]) | |
| 246 ) | |
| 247 + "\n" | |
| 248 ) | |
| 249 | |
| 250 out_gff3.write( | |
| 251 "\t".join( | |
| 252 map( | |
| 253 str, | |
| 254 [ | |
| 255 record.id, | |
| 256 "getOrfsOrCds", | |
| 257 "CDS", | |
| 258 f_start + 1, | |
| 259 f_end, | |
| 260 ".", | |
| 261 nice_strand, | |
| 262 0, | |
| 263 "ID=%s.%s.%s" % (self.ftype, idx, i + 1), | |
| 264 ], | |
| 265 ) | |
| 266 ) | |
| 267 + "\n" | |
| 268 ) | |
| 269 log.info("Found %i %ss", out_count, self.ftype) | |
| 270 | |
| 271 def start_chop_and_trans(self, s, strict=True): | |
| 272 """Returns offset, trimmed nuc, protein.""" | |
| 273 if strict: | |
| 274 assert s[-3:] in self.stops, s | |
| 275 assert len(s) % 3 == 0 | |
| 276 for match in self.re_starts.finditer(s, overlapped=True): | |
| 277 # Must check the start is in frame | |
| 278 start = match.start() | |
| 279 if start % 3 == 0: | |
| 280 n = s[start:] | |
| 281 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n)) | |
| 282 if strict: | |
| 283 t = translate(n, self.table) | |
| 284 else: | |
| 285 # Use when missing stop codon, | |
| 286 t = "M" + translate(n[3:], self.table, to_stop=True) | |
| 287 yield start, n, t | |
| 288 | |
| 289 def break_up_frame(self, s): | |
| 290 """Returns offset, nuc, protein.""" | |
| 291 start = 0 | |
| 292 for match in self.re_stops.finditer(s, overlapped=True): | |
| 293 index = match.start() + 3 | |
| 294 if index % 3 != 0: | |
| 295 continue | |
| 296 n = s[start:index] | |
| 297 for (offset, n, t) in self.start_chop_and_trans(n): | |
| 298 if n and len(t) >= self.min_len: | |
| 299 yield start + offset, n, t | |
| 300 start = index | |
| 301 | |
| 302 def putative_genes_in_sequence(self, nuc_seq): | |
| 303 """Returns start, end, strand, nucleotides, protein. | |
| 304 Co-ordinates are Python style zero-based. | |
| 305 """ | |
| 306 nuc_seq = nuc_seq.upper() | |
| 307 # TODO - Refactor to use a generator function (in start order) | |
| 308 # rather than making a list and sorting? | |
| 309 answer = [] | |
| 310 full_len = len(nuc_seq) | |
| 311 | |
| 312 for frame in range(0, 3): | |
| 313 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 314 start = frame + offset # zero based | |
| 315 answer.append((start, start + len(n), +1, n, t)) | |
| 316 | |
| 317 rc = reverse_complement(nuc_seq) | |
| 318 for frame in range(0, 3): | |
| 319 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 320 start = full_len - frame - offset # zero based | |
| 321 answer.append((start, start - len(n), -1, n, t)) | |
| 322 answer.sort() | |
| 323 return answer | |
| 324 | |
| 325 def get_all_peptides(self, nuc_seq): | |
| 326 """Returns start, end, strand, nucleotides, protein. | |
| 327 | |
| 328 Co-ordinates are Python style zero-based. | |
| 329 """ | |
| 330 # Refactored into generator by CPT | |
| 331 | |
| 332 full_len = len(nuc_seq) | |
| 333 for frame in range(0, 3): | |
| 334 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 335 start = frame + offset # zero based | |
| 336 yield (start, start + len(n), +1, n, t) | |
| 337 rc = reverse_complement(nuc_seq) | |
| 338 for frame in range(0, 3): | |
| 339 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 340 start = full_len - frame - offset # zero based | |
| 341 yield (start - len(n), start, -1, n, t) |
