Mercurial > repos > jankanis > blast2html
comparison blast_html.py @ 13:c2d63adb83db
renamed files
author | Jan Kanis <jan.code@jankanis.nl> |
---|---|
date | Mon, 12 May 2014 17:13:49 +0200 |
parents | visualise.py@a459c754cdb5 |
children | 0b33898bba45 |
comparison
equal
deleted
inserted
replaced
12:a459c754cdb5 | 13:c2d63adb83db |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright The Hyve B.V. 2014 | |
4 # License: GPL version 3 or higher | |
5 | |
6 import sys | |
7 import math | |
8 import warnings | |
9 from itertools import repeat | |
10 import argparse | |
11 from lxml import objectify | |
12 import jinja2 | |
13 | |
14 | |
15 | |
# Registry filled by the ``filter`` decorator: exposed filter name -> function.
_filters = {}


def filter(func_or_name):
    """Decorator that records a function in the module-level filter registry.

    It can be applied bare (``@filter``), in which case the function is
    stored under its own ``__name__``, or called with a string
    (``@filter('name')``) to store the function under that name instead.
    Either way the decorated function itself is returned unchanged.
    """
    if not isinstance(func_or_name, str):
        # Bare usage: the argument is the function being decorated.
        _filters[func_or_name.__name__] = func_or_name
        return func_or_name

    def register(func):
        # Named usage: store under the explicitly requested name.
        _filters[func_or_name] = func
        return func

    return register
27 | |
28 | |
def color_idx(length):
    """Return the colour bucket index (0 to 4) for a given length.

    Lengths below 40 map to 0, below 50 to 1, below 80 to 2, below 200 to 3
    and everything else to 4.
    """
    upper_bounds = (40, 50, 80, 200)
    for index, bound in enumerate(upper_bounds):
        if length < bound:
            return index
    # Not below any bound: last bucket.
    return len(upper_bounds)
39 | |
@filter
def fmt(val, fmt):
    """Convert ``val`` to a float and format it with the format spec ``fmt``."""
    number = float(val)
    return format(number, fmt)
43 | |
@filter
def firsttitle(hit):
    """Return the text of ``hit.Hit_def`` up to (not including) the first '>'."""
    definition = hit.Hit_def.text
    head, _, _ = definition.partition('>')
    return head
47 | |
@filter
def othertitles(hit):
    """Return the additional titles packed into a hit's ``Hit_def`` text.

    The text is split on '>'; the first piece is skipped. Every further piece
    is split at its first space into a full id and a title, and the second and
    third '|'-separated fields of the full id are extracted. The result is a
    list of dicts with the keys ``id``, ``hitid``, ``fullid`` and ``title``.
    """
    _, *extra_entries = hit.Hit_def.text.split('>')

    result = []
    for entry in extra_entries:
        fullid, title = entry.split(' ', 1)
        hit_identifier, seq_identifier = fullid.split('|', 2)[1:3]
        result.append({
            'id': seq_identifier,
            'hitid': hit_identifier,
            'fullid': fullid,
            'title': title,
        })
    return result
62 | |
@filter
def hitid(hit):
    """Return the second '|'-separated field of the hit's ``Hit_id`` text."""
    fields = hit.Hit_id.text.split('|', 2)
    return fields[1]
66 | |
@filter
def seqid(hit):
    """Return everything after the second '|' in the hit's ``Hit_id`` text."""
    fields = hit.Hit_id.text.split('|', 2)
    return fields[2]
70 | |
@filter
def alignment_pre(hsp):
    """Return a three-line plain-text rendering of an HSP alignment.

    The lines are the query sequence, the midline and the subject sequence;
    the query and subject lines carry their from/to positions.
    """
    query_line = "Query {:>7s} {} {}\n".format(
        hsp['Hsp_query-from'], hsp.Hsp_qseq, hsp['Hsp_query-to'])
    # The empty string is padded to the width of the position column.
    middle_line = " {:7s} {}\n".format('', hsp.Hsp_midline)
    subject_line = "Subject{:>7s} {} {}".format(
        hsp['Hsp_hit-from'], hsp.Hsp_hseq, hsp['Hsp_hit-to'])
    return query_line + middle_line + subject_line
