Mercurial > repos > cpt > cpt_blastn_to_gff
comparison cpt_blastn_to_gff/blast_to_gff3.py @ 0:54c3aabcb3e7 draft
Uploaded
author | cpt |
---|---|
date | Fri, 13 May 2022 04:42:45 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:54c3aabcb3e7 |
---|---|
1 #!/usr/bin/env python | |
2 import argparse | |
3 import copy | |
4 import logging | |
5 import re | |
6 import sys | |
7 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
8 from Bio.Blast import NCBIXML | |
9 from Bio.Seq import Seq | |
10 from Bio.SeqRecord import SeqRecord | |
11 from Bio.SeqFeature import SeqFeature, FeatureLocation | |
12 | |
# Emit INFO-level messages; the named logger is available for any future
# diagnostic output from this script.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(name="blast2gff3")

__doc__ = """
Convert BlastXML or Blast 25 Column Table output into GFF3
"""
19 | |
# note for all FeatureLocations, Biopython saves in zero index and Blast provides one indexed locations, thus a Blast Location of (123,500) should be saved as (122, 500)
def blast2gff3(blast, blastxml=False, blasttab=False, include_seq=False):
    """Dispatch to the XML or tabular converter based on the flags given.

    Exactly one of ``blastxml`` / ``blasttab`` must be True; anything else
    raises an Exception.  Returns the generator produced by the chosen
    converter.
    """
    # Guard clauses: reject ambiguous or missing format flags up front.
    if blastxml and blasttab:
        raise Exception("Cannot provide both blast xml and tabular flag")
    if not (blastxml or blasttab):
        raise Exception("Must provide either blast xml or tabular flag")

    if blastxml:
        return blastxml2gff3(blast, include_seq)
    return blasttsv2gff3(blast, include_seq)
32 | |
33 | |
def check_bounds(ps, pe, qs, qe):
    """Expand the parent bounds (ps, pe) to envelope the query span (qs, qe).

    Starts are clamped to 1 (GFF3 is one-indexed) and the returned pair is
    always ordered (low, high).
    """
    ps = min(ps, qs)
    pe = max(pe, qe)
    if ps <= 0:
        ps = 1
    return (min(ps, pe), max(ps, pe))
43 | |
44 | |
def clean_string(s):
    """Sanitize *s* for use in GFF3 qualifiers.

    Pipes are turned into underscores (they would break GFF3 column
    parsing), then every character outside alphanumerics, underscore,
    space, dot, and dash is stripped.
    """
    underscored = re.sub(r"\|", "_", s)
    return re.sub(r"[^A-Za-z0-9_\ .-]", "", underscored)
51 | |
52 | |
def clean_slist(l):
    """Return a new list with clean_string applied to every element of *l*."""
    return [clean_string(item) for item in l]
