comparison intersect_and_adjacent.py @ 0:923d2528480d draft

planemo upload commit 8500f316f8325380aa5d0000fbd13dd96012b1bb
author cpt
date Thu, 08 Aug 2024 04:14:38 +0000
parents
children 10000414e916
comparison
equal deleted inserted replaced
-1:000000000000 0:923d2528480d
1 #!/usr/bin/env python
2 import logging
3 import argparse
4 from intervaltree import IntervalTree, Interval
5 from CPT_GFFParser import gffParse, gffWrite
6 from Bio.SeqRecord import SeqRecord
7 from Bio.Seq import Seq
8
9 logging.basicConfig(level=logging.INFO)
10 log = logging.getLogger(__name__)
11
12
13 def validFeat(rec):
14 for feat in rec.features:
15 if feat.type != "remark" and feat.type != "annotation":
16 return True
17 return False
18
19
20 def treeFeatures(features, window):
21 for feat in features:
22 # Interval(begin, end, data)
23 yield Interval(
24 int(feat.location.start) - int(window),
25 int(feat.location.end) + int(window),
26 feat.id,
27 )
28
29
30 def treeFeatures_noRem(features, window):
31 for feat in features:
32 if feat.type == "remark" or feat.type == "annotation":
33 continue
34 # Interval(begin, end, data)
35 yield Interval(
36 int(feat.location.start) - int(window),
37 int(feat.location.end) + int(window),
38 feat.id,
39 )
40
41
42 def intersect(a, b, window, stranding):
43 rec_a = list(gffParse(a))
44 rec_b = list(gffParse(b))
45 rec_a_out = []
46 rec_b_out = []
47 maxLen = min(len(rec_a), len(rec_b))
48 iterate = 0
49
50 if maxLen > 0:
51 while iterate < maxLen:
52 rec_a_i = rec_a[iterate]
53 rec_b_i = rec_b[iterate]
54
55 if (not validFeat(rec_a_i)) or (not validFeat(rec_b_i)):
56 rec_a_out.append(
57 SeqRecord(
58 rec_a[iterate].seq,
59 rec_a[iterate].id,
60 rec_a[iterate].name,
61 rec_a[iterate].description,
62 rec_a[iterate].dbxrefs,
63 [],
64 rec_a[iterate].annotations,
65 )
66 )
67 rec_b_out.append(
68 SeqRecord(
69 rec_b[iterate].seq,
70 rec_b[iterate].id,
71 rec_b[iterate].name,
72 rec_b[iterate].description,
73 rec_b[iterate].dbxrefs,
74 [],
75 rec_b[iterate].annotations,
76 )
77 )
78 iterate += 1
79 continue
80
81 a_neg = []
82 a_pos = []
83 b_neg = []
84 b_pos = []
85 tree_a = []
86 tree_b = []
87 if stranding == True:
88 for feat in rec_a_i.features:
89 if feat.type == "remark" or feat.type == "annotation":
90 continue
91 if feat.strand > 0:
92 a_pos.append(
93 Interval(
94 int(feat.location.start) - int(window),
95 int(feat.location.end) + int(window),
96 feat.id,
97 )
98 )
99 else:
100 a_neg.append(
101 Interval(
102 int(feat.location.start) - int(window),
103 int(feat.location.end) + int(window),
104 feat.id,
105 )
106 )
107
108 for feat in rec_b_i.features:
109 if feat.type == "remark" or feat.type == "annotation":
110 continue
111 if feat.strand > 0:
112 b_pos.append(
113 Interval(
114 int(feat.location.start) - int(window),
115 int(feat.location.end) + int(window),
116 feat.id,
117 )
118 )
119 else:
120 b_neg.append(
121 Interval(
122 int(feat.location.start) - int(window),
123 int(feat.location.end) + int(window),
124 feat.id,
125 )
126 )
127
128 else:
129 for feat in rec_a_i.features:
130 if feat.type == "remark" or feat.type == "annotation":
131 continue
132 tree_a.append(
133 Interval(
134 int(feat.location.start) - int(window),
135 int(feat.location.end) + int(window),
136 feat.id,
137 )
138 )
139 for feat in rec_b_i.features:
140 if feat.type == "remark" or feat.type == "annotation":
141 continue
142 tree_b.append(
143 Interval(
144 int(feat.location.start) - int(window),
145 int(feat.location.end) + int(window),
146 feat.id,
147 )
148 )
149 if stranding:
150 # builds interval tree from Interval objects of form (start, end, id) for each feature
151 # tree_a = IntervalTree(list(treeFeatures_noRem(rec_a_i.features, window)))
152 # tree_b = IntervalTree(list(treeFeatures_noRem(rec_b_i.features, window)))
153 # else:
154 tree_a_pos = IntervalTree(a_pos)
155 tree_a_neg = IntervalTree(a_neg)
156 tree_b_pos = IntervalTree(b_pos)
157 tree_b_neg = IntervalTree(b_neg)
158 else:
159 tree_a = IntervalTree(tree_a)
160 tree_b = IntervalTree(tree_b)
161
162 # Used to map ids back to features later
163 rec_a_map = {f.id: f for f in rec_a_i.features}
164 rec_b_map = {f.id: f for f in rec_b_i.features}
165
166 rec_a_hits_in_b = []
167 rec_b_hits_in_a = []
168
169 for feature in rec_a_i.features:
170 # Save each feature in rec_a that overlaps a feature in rec_b
171 # hits = tree_b.find_range((int(feature.location.start), int(feature.location.end)))
172
173 if feature.type == "remark" or feature.type == "annotation":
174 continue
175
176 if stranding == False:
177 hits = tree_b[
178 int(feature.location.start) : int(feature.location.end)
179 ]
180
181 # feature id is saved in interval result.data, use map to get full feature
182 for hit in hits:
183 rec_a_hits_in_b.append(rec_b_map[hit.data])
184
185 else:
186 if feature.strand > 0:
187 hits_pos = tree_b_pos[
188 int(feature.location.start) : int(feature.location.end)
189 ]
190 for hit in hits_pos:
191 rec_a_hits_in_b.append(rec_b_map[hit.data])
192 else:
193 hits_neg = tree_b_neg[
194 int(feature.location.start) : int(feature.location.end)
195 ]
196 for hit in hits_neg:
197 rec_a_hits_in_b.append(rec_b_map[hit.data])
198
199 for feature in rec_b_i.features:
200 if feature.type == "remark" or feature.type == "annotation":
201 continue
202
203 if stranding == False:
204 hits = tree_a[
205 int(feature.location.start) : int(feature.location.end)
206 ]
207
208 # feature id is saved in interval result.data, use map to get full feature
209 for hit in hits:
210 rec_b_hits_in_a.append(rec_a_map[hit.data])
211
212 else:
213 if feature.strand > 0:
214 hits_pos = tree_a_pos[
215 int(feature.location.start) : int(feature.location.end)
216 ]
217 for hit in hits_pos:
218 rec_b_hits_in_a.append(rec_a_map[hit.data])
219 else:
220 hits_neg = tree_a_neg[
221 int(feature.location.start) : int(feature.location.end)
222 ]
223 for hit in hits_neg:
224 rec_b_hits_in_a.append(rec_a_map[hit.data])
225
226 # Remove duplicate features using sets
227 rec_a_out.append(
228 SeqRecord(
229 rec_a[iterate].seq,
230 rec_a[iterate].id,
231 rec_a[iterate].name,
232 rec_a[iterate].description,
233 rec_a[iterate].dbxrefs,
234 sorted(set(rec_a_hits_in_b), key=lambda feat: feat.location.start),
235 rec_a[iterate].annotations,
236 )
237 )
238 rec_b_out.append(
239 SeqRecord(
240 rec_b[iterate].seq,
241 rec_b[iterate].id,
242 rec_b[iterate].name,
243 rec_b[iterate].description,
244 rec_b[iterate].dbxrefs,
245 sorted(set(rec_b_hits_in_a), key=lambda feat: feat.location.start),
246 rec_b[iterate].annotations,
247 )
248 )
249 iterate += 1
250
251 else:
252 # If one input is empty, output two empty result files.
253 rec_a_out = [SeqRecord(Seq(""), "none")]
254 rec_b_out = [SeqRecord(Seq(""), "none")]
255 return rec_a_out, rec_b_out
256
257
258 if __name__ == "__main__":
259 parser = argparse.ArgumentParser(
260 description="rebase gff3 features against parent locations", epilog=""
261 )
262 parser.add_argument("a", type=argparse.FileType("r"))
263 parser.add_argument("b", type=argparse.FileType("r"))
264 parser.add_argument(
265 "window",
266 type=int,
267 default=50,
268 help="Allows features this far away to still be considered 'adjacent'",
269 )
270 parser.add_argument(
271 "-stranding",
272 action="store_true",
273 help="Only allow adjacency for same-strand features",
274 )
275 parser.add_argument("--oa", type=str, default="a_hits_near_b.gff")
276 parser.add_argument("--ob", type=str, default="b_hits_near_a.gff")
277 args = parser.parse_args()
278
279 b, a = intersect(args.a, args.b, args.window, args.stranding)
280
281 with open(args.oa, "w") as handle:
282 for rec in a:
283 gffWrite([rec], handle)
284
285 with open(args.ob, "w") as handle:
286 for rec in b:
287 gffWrite([rec], handle)