view blast_html.py @ 20:53cd304c5f26

Add index for multiple results; fix layout of query ruler for edge case The query ruler did not layout nicely if the last segment was too short to contain the text. For very short last segments the text is now displayed after the ruler.
author Jan Kanis <jan.code@jankanis.nl>
date Wed, 14 May 2014 16:20:29 +0200
parents 67ddcb807b7d
children
line wrap: on
line source

#!/usr/bin/env python3

# Copyright The Hyve B.V. 2014
# License: GPL version 3 or higher

import sys
import math
import warnings
from os import path
from itertools import repeat
import argparse
from lxml import objectify
import jinja2



_filters = {}
def filter(func_or_name):
    "Decorator to register a function as filter in the current jinja environment"
    if isinstance(func_or_name, str):
        def inner(func):
            _filters[func_or_name] = func.__name__
            return func
        return inner
    else:
        _filters[func_or_name.__name__] = func_or_name.__name__
        return func_or_name


def color_idx(length):
    if length < 40:
        return 0
    elif length < 50:
        return 1
    elif length < 80:
        return 2
    elif length < 200:
        return 3
    return 4

@filter
def fmt(val, fmt):
    return format(float(val), fmt)

@filter
def firsttitle(hit):
    return hit.Hit_def.text.split('>')[0]

@filter
def othertitles(hit):
    """Split a hit.Hit_def that contains multiple titles up, splitting out the hit ids from the titles."""
    id_titles = hit.Hit_def.text.split('>')

    titles = []
    for t in id_titles[1:]:
        fullid, title = t.split(' ', 1)
        hitid, id = fullid.split('|', 2)[1:3]
        titles.append(dict(id = id,
                           hitid = hitid,
                           fullid = fullid,
                           title = title))
    return titles

@filter
def hitid(hit):
    return hit.Hit_id.text.split('|', 2)[1]

@filter
def seqid(hit):
    return hit.Hit_id.text.split('|', 2)[2]

@filter
def alignment_pre(hsp):
    return (
        "Query  {:>7s}  {}  {}\n".format(hsp['Hsp_query-from'].text, hsp.Hsp_qseq, hsp['Hsp_query-to']) +
        "       {:7s}  {}\n".format('', hsp.Hsp_midline) +
        "Subject{:>7s}  {}  {}".format(hsp['Hsp_hit-from'].text, hsp.Hsp_hseq, hsp['Hsp_hit-to'])
    )

@filter('len')
def blastxml_len(node):
    if node.tag == 'Hsp':
        return int(node['Hsp_align-len'])
    elif node.tag == 'Iteration':
        return int(node['Iteration_query-len'])
    raise Exception("Unknown XML node type: "+node.tag)
        

@filter
def asframe(frame):
    if frame == 1:
        return 'Plus'
    elif frame == -1:
        return 'Minus'
    raise Exception("frame should be either +1 or -1")

def genelink(hit, type='genbank', hsp=None):
    if not isinstance(hit, str):
        hit = hitid(hit)
    link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type)
    if hsp != None:
        link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to'])
    return link


# javascript escape filter based on Django's, from https://github.com/dsissitka/khan-website/blob/master/templatefilters.py#L112-139
# I've removed the html escapes, since html escaping is already being performed by the template engine.

_base_js_escapes = (
    ('\\', r'\u005C'),
    ('\'', r'\u0027'),
    ('"', r'\u0022'),
    # ('>', r'\u003E'),
    # ('<', r'\u003C'),
    # ('&', r'\u0026'),
    # ('=', r'\u003D'),
    # ('-', r'\u002D'),
    # (';', r'\u003B'),
    # (u'\u2028', r'\u2028'),
    # (u'\u2029', r'\u2029')
)

# Escape every ASCII character with a value less than 32. This is
# needed a.o. to prevent html parsers from jumping out of javascript
# parsing mode.
_js_escapes = (_base_js_escapes +
               tuple(('%c' % z, '\\u%04X' % z) for z in range(32)))

@filter
def js_string_escape(value):
    """Escape javascript string literal escapes. Note that this only works
    within javascript string literals, not in general javascript
    snippets."""

    value = str(value)

    for bad, good in _js_escapes:
        value = value.replace(bad, good)

    return value

@filter
def hits(result):
    # sort hits by longest hotspot first
    return sorted(result.Iteration_hits.findall('Hit'),
                  key=lambda h: max(blastxml_len(hsp) for hsp in h.Hit_hsps.Hsp),
                  reverse=True)



