0
|
1 #! /usr/bin/env python
|
|
2
|
|
3 # Read MAF-format alignments, and write them, after moving the Nth
|
|
4 # sequence to the top in each alignment.
|
|
5
|
|
6 # Before writing, if the top sequence would be on the - strand, then
|
|
7 # flip all the strands. But don't do this if the top sequence is
|
|
8 # translated DNA.
|
|
9
|
|
10 # Seems to work with Python 2.x, x>=4
|
|
11
|
|
12 import fileinput, itertools, optparse, os, signal, string, sys
|
|
13
|
|
14 def filterComments(lines):
|
|
15 for i in lines:
|
|
16 if i.startswith("#"): print i,
|
|
17 else: yield i
|
|
18
|
|
19 def mafInput(lines):
|
|
20 for k, v in itertools.groupby(lines, str.isspace):
|
|
21 if not k: yield list(v)
|
|
22
|
|
23 def indexOfNthSequence(mafLines, n):
|
|
24 for i, line in enumerate(mafLines):
|
|
25 if line.startswith("s"):
|
|
26 if n == 1: return i
|
|
27 n -= 1
|
|
28 raise Exception("encountered an alignment with too few sequences")
|
|
29
|
|
30 def rangeOfNthSequence(mafLines, n):
|
|
31 """Get the range of lines associated with the Nth sequence."""
|
|
32 start = indexOfNthSequence(mafLines, n)
|
|
33 stop = start + 1
|
|
34 while stop < len(mafLines):
|
|
35 line = mafLines[stop]
|
|
36 if not (line.startswith("q") or line.startswith("i")): break
|
|
37 stop += 1
|
|
38 return start, stop
|
|
39
|
|
40 complement = string.maketrans('ACGTNSWRYKMBDHVacgtnswrykmbdhv',
|
|
41 'TGCANSWYRMKVHDBtgcanswyrmkvhdb')
|
|
42 # doesn't handle "U" in RNA sequences
|
|
43 def revcomp(seq):
|
|
44 return seq[::-1].translate(complement)
|
|
45
|
|
46 def flippedMafS(words):
|
|
47 alnStart = int(words[2])
|
|
48 alnSize = int(words[3])
|
|
49 strand = words[4]
|
|
50 seqSize = int(words[5])
|
|
51 alnString = words[6]
|
|
52 newStart = seqSize - alnStart - alnSize
|
|
53 if strand == "-": newStrand = "+"
|
|
54 else: newStrand = "-"
|
|
55 newString = revcomp(alnString)
|
|
56 out = words[0], words[1], newStart, alnSize, newStrand, seqSize, newString
|
|
57 return map(str, out)
|
|
58
|
|
59 def flippedMafP(words):
|
|
60 flippedString = words[1][::-1]
|
|
61 return words[:1] + [flippedString]
|
|
62
|
|
63 def flippedMafQ(words):
|
|
64 qualityString = words[2]
|
|
65 flippedString = qualityString[::-1]
|
|
66 return words[:2] + [flippedString]
|
|
67
|
|
68 def flippedMafLine(mafLine):
|
|
69 words = mafLine.split()
|
|
70 if words[0] == "s": return flippedMafS(words)
|
|
71 elif words[0] == "p": return flippedMafP(words)
|
|
72 elif words[0] == "q": return flippedMafQ(words)
|
|
73 else: return words
|
|
74
|
|
75 def maxlen(s):
|
|
76 return max(map(len, s))
|
|
77
|
|
78 def sLineFieldWidths(mafLines):
|
|
79 sLines = (i for i in mafLines if i[0] == "s")
|
|
80 sColumns = zip(*sLines)
|
|
81 return map(maxlen, sColumns)
|
|
82
|
|
83 def joinedMafS(words, fieldWidths):
|
|
84 formatParams = itertools.chain(*zip(fieldWidths, words))
|
|
85 return "%*s %-*s %*s %*s %*s %*s %*s\n" % tuple(formatParams)
|
|
86
|
|
87 def joinedMafLine(words, fieldWidths):
|
|
88 if words[0] == "s":
|
|
89 return joinedMafS(words, fieldWidths)
|
|
90 elif words[0] == "q":
|
|
91 words = words[:2] + [""] * 4 + words[2:]
|
|
92 return joinedMafS(words, fieldWidths)
|
|
93 elif words[0] == "p":
|
|
94 words = words[:1] + [""] * 5 + words[1:]
|
|
95 return joinedMafS(words, fieldWidths)
|
|
96 else:
|
|
97 return " ".join(words) + "\n"
|
|
98
|
|
99 def flippedMaf(mafLines):
|
|
100 flippedLines = map(flippedMafLine, mafLines)
|
|
101 fieldWidths = sLineFieldWidths(flippedLines)
|
|
102 return (joinedMafLine(i, fieldWidths) for i in flippedLines)
|
|
103
|
|
104 def isCanonicalStrand(mafLine):
|
|
105 words = mafLine.split()
|
|
106 strand = words[4]
|
|
107 if strand == "+": return True
|
|
108 alnString = words[6]
|
|
109 if "/" in alnString or "\\" in alnString: return True # frameshifts
|
|
110 alnSize = int(words[3])
|
|
111 gapCount = alnString.count("-")
|
|
112 if len(alnString) - gapCount < alnSize: return True # translated DNA
|
|
113 return False
|
|
114
|
|
115 def mafSwap(opts, args):
|
|
116 inputLines = fileinput.input(args)
|
|
117 for mafLines in mafInput(filterComments(inputLines)):
|
|
118 start, stop = rangeOfNthSequence(mafLines, opts.n)
|
|
119 mafLines[1:stop] = mafLines[start:stop] + mafLines[1:start]
|
|
120 if not isCanonicalStrand(mafLines[1]):
|
|
121 mafLines = flippedMaf(mafLines)
|
|
122 for i in mafLines: print i,
|
|
123 print # blank line after each alignment
|
|
124
|
|
125 if __name__ == "__main__":
|
|
126 signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message
|
|
127
|
|
128 usage = "%prog [options] my-alignments.maf"
|
|
129 description = "Change the order of sequences in MAF-format alignments."
|
|
130 op = optparse.OptionParser(usage=usage, description=description)
|
|
131 op.add_option("-n", type="int", default=2,
|
|
132 help="move the Nth sequence to the top (default: %default)")
|
|
133 (opts, args) = op.parse_args()
|
|
134 if opts.n < 1: op.error("option -n: should be >= 1")
|
|
135
|
|
136 try: mafSwap(opts, args)
|
|
137 except KeyboardInterrupt: pass # avoid silly error message
|
|
138 except Exception, e:
|
|
139 prog = os.path.basename(sys.argv[0])
|
|
140 sys.exit(prog + ": error: " + str(e))
|