78 | |
@filter('len')
def hsplen(node):
    """Return the ``Hsp_align-len`` value of an HSP node as an int.

    Registered as the template filter ``len``.
    """
    align_len = node['Hsp_align-len']
    return int(align_len)
82 | |
@filter
def asframe(frame):
    """Translate a frame value into its strand name.

    Returns 'Plus' for 1 and 'Minus' for -1.

    Raises:
        ValueError: if ``frame`` is neither 1 nor -1. ValueError is a subclass
            of Exception, so callers catching Exception are unaffected.
    """
    if frame == 1:
        return 'Plus'
    if frame == -1:
        return 'Minus'
    raise ValueError("frame should be either +1 or -1")
90 | |
def genelink(hit, type='genbank', hsp=None):
    """Build the NCBI nucleotide URL for a hit.

    hit:  either the hit id as a string, or a hit node from which the id is
          extracted with hitid().
    type: value used for the ``report`` query parameter.
    hsp:  optional HSP node; when given, its ``Hsp_hit-from`` / ``Hsp_hit-to``
          values are appended as ``from`` / ``to`` query parameters.

    Returns the URL wrapped in jinja2.Markup.
    """
    if not isinstance(hit, str):
        hit = hitid(hit)
    link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type)
    # Identity test: the intent is "was an hsp passed at all", which must not
    # depend on how the node type implements comparison.
    if hsp is not None:
        link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to'])
    # NOTE(review): jinja2.Markup is no longer available in Jinja2 >= 3.1
    # (markupsafe.Markup is the replacement) - confirm the Jinja2 version in use.
    return jinja2.Markup(link)
