Mercurial > repos > cpt > cpt_blastp_to_gff
comparison blast_to_gff3.py @ 3:3e3d85f41503 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:39:51 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
2:803d25d24586 | 3:3e3d85f41503 |
---|---|
1 #!/usr/bin/env python | |
2 import argparse | |
3 import copy | |
4 import logging | |
5 import re | |
6 import sys | |
7 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
8 from Bio.Blast import NCBIXML | |
9 from Bio.Seq import Seq | |
10 from Bio.SeqRecord import SeqRecord | |
11 from Bio.SeqFeature import SeqFeature, FeatureLocation | |
12 | |
13 logging.basicConfig(level=logging.INFO) | |
14 log = logging.getLogger(name="blast2gff3") | |
15 | |
16 __doc__ = """ | |
17 Convert BlastXML or Blast 25 Column Table output into GFF3 | |
18 """ | |
19 | |
20 # note for all FeatureLocations, Biopython saves in zero index and Blast provides one indexed locations, thus a Blast Location of (123,500) should be saved as (122, 500) | |
21 def blast2gff3(blast, blastxml=False, blasttab=False, include_seq=False): | |
22 # Call correct function based on xml or tabular file input, raise error if neither or both are provided | |
23 if blastxml and blasttab: | |
24 raise Exception("Cannot provide both blast xml and tabular flag") | |
25 | |
26 if blastxml: | |
27 return blastxml2gff3(blast, include_seq) | |
28 elif blasttab: | |
29 return blasttsv2gff3(blast, include_seq) | |
30 else: | |
31 raise Exception("Must provide either blast xml or tabular flag") | |
32 | |
33 | |
34 def check_bounds(ps, pe, qs, qe): | |
35 # simplify the constant boundary checking used in subfeature generation | |
36 if qs < ps: | |
37 ps = qs | |
38 if qe > pe: | |
39 pe = qe | |
40 if ps <= 0: | |
41 ps = 1 | |
42 return (min(ps, pe), max(ps, pe)) | |
43 | |
44 | |
45 def clean_string(s): | |
46 clean_str = re.sub("\|", "_", s) # Replace any \ or | with _ | |
47 clean_str = re.sub( | |
48 "[^A-Za-z0-9_\ .-]", "", clean_str | |
49 ) # Remove any non-alphanumeric or _.- chars | |
50 return clean_str | |
51 | |
52 | |
53 def clean_slist(l): | |
54 cleaned_list = [] | |
55 for s in l: | |
56 cleaned_list.append(clean_string(s)) | |
57 return cleaned_list | |
58 | |
59 | |
60 def blastxml2gff3(blastxml, include_seq=False): | |
61 | |
62 blast_records = NCBIXML.parse(blastxml) | |
63 for idx_record, record in enumerate(blast_records): | |
64 # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343 | |
65 # match_type = { # Currently we can only handle BLASTN, BLASTP | |
66 # "BLASTN": "nucleotide_match", | |
67 # "BLASTP": "protein_match", | |
68 # }.get(record.application, "match") | |
69 match_type = "match" | |
70 collected_records = [] | |
71 | |
72 recid = record.query | |
73 if " " in recid: | |
74 recid = clean_string(recid[0 : recid.index(" ")]) | |
75 | |
76 for idx_hit, hit in enumerate(record.alignments): | |
77 # gotta check all hsps in a hit to see boundaries | |
78 rec = SeqRecord("", id=recid) | |
79 parent_match_start = 0 | |
80 parent_match_end = 0 | |
81 hit_qualifiers = { | |
82 "ID": "b2g.%s.%s" % (idx_record, idx_hit), | |
83 "source": "blast", | |
84 "accession": hit.accession, | |
85 "hit_id": clean_string(hit.hit_id), | |
86 "score": None, | |
87 "length": hit.length, | |
88 "hit_titles": clean_slist(hit.title.split(" >")), | |
89 "hsp_count": len(hit.hsps), | |
90 } | |
91 desc = hit.title.split(" >")[0] | |
92 hit_qualifiers["Name"] = desc | |
93 sub_features = [] | |
94 for idx_hsp, hsp in enumerate(hit.hsps): | |
95 if idx_hsp == 0: | |
96 # -2 and +1 for start/end to convert 0 index of python to 1 index of people, -2 on start because feature location saving issue | |
97 parent_match_start = hsp.query_start | |
98 parent_match_end = hsp.query_end | |
99 hit_qualifiers["score"] = hsp.expect | |
100 # generate qualifiers to be added to gff3 feature | |
101 hit_qualifiers["score"] = min(hit_qualifiers["score"], hsp.expect) | |
102 hsp_qualifiers = { | |
103 "ID": "b2g.%s.%s.hsp%s" % (idx_record, idx_hit, idx_hsp), | |
104 "source": "blast", | |
105 "score": hsp.expect, | |
106 "accession": hit.accession, | |
107 "hit_id": clean_string(hit.hit_id), | |
108 "length": hit.length, | |
109 "hit_titles": clean_slist(hit.title.split(" >")), | |
110 } | |
111 if include_seq: | |
112 if ( | |
113 "blast_qseq", | |
114 "blast_sseq", | |
115 "blast_mseq", | |
116 ) in hit_qualifiers.keys(): | |
117 hit_qualifiers.update( | |
118 { | |
119 "blast_qseq": hit_qualifiers["blast_qseq"] + hsp.query, | |
120 "blast_sseq": hit_qualifiers["blast_sseq"] + hsp.sbjct, | |
121 "blast_mseq": hit_qualifiers["blast_mseq"] + hsp.match, | |
122 } | |
123 ) | |
124 else: | |
125 hit_qualifiers.update( | |
126 { | |
127 "blast_qseq": hsp.query, | |
128 "blast_sseq": hsp.sbjct, | |
129 "blast_mseq": hsp.match, | |
130 } | |
131 ) | |
132 for prop in ( | |
133 "score", | |
134 "bits", | |
135 "identities", | |
136 "positives", | |
137 "gaps", | |
138 "align_length", | |
139 "strand", | |
140 "frame", | |
141 "query_start", | |
142 "query_end", | |
143 "sbjct_start", | |
144 "sbjct_end", | |
145 ): | |
146 hsp_qualifiers["blast_" + prop] = getattr(hsp, prop, None) | |
147 | |
148 # check if parent boundary needs to increase to envelope hsp | |
149 # if hsp.query_start < parent_match_start: | |
150 # parent_match_start = hsp.query_start - 1 | |
151 # if hsp.query_end > parent_match_end: | |
152 # parent_match_end = hsp.query_end + 1 | |
153 | |
154 parent_match_start, parent_match_end = check_bounds( | |
155 parent_match_start, parent_match_end, hsp.query_start, hsp.query_end | |
156 ) | |
157 | |
158 # add hsp to the gff3 feature as a "match_part" | |
159 sub_features.append( | |
160 gffSeqFeature( | |
161 FeatureLocation(hsp.query_start - 1, hsp.query_end), | |
162 type="match_part", | |
163 strand=0, | |
164 qualifiers=copy.deepcopy(hsp_qualifiers), | |
165 ) | |
166 ) | |
167 | |
168 # Build the top level seq feature for the hit | |
169 hit_qualifiers["description"] = "Residue %s..%s hit to %s" % ( | |
170 parent_match_start, | |
171 parent_match_end, | |
172 desc, | |
173 ) | |
174 top_feature = gffSeqFeature( | |
175 FeatureLocation(parent_match_start - 1, parent_match_end), | |
176 type=match_type, | |
177 strand=0, | |
178 qualifiers=hit_qualifiers, | |
179 ) | |
180 # add the generated subfeature hsp match_parts to the hit feature | |
181 top_feature.sub_features = copy.deepcopy( | |
182 sorted(sub_features, key=lambda x: int(x.location.start)) | |
183 ) | |
184 # Add the hit feature to the record | |
185 rec.features.append(top_feature) | |
186 rec.annotations = {} | |
187 collected_records.append(rec) | |
188 | |
189 if not len(collected_records): | |
190 print("##gff-version 3\n##sequence-region null 1 4") | |
191 | |
192 for rec in collected_records: | |
193 yield rec | |
194 | |
195 | |
196 def combine_records(records): | |
197 # Go through each record and identify those records with | |
198 cleaned_records = {} | |
199 for rec in records: | |
200 combo_id = ( | |
201 rec.features[0].qualifiers["target"] | |
202 + rec.features[0].qualifiers["accession"] | |
203 ) | |
204 if combo_id not in cleaned_records.keys(): | |
205 # First instance of a query ID + subject ID combination | |
206 # Save this record as it's only item | |
207 newid = rec.features[0].qualifiers["ID"] + ".0" | |
208 rec.features[0].qualifiers["ID"] = newid | |
209 rec.features[0].sub_features[0].qualifiers["ID"] = newid + ".hsp0" | |
210 cleaned_records[combo_id] = rec | |
211 else: | |
212 # Query ID + Subject ID has appeared before | |
213 # Combine the Match Parts as subfeatures | |
214 sub_features = copy.deepcopy( | |
215 cleaned_records[combo_id].features[0].sub_features | |
216 ) | |
217 addtnl_features = rec.features[0].sub_features | |
218 # add the current records sub features to the ones previous | |
219 for feat in addtnl_features: | |
220 sub_features.append(feat) | |
221 cleaned_records[combo_id].features[0].subfeatures = copy.deepcopy( | |
222 sub_features | |
223 ) | |
224 cleaned_records[combo_id].features[0].qualifiers["score"] = min( | |
225 cleaned_records[combo_id].features[0].qualifiers["score"], | |
226 rec.features[0].qualifiers["score"], | |
227 ) | |
228 # now we need to update the IDs for the features when combined | |
229 # sort them into the proper order, then apply new ids | |
230 # and also ensure the parent record boundaries fit the whole span of subfeatures | |
231 sub_features = sorted(sub_features, key=lambda x: int(x.location.start)) | |
232 new_parent_start = cleaned_records[combo_id].features[0].location.start + 1 | |
233 new_parent_end = cleaned_records[combo_id].features[0].location.end | |
234 for idx, feat in enumerate(sub_features): | |
235 feat.qualifiers["ID"] = "%s.hsp%s" % ( | |
236 cleaned_records[combo_id].features[0].qualifiers["ID"], | |
237 idx, | |
238 ) | |
239 new_parent_start, new_parent_end = check_bounds( | |
240 new_parent_start, | |
241 new_parent_end, | |
242 feat.location.start + 1, | |
243 feat.location.end, | |
244 ) | |
245 cleaned_records[combo_id].features[0].qualifiers["score"] = min( | |
246 cleaned_records[combo_id].features[0].qualifiers["score"], | |
247 feat.qualifiers["blast_score"], | |
248 ) | |
249 # if feat.location.start < new_parent_start: | |
250 # new_parent_start = feat.location.start - 1 | |
251 # if feat.location.end > new_parent_end: | |
252 # new_parent_end = feat.location.end + 1 | |
253 cleaned_records[combo_id].features[0].location = FeatureLocation( | |
254 new_parent_start - 1, new_parent_end | |
255 ) | |
256 cleaned_records[combo_id].features[0].qualifiers[ | |
257 "description" | |
258 ] = "Residue %s..%s hit to %s" % ( | |
259 new_parent_start, | |
260 new_parent_end, | |
261 cleaned_records[combo_id].features[0].qualifiers["Name"], | |
262 ) | |
263 # save the renamed and ordered feature list to record | |
264 cleaned_records[combo_id].features[0].sub_features = copy.deepcopy( | |
265 sub_features | |
266 ) | |
267 return sorted( | |
268 cleaned_records.values(), key=lambda x: int(x.features[0].location.start) | |
269 ) | |
270 | |
271 | |
272 def blasttsv2gff3(blasttsv, include_seq=False): | |
273 | |
274 # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343 | |
275 # match_type = { # Currently we can only handle BLASTN, BLASTP | |
276 # "BLASTN": "nucleotide_match", | |
277 # "BLASTP": "protein_match", | |
278 # }.get(type, "match") | |
279 match_type = "match" | |
280 | |
281 columns = [ | |
282 "qseqid", # 01 Query Seq-id (ID of your sequence) | |
283 "sseqid", # 02 Subject Seq-id (ID of the database hit) | |
284 "pident", # 03 Percentage of identical matches | |
285 "length", # 04 Alignment length | |
286 "mismatch", # 05 Number of mismatches | |
287 "gapopen", # 06 Number of gap openings | |
288 "qstart", # 07 Start of alignment in query | |
289 "qend", # 08 End of alignment in query | |
290 "sstart", # 09 Start of alignment in subject (database hit) | |
291 "send", # 10 End of alignment in subject (database hit) | |
292 "evalue", # 11 Expectation value (E-value) | |
293 "bitscore", # 12 Bit score | |
294 "sallseqid", # 13 All subject Seq-id(s), separated by a ';' | |
295 "score", # 14 Raw score | |
296 "nident", # 15 Number of identical matches | |
297 "positive", # 16 Number of positive-scoring matches | |
298 "gaps", # 17 Total number of gaps | |
299 "ppos", # 18 Percentage of positive-scoring matches | |
300 "qframe", # 19 Query frame | |
301 "sframe", # 20 Subject frame | |
302 "qseq", # 21 Aligned part of query sequence | |
303 "sseq", # 22 Aligned part of subject sequence | |
304 "qlen", # 23 Query sequence length | |
305 "slen", # 24 Subject sequence length | |
306 "salltitles", # 25 All subject title(s), separated by a '<>' | |
307 ] | |
308 collected_records = [] | |
309 for record_idx, record in enumerate(blasttsv): | |
310 if record.startswith("#"): | |
311 continue | |
312 | |
313 dc = {k: v for (k, v) in zip(columns, (x.strip() for x in record.split("\t")))} | |
314 | |
315 rec = SeqRecord("", id=dc["qseqid"]) | |
316 | |
317 feature_id = "b2g.%s" % (record_idx) | |
318 hit_qualifiers = { | |
319 "ID": feature_id, | |
320 "Name": (dc["salltitles"].split("<>")[0]), | |
321 "description": "Residue {sstart}..{send} hit to {x}".format( | |
322 x=dc["salltitles"].split("<>")[0], **dc | |
323 ), | |
324 "source": "blast", | |
325 "score": dc["evalue"], | |
326 "accession": clean_string(dc["sseqid"]), | |
327 "length": dc["qlen"], | |
328 "hit_titles": clean_slist(dc["salltitles"].split("<>")), | |
329 "target": clean_string(dc["qseqid"]), | |
330 } | |
331 hsp_qualifiers = {"source": "blast"} | |
332 for key in dc.keys(): | |
333 # Add the remaining BLAST info to the GFF qualifiers | |
334 if key in ( | |
335 "salltitles", | |
336 "sallseqid", | |
337 "sseqid", | |
338 "qseqid", | |
339 "qseq", | |
340 "sseq", | |
341 ): | |
342 continue | |
343 hsp_qualifiers["blast_%s" % key] = clean_string(dc[key]) | |
344 | |
345 # Below numbers stored as strings, convert to proper form | |
346 for ( | |
347 integer_numerical_key | |
348 ) in "gapopen gaps length mismatch nident positive qend qframe qlen qstart score send sframe slen sstart".split( | |
349 " " | |
350 ): | |
351 dc[integer_numerical_key] = int(dc[integer_numerical_key]) | |
352 | |
353 for float_numerical_key in "bitscore evalue pident ppos".split(" "): | |
354 dc[float_numerical_key] = float(dc[float_numerical_key]) | |
355 | |
356 parent_match_start = dc["qstart"] | |
357 parent_match_end = dc["qend"] | |
358 | |
359 parent_match_start, parent_match_end = check_bounds( | |
360 parent_match_start, parent_match_end, dc["qstart"], dc["qend"] | |
361 ) | |
362 | |
363 # The ``match`` feature will hold one or more ``match_part``s | |
364 top_feature = gffSeqFeature( | |
365 FeatureLocation( | |
366 min(parent_match_start, parent_match_end) - 1, | |
367 max(parent_match_start, parent_match_end), | |
368 ), | |
369 type=match_type, | |
370 strand=0, | |
371 qualifiers=hit_qualifiers, | |
372 ) | |
373 top_feature.sub_features = [] | |
374 # There is a possibility of multiple lines containing the HSPS | |
375 # for the same hit. | |
376 # Unlike the parent feature, ``match_part``s have sources. | |
377 hsp_qualifiers["ID"] = clean_string(dc["sseqid"]) | |
378 match_part_start = dc["qstart"] | |
379 match_part_end = dc["qend"] | |
380 | |
381 top_feature.sub_features.append( | |
382 gffSeqFeature( | |
383 FeatureLocation( | |
384 min(match_part_start, match_part_end) - 1, | |
385 max(match_part_start, match_part_end), | |
386 ), | |
387 type="match_part", | |
388 strand=0, | |
389 qualifiers=copy.deepcopy(hsp_qualifiers), | |
390 ) | |
391 ) | |
392 top_feature.sub_features = sorted( | |
393 top_feature.sub_features, key=lambda x: int(x.location.start) | |
394 ) | |
395 rec.features = [top_feature] | |
396 rec.annotations = {} | |
397 collected_records.append(rec) | |
398 | |
399 collected_records = combine_records(collected_records) | |
400 if not len(collected_records): | |
401 print("##gff-version 3\n##sequence-region null 1 4") | |
402 for rec in collected_records: | |
403 yield rec | |
404 | |
405 | |
406 if __name__ == "__main__": | |
407 parser = argparse.ArgumentParser( | |
408 description="Convert BlastP or BlastN output to GFF3, must provide XML or Tabular output", | |
409 epilog="", | |
410 ) | |
411 parser.add_argument( | |
412 "blast", | |
413 type=argparse.FileType("r"), | |
414 help="Blast XML or 25 Column Tabular Output file", | |
415 ) | |
416 parser.add_argument( | |
417 "--blastxml", action="store_true", help="Process file as Blast XML Output" | |
418 ) | |
419 parser.add_argument( | |
420 "--blasttab", | |
421 action="store_true", | |
422 help="Process file as Blast 25 Column Tabular Output", | |
423 ) | |
424 parser.add_argument( | |
425 "--include_seq", | |
426 action="store_true", | |
427 help="Include sequence, only used for Blast XML", | |
428 ) | |
429 args = parser.parse_args() | |
430 | |
431 for rec in blast2gff3(**vars(args)): | |
432 if len(rec.features): | |
433 gffWrite([rec], sys.stdout) |