comparison blast_html.py @ 13:c2d63adb83db

renamed files
author Jan Kanis <jan.code@jankanis.nl>
date Mon, 12 May 2014 17:13:49 +0200
parents visualise.py@a459c754cdb5
children 0b33898bba45
comparison
equal deleted inserted replaced
12:a459c754cdb5 13:c2d63adb83db
1 #!/usr/bin/env python3
2
3 # Copyright The Hyve B.V. 2014
4 # License: GPL version 3 or higher
5
6 import sys
7 import math
8 import warnings
9 from itertools import repeat
10 import argparse
11 from lxml import objectify
12 import jinja2
13
14
15
# Registry of template filters, keyed by the name they get in the jinja
# environment. Filled in by the ``filter`` decorator below.
_filters = {}


def filter(func_or_name):
    """Decorator to register a function as filter in the current jinja environment.

    Usable in two forms:

    * ``@filter`` -- registers the decorated function under its own
      ``__name__``.
    * ``@filter('name')`` -- registers the decorated function under the
      given string name.

    In both forms the decorated function itself is returned unchanged.

    Note: this name shadows the builtin ``filter`` for the whole module.
    """
    if not isinstance(func_or_name, str):
        # Bare ``@filter`` usage: the argument is the function itself.
        _filters[func_or_name.__name__] = func_or_name
        return func_or_name

    def register_as(func):
        # ``@filter('name')`` usage: the outer argument is the registry key.
        _filters[func_or_name] = func
        return func

    return register_as
27
28
def color_idx(length):
    """Return the color bucket index (0 to 4) for an alignment length.

    Buckets: below 40 -> 0, below 50 -> 1, below 80 -> 2, below 200 -> 3,
    anything else -> 4.
    """
    # Exclusive upper bounds of buckets 0..3, in ascending order.
    for index, upper_bound in enumerate((40, 50, 80, 200)):
        if length < upper_bound:
            return index
    return 4
39
@filter
def fmt(val, fmt):
    """Template filter: convert ``val`` to float and format it with the spec ``fmt``."""
    number = float(val)
    return format(number, fmt)
43
@filter
def firsttitle(hit):
    """Template filter: return the part of ``hit.Hit_def`` before the first '>'.

    If the definition contains no '>', the whole text is returned.
    """
    definition = hit.Hit_def.text
    head, _, _ = definition.partition('>')
    return head
47
@filter
def othertitles(hit):
    """Split a hit.Hit_def that contains multiple titles up, splitting out the hit ids from the titles.

    The definition text is split on '>'; the first piece (the primary title,
    see ``firsttitle``) is skipped. Every remaining piece is expected to look
    like ``<fullid> <title>``, where ``fullid`` is a '|'-separated identifier.

    Returns:
        A list of dicts, one per additional title, with the keys ``id``,
        ``hitid``, ``fullid`` and ``title``. ``hitid`` and ``id`` are the
        second and third field of ``fullid.split('|', 2)``.

    Raises:
        ValueError: if a ``fullid`` has fewer than three '|'-separated fields.
    """
    entries = hit.Hit_def.text.split('>')[1:]

    titles = []
    for entry in entries:
        # partition() instead of split(' ', 1): an entry that consists of an
        # identifier only (no description) yields an empty title rather than
        # failing to unpack.
        fullid, _, title = entry.partition(' ')
        hit_id, seq_id = fullid.split('|', 2)[1:3]
        titles.append(dict(id=seq_id,
                           hitid=hit_id,
                           fullid=fullid,
                           title=title))
    return titles
62
@filter
def hitid(hit):
    """Template filter: return the second '|'-separated field of ``hit.Hit_id``."""
    fields = hit.Hit_id.text.split('|', 2)
    return fields[1]
66
@filter
def seqid(hit):
    """Template filter: return everything after the second '|' of ``hit.Hit_id``.

    The id text is split at most twice, so the returned third field keeps any
    further '|' characters it contains.
    """
    fields = hit.Hit_id.text.split('|', 2)
    return fields[2]
70
@filter
def alignment_pre(hsp):
    """Template filter: format one hsp as a three-line text alignment.

    The lines are, in order: the query sequence with its from/to positions,
    the midline, and the subject (hit) sequence with its from/to positions.
    The first two lines end in a newline, the last one does not.
    """
    query_line = "Query {:>7s} {} {}\n".format(
        hsp['Hsp_query-from'], hsp.Hsp_qseq, hsp['Hsp_query-to'])
    # The empty string is padded to the same width as the position columns.
    middle_line = " {:7s} {}\n".format('', hsp.Hsp_midline)
    subject_line = "Subject{:>7s} {} {}".format(
        hsp['Hsp_hit-from'], hsp.Hsp_hseq, hsp['Hsp_hit-to'])
    return query_line + middle_line + subject_line
