Mercurial > repos > cpt > cpt_gff_to_gbk
comparison gff2gb.py @ 3:c8fcb7246ac3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:44:32 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
2:6795d3349462 | 3:c8fcb7246ac3 |
---|---|
1 #!/usr/bin/env python | |
2 """Convert a GFF and associated FASTA file into GenBank format. | |
3 | |
4 Usage: | |
5 gff_to_genbank.py <GFF annotation file> <FASTA sequence file> | |
6 """ | |
7 import argparse | |
8 import sys | |
9 import re | |
10 import copy | |
11 import itertools | |
12 import logging | |
13 from Bio import SeqIO | |
14 | |
15 # from Bio.Alphabet import generic_dna | |
16 from Bio.SeqFeature import CompoundLocation, FeatureLocation | |
17 from CPT_GFFParser import gffParse, gffWrite | |
18 from gff3 import ( | |
19 feature_lambda, | |
20 wa_unified_product_name, | |
21 is_uuid, | |
22 feature_test_type, | |
23 fsort, | |
24 feature_test_true, | |
25 feature_test_quals, | |
26 ) | |
27 | |
28 default_name = re.compile(r"^gene_(\d+)$") | |
29 logging.basicConfig(level=logging.INFO) | |
30 | |
31 | |
32 def rename_key(ds, k_f, k_t): | |
33 """Rename a key in a dictionary and return it, FP style""" | |
34 # If they key is not in the dictionary, just return immediately | |
35 if k_f not in ds: | |
36 return ds | |
37 | |
38 # Otherwise, we check if the target key is in there | |
39 if k_t in ds: | |
40 # If it is, we need to append | |
41 ds[k_t] += ds[k_f] | |
42 else: | |
43 # if not, we can just set. | |
44 ds[k_t] = ds[k_f] | |
45 | |
46 # Remove source | |
47 del ds[k_f] | |
48 return ds | |
49 | |
50 | |
51 def gff3_to_genbank(gff_file, fasta_file, transltbl): | |
52 fasta_input = SeqIO.to_dict(SeqIO.parse(fasta_file, "fasta")) # , generic_dna)) | |
53 gff_iter = gffParse(gff_file, fasta_input) | |
54 | |
55 for record in gff_iter: | |
56 yield handle_record(record, transltbl) | |
57 | |
58 | |
59 def handle_non_gene_features(features): | |
60 # These are NON-GENE features (maybe terminators? etc?) | |
61 for feature in feature_lambda( | |
62 features, | |
63 feature_test_type, | |
64 {"type": "gene"}, | |
65 subfeatures=False, | |
66 invert=True, | |
67 recurse=True, # used to catch RBS from new apollo runs (used to be False) | |
68 ): | |
69 if feature.type in ( | |
70 "terminator", | |
71 "tRNA", | |
72 "Shine_Dalgarno_sequence", | |
73 "sequence_feature", | |
74 "recombination_feature", | |
75 "sequence_alteration", | |
76 "binding_site", | |
77 ): | |
78 yield feature | |
79 elif feature.type in ("CDS",): | |
80 pass | |
81 else: | |
82 yield feature | |
83 | |
84 | |
85 def fminmax(feature): | |
86 fmin = None | |
87 fmax = None | |
88 for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): | |
89 if fmin is None: | |
90 fmin = sf.location.start | |
91 fmax = sf.location.end | |
92 if sf.location.start < fmin: | |
93 fmin = sf.location.start | |
94 if sf.location.end > fmax: | |
95 fmax = sf.location.end | |
96 return fmin, fmax | |
97 | |
98 | |
99 def fix_gene_boundaries(feature): | |
100 # There is a frustrating bug in apollo whereby we have created gene | |
101 # features which are LARGER than expected, but we cannot see this. | |
102 # We only see a perfect sized gene + great SD together. | |
103 # | |
104 # So, we have this awful hack to clamp the location of the gene | |
105 # feature to the contained mRNAs. This is good enough for now. | |
106 fmin, fmax = fminmax(feature) | |
107 if feature.location.strand > 0: | |
108 feature.location = FeatureLocation(fmin, fmax, strand=1) | |
109 else: | |
110 feature.location = FeatureLocation(fmin, fmax, strand=-1) | |
111 return feature | |
112 | |
113 | |
114 def fix_gene_qualifiers(name, feature, fid): | |
115 for mRNA in feature.sub_features: | |
116 mRNA.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid) | |
117 # And some exons below that | |
118 sf_replacement = [] | |
119 for sf in mRNA.sub_features: | |
120 # We set a locus_tag on ALL sub features | |
121 sf.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid) | |
122 # Remove Names which are UUIDs | |
123 # NOT GOOD PRACTICE | |
124 try: | |
125 if is_uuid(sf.qualifiers["Name"][0]): | |
126 del sf.qualifiers["Name"] | |
127 except KeyError: | |
128 continue # might should go back to pass, I have not put thought into this still | |
129 | |
130 # If it is the RBS exon (mis-labelled by apollo as 'exon') | |
131 if sf.type == "exon" and len(sf) < 10: | |
132 sf.type = "Shine_Dalgarno_sequence" | |
133 sf_replacement.append(sf) | |
134 # and if it is the CDS | |
135 elif sf.type == "CDS": | |
136 # Update CDS qualifiers with all info that was on parent | |
137 sf.qualifiers.update(feature.qualifiers) | |
138 sf_replacement.append(sf) | |
139 else: | |
140 sf_replacement.append(sf) | |
141 if mRNA.type == "tRNA": | |
142 mRNA.qualifiers["product"] = mRNA.qualifiers["Name"] | |
143 # Handle multiple child CDS features by merging them. | |
144 # Replace the subfeatures on the mRNA | |
145 mRNA.sub_features = merge_multi_cds(sf_replacement) | |
146 return feature | |
147 | |
148 | |
149 def fix_frameshifted(features): | |
150 logging.info("Fixing Frameshifted group: [%s]", str(features)) | |
151 genes = features | |
152 # Find all mRNAs (plus reduce nested list into flattened one) | |
153 mRNAs = sum([f.sub_features for f in genes], []) | |
154 # Find all CDSs (plus reduce nested list into flattened one) | |
155 cdss = sum([m.sub_features for m in mRNAs], []) | |
156 # List to store the RBSs which we'll break apart + re-attach later. | |
157 rbss = [] | |
158 # List to store all of the CDSs (i.e. cdss - rbss) | |
159 cdss2 = [] | |
160 # Copy genes + clean out subfeatures. We'll re-use these constructs. | |
161 fixed_features = copy.deepcopy(genes) | |
162 for f in fixed_features: | |
163 f.sub_features = [] | |
164 # Copy / empty out mRNAs | |
165 fixed_mrnas = copy.deepcopy(mRNAs) | |
166 for f in fixed_mrnas: | |
167 f.sub_features = [] | |
168 f.qualifiers = {} | |
169 # Fill rbss + cdss2 | |
170 for cds in cdss: | |
171 if "frameshift" in cds.qualifiers: | |
172 del cds.qualifiers["frameshift"] | |
173 # Ignore short features, as those are RBSs | |
174 if len(cds) < 15: | |
175 rbss.append(cds) | |
176 continue | |
177 # Otherwise cdss. | |
178 else: | |
179 cdss2.append(cds) | |
180 # Ok, now have cdss2 to deal with. | |
181 other = [] | |
182 # Find the two with least value for distance between end / start (strand aware). | |
183 # For every possible pair, we'll check their distance | |
184 match_data = {} | |
185 for (a, b) in itertools.permutations(cdss2, 2): | |
186 if a.location.start < b.location.start: | |
187 # A is downstream of B | |
188 match_data[(a, b)] = b.location.start - a.location.end | |
189 else: | |
190 match_data[(a, b)] = a.location.start - b.location.end | |
191 | |
192 # Now we'll find the features which are closest in terms of start/end | |
193 ((merge_a, merge_b), value) = max(match_data.items(), key=lambda kv: kv[1]) | |
194 # And get the non-matching features into other | |
195 for f in cdss2: | |
196 if f != merge_a and f != merge_b: | |
197 other.append(f) | |
198 # Back to the merge_a/b | |
199 # With those, we'll merge them into one feature, and discard the other. | |
200 merge_a.location = CompoundLocation([merge_a.location, merge_b.location]) | |
201 # The gene + RBSs should be identical and two/two. | |
202 assert len(fixed_features) == 2 | |
203 # If not, we can just duplicate the RBS, doesn't matter. | |
204 noRBS = len(rbss) == 0 | |
205 if len(rbss) != 2 and not noRBS: | |
206 rbss = [rbss[0], copy.deepcopy(rbss[0])] | |
207 # Now re-construct. | |
208 gene_0 = fixed_features[0] | |
209 gene_1 = fixed_features[1] | |
210 mRNA_0 = fixed_mrnas[0] | |
211 mRNA_1 = fixed_mrnas[1] | |
212 | |
213 if not noRBS: | |
214 mRNA_0.sub_features = [rbss[0], merge_a] | |
215 mRNA_1.sub_features = other + [rbss[1]] | |
216 else: | |
217 mRNA_0.sub_features = [merge_a] | |
218 mRNA_1.sub_features = other | |
219 mRNA_0 = fix_gene_boundaries(mRNA_0) | |
220 mRNA_1 = fix_gene_boundaries(mRNA_1) | |
221 | |
222 gene_0.sub_features = [mRNA_0] | |
223 gene_1.sub_features = [mRNA_1] | |
224 gene_0 = fix_gene_boundaries(gene_0) | |
225 gene_1 = fix_gene_boundaries(gene_1) | |
226 | |
227 return fixed_features | |
228 | |
229 | |
230 def fix_frameshifts(features): | |
231 # Collect all gene features where at least one subfeature has a | |
232 # frameshift=??? annotation. | |
233 def has_frameshift_qual(f): | |
234 return ( | |
235 len( | |
236 list( | |
237 feature_lambda( | |
238 f.sub_features, feature_test_quals, {"frameshift": None} | |
239 ) | |
240 ) | |
241 ) | |
242 > 0 | |
243 ) | |
244 | |
245 def has_frameshift_qual_val(f, val): | |
246 return ( | |
247 len( | |
248 list( | |
249 feature_lambda( | |
250 f.sub_features, feature_test_quals, {"frameshift": val} | |
251 ) | |
252 ) | |
253 ) | |
254 > 0 | |
255 ) | |
256 | |
257 def get_frameshift_qual(f): | |
258 for f in feature_lambda( | |
259 f.sub_features, feature_test_quals, {"frameshift": None} | |
260 ): | |
261 return f.qualifiers["frameshift"] | |
262 | |
263 to_frameshift = [x for x in features if x.type == "gene" and has_frameshift_qual(x)] | |
264 fixed = [x for x in features if x not in to_frameshift] | |
265 | |
266 frameshift_keys = set(sum(map(get_frameshift_qual, to_frameshift), [])) | |
267 for key in frameshift_keys: | |
268 # Get features matching that key | |
269 current = [x for x in to_frameshift if has_frameshift_qual_val(x, key)] | |
270 # Fix them and append them | |
271 fixed += fix_frameshifted(current) | |
272 | |
273 return fixed | |
274 | |
275 | |
276 def remove_useless_features(features): | |
277 # Drop mRNAs, apollo crap, useless CDSs | |
278 for f in features: | |
279 if f.type in ( | |
280 "non_canonical_three_prime_splice_site", | |
281 "non_canonical_five_prime_splice_site", | |
282 "stop_codon_read_through", | |
283 "mRNA", | |
284 "exon", | |
285 ): | |
286 continue | |
287 else: | |
288 if f.type == "CDS" and len(f) < 10: | |
289 # Another RBS mistake | |
290 continue | |
291 # We use the full GO term, but it should be less than that. | |
292 if f.type == "Shine_Dalgarno_sequence": | |
293 f.type = "RBS" | |
294 | |
295 if f.type == "sequence_feature": | |
296 f.type = "misc_feature" | |
297 | |
298 if f.type == "recombination_feature": | |
299 f.type = "misc_recomb" | |
300 | |
301 if f.type == "sequence_alteration": | |
302 f.type = "variation" | |
303 | |
304 if f.type == "binding_site": | |
305 f.type = "misc_binding" | |
306 | |
307 yield f | |
308 | |
309 | |
310 def merge_multi_cds(mRNA_sf): | |
311 cdss = [x for x in mRNA_sf if x.type == "CDS"] | |
312 non_cdss = [x for x in mRNA_sf if x.type != "CDS"] | |
313 if len(cdss) <= 1: | |
314 return non_cdss + cdss | |
315 else: | |
316 # Grab all locations, and sort them so we can work with them rationally. | |
317 locations = sorted([x.location for x in cdss], key=lambda y: y.start) | |
318 # Pick randomly a main CDS | |
319 main_cds = cdss[0] | |
320 # We'll merge the other CDSs into this one. | |
321 main_cds.location = CompoundLocation(locations) | |
322 return non_cdss + [main_cds] | |
323 | |
324 | |
325 def handle_record(record, transltbl): | |
326 full_feats = [] | |
327 for feature in fsort(record.features): | |
328 if ( | |
329 feature.type == "region" | |
330 and "source" in feature.qualifiers | |
331 and "GenBank" in feature.qualifiers["source"] | |
332 ): | |
333 feature.type = "source" | |
334 | |
335 if "comment1" in feature.qualifiers: | |
336 del feature.qualifiers["comment1"] | |
337 | |
338 if "Note" in feature.qualifiers: | |
339 record.annotations = feature.qualifiers | |
340 if len(feature.qualifiers["Note"]) > 1: | |
341 record.annotations["comment"] = feature.qualifiers["Note"][1] | |
342 del feature.qualifiers["Note"] | |
343 | |
344 if "comment" in feature.qualifiers: | |
345 del feature.qualifiers["comment"] | |
346 | |
347 # We'll work on a separate copy of features to avoid modifying a list | |
348 # we're iterating over | |
349 replacement_feats = [] | |
350 replacement_feats += list(handle_non_gene_features(record.features)) | |
351 | |
352 # Renumbering requires sorting | |
353 fid = 0 | |
354 for feature in fsort( | |
355 feature_lambda( | |
356 record.features, feature_test_type, {"type": "gene"}, subfeatures=True | |
357 ) | |
358 ): | |
359 # Our modifications only involve genes | |
360 fid += 1 | |
361 | |
362 feature = fix_gene_boundaries(feature) | |
363 # Which have mRNAs we'll drop later | |
364 feature = fix_gene_qualifiers(record.id, feature, fid) | |
365 | |
366 # Wipe out the parent gene's data, leaving only a locus_tag | |
367 feature.qualifiers = {"locus_tag": "CPT_%s_%03d" % (record.id, fid)} | |
368 | |
369 # Patch our features back in (even if they're non-gene features) | |
370 replacement_feats.append(feature) | |
371 | |
372 replacement_feats = fix_frameshifts(replacement_feats) | |
373 # exit(0) | |
374 flat_features = feature_lambda( | |
375 replacement_feats, lambda x: True, {}, subfeatures=True | |
376 ) | |
377 | |
378 flat_features = remove_useless_features(flat_features) | |
379 | |
380 # Meat of our modifications | |
381 for flat_feat in flat_features: | |
382 # Try and figure out a name. We gave conflicting instructions, so | |
383 # this isn't as trivial as it should be. | |
384 protein_product = wa_unified_product_name(flat_feat) | |
385 | |
386 for x in ( | |
387 "source", | |
388 "phase", | |
389 "Parent", | |
390 "ID", | |
391 "owner", | |
392 "date_creation", | |
393 "date_last_modified", | |
394 "datasetSource", | |
395 ): | |
396 if x in flat_feat.qualifiers: | |
397 if x == "ID": | |
398 flat_feat._ID = flat_feat.qualifiers["ID"] | |
399 del flat_feat.qualifiers[x] | |
400 | |
401 # Add product tag | |
402 if flat_feat.type == "CDS": | |
403 flat_feat.qualifiers["product"] = [protein_product] | |
404 flat_feat.qualifiers["transl_table"] = [transltbl] | |
405 if "Product" in flat_feat.qualifiers: | |
406 del flat_feat.qualifiers["Product"] | |
407 elif flat_feat.type == "RBS": | |
408 if "locus_tag" not in flat_feat.qualifiers.keys(): | |
409 continue | |
410 | |
411 elif flat_feat.type == "terminator": | |
412 flat_feat.type = "regulatory" | |
413 flat_feat.qualifiers = {"regulatory_class": "terminator"} | |
414 | |
415 # In genbank format, note is lower case. | |
416 flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Note", "note") | |
417 flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "description", "note") | |
418 flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "protein", "note") | |
419 flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Dbxref", "db_xref") | |
420 if "Name" in flat_feat.qualifiers: | |
421 del flat_feat.qualifiers["Name"] | |
422 | |
423 # more apollo nonsense | |
424 if "Manually set translation start" in flat_feat.qualifiers.get("note", []): | |
425 flat_feat.qualifiers["note"].remove("Manually set translation start") | |
426 | |
427 # Append the feature | |
428 full_feats.append(flat_feat) | |
429 | |
430 # Update our features | |
431 record.features = fsort(full_feats) | |
432 # Strip off record names that would cause crashes. | |
433 record.name = record.name[0:16] | |
434 return record | |
435 | |
436 | |
437 if __name__ == "__main__": | |
438 # Grab all of the filters from our plugin loader | |
439 parser = argparse.ArgumentParser(description="Convert gff3 to gbk") | |
440 parser.add_argument("gff_file", type=argparse.FileType("r"), help="GFF3 file") | |
441 parser.add_argument("fasta_file", type=argparse.FileType("r"), help="Fasta Input") | |
442 parser.add_argument( | |
443 "--transltbl", | |
444 type=int, | |
445 default=11, | |
446 help="Translation Table choice for CDS tag, default 11", | |
447 ) | |
448 args = parser.parse_args() | |
449 | |
450 for record in gff3_to_genbank(**vars(args)): | |
451 record.annotations["molecule_type"] = "DNA" | |
452 # record.seq.alphabet = generic_dna | |
453 SeqIO.write([record], sys.stdout, "genbank") |