98 | |
99 | |
100 | |
101 | |
class BlastVisualize:
    """Turn a BLAST XML result into an HTML page using a jinja2 template.

    The XML is parsed with lxml.objectify. Only the first ``Iteration``
    element of the result is used; render() warns when there are more.
    """

    # Colour names, indexed by the bucket number returned by color_idx().
    colors = ('black', 'blue', 'green', 'magenta', 'red')

    # Divisor used by queryscale() to derive the spacing between scale labels.
    max_scale_labels = 10

    # Template name, resolved by a FileSystemLoader rooted at '.'.
    templatename = 'visualise.html.jinja'

    def __init__(self, input):
        """Parse the BLAST XML and prepare the jinja2 environment.

        input: passed straight to objectify.parse(); main() supplies an open
               file object.
        """
        self.input = input

        self.blast = objectify.parse(self.input).getroot()
        self.loader = jinja2.FileSystemLoader(searchpath='.')
        self.environment = jinja2.Environment(loader=self.loader,
                                              lstrip_blocks=True, trim_blocks=True, autoescape=True)

        # Map a length to its colour name. The lookup must go through
        # self.colors: there is no module-level ``match_colors`` sequence, so
        # indexing that name raised NameError whenever the filter was used.
        self.environment.filters['color'] = lambda length: self.colors[color_idx(length)]

        # Expose every function registered with the @filter decorator.
        self.environment.filters.update(_filters)

        self.query_length = int(self.blast["BlastOutput_query-len"])
        self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit
        # Sort hits so that the one with the longest HSP comes first.
        self.ordered_hits = sorted(self.hits,
                                   key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp),
                                   reverse=True)

    def render(self, output):
        """Render the template and write the resulting HTML to ``output``.

        output: object with a ``write`` method (main() passes an open text file
                or sys.stdout).
        Emits a warning when the XML holds more than one ``Iteration``.
        """
        template = self.environment.get_template(self.templatename)

        params = (('Query ID', self.blast["BlastOutput_query-ID"]),
                  ('Query definition', self.blast["BlastOutput_query-def"]),
                  ('Query length', self.blast["BlastOutput_query-len"]),
                  ('Program', self.blast.BlastOutput_version),
                  ('Database', self.blast.BlastOutput_db),
                  )

        if len(self.blast.BlastOutput_iterations.Iteration) > 1:
            warnings.warn("Multiple 'Iteration' elements found, showing only the first")

        output.write(template.render(blast=self.blast,
                                     length=self.query_length,
                                     hits=self.hits,
                                     colors=self.colors,
                                     match_colors=self.match_colors(),
                                     queryscale=self.queryscale(),
                                     hit_info=self.hit_info(),
                                     genelink=genelink,
                                     params=params))

    def match_colors(self):
        """
        An iterator that yields, for every hit, a dict with:

        colors:  list of (width in percent of the query, colour name) runs;
                 positions not covered by any HSP get the colour 'none'
        link:    "#hit" followed by the hit number
        defline: the first title of the hit
        """
        percent_multiplier = 100 / self.query_length
        uncovered = 255  # marker for query positions no HSP covers

        for hit in self.hits:
            # Sort HSPs from short to long, so the colour index of a long
            # match overwrites that of a shorter one where they overlap.
            hsps_by_length = sorted(hit.Hit_hsps.Hsp, key=hsplen)
            table = bytearray([uncovered]) * self.query_length
            for hsp in hsps_by_length:
                frm = int(hsp['Hsp_query-from']) - 1
                to = int(hsp['Hsp_query-to'])
                table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm)

            # Run-length encode the table into (width, colour) pairs.
            matches = []
            last = table[0]
            count = 0
            for value in table:
                if value == last:
                    count += 1
                    continue
                matches.append((count * percent_multiplier,
                                self.colors[last] if last != uncovered else 'none'))
                last = value
                count = 1
            matches.append((count * percent_multiplier,
                            self.colors[last] if last != uncovered else 'none'))

            yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit))

    def queryscale(self):
        """Yield the scale labels as dicts with ``label`` (query position) and
        ``width`` (percent of the query length).

        Labels are placed every ``skip`` positions; a final label for the query
        length itself is added when the length is not a multiple of ``skip``.
        """
        skip = math.ceil(self.query_length / self.max_scale_labels)
        percent_multiplier = 100 / self.query_length
        for label in range(skip, self.query_length + 1, skip):
            yield dict(label=label, width=skip * percent_multiplier)
        remainder = self.query_length % skip
        if remainder != 0:
            yield dict(label=self.query_length, width=remainder * percent_multiplier)

    def hit_info(self):
        """Yield one summary dict per hit, in ``ordered_hits`` order.

        Keys: hit, title, link_id, maxscore, totalscore, cover, e_value, ident
        and accession; the score, cover, e_value and ident entries are
        preformatted strings.
        """
        for hit in self.ordered_hits:
            hsps = hit.Hit_hsps.Hsp

            cover = [False] * self.query_length
            for hsp in hsps:
                frm = int(hsp['Hsp_query-from']) - 1
                to = int(hsp['Hsp_query-to'])
                # Fill exactly the query span. Filling with hsplen(hsp) items
                # (the alignment length) resizes the list whenever that length
                # differs from the span, which inflates the coverage.
                cover[frm:to] = repeat(True, to - frm)
            cover_count = cover.count(True)

            bit_scores = [float(hsp['Hsp_bit-score']) for hsp in hsps]
            e_values = [float(hsp['Hsp_evalue']) for hsp in hsps]

            yield dict(hit=hit,
                       title=firsttitle(hit),
                       link_id=hit.Hit_num,
                       maxscore="{:.1f}".format(max(bit_scores)),
                       totalscore="{:.1f}".format(sum(bit_scores)),
                       cover="{:.0%}".format(cover_count / self.query_length),
                       e_value="{:.4g}".format(min(e_values)),
                       # FIXME: is this the correct formula vv?
                       ident="{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))),
                       accession=hit.Hit_accession)
219 | |
220 | |
def main():
    """Command line entry point.

    Parses the arguments, reads the BLAST XML given either positionally or
    through -i/--input, and writes the rendered HTML to -o/--output
    (standard output by default).
    """
    parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page",
                                     usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0]))
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file, same as -i/--input')
    input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file')
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout,
                        help='The output html file')

    args = parser.parse_args()
    # Fall back to the positional argument when -i/--input was not used.
    if args.input is None:
        args.input = args.positional_arg
    if args.input is None:
        parser.error('no input specified')

    b = BlastVisualize(args.input)
    b.render(args.output)


if __name__ == '__main__':
    main()
245 |