Mercurial > repos > ulfschaefer > vcfs2fasta
diff phe/variant/__init__.py @ 14:f72039c5faa4 draft
Uploaded
author | ulfschaefer |
---|---|
date | Wed, 16 Dec 2015 07:29:05 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/phe/variant/__init__.py Wed Dec 16 07:29:05 2015 -0500 @@ -0,0 +1,288 @@ +"""Classes and methods to work with variants and such.""" +import abc +#ulf +# from collections import OrderedDict +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict + +import logging +import pickle + +from vcf import filters +import vcf +from vcf.parser import _Filter + +from phe.metadata import PHEMetaData +from phe.variant_filters import make_filters, PHEFilterBase, str_to_filters + + +class VCFTemplate(object): + """This is a small hack class for the Template used in generating + VCF file.""" + + def __init__(self, vcf_reader): + self.infos = vcf_reader.infos + self.formats = vcf_reader.formats + self.filters = vcf_reader.filters + self.alts = vcf_reader.alts + self.contigs = vcf_reader.contigs + self.metadata = vcf_reader.metadata + self._column_headers = vcf_reader._column_headers + self.samples = vcf_reader.samples + +class VariantSet(object): + """A convenient representation of set of variants. + TODO: Implement iterator and generator for the variant set. + """ + + _reader = None + + def __init__(self, vcf_in, filters=None): + """Constructor of variant set. + + Parameters: + ----------- + vcf_in: str + Path to the VCF file for loading information. + filters: str or dict, optional + Dictionary or string of the filter:threshold key value pairs. + """ + self.vcf_in = vcf_in + self._reader = vcf.Reader(filename=vcf_in) + self.out_template = VCFTemplate(self._reader) + + self.filters = [] + if filters is not None: + if isinstance(filters, str): + self.filters = str_to_filters(filters) + elif isinstance(filters, dict): + self.filters = make_filters(config=filters) + elif isinstance(filters, list): + self.filters = filters + else: + logging.warn("Could not create filters from %s", filters) + else: + reader = vcf.Reader(filename=self.vcf_in) + filters = {} + for filter_id in reader.filters: + filters.update(PHEFilterBase.decode(filter_id)) + + if filters: + self.filters = make_filters(config=filters) + + self.variants = [] + + def filter_variants(self, keep_only_snps=True): + """Create a variant """ + + if self._reader is None: + # Create a reader class from input VCF. + self._reader = vcf.Reader(filename=self.vcf_in) + + # get list of existing filters. + existing_filters = {} + removed_filters = [] + + for filter_id in self._reader.filters: + conf = PHEFilterBase.decode(filter_id) + tuple(conf.keys()) + existing_filters.update({tuple(conf.keys()):filter_id}) + + # Add each filter we are going to use to the record. + # This is needed for writing out proper #FILTER header in VCF. + for record_filter in self.filters: + # We know that each filter has short description method. + short_doc = record_filter.short_desc() + short_doc = short_doc.split('\n')[0].lstrip() + + filter_name = PHEFilterBase.decode(record_filter.filter_name()) + + # Check if the sample has been filtered for this type of filter + # in the past. If so remove is, because it is going to be refiltered. + if tuple(filter_name) in existing_filters: + logging.info("Removing existing filter: %s", existing_filters[tuple(filter_name)]) + removed_filters.append(existing_filters[tuple(filter_name)]) + del self._reader.filters[existing_filters[tuple(filter_name)]] + + self._reader.filters[record_filter.filter_name()] = _Filter(record_filter.filter_name(), short_doc) + + # For each record (POSITION) apply set of filters. + for record in self._reader: + + # If this record failed filters and we removed some, + # check is they need to be removed from record. + if isinstance(record.FILTER, list) and len(record.FILTER) > 0: + for filter_id in removed_filters: + if filter_id in record.FILTER: + record.FILTER.remove(filter_id) + + for record_filter in self.filters: + + # Call to __call__ method in each filter. + result = record_filter(record) + + # Record is KEPT if filter returns None + if result == None: + continue + + # If we got this far, then record is filtered OUT. + record.add_filter(record_filter.filter_name()) + + # After applying all filters, check if FILTER is None. + # If it is, then record PASSED all filters. + if record.FILTER is None or record.FILTER == []: + record.FILTER = 'PASS' + # FIXME: Does this work for indels? + if keep_only_snps and record.is_snp: + self.variants.append(record) + else: + self.variants.append(record) + + self.update_filters(self._reader.filters) + + return [ variant for variant in self.variants if variant.FILTER == "PASS"] + + def add_metadata(self, info): + """Add metadata to the variant set. + + Parameters: + ----------- + info: dict + Dictionary of key value pairs to be inserted into metadata. + """ + for info_key, metadata in info.items(): + self.out_template.metadata[info_key] = metadata + + def write_variants(self, vcf_out, only_snps=False, only_good=False): + """Write variants to a VCF file. + + Parameters: + ----------- + vcf_out: str + Path to the file where VCF data is written. + only_snps: bool, optional + True is *only* SNP are to be written to the output (default: False). + only_good: bool, optional + True if only those records that PASS all filters should be written + (default: False). + + Returns: + int: + Number of records written. + """ + written_variants = 0 + with open(vcf_out, "w") as out_vcf: + writer = vcf.Writer(out_vcf, self.out_template) + for record in self.variants: + + if only_snps and not record.is_snp: + continue + + if only_good and record.FILTER != "PASS" or record.FILTER is None: + continue + + writer.write_record(record) + written_variants += 1 + + return written_variants + + def _write_bad_variants(self, vcf_out): + """**PRIVATE:** Write only those records that **haven't** passed.""" + written_variants = 0 + with open(vcf_out, "w") as out_vcf: + writer = vcf.Writer(out_vcf, self.out_template) + for record in self.variants: + if record.FILTER != "PASS" and record.FILTER is not None: + writer.write_record(record) + written_variants += 1 + return written_variants + + def serialise(self, out_file): + """Save the data in this class to a file for future use/reload. + + Parameters: + ----------- + out_file: str + path to file where the data should be written to. + + Returns: + -------- + int: + Number of variants written. + """ + written_variants = 0 + with open(out_file, "w") as out_vcf: + writer = vcf.Writer(out_vcf, self.out_template) + for record in self.variants: + writer.write_record(record) + written_variants += 1 + + return written_variants + + def update_filters(self, new_filters): + """Update internal filters in the output template.""" + for new_filter, filter_data in new_filters.items(): + self.out_template.filters[new_filter] = filter_data + + +class VariantCaller(PHEMetaData): + """Abstract class used for access to the implemented variant callers.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, cmd_options=None): + """Constructor for variant caller. + + Parameters: + ----------- + cmd_options: str, optional + Command options to pass to the variant command. + """ + self.cmd_options = cmd_options + + super(VariantCaller, self).__init__() + + @abc.abstractmethod + def make_vcf(self, *args, **kwargs): + """Create a VCF from **BAM** file. + + Parameters: + ----------- + ref: str + Path to the reference file. + bam: str + Path to the indexed **BAM** file for calling variants. + vcf_file: str + path to the VCF file where data will be written to. + + Returns: + -------- + bool: + True if variant calling was successful, False otherwise. + """ + raise NotImplementedError("make_vcf is not implemented yet.") + + @abc.abstractmethod + def create_aux_files(self, ref): + """Create needed (if any) auxiliary files. + These files are required for proper functioning of the variant caller. + """ + raise NotImplementedError("create_aux_files is not implemeted.") + + @abc.abstractmethod + def get_info(self, plain=False): + """Get information about this variant caller.""" + raise NotImplementedError("Get info has not been implemented yet." + ) + def get_meta(self): + """Get the metadata about this variant caller.""" + od = self.get_info() + od["ID"] = "VariantCaller" + return OrderedDict({"PHEVariantMetaData": [od]}) + + @abc.abstractmethod + def get_version(self): + """Get the version of the underlying command used.""" + raise NotImplementedError("Get version has not been implemented yet.")