Mercurial > repos > ulfschaefer > vcfs2fasta
comparison phe/variant/__init__.py @ 22:96f393ad7fc6 draft default tip
Uploaded
author | ulfschaefer |
---|---|
date | Wed, 23 Dec 2015 04:50:58 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
21:b09ffe50c378 | 22:96f393ad7fc6 |
---|---|
1 """Classes and methods to work with _variants and such.""" | |
2 import abc | |
3 from collections import OrderedDict | |
4 import gzip | |
5 import logging | |
6 import pickle | |
7 | |
8 from vcf import filters | |
9 import vcf | |
10 from vcf.parser import _Filter | |
11 | |
12 from phe.metadata import PHEMetaData | |
13 from phe.variant_filters import make_filters, PHEFilterBase, str_to_filters | |
14 | |
15 | |
16 class VCFTemplate(object): | |
17 """This is a small hack class for the Template used in generating | |
18 VCF file.""" | |
19 | |
20 def __init__(self, vcf_reader): | |
21 self.infos = vcf_reader.infos | |
22 self.formats = vcf_reader.formats | |
23 self.filters = vcf_reader.filters | |
24 self.alts = vcf_reader.alts | |
25 self.contigs = vcf_reader.contigs | |
26 self.metadata = vcf_reader.metadata | |
27 self._column_headers = vcf_reader._column_headers | |
28 self.samples = vcf_reader.samples | |
29 | |
30 class VariantSet(object): | |
31 """A convenient representation of set of _variants. | |
32 TODO: Implement iterator and generator for the variant set. | |
33 """ | |
34 | |
35 _reader = None | |
36 | |
37 def __init__(self, vcf_in, filters=None): | |
38 """Constructor of variant set. | |
39 | |
40 Parameters: | |
41 ----------- | |
42 vcf_in: str | |
43 Path to the VCF file for loading information. | |
44 filters: str or dict, optional | |
45 Dictionary or string of the filter:threshold key value pairs. | |
46 """ | |
47 self.vcf_in = vcf_in | |
48 self._reader = vcf.Reader(filename=vcf_in) | |
49 self.out_template = VCFTemplate(self._reader) | |
50 | |
51 self.filters = [] | |
52 if filters is not None: | |
53 if isinstance(filters, str): | |
54 self.filters = str_to_filters(filters) | |
55 elif isinstance(filters, dict): | |
56 self.filters = make_filters(config=filters) | |
57 elif isinstance(filters, list): | |
58 self.filters = filters | |
59 else: | |
60 logging.warn("Could not create filters from %s", filters) | |
61 else: | |
62 reader = vcf.Reader(filename=self.vcf_in) | |
63 filters = {} | |
64 for filter_id in reader.filters: | |
65 filters.update(PHEFilterBase.decode(filter_id)) | |
66 | |
67 if filters: | |
68 self.filters = make_filters(config=filters) | |
69 | |
70 self._variants = [] | |
71 | |
72 def __iter__(self): | |
73 '''Iterator over **all** variants''' | |
74 return self.variants(only_good=False) | |
75 | |
76 def variants(self, only_good=False): | |
77 '''Generator over the variant set. | |
78 Parameters: | |
79 ----------- | |
80 only_good: bool, optional | |
81 Iff True good and bad variants are returned, | |
82 otherwise only good are returned (default: False). | |
83 ''' | |
84 for var in self._variants: | |
85 if not only_good: | |
86 yield var | |
87 elif not var.FILTER : | |
88 yield var | |
89 | |
90 def filter_variants(self, keep_only_snps=True): | |
91 """Create a variant """ | |
92 | |
93 if self._reader is None: | |
94 # Create a reader class from input VCF. | |
95 self._reader = vcf.Reader(filename=self.vcf_in) | |
96 | |
97 # get list of existing filters. | |
98 existing_filters = {} | |
99 removed_filters = [] | |
100 | |
101 for filter_id in self._reader.filters: | |
102 conf = PHEFilterBase.decode(filter_id) | |
103 tuple(conf.keys()) | |
104 existing_filters.update({tuple(conf.keys()):filter_id}) | |
105 | |
106 # Add each filter we are going to use to the record. | |
107 # This is needed for writing out proper #FILTER header in VCF. | |
108 for record_filter in self.filters: | |
109 # We know that each filter has short description method. | |
110 short_doc = record_filter.short_desc() | |
111 short_doc = short_doc.split('\n')[0].lstrip() | |
112 | |
113 filter_name = PHEFilterBase.decode(record_filter.filter_name()) | |
114 | |
115 # Check if the sample has been filtered for this type of filter | |
116 # in the past. If so remove is, because it is going to be refiltered. | |
117 if tuple(filter_name) in existing_filters: | |
118 logging.info("Removing existing filter: %s", existing_filters[tuple(filter_name)]) | |
119 removed_filters.append(existing_filters[tuple(filter_name)]) | |
120 del self._reader.filters[existing_filters[tuple(filter_name)]] | |
121 | |
122 self._reader.filters[record_filter.filter_name()] = _Filter(record_filter.filter_name(), short_doc) | |
123 | |
124 # For each record (POSITION) apply set of filters. | |
125 for record in self._reader: | |
126 | |
127 # If this record failed filters and we removed some, | |
128 # check is they need to be removed from record. | |
129 if isinstance(record.FILTER, list) and len(record.FILTER) > 0: | |
130 for filter_id in removed_filters: | |
131 if filter_id in record.FILTER: | |
132 record.FILTER.remove(filter_id) | |
133 | |
134 for record_filter in self.filters: | |
135 | |
136 # Call to __call__ method in each filter. | |
137 result = record_filter(record) | |
138 | |
139 # Record is KEPT if filter returns None | |
140 if result is None: | |
141 continue | |
142 | |
143 # If we got this far, then record is filtered OUT. | |
144 record.add_filter(record_filter.filter_name()) | |
145 | |
146 # After applying all filters, check if FILTER is None. | |
147 # If it is, then record PASSED all filters. | |
148 if record.FILTER is None or record.FILTER == []: | |
149 record.FILTER = [] | |
150 # FIXME: Does this work for indels? | |
151 if keep_only_snps and record.is_snp: | |
152 self._variants.append(record) | |
153 else: | |
154 self._variants.append(record) | |
155 | |
156 self._update_filters(self._reader.filters) | |
157 | |
158 return [ variant for variant in self._variants if not variant.FILTER] | |
159 | |
160 def add_metadata(self, info): | |
161 """Add metadata to the variant set. | |
162 | |
163 Parameters: | |
164 ----------- | |
165 info: dict | |
166 Dictionary of key value pairs to be inserted into metadata. | |
167 """ | |
168 for info_key, metadata in info.items(): | |
169 self.out_template.metadata[info_key] = metadata | |
170 | |
171 def write_variants(self, vcf_out, only_snps=False, only_good=False): | |
172 """Write _variants to a VCF file. | |
173 | |
174 Parameters: | |
175 ----------- | |
176 vcf_out: str | |
177 Path to the file where VCF data is written. | |
178 only_snps: bool, optional | |
179 True is *only* SNP are to be written to the output (default: False). | |
180 only_good: bool, optional | |
181 True if only those records that PASS all filters should be written | |
182 (default: False). | |
183 | |
184 Returns: | |
185 int: | |
186 Number of records written. | |
187 """ | |
188 written_variants = 0 | |
189 | |
190 # Check if the output file ends with .gz, then compress data. | |
191 open_func = gzip.open if vcf_out.endswith(".gz") else open | |
192 | |
193 with open_func(vcf_out, "w") as out_vcf: | |
194 writer = vcf.Writer(out_vcf, self.out_template) | |
195 | |
196 # Output internal _variants (if exist) otherwise, output data from reader. | |
197 variants = self._variants if self._variants else self._reader | |
198 | |
199 for record in variants: | |
200 | |
201 if only_snps and not record.is_snp: | |
202 continue | |
203 | |
204 if only_good and record.FILTER: | |
205 continue | |
206 | |
207 writer.write_record(record) | |
208 written_variants += 1 | |
209 | |
210 return written_variants | |
211 | |
212 def _write_bad_variants(self, vcf_out): | |
213 """**PRIVATE:** Write only those records that **haven't** passed.""" | |
214 written_variants = 0 | |
215 # Check if the output file ends with .gz, then compress data. | |
216 open_func = gzip.open if vcf_out.endswith(".gz") else open | |
217 with open_func(vcf_out, "w") as out_vcf: | |
218 writer = vcf.Writer(out_vcf, self.out_template) | |
219 for record in self._variants: | |
220 if record.FILTER != "PASS" and record.FILTER is not None: | |
221 writer.write_record(record) | |
222 written_variants += 1 | |
223 return written_variants | |
224 | |
225 def _update_filters(self, new_filters): | |
226 """Update internal filters in the output template.""" | |
227 for new_filter, filter_data in new_filters.items(): | |
228 self.out_template.filters[new_filter] = filter_data | |
229 | |
230 | |
231 class VariantCaller(PHEMetaData): | |
232 """Abstract class used for access to the implemented variant callers.""" | |
233 | |
234 __metaclass__ = abc.ABCMeta | |
235 | |
236 def __init__(self, cmd_options=None): | |
237 """Constructor for variant caller. | |
238 | |
239 Parameters: | |
240 ----------- | |
241 cmd_options: str, optional | |
242 Command options to pass to the variant command. | |
243 """ | |
244 self.cmd_options = cmd_options | |
245 | |
246 super(VariantCaller, self).__init__() | |
247 | |
248 @abc.abstractmethod | |
249 def make_vcf(self, *args, **kwargs): | |
250 """Create a VCF from **BAM** file. | |
251 | |
252 Parameters: | |
253 ----------- | |
254 ref: str | |
255 Path to the reference file. | |
256 bam: str | |
257 Path to the indexed **BAM** file for calling _variants. | |
258 vcf_file: str | |
259 path to the VCF file where data will be written to. | |
260 make_aux: bool, optional | |
261 True/False create auxilliary files (default: False). | |
262 | |
263 Returns: | |
264 -------- | |
265 bool: | |
266 True if variant calling was successful, False otherwise. | |
267 """ | |
268 raise NotImplementedError("make_vcf is not implemented yet.") | |
269 | |
270 @abc.abstractmethod | |
271 def create_aux_files(self, ref): | |
272 """Create needed (if any) auxiliary files. | |
273 These files are required for proper functioning of the variant caller. | |
274 """ | |
275 raise NotImplementedError("create_aux_files is not implemeted.") | |
276 | |
277 @abc.abstractmethod | |
278 def get_info(self, plain=False): | |
279 """Get information about this variant caller.""" | |
280 raise NotImplementedError("Get info has not been implemented yet." | |
281 ) | |
282 def get_meta(self): | |
283 """Get the metadata about this variant caller.""" | |
284 od = self.get_info() | |
285 od["ID"] = "VariantCaller" | |
286 return OrderedDict({"PHEVariantMetaData": [od]}) | |
287 | |
288 @abc.abstractmethod | |
289 def get_version(self): | |
290 """Get the version of the underlying command used.""" | |
291 raise NotImplementedError("Get version has not been implemented yet.") |