22
|
1 """Classes and functions for working with variant filters."""
|
|
2
|
|
3 from __builtin__ import __import__
|
|
4 from abc import abstractproperty
|
|
5 import abc
|
|
6 import argparse
|
|
7 import glob
|
|
8 import inspect
|
|
9 import logging
|
|
10 import os
|
|
11 import re
|
|
12 import sys
|
|
13
|
|
14 import vcf
|
|
15 import vcf.filters
|
|
16 from vcf.parser import _Filter
|
|
17
|
|
18 IUPAC_CODES = {frozenset(["A"]): "A",
|
|
19 frozenset(["C"]): "C",
|
|
20 frozenset(["G"]): "G",
|
|
21 frozenset(["T"]): "T",
|
|
22 frozenset(["A", "G"]): "R",
|
|
23 frozenset(["C", "T"]): "Y",
|
|
24 frozenset(["G", "C"]): "S",
|
|
25 frozenset(["A", "T"]): "W",
|
|
26 frozenset(["G", "T"]): "K",
|
|
27 frozenset(["A", "C"]): "M",
|
|
28 frozenset(["C", "G", "T"]): "B",
|
|
29 frozenset(["A", "G", "T"]): "D",
|
|
30 frozenset(["A", "C", "T"]): "H",
|
|
31 frozenset(["A", "C", "G"]): "V"
|
|
32 }
|
|
33
|
|
34 class PHEFilterBase(vcf.filters.Base):
|
|
35 """Base class for VCF filters."""
|
|
36 __meta__ = abc.ABCMeta
|
|
37
|
|
38 magic_sep = ":"
|
|
39 decoder_pattern = re.compile(magic_sep)
|
|
40
|
|
41 @abc.abstractproperty
|
|
42 def parameter(self):
|
|
43 """Short name of parameter being filtered."""
|
|
44 return self.parameter
|
|
45
|
|
46 @abc.abstractproperty
|
|
47 def _default_threshold(self):
|
|
48 """Default threshold for filtering."""
|
|
49 return self._default_threshold
|
|
50
|
|
51 def __init__(self, args):
|
|
52 super(PHEFilterBase, self).__init__(args)
|
|
53
|
|
54 # Change the threshold to custom gq value.
|
|
55 self.threshold = self._default_threshold
|
|
56
|
|
57 if isinstance(args, dict):
|
|
58 self.threshold = args.get(self.parameter)
|
|
59
|
|
60 def __str__(self):
|
|
61 return self.filter_name()
|
|
62
|
|
63 def _check_record(self, record):
|
|
64 if self.is_uncallable(record):
|
|
65 return False
|
|
66 elif record.is_monomorphic:
|
|
67 return None
|
|
68 else:
|
|
69 return True
|
|
70
|
|
71 @abc.abstractmethod
|
|
72 def short_desc(self):
|
|
73 """Short description of the filter (included in VCF)."""
|
|
74 raise NotImplementedError("Get short description is not implemented.")
|
|
75
|
|
76 def get_config(self):
|
|
77 """This is used for reconstructing filter."""
|
|
78 return {self.parameter: self.threshold}
|
|
79
|
|
80 def filter_name(self):
|
|
81 """Create filter names by their parameter separated by magic.
|
|
82 E.g. if filter parameter is ad_ratio and threshold is 0.9 then
|
|
83 ad_ratio:0.9 if the filter name.
|
|
84 """
|
|
85 return "%s%s%s" % (self.parameter, self.magic_sep, self.threshold)
|
|
86
|
|
87 @staticmethod
|
|
88 def decode(filter_id):
|
|
89 """Decode name of filter."""
|
|
90 conf = {}
|
|
91
|
|
92 if PHEFilterBase.magic_sep in filter_id:
|
|
93 info = PHEFilterBase.decoder_pattern.split(filter_id)
|
|
94 assert len(info) == 2
|
|
95 conf[info[0]] = info[1]
|
|
96 return conf
|
|
97
|
|
98 def is_gap(self):
|
|
99 return False
|
|
100
|
|
101 def is_n(self):
|
|
102 return True
|
|
103
|
|
104 def is_uncallable(self, record):
|
|
105 """Filter a :py:class:`vcf.model._Record`."""
|
|
106
|
|
107 if len(record.samples) > 1:
|
|
108 logging.warn("More than 1 sample detected. Only first is considered.")
|
|
109
|
|
110 try:
|
|
111 record_gt = record.samples[0].data.GT
|
|
112 except AttributeError:
|
|
113 logging.warn("Could not retrieve GQ score POS %i", record.POS)
|
|
114 record_gt = "./."
|
|
115
|
|
116 if record_gt == "./.":
|
|
117 return True
|
|
118 else:
|
|
119 return False
|
|
120
|
|
121 @staticmethod
|
|
122 def call_concensus(record):
|
|
123 if not record.FILTER:
|
|
124 sample_ad = frozenset([str(c).upper() for c in record.ALT])
|
|
125 return IUPAC_CODES.get(sample_ad, "N")
|
|
126
|
|
127 else:
|
|
128 sample_ad = frozenset([str(c).upper() for c in record.ALT] + [record.REF])
|
|
129
|
|
130 return IUPAC_CODES.get(sample_ad, "N")
|
|
131
|
|
132 def dynamic_filter_loader():
|
|
133 """Fancy way of dynamically importing existing filters.
|
|
134
|
|
135 Returns
|
|
136 -------
|
|
137 dict:
|
|
138 Available filters dictionary. Keys are parameters that
|
|
139 can be supplied to the filters.
|
|
140 """
|
|
141
|
|
142 # We assume the filters are in the same directory as THIS file.
|
|
143 filter_dir = os.path.dirname(__file__)
|
|
144 filter_dir = os.path.abspath(filter_dir)
|
|
145
|
|
146 # This is populated when the module is first imported.
|
|
147 avail_filters = {}
|
|
148
|
|
149 # Add this directory to the syspath.
|
|
150 sys.path.insert(0, filter_dir)
|
|
151
|
|
152 # Find all "py" files.
|
|
153 for filter_mod in glob.glob(os.path.join(filter_dir, "*.py")):
|
|
154
|
|
155 # Derive name of the module where filter is.
|
|
156 filter_mod_file = os.path.basename(filter_mod)
|
|
157
|
|
158 # Ignore this file, obviously.
|
|
159 if filter_mod_file.startswith("__init__"):
|
|
160 continue
|
|
161
|
|
162 # Import the module with a filter.
|
|
163 mod = __import__(filter_mod_file.replace(".pyc", "").replace(".py", ""))
|
|
164
|
|
165 # Find all the classes contained in this module.
|
|
166 classes = inspect.getmembers(mod, inspect.isclass)
|
|
167 for cls_name, cls in classes:
|
|
168 # For each class, if it is a sublass of PHEFilterBase, add it.
|
|
169 if cls_name != "PHEFilterBase" and issubclass(cls, PHEFilterBase):
|
|
170 # The parameters are inherited and defined within each filter.
|
|
171 avail_filters[cls.parameter] = cls
|
|
172
|
|
173 sys.path.remove(filter_dir)
|
|
174
|
|
175 return avail_filters
|
|
176
|
|
177 _avail_filters = dynamic_filter_loader()
|
|
178
|
|
179 def available_filters():
|
|
180 """Return list of available filters."""
|
|
181 return _avail_filters.keys()
|
|
182
|
|
183 def str_to_filters(filters):
|
|
184 """Convert from filter string to array of filters.
|
|
185 E.g. ad_ration:0.9,min_depth:5
|
|
186
|
|
187 Parameters:
|
|
188 -----------
|
|
189 filters: str
|
|
190 String version of filters, separated by comma.
|
|
191
|
|
192 Returns:
|
|
193 --------
|
|
194 list:
|
|
195 List of :py:class:`phe.variant_filters.PHEFilterBase` instances.
|
|
196 """
|
|
197
|
|
198 config = {}
|
|
199 for kv_pair in filters.split(","):
|
|
200 pair = kv_pair.split(":")
|
|
201 assert len(pair) == 2, "Filters should be separated by ':' %s" % kv_pair
|
|
202
|
|
203 # We don't care about casting them to correct type because Filters
|
|
204 # will do it for us.
|
|
205 config[pair[0]] = pair[1]
|
|
206
|
|
207 return make_filters(config)
|
|
208
|
|
209 def make_filters(config):
|
|
210 """Create a list of filters from *config*.
|
|
211
|
|
212 Parameters:
|
|
213 -----------
|
|
214 config: dict, optional
|
|
215 Dictionary with parameter: value pairs. For each parameter, an
|
|
216 appropriate Filter will be found and instanciated.
|
|
217
|
|
218 Returns:
|
|
219 --------
|
|
220 list:
|
|
221 List of :py:class:`PHEFilterBase` filters.
|
|
222 """
|
|
223 filters = []
|
|
224
|
|
225 if config:
|
|
226 for custom_filter in config:
|
|
227 if custom_filter in _avail_filters:
|
|
228 filters.append(_avail_filters[custom_filter](config))
|
|
229 else:
|
|
230 logging.error("Could not find appropriate filter for %s",
|
|
231 custom_filter)
|
|
232 raise Exception("Filter %s could not be created. Please check your filter arguments." % custom_filter)
|
|
233
|
|
234 return filters
|