Mercurial > repos > fubar > jbrowse2
diff blastxml_to_gapped_gff3.py @ 0:d78175596286 draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/jbrowse2 commit cd77dffaad652cfb75b98bde5231beaa6d63cd5b-dirty
author | fubar |
---|---|
date | Mon, 08 Jan 2024 09:20:33 +0000 |
parents | |
children | 4c201a3d4755 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/blastxml_to_gapped_gff3.py Mon Jan 08 09:20:33 2024 +0000 @@ -0,0 +1,300 @@ +#!/usr/bin/env python +import argparse +import copy +import logging +import re +import sys + +from BCBio import GFF + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(name="blastxml2gff3") + +__doc__ = """ +BlastXML files, when transformed to GFF3, do not normally show gaps in the +blast hits. This tool aims to fill that "gap". +""" + + +def blastxml2gff3(blastxml, min_gap=3, trim=False, trim_end=False, include_seq=False): + from Bio.Blast import NCBIXML + from Bio.Seq import Seq + from Bio.SeqRecord import SeqRecord + from Bio.SeqFeature import SeqFeature, SimpleLocation + + blast_records = NCBIXML.parse(blastxml) + for idx_record, record in enumerate(blast_records): + # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343 + match_type = { # Currently we can only handle BLASTN, BLASTP + "BLASTN": "nucleotide_match", + "BLASTP": "protein_match", + }.get(record.application, "match") + + recid = record.query + if " " in recid: + recid = recid[0: recid.index(" ")] + + rec = SeqRecord(Seq("ACTG"), id=recid) + for idx_hit, hit in enumerate(record.alignments): + for idx_hsp, hsp in enumerate(hit.hsps): + qualifiers = { + "ID": "b2g.%s.%s.%s" % (idx_record, idx_hit, idx_hsp), + "source": "blast", + "score": hsp.expect, + "accession": hit.accession, + "hit_id": hit.hit_id, + "length": hit.length, + "hit_titles": hit.title.split(" >"), + } + if include_seq: + qualifiers.update( + { + "blast_qseq": hsp.query, + "blast_sseq": hsp.sbjct, + "blast_mseq": hsp.match, + } + ) + + for prop in ( + "score", + "bits", + "identities", + "positives", + "gaps", + "align_length", + "strand", + "frame", + "query_start", + "query_end", + "sbjct_start", + "sbjct_end", + ): + qualifiers["blast_" + prop] = getattr(hsp, prop, None) + + desc = hit.title.split(" >")[0] + qualifiers["description"] = desc[desc.index(" "):] + + # This required a fair bit of sketching out/match to figure out + # the first time. + # + # the match_start location must account for queries and + # subjecst that start at locations other than 1 + parent_match_start = hsp.query_start - hsp.sbjct_start + # The end is the start + hit.length because the match itself + # may be longer than the parent feature, so we use the supplied + # subject/hit length to calculate the real ending of the target + # protein. + parent_match_end = hsp.query_start + hit.length + hsp.query.count("-") + + # If we trim the left end, we need to trim without losing information. + used_parent_match_start = parent_match_start + if trim: + if parent_match_start < 1: + used_parent_match_start = 0 + + if trim or trim_end: + if parent_match_end > hsp.query_end: + parent_match_end = hsp.query_end + 1 + + # The ``match`` feature will hold one or more ``match_part``s + top_feature = SeqFeature( + SimpleLocation(used_parent_match_start, parent_match_end, strand=0), + type=match_type, + qualifiers=qualifiers, + ) + + # Unlike the parent feature, ``match_part``s have sources. + part_qualifiers = { + "source": "blast", + } + top_feature.sub_features = [] + for idx_part, (start, end, cigar) in enumerate( + generate_parts( + hsp.query, hsp.match, hsp.sbjct, ignore_under=min_gap + ) + ): + part_qualifiers["Gap"] = cigar + part_qualifiers["ID"] = qualifiers["ID"] + (".%s" % idx_part) + + # Otherwise, we have to account for the subject start's location + match_part_start = parent_match_start + hsp.sbjct_start + start - 1 + + # We used to use hsp.align_length here, but that includes + # gaps in the parent sequence + # + # Furthermore align_length will give calculation errors in weird places + # So we just use (end-start) for simplicity + match_part_end = match_part_start + (end - start) + + top_feature.sub_features.append( + SeqFeature( + SimpleLocation(match_part_start, match_part_end, strand=1), + type="match_part", + qualifiers=copy.deepcopy(part_qualifiers), + ) + ) + + rec.features.append(top_feature) + rec.annotations = {} + yield rec + + +def __remove_query_gaps(query, match, subject): + """remove positions in all three based on gaps in query + + In order to simplify math and calculations...we remove all of the gaps + based on gap locations in the query sequence:: + + Q:ACTG-ACTGACTG + S:ACTGAAC---CTG + + will become:: + + Q:ACTGACTGACTG + S:ACTGAC---CTG + + which greatly simplifies the process of identifying the correct location + for a match_part + """ + prev = 0 + fq = "" + fm = "" + fs = "" + for position in re.finditer("-", query): + fq += query[prev: position.start()] + fm += match[prev: position.start()] + fs += subject[prev: position.start()] + prev = position.start() + 1 + fq += query[prev:] + fm += match[prev:] + fs += subject[prev:] + + return (fq, fm, fs) + + +def generate_parts(query, match, subject, ignore_under=3): + region_q = [] + region_m = [] + region_s = [] + + (query, match, subject) = __remove_query_gaps(query, match, subject) + + region_start = -1 + region_end = -1 + mismatch_count = 0 + for i, (q, m, s) in enumerate(zip(query, match, subject)): + + # If we have a match + if m != " " or m == "+": + if region_start == -1: + region_start = i + # It's a new region, we need to reset or it's pre-seeded with + # spaces + region_q = [] + region_m = [] + region_s = [] + region_end = i + mismatch_count = 0 + else: + mismatch_count += 1 + + region_q.append(q) + region_m.append(m) + region_s.append(s) + + if mismatch_count >= ignore_under and region_start != -1 and region_end != -1: + region_q = region_q[0:-ignore_under] + region_m = region_m[0:-ignore_under] + region_s = region_s[0:-ignore_under] + yield region_start, region_end + 1, cigar_from_string( + region_q, region_m, region_s, strict_m=True + ) + region_q = [] + region_m = [] + region_s = [] + + region_start = -1 + region_end = -1 + mismatch_count = 0 + + yield region_start, region_end + 1, cigar_from_string( + region_q, region_m, region_s, strict_m=True + ) + + +def _qms_to_matches(query, match, subject, strict_m=True): + matchline = [] + + for (q, m, s) in zip(query, match, subject): + ret = "" + + if m != " " or m == "+": + ret = "=" + elif m == " ": + if q == "-": + ret = "D" + elif s == "-": + ret = "I" + else: + ret = "X" + else: + log.warn("Bad data: \n\t%s\n\t%s\n\t%s\n" % (query, match, subject)) + + if strict_m: + if ret == "=" or ret == "X": + ret = "M" + + matchline.append(ret) + return matchline + + +def _matchline_to_cigar(matchline): + cigar_line = [] + last_char = matchline[0] + count = 0 + for char in matchline: + if char == last_char: + count += 1 + else: + cigar_line.append("%s%s" % (last_char, count)) + count = 1 + last_char = char + cigar_line.append("%s%s" % (last_char, count)) + return " ".join(cigar_line) + + +def cigar_from_string(query, match, subject, strict_m=True): + matchline = _qms_to_matches(query, match, subject, strict_m=strict_m) + if len(matchline) > 0: + return _matchline_to_cigar(matchline) + else: + return "" + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Convert Blast XML to gapped GFF3", epilog="" + ) + parser.add_argument( + "blastxml", type=argparse.FileType("r"), help="Blast XML Output" + ) + parser.add_argument( + "--min_gap", + type=int, + help="Maximum gap size before generating a new match_part", + default=3, + ) + parser.add_argument( + "--trim", + action="store_true", + help="Trim blast hits to be only as long as the parent feature", + ) + parser.add_argument( + "--trim_end", action="store_true", help="Cut blast results off at end of gene" + ) + parser.add_argument("--include_seq", action="store_true", help="Include sequence") + args = parser.parse_args() + + for rec in blastxml2gff3(**vars(args)): + if len(rec.features): + GFF.write([rec], sys.stdout)