78
@filter('len')
def hsplen(node):
    """Return the ``Hsp_align-len`` child of ``node`` as an int.

    Registered as the template filter ``len``.
    """
    align_len = node['Hsp_align-len']
    return int(align_len)
82
@filter
def asframe(frame):
    """Template filter: translate a frame of 1 / -1 into 'Plus' / 'Minus'.

    Raises:
        Exception: if ``frame`` equals neither 1 nor -1.
    """
    if frame == 1:
        return 'Plus'
    if frame == -1:
        return 'Minus'
    raise Exception("frame should be either +1 or -1")
90
def genelink(hit, type='genbank', hsp=None):
    """Build the NCBI nucleotide URL for a hit.

    Args:
        hit: either the hit id as a string, or a hit element from which the
            id is extracted with ``hitid``.
        type: value for the ``report`` query parameter of the URL.
        hsp: optional hsp element; when given, its ``Hsp_hit-from`` and
            ``Hsp_hit-to`` values are appended as ``from``/``to`` parameters.

    Returns:
        The URL wrapped in ``jinja2.Markup``, i.e. marked as safe so the
        autoescaping template environment does not escape it.
    """
    if not isinstance(hit, str):
        hit = hitid(hit)
    link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type)
    # Identity test: None is a singleton, and 'is not' cannot be affected by
    # a custom comparison defined on the element class.
    if hsp is not None:
        link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to'])
    # NOTE(review): newer Jinja2 releases no longer export Markup (it lives in
    # markupsafe) -- confirm against the Jinja2 version this is deployed with.
    return jinja2.Markup(link)
