Mercurial > repos > cpt > cpt_blastn_to_gff
view blast_to_gff3.py @ 4:ec9534b04518 draft default tip
planemo upload commit f33bdf952d796c5d7a240b132af3c4cbd102decc
author | cpt |
---|---|
date | Fri, 05 Jan 2024 05:49:22 +0000 |
parents | 0d7c61693867 |
children |
line wrap: on
line source
#!/usr/bin/env python import argparse import copy import logging import re import sys from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature from Bio.Blast import NCBIXML from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from Bio.SeqFeature import SeqFeature, FeatureLocation logging.basicConfig(level=logging.INFO) log = logging.getLogger(name="blast2gff3") __doc__ = """ Convert BlastXML or Blast 25 Column Table output into GFF3 """ # note for all FeatureLocations, Biopython saves in zero index and Blast provides one indexed locations, thus a Blast Location of (123,500) should be saved as (122, 500) def blast2gff3(blast, blastxml=False, blasttab=False, include_seq=False): # Call correct function based on xml or tabular file input, raise error if neither or both are provided if blastxml and blasttab: raise Exception("Cannot provide both blast xml and tabular flag") if blastxml: return blastxml2gff3(blast, include_seq) elif blasttab: return blasttsv2gff3(blast, include_seq) else: raise Exception("Must provide either blast xml or tabular flag") def check_bounds(ps, pe, qs, qe): # simplify the constant boundary checking used in subfeature generation if qs < ps: ps = qs if qe > pe: pe = qe if ps <= 0: ps = 1 return (min(ps, pe), max(ps, pe)) def clean_string(s): clean_str = re.sub("\|", "_", s) # Replace any \ or | with _ clean_str = re.sub( "[^A-Za-z0-9_\ .-]", "", clean_str ) # Remove any non-alphanumeric or _.- chars return clean_str def clean_slist(l): cleaned_list = [] for s in l: cleaned_list.append(clean_string(s)) return cleaned_list def blastxml2gff3(blastxml, include_seq=False): blast_records = NCBIXML.parse(blastxml) for idx_record, record in enumerate(blast_records): # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343 # match_type = { # Currently we can only handle BLASTN, BLASTP # "BLASTN": "nucleotide_match", # "BLASTP": "protein_match", # }.get(record.application, "match") match_type = "match" collected_records = [] recid = record.query if " " in recid: recid = clean_string(recid[0 : recid.index(" ")]) for idx_hit, hit in enumerate(record.alignments): # gotta check all hsps in a hit to see boundaries rec = SeqRecord("", id=recid) parent_match_start = 0 parent_match_end = 0 hit_qualifiers = { "ID": "b2g.%s.%s" % (idx_record, idx_hit), "source": "blast", "accession": hit.accession, "hit_id": clean_string(hit.hit_id), "score": None, "length": hit.length, "hit_titles": clean_slist(hit.title.split(" >")), "hsp_count": len(hit.hsps), } desc = hit.title.split(" >")[0] hit_qualifiers["Name"] = desc sub_features = [] for idx_hsp, hsp in enumerate(hit.hsps): if idx_hsp == 0: # -2 and +1 for start/end to convert 0 index of python to 1 index of people, -2 on start because feature location saving issue parent_match_start = hsp.query_start parent_match_end = hsp.query_end hit_qualifiers["score"] = hsp.expect # generate qualifiers to be added to gff3 feature hit_qualifiers["score"] = min(hit_qualifiers["score"], hsp.expect) hsp_qualifiers = { "ID": "b2g.%s.%s.hsp%s" % (idx_record, idx_hit, idx_hsp), "source": "blast", "score": hsp.expect, "accession": hit.accession, "hit_id": clean_string(hit.hit_id), "length": hit.length, "hit_titles": clean_slist(hit.title.split(" >")), } if include_seq: if ( "blast_qseq", "blast_sseq", "blast_mseq", ) in hit_qualifiers.keys(): hit_qualifiers.update( { "blast_qseq": hit_qualifiers["blast_qseq"] + hsp.query, "blast_sseq": hit_qualifiers["blast_sseq"] + hsp.sbjct, "blast_mseq": hit_qualifiers["blast_mseq"] + hsp.match, } ) else: hit_qualifiers.update( { "blast_qseq": hsp.query, "blast_sseq": hsp.sbjct, "blast_mseq": hsp.match, } ) for prop in ( "score", "bits", "identities", "positives", "gaps", "align_length", "strand", "frame", "query_start", "query_end", "sbjct_start", "sbjct_end", ): hsp_qualifiers["blast_" + prop] = getattr(hsp, prop, None) # check if parent boundary needs to increase to envelope hsp # if hsp.query_start < parent_match_start: # parent_match_start = hsp.query_start - 1 # if hsp.query_end > parent_match_end: # parent_match_end = hsp.query_end + 1 parent_match_start, parent_match_end = check_bounds( parent_match_start, parent_match_end, hsp.query_start, hsp.query_end ) # add hsp to the gff3 feature as a "match_part" sub_features.append( gffSeqFeature( FeatureLocation(hsp.query_start - 1, hsp.query_end), type="match_part", strand=0, qualifiers=copy.deepcopy(hsp_qualifiers), ) ) # Build the top level seq feature for the hit hit_qualifiers["description"] = "Residue %s..%s hit to %s" % ( parent_match_start, parent_match_end, desc, ) top_feature = gffSeqFeature( FeatureLocation(parent_match_start - 1, parent_match_end), type=match_type, strand=0, qualifiers=hit_qualifiers, ) # add the generated subfeature hsp match_parts to the hit feature top_feature.sub_features = copy.deepcopy( sorted(sub_features, key=lambda x: int(x.location.start)) ) # Add the hit feature to the record rec.features.append(top_feature) rec.annotations = {} collected_records.append(rec) if not len(collected_records): print("##gff-version 3\n##sequence-region null 1 4") for rec in collected_records: yield rec def combine_records(records): # Go through each record and identify those records with cleaned_records = {} for rec in records: combo_id = ( rec.features[0].qualifiers["target"] + rec.features[0].qualifiers["accession"] ) if combo_id not in cleaned_records.keys(): # First instance of a query ID + subject ID combination # Save this record as it's only item newid = rec.features[0].qualifiers["ID"] + ".0" rec.features[0].qualifiers["ID"] = newid rec.features[0].sub_features[0].qualifiers["ID"] = newid + ".hsp0" cleaned_records[combo_id] = rec else: # Query ID + Subject ID has appeared before # Combine the Match Parts as subfeatures sub_features = copy.deepcopy( cleaned_records[combo_id].features[0].sub_features ) addtnl_features = rec.features[0].sub_features # add the current records sub features to the ones previous for feat in addtnl_features: sub_features.append(feat) cleaned_records[combo_id].features[0].subfeatures = copy.deepcopy( sub_features ) cleaned_records[combo_id].features[0].qualifiers["score"] = min( cleaned_records[combo_id].features[0].qualifiers["score"], rec.features[0].qualifiers["score"], ) # now we need to update the IDs for the features when combined # sort them into the proper order, then apply new ids # and also ensure the parent record boundaries fit the whole span of subfeatures sub_features = sorted(sub_features, key=lambda x: int(x.location.start)) new_parent_start = cleaned_records[combo_id].features[0].location.start + 1 new_parent_end = cleaned_records[combo_id].features[0].location.end for idx, feat in enumerate(sub_features): feat.qualifiers["ID"] = "%s.hsp%s" % ( cleaned_records[combo_id].features[0].qualifiers["ID"], idx, ) new_parent_start, new_parent_end = check_bounds( new_parent_start, new_parent_end, feat.location.start + 1, feat.location.end, ) cleaned_records[combo_id].features[0].qualifiers["score"] = min( cleaned_records[combo_id].features[0].qualifiers["score"], feat.qualifiers["blast_score"], ) # if feat.location.start < new_parent_start: # new_parent_start = feat.location.start - 1 # if feat.location.end > new_parent_end: # new_parent_end = feat.location.end + 1 cleaned_records[combo_id].features[0].location = FeatureLocation( new_parent_start - 1, new_parent_end ) cleaned_records[combo_id].features[0].qualifiers[ "description" ] = "Residue %s..%s hit to %s" % ( new_parent_start, new_parent_end, cleaned_records[combo_id].features[0].qualifiers["Name"], ) # save the renamed and ordered feature list to record cleaned_records[combo_id].features[0].sub_features = copy.deepcopy( sub_features ) return sorted( cleaned_records.values(), key=lambda x: int(x.features[0].location.start) ) def blasttsv2gff3(blasttsv, include_seq=False): # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343 # match_type = { # Currently we can only handle BLASTN, BLASTP # "BLASTN": "nucleotide_match", # "BLASTP": "protein_match", # }.get(type, "match") match_type = "match" columns = [ "qseqid", # 01 Query Seq-id (ID of your sequence) "sseqid", # 02 Subject Seq-id (ID of the database hit) "pident", # 03 Percentage of identical matches "length", # 04 Alignment length "mismatch", # 05 Number of mismatches "gapopen", # 06 Number of gap openings "qstart", # 07 Start of alignment in query "qend", # 08 End of alignment in query "sstart", # 09 Start of alignment in subject (database hit) "send", # 10 End of alignment in subject (database hit) "evalue", # 11 Expectation value (E-value) "bitscore", # 12 Bit score "sallseqid", # 13 All subject Seq-id(s), separated by a ';' "score", # 14 Raw score "nident", # 15 Number of identical matches "positive", # 16 Number of positive-scoring matches "gaps", # 17 Total number of gaps "ppos", # 18 Percentage of positive-scoring matches "qframe", # 19 Query frame "sframe", # 20 Subject frame "qseq", # 21 Aligned part of query sequence "sseq", # 22 Aligned part of subject sequence "qlen", # 23 Query sequence length "slen", # 24 Subject sequence length "salltitles", # 25 All subject title(s), separated by a '<>' ] collected_records = [] for record_idx, record in enumerate(blasttsv): if record.startswith("#"): continue dc = {k: v for (k, v) in zip(columns, (x.strip() for x in record.split("\t")))} rec = SeqRecord("", id=dc["qseqid"]) feature_id = "b2g.%s" % (record_idx) hit_qualifiers = { "ID": feature_id, "Name": (dc["salltitles"].split("<>")[0]), "description": "Residue {sstart}..{send} hit to {x}".format( x=dc["salltitles"].split("<>")[0], **dc ), "source": "blast", "score": dc["evalue"], "accession": clean_string(dc["sseqid"]), "length": dc["qlen"], "hit_titles": clean_slist(dc["salltitles"].split("<>")), "target": clean_string(dc["qseqid"]), } hsp_qualifiers = {"source": "blast"} for key in dc.keys(): # Add the remaining BLAST info to the GFF qualifiers if key in ( "salltitles", "sallseqid", "sseqid", "qseqid", "qseq", "sseq", ): continue hsp_qualifiers["blast_%s" % key] = clean_string(dc[key]) # Below numbers stored as strings, convert to proper form for ( integer_numerical_key ) in "gapopen gaps length mismatch nident positive qend qframe qlen qstart score send sframe slen sstart".split( " " ): dc[integer_numerical_key] = int(dc[integer_numerical_key]) for float_numerical_key in "bitscore evalue pident ppos".split(" "): dc[float_numerical_key] = float(dc[float_numerical_key]) parent_match_start = dc["qstart"] parent_match_end = dc["qend"] parent_match_start, parent_match_end = check_bounds( parent_match_start, parent_match_end, dc["qstart"], dc["qend"] ) # The ``match`` feature will hold one or more ``match_part``s top_feature = gffSeqFeature( FeatureLocation( min(parent_match_start, parent_match_end) - 1, max(parent_match_start, parent_match_end), ), type=match_type, strand=0, qualifiers=hit_qualifiers, ) top_feature.sub_features = [] # There is a possibility of multiple lines containing the HSPS # for the same hit. # Unlike the parent feature, ``match_part``s have sources. hsp_qualifiers["ID"] = clean_string(dc["sseqid"]) match_part_start = dc["qstart"] match_part_end = dc["qend"] top_feature.sub_features.append( gffSeqFeature( FeatureLocation( min(match_part_start, match_part_end) - 1, max(match_part_start, match_part_end), ), type="match_part", strand=0, qualifiers=copy.deepcopy(hsp_qualifiers), ) ) top_feature.sub_features = sorted( top_feature.sub_features, key=lambda x: int(x.location.start) ) rec.features = [top_feature] rec.annotations = {} collected_records.append(rec) collected_records = combine_records(collected_records) if not len(collected_records): print("##gff-version 3\n##sequence-region null 1 4") for rec in collected_records: yield rec if __name__ == "__main__": parser = argparse.ArgumentParser( description="Convert BlastP or BlastN output to GFF3, must provide XML or Tabular output", epilog="", ) parser.add_argument( "blast", type=argparse.FileType("r"), help="Blast XML or 25 Column Tabular Output file", ) parser.add_argument( "--blastxml", action="store_true", help="Process file as Blast XML Output" ) parser.add_argument( "--blasttab", action="store_true", help="Process file as Blast 25 Column Tabular Output", ) parser.add_argument( "--include_seq", action="store_true", help="Include sequence, only used for Blast XML", ) args = parser.parse_args() for rec in blast2gff3(**vars(args)): if len(rec.features): gffWrite([rec], sys.stdout)