gff2gb.py @ 3:c8fcb7246ac3 (draft)

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:44:32 +0000
#!/usr/bin/env python
"""Convert a GFF and associated FASTA file into GenBank format.

Usage:
gff2gb.py <GFF annotation file> <FASTA sequence file> [--transltbl TABLE]
"""
import argparse
import sys
import re
import copy
import itertools
import logging
from Bio import SeqIO

# from Bio.Alphabet import generic_dna
from Bio.SeqFeature import CompoundLocation, FeatureLocation
from CPT_GFFParser import gffParse, gffWrite
from gff3 import (
    feature_lambda,
    wa_unified_product_name,
    is_uuid,
    feature_test_type,
    fsort,
    feature_test_true,
    feature_test_quals,
)

default_name = re.compile(r"^gene_(\d+)$")
logging.basicConfig(level=logging.INFO)


def rename_key(ds, k_f, k_t):
    """Rename a key in a dictionary and return it, FP style"""
    # If the key is not in the dictionary, just return immediately
    if k_f not in ds:
        return ds

    # Otherwise, we check if the target key is in there
    if k_t in ds:
        # If it is, we need to append
        ds[k_t] += ds[k_f]
    else:
        # If not, we can just set it.
        ds[k_t] = ds[k_f]

    # Remove the source key
    del ds[k_f]
    return ds


def gff3_to_genbank(gff_file, fasta_file, transltbl):
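    """Parse the GFF3 annotations against the FASTA sequences and yield one
    cleaned-up SeqRecord per input record."""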
    fasta_input = SeqIO.to_dict(SeqIO.parse(fasta_file, "fasta"))  # , generic_dna))
    gff_iter = gffParse(gff_file, fasta_input)

    for record in gff_iter:
        yield handle_record(record, transltbl)


def handle_non_gene_features(features):
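    """Yield features that are not genes (terminators, tRNAs, misc features,
    ...), recursing into subfeatures and dropping stray CDS features."""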
    # These are NON-GENE features (maybe terminators? etc?)
    for feature in feature_lambda(
        features,
        feature_test_type,
        {"type": "gene"},
        subfeatures=False,
        invert=True,
        recurse=True,  # used to catch RBS from new apollo runs (used to be False)
    ):
        if feature.type in (
            "terminator",
            "tRNA",
            "Shine_Dalgarno_sequence",
            "sequence_feature",
            "recombination_feature",
            "sequence_alteration",
            "binding_site",
        ):
            yield feature
        elif feature.type in ("CDS",):
            pass
        else:
            yield feature


def fminmax(feature):
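    """Return the smallest start and largest end coordinate across a feature
    and all of its subfeatures."""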
    fmin = None
    fmax = None
    for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True):
        if fmin is None:
            fmin = sf.location.start
            fmax = sf.location.end
        if sf.location.start < fmin:
            fmin = sf.location.start
        if sf.location.end > fmax:
            fmax = sf.location.end
    return fmin, fmax


def fix_gene_boundaries(feature):
    # There is a frustrating bug in apollo whereby we have created gene
    # features which are LARGER than expected, but we cannot see this.
    # We only see a perfect sized gene + great SD together.
    #
    # So, we have this awful hack to clamp the location of the gene
    # feature to the contained mRNAs. This is good enough for now.
    fmin, fmax = fminmax(feature)
    if feature.location.strand > 0:
        feature.location = FeatureLocation(fmin, fmax, strand=1)
    else:
        feature.location = FeatureLocation(fmin, fmax, strand=-1)
    return feature


def fix_gene_qualifiers(name, feature, fid):
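    """Assign locus_tags to each mRNA and its subfeatures, relabel short exons
    as Shine_Dalgarno_sequence, copy the parent gene's qualifiers onto the CDS,
    and merge multi-part CDS features."""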
    for mRNA in feature.sub_features:
        mRNA.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid)
        # And some exons below that
        sf_replacement = []
        for sf in mRNA.sub_features:
            # We set a locus_tag on ALL sub features
            sf.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid)
            # Remove Names which are UUIDs
            # NOT GOOD PRACTICE
            try:
                if is_uuid(sf.qualifiers["Name"][0]):
                    del sf.qualifiers["Name"]
            except KeyError:
                continue  # NOTE: this drops subfeatures lacking a Name; `pass` may be the safer choice

            # If it is the RBS exon (mis-labelled by apollo as 'exon')
            if sf.type == "exon" and len(sf) < 10:
                sf.type = "Shine_Dalgarno_sequence"
                sf_replacement.append(sf)
            # and if it is the CDS
            elif sf.type == "CDS":
                # Update CDS qualifiers with all info that was on parent
                sf.qualifiers.update(feature.qualifiers)
                sf_replacement.append(sf)
            else:
                sf_replacement.append(sf)
        if mRNA.type == "tRNA":
            mRNA.qualifiers["product"] = mRNA.qualifiers["Name"]
        # Handle multiple child CDS features by merging them.
        # Replace the subfeatures on the mRNA
        mRNA.sub_features = merge_multi_cds(sf_replacement)
    return feature


def fix_frameshifted(features):
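    """Rebuild a pair of gene features flagged as a frameshift: the two closest
    CDS parts are merged into a single CompoundLocation and the RBS/remaining
    parts are re-attached to the reconstructed genes."""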
    logging.info("Fixing Frameshifted group: [%s]", str(features))
    genes = features
    # Find all mRNAs (plus reduce nested list into flattened one)
    mRNAs = sum([f.sub_features for f in genes], [])
    # Find all CDSs (plus reduce nested list into flattened one)
    cdss = sum([m.sub_features for m in mRNAs], [])
    # List to store the RBSs which we'll break apart + re-attach later.
    rbss = []
    # List to store all of the CDSs (i.e. cdss - rbss)
    cdss2 = []
    # Copy genes + clean out subfeatures. We'll re-use these constructs.
    fixed_features = copy.deepcopy(genes)
    for f in fixed_features:
        f.sub_features = []
    # Copy / empty out mRNAs
    fixed_mrnas = copy.deepcopy(mRNAs)
    for f in fixed_mrnas:
        f.sub_features = []
        f.qualifiers = {}
    # Fill rbss + cdss2
    for cds in cdss:
        if "frameshift" in cds.qualifiers:
            del cds.qualifiers["frameshift"]
        # Ignore short features, as those are RBSs
        if len(cds) < 15:
            rbss.append(cds)
            continue
        # Otherwise they are real CDSs.
        else:
            cdss2.append(cds)
    # Ok, now have cdss2 to deal with.
    other = []
    # Find the pair of CDSs with the smallest gap between end / start.
    # For every possible pair, we'll check their distance
    match_data = {}
    for (a, b) in itertools.permutations(cdss2, 2):
        if a.location.start < b.location.start:
            # A starts before B
            match_data[(a, b)] = b.location.start - a.location.end
        else:
            match_data[(a, b)] = a.location.start - b.location.end

    # Now we'll take the two features which are closest in terms of start/end
    ((merge_a, merge_b), value) = min(match_data.items(), key=lambda kv: kv[1])
    # And get the non-matching features into other
    for f in cdss2:
        if f != merge_a and f != merge_b:
            other.append(f)
    # Back to the merge_a/b
    # With those, we'll merge them into one feature, and discard the other.
    merge_a.location = CompoundLocation([merge_a.location, merge_b.location])
    # There should be exactly two genes and, usually, two RBSs.
    assert len(fixed_features) == 2
    # If there is only one RBS, we can just duplicate it; it doesn't matter.
    noRBS = len(rbss) == 0
    if len(rbss) != 2 and not noRBS:
        rbss = [rbss[0], copy.deepcopy(rbss[0])]
    # Now re-construct.
    gene_0 = fixed_features[0]
    gene_1 = fixed_features[1]
    mRNA_0 = fixed_mrnas[0]
    mRNA_1 = fixed_mrnas[1]

    if not noRBS:
        mRNA_0.sub_features = [rbss[0], merge_a]
        mRNA_1.sub_features = other + [rbss[1]]
    else:
        mRNA_0.sub_features = [merge_a]
        mRNA_1.sub_features = other
    mRNA_0 = fix_gene_boundaries(mRNA_0)
    mRNA_1 = fix_gene_boundaries(mRNA_1)

    gene_0.sub_features = [mRNA_0]
    gene_1.sub_features = [mRNA_1]
    gene_0 = fix_gene_boundaries(gene_0)
    gene_1 = fix_gene_boundaries(gene_1)

    return fixed_features


def fix_frameshifts(features):
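    """Group gene features by the value of their frameshift qualifier and run
    fix_frameshifted() on each group; all other features pass through."""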
    # Collect all gene features where at least one subfeature has a
    # frameshift=??? annotation.
    def has_frameshift_qual(f):
        return (
            len(
                list(
                    feature_lambda(
                        f.sub_features, feature_test_quals, {"frameshift": None}
                    )
                )
            )
            > 0
        )

    def has_frameshift_qual_val(f, val):
        return (
            len(
                list(
                    feature_lambda(
                        f.sub_features, feature_test_quals, {"frameshift": val}
                    )
                )
            )
            > 0
        )

    def get_frameshift_qual(f):
        for f in feature_lambda(
            f.sub_features, feature_test_quals, {"frameshift": None}
        ):
            return f.qualifiers["frameshift"]

    to_frameshift = [x for x in features if x.type == "gene" and has_frameshift_qual(x)]
    fixed = [x for x in features if x not in to_frameshift]

    frameshift_keys = set(sum(map(get_frameshift_qual, to_frameshift), []))
    for key in frameshift_keys:
        # Get features matching that key
        current = [x for x in to_frameshift if has_frameshift_qual_val(x, key)]
        # Fix them and append them
        fixed += fix_frameshifted(current)

    return fixed


def remove_useless_features(features):
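    """Yield features with Apollo artefacts dropped (mRNA, exon, splice-site
    leftovers, very short CDSs) and SO terms mapped to GenBank feature keys."""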
    # Drop mRNAs, apollo crap, useless CDSs
    for f in features:
        if f.type in (
            "non_canonical_three_prime_splice_site",
            "non_canonical_five_prime_splice_site",
            "stop_codon_read_through",
            "mRNA",
            "exon",
        ):
            continue
        else:
            if f.type == "CDS" and len(f) < 10:
                # Another RBS mistake
                continue
            # The GFF uses the full SO term; GenBank expects the shorter RBS key.
            if f.type == "Shine_Dalgarno_sequence":
                f.type = "RBS"

            if f.type == "sequence_feature":
                f.type = "misc_feature"

            if f.type == "recombination_feature":
                f.type = "misc_recomb"

            if f.type == "sequence_alteration":
                f.type = "variation"

            if f.type == "binding_site":
                f.type = "misc_binding"

            yield f


def merge_multi_cds(mRNA_sf):
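    """If an mRNA has more than one CDS subfeature, merge them into a single
    CDS with a CompoundLocation; otherwise return the subfeatures unchanged."""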
    cdss = [x for x in mRNA_sf if x.type == "CDS"]
    non_cdss = [x for x in mRNA_sf if x.type != "CDS"]
    if len(cdss) <= 1:
        return non_cdss + cdss
    else:
        # Grab all locations, and sort them so we can work with them rationally.
        locations = sorted([x.location for x in cdss], key=lambda y: y.start)
        # Arbitrarily take the first CDS as the "main" one
        main_cds = cdss[0]
        # and merge the other CDS locations into it.
        main_cds.location = CompoundLocation(locations)
        return non_cdss + [main_cds]


def handle_record(record, transltbl):
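    """Clean up a single SeqRecord: renumber gene locus_tags, fix frameshifts,
    flatten the feature hierarchy, and translate GFF3 qualifiers into their
    GenBank equivalents before output."""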
    full_feats = []
    for feature in fsort(record.features):
        if (
            feature.type == "region"
            and "source" in feature.qualifiers
            and "GenBank" in feature.qualifiers["source"]
        ):
            feature.type = "source"

        if "comment1" in feature.qualifiers:
            del feature.qualifiers["comment1"]

        if "Note" in feature.qualifiers:
            record.annotations = feature.qualifiers
            if len(feature.qualifiers["Note"]) > 1:
                record.annotations["comment"] = feature.qualifiers["Note"][1]
            del feature.qualifiers["Note"]

        if "comment" in feature.qualifiers:
            del feature.qualifiers["comment"]

    # We'll work on a separate copy of features to avoid modifying a list
    # we're iterating over
    replacement_feats = []
    replacement_feats += list(handle_non_gene_features(record.features))

    # Renumbering requires sorting
    fid = 0
    for feature in fsort(
        feature_lambda(
            record.features, feature_test_type, {"type": "gene"}, subfeatures=True
        )
    ):
        # Our modifications only involve genes
        fid += 1

        feature = fix_gene_boundaries(feature)
        # Which have mRNAs we'll drop later
        feature = fix_gene_qualifiers(record.id, feature, fid)

        # Wipe out the parent gene's data, leaving only a locus_tag
        feature.qualifiers = {"locus_tag": "CPT_%s_%03d" % (record.id, fid)}

        # Patch our features back in (even if they're non-gene features)
        replacement_feats.append(feature)

    replacement_feats = fix_frameshifts(replacement_feats)
    # exit(0)
    flat_features = feature_lambda(
        replacement_feats, lambda x: True, {}, subfeatures=True
    )

    flat_features = remove_useless_features(flat_features)

    # Meat of our modifications
    for flat_feat in flat_features:
        # Try and figure out a name. We gave conflicting instructions, so
        # this isn't as trivial as it should be.
        protein_product = wa_unified_product_name(flat_feat)

        for x in (
            "source",
            "phase",
            "Parent",
            "ID",
            "owner",
            "date_creation",
            "date_last_modified",
            "datasetSource",
        ):
            if x in flat_feat.qualifiers:
                if x == "ID":
                    flat_feat._ID = flat_feat.qualifiers["ID"]
                del flat_feat.qualifiers[x]

        # Add product tag
        if flat_feat.type == "CDS":
            flat_feat.qualifiers["product"] = [protein_product]
            flat_feat.qualifiers["transl_table"] = [transltbl]
            if "Product" in flat_feat.qualifiers:
                del flat_feat.qualifiers["Product"]
        elif flat_feat.type == "RBS":
            if "locus_tag" not in flat_feat.qualifiers.keys():
                continue

        elif flat_feat.type == "terminator":
            flat_feat.type = "regulatory"
            flat_feat.qualifiers = {"regulatory_class": "terminator"}

        # In GenBank format, note is lower case.
        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Note", "note")
        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "description", "note")
        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "protein", "note")
        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Dbxref", "db_xref")
        if "Name" in flat_feat.qualifiers:
            del flat_feat.qualifiers["Name"]

        # more apollo nonsense
        if "Manually set translation start" in flat_feat.qualifiers.get("note", []):
            flat_feat.qualifiers["note"].remove("Manually set translation start")

        # Append the feature
        full_feats.append(flat_feat)

    # Update our features
    record.features = fsort(full_feats)
    # Truncate record names that would otherwise crash the GenBank writer.
    record.name = record.name[0:16]
    return record


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert gff3 to gbk")
    parser.add_argument("gff_file", type=argparse.FileType("r"), help="GFF3 file")
    parser.add_argument("fasta_file", type=argparse.FileType("r"), help="Fasta Input")
    parser.add_argument(
        "--transltbl",
        type=int,
        default=11,
        help="Translation table for the CDS transl_table qualifier (default 11)",
    )
    args = parser.parse_args()

    for record in gff3_to_genbank(**vars(args)):
        record.annotations["molecule_type"] = "DNA"
        # record.seq.alphabet = generic_dna
        SeqIO.write([record], sys.stdout, "genbank")
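
# Example invocation (filenames are illustrative); the GenBank output is
# written to stdout, so redirect it to a file:
#   python gff2gb.py annotations.gff3 genome.fasta --transltbl 11 > annotations.gbk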