Mercurial > repos > cpt > cpt_intersect_adj
comparison intersect_and_adjacent.py @ 0:923d2528480d draft
planemo upload commit 8500f316f8325380aa5d0000fbd13dd96012b1bb
author | cpt |
---|---|
date | Thu, 08 Aug 2024 04:14:38 +0000 |
parents | |
children | 10000414e916 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:923d2528480d |
---|---|
1 #!/usr/bin/env python | |
2 import logging | |
3 import argparse | |
4 from intervaltree import IntervalTree, Interval | |
5 from CPT_GFFParser import gffParse, gffWrite | |
6 from Bio.SeqRecord import SeqRecord | |
7 from Bio.Seq import Seq | |
8 | |
9 logging.basicConfig(level=logging.INFO) | |
10 log = logging.getLogger(__name__) | |
11 | |
12 | |
13 def validFeat(rec): | |
14 for feat in rec.features: | |
15 if feat.type != "remark" and feat.type != "annotation": | |
16 return True | |
17 return False | |
18 | |
19 | |
20 def treeFeatures(features, window): | |
21 for feat in features: | |
22 # Interval(begin, end, data) | |
23 yield Interval( | |
24 int(feat.location.start) - int(window), | |
25 int(feat.location.end) + int(window), | |
26 feat.id, | |
27 ) | |
28 | |
29 | |
30 def treeFeatures_noRem(features, window): | |
31 for feat in features: | |
32 if feat.type == "remark" or feat.type == "annotation": | |
33 continue | |
34 # Interval(begin, end, data) | |
35 yield Interval( | |
36 int(feat.location.start) - int(window), | |
37 int(feat.location.end) + int(window), | |
38 feat.id, | |
39 ) | |
40 | |
41 | |
42 def intersect(a, b, window, stranding): | |
43 rec_a = list(gffParse(a)) | |
44 rec_b = list(gffParse(b)) | |
45 rec_a_out = [] | |
46 rec_b_out = [] | |
47 maxLen = min(len(rec_a), len(rec_b)) | |
48 iterate = 0 | |
49 | |
50 if maxLen > 0: | |
51 while iterate < maxLen: | |
52 rec_a_i = rec_a[iterate] | |
53 rec_b_i = rec_b[iterate] | |
54 | |
55 if (not validFeat(rec_a_i)) or (not validFeat(rec_b_i)): | |
56 rec_a_out.append( | |
57 SeqRecord( | |
58 rec_a[iterate].seq, | |
59 rec_a[iterate].id, | |
60 rec_a[iterate].name, | |
61 rec_a[iterate].description, | |
62 rec_a[iterate].dbxrefs, | |
63 [], | |
64 rec_a[iterate].annotations, | |
65 ) | |
66 ) | |
67 rec_b_out.append( | |
68 SeqRecord( | |
69 rec_b[iterate].seq, | |
70 rec_b[iterate].id, | |
71 rec_b[iterate].name, | |
72 rec_b[iterate].description, | |
73 rec_b[iterate].dbxrefs, | |
74 [], | |
75 rec_b[iterate].annotations, | |
76 ) | |
77 ) | |
78 iterate += 1 | |
79 continue | |
80 | |
81 a_neg = [] | |
82 a_pos = [] | |
83 b_neg = [] | |
84 b_pos = [] | |
85 tree_a = [] | |
86 tree_b = [] | |
87 if stranding == True: | |
88 for feat in rec_a_i.features: | |
89 if feat.type == "remark" or feat.type == "annotation": | |
90 continue | |
91 if feat.strand > 0: | |
92 a_pos.append( | |
93 Interval( | |
94 int(feat.location.start) - int(window), | |
95 int(feat.location.end) + int(window), | |
96 feat.id, | |
97 ) | |
98 ) | |
99 else: | |
100 a_neg.append( | |
101 Interval( | |
102 int(feat.location.start) - int(window), | |
103 int(feat.location.end) + int(window), | |
104 feat.id, | |
105 ) | |
106 ) | |
107 | |
108 for feat in rec_b_i.features: | |
109 if feat.type == "remark" or feat.type == "annotation": | |
110 continue | |
111 if feat.strand > 0: | |
112 b_pos.append( | |
113 Interval( | |
114 int(feat.location.start) - int(window), | |
115 int(feat.location.end) + int(window), | |
116 feat.id, | |
117 ) | |
118 ) | |
119 else: | |
120 b_neg.append( | |
121 Interval( | |
122 int(feat.location.start) - int(window), | |
123 int(feat.location.end) + int(window), | |
124 feat.id, | |
125 ) | |
126 ) | |
127 | |
128 else: | |
129 for feat in rec_a_i.features: | |
130 if feat.type == "remark" or feat.type == "annotation": | |
131 continue | |
132 tree_a.append( | |
133 Interval( | |
134 int(feat.location.start) - int(window), | |
135 int(feat.location.end) + int(window), | |
136 feat.id, | |
137 ) | |
138 ) | |
139 for feat in rec_b_i.features: | |
140 if feat.type == "remark" or feat.type == "annotation": | |
141 continue | |
142 tree_b.append( | |
143 Interval( | |
144 int(feat.location.start) - int(window), | |
145 int(feat.location.end) + int(window), | |
146 feat.id, | |
147 ) | |
148 ) | |
149 if stranding: | |
150 # builds interval tree from Interval objects of form (start, end, id) for each feature | |
151 # tree_a = IntervalTree(list(treeFeatures_noRem(rec_a_i.features, window))) | |
152 # tree_b = IntervalTree(list(treeFeatures_noRem(rec_b_i.features, window))) | |
153 # else: | |
154 tree_a_pos = IntervalTree(a_pos) | |
155 tree_a_neg = IntervalTree(a_neg) | |
156 tree_b_pos = IntervalTree(b_pos) | |
157 tree_b_neg = IntervalTree(b_neg) | |
158 else: | |
159 tree_a = IntervalTree(tree_a) | |
160 tree_b = IntervalTree(tree_b) | |
161 | |
162 # Used to map ids back to features later | |
163 rec_a_map = {f.id: f for f in rec_a_i.features} | |
164 rec_b_map = {f.id: f for f in rec_b_i.features} | |
165 | |
166 rec_a_hits_in_b = [] | |
167 rec_b_hits_in_a = [] | |
168 | |
169 for feature in rec_a_i.features: | |
170 # Save each feature in rec_a that overlaps a feature in rec_b | |
171 # hits = tree_b.find_range((int(feature.location.start), int(feature.location.end))) | |
172 | |
173 if feature.type == "remark" or feature.type == "annotation": | |
174 continue | |
175 | |
176 if stranding == False: | |
177 hits = tree_b[ | |
178 int(feature.location.start) : int(feature.location.end) | |
179 ] | |
180 | |
181 # feature id is saved in interval result.data, use map to get full feature | |
182 for hit in hits: | |
183 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
184 | |
185 else: | |
186 if feature.strand > 0: | |
187 hits_pos = tree_b_pos[ | |
188 int(feature.location.start) : int(feature.location.end) | |
189 ] | |
190 for hit in hits_pos: | |
191 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
192 else: | |
193 hits_neg = tree_b_neg[ | |
194 int(feature.location.start) : int(feature.location.end) | |
195 ] | |
196 for hit in hits_neg: | |
197 rec_a_hits_in_b.append(rec_b_map[hit.data]) | |
198 | |
199 for feature in rec_b_i.features: | |
200 if feature.type == "remark" or feature.type == "annotation": | |
201 continue | |
202 | |
203 if stranding == False: | |
204 hits = tree_a[ | |
205 int(feature.location.start) : int(feature.location.end) | |
206 ] | |
207 | |
208 # feature id is saved in interval result.data, use map to get full feature | |
209 for hit in hits: | |
210 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
211 | |
212 else: | |
213 if feature.strand > 0: | |
214 hits_pos = tree_a_pos[ | |
215 int(feature.location.start) : int(feature.location.end) | |
216 ] | |
217 for hit in hits_pos: | |
218 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
219 else: | |
220 hits_neg = tree_a_neg[ | |
221 int(feature.location.start) : int(feature.location.end) | |
222 ] | |
223 for hit in hits_neg: | |
224 rec_b_hits_in_a.append(rec_a_map[hit.data]) | |
225 | |
226 # Remove duplicate features using sets | |
227 rec_a_out.append( | |
228 SeqRecord( | |
229 rec_a[iterate].seq, | |
230 rec_a[iterate].id, | |
231 rec_a[iterate].name, | |
232 rec_a[iterate].description, | |
233 rec_a[iterate].dbxrefs, | |
234 sorted(set(rec_a_hits_in_b), key=lambda feat: feat.location.start), | |
235 rec_a[iterate].annotations, | |
236 ) | |
237 ) | |
238 rec_b_out.append( | |
239 SeqRecord( | |
240 rec_b[iterate].seq, | |
241 rec_b[iterate].id, | |
242 rec_b[iterate].name, | |
243 rec_b[iterate].description, | |
244 rec_b[iterate].dbxrefs, | |
245 sorted(set(rec_b_hits_in_a), key=lambda feat: feat.location.start), | |
246 rec_b[iterate].annotations, | |
247 ) | |
248 ) | |
249 iterate += 1 | |
250 | |
251 else: | |
252 # If one input is empty, output two empty result files. | |
253 rec_a_out = [SeqRecord(Seq(""), "none")] | |
254 rec_b_out = [SeqRecord(Seq(""), "none")] | |
255 return rec_a_out, rec_b_out | |
256 | |
257 | |
258 if __name__ == "__main__": | |
259 parser = argparse.ArgumentParser( | |
260 description="rebase gff3 features against parent locations", epilog="" | |
261 ) | |
262 parser.add_argument("a", type=argparse.FileType("r")) | |
263 parser.add_argument("b", type=argparse.FileType("r")) | |
264 parser.add_argument( | |
265 "window", | |
266 type=int, | |
267 default=50, | |
268 help="Allows features this far away to still be considered 'adjacent'", | |
269 ) | |
270 parser.add_argument( | |
271 "-stranding", | |
272 action="store_true", | |
273 help="Only allow adjacency for same-strand features", | |
274 ) | |
275 parser.add_argument("--oa", type=str, default="a_hits_near_b.gff") | |
276 parser.add_argument("--ob", type=str, default="b_hits_near_a.gff") | |
277 args = parser.parse_args() | |
278 | |
279 b, a = intersect(args.a, args.b, args.window, args.stranding) | |
280 | |
281 with open(args.oa, "w") as handle: | |
282 for rec in a: | |
283 gffWrite([rec], handle) | |
284 | |
285 with open(args.ob, "w") as handle: | |
286 for rec in b: | |
287 gffWrite([rec], handle) |