diff phage_annotation_table.py @ 1:32e011fa615c draft

planemo upload commit edc74553919d09dcbe27fcadf144612c1ad3a2a2
author cpt
date Wed, 26 Apr 2023 03:42:32 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/phage_annotation_table.py	Wed Apr 26 03:42:32 2023 +0000
@@ -0,0 +1,608 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8
+import os
+import argparse
+from gff3 import (
+    genes,
+    get_gff3_id,
+    get_rbs_from,
+    feature_test_true,
+    feature_lambda,
+    feature_test_type,
+)
+from CPT_GFFParser import gffParse, gffWrite
+from Bio import SeqIO
+from jinja2 import Environment, FileSystemLoader
+import logging
+from math import floor
+
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger(name="pat")
+
+# Path to script, required because of Galaxy.
+SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))
+# Path to the HTML template for the report
+
+
+def genes_all(feature_list, feature_type=["gene"], sort=False):
+    """
+    Simple filter to extract gene features from the feature set.
+    """
+
+    if not sort:
+        for x in feature_lambda(
+            feature_list, feature_test_type, {"types": feature_type}, subfeatures=True
+        ):
+            yield x
+    else:
+        data = list(genes_all(feature_list, feature_type, sort=False))
+        data = sorted(data, key=lambda feature: feature.location.start)
+        for x in data:
+            yield x
+
+
+def checkSubs(feature, qualName):
+    subFeats = []
+    res = ""
+    subFeats = feature.sub_features
+    while len(subFeats) > 0:
+        for feat in subFeats:
+            for i in feat.qualifiers.keys():
+                for j in qualName:
+                    if i == j:
+                        if res == "":
+                            res = feat.qualifiers[i][0]
+                        else:
+                            res += "; " + feat.qualifiers[i][0]
+        if res != "":
+            return res
+        tempFeats = []
+        for feat in subFeats:  # Should be breadth-first results
+            for x in feat.sub_features:
+                tempFeats.append(x)
+        subFeats = tempFeats
+    return res
+
+
+def annotation_table_report(record, types, wanted_cols, gaf_data, searchSubs):
+    getTypes = []
+    for x in [y.strip() for y in types.split(",")]:
+        getTypes.append(x)
+    getTypes.append("gene")
+    sorted_features = list(genes_all(record.features, getTypes, sort=True))
+    if wanted_cols is None or len(wanted_cols.strip()) == 0:
+        return [], []
+    useSubs = searchSubs
+
+    def rid(record, feature):
+        """Organism ID"""
+        return record.id
+
+    def id(record, feature):
+        """ID"""
+        return feature.id
+
+    def featureType(record, feature):
+        """Type"""
+        return feature.type
+
+    def name(record, feature):
+        """Name"""
+        for x in ["Name", "name"]:
+            for y in feature.qualifiers.keys():
+                if x == y:
+                    return feature.qualifiers[x][0]
+        if useSubs:
+            res = checkSubs(feature, ["Name", "name"])
+            if res != "":
+                return res
+        return "None"
+
+    def start(record, feature):
+        """Boundary"""
+        return str(feature.location.start + 1)
+
+    def end(record, feature):
+        """Boundary"""
+        return str(feature.location.end)
+
+    def location(record, feature):
+        """Location"""
+        return str(feature.location.start + 1) + "..{0.end}".format(feature.location)
+
+    def length(record, feature):
+        """CDS Length (AA)"""
+
+        if feature.type == "CDS":
+            cdss = [feature]
+        else:
+            cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True))
+
+        if cdss == []:
+            return "None"
+        res = (sum([len(cds) for cds in cdss]) / 3) - 1
+        if floor(res) == res:
+            res = int(res)
+        return str(res)
+
+    def notes(record, feature):
+        """User entered Notes"""
+        for x in ["Note", "note", "Notes", "notes"]:
+            for y in feature.qualifiers.keys():
+                if x == y:
+                    return feature.qualifiers[x][0]
+        if useSubs:
+            res = checkSubs(feature, ["Note", "note", "Notes", "notes"])
+            if res != "":
+                return res
+        return "None"
+
+    def date_created(record, feature):
+        """Created"""
+        return feature.qualifiers.get("date_creation", ["None"])[0]
+
+    def date_last_modified(record, feature):
+        """Last Modified"""
+        res = feature.qualifiers.get("date_last_modified", ["None"])[0]
+        if res != "None":
+            return res
+        if useSubs:
+            res = checkSubs(feature, ["date_last_modified"])
+            if res != "":
+                return res
+        return "None"
+
+    def description(record, feature):
+        """Description"""
+        res = feature.qualifiers.get("description", ["None"])[0]
+        if res != "None":
+            return res
+        if useSubs:
+            res = checkSubs(feature, ["description"])
+            if res != "":
+                return res
+        return "None"
+
+    def owner(record, feature):
+        """Owner
+
+        User who created the feature. In a 464 scenario this may be one of
+        the TAs."""
+        for x in ["Owner", "owner"]:
+            for y in feature.qualifiers.keys():
+                if x == y:
+                    return feature.qualifiers[x][0]
+        if useSubs:
+            res = checkSubs(feature, ["Owner", "owner"])
+            if res != "":
+                return res
+        return "None"
+
+    def product(record, feature):
+        """Product
+
+        User entered product qualifier (collects "Product" and "product"
+        entries)"""
+        """User entered Notes"""
+        for x in ["product", "Product"]:
+            for y in feature.qualifiers.keys():
+                if x == y:
+                    return feature.qualifiers[x][0]
+        if useSubs:
+            res = checkSubs(feature, ["product", "Product"])
+            if res != "":
+                return res
+        return "None"
+
+    def note(record, feature):
+        """Note
+
+        User entered Note qualifier(s)"""
+        return feature.qualifiers.get("Note", [])
+
+    def strand(record, feature):
+        """Strand"""
+        return "+" if feature.location.strand > 0 else "-"
+
+    def sd_spacing(record, feature):
+        """Shine-Dalgarno spacing"""
+        rbss = get_rbs_from(gene)
+        if len(rbss) == 0:
+            return "None"
+        else:
+            resp = []
+            for rbs in rbss:
+                cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True))
+                if len(cdss) == 0:
+                    return "No CDS"
+                if rbs.location.strand > 0:
+                    distance = min(
+                        cdss, key=lambda x: x.location.start - rbs.location.end
+                    )
+                    distance_val = str(distance.location.start - rbs.location.end)
+                    resp.append(distance_val)
+                else:
+                    distance = min(
+                        cdss, key=lambda x: x.location.end - rbs.location.start
+                    )
+                    distance_val = str(rbs.location.start - distance.location.end)
+                    resp.append(distance_val)
+
+            if len(resp) == 1:
+                return str(resp[0])
+            return resp
+
+    def sd_seq(record, feature):
+        """Shine-Dalgarno sequence"""
+        rbss = get_rbs_from(gene)
+        if len(rbss) == 0:
+            return "None"
+        else:
+            resp = []
+            for rbs in rbss:
+                resp.append(str(rbs.extract(record).seq))
+            if len(resp) == 1:
+                return str(resp[0])
+            else:
+                return resp
+
+    def start_codon(record, feature):
+        """Start Codon"""
+        if feature.type == "CDS":
+            cdss = [feature]
+        else:
+            cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True))
+
+        data = [x for x in cdss]
+        if len(data) == 1:
+            return str(data[0].extract(record).seq[0:3])
+        else:
+            return [
+                "{0} ({1.location.start}..{1.location.end}:{1.location.strand})".format(
+                    x.extract(record).seq[0:3], x
+                )
+                for x in data
+            ]
+
+    def stop_codon(record, feature):
+        """Stop Codon"""
+        return str(feature.extract(record).seq[-3:])
+
+    def dbxrefs(record, feature):
+        """DBxrefs"""
+        """User entered Notes"""
+        for x in ["Dbxref", "db_xref", "DB_xref", "DBxref", "DB_Xref", "DBXref"]:
+            for y in feature.qualifiers.keys():
+                if x == y:
+                    return feature.qualifiers[x][0]
+        return "None"
+
+    def upstream_feature(record, feature):
+        """Next gene upstream"""
+        if feature.strand > 0:
+            upstream_features = [
+                x
+                for x in sorted_features
+                if (
+                    x.location.start < feature.location.start
+                    and x.type == "gene"
+                    and x.strand == feature.strand
+                )
+            ]
+            if len(upstream_features) > 0:
+                foundSelf = False
+                featCheck = upstream_features[-1].sub_features
+                for x in featCheck:
+                    if x == feature:
+                        foundSelf = True
+                        break
+                    featCheck = featCheck + x.sub_features
+                if foundSelf:
+                    if len(upstream_features) > 1:
+                        return upstream_features[-2]
+                    return None
+                return upstream_features[-1]
+            else:
+                return None
+        else:
+            upstream_features = [
+                x
+                for x in sorted_features
+                if (
+                    x.location.end > feature.location.end
+                    and x.type == "gene"
+                    and x.strand == feature.strand
+                )
+            ]
+
+            if len(upstream_features) > 0:
+                foundSelf = False
+                featCheck = upstream_features[0].sub_features
+                for x in featCheck:
+                    if x == feature:
+                        foundSelf = True
+                        break
+                    featCheck = featCheck + x.sub_features
+                if foundSelf:
+                    if len(upstream_features) > 1:
+                        return upstream_features[1]
+                    return None
+                return upstream_features[0]
+            else:
+                return None
+
+    def upstream_feature__name(record, feature):
+        """Next gene upstream"""
+        up = upstream_feature(record, feature)
+        if up:
+            return str(up.id)
+        return "None"
+
+    def ig_dist(record, feature):
+        """Distance to next upstream gene on same strand"""
+        up = upstream_feature(record, feature)
+        if up:
+            dist = None
+            if feature.strand > 0:
+                dist = feature.location.start - up.location.end
+            else:
+                dist = up.location.start - feature.location.end
+            return str(dist)
+        else:
+            return "None"
+
+    def _main_gaf_func(record, feature, gaf_data, attr):
+        if feature.id in gaf_data:
+            return [x[attr] for x in gaf_data[feature.id]]
+        return []
+
+    def gaf_annotation_extension(record, feature, gaf_data):
+        """GAF Annotation Extension
+
+        Contains cross references to other ontologies that can be used
+        to qualify or enhance the annotation. The cross-reference is
+        prefaced by an appropriate GO relationship; references to
+        multiple ontologies can be entered. For example, if a gene
+        product is localized to the mitochondria of lymphocytes, the GO
+        ID (column 5) would be mitochondrion ; GO:0005439, and the
+        annotation extension column would contain a cross-reference to
+        the term lymphocyte from the Cell Type Ontology.
+        """
+        return _main_gaf_func(record, feature, gaf_data, "annotation_extension")
+
+    def gaf_aspect(record, feature, gaf_data):
+        """GAF Aspect code
+
+        E.g. P (biological process), F (molecular function) or C (cellular component)
+        """
+        return _main_gaf_func(record, feature, gaf_data, "aspect")
+
+    def gaf_assigned_by(record, feature, gaf_data):
+        """GAF Creating Organisation"""
+        return _main_gaf_func(record, feature, gaf_data, "assigned_by")
+
+    def gaf_date(record, feature, gaf_data):
+        """GAF Creation Date"""
+        return _main_gaf_func(record, feature, gaf_data, "date")
+
+    def gaf_db(record, feature, gaf_data):
+        """GAF DB"""
+        return _main_gaf_func(record, feature, gaf_data, "db")
+
+    def gaf_db_reference(record, feature, gaf_data):
+        """GAF DB Reference"""
+        return _main_gaf_func(record, feature, gaf_data, "db_reference")
+
+    def gaf_evidence_code(record, feature, gaf_data):
+        """GAF Evidence Code"""
+        return _main_gaf_func(record, feature, gaf_data, "evidence_code")
+
+    def gaf_go_id(record, feature, gaf_data):
+        """GAF GO ID"""
+        return _main_gaf_func(record, feature, gaf_data, "go_id")
+
+    def gaf_go_term(record, feature, gaf_data):
+        """GAF GO Term"""
+        return _main_gaf_func(record, feature, gaf_data, "go_term")
+
+    def gaf_id(record, feature, gaf_data):
+        """GAF ID"""
+        return _main_gaf_func(record, feature, gaf_data, "id")
+
+    def gaf_notes(record, feature, gaf_data):
+        """GAF Notes"""
+        return _main_gaf_func(record, feature, gaf_data, "notes")
+
+    def gaf_owner(record, feature, gaf_data):
+        """GAF Creator"""
+        return _main_gaf_func(record, feature, gaf_data, "owner")
+
+    def gaf_with_or_from(record, feature, gaf_data):
+        """GAF With/From"""
+        return _main_gaf_func(record, feature, gaf_data, "with_or_from")
+
+    cols = []
+    data = []
+    funcs = []
+    lcl = locals()
+    for x in [y.strip().lower() for y in wanted_cols.split(",")]:
+        if not x:
+            continue
+        if x == "type":
+            x = "featureType"
+        if x in lcl:
+            funcs.append(lcl[x])
+            # Keep track of docs
+            func_doc = lcl[x].__doc__.strip().split("\n\n")
+            # If there's a double newline, assume following text is the
+            # "help" and the first part is the "name". Generate empty help
+            # if not provided
+            if len(func_doc) == 1:
+                func_doc += [""]
+            cols.append(func_doc)
+        elif "__" in x:
+            chosen_funcs = [lcl[y] for y in x.split("__")]
+            func_doc = [
+                " of ".join(
+                    [y.__doc__.strip().split("\n\n")[0] for y in chosen_funcs[::-1]]
+                )
+            ]
+            cols.append(func_doc)
+            funcs.append(chosen_funcs)
+
+    for gene in genes_all(record.features, getTypes, sort=True):
+        row = []
+        for func in funcs:
+            if isinstance(func, list):
+                # If we have a list of functions, repeatedly apply them
+                value = gene
+                for f in func:
+                    if value is None:
+                        value = "None"
+                        break
+
+                    value = f(record, value)
+            else:
+                # Otherwise just apply the lone function
+                if func.__name__.startswith("gaf_"):
+                    value = func(record, gene, gaf_data)
+                else:
+                    value = func(record, gene)
+
+            if isinstance(value, list):
+                collapsed_value = ", ".join(value)
+                value = [str(collapsed_value)]  # .encode("unicode_escape")]
+            else:
+                value = str(value)  # .encode("unicode_escape")
+
+            row.append(value)
+        # print row
+        data.append(row)
+    return data, cols
+
+
+def parseGafData(file):
+    cols = []
+    data = {}
+    # '10d04a01-5ed8-49c8-b724-d6aa4df5a98d': {
+    # 'annotation_extension': '',
+    # 'aspect': '',
+    # 'assigned_by': 'CPT',
+    # 'date': '2017-05-04T16:25:22.161916Z',
+    # 'db': 'UniProtKB',
+    # 'db_reference': 'GO_REF:0000100',
+    # 'evidence_code': 'ISA',
+    # 'gene': '0d307196-833d-46e8-90e9-d80f7a041d88',
+    # 'go_id': 'GO:0039660',
+    # 'go_term': 'structural constituent of virion',
+    # 'id': '10d04a01-5ed8-49c8-b724-d6aa4df5a98d',
+    # 'notes': 'hit was putative minor structural protein',
+    # 'owner': 'amarc1@tamu.edu',
+    # 'with_or_from': 'UNIREF90:B2ZYZ7'
+    # },
+    for row in file:
+        if row.startswith("#"):
+            # Header
+            cols = (
+                row.strip().replace("# ", "").replace("GO Term", "go_term").split("\t")
+            )
+        else:
+            line = row.strip().split("\t")
+            tmp = dict(zip(cols, line))
+            if "gene" not in tmp.keys():
+                continue
+            if tmp["gene"] not in data:
+                data[tmp["gene"]] = []
+
+            data[tmp["gene"]].append(tmp)
+    return data
+
+
+def evaluate_and_report(
+    annotations,
+    genome,
+    types="gene",
+    reportTemplateName="phage_annotation_validator.html",
+    annotationTableCols="",
+    gafData=None,
+    searchSubs=False,
+):
+    """
+    Generate our HTML evaluation of the genome
+    """
+    # Get features from GFF file
+    seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta"))
+    # Get the first GFF3 record
+    # TODO: support multiple GFF3 files.
+    at_table_data = []
+    gaf = {}
+    if gafData:
+        gaf = parseGafData(gafData)
+
+    for record in gffParse(annotations, base_dict=seq_dict):
+        if reportTemplateName.endswith(".html"):
+            record.id = record.id.replace(".", "-")
+        log.info("Producing an annotation table for %s" % record.id)
+        annotation_table_data, annotation_table_col_names = annotation_table_report(
+            record, types, annotationTableCols, gaf, searchSubs
+        )
+        at_table_data.append((record, annotation_table_data))
+        # break
+
+    # This is data that will go into our HTML template
+    kwargs = {
+        "annotation_table_data": at_table_data,
+        "annotation_table_col_names": annotation_table_col_names,
+    }
+
+    env = Environment(
+        loader=FileSystemLoader(SCRIPT_PATH), trim_blocks=True, lstrip_blocks=True
+    )
+    if reportTemplateName.endswith(".html"):
+        env.filters["nice_id"] = str(get_gff3_id).replace(".", "-")
+    else:
+        env.filters["nice_id"] = get_gff3_id
+
+    def join(listy):
+        return "\n".join(listy)
+
+    env.filters.update({"join": join})
+    tpl = env.get_template(reportTemplateName)
+    return tpl.render(**kwargs).encode("utf-8")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="rebase gff3 features against parent locations", epilog=""
+    )
+    parser.add_argument(
+        "annotations", type=argparse.FileType("r"), help="Parent GFF3 annotations"
+    )
+    parser.add_argument("genome", type=argparse.FileType("r"), help="Genome Sequence")
+
+    parser.add_argument(
+        "--types",
+        help="Select extra types to display in output (Will always include gene)",
+    )
+
+    parser.add_argument(
+        "--reportTemplateName",
+        help="Report template file name",
+        default="phageqc_report_full.html",
+    )
+    parser.add_argument(
+        "--annotationTableCols",
+        help="Select columns to report in the annotation table output format",
+    )
+    parser.add_argument(
+        "--gafData", help="CPT GAF-like table", type=argparse.FileType("r")
+    )
+    parser.add_argument(
+        "--searchSubs",
+        help="Attempt to populate fields from sub-features if qualifier is empty",
+        action="store_true",
+    )
+
+    args = parser.parse_args()
+
+    print(evaluate_and_report(**vars(args)).decode("utf-8"))