Mercurial > repos > yutaka-saito > bsfcall
view bin/maf-cull @ 2:f274c166e738 default tip
remove comments in bsfcall_wrapper.xml
author | yutaka-saito |
---|---|
date | Sun, 19 Apr 2015 23:02:04 +0900 |
parents | 06f8460885ff |
children |
line wrap: on
line source
#! /usr/bin/env python # Read MAF-format alignments. Write them, omitting alignments whose # coordinates in the top-most sequence are contained in those of >= # cullingLimit higher-scoring alignments. # Alignments on opposite strands are not considered to contain each # other. # The alignments must be sorted by sequence name, then strand, then # start coordinate. # This algorithm is not theoretically optimal, but it is simple and # probably fast in practice. Optimal algorithms are described in: # Winnowing sequences from a database search. # Berman P, Zhang Z, Wolf YI, Koonin EV, Miller W. # J Comput Biol. 2000 Feb-Apr;7(1-2):293-302. # (Use a "priority search tree" or an "interval tree".) # Seems to work with Python 2.x, x>=4 import fileinput, itertools, operator, optparse, os, signal, sys # The intervals must have size > 0. def isFresh(oldInterval, newInterval): return oldInterval.end > newInterval.start def freshIntervals(storedIntervals, newInterval): """Yields those storedIntervals that overlap newInterval.""" return [i for i in storedIntervals if isFresh(i, newInterval)] def isDominated(dog, queen): return dog.score < queen.score and dog.end <= queen.end def isWanted(newInterval, storedIntervals, cullingLimit): """Is newInterval dominated by < cullingLimit storedIntervals?""" dominators = (i for i in storedIntervals if isDominated(newInterval, i)) return len(list(dominators)) < cullingLimit # Check that the intervals are sorted by start position, and further # sort them in descending order of score. def sortedIntervals(intervals): oldStart = () for k, v in itertools.groupby(intervals, operator.attrgetter("start")): if k < oldStart: raise Exception("the input is not sorted properly") oldStart = k for i in sorted(v, key=operator.attrgetter("score"), reverse=True): yield i def culledIntervals(intervals, cullingLimit): """Yield intervals contained in < cullingLimit higher-scoring intervals.""" storedIntervals = [] for i in sortedIntervals(intervals): storedIntervals = freshIntervals(storedIntervals, i) if isWanted(i, storedIntervals, cullingLimit): yield i storedIntervals.append(i) class Maf: def __init__(self, lines): self.lines = lines try: aLine = lines[0] aWords = aLine.split() scoreGen = (i for i in aWords if i.startswith("score=")) scoreWord = scoreGen.next() self.score = float(scoreWord.split("=")[1]) except: raise Exception("missing score") try: sLine = lines[1] sWords = sLine.split() seqName = sWords[1] alnStart = int(sWords[2]) alnSize = int(sWords[3]) strand = sWords[4] self.start = seqName, strand, alnStart self.end = seqName, strand, alnStart + alnSize except: raise Exception("can't interpret the MAF data") def mafInput(lines): for k, v in itertools.groupby(lines, str.isspace): if not k: yield Maf(list(v)) def filterComments(lines): for i in lines: if i.startswith("#"): print i, else: yield i def mafCull(opts, args): inputLines = fileinput.input(args) inputMafs = mafInput(filterComments(inputLines)) for maf in culledIntervals(inputMafs, opts.limit): for i in maf.lines: print i, print # blank line after each alignment if __name__ == "__main__": signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message usage = "%prog [options] my-alignments.maf" description = "Cull alignments whose top-sequence coordinates are contained in LIMIT or more higher-scoring alignments." op = optparse.OptionParser(usage=usage, description=description) op.add_option("-l", "--limit", type="int", default=2, help="culling limit (default: %default)") (opts, args) = op.parse_args() if opts.limit < 1: op.error("option -l: should be >= 1") try: mafCull(opts, args) except KeyboardInterrupt: pass # avoid silly error message except Exception, e: prog = os.path.basename(sys.argv[0]) sys.exit(prog + ": error: " + str(e))