58 | |
59 | |
def blastxml2gff3(blastxml, include_seq=False):
    """Yield one SeqRecord per BLAST hit parsed from a BLAST XML file.

    blastxml: open handle to NCBI BLAST XML output.
    include_seq: when True, accumulate the aligned query/subject/match
        strings of all HSPs into the blast_qseq/blast_sseq/blast_mseq
        qualifiers of the hit feature.

    Each yielded record carries a single top-level "match" feature whose
    sub-features are one "match_part" per HSP.
    """
    blast_records = NCBIXML.parse(blastxml)
    for idx_record, record in enumerate(blast_records):
        # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343
        # match_type = { # Currently we can only handle BLASTN, BLASTP
        #     "BLASTN": "nucleotide_match",
        #     "BLASTP": "protein_match",
        # }.get(record.application, "match")
        match_type = "match"
        collected_records = []

        # Use only the first word of the query definition line as the id.
        recid = record.query
        if " " in recid:
            recid = clean_string(recid[0 : recid.index(" ")])

        for idx_hit, hit in enumerate(record.alignments):
            # gotta check all hsps in a hit to see boundaries
            rec = SeqRecord("", id=recid)
            parent_match_start = 0
            parent_match_end = 0
            hit_qualifiers = {
                "ID": "b2g.%s.%s" % (idx_record, idx_hit),
                "source": "blast",
                "accession": hit.accession,
                "hit_id": clean_string(hit.hit_id),
                "score": None,
                "length": hit.length,
                "hit_titles": clean_slist(hit.title.split(" >")),
                "hsp_count": len(hit.hsps),
            }
            desc = hit.title.split(" >")[0]
            hit_qualifiers["Name"] = desc
            sub_features = []
            for idx_hsp, hsp in enumerate(hit.hsps):
                if idx_hsp == 0:
                    # Seed the parent bounds and score from the first HSP.
                    parent_match_start = hsp.query_start
                    parent_match_end = hsp.query_end
                    hit_qualifiers["score"] = hsp.expect
                # Keep the best (lowest) e-value seen across this hit's HSPs.
                hit_qualifiers["score"] = min(hit_qualifiers["score"], hsp.expect)
                hsp_qualifiers = {
                    "ID": "b2g.%s.%s.hsp%s" % (idx_record, idx_hit, idx_hsp),
                    "source": "blast",
                    "score": hsp.expect,
                    "accession": hit.accession,
                    "hit_id": clean_string(hit.hit_id),
                    "length": hit.length,
                    "hit_titles": clean_slist(hit.title.split(" >")),
                }
                if include_seq:
                    # Bugfix: the original tested whether the TUPLE
                    # ("blast_qseq", "blast_sseq", "blast_mseq") was itself a
                    # dict key, which is always False, so each HSP overwrote
                    # the sequences instead of appending to them.
                    if all(
                        k in hit_qualifiers
                        for k in ("blast_qseq", "blast_sseq", "blast_mseq")
                    ):
                        hit_qualifiers.update(
                            {
                                "blast_qseq": hit_qualifiers["blast_qseq"] + hsp.query,
                                "blast_sseq": hit_qualifiers["blast_sseq"] + hsp.sbjct,
                                "blast_mseq": hit_qualifiers["blast_mseq"] + hsp.match,
                            }
                        )
                    else:
                        hit_qualifiers.update(
                            {
                                "blast_qseq": hsp.query,
                                "blast_sseq": hsp.sbjct,
                                "blast_mseq": hsp.match,
                            }
                        )
                for prop in (
                    "score",
                    "bits",
                    "identities",
                    "positives",
                    "gaps",
                    "align_length",
                    "strand",
                    "frame",
                    "query_start",
                    "query_end",
                    "sbjct_start",
                    "sbjct_end",
                ):
                    hsp_qualifiers["blast_" + prop] = getattr(hsp, prop, None)

                # Grow the parent boundary to envelope this HSP.
                parent_match_start, parent_match_end = check_bounds(
                    parent_match_start, parent_match_end, hsp.query_start, hsp.query_end
                )

                # Add the HSP to the gff3 feature as a "match_part".
                # Blast positions are one-indexed; FeatureLocation is
                # zero-indexed, hence the -1 on the start only.
                sub_features.append(
                    gffSeqFeature(
                        FeatureLocation(hsp.query_start - 1, hsp.query_end),
                        type="match_part",
                        strand=0,
                        qualifiers=copy.deepcopy(hsp_qualifiers),
                    )
                )

            # Build the top level seq feature for the hit.
            hit_qualifiers["description"] = "Residue %s..%s hit to %s" % (
                parent_match_start,
                parent_match_end,
                desc,
            )
            top_feature = gffSeqFeature(
                FeatureLocation(parent_match_start - 1, parent_match_end),
                type=match_type,
                strand=0,
                qualifiers=hit_qualifiers,
            )
            # Add the generated subfeature hsp match_parts to the hit
            # feature, ordered by start position.
            top_feature.sub_features = copy.deepcopy(
                sorted(sub_features, key=lambda x: int(x.location.start))
            )
            # Add the hit feature to the record.
            rec.features.append(top_feature)
            rec.annotations = {}
            collected_records.append(rec)

        if not len(collected_records):
            # Emit a minimal valid GFF3 header so downstream consumers of
            # stdout do not receive an empty stream.
            print("##gff-version 3\n##sequence-region null 1 4")

        for rec in collected_records:
            yield rec
