Mercurial > repos > cpt > cpt_gff_to_gbk
diff gff2gb.py @ 3:c8fcb7246ac3 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:44:32 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gff2gb.py Mon Jun 05 02:44:32 2023 +0000 @@ -0,0 +1,453 @@ +#!/usr/bin/env python +"""Convert a GFF and associated FASTA file into GenBank format. + +Usage: +gff_to_genbank.py <GFF annotation file> <FASTA sequence file> +""" +import argparse +import sys +import re +import copy +import itertools +import logging +from Bio import SeqIO + +# from Bio.Alphabet import generic_dna +from Bio.SeqFeature import CompoundLocation, FeatureLocation +from CPT_GFFParser import gffParse, gffWrite +from gff3 import ( + feature_lambda, + wa_unified_product_name, + is_uuid, + feature_test_type, + fsort, + feature_test_true, + feature_test_quals, +) + +default_name = re.compile(r"^gene_(\d+)$") +logging.basicConfig(level=logging.INFO) + + +def rename_key(ds, k_f, k_t): + """Rename a key in a dictionary and return it, FP style""" + # If they key is not in the dictionary, just return immediately + if k_f not in ds: + return ds + + # Otherwise, we check if the target key is in there + if k_t in ds: + # If it is, we need to append + ds[k_t] += ds[k_f] + else: + # if not, we can just set. + ds[k_t] = ds[k_f] + + # Remove source + del ds[k_f] + return ds + + +def gff3_to_genbank(gff_file, fasta_file, transltbl): + fasta_input = SeqIO.to_dict(SeqIO.parse(fasta_file, "fasta")) # , generic_dna)) + gff_iter = gffParse(gff_file, fasta_input) + + for record in gff_iter: + yield handle_record(record, transltbl) + + +def handle_non_gene_features(features): + # These are NON-GENE features (maybe terminators? etc?) + for feature in feature_lambda( + features, + feature_test_type, + {"type": "gene"}, + subfeatures=False, + invert=True, + recurse=True, # used to catch RBS from new apollo runs (used to be False) + ): + if feature.type in ( + "terminator", + "tRNA", + "Shine_Dalgarno_sequence", + "sequence_feature", + "recombination_feature", + "sequence_alteration", + "binding_site", + ): + yield feature + elif feature.type in ("CDS",): + pass + else: + yield feature + + +def fminmax(feature): + fmin = None + fmax = None + for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): + if fmin is None: + fmin = sf.location.start + fmax = sf.location.end + if sf.location.start < fmin: + fmin = sf.location.start + if sf.location.end > fmax: + fmax = sf.location.end + return fmin, fmax + + +def fix_gene_boundaries(feature): + # There is a frustrating bug in apollo whereby we have created gene + # features which are LARGER than expected, but we cannot see this. + # We only see a perfect sized gene + great SD together. + # + # So, we have this awful hack to clamp the location of the gene + # feature to the contained mRNAs. This is good enough for now. + fmin, fmax = fminmax(feature) + if feature.location.strand > 0: + feature.location = FeatureLocation(fmin, fmax, strand=1) + else: + feature.location = FeatureLocation(fmin, fmax, strand=-1) + return feature + + +def fix_gene_qualifiers(name, feature, fid): + for mRNA in feature.sub_features: + mRNA.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid) + # And some exons below that + sf_replacement = [] + for sf in mRNA.sub_features: + # We set a locus_tag on ALL sub features + sf.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid) + # Remove Names which are UUIDs + # NOT GOOD PRACTICE + try: + if is_uuid(sf.qualifiers["Name"][0]): + del sf.qualifiers["Name"] + except KeyError: + continue # might should go back to pass, I have not put thought into this still + + # If it is the RBS exon (mis-labelled by apollo as 'exon') + if sf.type == "exon" and len(sf) < 10: + sf.type = "Shine_Dalgarno_sequence" + sf_replacement.append(sf) + # and if it is the CDS + elif sf.type == "CDS": + # Update CDS qualifiers with all info that was on parent + sf.qualifiers.update(feature.qualifiers) + sf_replacement.append(sf) + else: + sf_replacement.append(sf) + if mRNA.type == "tRNA": + mRNA.qualifiers["product"] = mRNA.qualifiers["Name"] + # Handle multiple child CDS features by merging them. + # Replace the subfeatures on the mRNA + mRNA.sub_features = merge_multi_cds(sf_replacement) + return feature + + +def fix_frameshifted(features): + logging.info("Fixing Frameshifted group: [%s]", str(features)) + genes = features + # Find all mRNAs (plus reduce nested list into flattened one) + mRNAs = sum([f.sub_features for f in genes], []) + # Find all CDSs (plus reduce nested list into flattened one) + cdss = sum([m.sub_features for m in mRNAs], []) + # List to store the RBSs which we'll break apart + re-attach later. + rbss = [] + # List to store all of the CDSs (i.e. cdss - rbss) + cdss2 = [] + # Copy genes + clean out subfeatures. We'll re-use these constructs. + fixed_features = copy.deepcopy(genes) + for f in fixed_features: + f.sub_features = [] + # Copy / empty out mRNAs + fixed_mrnas = copy.deepcopy(mRNAs) + for f in fixed_mrnas: + f.sub_features = [] + f.qualifiers = {} + # Fill rbss + cdss2 + for cds in cdss: + if "frameshift" in cds.qualifiers: + del cds.qualifiers["frameshift"] + # Ignore short features, as those are RBSs + if len(cds) < 15: + rbss.append(cds) + continue + # Otherwise cdss. + else: + cdss2.append(cds) + # Ok, now have cdss2 to deal with. + other = [] + # Find the two with least value for distance between end / start (strand aware). + # For every possible pair, we'll check their distance + match_data = {} + for (a, b) in itertools.permutations(cdss2, 2): + if a.location.start < b.location.start: + # A is downstream of B + match_data[(a, b)] = b.location.start - a.location.end + else: + match_data[(a, b)] = a.location.start - b.location.end + + # Now we'll find the features which are closest in terms of start/end + ((merge_a, merge_b), value) = max(match_data.items(), key=lambda kv: kv[1]) + # And get the non-matching features into other + for f in cdss2: + if f != merge_a and f != merge_b: + other.append(f) + # Back to the merge_a/b + # With those, we'll merge them into one feature, and discard the other. + merge_a.location = CompoundLocation([merge_a.location, merge_b.location]) + # The gene + RBSs should be identical and two/two. + assert len(fixed_features) == 2 + # If not, we can just duplicate the RBS, doesn't matter. + noRBS = len(rbss) == 0 + if len(rbss) != 2 and not noRBS: + rbss = [rbss[0], copy.deepcopy(rbss[0])] + # Now re-construct. + gene_0 = fixed_features[0] + gene_1 = fixed_features[1] + mRNA_0 = fixed_mrnas[0] + mRNA_1 = fixed_mrnas[1] + + if not noRBS: + mRNA_0.sub_features = [rbss[0], merge_a] + mRNA_1.sub_features = other + [rbss[1]] + else: + mRNA_0.sub_features = [merge_a] + mRNA_1.sub_features = other + mRNA_0 = fix_gene_boundaries(mRNA_0) + mRNA_1 = fix_gene_boundaries(mRNA_1) + + gene_0.sub_features = [mRNA_0] + gene_1.sub_features = [mRNA_1] + gene_0 = fix_gene_boundaries(gene_0) + gene_1 = fix_gene_boundaries(gene_1) + + return fixed_features + + +def fix_frameshifts(features): + # Collect all gene features where at least one subfeature has a + # frameshift=??? annotation. + def has_frameshift_qual(f): + return ( + len( + list( + feature_lambda( + f.sub_features, feature_test_quals, {"frameshift": None} + ) + ) + ) + > 0 + ) + + def has_frameshift_qual_val(f, val): + return ( + len( + list( + feature_lambda( + f.sub_features, feature_test_quals, {"frameshift": val} + ) + ) + ) + > 0 + ) + + def get_frameshift_qual(f): + for f in feature_lambda( + f.sub_features, feature_test_quals, {"frameshift": None} + ): + return f.qualifiers["frameshift"] + + to_frameshift = [x for x in features if x.type == "gene" and has_frameshift_qual(x)] + fixed = [x for x in features if x not in to_frameshift] + + frameshift_keys = set(sum(map(get_frameshift_qual, to_frameshift), [])) + for key in frameshift_keys: + # Get features matching that key + current = [x for x in to_frameshift if has_frameshift_qual_val(x, key)] + # Fix them and append them + fixed += fix_frameshifted(current) + + return fixed + + +def remove_useless_features(features): + # Drop mRNAs, apollo crap, useless CDSs + for f in features: + if f.type in ( + "non_canonical_three_prime_splice_site", + "non_canonical_five_prime_splice_site", + "stop_codon_read_through", + "mRNA", + "exon", + ): + continue + else: + if f.type == "CDS" and len(f) < 10: + # Another RBS mistake + continue + # We use the full GO term, but it should be less than that. + if f.type == "Shine_Dalgarno_sequence": + f.type = "RBS" + + if f.type == "sequence_feature": + f.type = "misc_feature" + + if f.type == "recombination_feature": + f.type = "misc_recomb" + + if f.type == "sequence_alteration": + f.type = "variation" + + if f.type == "binding_site": + f.type = "misc_binding" + + yield f + + +def merge_multi_cds(mRNA_sf): + cdss = [x for x in mRNA_sf if x.type == "CDS"] + non_cdss = [x for x in mRNA_sf if x.type != "CDS"] + if len(cdss) <= 1: + return non_cdss + cdss + else: + # Grab all locations, and sort them so we can work with them rationally. + locations = sorted([x.location for x in cdss], key=lambda y: y.start) + # Pick randomly a main CDS + main_cds = cdss[0] + # We'll merge the other CDSs into this one. + main_cds.location = CompoundLocation(locations) + return non_cdss + [main_cds] + + +def handle_record(record, transltbl): + full_feats = [] + for feature in fsort(record.features): + if ( + feature.type == "region" + and "source" in feature.qualifiers + and "GenBank" in feature.qualifiers["source"] + ): + feature.type = "source" + + if "comment1" in feature.qualifiers: + del feature.qualifiers["comment1"] + + if "Note" in feature.qualifiers: + record.annotations = feature.qualifiers + if len(feature.qualifiers["Note"]) > 1: + record.annotations["comment"] = feature.qualifiers["Note"][1] + del feature.qualifiers["Note"] + + if "comment" in feature.qualifiers: + del feature.qualifiers["comment"] + + # We'll work on a separate copy of features to avoid modifying a list + # we're iterating over + replacement_feats = [] + replacement_feats += list(handle_non_gene_features(record.features)) + + # Renumbering requires sorting + fid = 0 + for feature in fsort( + feature_lambda( + record.features, feature_test_type, {"type": "gene"}, subfeatures=True + ) + ): + # Our modifications only involve genes + fid += 1 + + feature = fix_gene_boundaries(feature) + # Which have mRNAs we'll drop later + feature = fix_gene_qualifiers(record.id, feature, fid) + + # Wipe out the parent gene's data, leaving only a locus_tag + feature.qualifiers = {"locus_tag": "CPT_%s_%03d" % (record.id, fid)} + + # Patch our features back in (even if they're non-gene features) + replacement_feats.append(feature) + + replacement_feats = fix_frameshifts(replacement_feats) + # exit(0) + flat_features = feature_lambda( + replacement_feats, lambda x: True, {}, subfeatures=True + ) + + flat_features = remove_useless_features(flat_features) + + # Meat of our modifications + for flat_feat in flat_features: + # Try and figure out a name. We gave conflicting instructions, so + # this isn't as trivial as it should be. + protein_product = wa_unified_product_name(flat_feat) + + for x in ( + "source", + "phase", + "Parent", + "ID", + "owner", + "date_creation", + "date_last_modified", + "datasetSource", + ): + if x in flat_feat.qualifiers: + if x == "ID": + flat_feat._ID = flat_feat.qualifiers["ID"] + del flat_feat.qualifiers[x] + + # Add product tag + if flat_feat.type == "CDS": + flat_feat.qualifiers["product"] = [protein_product] + flat_feat.qualifiers["transl_table"] = [transltbl] + if "Product" in flat_feat.qualifiers: + del flat_feat.qualifiers["Product"] + elif flat_feat.type == "RBS": + if "locus_tag" not in flat_feat.qualifiers.keys(): + continue + + elif flat_feat.type == "terminator": + flat_feat.type = "regulatory" + flat_feat.qualifiers = {"regulatory_class": "terminator"} + + # In genbank format, note is lower case. + flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Note", "note") + flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "description", "note") + flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "protein", "note") + flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Dbxref", "db_xref") + if "Name" in flat_feat.qualifiers: + del flat_feat.qualifiers["Name"] + + # more apollo nonsense + if "Manually set translation start" in flat_feat.qualifiers.get("note", []): + flat_feat.qualifiers["note"].remove("Manually set translation start") + + # Append the feature + full_feats.append(flat_feat) + + # Update our features + record.features = fsort(full_feats) + # Strip off record names that would cause crashes. + record.name = record.name[0:16] + return record + + +if __name__ == "__main__": + # Grab all of the filters from our plugin loader + parser = argparse.ArgumentParser(description="Convert gff3 to gbk") + parser.add_argument("gff_file", type=argparse.FileType("r"), help="GFF3 file") + parser.add_argument("fasta_file", type=argparse.FileType("r"), help="Fasta Input") + parser.add_argument( + "--transltbl", + type=int, + default=11, + help="Translation Table choice for CDS tag, default 11", + ) + args = parser.parse_args() + + for record in gff3_to_genbank(**vars(args)): + record.annotations["molecule_type"] = "DNA" + # record.seq.alphabet = generic_dna + SeqIO.write([record], sys.stdout, "genbank")