Mercurial > repos > xuebing > sharplabtool
comparison tools/human_genome_variation/senatag.py @ 0:9071e359b9a3
Uploaded
author | xuebing |
---|---|
date | Fri, 09 Mar 2012 19:37:19 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9071e359b9a3 |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 """ | |
4 This tool takes the following file pairs as input: | |
5 a) input_snp : A file with identifiers for SNPs (one on each line) | |
6 b) ldfile : A file where each line has the following | |
7 snp list | |
8 where "snp" is an identifier for one SNP and the "list" is a | |
9 comma separated list of all the other snps that are in LD with | |
10 it (as per some threshold of rsquare) | |
11 | |
12 The output is a set of tag SNPs for the given datasets | |
13 | |
14 The algorithm is as follows: | |
15 | |
16 a) Construct a graph for each population, where each node is a SNP and two nodes | |
17 are connected using an edge iff they are in LD. | |
18 b) For each SNP, count the total number of connected nodes, which have not yet | |
19 been visited. | |
20 c) Find the SNP with the highest count and assign it to be a tag SNP. | |
21 d) Mark that SNP and all the snps connected to it as "visited". This should be | |
22 done for each population. | |
23 e) Continue steps b-e until all SNPs, in all populations have been visited. | |
24 """ | |
25 | |
26 from sys import argv, stderr, exit | |
27 from getopt import getopt, GetoptError | |
28 | |
29 import os | |
30 import heapq | |
31 | |
32 __author__ = "Aakrosh Ratan" | |
33 __email__ = "ratan@bx.psu.edu" | |
34 | |
35 # do we want the debug information to be printed? | |
36 debug_flag = False | |
37 | |
38 class node: | |
39 def __init__(self, name): | |
40 self.name = name | |
41 self.edges = [] | |
42 self.visited = False | |
43 | |
44 # return the number of nodes connected to this node, that have yet to be | |
45 # visited | |
46 def num_not_visited(self): | |
47 num = 0 | |
48 for n in self.edges: | |
49 if n.visited == False: num += 1 | |
50 return num | |
51 | |
52 def __cmp__(self, other): | |
53 return other.num_not_visited() - self.num_not_visited() | |
54 | |
55 def __str__(self): | |
56 return self.name | |
57 | |
58 class graph: | |
59 def __init__(self): | |
60 self.nodes = {} | |
61 | |
62 def __str__(self): | |
63 string = "" | |
64 for n1 in self.nodes.values(): | |
65 n2s = [x.name for x in n1.edges] | |
66 string += "%s %s\n" % (n1.name, ",".join(n2s)) | |
67 return string[:-1] | |
68 | |
69 def add_node(self, n): | |
70 self.nodes[n.name] = n | |
71 | |
72 def add_edges(self, n1, n2): | |
73 assert n1.name in self.nodes | |
74 assert n2.name in self.nodes | |
75 n1.edges.append(n2) | |
76 n2.edges.append(n1) | |
77 | |
78 def check_graph(self): | |
79 for n in self.nodes.values(): | |
80 ms = [x for x in n.edges] | |
81 for m in ms: | |
82 if n not in m.edges: | |
83 print >> stderr, "check : %s - %s" % (n,m) | |
84 | |
85 def construct_graph(ldfile, snpfile): | |
86 # construct the initial graph. add all the SNPs as nodes | |
87 g = graph() | |
88 file = open(snpfile, "r") | |
89 | |
90 for line in file: | |
91 # ignore empty lines and add the remainder to the graph | |
92 if len(line.strip()) == 0: continue | |
93 n = node(line.strip()) | |
94 g.add_node(n) | |
95 | |
96 file.close() | |
97 print >> stderr, "Added %d nodes to a graph" % len(g.nodes) | |
98 | |
99 # now add all the edges | |
100 file = open(ldfile, "r") | |
101 | |
102 for line in file: | |
103 tokens = line.split() | |
104 assert len(tokens) == 2 | |
105 | |
106 # if this node is in the graph, then we need to construct an edge from | |
107 # this node to all the nodes which are highly related to it | |
108 if tokens[0] in g.nodes: | |
109 n1 = g.nodes[tokens[0]] | |
110 n2s = [g.nodes[x] for x in tokens[1].split(",")] | |
111 | |
112 for n2 in n2s: | |
113 g.add_edges(n1, n2) | |
114 | |
115 file.close() | |
116 print >> stderr, "Added all edges to the graph" | |
117 | |
118 return g | |
119 | |
120 def check_output(g, tagsnps): | |
121 # find all the nodes in the graph | |
122 allsnps = [x.name for x in g.nodes.values()] | |
123 | |
124 # find the nodes that are covered by our tagsnps | |
125 mysnps = [x.name for x in tagsnps] | |
126 | |
127 for n in tagsnps: | |
128 for m in n.edges: | |
129 mysnps.append(m.name) | |
130 | |
131 mysnps = list(set(mysnps)) | |
132 | |
133 if set(allsnps) != set(mysnps): | |
134 diff = list(set(allsnps) - set(mysnps)) | |
135 print >> stderr, "%s are not covered" % ",".join(diff) | |
136 | |
137 def main(ldfile, snpsfile, required, excluded): | |
138 # construct the graph | |
139 g = construct_graph(ldfile, snpsfile) | |
140 if debug_flag == True: g.check_graph() | |
141 | |
142 tagsnps = [] | |
143 neighbors = {} | |
144 | |
145 # take care of the SNPs that are required to be TagSNPs | |
146 for s in required: | |
147 t = g.nodes[s] | |
148 | |
149 t.visited = True | |
150 ns = [] | |
151 | |
152 for n in t.edges: | |
153 if n.visited == False: ns.append(n.name) | |
154 n.visited = True | |
155 | |
156 tagsnps.append(t) | |
157 neighbors[t.name] = list(set(ns)) | |
158 | |
159 # find the tag SNPs for this graph | |
160 data = [x for x in g.nodes.values()] | |
161 heapq.heapify(data) | |
162 | |
163 while data: | |
164 s = heapq.heappop(data) | |
165 | |
166 if s.visited == True or s.name in excluded: continue | |
167 | |
168 s.visited = True | |
169 ns = [] | |
170 | |
171 for n in s.edges: | |
172 if n.visited == False: ns.append(n.name) | |
173 n.visited = True | |
174 | |
175 tagsnps.append(s) | |
176 neighbors[s.name] = list(set(ns)) | |
177 | |
178 heapq.heapify(data) | |
179 | |
180 for s in tagsnps: | |
181 if len(neighbors[s.name]) > 0: | |
182 print "%s\t%s" % (s, ",".join(neighbors[s.name])) | |
183 continue | |
184 print s | |
185 | |
186 if debug_flag == True: check_output(g, tagsnps) | |
187 | |
188 def read_list(filename): | |
189 assert os.path.exists(filename) == True | |
190 file = open(filename, "r") | |
191 list = {} | |
192 | |
193 for line in file: | |
194 list[line.strip()] = 1 | |
195 | |
196 file.close() | |
197 return list | |
198 | |
199 def usage(): | |
200 f = stderr | |
201 print >> f, "usage:" | |
202 print >> f, "senatag [options] neighborhood.txt inputsnps.txt" | |
203 print >> f, "where inputsnps.txt is a file of snps from one population" | |
204 print >> f, "where neighborhood.txt is neighborhood details for the pop." | |
205 print >> f, "where the options are:" | |
206 print >> f, "-h,--help : print usage and quit" | |
207 print >> f, "-d,--debug: print debug information" | |
208 print >> f, "-e,--excluded : file with names of SNPs that cannot be TagSNPs" | |
209 print >> f, "-r,--required : file with names of SNPs that should be TagSNPs" | |
210 | |
211 if __name__ == "__main__": | |
212 try: | |
213 opts, args = getopt(argv[1:], "hdr:e:",\ | |
214 ["help", "debug", "required=", "excluded="]) | |
215 except GetoptError, err: | |
216 print str(err) | |
217 usage() | |
218 exit(2) | |
219 | |
220 required = {} | |
221 excluded = {} | |
222 | |
223 for o, a in opts: | |
224 if o in ("-h", "--help"): | |
225 usage() | |
226 exit() | |
227 elif o in ("-d", "--debug"): | |
228 debug_flag = True | |
229 elif o in ("-r", "--required"): | |
230 required = read_list(a) | |
231 elif o in ("-e", "--excluded"): | |
232 excluded = read_list(a) | |
233 else: | |
234 assert False, "unhandled option" | |
235 | |
236 if len(args) != 2: | |
237 usage() | |
238 exit(3) | |
239 | |
240 assert os.path.exists(args[0]) == True | |
241 assert os.path.exists(args[1]) == True | |
242 | |
243 main(args[0], args[1], required, excluded) |