class BlastVisualize:

    colors = ('black', 'blue', 'green', 'magenta', 'red')

    max_scale_labels = 10

    def __init__(self, input, templatedir, templatename):
        self.input = input
        self.templatename = templatename

        self.blast = objectify.parse(self.input).getroot()
        self.loader = jinja2.FileSystemLoader(searchpath=templatedir)
        self.environment = jinja2.Environment(loader=self.loader,
                                              lstrip_blocks=True, trim_blocks=True, autoescape=True)

        self._addfilters(self.environment)


    def _addfilters(self, environment):
        for filtername, funcname in _filters.items():
            try:
                environment.filters[filtername] = getattr(self, funcname)
            except AttributeError:
                environment.filters[filtername] = globals()[funcname]

    def render(self, output):
        template = self.environment.get_template(self.templatename)

        params = (('Query ID', self.blast["BlastOutput_query-ID"]),
                  ('Query definition', self.blast["BlastOutput_query-def"]),
                  ('Query length', self.blast["BlastOutput_query-len"]),
                  ('Program', self.blast.BlastOutput_version),
                  ('Database', self.blast.BlastOutput_db),
        )

        output.write(template.render(blast=self.blast,
                                     iterations=self.blast.BlastOutput_iterations.Iteration,
                                     colors=self.colors,
                                     # match_colors=self.match_colors(),
                                     # hit_info=self.hit_info(),
                                     genelink=genelink,
                                     params=params))

    @filter
    def match_colors(self, result):
        """
        An iterator that yields lists of length-color pairs. 
        """

        query_length = blastxml_len(result)
        
        percent_multiplier = 100 / query_length

        for hit in hits(result):
            # sort hotspots from short to long, so we can overwrite index colors of
            # short matches with those of long ones.
            hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: blastxml_len(hsp))
            table = bytearray([255]) * query_length
            for hsp in hotspots:
                frm = hsp['Hsp_query-from'] - 1
                to = int(hsp['Hsp_query-to'])
                table[frm:to] = repeat(color_idx(blastxml_len(hsp)), to - frm)

            matches = []
            last = table[0]
            count = 0
            for i in range(query_length):
                if table[i] == last:
                    count += 1
                    continue
                matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent'))
                last = table[i]
                count = 1
            matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent'))

            yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit))

    @filter
    def queryscale(self, result):
        query_length = blastxml_len(result)
        skip = math.ceil(query_length / self.max_scale_labels)
        percent_multiplier = 100 / query_length
        for i in range(1, query_length+1):
            if i % skip == 0:
                yield dict(label = i, width = skip * percent_multiplier, shorter = False)
        if query_length % skip != 0:
            yield dict(label = query_length,
                       width = (query_length % skip) * percent_multiplier,
                       shorter = True)

    @filter
    def hit_info(self, result):

        query_length = blastxml_len(result)

        for hit in hits(result):
            hsps = hit.Hit_hsps.Hsp

            cover = [False] * query_length
            for hsp in hsps:
                cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, blastxml_len(hsp))
            cover_count = cover.count(True)

            def hsp_val(path):
                return (float(hsp[path]) for hsp in hsps)

            yield dict(hit = hit,
                       title = firsttitle(hit),
                       link_id = hit.Hit_num,
                       maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))),
                       totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))),
                       cover = "{:.0%}".format(cover_count / query_length),
                       e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))),
                       # FIXME: is this the correct formula vv?
                       ident = "{:.0%}".format(float(min(hsp.Hsp_identity / blastxml_len(hsp) for hsp in hsps))),
                       accession = hit.Hit_accession)

def main():

    parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page",
                                     usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0]))
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file, same as -i/--input')
    input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'), 
                             help='The input Blast XML file')
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout,
                        help='The output html file')
    # We just want the file name here, so jinja can open the file
    # itself. But it is easier to just use a FileType so argparse can
    # handle the errors. This introduces a small race condition when
    # jinja later tries to re-open the template file, but we don't
    # care too much.
    parser.add_argument('--template', type=argparse.FileType(mode='r'), default='blast_html.html.jinja',
                        help='The template file to use. Defaults to blast_html.html.jinja')

    args = parser.parse_args()
    if args.input == None:
        args.input = args.positional_arg
    if args.input == None:
        parser.error('no input specified')

    templatedir, templatename = path.split(args.template.name)
    args.template.close()
    if not templatedir:
        templatedir = '.'

    b = BlastVisualize(args.input, templatedir, templatename)
    b.render(args.output)


if __name__ == '__main__':
    main()