98
99
100
101
class BlastVisualize:
    """Render a BLAST XML result as an HTML page through a jinja2 template.

    The XML is parsed with ``lxml.objectify`` on construction; ``render``
    writes the rendered template to a file-like object. The remaining
    methods are generators that precompute data for the template.
    """

    # Color names, indexed by the bucket number returned by color_idx().
    colors = ('black', 'blue', 'green', 'magenta', 'red')

    # Divisor used to derive the spacing between labels on the query scale.
    max_scale_labels = 10

    # Template file name, looked up by the loader created in __init__.
    templatename = 'visualise.html.jinja'

    def __init__(self, input):
        """Parse the BLAST XML and set up the jinja2 environment.

        Args:
            input: source handed to ``objectify.parse`` (``main`` passes an
                open file object).
        """
        self.input = input

        self.blast = objectify.parse(self.input).getroot()
        # Templates are looked up relative to the current working directory.
        self.loader = jinja2.FileSystemLoader(searchpath='.')
        self.environment = jinja2.Environment(loader=self.loader,
                                              lstrip_blocks=True,
                                              trim_blocks=True,
                                              autoescape=True)

        # Map an alignment length straight to its color name. This has to
        # index self.colors: there is no module-level name 'match_colors'
        # (that is a method of this class), so referencing it here raised a
        # NameError as soon as the filter was used.
        self.environment.filters['color'] = lambda length: self.colors[color_idx(length)]

        # Install every function registered through the @filter decorator.
        for name, func in _filters.items():
            self.environment.filters[name] = func

        self.query_length = int(self.blast["BlastOutput_query-len"])
        self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit
        # sort hits by longest hotspot first
        self.ordered_hits = sorted(self.hits,
                                   key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp),
                                   reverse=True)

    def render(self, output):
        """Render the template and write the result to ``output``.

        Args:
            output: object with a ``write`` method that accepts the rendered
                page as a string.

        Emits a warning when the result contains more than one ``Iteration``
        element; only the first one is shown.
        """
        template = self.environment.get_template(self.templatename)

        params = (('Query ID', self.blast["BlastOutput_query-ID"]),
                  ('Query definition', self.blast["BlastOutput_query-def"]),
                  ('Query length', self.blast["BlastOutput_query-len"]),
                  ('Program', self.blast.BlastOutput_version),
                  ('Database', self.blast.BlastOutput_db),
                  )

        if len(self.blast.BlastOutput_iterations.Iteration) > 1:
            warnings.warn("Multiple 'Iteration' elements found, showing only the first")

        output.write(template.render(blast=self.blast,
                                     length=self.query_length,
                                     hits=self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit,
                                     colors=self.colors,
                                     match_colors=self.match_colors(),
                                     queryscale=self.queryscale(),
                                     hit_info=self.hit_info(),
                                     genelink=genelink,
                                     params=params))

    def match_colors(self):
        """
        An iterator that yields lists of length-color pairs.

        One dict is yielded per hit in ``self.hits`` with the keys:

        * ``colors``: list of ``(width, color)`` tuples covering the query
          from start to end. ``width`` is a percentage of the query length
          and ``color`` is a name from ``self.colors``, or ``'none'`` for a
          stretch that no hsp covers.
        * ``link``: ``"#hit"`` followed by the hit number.
        * ``defline``: the first title of the hit.
        """

        percent_multiplier = 100 / self.query_length

        for hit in self.hits:
            # sort hotspots from short to long, so we can overwrite index colors of
            # short matches with those of long ones.
            hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp))
            # One byte per query position; 255 marks "not covered".
            table = bytearray([255]) * self.query_length
            for hsp in hotspots:
                frm = hsp['Hsp_query-from'] - 1
                to = int(hsp['Hsp_query-to'])
                table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm)

            # Run-length encode the table into (width, color) pairs.
            matches = []
            last = table[0]
            count = 0
            for i in range(self.query_length):
                if table[i] == last:
                    count += 1
                    continue
                matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'none'))
                last = table[i]
                count = 1
            # Flush the final run.
            matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'none'))

            yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit))

    def queryscale(self):
        """Yield the labels of the scale drawn along the query.

        Each item is a dict with ``label`` (a query position) and ``width``
        (the share of the query length, in percent, that the label spans).
        Labels are placed at every multiple of
        ``ceil(query_length / max_scale_labels)``; if the query length is not
        such a multiple, a final, narrower label for the query length itself
        is added.
        """
        skip = math.ceil(self.query_length / self.max_scale_labels)
        percent_multiplier = 100 / self.query_length
        for position in range(skip, self.query_length + 1, skip):
            yield dict(label=position, width=skip * percent_multiplier)
        remainder = self.query_length % skip
        if remainder != 0:
            yield dict(label=self.query_length, width=remainder * percent_multiplier)

    def hit_info(self):
        """Yield one summary dict per hit, in ``self.ordered_hits`` order.

        Keys: ``hit``, ``title``, ``link_id``, ``maxscore``, ``totalscore``,
        ``cover``, ``e_value``, ``ident`` and ``accession``. The score,
        cover, e-value and identity entries are preformatted strings.
        """

        for hit in self.ordered_hits:
            hsps = hit.Hit_hsps.Hsp

            # Mark every query position that is covered by at least one hsp.
            cover = [False] * self.query_length
            for hsp in hsps:
                start = int(hsp['Hsp_query-from']) - 1
                end = int(hsp['Hsp_query-to'])
                # Replace the slice with exactly as many items as it spans.
                # Using the alignment length here resized the list whenever
                # that length differed from the query span (presumably when
                # the alignment contains gaps), shifting later positions and
                # skewing the count.
                cover[start:end] = repeat(True, end - start)
            cover_count = cover.count(True)

            def hsp_val(path):
                # Float value of the given child element for each hsp of this hit.
                return (float(hsp[path]) for hsp in hsps)

            yield dict(hit=hit,
                       title=firsttitle(hit),
                       link_id=hit.Hit_num,
                       maxscore="{:.1f}".format(max(hsp_val('Hsp_bit-score'))),
                       totalscore="{:.1f}".format(sum(hsp_val('Hsp_bit-score'))),
                       cover="{:.0%}".format(cover_count / self.query_length),
                       e_value="{:.4g}".format(min(hsp_val('Hsp_evalue'))),
                       # FIXME: is this the correct formula vv?
                       ident="{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))),
                       accession=hit.Hit_accession)
219
220
def main():
    """Command line entry point: convert a BLAST XML file into an HTML page.

    The input file is given either positionally or through ``-i/--input``;
    the result goes to ``-o/--output``, or to standard output when that
    option is absent.
    """

    parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page",
                                     usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0]))
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file, same as -i/--input')
    input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file')
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout,
                        help='The output html file')

    args = parser.parse_args()
    # Fall back to the positional argument when -i/--input was not used.
    if args.input is None:
        args.input = args.positional_arg
    if args.input is None:
        parser.error('no input specified')

    b = BlastVisualize(args.input)
    b.render(args.output)


if __name__ == '__main__':
    main()
245