190 | |
191 | |
def combine_records(records):
    """Merge records that share the same query ID + subject ID pair.

    *records* are SeqRecords as produced by blasttsv2gff3: each holds one
    top-level match feature with a single match_part sub-feature.  All
    records with the same (target, accession) combination are collapsed
    into one record whose match feature spans every collected HSP and whose
    "score" qualifier is the best (lowest) e-value among them.

    Returns the merged records sorted by match start position.
    """
    cleaned_records = {}
    for rec in records:
        top = rec.features[0]
        combo_id = top.qualifiers["target"] + top.qualifiers["accession"]
        if combo_id not in cleaned_records:
            # First instance of a query ID + subject ID combination:
            # adopt this record and stamp the ".0" / ".hsp0" ID suffixes.
            newid = top.qualifiers["ID"] + ".0"
            top.qualifiers["ID"] = newid
            top.sub_features[0].qualifiers["ID"] = newid + ".hsp0"
            cleaned_records[combo_id] = rec
        else:
            # Query ID + Subject ID has appeared before:
            # combine the match_parts as sub-features of the kept record.
            kept = cleaned_records[combo_id].features[0]
            sub_features = copy.deepcopy(kept.sub_features)
            for feat in rec.features[0].sub_features:
                sub_features.append(feat)
            # Bugfix: scores may be stored as strings; min() on strings is
            # lexicographic, so coerce to float before comparing.
            best_score = min(
                float(kept.qualifiers["score"]),
                float(rec.features[0].qualifiers["score"]),
            )
            # Sort the combined match_parts, re-ID them in order, and grow
            # the parent record boundaries to fit the whole span.
            sub_features = sorted(sub_features, key=lambda x: int(x.location.start))
            new_parent_start = kept.location.start + 1
            new_parent_end = kept.location.end
            for idx, feat in enumerate(sub_features):
                feat.qualifiers["ID"] = "%s.hsp%s" % (kept.qualifiers["ID"], idx)
                new_parent_start, new_parent_end = check_bounds(
                    new_parent_start,
                    new_parent_end,
                    feat.location.start + 1,
                    feat.location.end,
                )
                # Bugfix: minimize against the HSP e-value (blast_evalue),
                # not blast_score (the raw alignment score), which is on a
                # completely different scale than the e-value tracked in
                # the "score" qualifier.
                best_score = min(best_score, float(feat.qualifiers["blast_evalue"]))
            kept.qualifiers["score"] = best_score
            kept.location = FeatureLocation(new_parent_start - 1, new_parent_end)
            kept.qualifiers["description"] = "Residue %s..%s hit to %s" % (
                new_parent_start,
                new_parent_end,
                kept.qualifiers["Name"],
            )
            # Save the renamed and ordered feature list to the record.
            # (Bugfix: the original also assigned to a misspelled
            # ".subfeatures" attribute, a silent dead store.)
            kept.sub_features = copy.deepcopy(sub_features)
    return sorted(
        cleaned_records.values(), key=lambda x: int(x.features[0].location.start)
    )
