Mercurial > repos > cpt > cpt_annotation_table
view phage_annotation_table.py @ 2:6998c2570145 draft
planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5
author | cpt |
---|---|
date | Sun, 23 Jul 2023 01:42:48 +0000 |
parents | 32e011fa615c |
children |
line wrap: on
line source
#!/usr/bin/env python # vim: set fileencoding=utf-8 import os import argparse from gff3 import ( genes, get_gff3_id, get_rbs_from, feature_test_true, feature_lambda, feature_test_type, ) from CPT_GFFParser import gffParse, gffWrite from Bio import SeqIO from jinja2 import Environment, FileSystemLoader import logging from math import floor logging.basicConfig(level=logging.DEBUG) log = logging.getLogger(name="pat") # Path to script, required because of Galaxy. SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__)) # Path to the HTML template for the report def genes_all(feature_list, feature_type=["gene"], sort=False): """ Simple filter to extract gene features from the feature set. """ if not sort: for x in feature_lambda( feature_list, feature_test_type, {"types": feature_type}, subfeatures=True ): yield x else: data = list(genes_all(feature_list, feature_type, sort=False)) data = sorted(data, key=lambda feature: feature.location.start) for x in data: yield x def checkSubs(feature, qualName): subFeats = [] res = "" subFeats = feature.sub_features while len(subFeats) > 0: for feat in subFeats: for i in feat.qualifiers.keys(): for j in qualName: if i == j: if res == "": res = feat.qualifiers[i][0] else: res += "; " + feat.qualifiers[i][0] if res != "": return res tempFeats = [] for feat in subFeats: # Should be breadth-first results for x in feat.sub_features: tempFeats.append(x) subFeats = tempFeats return res def annotation_table_report(record, types, wanted_cols, gaf_data, searchSubs): getTypes = [] for x in [y.strip() for y in types.split(",")]: getTypes.append(x) getTypes.append("gene") sorted_features = list(genes_all(record.features, getTypes, sort=True)) if wanted_cols is None or len(wanted_cols.strip()) == 0: return [], [] useSubs = searchSubs def rid(record, feature): """Organism ID""" return record.id def id(record, feature): """ID""" return feature.id def featureType(record, feature): """Type""" return feature.type def name(record, feature): """Name""" for x in ["Name", "name"]: for y in feature.qualifiers.keys(): if x == y: return feature.qualifiers[x][0] if useSubs: res = checkSubs(feature, ["Name", "name"]) if res != "": return res return "None" def start(record, feature): """Boundary""" return str(feature.location.start + 1) def end(record, feature): """Boundary""" return str(feature.location.end) def location(record, feature): """Location""" return str(feature.location.start + 1) + "..{0.end}".format(feature.location) def length(record, feature): """CDS Length (AA)""" if feature.type == "CDS": cdss = [feature] else: cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True)) if cdss == []: return "None" res = (sum([len(cds) for cds in cdss]) / 3) - 1 if floor(res) == res: res = int(res) return str(res) def notes(record, feature): """User entered Notes""" for x in ["Note", "note", "Notes", "notes"]: for y in feature.qualifiers.keys(): if x == y: return feature.qualifiers[x][0] if useSubs: res = checkSubs(feature, ["Note", "note", "Notes", "notes"]) if res != "": return res return "None" def date_created(record, feature): """Created""" return feature.qualifiers.get("date_creation", ["None"])[0] def date_last_modified(record, feature): """Last Modified""" res = feature.qualifiers.get("date_last_modified", ["None"])[0] if res != "None": return res if useSubs: res = checkSubs(feature, ["date_last_modified"]) if res != "": return res return "None" def description(record, feature): """Description""" res = feature.qualifiers.get("description", ["None"])[0] if res != "None": return res if useSubs: res = checkSubs(feature, ["description"]) if res != "": return res return "None" def owner(record, feature): """Owner User who created the feature. In a 464 scenario this may be one of the TAs.""" for x in ["Owner", "owner"]: for y in feature.qualifiers.keys(): if x == y: return feature.qualifiers[x][0] if useSubs: res = checkSubs(feature, ["Owner", "owner"]) if res != "": return res return "None" def product(record, feature): """Product User entered product qualifier (collects "Product" and "product" entries)""" """User entered Notes""" for x in ["product", "Product"]: for y in feature.qualifiers.keys(): if x == y: return feature.qualifiers[x][0] if useSubs: res = checkSubs(feature, ["product", "Product"]) if res != "": return res return "None" def note(record, feature): """Note User entered Note qualifier(s)""" return feature.qualifiers.get("Note", []) def strand(record, feature): """Strand""" return "+" if feature.location.strand > 0 else "-" def sd_spacing(record, feature): """Shine-Dalgarno spacing""" rbss = get_rbs_from(gene) if len(rbss) == 0: return "None" else: resp = [] for rbs in rbss: cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True)) if len(cdss) == 0: return "No CDS" if rbs.location.strand > 0: distance = min( cdss, key=lambda x: x.location.start - rbs.location.end ) distance_val = str(distance.location.start - rbs.location.end) resp.append(distance_val) else: distance = min( cdss, key=lambda x: x.location.end - rbs.location.start ) distance_val = str(rbs.location.start - distance.location.end) resp.append(distance_val) if len(resp) == 1: return str(resp[0]) return resp def sd_seq(record, feature): """Shine-Dalgarno sequence""" rbss = get_rbs_from(gene) if len(rbss) == 0: return "None" else: resp = [] for rbs in rbss: resp.append(str(rbs.extract(record).seq)) if len(resp) == 1: return str(resp[0]) else: return resp def start_codon(record, feature): """Start Codon""" if feature.type == "CDS": cdss = [feature] else: cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True)) data = [x for x in cdss] if len(data) == 1: return str(data[0].extract(record).seq[0:3]) else: return [ "{0} ({1.location.start}..{1.location.end}:{1.location.strand})".format( x.extract(record).seq[0:3], x ) for x in data ] def stop_codon(record, feature): """Stop Codon""" return str(feature.extract(record).seq[-3:]) def dbxrefs(record, feature): """DBxrefs""" """User entered Notes""" for x in ["Dbxref", "db_xref", "DB_xref", "DBxref", "DB_Xref", "DBXref"]: for y in feature.qualifiers.keys(): if x == y: return feature.qualifiers[x][0] return "None" def upstream_feature(record, feature): """Next gene upstream""" if feature.strand > 0: upstream_features = [ x for x in sorted_features if ( x.location.start < feature.location.start and x.type == "gene" and x.strand == feature.strand ) ] if len(upstream_features) > 0: foundSelf = False featCheck = upstream_features[-1].sub_features for x in featCheck: if x == feature: foundSelf = True break featCheck = featCheck + x.sub_features if foundSelf: if len(upstream_features) > 1: return upstream_features[-2] return None return upstream_features[-1] else: return None else: upstream_features = [ x for x in sorted_features if ( x.location.end > feature.location.end and x.type == "gene" and x.strand == feature.strand ) ] if len(upstream_features) > 0: foundSelf = False featCheck = upstream_features[0].sub_features for x in featCheck: if x == feature: foundSelf = True break featCheck = featCheck + x.sub_features if foundSelf: if len(upstream_features) > 1: return upstream_features[1] return None return upstream_features[0] else: return None def upstream_feature__name(record, feature): """Next gene upstream""" up = upstream_feature(record, feature) if up: return str(up.id) return "None" def ig_dist(record, feature): """Distance to next upstream gene on same strand""" up = upstream_feature(record, feature) if up: dist = None if feature.strand > 0: dist = feature.location.start - up.location.end else: dist = up.location.start - feature.location.end return str(dist) else: return "None" def _main_gaf_func(record, feature, gaf_data, attr): if feature.id in gaf_data: return [x[attr] for x in gaf_data[feature.id]] return [] def gaf_annotation_extension(record, feature, gaf_data): """GAF Annotation Extension Contains cross references to other ontologies that can be used to qualify or enhance the annotation. The cross-reference is prefaced by an appropriate GO relationship; references to multiple ontologies can be entered. For example, if a gene product is localized to the mitochondria of lymphocytes, the GO ID (column 5) would be mitochondrion ; GO:0005439, and the annotation extension column would contain a cross-reference to the term lymphocyte from the Cell Type Ontology. """ return _main_gaf_func(record, feature, gaf_data, "annotation_extension") def gaf_aspect(record, feature, gaf_data): """GAF Aspect code E.g. P (biological process), F (molecular function) or C (cellular component) """ return _main_gaf_func(record, feature, gaf_data, "aspect") def gaf_assigned_by(record, feature, gaf_data): """GAF Creating Organisation""" return _main_gaf_func(record, feature, gaf_data, "assigned_by") def gaf_date(record, feature, gaf_data): """GAF Creation Date""" return _main_gaf_func(record, feature, gaf_data, "date") def gaf_db(record, feature, gaf_data): """GAF DB""" return _main_gaf_func(record, feature, gaf_data, "db") def gaf_db_reference(record, feature, gaf_data): """GAF DB Reference""" return _main_gaf_func(record, feature, gaf_data, "db_reference") def gaf_evidence_code(record, feature, gaf_data): """GAF Evidence Code""" return _main_gaf_func(record, feature, gaf_data, "evidence_code") def gaf_go_id(record, feature, gaf_data): """GAF GO ID""" return _main_gaf_func(record, feature, gaf_data, "go_id") def gaf_go_term(record, feature, gaf_data): """GAF GO Term""" return _main_gaf_func(record, feature, gaf_data, "go_term") def gaf_id(record, feature, gaf_data): """GAF ID""" return _main_gaf_func(record, feature, gaf_data, "id") def gaf_notes(record, feature, gaf_data): """GAF Notes""" return _main_gaf_func(record, feature, gaf_data, "notes") def gaf_owner(record, feature, gaf_data): """GAF Creator""" return _main_gaf_func(record, feature, gaf_data, "owner") def gaf_with_or_from(record, feature, gaf_data): """GAF With/From""" return _main_gaf_func(record, feature, gaf_data, "with_or_from") cols = [] data = [] funcs = [] lcl = locals() for x in [y.strip().lower() for y in wanted_cols.split(",")]: if not x: continue if x == "type": x = "featureType" if x in lcl: funcs.append(lcl[x]) # Keep track of docs func_doc = lcl[x].__doc__.strip().split("\n\n") # If there's a double newline, assume following text is the # "help" and the first part is the "name". Generate empty help # if not provided if len(func_doc) == 1: func_doc += [""] cols.append(func_doc) elif "__" in x: chosen_funcs = [lcl[y] for y in x.split("__")] func_doc = [ " of ".join( [y.__doc__.strip().split("\n\n")[0] for y in chosen_funcs[::-1]] ) ] cols.append(func_doc) funcs.append(chosen_funcs) for gene in genes_all(record.features, getTypes, sort=True): row = [] for func in funcs: if isinstance(func, list): # If we have a list of functions, repeatedly apply them value = gene for f in func: if value is None: value = "None" break value = f(record, value) else: # Otherwise just apply the lone function if func.__name__.startswith("gaf_"): value = func(record, gene, gaf_data) else: value = func(record, gene) if isinstance(value, list): collapsed_value = ", ".join(value) value = [str(collapsed_value)] # .encode("unicode_escape")] else: value = str(value) # .encode("unicode_escape") row.append(value) # print row data.append(row) return data, cols def parseGafData(file): cols = [] data = {} # '10d04a01-5ed8-49c8-b724-d6aa4df5a98d': { # 'annotation_extension': '', # 'aspect': '', # 'assigned_by': 'CPT', # 'date': '2017-05-04T16:25:22.161916Z', # 'db': 'UniProtKB', # 'db_reference': 'GO_REF:0000100', # 'evidence_code': 'ISA', # 'gene': '0d307196-833d-46e8-90e9-d80f7a041d88', # 'go_id': 'GO:0039660', # 'go_term': 'structural constituent of virion', # 'id': '10d04a01-5ed8-49c8-b724-d6aa4df5a98d', # 'notes': 'hit was putative minor structural protein', # 'owner': 'amarc1@tamu.edu', # 'with_or_from': 'UNIREF90:B2ZYZ7' # }, for row in file: if row.startswith("#"): # Header cols = ( row.strip().replace("# ", "").replace("GO Term", "go_term").split("\t") ) else: line = row.strip().split("\t") tmp = dict(zip(cols, line)) if "gene" not in tmp.keys(): continue if tmp["gene"] not in data: data[tmp["gene"]] = [] data[tmp["gene"]].append(tmp) return data def evaluate_and_report( annotations, genome, types="gene", reportTemplateName="phage_annotation_validator.html", annotationTableCols="", gafData=None, searchSubs=False, ): """ Generate our HTML evaluation of the genome """ # Get features from GFF file seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) # Get the first GFF3 record # TODO: support multiple GFF3 files. at_table_data = [] gaf = {} if gafData: gaf = parseGafData(gafData) for record in gffParse(annotations, base_dict=seq_dict): if reportTemplateName.endswith(".html"): record.id = record.id.replace(".", "-") log.info("Producing an annotation table for %s" % record.id) annotation_table_data, annotation_table_col_names = annotation_table_report( record, types, annotationTableCols, gaf, searchSubs ) at_table_data.append((record, annotation_table_data)) # break # This is data that will go into our HTML template kwargs = { "annotation_table_data": at_table_data, "annotation_table_col_names": annotation_table_col_names, } env = Environment( loader=FileSystemLoader(SCRIPT_PATH), trim_blocks=True, lstrip_blocks=True ) if reportTemplateName.endswith(".html"): env.filters["nice_id"] = str(get_gff3_id).replace(".", "-") else: env.filters["nice_id"] = get_gff3_id def join(listy): return "\n".join(listy) env.filters.update({"join": join}) tpl = env.get_template(reportTemplateName) return tpl.render(**kwargs).encode("utf-8") if __name__ == "__main__": parser = argparse.ArgumentParser( description="rebase gff3 features against parent locations", epilog="" ) parser.add_argument( "annotations", type=argparse.FileType("r"), help="Parent GFF3 annotations" ) parser.add_argument("genome", type=argparse.FileType("r"), help="Genome Sequence") parser.add_argument( "--types", help="Select extra types to display in output (Will always include gene)", ) parser.add_argument( "--reportTemplateName", help="Report template file name", default="phageqc_report_full.html", ) parser.add_argument( "--annotationTableCols", help="Select columns to report in the annotation table output format", ) parser.add_argument( "--gafData", help="CPT GAF-like table", type=argparse.FileType("r") ) parser.add_argument( "--searchSubs", help="Attempt to populate fields from sub-features if qualifier is empty", action="store_true", ) args = parser.parse_args() print(evaluate_and_report(**vars(args)).decode("utf-8"))