0
|
1 import copy
|
|
2 import logging
|
|
3
|
|
4 log = logging.getLogger()
|
|
5 log.setLevel(logging.WARN)
|
|
6
|
|
7
|
|
8 def feature_lambda(
|
|
9 feature_list,
|
|
10 test,
|
|
11 test_kwargs,
|
|
12 subfeatures=True,
|
|
13 parent=None,
|
|
14 invert=False,
|
|
15 recurse=True,
|
|
16 ):
|
|
17 """Recursively search through features, testing each with a test function, yielding matches.
|
|
18
|
|
19 GFF3 is a hierachical data structure, so we need to be able to recursively
|
|
20 search through features. E.g. if you're looking for a feature with
|
|
21 ID='bob.42', you can't just do a simple list comprehension with a test
|
|
22 case. You don't know how deeply burried bob.42 will be in the feature tree. This is where feature_lambda steps in.
|
|
23
|
|
24 :type feature_list: list
|
|
25 :param feature_list: an iterable of features
|
|
26
|
|
27 :type test: function reference
|
|
28 :param test: a closure with the method signature (feature, **kwargs) where
|
|
29 the kwargs are those passed in the next argument. This
|
|
30 function should return True or False, True if the feature is
|
|
31 to be yielded as part of the main feature_lambda function, or
|
|
32 False if it is to be ignored. This function CAN mutate the
|
|
33 features passed to it (think "apply").
|
|
34
|
|
35 :type test_kwargs: dictionary
|
|
36 :param test_kwargs: kwargs to pass to your closure when it is called.
|
|
37
|
|
38 :type subfeatures: boolean
|
|
39 :param subfeatures: when a feature is matched, should just that feature be
|
|
40 yielded to the caller, or should the entire sub_feature
|
|
41 tree for that feature be included? subfeatures=True is
|
|
42 useful in cases such as searching for a gene feature,
|
|
43 and wanting to know what RBS/Shine_Dalgarno_sequences
|
|
44 are in the sub_feature tree (which can be accomplished
|
|
45 with two feature_lambda calls). subfeatures=False is
|
|
46 useful in cases when you want to process (and possibly
|
|
47 return) the entire feature tree, such as applying a
|
|
48 qualifier to every single feature.
|
|
49
|
|
50 :type invert: boolean
|
|
51 :param invert: Negate/invert the result of the filter.
|
|
52
|
|
53 :rtype: yielded list
|
|
54 :return: Yields a list of matching features.
|
|
55 """
|
|
56 # Either the top level set of [features] or the subfeature attribute
|
|
57 for feature in feature_list:
|
|
58 feature._parent = parent
|
|
59 if not parent:
|
|
60 # Set to self so we cannot go above root.
|
|
61 feature._parent = feature
|
|
62 test_result = test(feature, **test_kwargs)
|
|
63 # if (not invert and test_result) or (invert and not test_result):
|
|
64 if invert ^ test_result:
|
|
65 if not subfeatures:
|
|
66 feature_copy = copy.deepcopy(feature)
|
|
67 feature_copy.sub_features = list()
|
|
68 yield feature_copy
|
|
69 else:
|
|
70 yield feature
|
|
71
|
|
72 if recurse and hasattr(feature, "sub_features"):
|
|
73 for x in feature_lambda(
|
|
74 feature.sub_features,
|
|
75 test,
|
|
76 test_kwargs,
|
|
77 subfeatures=subfeatures,
|
|
78 parent=feature,
|
|
79 invert=invert,
|
|
80 recurse=recurse,
|
|
81 ):
|
|
82 yield x
|
|
83
|
|
84
|
|
85 def fetchParent(feature):
|
|
86 if not hasattr(feature, "_parent") or feature._parent is None:
|
|
87 return feature
|
|
88 else:
|
|
89 return fetchParent(feature._parent)
|
|
90
|
|
91
|
|
92 def feature_test_true(feature, **kwargs):
|
|
93 return True
|
|
94
|
|
95
|
|
96 def feature_test_type(feature, **kwargs):
|
|
97 if "type" in kwargs:
|
|
98 return str(feature.type).upper() == str(kwargs["type"]).upper()
|
|
99 elif "types" in kwargs:
|
|
100 for x in kwargs["types"]:
|
|
101 if str(feature.type).upper() == str(x).upper():
|
|
102 return True
|
|
103 return False
|
|
104 raise Exception("Incorrect feature_test_type call, need type or types")
|
|
105
|
|
106
|
|
107 def feature_test_qual_value(feature, **kwargs):
|
|
108 """Test qualifier values.
|
|
109
|
|
110 For every feature, check that at least one value in
|
|
111 feature.quailfiers(kwargs['qualifier']) is in kwargs['attribute_list']
|
|
112 """
|
|
113 if isinstance(kwargs["qualifier"], list):
|
|
114 for qualifier in kwargs["qualifier"]:
|
|
115 for attribute_value in feature.qualifiers.get(qualifier, []):
|
|
116 if attribute_value in kwargs["attribute_list"]:
|
|
117 return True
|
|
118 else:
|
|
119 for attribute_value in feature.qualifiers.get(kwargs["qualifier"], []):
|
|
120 if attribute_value in kwargs["attribute_list"]:
|
|
121 return True
|
|
122 return False
|
|
123
|
|
124
|
|
125 def feature_test_location(feature, **kwargs):
|
|
126 if "strand" in kwargs:
|
|
127 if feature.location.strand != kwargs["strand"]:
|
|
128 return False
|
|
129
|
|
130 return feature.location.start <= kwargs["loc"] <= feature.location.end
|
|
131
|
|
132
|
|
133 def feature_test_quals(feature, **kwargs):
|
|
134 """
|
|
135 Example::
|
|
136
|
|
137 a = Feature(qualifiers={'Note': ['Some notes', 'Aasdf']})
|
|
138
|
|
139 # Check if a contains a Note
|
|
140 feature_test_quals(a, {'Note': None}) # Returns True
|
|
141 feature_test_quals(a, {'Product': None}) # Returns False
|
|
142
|
|
143 # Check if a contains a note with specific value
|
|
144 feature_test_quals(a, {'Note': ['ome']}) # Returns True
|
|
145
|
|
146 # Check if a contains a note with specific value
|
|
147 feature_test_quals(a, {'Note': ['other']}) # Returns False
|
|
148 """
|
|
149 for key in kwargs:
|
|
150 if key not in feature.qualifiers:
|
|
151 return False
|
|
152
|
|
153 # Key is present, no value specified
|
|
154 if kwargs[key] is None:
|
|
155 return True
|
|
156
|
|
157 # Otherwise there is a key value we're looking for.
|
|
158 # so we make a list of matches
|
|
159 matches = []
|
|
160 # And check all of the feature qualifier valuse
|
|
161 for value in feature.qualifiers[key]:
|
|
162 # For that kwargs[key] value
|
|
163 for x in kwargs[key]:
|
|
164 matches.append(x in value)
|
|
165
|
|
166 # If none matched, then we return false.
|
|
167 if not any(matches):
|
|
168 return False
|
|
169
|
|
170 return True
|
|
171
|
|
172
|
|
173 def feature_test_contains(feature, **kwargs):
|
|
174 if "index" in kwargs:
|
|
175 return feature.location.start < kwargs["index"] < feature.location.end
|
|
176 elif "range" in kwargs:
|
|
177 return (
|
|
178 feature.location.start < kwargs["range"]["start"] < feature.location.end
|
|
179 and feature.location.start < kwargs["range"]["end"] < feature.location.end
|
|
180 )
|
|
181 else:
|
|
182 raise RuntimeError("Must use index or range keyword")
|
|
183
|
|
184
|
|
185 def get_id(feature=None, parent_prefix=None):
|
|
186 result = ""
|
|
187 if parent_prefix is not None:
|
|
188 result += parent_prefix + "|"
|
|
189 if "locus_tag" in feature.qualifiers:
|
|
190 result += feature.qualifiers["locus_tag"][0]
|
|
191 elif "gene" in feature.qualifiers:
|
|
192 result += feature.qualifiers["gene"][0]
|
|
193 elif "Gene" in feature.qualifiers:
|
|
194 result += feature.qualifiers["Gene"][0]
|
|
195 elif "product" in feature.qualifiers:
|
|
196 result += feature.qualifiers["product"][0]
|
|
197 elif "Product" in feature.qualifiers:
|
|
198 result += feature.qualifiers["Product"][0]
|
|
199 elif "Name" in feature.qualifiers:
|
|
200 result += feature.qualifiers["Name"][0]
|
|
201 else:
|
|
202 return feature.id
|
|
203 # Leaving in case bad things happen.
|
|
204 # result += '%s_%s_%s_%s' % (
|
|
205 # feature.id,
|
|
206 # feature.location.start,
|
|
207 # feature.location.end,
|
|
208 # feature.location.strand
|
|
209 # )
|
|
210 return result
|
|
211
|
|
212
|
|
213 def get_gff3_id(gene):
|
|
214 return gene.qualifiers.get("Name", [gene.id])[0]
|
|
215
|
|
216
|
|
217 def ensure_location_in_bounds(start=0, end=0, parent_length=0):
|
|
218 # This prevents frameshift errors
|
|
219 while start < 0:
|
|
220 start += 3
|
|
221 while end < 0:
|
|
222 end += 3
|
|
223 while start > parent_length:
|
|
224 start -= 3
|
|
225 while end > parent_length:
|
|
226 end -= 3
|
|
227 return (start, end)
|
|
228
|
|
229
|
|
230 def coding_genes(feature_list):
|
|
231 for x in genes(feature_list):
|
|
232 if (
|
|
233 len(
|
|
234 list(
|
|
235 feature_lambda(
|
|
236 x.sub_features,
|
|
237 feature_test_type,
|
|
238 {"type": "CDS"},
|
|
239 subfeatures=False,
|
|
240 )
|
|
241 )
|
|
242 )
|
|
243 > 0
|
|
244 ):
|
|
245 yield x
|
|
246
|
|
247
|
|
248 def genes(feature_list, feature_type="gene", sort=False):
|
|
249 """
|
|
250 Simple filter to extract gene features from the feature set.
|
|
251 """
|
|
252
|
|
253 if not sort:
|
|
254 for x in feature_lambda(
|
|
255 feature_list, feature_test_type, {"type": feature_type}, subfeatures=True
|
|
256 ):
|
|
257 yield x
|
|
258 else:
|
|
259 data = list(genes(feature_list, feature_type=feature_type, sort=False))
|
|
260 data = sorted(data, key=lambda feature: feature.location.start)
|
|
261 for x in data:
|
|
262 yield x
|
|
263
|
|
264
|
|
265 def wa_unified_product_name(feature):
|
|
266 """
|
|
267 Try and figure out a name. We gave conflicting instructions, so
|
|
268 this isn't as trivial as it should be. Sometimes it will be in
|
|
269 'product' or 'Product', othertimes in 'Name'
|
|
270 """
|
|
271 # Manually applied tags.
|
|
272 protein_product = feature.qualifiers.get(
|
|
273 "product", feature.qualifiers.get("Product", [None])
|
|
274 )[0]
|
|
275
|
|
276 # If neither of those are available ...
|
|
277 if protein_product is None:
|
|
278 # And there's a name...
|
|
279 if "Name" in feature.qualifiers:
|
|
280 if not is_uuid(feature.qualifiers["Name"][0]):
|
|
281 protein_product = feature.qualifiers["Name"][0]
|
|
282
|
|
283 return protein_product
|
|
284
|
|
285
|
|
286 def is_uuid(name):
|
|
287 return name.count("-") == 4 and len(name) == 36
|
|
288
|
|
289
|
|
290 def get_rbs_from(gene):
|
|
291 # Normal RBS annotation types
|
|
292 rbs_rbs = list(
|
|
293 feature_lambda(
|
|
294 gene.sub_features, feature_test_type, {"type": "RBS"}, subfeatures=False
|
|
295 )
|
|
296 )
|
|
297 rbs_sds = list(
|
|
298 feature_lambda(
|
|
299 gene.sub_features,
|
|
300 feature_test_type,
|
|
301 {"type": "Shine_Dalgarno_sequence"},
|
|
302 subfeatures=False,
|
|
303 )
|
|
304 )
|
|
305 # Fraking apollo
|
|
306 apollo_exons = list(
|
|
307 feature_lambda(
|
|
308 gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False
|
|
309 )
|
|
310 )
|
|
311 apollo_exons = [x for x in apollo_exons if len(x) < 10]
|
|
312 # These are more NCBI's style
|
|
313 regulatory_elements = list(
|
|
314 feature_lambda(
|
|
315 gene.sub_features,
|
|
316 feature_test_type,
|
|
317 {"type": "regulatory"},
|
|
318 subfeatures=False,
|
|
319 )
|
|
320 )
|
|
321 rbs_regulatory = list(
|
|
322 feature_lambda(
|
|
323 regulatory_elements,
|
|
324 feature_test_quals,
|
|
325 {"regulatory_class": ["ribosome_binding_site"]},
|
|
326 subfeatures=False,
|
|
327 )
|
|
328 )
|
|
329 # Here's hoping you find just one ;)
|
|
330 return rbs_rbs + rbs_sds + rbs_regulatory + apollo_exons
|
|
331
|
|
332
|
|
333 def nice_name(record):
|
|
334 """
|
|
335 get the real name rather than NCBI IDs and so on. If fails, will return record.id
|
|
336 """
|
|
337 name = record.id
|
|
338 likely_parental_contig = list(genes(record.features, feature_type="contig"))
|
|
339 if len(likely_parental_contig) == 1:
|
|
340 name = likely_parental_contig[0].qualifiers.get("organism", [name])[0]
|
|
341 return name
|
|
342
|
|
343
|
|
344 def fsort(it):
|
|
345 for i in sorted(it, key=lambda x: int(x.location.start)):
|
|
346 yield i
|