Mercurial > repos > ulfschaefer > vcfs2fasta
comparison phe/variant/__init__.py @ 14:f72039c5faa4 draft
Uploaded
author | ulfschaefer |
---|---|
date | Wed, 16 Dec 2015 07:29:05 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
13:2e69ce9dca65 | 14:f72039c5faa4 |
---|---|
1 """Classes and methods to work with variants and such.""" | |
2 import abc | |
3 #ulf | |
4 # from collections import OrderedDict | |
5 try: | |
6 from collections import OrderedDict | |
7 except ImportError: | |
8 from ordereddict import OrderedDict | |
9 | |
10 import logging | |
11 import pickle | |
12 | |
13 from vcf import filters | |
14 import vcf | |
15 from vcf.parser import _Filter | |
16 | |
17 from phe.metadata import PHEMetaData | |
18 from phe.variant_filters import make_filters, PHEFilterBase, str_to_filters | |
19 | |
20 | |
21 class VCFTemplate(object): | |
22 """This is a small hack class for the Template used in generating | |
23 VCF file.""" | |
24 | |
25 def __init__(self, vcf_reader): | |
26 self.infos = vcf_reader.infos | |
27 self.formats = vcf_reader.formats | |
28 self.filters = vcf_reader.filters | |
29 self.alts = vcf_reader.alts | |
30 self.contigs = vcf_reader.contigs | |
31 self.metadata = vcf_reader.metadata | |
32 self._column_headers = vcf_reader._column_headers | |
33 self.samples = vcf_reader.samples | |
34 | |
35 class VariantSet(object): | |
36 """A convenient representation of set of variants. | |
37 TODO: Implement iterator and generator for the variant set. | |
38 """ | |
39 | |
40 _reader = None | |
41 | |
42 def __init__(self, vcf_in, filters=None): | |
43 """Constructor of variant set. | |
44 | |
45 Parameters: | |
46 ----------- | |
47 vcf_in: str | |
48 Path to the VCF file for loading information. | |
49 filters: str or dict, optional | |
50 Dictionary or string of the filter:threshold key value pairs. | |
51 """ | |
52 self.vcf_in = vcf_in | |
53 self._reader = vcf.Reader(filename=vcf_in) | |
54 self.out_template = VCFTemplate(self._reader) | |
55 | |
56 self.filters = [] | |
57 if filters is not None: | |
58 if isinstance(filters, str): | |
59 self.filters = str_to_filters(filters) | |
60 elif isinstance(filters, dict): | |
61 self.filters = make_filters(config=filters) | |
62 elif isinstance(filters, list): | |
63 self.filters = filters | |
64 else: | |
65 logging.warn("Could not create filters from %s", filters) | |
66 else: | |
67 reader = vcf.Reader(filename=self.vcf_in) | |
68 filters = {} | |
69 for filter_id in reader.filters: | |
70 filters.update(PHEFilterBase.decode(filter_id)) | |
71 | |
72 if filters: | |
73 self.filters = make_filters(config=filters) | |
74 | |
75 self.variants = [] | |
76 | |
77 def filter_variants(self, keep_only_snps=True): | |
78 """Create a variant """ | |
79 | |
80 if self._reader is None: | |
81 # Create a reader class from input VCF. | |
82 self._reader = vcf.Reader(filename=self.vcf_in) | |
83 | |
84 # get list of existing filters. | |
85 existing_filters = {} | |
86 removed_filters = [] | |
87 | |
88 for filter_id in self._reader.filters: | |
89 conf = PHEFilterBase.decode(filter_id) | |
90 tuple(conf.keys()) | |
91 existing_filters.update({tuple(conf.keys()):filter_id}) | |
92 | |
93 # Add each filter we are going to use to the record. | |
94 # This is needed for writing out proper #FILTER header in VCF. | |
95 for record_filter in self.filters: | |
96 # We know that each filter has short description method. | |
97 short_doc = record_filter.short_desc() | |
98 short_doc = short_doc.split('\n')[0].lstrip() | |
99 | |
100 filter_name = PHEFilterBase.decode(record_filter.filter_name()) | |
101 | |
102 # Check if the sample has been filtered for this type of filter | |
103 # in the past. If so remove is, because it is going to be refiltered. | |
104 if tuple(filter_name) in existing_filters: | |
105 logging.info("Removing existing filter: %s", existing_filters[tuple(filter_name)]) | |
106 removed_filters.append(existing_filters[tuple(filter_name)]) | |
107 del self._reader.filters[existing_filters[tuple(filter_name)]] | |
108 | |
109 self._reader.filters[record_filter.filter_name()] = _Filter(record_filter.filter_name(), short_doc) | |
110 | |
111 # For each record (POSITION) apply set of filters. | |
112 for record in self._reader: | |
113 | |
114 # If this record failed filters and we removed some, | |
115 # check is they need to be removed from record. | |
116 if isinstance(record.FILTER, list) and len(record.FILTER) > 0: | |
117 for filter_id in removed_filters: | |
118 if filter_id in record.FILTER: | |
119 record.FILTER.remove(filter_id) | |
120 | |
121 for record_filter in self.filters: | |
122 | |
123 # Call to __call__ method in each filter. | |
124 result = record_filter(record) | |
125 | |
126 # Record is KEPT if filter returns None | |
127 if result == None: | |
128 continue | |
129 | |
130 # If we got this far, then record is filtered OUT. | |
131 record.add_filter(record_filter.filter_name()) | |
132 | |
133 # After applying all filters, check if FILTER is None. | |
134 # If it is, then record PASSED all filters. | |
135 if record.FILTER is None or record.FILTER == []: | |
136 record.FILTER = 'PASS' | |
137 # FIXME: Does this work for indels? | |
138 if keep_only_snps and record.is_snp: | |
139 self.variants.append(record) | |
140 else: | |
141 self.variants.append(record) | |
142 | |
143 self.update_filters(self._reader.filters) | |
144 | |
145 return [ variant for variant in self.variants if variant.FILTER == "PASS"] | |
146 | |
147 def add_metadata(self, info): | |
148 """Add metadata to the variant set. | |
149 | |
150 Parameters: | |
151 ----------- | |
152 info: dict | |
153 Dictionary of key value pairs to be inserted into metadata. | |
154 """ | |
155 for info_key, metadata in info.items(): | |
156 self.out_template.metadata[info_key] = metadata | |
157 | |
158 def write_variants(self, vcf_out, only_snps=False, only_good=False): | |
159 """Write variants to a VCF file. | |
160 | |
161 Parameters: | |
162 ----------- | |
163 vcf_out: str | |
164 Path to the file where VCF data is written. | |
165 only_snps: bool, optional | |
166 True is *only* SNP are to be written to the output (default: False). | |
167 only_good: bool, optional | |
168 True if only those records that PASS all filters should be written | |
169 (default: False). | |
170 | |
171 Returns: | |
172 int: | |
173 Number of records written. | |
174 """ | |
175 written_variants = 0 | |
176 with open(vcf_out, "w") as out_vcf: | |
177 writer = vcf.Writer(out_vcf, self.out_template) | |
178 for record in self.variants: | |
179 | |
180 if only_snps and not record.is_snp: | |
181 continue | |
182 | |
183 if only_good and record.FILTER != "PASS" or record.FILTER is None: | |
184 continue | |
185 | |
186 writer.write_record(record) | |
187 written_variants += 1 | |
188 | |
189 return written_variants | |
190 | |
191 def _write_bad_variants(self, vcf_out): | |
192 """**PRIVATE:** Write only those records that **haven't** passed.""" | |
193 written_variants = 0 | |
194 with open(vcf_out, "w") as out_vcf: | |
195 writer = vcf.Writer(out_vcf, self.out_template) | |
196 for record in self.variants: | |
197 if record.FILTER != "PASS" and record.FILTER is not None: | |
198 writer.write_record(record) | |
199 written_variants += 1 | |
200 return written_variants | |
201 | |
202 def serialise(self, out_file): | |
203 """Save the data in this class to a file for future use/reload. | |
204 | |
205 Parameters: | |
206 ----------- | |
207 out_file: str | |
208 path to file where the data should be written to. | |
209 | |
210 Returns: | |
211 -------- | |
212 int: | |
213 Number of variants written. | |
214 """ | |
215 written_variants = 0 | |
216 with open(out_file, "w") as out_vcf: | |
217 writer = vcf.Writer(out_vcf, self.out_template) | |
218 for record in self.variants: | |
219 writer.write_record(record) | |
220 written_variants += 1 | |
221 | |
222 return written_variants | |
223 | |
224 def update_filters(self, new_filters): | |
225 """Update internal filters in the output template.""" | |
226 for new_filter, filter_data in new_filters.items(): | |
227 self.out_template.filters[new_filter] = filter_data | |
228 | |
229 | |
230 class VariantCaller(PHEMetaData): | |
231 """Abstract class used for access to the implemented variant callers.""" | |
232 | |
233 __metaclass__ = abc.ABCMeta | |
234 | |
235 def __init__(self, cmd_options=None): | |
236 """Constructor for variant caller. | |
237 | |
238 Parameters: | |
239 ----------- | |
240 cmd_options: str, optional | |
241 Command options to pass to the variant command. | |
242 """ | |
243 self.cmd_options = cmd_options | |
244 | |
245 super(VariantCaller, self).__init__() | |
246 | |
247 @abc.abstractmethod | |
248 def make_vcf(self, *args, **kwargs): | |
249 """Create a VCF from **BAM** file. | |
250 | |
251 Parameters: | |
252 ----------- | |
253 ref: str | |
254 Path to the reference file. | |
255 bam: str | |
256 Path to the indexed **BAM** file for calling variants. | |
257 vcf_file: str | |
258 path to the VCF file where data will be written to. | |
259 | |
260 Returns: | |
261 -------- | |
262 bool: | |
263 True if variant calling was successful, False otherwise. | |
264 """ | |
265 raise NotImplementedError("make_vcf is not implemented yet.") | |
266 | |
267 @abc.abstractmethod | |
268 def create_aux_files(self, ref): | |
269 """Create needed (if any) auxiliary files. | |
270 These files are required for proper functioning of the variant caller. | |
271 """ | |
272 raise NotImplementedError("create_aux_files is not implemeted.") | |
273 | |
274 @abc.abstractmethod | |
275 def get_info(self, plain=False): | |
276 """Get information about this variant caller.""" | |
277 raise NotImplementedError("Get info has not been implemented yet." | |
278 ) | |
279 def get_meta(self): | |
280 """Get the metadata about this variant caller.""" | |
281 od = self.get_info() | |
282 od["ID"] = "VariantCaller" | |
283 return OrderedDict({"PHEVariantMetaData": [od]}) | |
284 | |
285 @abc.abstractmethod | |
286 def get_version(self): | |
287 """Get the version of the underlying command used.""" | |
288 raise NotImplementedError("Get version has not been implemented yet.") |