0
|
1 """Classes and functions for working with variant filters."""
|
|
2
|
|
3 from __builtin__ import __import__
|
|
4 from abc import abstractproperty
|
|
5 import abc
|
|
6 import argparse
|
|
7 import glob
|
|
8 import inspect
|
|
9 import logging
|
|
10 import os
|
|
11 import re
|
|
12 import sys
|
|
13
|
|
14 import vcf
|
|
15 import vcf.filters
|
|
16 from vcf.parser import _Filter
|
|
17
|
|
18 IUPAC_CODES = {frozenset(["A", "G"]): "R",
|
|
19 frozenset(["C", "T"]): "Y",
|
|
20 frozenset(["G", "C"]): "S",
|
|
21 frozenset(["A", "T"]): "W",
|
|
22 frozenset(["G", "T"]): "K",
|
|
23 frozenset(["A", "C"]): "M",
|
|
24 frozenset(["C", "G", "T"]): "B",
|
|
25 frozenset(["A", "G", "T"]): "D",
|
|
26 frozenset(["A", "C", "T"]): "H",
|
|
27 frozenset(["A", "C", "G"]): "V"
|
|
28 }
|
|
29
|
|
30 class PHEFilterBase(vcf.filters.Base):
|
|
31 """Base class for VCF filters."""
|
|
32 __meta__ = abc.ABCMeta
|
|
33
|
|
34 magic_sep = ":"
|
|
35 decoder_pattern = re.compile(magic_sep)
|
|
36
|
|
37 @abc.abstractproperty
|
|
38 def parameter(self):
|
|
39 """Short name of parameter being filtered."""
|
|
40 return self.parameter
|
|
41
|
|
42 @abc.abstractproperty
|
|
43 def _default_threshold(self):
|
|
44 """Default threshold for filtering."""
|
|
45 return self._default_threshold
|
|
46
|
|
47 def __init__(self, args):
|
|
48 super(PHEFilterBase, self).__init__(args)
|
|
49
|
|
50 # Change the threshold to custom gq value.
|
|
51 self.threshold = self._default_threshold
|
|
52
|
|
53 if isinstance(args, dict):
|
|
54 self.threshold = args.get(self.parameter)
|
|
55
|
|
56 def __str__(self):
|
|
57 return self.filter_name()
|
|
58
|
|
59 @abc.abstractmethod
|
|
60 def short_desc(self):
|
|
61 """Short description of the filter (included in VCF)."""
|
|
62 raise NotImplementedError("Get short description is not implemented.")
|
|
63
|
|
64 def get_config(self):
|
|
65 """This is used for reconstructing filter."""
|
|
66 return {self.parameter: self.threshold}
|
|
67
|
|
68 def filter_name(self):
|
|
69 """Create filter names by their parameter separated by magic.
|
|
70 E.g. if filter parameter is ad_ratio and threshold is 0.9 then
|
|
71 ad_ratio:0.9 if the filter name.
|
|
72 """
|
|
73 return "%s%s%s" % (self.parameter, self.magic_sep, self.threshold)
|
|
74
|
|
75 @staticmethod
|
|
76 def decode(filter_id):
|
|
77 """Decode name of filter."""
|
|
78 conf = {}
|
|
79
|
|
80 if PHEFilterBase.magic_sep in filter_id:
|
|
81 info = PHEFilterBase.decoder_pattern.split(filter_id)
|
|
82 assert len(info) == 2
|
|
83 conf[info[0]] = info[1]
|
|
84 return conf
|
|
85
|
|
86 def is_gap(self):
|
|
87 return False
|
|
88
|
|
89 def is_n(self):
|
|
90 return True
|
|
91
|
|
92 @staticmethod
|
|
93 def call_concensus(record):
|
|
94 extended_code = "N"
|
|
95 try:
|
|
96 sample_ad = set([str(c) for c in record.ALT] + [record.REF])
|
|
97
|
|
98
|
|
99 for code, cov in IUPAC_CODES.items():
|
|
100 if sample_ad == cov:
|
|
101 extended_code = code
|
|
102 break
|
|
103 except AttributeError:
|
|
104 extended_code = "N"
|
|
105
|
|
106 return extended_code
|
|
107
|
|
108 def dynamic_filter_loader():
|
|
109 """Fancy way of dynamically importing existing filters.
|
|
110
|
|
111 Returns
|
|
112 -------
|
|
113 dict:
|
|
114 Available filters dictionary. Keys are parameters that
|
|
115 can be supplied to the filters.
|
|
116 """
|
|
117
|
|
118 # We assume the filters are in the same directory as THIS file.
|
|
119 filter_dir = os.path.dirname(__file__)
|
|
120 filter_dir = os.path.abspath(filter_dir)
|
|
121
|
|
122 # This is populated when the module is first imported.
|
|
123 avail_filters = {}
|
|
124
|
|
125 # Add this directory to the syspath.
|
|
126 sys.path.insert(0, filter_dir)
|
|
127
|
|
128 # Find all "py" files.
|
|
129 for filter_mod in glob.glob(os.path.join(filter_dir, "*.py")):
|
|
130
|
|
131 # Derive name of the module where filter is.
|
|
132 filter_mod_file = os.path.basename(filter_mod)
|
|
133
|
|
134 # Ignore this file, obviously.
|
|
135 if filter_mod_file.startswith("__init__"):
|
|
136 continue
|
|
137
|
|
138 # Import the module with a filter.
|
|
139 mod = __import__(filter_mod_file.replace(".pyc", "").replace(".py", ""))
|
|
140
|
|
141 # Find all the classes contained in this module.
|
|
142 classes = inspect.getmembers(mod, inspect.isclass)
|
|
143 for cls_name, cls in classes:
|
|
144 # For each class, if it is a sublass of PHEFilterBase, add it.
|
|
145 if cls_name != "PHEFilterBase" and issubclass(cls, PHEFilterBase):
|
|
146 # The parameters are inherited and defined within each filter.
|
|
147 avail_filters[cls.parameter] = cls
|
|
148
|
|
149 sys.path.remove(filter_dir)
|
|
150
|
|
151 return avail_filters
|
|
152
|
|
153 _avail_filters = dynamic_filter_loader()
|
|
154
|
|
155 def available_filters():
|
|
156 """Return list of available filters."""
|
|
157 return _avail_filters.keys()
|
|
158
|
|
159 def str_to_filters(filters):
|
|
160 """Convert from filter string to array of filters.
|
|
161 E.g. ad_ration:0.9,min_depth:5
|
|
162
|
|
163 Parameters:
|
|
164 -----------
|
|
165 filters: str
|
|
166 String version of filters, separated by comma.
|
|
167
|
|
168 Returns:
|
|
169 --------
|
|
170 list:
|
|
171 List of :py:class:`phe.variant_filters.PHEFilterBase` instances.
|
|
172 """
|
|
173
|
|
174 config = {}
|
|
175 for kv_pair in filters.split(","):
|
|
176 pair = kv_pair.split(":")
|
|
177 assert len(pair) == 2, "Filters should be separated by ':' %s" % kv_pair
|
|
178
|
|
179 # We don't care about casting them to correct type because Filters
|
|
180 # will do it for us.
|
|
181 config[pair[0]] = pair[1]
|
|
182
|
|
183 return make_filters(config)
|
|
184
|
|
185 def make_filters(config):
|
|
186 """Create a list of filters from *config*.
|
|
187
|
|
188 Parameters:
|
|
189 -----------
|
|
190 config: dict, optional
|
|
191 Dictionary with parameter: value pairs. For each parameter, an
|
|
192 appropriate Filter will be found and instanciated.
|
|
193
|
|
194 Returns:
|
|
195 --------
|
|
196 list:
|
|
197 List of :py:class:`PHEFilterBase` filters.
|
|
198 """
|
|
199 filters = []
|
|
200
|
|
201 if config:
|
|
202 for custom_filter in config:
|
|
203 if custom_filter in _avail_filters:
|
|
204 filters.append(_avail_filters[custom_filter](config))
|
|
205 else:
|
|
206 logging.warn("Could not find appropriate filter for %s",
|
|
207 custom_filter)
|
|
208
|
|
209 return filters
|