Mercurial > repos > ulfschaefer > vcfs2fasta
view phe/variant/__init__.py @ 14:f72039c5faa4 draft
Uploaded
author | ulfschaefer |
---|---|
date | Wed, 16 Dec 2015 07:29:05 -0500 |
parents | |
children |
line wrap: on
line source
"""Classes and methods to work with variants and such.""" import abc #ulf # from collections import OrderedDict try: from collections import OrderedDict except ImportError: from ordereddict import OrderedDict import logging import pickle from vcf import filters import vcf from vcf.parser import _Filter from phe.metadata import PHEMetaData from phe.variant_filters import make_filters, PHEFilterBase, str_to_filters class VCFTemplate(object): """This is a small hack class for the Template used in generating VCF file.""" def __init__(self, vcf_reader): self.infos = vcf_reader.infos self.formats = vcf_reader.formats self.filters = vcf_reader.filters self.alts = vcf_reader.alts self.contigs = vcf_reader.contigs self.metadata = vcf_reader.metadata self._column_headers = vcf_reader._column_headers self.samples = vcf_reader.samples class VariantSet(object): """A convenient representation of set of variants. TODO: Implement iterator and generator for the variant set. """ _reader = None def __init__(self, vcf_in, filters=None): """Constructor of variant set. Parameters: ----------- vcf_in: str Path to the VCF file for loading information. filters: str or dict, optional Dictionary or string of the filter:threshold key value pairs. """ self.vcf_in = vcf_in self._reader = vcf.Reader(filename=vcf_in) self.out_template = VCFTemplate(self._reader) self.filters = [] if filters is not None: if isinstance(filters, str): self.filters = str_to_filters(filters) elif isinstance(filters, dict): self.filters = make_filters(config=filters) elif isinstance(filters, list): self.filters = filters else: logging.warn("Could not create filters from %s", filters) else: reader = vcf.Reader(filename=self.vcf_in) filters = {} for filter_id in reader.filters: filters.update(PHEFilterBase.decode(filter_id)) if filters: self.filters = make_filters(config=filters) self.variants = [] def filter_variants(self, keep_only_snps=True): """Create a variant """ if self._reader is None: # Create a reader class from input VCF. self._reader = vcf.Reader(filename=self.vcf_in) # get list of existing filters. existing_filters = {} removed_filters = [] for filter_id in self._reader.filters: conf = PHEFilterBase.decode(filter_id) tuple(conf.keys()) existing_filters.update({tuple(conf.keys()):filter_id}) # Add each filter we are going to use to the record. # This is needed for writing out proper #FILTER header in VCF. for record_filter in self.filters: # We know that each filter has short description method. short_doc = record_filter.short_desc() short_doc = short_doc.split('\n')[0].lstrip() filter_name = PHEFilterBase.decode(record_filter.filter_name()) # Check if the sample has been filtered for this type of filter # in the past. If so remove is, because it is going to be refiltered. if tuple(filter_name) in existing_filters: logging.info("Removing existing filter: %s", existing_filters[tuple(filter_name)]) removed_filters.append(existing_filters[tuple(filter_name)]) del self._reader.filters[existing_filters[tuple(filter_name)]] self._reader.filters[record_filter.filter_name()] = _Filter(record_filter.filter_name(), short_doc) # For each record (POSITION) apply set of filters. for record in self._reader: # If this record failed filters and we removed some, # check is they need to be removed from record. if isinstance(record.FILTER, list) and len(record.FILTER) > 0: for filter_id in removed_filters: if filter_id in record.FILTER: record.FILTER.remove(filter_id) for record_filter in self.filters: # Call to __call__ method in each filter. result = record_filter(record) # Record is KEPT if filter returns None if result == None: continue # If we got this far, then record is filtered OUT. record.add_filter(record_filter.filter_name()) # After applying all filters, check if FILTER is None. # If it is, then record PASSED all filters. if record.FILTER is None or record.FILTER == []: record.FILTER = 'PASS' # FIXME: Does this work for indels? if keep_only_snps and record.is_snp: self.variants.append(record) else: self.variants.append(record) self.update_filters(self._reader.filters) return [ variant for variant in self.variants if variant.FILTER == "PASS"] def add_metadata(self, info): """Add metadata to the variant set. Parameters: ----------- info: dict Dictionary of key value pairs to be inserted into metadata. """ for info_key, metadata in info.items(): self.out_template.metadata[info_key] = metadata def write_variants(self, vcf_out, only_snps=False, only_good=False): """Write variants to a VCF file. Parameters: ----------- vcf_out: str Path to the file where VCF data is written. only_snps: bool, optional True is *only* SNP are to be written to the output (default: False). only_good: bool, optional True if only those records that PASS all filters should be written (default: False). Returns: int: Number of records written. """ written_variants = 0 with open(vcf_out, "w") as out_vcf: writer = vcf.Writer(out_vcf, self.out_template) for record in self.variants: if only_snps and not record.is_snp: continue if only_good and record.FILTER != "PASS" or record.FILTER is None: continue writer.write_record(record) written_variants += 1 return written_variants def _write_bad_variants(self, vcf_out): """**PRIVATE:** Write only those records that **haven't** passed.""" written_variants = 0 with open(vcf_out, "w") as out_vcf: writer = vcf.Writer(out_vcf, self.out_template) for record in self.variants: if record.FILTER != "PASS" and record.FILTER is not None: writer.write_record(record) written_variants += 1 return written_variants def serialise(self, out_file): """Save the data in this class to a file for future use/reload. Parameters: ----------- out_file: str path to file where the data should be written to. Returns: -------- int: Number of variants written. """ written_variants = 0 with open(out_file, "w") as out_vcf: writer = vcf.Writer(out_vcf, self.out_template) for record in self.variants: writer.write_record(record) written_variants += 1 return written_variants def update_filters(self, new_filters): """Update internal filters in the output template.""" for new_filter, filter_data in new_filters.items(): self.out_template.filters[new_filter] = filter_data class VariantCaller(PHEMetaData): """Abstract class used for access to the implemented variant callers.""" __metaclass__ = abc.ABCMeta def __init__(self, cmd_options=None): """Constructor for variant caller. Parameters: ----------- cmd_options: str, optional Command options to pass to the variant command. """ self.cmd_options = cmd_options super(VariantCaller, self).__init__() @abc.abstractmethod def make_vcf(self, *args, **kwargs): """Create a VCF from **BAM** file. Parameters: ----------- ref: str Path to the reference file. bam: str Path to the indexed **BAM** file for calling variants. vcf_file: str path to the VCF file where data will be written to. Returns: -------- bool: True if variant calling was successful, False otherwise. """ raise NotImplementedError("make_vcf is not implemented yet.") @abc.abstractmethod def create_aux_files(self, ref): """Create needed (if any) auxiliary files. These files are required for proper functioning of the variant caller. """ raise NotImplementedError("create_aux_files is not implemeted.") @abc.abstractmethod def get_info(self, plain=False): """Get information about this variant caller.""" raise NotImplementedError("Get info has not been implemented yet." ) def get_meta(self): """Get the metadata about this variant caller.""" od = self.get_info() od["ID"] = "VariantCaller" return OrderedDict({"PHEVariantMetaData": [od]}) @abc.abstractmethod def get_version(self): """Get the version of the underlying command used.""" raise NotImplementedError("Get version has not been implemented yet.")