comparison phe/variant_filters/__init__.py @ 0:834a312c0114 draft

Uploaded
author ulfschaefer
date Thu, 10 Dec 2015 09:22:39 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:834a312c0114
1 """Classes and functions for working with variant filters."""
2
3 from __builtin__ import __import__
4 from abc import abstractproperty
5 import abc
6 import argparse
7 import glob
8 import inspect
9 import logging
10 import os
11 import re
12 import sys
13
14 import vcf
15 import vcf.filters
16 from vcf.parser import _Filter
17
18 IUPAC_CODES = {frozenset(["A", "G"]): "R",
19 frozenset(["C", "T"]): "Y",
20 frozenset(["G", "C"]): "S",
21 frozenset(["A", "T"]): "W",
22 frozenset(["G", "T"]): "K",
23 frozenset(["A", "C"]): "M",
24 frozenset(["C", "G", "T"]): "B",
25 frozenset(["A", "G", "T"]): "D",
26 frozenset(["A", "C", "T"]): "H",
27 frozenset(["A", "C", "G"]): "V"
28 }
29
30 class PHEFilterBase(vcf.filters.Base):
31 """Base class for VCF filters."""
32 __meta__ = abc.ABCMeta
33
34 magic_sep = ":"
35 decoder_pattern = re.compile(magic_sep)
36
37 @abc.abstractproperty
38 def parameter(self):
39 """Short name of parameter being filtered."""
40 return self.parameter
41
42 @abc.abstractproperty
43 def _default_threshold(self):
44 """Default threshold for filtering."""
45 return self._default_threshold
46
47 def __init__(self, args):
48 super(PHEFilterBase, self).__init__(args)
49
50 # Change the threshold to custom gq value.
51 self.threshold = self._default_threshold
52
53 if isinstance(args, dict):
54 self.threshold = args.get(self.parameter)
55
56 def __str__(self):
57 return self.filter_name()
58
59 @abc.abstractmethod
60 def short_desc(self):
61 """Short description of the filter (included in VCF)."""
62 raise NotImplementedError("Get short description is not implemented.")
63
64 def get_config(self):
65 """This is used for reconstructing filter."""
66 return {self.parameter: self.threshold}
67
68 def filter_name(self):
69 """Create filter names by their parameter separated by magic.
70 E.g. if filter parameter is ad_ratio and threshold is 0.9 then
71 ad_ratio:0.9 if the filter name.
72 """
73 return "%s%s%s" % (self.parameter, self.magic_sep, self.threshold)
74
75 @staticmethod
76 def decode(filter_id):
77 """Decode name of filter."""
78 conf = {}
79
80 if PHEFilterBase.magic_sep in filter_id:
81 info = PHEFilterBase.decoder_pattern.split(filter_id)
82 assert len(info) == 2
83 conf[info[0]] = info[1]
84 return conf
85
86 def is_gap(self):
87 return False
88
89 def is_n(self):
90 return True
91
92 @staticmethod
93 def call_concensus(record):
94 extended_code = "N"
95 try:
96 sample_ad = set([str(c) for c in record.ALT] + [record.REF])
97
98
99 for code, cov in IUPAC_CODES.items():
100 if sample_ad == cov:
101 extended_code = code
102 break
103 except AttributeError:
104 extended_code = "N"
105
106 return extended_code
107
108 def dynamic_filter_loader():
109 """Fancy way of dynamically importing existing filters.
110
111 Returns
112 -------
113 dict:
114 Available filters dictionary. Keys are parameters that
115 can be supplied to the filters.
116 """
117
118 # We assume the filters are in the same directory as THIS file.
119 filter_dir = os.path.dirname(__file__)
120 filter_dir = os.path.abspath(filter_dir)
121
122 # This is populated when the module is first imported.
123 avail_filters = {}
124
125 # Add this directory to the syspath.
126 sys.path.insert(0, filter_dir)
127
128 # Find all "py" files.
129 for filter_mod in glob.glob(os.path.join(filter_dir, "*.py")):
130
131 # Derive name of the module where filter is.
132 filter_mod_file = os.path.basename(filter_mod)
133
134 # Ignore this file, obviously.
135 if filter_mod_file.startswith("__init__"):
136 continue
137
138 # Import the module with a filter.
139 mod = __import__(filter_mod_file.replace(".pyc", "").replace(".py", ""))
140
141 # Find all the classes contained in this module.
142 classes = inspect.getmembers(mod, inspect.isclass)
143 for cls_name, cls in classes:
144 # For each class, if it is a sublass of PHEFilterBase, add it.
145 if cls_name != "PHEFilterBase" and issubclass(cls, PHEFilterBase):
146 # The parameters are inherited and defined within each filter.
147 avail_filters[cls.parameter] = cls
148
149 sys.path.remove(filter_dir)
150
151 return avail_filters
152
153 _avail_filters = dynamic_filter_loader()
154
155 def available_filters():
156 """Return list of available filters."""
157 return _avail_filters.keys()
158
159 def str_to_filters(filters):
160 """Convert from filter string to array of filters.
161 E.g. ad_ration:0.9,min_depth:5
162
163 Parameters:
164 -----------
165 filters: str
166 String version of filters, separated by comma.
167
168 Returns:
169 --------
170 list:
171 List of :py:class:`phe.variant_filters.PHEFilterBase` instances.
172 """
173
174 config = {}
175 for kv_pair in filters.split(","):
176 pair = kv_pair.split(":")
177 assert len(pair) == 2, "Filters should be separated by ':' %s" % kv_pair
178
179 # We don't care about casting them to correct type because Filters
180 # will do it for us.
181 config[pair[0]] = pair[1]
182
183 return make_filters(config)
184
185 def make_filters(config):
186 """Create a list of filters from *config*.
187
188 Parameters:
189 -----------
190 config: dict, optional
191 Dictionary with parameter: value pairs. For each parameter, an
192 appropriate Filter will be found and instanciated.
193
194 Returns:
195 --------
196 list:
197 List of :py:class:`PHEFilterBase` filters.
198 """
199 filters = []
200
201 if config:
202 for custom_filter in config:
203 if custom_filter in _avail_filters:
204 filters.append(_avail_filters[custom_filter](config))
205 else:
206 logging.warn("Could not find appropriate filter for %s",
207 custom_filter)
208
209 return filters