Mercurial > repos > iuc > virannot_rps2tsv
view rps2tsv.py @ 0:bbaa89f070f4 draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/virAnnot commit 3a3b40c15ae5e82334f016e88b1f3c5bbbb3b2cd
author | iuc |
---|---|
date | Mon, 04 Mar 2024 19:56:16 +0000 |
parents | |
children | fd7104249a3c |
line wrap: on
line source
#!/usr/bin/env python3 # Name: rps2ecsv # Author: Marie Lefebvre - INRAE # Aims: Convert rpsblast xml output to csv and add taxonomy import argparse import json import logging as log from urllib import request from urllib.error import HTTPError, URLError from Bio.Blast import NCBIXML from ete3 import NCBITaxa ncbi = NCBITaxa() def main(): options = _set_options() _set_log_level(options.verbosity) hits = _read_xml(options) _write_tsv(options, hits) def _read_xml(options): """ Parse XML RPSblast results file """ log.info("Read XML file " + options.xml_file) xml = open(options.xml_file, 'r') records = NCBIXML.parse(xml) xml_results = {} for blast_record in records: for aln in blast_record.alignments: for hit in aln.hsps: hsp = {} hit_evalue = hit.expect if hit_evalue > options.max_evalue: continue hit_frame = hit.frame[0] # frame hit_evalue = hit.expect # evalue hit_startQ = hit.query_start hit_endQ = hit.query_end hsp["frame"] = hit_frame hsp["evalue"] = hit_evalue hsp["startQ"] = hit_startQ hsp["endQ"] = hit_endQ hsp["query_id"] = blast_record.query_id hsp["cdd_id"] = aln.hit_def.split(",")[0] hsp["hit_id"] = aln.hit_id hsp["query_length"] = blast_record.query_length # length of the query hsp["description"] = aln.hit_def hsp["accession"] = aln.accession hsp["pfam_id"] = hsp["description"].split(",")[0].replace("pfam", "PF") log.info("Requeting Interpro for " + hsp["pfam_id"]) url = "https://www.ebi.ac.uk/interpro/api/entry/pfam/" + hsp["pfam_id"] + "/taxonomy/uniprot/" req = request.Request(url) try: response = request.urlopen(req) except HTTPError as e: log.debug('Http error for interpro: ', e.code) except URLError as e: log.debug('Url error for interpro: ', e.reason) else: encoded_response = response.read() decoded_response = encoded_response.decode() payload = json.loads(decoded_response) kingdoms = [] for item in payload["taxonomy_subset"]: lineage_string = item["lineage"] lineage = [int(i) for i in lineage_string] translation = ncbi.get_taxid_translator(lineage) names = list(translation.values()) taxonomy = names[1:] # remove 'root' at the begining kingdoms.append(taxonomy[0]) frequency = {kingdom: kingdoms.count(kingdom) for kingdom in kingdoms} # {'Pseudomonadota': 9, 'cellular organisms': 4} sorted_freq = dict(sorted(frequency.items(), key=lambda x: x[1], reverse=True)) concat_freq = ";".join("{}({})".format(k, v) for k, v in sorted_freq.items()) hsp["taxonomy"] = concat_freq xml_results[hsp["query_id"]] = hsp return xml_results def _write_tsv(options, hits): """ Write output """ log.info("Write output file " + options.output) headers = "#query_id\tquery_length\tcdd_id\thit_id\tevalue\tstartQ\tendQ\tframe\tdescription\tsuperkingdom\n" f = open(options.output, "w+") f.write(headers) for h in hits: f.write(h + "\t" + str(hits[h]["query_length"]) + "\t") f.write(hits[h]["cdd_id"] + "\t" + hits[h]["hit_id"] + "\t" + str(hits[h]["evalue"]) + "\t") f.write(str(hits[h]["startQ"]) + "\t" + str(hits[h]["endQ"]) + "\t" + str(hits[h]["frame"]) + "\t") f.write(hits[h]["description"] + "\t" + hits[h]["taxonomy"]) f.write("\n") f.close() def _set_options(): parser = argparse.ArgumentParser() parser.add_argument('-x', '--xml', help='XML files with results of blast', action='store', required=True, dest='xml_file') parser.add_argument('-e', '--max_evalue', help='Max evalue', action='store', type=float, default=0.0001, dest='max_evalue') parser.add_argument('-o', '--out', help='The output file (.tab).', action='store', type=str, default='./rps2tsv_output.tab', dest='output') parser.add_argument('-v', '--verbosity', help='Verbose level', action='store', type=int, choices=[1, 2, 3, 4], default=1) args = parser.parse_args() return args def _set_log_level(verbosity): if verbosity == 1: log_format = '%(asctime)s %(levelname)-8s %(message)s' log.basicConfig(level=log.INFO, format=log_format) elif verbosity == 3: log_format = '%(filename)s:%(lineno)s - %(asctime)s %(levelname)-8s %(message)s' log.basicConfig(level=log.DEBUG, format=log_format) if __name__ == "__main__": main()