Mercurial > repos > cpt > cpt_putative_usp
comparison cpt.py @ 1:f7afd1480d0f draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
| author | cpt |
|---|---|
| date | Mon, 05 Jun 2023 02:51:59 +0000 |
| parents | |
| children | da1be0114755 |
comparison
equal
deleted
inserted
replaced
| 0:13c3e2d60fa6 | 1:f7afd1480d0f |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import regex as re | |
| 3 from Bio.Seq import Seq, reverse_complement, translate | |
| 4 from Bio.SeqRecord import SeqRecord | |
| 5 from Bio import SeqIO | |
| 6 from Bio.Data import CodonTable | |
| 7 import logging | |
| 8 | |
| 9 logging.basicConfig() | |
| 10 log = logging.getLogger() | |
| 11 | |
| 12 PHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*phage (?P<phage>.*)$") | |
| 13 BACTERIOPHAGE_IN_MIDDLE = re.compile("^(?P<host>.*)\s*bacteriophage (?P<phage>.*)$") | |
| 14 STARTS_WITH_PHAGE = re.compile( | |
| 15 "^(bacterio|vibrio|Bacterio|Vibrio|)?[Pp]hage (?P<phage>.*)$" | |
| 16 ) | |
| 17 NEW_STYLE_NAMES = re.compile("(?P<phage>v[A-Z]_[A-Z][a-z]{2}_.*)") | |
| 18 | |
| 19 | |
| 20 def phage_name_parser(name): | |
| 21 host = None | |
| 22 phage = None | |
| 23 name = name.replace(", complete genome.", "") | |
| 24 name = name.replace(", complete genome", "") | |
| 25 | |
| 26 m = BACTERIOPHAGE_IN_MIDDLE.match(name) | |
| 27 if m: | |
| 28 host = m.group("host") | |
| 29 phage = m.group("phage") | |
| 30 return (host, phage) | |
| 31 | |
| 32 m = PHAGE_IN_MIDDLE.match(name) | |
| 33 if m: | |
| 34 host = m.group("host") | |
| 35 phage = m.group("phage") | |
| 36 return (host, phage) | |
| 37 | |
| 38 m = STARTS_WITH_PHAGE.match(name) | |
| 39 if m: | |
| 40 phage = m.group("phage") | |
| 41 return (host, phage) | |
| 42 | |
| 43 m = NEW_STYLE_NAMES.match(name) | |
| 44 if m: | |
| 45 phage = m.group("phage") | |
| 46 return (host, phage) | |
| 47 | |
| 48 return (host, phage) | |
| 49 | |
| 50 | |
| 51 class OrfFinder(object): | |
| 52 def __init__(self, table, ftype, ends, min_len, strand): | |
| 53 self.table = table | |
| 54 self.table_obj = CodonTable.ambiguous_generic_by_id[table] | |
| 55 self.ends = ends | |
| 56 self.ftype = ftype | |
| 57 self.min_len = min_len | |
| 58 self.starts = sorted(self.table_obj.start_codons) | |
| 59 self.stops = sorted(self.table_obj.stop_codons) | |
| 60 self.re_starts = re.compile("|".join(self.starts)) | |
| 61 self.re_stops = re.compile("|".join(self.stops)) | |
| 62 self.strand = strand | |
| 63 | |
| 64 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3): | |
| 65 seq_format = "fasta" | |
| 66 log.debug("Genetic code table %i" % self.table) | |
| 67 log.debug("Minimum length %i aa" % self.min_len) | |
| 68 | |
| 69 out_count = 0 | |
| 70 | |
| 71 out_gff3.write("##gff-version 3\n") | |
| 72 | |
| 73 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)): | |
| 74 for i, (f_start, f_end, f_strand, n, t) in enumerate( | |
| 75 self.get_all_peptides(str(record.seq).upper()) | |
| 76 ): | |
| 77 out_count += 1 | |
| 78 | |
| 79 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % ( | |
| 80 len(t), | |
| 81 len(n), | |
| 82 f_start, | |
| 83 f_end, | |
| 84 f_strand, | |
| 85 record.description, | |
| 86 ) | |
| 87 fid = record.id + "|%s%i" % (self.ftype, i + 1) | |
| 88 | |
| 89 r = SeqRecord(Seq(n), id=fid, name="", description=descr) | |
| 90 t = SeqRecord(Seq(t), id=fid, name="", description=descr) | |
| 91 | |
| 92 SeqIO.write(r, out_nuc, "fasta") | |
| 93 SeqIO.write(t, out_prot, "fasta") | |
| 94 | |
| 95 nice_strand = "+" if f_strand == +1 else "-" | |
| 96 | |
| 97 out_bed.write( | |
| 98 "\t".join( | |
| 99 map(str, [record.id, f_start, f_end, fid, 0, nice_strand]) | |
| 100 ) | |
| 101 + "\n" | |
| 102 ) | |
| 103 | |
| 104 out_gff3.write( | |
| 105 "\t".join( | |
| 106 map( | |
| 107 str, | |
| 108 [ | |
| 109 record.id, | |
| 110 "getOrfsOrCds", | |
| 111 "CDS", | |
| 112 f_start + 1, | |
| 113 f_end, | |
| 114 ".", | |
| 115 nice_strand, | |
| 116 0, | |
| 117 "ID=%s.%s.%s" % (self.ftype, idx, i + 1), | |
| 118 ], | |
| 119 ) | |
| 120 ) | |
| 121 + "\n" | |
| 122 ) | |
| 123 log.info("Found %i %ss", out_count, self.ftype) | |
| 124 | |
| 125 def start_chop_and_trans(self, s, strict=True): | |
| 126 """Returns offset, trimmed nuc, protein.""" | |
| 127 if strict: | |
| 128 assert s[-3:] in self.stops, s | |
| 129 assert len(s) % 3 == 0 | |
| 130 for match in self.re_starts.finditer(s, overlapped=True): | |
| 131 # Must check the start is in frame | |
| 132 start = match.start() | |
| 133 if start % 3 == 0: | |
| 134 n = s[start:] | |
| 135 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n)) | |
| 136 if strict: | |
| 137 t = translate(n, self.table) | |
| 138 else: | |
| 139 # Use when missing stop codon, | |
| 140 t = "M" + translate(n[3:], self.table, to_stop=True) | |
| 141 yield start, n, t # Edited by CPT to be a generator | |
| 142 | |
| 143 def break_up_frame(self, s): | |
| 144 """Returns offset, nuc, protein.""" | |
| 145 start = 0 | |
| 146 for match in self.re_stops.finditer(s, overlapped=True): | |
| 147 index = match.start() + 3 | |
| 148 if index % 3 != 0: | |
| 149 continue | |
| 150 n = s[start:index] | |
| 151 for (offset, n, t) in self.start_chop_and_trans(n): | |
| 152 if n and len(t) >= self.min_len: | |
| 153 yield start + offset, n, t | |
| 154 start = index | |
| 155 | |
| 156 def putative_genes_in_sequence(self, nuc_seq): | |
| 157 """Returns start, end, strand, nucleotides, protein. | |
| 158 Co-ordinates are Python style zero-based. | |
| 159 """ | |
| 160 nuc_seq = nuc_seq.upper() | |
| 161 # TODO - Refactor to use a generator function (in start order) | |
| 162 # rather than making a list and sorting? | |
| 163 answer = [] | |
| 164 full_len = len(nuc_seq) | |
| 165 | |
| 166 for frame in range(0, 3): | |
| 167 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 168 start = frame + offset # zero based | |
| 169 answer.append((start, start + len(n), +1, n, t)) | |
| 170 | |
| 171 rc = reverse_complement(nuc_seq) | |
| 172 for frame in range(0, 3): | |
| 173 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 174 start = full_len - frame - offset # zero based | |
| 175 answer.append((start, start - len(n), -1, n, t)) | |
| 176 answer.sort() | |
| 177 return answer | |
| 178 | |
| 179 def get_all_peptides(self, nuc_seq): | |
| 180 """Returns start, end, strand, nucleotides, protein. | |
| 181 | |
| 182 Co-ordinates are Python style zero-based. | |
| 183 """ | |
| 184 # Refactored into generator by CPT | |
| 185 full_len = len(nuc_seq) | |
| 186 if self.strand != "reverse": | |
| 187 for frame in range(0, 3): | |
| 188 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 189 start = frame + offset # zero based | |
| 190 yield (start, start + len(n), +1, n, t) | |
| 191 if self.strand != "forward": | |
| 192 rc = reverse_complement(nuc_seq) | |
| 193 for frame in range(0, 3): | |
| 194 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 195 start = full_len - frame - offset # zero based | |
| 196 yield (start - len(n), start, -1, n, t) | |
| 197 | |
| 198 | |
| 199 class MGAFinder(object): | |
| 200 def __init__(self, table, ftype, ends, min_len): | |
| 201 self.table = table | |
| 202 self.table_obj = CodonTable.ambiguous_generic_by_id[table] | |
| 203 self.ends = ends | |
| 204 self.ftype = ftype | |
| 205 self.min_len = min_len | |
| 206 self.starts = sorted(self.table_obj.start_codons) | |
| 207 self.stops = sorted(self.table_obj.stop_codons) | |
| 208 self.re_starts = re.compile("|".join(self.starts)) | |
| 209 self.re_stops = re.compile("|".join(self.stops)) | |
| 210 | |
| 211 def locate(self, fasta_file, out_nuc, out_prot, out_bed, out_gff3): | |
| 212 seq_format = "fasta" | |
| 213 log.debug("Genetic code table %i" % self.table) | |
| 214 log.debug("Minimum length %i aa" % self.min_len) | |
| 215 | |
| 216 out_count = 0 | |
| 217 | |
| 218 out_gff3.write("##gff-version 3\n") | |
| 219 | |
| 220 for idx, record in enumerate(SeqIO.parse(fasta_file, seq_format)): | |
| 221 for i, (f_start, f_end, f_strand, n, t) in enumerate( | |
| 222 self.get_all_peptides(str(record.seq).upper()) | |
| 223 ): | |
| 224 out_count += 1 | |
| 225 | |
| 226 descr = "length %i aa, %i bp, from %s..%s[%s] of %s" % ( | |
| 227 len(t), | |
| 228 len(n), | |
| 229 f_start, | |
| 230 f_end, | |
| 231 f_strand, | |
| 232 record.description, | |
| 233 ) | |
| 234 fid = record.id + "|%s%i" % (self.ftype, i + 1) | |
| 235 | |
| 236 r = SeqRecord(Seq(n), id=fid, name="", description=descr) | |
| 237 t = SeqRecord(Seq(t), id=fid, name="", description=descr) | |
| 238 | |
| 239 SeqIO.write(r, out_nuc, "fasta") | |
| 240 SeqIO.write(t, out_prot, "fasta") | |
| 241 | |
| 242 nice_strand = "+" if f_strand == +1 else "-" | |
| 243 | |
| 244 out_bed.write( | |
| 245 "\t".join( | |
| 246 map(str, [record.id, f_start, f_end, fid, 0, nice_strand]) | |
| 247 ) | |
| 248 + "\n" | |
| 249 ) | |
| 250 | |
| 251 out_gff3.write( | |
| 252 "\t".join( | |
| 253 map( | |
| 254 str, | |
| 255 [ | |
| 256 record.id, | |
| 257 "getOrfsOrCds", | |
| 258 "CDS", | |
| 259 f_start + 1, | |
| 260 f_end, | |
| 261 ".", | |
| 262 nice_strand, | |
| 263 0, | |
| 264 "ID=%s.%s.%s" % (self.ftype, idx, i + 1), | |
| 265 ], | |
| 266 ) | |
| 267 ) | |
| 268 + "\n" | |
| 269 ) | |
| 270 log.info("Found %i %ss", out_count, self.ftype) | |
| 271 | |
| 272 def start_chop_and_trans(self, s, strict=True): | |
| 273 """Returns offset, trimmed nuc, protein.""" | |
| 274 if strict: | |
| 275 assert s[-3:] in self.stops, s | |
| 276 assert len(s) % 3 == 0 | |
| 277 for match in self.re_starts.finditer(s, overlapped=True): | |
| 278 # Must check the start is in frame | |
| 279 start = match.start() | |
| 280 if start % 3 == 0: | |
| 281 n = s[start:] | |
| 282 assert len(n) % 3 == 0, "%s is len %i" % (n, len(n)) | |
| 283 if strict: | |
| 284 t = translate(n, self.table) | |
| 285 else: | |
| 286 # Use when missing stop codon, | |
| 287 t = "M" + translate(n[3:], self.table, to_stop=True) | |
| 288 yield start, n, t | |
| 289 | |
| 290 def break_up_frame(self, s): | |
| 291 """Returns offset, nuc, protein.""" | |
| 292 start = 0 | |
| 293 for match in self.re_stops.finditer(s, overlapped=True): | |
| 294 index = match.start() + 3 | |
| 295 if index % 3 != 0: | |
| 296 continue | |
| 297 n = s[start:index] | |
| 298 for (offset, n, t) in self.start_chop_and_trans(n): | |
| 299 if n and len(t) >= self.min_len: | |
| 300 yield start + offset, n, t | |
| 301 start = index | |
| 302 | |
| 303 def putative_genes_in_sequence(self, nuc_seq): | |
| 304 """Returns start, end, strand, nucleotides, protein. | |
| 305 Co-ordinates are Python style zero-based. | |
| 306 """ | |
| 307 nuc_seq = nuc_seq.upper() | |
| 308 # TODO - Refactor to use a generator function (in start order) | |
| 309 # rather than making a list and sorting? | |
| 310 answer = [] | |
| 311 full_len = len(nuc_seq) | |
| 312 | |
| 313 for frame in range(0, 3): | |
| 314 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 315 start = frame + offset # zero based | |
| 316 answer.append((start, start + len(n), +1, n, t)) | |
| 317 | |
| 318 rc = reverse_complement(nuc_seq) | |
| 319 for frame in range(0, 3): | |
| 320 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 321 start = full_len - frame - offset # zero based | |
| 322 answer.append((start, start - len(n), -1, n, t)) | |
| 323 answer.sort() | |
| 324 return answer | |
| 325 | |
| 326 def get_all_peptides(self, nuc_seq): | |
| 327 """Returns start, end, strand, nucleotides, protein. | |
| 328 | |
| 329 Co-ordinates are Python style zero-based. | |
| 330 """ | |
| 331 # Refactored into generator by CPT | |
| 332 | |
| 333 full_len = len(nuc_seq) | |
| 334 for frame in range(0, 3): | |
| 335 for offset, n, t in self.break_up_frame(nuc_seq[frame:]): | |
| 336 start = frame + offset # zero based | |
| 337 yield (start, start + len(n), +1, n, t) | |
| 338 rc = reverse_complement(nuc_seq) | |
| 339 for frame in range(0, 3): | |
| 340 for offset, n, t in self.break_up_frame(rc[frame:]): | |
| 341 start = full_len - frame - offset # zero based | |
| 342 yield (start - len(n), start, -1, n, t) |
