Mercurial > repos > yutaka-saito > bsfcall
comparison bin/maf-swap @ 0:06f8460885ff
migrate from GitHub
author | yutaka-saito |
---|---|
date | Sun, 19 Apr 2015 20:51:13 +0900 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:06f8460885ff |
---|---|
1 #! /usr/bin/env python | |
2 | |
3 # Read MAF-format alignments, and write them, after moving the Nth | |
4 # sequence to the top in each alignment. | |
5 | |
6 # Before writing, if the top sequence would be on the - strand, then | |
7 # flip all the strands. But don't do this if the top sequence is | |
8 # translated DNA. | |
9 | |
10 # Seems to work with Python 2.x, x>=4 | |
11 | |
12 import fileinput, itertools, optparse, os, signal, string, sys | |
13 | |
14 def filterComments(lines): | |
15 for i in lines: | |
16 if i.startswith("#"): print i, | |
17 else: yield i | |
18 | |
19 def mafInput(lines): | |
20 for k, v in itertools.groupby(lines, str.isspace): | |
21 if not k: yield list(v) | |
22 | |
23 def indexOfNthSequence(mafLines, n): | |
24 for i, line in enumerate(mafLines): | |
25 if line.startswith("s"): | |
26 if n == 1: return i | |
27 n -= 1 | |
28 raise Exception("encountered an alignment with too few sequences") | |
29 | |
30 def rangeOfNthSequence(mafLines, n): | |
31 """Get the range of lines associated with the Nth sequence.""" | |
32 start = indexOfNthSequence(mafLines, n) | |
33 stop = start + 1 | |
34 while stop < len(mafLines): | |
35 line = mafLines[stop] | |
36 if not (line.startswith("q") or line.startswith("i")): break | |
37 stop += 1 | |
38 return start, stop | |
39 | |
40 complement = string.maketrans('ACGTNSWRYKMBDHVacgtnswrykmbdhv', | |
41 'TGCANSWYRMKVHDBtgcanswyrmkvhdb') | |
42 # doesn't handle "U" in RNA sequences | |
43 def revcomp(seq): | |
44 return seq[::-1].translate(complement) | |
45 | |
46 def flippedMafS(words): | |
47 alnStart = int(words[2]) | |
48 alnSize = int(words[3]) | |
49 strand = words[4] | |
50 seqSize = int(words[5]) | |
51 alnString = words[6] | |
52 newStart = seqSize - alnStart - alnSize | |
53 if strand == "-": newStrand = "+" | |
54 else: newStrand = "-" | |
55 newString = revcomp(alnString) | |
56 out = words[0], words[1], newStart, alnSize, newStrand, seqSize, newString | |
57 return map(str, out) | |
58 | |
59 def flippedMafP(words): | |
60 flippedString = words[1][::-1] | |
61 return words[:1] + [flippedString] | |
62 | |
63 def flippedMafQ(words): | |
64 qualityString = words[2] | |
65 flippedString = qualityString[::-1] | |
66 return words[:2] + [flippedString] | |
67 | |
68 def flippedMafLine(mafLine): | |
69 words = mafLine.split() | |
70 if words[0] == "s": return flippedMafS(words) | |
71 elif words[0] == "p": return flippedMafP(words) | |
72 elif words[0] == "q": return flippedMafQ(words) | |
73 else: return words | |
74 | |
75 def maxlen(s): | |
76 return max(map(len, s)) | |
77 | |
78 def sLineFieldWidths(mafLines): | |
79 sLines = (i for i in mafLines if i[0] == "s") | |
80 sColumns = zip(*sLines) | |
81 return map(maxlen, sColumns) | |
82 | |
83 def joinedMafS(words, fieldWidths): | |
84 formatParams = itertools.chain(*zip(fieldWidths, words)) | |
85 return "%*s %-*s %*s %*s %*s %*s %*s\n" % tuple(formatParams) | |
86 | |
87 def joinedMafLine(words, fieldWidths): | |
88 if words[0] == "s": | |
89 return joinedMafS(words, fieldWidths) | |
90 elif words[0] == "q": | |
91 words = words[:2] + [""] * 4 + words[2:] | |
92 return joinedMafS(words, fieldWidths) | |
93 elif words[0] == "p": | |
94 words = words[:1] + [""] * 5 + words[1:] | |
95 return joinedMafS(words, fieldWidths) | |
96 else: | |
97 return " ".join(words) + "\n" | |
98 | |
99 def flippedMaf(mafLines): | |
100 flippedLines = map(flippedMafLine, mafLines) | |
101 fieldWidths = sLineFieldWidths(flippedLines) | |
102 return (joinedMafLine(i, fieldWidths) for i in flippedLines) | |
103 | |
104 def isCanonicalStrand(mafLine): | |
105 words = mafLine.split() | |
106 strand = words[4] | |
107 if strand == "+": return True | |
108 alnString = words[6] | |
109 if "/" in alnString or "\\" in alnString: return True # frameshifts | |
110 alnSize = int(words[3]) | |
111 gapCount = alnString.count("-") | |
112 if len(alnString) - gapCount < alnSize: return True # translated DNA | |
113 return False | |
114 | |
115 def mafSwap(opts, args): | |
116 inputLines = fileinput.input(args) | |
117 for mafLines in mafInput(filterComments(inputLines)): | |
118 start, stop = rangeOfNthSequence(mafLines, opts.n) | |
119 mafLines[1:stop] = mafLines[start:stop] + mafLines[1:start] | |
120 if not isCanonicalStrand(mafLines[1]): | |
121 mafLines = flippedMaf(mafLines) | |
122 for i in mafLines: print i, | |
123 print # blank line after each alignment | |
124 | |
125 if __name__ == "__main__": | |
126 signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message | |
127 | |
128 usage = "%prog [options] my-alignments.maf" | |
129 description = "Change the order of sequences in MAF-format alignments." | |
130 op = optparse.OptionParser(usage=usage, description=description) | |
131 op.add_option("-n", type="int", default=2, | |
132 help="move the Nth sequence to the top (default: %default)") | |
133 (opts, args) = op.parse_args() | |
134 if opts.n < 1: op.error("option -n: should be >= 1") | |
135 | |
136 try: mafSwap(opts, args) | |
137 except KeyboardInterrupt: pass # avoid silly error message | |
138 except Exception, e: | |
139 prog = os.path.basename(sys.argv[0]) | |
140 sys.exit(prog + ": error: " + str(e)) |