260 | |
261 | |
def blasttsv2gff3(blasttsv, include_seq=False):
    """Yield SeqRecords built from BLAST 25-column tabular output.

    blasttsv: iterable of tab-separated result lines; lines starting with
        "#" are skipped.
    include_seq: accepted for interface parity with blastxml2gff3; the
        aligned sequence columns (qseq/sseq) are always excluded from the
        generated qualifiers in this path.

    Rows sharing a query/subject pair are merged by combine_records() so
    each yielded record has one match feature with all HSPs as
    match_part sub-features.
    """
    # http://www.sequenceontology.org/browser/release_2.4/term/SO:0000343
    # match_type = { # Currently we can only handle BLASTN, BLASTP
    #     "BLASTN": "nucleotide_match",
    #     "BLASTP": "protein_match",
    # }.get(type, "match")
    match_type = "match"

    columns = [
        "qseqid",  # 01 Query Seq-id (ID of your sequence)
        "sseqid",  # 02 Subject Seq-id (ID of the database hit)
        "pident",  # 03 Percentage of identical matches
        "length",  # 04 Alignment length
        "mismatch",  # 05 Number of mismatches
        "gapopen",  # 06 Number of gap openings
        "qstart",  # 07 Start of alignment in query
        "qend",  # 08 End of alignment in query
        "sstart",  # 09 Start of alignment in subject (database hit)
        "send",  # 10 End of alignment in subject (database hit)
        "evalue",  # 11 Expectation value (E-value)
        "bitscore",  # 12 Bit score
        "sallseqid",  # 13 All subject Seq-id(s), separated by a ';'
        "score",  # 14 Raw score
        "nident",  # 15 Number of identical matches
        "positive",  # 16 Number of positive-scoring matches
        "gaps",  # 17 Total number of gaps
        "ppos",  # 18 Percentage of positive-scoring matches
        "qframe",  # 19 Query frame
        "sframe",  # 20 Subject frame
        "qseq",  # 21 Aligned part of query sequence
        "sseq",  # 22 Aligned part of subject sequence
        "qlen",  # 23 Query sequence length
        "slen",  # 24 Subject sequence length
        "salltitles",  # 25 All subject title(s), separated by a '<>'
    ]
    collected_records = []
    for record_idx, record in enumerate(blasttsv):
        if record.startswith("#"):
            continue

        dc = {k: v for (k, v) in zip(columns, (x.strip() for x in record.split("\t")))}

        rec = SeqRecord("", id=dc["qseqid"])

        feature_id = "b2g.%s" % (record_idx)
        hit_qualifiers = {
            "ID": feature_id,
            "Name": (dc["salltitles"].split("<>")[0]),
            "description": "Residue {sstart}..{send} hit to {x}".format(
                x=dc["salltitles"].split("<>")[0], **dc
            ),
            "source": "blast",
            "score": dc["evalue"],
            "accession": clean_string(dc["sseqid"]),
            "length": dc["qlen"],
            "hit_titles": clean_slist(dc["salltitles"].split("<>")),
            "target": clean_string(dc["qseqid"]),
        }
        hsp_qualifiers = {"source": "blast"}
        for key in dc.keys():
            # Add the remaining BLAST info to the GFF qualifiers
            if key in ("salltitles", "sallseqid", "sseqid", "qseqid", "qseq", "sseq",):
                continue
            hsp_qualifiers["blast_%s" % key] = clean_string(dc[key])

        # Below numbers stored as strings, convert to proper form
        for (
            integer_numerical_key
        ) in "gapopen gaps length mismatch nident positive qend qframe qlen qstart score send sframe slen sstart".split(
            " "
        ):
            dc[integer_numerical_key] = int(dc[integer_numerical_key])

        for float_numerical_key in "bitscore evalue pident ppos".split(" "):
            dc[float_numerical_key] = float(dc[float_numerical_key])

        # Bugfix: the "score" qualifier above captured dc["evalue"] while it
        # was still a string, so min() comparisons in combine_records() were
        # lexicographic.  Store the numeric e-value instead.
        hit_qualifiers["score"] = dc["evalue"]

        parent_match_start = dc["qstart"]
        parent_match_end = dc["qend"]

        # check_bounds normalizes ordering and clamps starts to 1-based.
        parent_match_start, parent_match_end = check_bounds(
            parent_match_start, parent_match_end, dc["qstart"], dc["qend"]
        )

        # The ``match`` feature will hold one or more ``match_part``s
        top_feature = gffSeqFeature(
            FeatureLocation(
                min(parent_match_start, parent_match_end) - 1,
                max(parent_match_start, parent_match_end),
            ),
            type=match_type,
            strand=0,
            qualifiers=hit_qualifiers,
        )
        top_feature.sub_features = []
        # There is a possibility of multiple lines containing the HSPS
        # for the same hit.
        # Unlike the parent feature, ``match_part``s have sources.
        hsp_qualifiers["ID"] = clean_string(dc["sseqid"])
        match_part_start = dc["qstart"]
        match_part_end = dc["qend"]

        top_feature.sub_features.append(
            gffSeqFeature(
                FeatureLocation(
                    min(match_part_start, match_part_end) - 1,
                    max(match_part_start, match_part_end),
                ),
                type="match_part",
                strand=0,
                qualifiers=copy.deepcopy(hsp_qualifiers),
            )
        )
        top_feature.sub_features = sorted(
            top_feature.sub_features, key=lambda x: int(x.location.start)
        )
        rec.features = [top_feature]
        rec.annotations = {}
        collected_records.append(rec)

    # Merge rows that describe additional HSPs of the same query/subject pair.
    collected_records = combine_records(collected_records)
    if not len(collected_records):
        # Emit a minimal valid GFF3 header so stdout is never empty.
        print("##gff-version 3\n##sequence-region null 1 4")
    for rec in collected_records:
        yield rec
387 | |
388 | |
if __name__ == "__main__":
    # CLI entry point: parse arguments, convert, and stream GFF3 to stdout.
    parser = argparse.ArgumentParser(
        description="Convert BlastP or BlastN output to GFF3, must provide XML or Tabular output",
        epilog="",
    )
    parser.add_argument(
        "blast",
        type=argparse.FileType("r"),
        help="Blast XML or 25 Column Tabular Output file",
    )
    parser.add_argument(
        "--blastxml", action="store_true", help="Process file as Blast XML Output"
    )
    parser.add_argument(
        "--blasttab",
        action="store_true",
        help="Process file as Blast 25 Column Tabular Output",
    )
    parser.add_argument(
        "--include_seq",
        action="store_true",
        help="Include sequence, only used for Blast XML",
    )
    args = parser.parse_args()

    # Only write records that actually contain features.
    for converted in blast2gff3(**vars(args)):
        if converted.features:
            gffWrite([converted], sys.stdout)