Mercurial > repos > yutaka-saito > bsfcall
view bin/maf-swap @ 0:06f8460885ff
migrate from GitHub
author | yutaka-saito |
---|---|
date | Sun, 19 Apr 2015 20:51:13 +0900 |
parents | |
children |
line wrap: on
line source
#! /usr/bin/env python # Read MAF-format alignments, and write them, after moving the Nth # sequence to the top in each alignment. # Before writing, if the top sequence would be on the - strand, then # flip all the strands. But don't do this if the top sequence is # translated DNA. # Seems to work with Python 2.x, x>=4 import fileinput, itertools, optparse, os, signal, string, sys def filterComments(lines): for i in lines: if i.startswith("#"): print i, else: yield i def mafInput(lines): for k, v in itertools.groupby(lines, str.isspace): if not k: yield list(v) def indexOfNthSequence(mafLines, n): for i, line in enumerate(mafLines): if line.startswith("s"): if n == 1: return i n -= 1 raise Exception("encountered an alignment with too few sequences") def rangeOfNthSequence(mafLines, n): """Get the range of lines associated with the Nth sequence.""" start = indexOfNthSequence(mafLines, n) stop = start + 1 while stop < len(mafLines): line = mafLines[stop] if not (line.startswith("q") or line.startswith("i")): break stop += 1 return start, stop complement = string.maketrans('ACGTNSWRYKMBDHVacgtnswrykmbdhv', 'TGCANSWYRMKVHDBtgcanswyrmkvhdb') # doesn't handle "U" in RNA sequences def revcomp(seq): return seq[::-1].translate(complement) def flippedMafS(words): alnStart = int(words[2]) alnSize = int(words[3]) strand = words[4] seqSize = int(words[5]) alnString = words[6] newStart = seqSize - alnStart - alnSize if strand == "-": newStrand = "+" else: newStrand = "-" newString = revcomp(alnString) out = words[0], words[1], newStart, alnSize, newStrand, seqSize, newString return map(str, out) def flippedMafP(words): flippedString = words[1][::-1] return words[:1] + [flippedString] def flippedMafQ(words): qualityString = words[2] flippedString = qualityString[::-1] return words[:2] + [flippedString] def flippedMafLine(mafLine): words = mafLine.split() if words[0] == "s": return flippedMafS(words) elif words[0] == "p": return flippedMafP(words) elif words[0] == "q": return flippedMafQ(words) else: return words def maxlen(s): return max(map(len, s)) def sLineFieldWidths(mafLines): sLines = (i for i in mafLines if i[0] == "s") sColumns = zip(*sLines) return map(maxlen, sColumns) def joinedMafS(words, fieldWidths): formatParams = itertools.chain(*zip(fieldWidths, words)) return "%*s %-*s %*s %*s %*s %*s %*s\n" % tuple(formatParams) def joinedMafLine(words, fieldWidths): if words[0] == "s": return joinedMafS(words, fieldWidths) elif words[0] == "q": words = words[:2] + [""] * 4 + words[2:] return joinedMafS(words, fieldWidths) elif words[0] == "p": words = words[:1] + [""] * 5 + words[1:] return joinedMafS(words, fieldWidths) else: return " ".join(words) + "\n" def flippedMaf(mafLines): flippedLines = map(flippedMafLine, mafLines) fieldWidths = sLineFieldWidths(flippedLines) return (joinedMafLine(i, fieldWidths) for i in flippedLines) def isCanonicalStrand(mafLine): words = mafLine.split() strand = words[4] if strand == "+": return True alnString = words[6] if "/" in alnString or "\\" in alnString: return True # frameshifts alnSize = int(words[3]) gapCount = alnString.count("-") if len(alnString) - gapCount < alnSize: return True # translated DNA return False def mafSwap(opts, args): inputLines = fileinput.input(args) for mafLines in mafInput(filterComments(inputLines)): start, stop = rangeOfNthSequence(mafLines, opts.n) mafLines[1:stop] = mafLines[start:stop] + mafLines[1:start] if not isCanonicalStrand(mafLines[1]): mafLines = flippedMaf(mafLines) for i in mafLines: print i, print # blank line after each alignment if __name__ == "__main__": signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message usage = "%prog [options] my-alignments.maf" description = "Change the order of sequences in MAF-format alignments." op = optparse.OptionParser(usage=usage, description=description) op.add_option("-n", type="int", default=2, help="move the Nth sequence to the top (default: %default)") (opts, args) = op.parse_args() if opts.n < 1: op.error("option -n: should be >= 1") try: mafSwap(opts, args) except KeyboardInterrupt: pass # avoid silly error message except Exception, e: prog = os.path.basename(sys.argv[0]) sys.exit(prog + ": error: " + str(e))