view varscan.py @ 6:ab2f908f0b08 draft

planemo upload for repository https://github.com/galaxyproject/iuc/tree/master/tools/varscan commit 4b7660df2384d8c8c6b5a6f02a00d13211e6ff6c
author iuc
date Fri, 18 Jan 2019 19:41:30 -0500
parents d062703d6f13
children 7c5e867820cf
line wrap: on
line source

#!/usr/bin/env python3
from __future__ import print_function

import argparse
import io
import os
import subprocess
import sys
import tempfile
import time
from contextlib import ExitStack
from functools import partial
from threading import Thread

import pysam


class VariantCallingError (RuntimeError):
    """Exception class for issues with samtools and varscan subprocesses."""

    def __init__(self, message=None, call='', error=''):
        self.message = message
        self.call = call.strip()
        self.error = error.strip()

    def __str__(self):
        if self.message is None:
            return ''
        if self.error:
            msg_header = '"{0}" failed with:\n{1}\n\n'.format(
                self.call, self.error
            )
        else:
            msg_header = '{0} failed.\n'
            'No further information about this error is available.\n\n'.format(
                self.call
            )
        return msg_header + self.message


class VarScanCaller (object):
    def __init__(self, ref_genome, bam_input_files,
                 max_depth=None,
                 min_mapqual=None, min_basequal=None,
                 threads=1, verbose=False, quiet=True
                 ):
        self.ref_genome = ref_genome
        self.bam_input_files = bam_input_files
        self.max_depth = max_depth
        self.min_mapqual = min_mapqual
        self.min_basequal = min_basequal
        self.threads = threads
        self.verbose = verbose
        self.quiet = quiet

        with pysam.FastaFile(ref_genome) as ref_fa:
            self.ref_contigs = ref_fa.references
            self.ref_lengths = ref_fa.lengths

        self.pileup_engine = ['samtools', 'mpileup']
        self.varcall_engine = ['varscan', 'somatic']
        self.requires_stdout_redirect = False
        self.TemporaryContigVCF = partial(
            tempfile.NamedTemporaryFile,
            mode='wb', suffix='', delete=False, dir=os.getcwd()
        )
        self.tmpfiles = []

    def _get_pysam_pileup_args(self):
        param_dict = {}
        if self.max_depth is not None:
            param_dict['max_depth'] = self.max_depth
        if self.min_mapqual is not None:
            param_dict['min_mapping_quality'] = self.min_mapqual
        if self.min_basequal is not None:
            param_dict['min_base_quality'] = self.min_basequal
        param_dict['compute_baq'] = False
        param_dict['stepper'] = 'samtools'
        return param_dict

    def varcall_parallel(self, normal_purity=None, tumor_purity=None,
                         min_coverage=None,
                         min_var_count=None,
                         min_var_freq=None, min_hom_freq=None,
                         p_value=None, somatic_p_value=None,
                         threads=None, verbose=None, quiet=None
                         ):
        if not threads:
            threads = self.threads
        if verbose is None:
            verbose = self.verbose
        if quiet is None:
            quiet = self.quiet
        # mapping of method parameters to varcall engine command line options
        varcall_engine_option_mapping = [
            ('--normal-purity', normal_purity),
            ('--tumor-purity', tumor_purity),
            ('--min-coverage', min_coverage),
            ('--min-reads2', min_var_count),
            ('--min-var-freq', min_var_freq),
            ('--min-freq-for-hom', min_hom_freq),
            ('--p-value', p_value),
            ('--somatic-p-value', somatic_p_value),
            ('--min-avg-qual', self.min_basequal)
        ]
        varcall_engine_options = []
        for option, value in varcall_engine_option_mapping:
            if value is not None:
                varcall_engine_options += [option, str(value)]
        pileup_engine_options = ['-B']
        if self.max_depth is not None:
            pileup_engine_options += ['-d', str(self.max_depth)]
        if self.min_mapqual is not None:
            pileup_engine_options += ['-q', str(self.min_mapqual)]
        if self.min_basequal is not None:
            pileup_engine_options += ['-Q', str(self.min_basequal)]

        # Create a tuple of calls to samtools mpileup and varscan for
        # each contig. The contig name is stored as the third element of
        # that tuple.
        # The calls are stored in the reverse order of the contig list so
        # that they can be popped off later in the original order
        calls = [(
            self.pileup_engine + pileup_engine_options + [
                '-r', contig + ':',
                '-f', self.ref_genome
            ] + self.bam_input_files,
            self.varcall_engine + [
                '-', '{out}', '--output-vcf', '1', '--mpileup', '1'
            ] + varcall_engine_options,
            contig
        ) for contig in self.ref_contigs[::-1]]

        if verbose:
            print('Starting variant calling ..')

        # launch subprocesses and monitor their status
        subprocesses = []
        error_table = {}
        tmp_io_started = []
        tmp_io_finished = []
        self.tmpfiles = []

        def enqueue_stderr_output(out, stderr_buffer):
            for line in iter(out.readline, b''):
                # Eventually we are going to print the contents of
                # the stderr_buffer to sys.stderr so we can
                # decode things here using its encoding.
                # We do a 'backslashreplace' just to be on the safe side.
                stderr_buffer.write(line.decode(sys.stderr.encoding,
                                                'backslashreplace'))
            out.close()

        try:
            while subprocesses or calls:
                while calls and len(subprocesses) < threads:
                    # There are still calls waiting for execution and we
                    # have unoccupied threads so we launch a new combined
                    # call to samtools mpileup and the variant caller.

                    # pop the call arguments from our call stack
                    call = calls.pop()
                    # get the name of the contig that this call is going
                    # to work on
                    contig = call[2]
                    # Based on the contig name, generate a readable and
                    # file system-compatible prefix and use it to create
                    # a named temporary file, to which the call output
                    # will be redirected.
                    # At the moment we create the output file we add it to
                    # the list of all temporary output files so that we can
                    # remove it eventually during cleanup.
                    call_out = self.TemporaryContigVCF(
                        prefix=''.join(
                            c if c.isalnum() else '_' for c in contig
                        ) + '_',
                    )
                    # maintain a list of variant call outputs
                    # in the order the subprocesses got launched
                    tmp_io_started.append(call_out.name)
                    if self.requires_stdout_redirect:
                        # redirect stdout to the temporary file just created
                        stdout_p2 = call_out
                        stderr_p2 = subprocess.PIPE
                    else:
                        # variant caller wants to write output to file directly
                        stdout_p2 = subprocess.PIPE
                        stderr_p2 = subprocess.STDOUT
                        call[1][call[1].index('{out}')] = call_out.name
                        call_out.close()
                    # for reporting purposes, join the arguments for the
                    # samtools and the variant caller calls into readable
                    # strings
                    c_str = (' '.join(call[0]), ' '.join(call[1]))
                    error_table[c_str] = [io.StringIO(), io.StringIO()]
                    # start the subprocesses
                    p1 = subprocess.Popen(
                        call[0],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    p2 = subprocess.Popen(
                        call[1],
                        stdin=p1.stdout,
                        stdout=stdout_p2,
                        stderr=stderr_p2
                    )
                    # subprocess bookkeeping
                    subprocesses.append((c_str, p1, p2, call_out, contig))
                    # make sure our newly launched call does not block
                    # because its buffers fill up
                    p1.stdout.close()
                    t1 = Thread(
                        target=enqueue_stderr_output,
                        args=(p1.stderr, error_table[c_str][0])
                    )
                    t2 = Thread(
                        target=enqueue_stderr_output,
                        args=(
                            p2.stderr
                            if self.requires_stdout_redirect else
                            p2.stdout,
                            error_table[c_str][1]
                        )
                    )
                    t1.daemon = t2.daemon = True
                    t1.start()
                    t2.start()

                    if verbose:
                        print(
                            'Calling variants for contig: {0}'
                            .format(call[2])
                        )

                # monitor all running calls to see if any of them are done
                for call, p1, p2, ofo, contig in subprocesses:
                    p1_stat = p1.poll()
                    p2_stat = p2.poll()
                    if p1_stat is not None or p2_stat is not None:
                        # There is an outcome for this process!
                        # Lets see what it is
                        if p1_stat or p2_stat:
                            print()
                            print(
                                error_table[call][0].getvalue(),
                                error_table[call][1].getvalue(),
                                file=sys.stderr
                            )
                            raise VariantCallingError(
                                'Variant Calling for contig {0} failed.'
                                .format(contig),
                                call='{0} | {1}'.format(call[0], call[1])
                            )
                        if p1_stat == 0 and p2_stat is None:
                            # VarScan is not handling the no output from
                            # samtools mpileup situation correctly so maybe
                            # that's the issue here
                            last_words = error_table[call][1].getvalue(
                            ).splitlines()[-4:]
                            if len(last_words) < 4 or any(
                                not msg.startswith('Input stream not ready')
                                for msg in last_words
                            ):
                                break
                                # lets give this process a bit more time
                            # VarScan is waiting for input it will never
                            # get, stop it.
                            p2.terminate()
                            subprocesses.remove((call, p1, p2, ofo, contig))
                            ofo.close()
                            break
                        if p2_stat == 0:
                            # Things went fine.
                            # maintain a list of variant call outputs
                            # that finished successfully (in the order
                            # they finished)
                            tmp_io_finished.append(ofo.name)
                            if verbose:
                                print()
                                print('Contig {0} finished.'.format(contig))
                            if not quiet:
                                print()
                                print(
                                    'stderr output from samtools mpileup/'
                                    'bcftools:'.upper(),
                                    file=sys.stderr
                                )
                                print(
                                    error_table[call][0].getvalue(),
                                    error_table[call][1].getvalue(),
                                    file=sys.stderr
                                )
                            # Discard the collected stderr output from
                            # the call, remove the call from the list of
                            # running calls and close its output file.
                            del error_table[call]
                            subprocesses.remove((call, p1, p2, ofo, contig))
                            # Closing the output file is important or we
                            # may hit the file system limit for open files
                            # if there are lots of contigs.
                            ofo.close()
                            break
                # wait a bit in between monitoring cycles
                time.sleep(2)
        finally:
            for call, p1, p2, ofo, contig in subprocesses:
                # make sure we do not leave running subprocesses behind
                for proc in (p1, p2):
                    try:
                        proc.terminate()
                    except Exception:
                        pass
                # close currently open files
                ofo.close()
            # store the files with finished content in the order that
            # the corresponding jobs were launched
            self.tmpfiles = [f for f in tmp_io_started if f in tmp_io_finished]
            # Make sure remaining buffered stderr output of
            # subprocesses does not get lost.
            # Currently, we don't screen this output for real errors,
            # but simply rewrite everything.
            if not quiet and error_table:
                print()
                print(
                    'stderr output from samtools mpileup/bcftools:'.upper(),
                    file=sys.stderr
                )
                for call, errors in error_table.items():
                    print(' | '.join(call), ':', file=sys.stderr)
                    print('-' * 20, file=sys.stderr)
                    print('samtools mpileup output:', file=sys.stderr)
                    print(errors[0].getvalue(), file=sys.stderr)
                    print('varscan somatic output:', file=sys.stderr)
                    print(errors[1].getvalue(), file=sys.stderr)

    def _add_ref_contigs_to_header(self, header):
        for chrom, length in zip(self.ref_contigs, self.ref_lengths):
            header.add_meta(
                'contig',
                items=[('ID', chrom), ('length', length)]
            )

    def _add_filters_to_header(self, header):
        varscan_fpfilters = {
            'VarCount': 'Fewer than {min_var_count2} variant-supporting reads',
            'VarFreq': 'Variant allele frequency below {min_var_freq2}',
            'VarAvgRL':
                'Average clipped length of variant-supporting reads < '
                '{min_var_len}',
            'VarReadPos': 'Relative average read position < {min_var_readpos}',
            'VarDist3':
                'Average distance to effective 3\' end < {min_var_dist3}',
            'VarMMQS':
                'Average mismatch quality sum for variant reads > '
                '{max_var_mmqs}',
            'VarMapQual':
                'Average mapping quality of variant reads < {min_var_mapqual}',
            'VarBaseQual':
                'Average base quality of variant reads < {min_var_basequal}',
            'Strand':
                'Strand representation of variant reads < {min_strandedness}',
            'RefAvgRL':
                'Average clipped length of ref-supporting reads < '
                '{min_ref_len}',
            'RefReadPos':
                'Relative average read position < {min_ref_readpos}',
            'RefDist3':
                'Average distance to effective 3\' end < {min_ref_dist3}',
            'RefMapQual':
                'Average mapping quality of reference reads < '
                '{min_ref_mapqual}',
            'RefBaseQual':
                'Average base quality of ref-supporting reads < '
                '{min_ref_basequal}',
            'RefMMQS':
                'Average mismatch quality sum for ref-supporting reads > '
                '{max_ref_mmqs}',
            'MMQSdiff':
                'Mismatch quality sum difference (var - ref) > '
                '{max_mmqs_diff}',
            'MinMMQSdiff':
                'Mismatch quality sum difference (var - ref) < '
                '{max_mmqs_diff}',
            'MapQualDiff':
                'Mapping quality difference (ref - var) > {max_mapqual_diff}',
            'MaxBAQdiff':
                'Average base quality difference (ref - var) > '
                '{max_basequal_diff}',
            'ReadLenDiff':
                'Average supporting read length difference (ref - var) > '
                '{max_relative_len_diff}',
        }
        for filter_id, description in varscan_fpfilters.items():
            header.filters.add(filter_id, None, None, description)

    def _add_indel_info_flag_to_header(self, header):
        header.info.add(
            'INDEL', 0, 'Flag', 'Indicates that the variant is an INDEL'
        )

    def _standardize_format_fields(self, header):
        """Add standard FORMAT key declarations to a VCF header."""

        format_field_specs = [
            ('GT', '1', 'String', 'Genotype'),
            ('GQ', '1', 'Integer', 'Genotype quality'),
            ('DP', '1', 'Integer', 'Read depth'),
            ('AD', 'R', 'Integer', 'Read depth for each allele'),
            ('ADF', 'R', 'Integer',
             'Read depth for each allele on the forward strand'),
            ('ADR', 'R', 'Integer',
             'Read depth for each allele on the reverse strand')
        ]
        # Existing FORMAT declarations cannot be overwritten.
        # The only viable strategy is to mark them as removed,
        # then merge the header with another one containing the
        # correct declarations. This is what is implemented below.
        temp_header = pysam.VariantHeader()
        for spec in format_field_specs:
            temp_header.formats.add(*spec)
            if spec[0] in header.formats:
                header.formats.remove_header(spec[0])
        header.merge(temp_header)

    def _compile_common_header(self, varcall_template, no_filters=False):
        # fix the header generated by VarScan
        # by adding reference and contig information
        common_header = pysam.VariantHeader()
        common_header.add_meta('reference', value=self.ref_genome)
        self._add_ref_contigs_to_header(common_header)
        if not no_filters:
            # add filter info
            self._add_filters_to_header(common_header)
        # change the source information
        common_header.add_meta('source', value='varscan.py')
        # declare an INDEL flag for record INFO fields
        self._add_indel_info_flag_to_header(common_header)
        # generate standard FORMAT field declarations
        # including a correct AD declaration to prevent VarScan's
        # non-standard one from getting propagated
        self._standardize_format_fields(common_header)
        # take the remaining metadata from the template header produced by
        # VarScan
        with pysam.VariantFile(varcall_template, 'r') as original_data:
            varscan_header = original_data.header
        for sample in varscan_header.samples:
            common_header.samples.add(sample)
        common_header.merge(varscan_header)
        # don't propagate non-standard and redundant FORMAT keys written
        # by VarScan
        common_header.formats.remove_header('FREQ')
        common_header.formats.remove_header('RD')
        common_header.formats.remove_header('DP4')
        return common_header

    def _fix_read_gt_fields(self, read):
        """Migrate non-standard genotype fields to standard ones."""

        for gt_field in read.samples.values():
            # generate a standard AD field by combining VarScan's
            # RD and non-standard AD fields
            gt_field['AD'] = (gt_field['RD'], gt_field['AD'][0])
            # split VarScan's DP4 field into the standard fields
            # ADF and ADR
            gt_field['ADF'] = (
                int(gt_field['DP4'][0]), int(gt_field['DP4'][2])
            )
            gt_field['ADR'] = (
                int(gt_field['DP4'][1]), int(gt_field['DP4'][3])
            )
        # remove redundant fields
        # FREQ is easy to calculate from AD
        del read.format['FREQ']
        del read.format['RD']
        del read.format['DP4']

    def pileup_masker(self, mask):
        def apply_mask_on_pileup(piled_items):
            for item, status in zip(piled_items, mask):
                if status:
                    yield item
        return apply_mask_on_pileup

    def get_allele_specific_pileup_column_stats(
        self, allele, pile_column, ref_fetch
    ):
        # number of reads supporting the given allele on
        # forward and reverse strand, and in total
        var_reads_plus = var_reads_minus = 0
        var_supp_read_mask = []
        for base in pile_column.get_query_sequences():
            if base == allele:
                # allele supporting read on + strand
                var_reads_plus += 1
                var_supp_read_mask.append(True)
            elif base.upper() == allele:
                # allele supporting read on - strand
                var_reads_minus += 1
                var_supp_read_mask.append(True)
            else:
                var_supp_read_mask.append(False)
        var_reads_total = var_reads_plus + var_reads_minus

        if var_reads_total == 0:
            # No stats without reads!
            return None

        var_supp_only = self.pileup_masker(var_supp_read_mask)

        # average mapping quality of the reads supporting the
        # given allele
        avg_mapping_quality = sum(
            mq for mq in var_supp_only(
                pile_column.get_mapping_qualities()
            )
        ) / var_reads_total

        # for the remaining stats we need access to complete
        # read information
        piled_reads = [
            p for p in var_supp_only(pile_column.pileups)
        ]
        assert len(piled_reads) == var_reads_total
        sum_avg_base_qualities = 0
        sum_dist_from_center = 0
        sum_dist_from_3prime = 0
        sum_clipped_length = 0
        sum_unclipped_length = 0
        sum_num_mismatches_as_fraction = 0
        sum_mismatch_qualities = 0

        for p in piled_reads:
            sum_avg_base_qualities += sum(
                p.alignment.query_qualities
            ) / p.alignment.infer_query_length()
            sum_clipped_length += p.alignment.query_alignment_length
            unclipped_length = p.alignment.infer_read_length()
            sum_unclipped_length += unclipped_length
            read_center = p.alignment.query_alignment_length / 2
            sum_dist_from_center += 1 - abs(
                p.query_position - read_center
            ) / read_center
            if p.alignment.is_reverse:
                sum_dist_from_3prime += p.query_position / unclipped_length
            else:
                sum_dist_from_3prime += 1 - p.query_position / unclipped_length

            sum_num_mismatches = 0
            for qpos, rpos in p.alignment.get_aligned_pairs():
                if qpos is not None and rpos is not None:
                    if p.alignment.query_sequence[qpos] != ref_fetch(
                        rpos, rpos + 1
                    ).upper():  # ref bases can be lowercase!
                        sum_num_mismatches += 1
                        sum_mismatch_qualities += p.alignment.query_qualities[
                            qpos
                        ]
            sum_num_mismatches_as_fraction += (
                sum_num_mismatches / p.alignment.query_alignment_length
            )
        avg_basequality = sum_avg_base_qualities / var_reads_total
        avg_pos_as_fraction = sum_dist_from_center / var_reads_total
        avg_num_mismatches_as_fraction = (
            sum_num_mismatches_as_fraction / var_reads_total
        )
        avg_sum_mismatch_qualities = sum_mismatch_qualities / var_reads_total
        avg_clipped_length = sum_clipped_length / var_reads_total
        avg_distance_to_effective_3p_end = (
            sum_dist_from_3prime / var_reads_total
        )

        return (
            avg_mapping_quality,
            avg_basequality,
            var_reads_plus,
            var_reads_minus,
            avg_pos_as_fraction,
            avg_num_mismatches_as_fraction,
            avg_sum_mismatch_qualities,
            avg_clipped_length,
            avg_distance_to_effective_3p_end
        )

    def _postprocess_variant_records(self, invcf,
                                     min_var_count2, min_var_count2_lc,
                                     min_var_freq2, max_somatic_p,
                                     max_somatic_p_depth,
                                     min_ref_readpos, min_var_readpos,
                                     min_ref_dist3, min_var_dist3,
                                     min_ref_len, min_var_len,
                                     max_relative_len_diff,
                                     min_strandedness, min_strand_reads,
                                     min_ref_basequal, min_var_basequal,
                                     max_basequal_diff,
                                     min_ref_mapqual, min_var_mapqual,
                                     max_mapqual_diff,
                                     max_ref_mmqs, max_var_mmqs,
                                     min_mmqs_diff, max_mmqs_diff,
                                     **args):
        # set FILTER field according to Varscan criteria
        # multiple FILTER entries must be separated by semicolons
        # No filters applied should be indicated with MISSING

        # since posterior filters are always applied to just one sample,
        # a better place to store the info is in the FT genotype field:
        # can be PASS, '.' to indicate that filters have not been applied,
        # or a semicolon-separated list of filters that failed
        # unfortunately, gemini does not support this field

        with ExitStack() as io_stack:
            normal_reads, tumor_reads = (
                io_stack.enter_context(
                    pysam.Samfile(fn, 'rb')) for fn in self.bam_input_files
            )
            refseq = io_stack.enter_context(pysam.FastaFile(self.ref_genome))
            pileup_args = self._get_pysam_pileup_args()
            for record in invcf:
                if any(len(allele) > 1 for allele in record.alleles):
                    # skip indel postprocessing for the moment
                    yield record
                    continue
                # get pileup for genomic region affected by this variant
                if record.info['SS'] == '2':
                    # a somatic variant => generate pileup from tumor data
                    pile = tumor_reads.pileup(
                        record.chrom, record.start, record.stop,
                        **pileup_args
                    )
                    sample_of_interest = 'TUMOR'
                elif record.info['SS'] in ['1', '3']:
                    # a germline or LOH variant => pileup from normal data
                    pile = normal_reads.pileup(
                        record.chrom, record.start, record.stop,
                        **pileup_args
                    )
                    sample_of_interest = 'NORMAL'
                else:
                    # TO DO: figure out if there is anything interesting to do
                    # for SS status codes 0 (reference) and 5 (unknown)
                    yield record
                    continue

                # apply false-positive filtering a la varscan fpfilter
                # find the variant site in the pileup columns
                for pile_column in pile:
                    if pile_column.reference_pos == record.start:
                        break
                # extract required information
                # overall read depth at the site
                read_depth = pile_column.get_num_aligned()
                assert read_depth > 0
                # no multiallelic sites in varscan
                assert len(record.alleles) == 2
                if record.samples[sample_of_interest]['RD'] > 0:
                    ref_stats, alt_stats = [
                        self.get_allele_specific_pileup_column_stats(
                            allele,
                            pile_column,
                            partial(
                                pysam.FastaFile.fetch, refseq, record.chrom
                            )
                        )
                        for allele in record.alleles
                    ]
                else:
                    ref_stats = None
                    alt_stats = self.get_allele_specific_pileup_column_stats(
                        record.alleles[1],
                        pile_column,
                        partial(pysam.FastaFile.fetch, refseq, record.chrom)
                    )
                ref_count = 0
                if ref_stats:
                    ref_count = ref_stats[2] + ref_stats[3]
                    if ref_stats[1] < min_ref_basequal:
                        record.filter.add('RefBaseQual')
                    if ref_count >= 2:
                        if ref_stats[0] < min_ref_mapqual:
                            record.filter.add('RefMapQual')
                        if ref_stats[4] < min_ref_readpos:
                            record.filter.add('RefReadPos')
                        # ref_stats[5] (avg_num_mismatches_as_fraction
                        # is not a filter criterion in VarScan fpfilter
                        if ref_stats[6] > max_ref_mmqs:
                            record.filter.add('RefMMQS')
                        if ref_stats[7] < min_ref_len:
                            # VarScan fpfilter does not apply this filter
                            # for indels, but there is no reason
                            # not to do it.
                            record.filter.add('RefAvgRL')
                        if ref_stats[8] < min_ref_dist3:
                            record.filter.add('RefDist3')
                if alt_stats:
                    alt_count = alt_stats[2] + alt_stats[3]
                    if (
                        alt_count < min_var_count2_lc
                    ) or (
                        read_depth >= max_somatic_p_depth and
                        alt_count < min_var_count2
                    ):
                        record.filter.add('VarCount')
                    if alt_count / read_depth < min_var_freq2:
                        record.filter.add('VarFreq')
                    if alt_stats[1] < min_var_basequal:
                        record.filter.add('VarBaseQual')
                    if alt_count > min_strand_reads:
                        if (
                            alt_stats[2] / alt_count < min_strandedness
                        ) or (
                            alt_stats[3] / alt_count < min_strandedness
                        ):
                            record.filter.add('Strand')
                    if alt_stats[2] + alt_stats[3] >= 2:
                        if alt_stats[0] < min_var_mapqual:
                            record.filter.add('VarMapQual')
                        if alt_stats[4] < min_var_readpos:
                            record.filter.add('VarReadPos')
                        # alt_stats[5] (avg_num_mismatches_as_fraction
                        # is not a filter criterion in VarScan fpfilter
                        if alt_stats[6] > max_var_mmqs:
                            record.filter.add('VarMMQS')
                        if alt_stats[7] < min_var_len:
                            # VarScan fpfilter does not apply this filter
                            # for indels, but there is no reason
                            # not to do it.
                            record.filter.add('VarAvgRL')
                        if alt_stats[8] < min_var_dist3:
                            record.filter.add('VarDist3')
                    if ref_count >= 2 and alt_count >= 2:
                        if (ref_stats[0] - alt_stats[0]) > max_mapqual_diff:
                            record.filter.add('MapQualDiff')
                        if (ref_stats[1] - alt_stats[1]) > max_basequal_diff:
                            record.filter.add('MaxBAQdiff')
                        mmqs_diff = alt_stats[6] - ref_stats[6]
                        if mmqs_diff < min_mmqs_diff:
                            record.filter.add('MinMMQSdiff')
                        if mmqs_diff > max_mmqs_diff:
                            record.filter.add('MMQSdiff')
                        if (
                            1 - alt_stats[7] / ref_stats[7]
                        ) > max_relative_len_diff:
                            record.filter.add('ReadLenDiff')
                else:
                    # No variant-supporting reads for this record!
                    # This can happen in rare cases because of
                    # samtools mpileup issues, but indicates a
                    # rather unreliable variant call.
                    record.filter.add('VarCount')
                    record.filter.add('VarFreq')
                yield record

    def _indel_flagged_records(self, vcf):
        for record in vcf:
            record.info['INDEL'] = True
            yield record

    def _merge_generator(self, vcf1, vcf2):
        try:
            record1 = next(vcf1)
        except StopIteration:
            for record2 in vcf2:
                yield record2
            return
        try:
            record2 = next(vcf2)
        except StopIteration:
            yield record1
            for record1 in vcf1:
                yield record1
            return
        while True:
            if (record1.start, record1.stop) < (record2.start, record2.stop):
                yield record1
                try:
                    record1 = next(vcf1)
                except StopIteration:
                    yield record2
                    for record2 in vcf2:
                        yield record2
                    return
            else:
                yield record2
                try:
                    record2 = next(vcf2)
                except StopIteration:
                    yield record1
                    for record1 in vcf1:
                        yield record1
                    return

    def merge_and_postprocess(self, snps_out, indels_out=None,
                              no_filters=False, **filter_args):
        temporary_data = self.tmpfiles
        self.tmpfiles = []
        temporary_snp_files = [f + '.snp.vcf' for f in temporary_data]
        temporary_indel_files = [f + '.indel.vcf' for f in temporary_data]

        for f in temporary_data:
            try:
                os.remove(f)
            except Exception:
                pass

        def noop_gen(data, **kwargs):
            for d in data:
                yield d

        if no_filters:
            apply_filters = noop_gen
        else:
            apply_filters = self._postprocess_variant_records

        output_header = self._compile_common_header(
            temporary_snp_files[0],
            no_filters
        )
        if indels_out is None:
            with open(snps_out, 'w') as o:
                o.write(str(output_header).format(**filter_args))
                for snp_f, indel_f in zip(
                    temporary_snp_files, temporary_indel_files
                ):
                    with pysam.VariantFile(snp_f, 'r') as snp_invcf:
                        # fix the input header on the fly
                        # to avoid Warnings from htslib about missing contig
                        # info
                        self._add_ref_contigs_to_header(snp_invcf.header)
                        self._add_filters_to_header(snp_invcf.header)
                        self._add_indel_info_flag_to_header(snp_invcf.header)
                        self._standardize_format_fields(snp_invcf.header)
                        with pysam.VariantFile(indel_f, 'r') as indel_invcf:
                            # fix the input header on the fly
                            # to avoid Warnings from htslib about missing
                            # contig info
                            self._add_ref_contigs_to_header(indel_invcf.header)
                            self._add_filters_to_header(indel_invcf.header)
                            self._add_indel_info_flag_to_header(
                                indel_invcf.header
                            )
                            self._standardize_format_fields(indel_invcf.header)
                            for record in apply_filters(
                                self._merge_generator(
                                    snp_invcf,
                                    self._indel_flagged_records(indel_invcf)
                                ),
                                **filter_args
                            ):
                                self._fix_read_gt_fields(record)
                                o.write(str(record))
                    try:
                        os.remove(snp_f)
                    except Exception:
                        pass
                    try:
                        os.remove(indel_f)
                    except Exception:
                        pass

        else:
            with open(snps_out, 'w') as o:
                o.write(str(output_header).format(**filter_args))
                for f in temporary_snp_files:
                    with pysam.VariantFile(f, 'r') as invcf:
                        # fix the input header on the fly
                        # to avoid Warnings from htslib about missing
                        # contig info and errors because of undeclared
                        # filters
                        self._add_ref_contigs_to_header(invcf.header)
                        self._add_filters_to_header(invcf.header)
                        self._standardize_format_fields(invcf.header)
                        for record in apply_filters(
                            invcf, **filter_args
                        ):
                            self._fix_read_gt_fields(record)
                            o.write(str(record))
                    try:
                        os.remove(f)
                    except Exception:
                        pass
            with open(indels_out, 'w') as o:
                o.write(str(output_header))
                for f in temporary_indel_files:
                    with pysam.VariantFile(f, 'r') as invcf:
                        # fix the input header on the fly
                        # to avoid Warnings from htslib about missing
                        # contig info and errors because of undeclared
                        # filters
                        self._add_ref_contigs_to_header(invcf.header)
                        self._add_filters_to_header(invcf.header)
                        self._add_indel_info_flag_to_header(invcf.header)
                        self._standardize_format_fields(invcf.header)
                        for record in apply_filters(
                            self._indel_flagged_records(invcf), **filter_args
                        ):
                            self._fix_read_gt_fields(record)
                            o.write(str(record))
                    try:
                        os.remove(f)
                    except Exception:
                        pass


def varscan_call(ref_genome, normal, tumor, output_path, **args):
    """Preparse arguments and orchestrate calling and postprocessing."""

    if args.pop('split_output'):
        if '%T' in output_path:
            out = (
                output_path.replace('%T', 'snp'),
                output_path.replace('%T', 'indel')
            )
        else:
            out = (
                output_path + '.snp',
                output_path + '.indel'
            )
    else:
        out = (output_path, None)

    instance_args = {
        k: args.pop(k) for k in [
            'max_depth',
            'min_mapqual',
            'min_basequal',
            'threads',
            'verbose',
            'quiet'
        ]
    }
    varscan_somatic_args = {
        k: args.pop(k) for k in [
            'normal_purity',
            'tumor_purity',
            'min_coverage',
            'min_var_count',
            'min_var_freq',
            'min_hom_freq',
            'somatic_p_value',
            'p_value'
        ]
    }

    v = VarScanCaller(ref_genome, [normal, tumor], **instance_args)
    v.varcall_parallel(**varscan_somatic_args)
    v.merge_and_postprocess(*out, **args)


if __name__ == '__main__':
    p = argparse.ArgumentParser()
    p.add_argument(
        'ref_genome',
        metavar='reference_genome',
        help='the reference genome (in fasta format)'
    )
    p.add_argument(
        '--normal',
        metavar='BAM_file', required=True,
        help='the BAM input file of aligned reads from the normal sample'
    )
    p.add_argument(
        '--tumor',
        metavar='BAM_file', required=True,
        help='the BAM input file of aligned reads from the tumor sample'
    )
    p.add_argument(
        '-o', '--ofile', required=True,
        metavar='OFILE', dest='output_path',
        help='Name of the variant output file. With --split-output, the name '
             'may use the %%T replacement token or will be used as the '
             'basename for the two output files to be generated (see '
             '-s|--split-output below).'
    )
    p.add_argument(
        '-s', '--split-output',
        dest='split_output', action='store_true', default=False,
        help='indicate that separate output files for SNPs and indels '
             'should be generated (original VarScan behavior). If specified, '
             '%%T in the --ofile file name will be replaced with "snp" and '
             '"indel" to generate the names of the SNP and indel output '
             'files, respectively. If %%T is not found in the file name, it '
             'will get interpreted as a basename to which ".snp"/".indel" '
             'will be appended.'
    )
    p.add_argument(
        '-t', '--threads',
        type=int, default=1,
        help='level of parallelism'
    )
    p.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='be verbose about progress'
    )
    p.add_argument(
        '-q', '--quiet',
        action='store_true',
        help='suppress output from wrapped tools'
    )
    call_group = p.add_argument_group('Variant calling parameters')
    call_group.add_argument(
        '--normal-purity',
        dest='normal_purity', type=float,
        default=1.0,
        help='Estimated purity of the normal sample (default: 1.0)'
    )
    call_group.add_argument(
        '--tumor-purity',
        dest='tumor_purity', type=float,
        default=1.0,
        help='Estimated purity of the tumor sample (default: 1.0)'
    )
    call_group.add_argument(
        '--max-pileup-depth',
        dest='max_depth', type=int, default=8000,
        help='Maximum depth of generated pileups (samtools mpileup -d option; '
             'default: 8000)'
    )
    call_group.add_argument(
        '--min-basequal',
        dest='min_basequal', type=int,
        default=13,
        help='Minimum base quality at the variant position to use a read '
             '(default: 13)'
    )
    call_group.add_argument(
        '--min-mapqual',
        dest='min_mapqual', type=int,
        default=0,
        help='Minimum mapping quality required to use a read '
             '(default: 0)'
    )
    call_group.add_argument(
        '--min-coverage',
        dest='min_coverage', type=int,
        default=8,
        help='Minimum site coverage required in the normal and in the tumor '
             'sample to call a variant (default: 8)'
    )
    call_group.add_argument(
        '--min-var-count',
        dest='min_var_count', type=int,
        default=2,
        help='Minimum number of variant-supporting reads required to call a '
             'variant (default: 2)'
    )
    call_group.add_argument(
        '--min-var-freq',
        dest='min_var_freq', type=float,
        default=0.1,
        help='Minimum variant allele frequency for calling (default: 0.1)'
    )
    call_group.add_argument(
        '--min-hom-freq',
        dest='min_hom_freq', type=float,
        default=0.75,
        help='Minimum variant allele frequency for homozygous call '
             '(default: 0.75)'
    )
    call_group.add_argument(
        '--p-value',
        dest='p_value', type=float,
        default=0.99,
        help='P-value threshold for heterozygous call (default: 0.99)'
    )
    call_group.add_argument(
        '--somatic-p-value',
        dest='somatic_p_value', type=float,
        default=0.05,
        help='P-value threshold for somatic call (default: 0.05)'
    )
    filter_group = p.add_argument_group('Posterior variant filter parameters')
    filter_group.add_argument(
        '--no-filters',
        dest='no_filters', action='store_true',
        help='Disable all posterior variant filters. '
             'If specified, all following options will be ignored'
    )
    filter_group.add_argument(
        '--min-var-count2',
        dest='min_var_count2', type=int,
        default=4,
        help='Minimum number of variant-supporting reads (default: 4)'
    )
    filter_group.add_argument(
        '--min-var-count2-lc',
        dest='min_var_count2_lc', type=int,
        default=2,
        help='Minimum number of variant-supporting reads when depth below '
             '--somatic-p-depth (default: 2)'
    )
    filter_group.add_argument(
        '--min-var-freq2',
        dest='min_var_freq2', type=float,
        default=0.05,
        help='Minimum variant allele frequency (default: 0.05)'
    )
    filter_group.add_argument(
        '--max-somatic-p',
        dest='max_somatic_p', type=float,
        default=0.05,
        help='Maximum somatic p-value (default: 0.05)'
    )
    filter_group.add_argument(
        '--max-somatic-p-depth',
        dest='max_somatic_p_depth', type=int,
        default=10,
        help='Depth required to run --max-somatic-p filter (default: 10)'
    )
    filter_group.add_argument(
        '--min-ref-readpos',
        dest='min_ref_readpos', type=float,
        default=0.1,
        help='Minimum average relative distance of site from the ends of '
             'ref-supporting reads (default: 0.1)'
    )
    filter_group.add_argument(
        '--min-var-readpos',
        dest='min_var_readpos', type=float,
        default=0.1,
        help='Minimum average relative distance of site from the ends of '
             'variant-supporting reads (default: 0.1)'
    )
    filter_group.add_argument(
        '--min-ref-dist3',
        dest='min_ref_dist3', type=float,
        default=0.1,
        help='Minimum average relative distance of site from the effective '
             '3\'end of ref-supporting reads (default: 0.1)'
    )
    filter_group.add_argument(
        '--min-var-dist3',
        dest='min_var_dist3', type=float,
        default=0.1,
        help='Minimum average relative distance of site from the effective '
             '3\'end of variant-supporting reads (default: 0.1)'
    )
    filter_group.add_argument(
        '--min-ref-len',
        dest='min_ref_len', type=int,
        default=90,
        help='Minimum average trimmed length of reads supporting the ref '
             'allele (default: 90)'
    )
    filter_group.add_argument(
        '--min-var-len',
        dest='min_var_len', type=int,
        default=90,
        help='Minimum average trimmed length of reads supporting the variant '
             'allele (default: 90)'
    )
    filter_group.add_argument(
        '--max-len-diff',
        dest='max_relative_len_diff', type=float,
        default=0.25,
        help='Maximum average relative read length difference (ref - var; '
             'default: 0.25)'
    )
    filter_group.add_argument(
        '--min-strandedness',
        dest='min_strandedness', type=float,
        default=0.01,
        help='Minimum fraction of variant reads from each strand '
             '(default: 0.01)'
    )
    filter_group.add_argument(
        '--min-strand-reads',
        dest='min_strand_reads', type=int,
        default=5,
        help='Minimum allele depth required to run --min-strandedness filter '
             '(default: 5)'
    )
    filter_group.add_argument(
        '--min-ref-basequal',
        dest='min_ref_basequal', type=int,
        default=15,
        help='Minimum average base quality for the ref allele (default: 15)'
    )
    filter_group.add_argument(
        '--min-var-basequal',
        dest='min_var_basequal', type=int,
        default=15,
        help='Minimum average base quality for the variant allele '
             '(default: 15)'
    )
    filter_group.add_argument(
        '--max-basequal-diff',
        dest='max_basequal_diff', type=int,
        default=50,
        help='Maximum average base quality diff (ref - var; default: 50)'
    )
    filter_group.add_argument(
        '--min-ref-mapqual',
        dest='min_ref_mapqual', type=int,
        default=15,
        help='Minimum average mapping quality of reads supporting the ref '
             'allele (default: 15)'
    )
    filter_group.add_argument(
        '--min-var-mapqual',
        dest='min_var_mapqual', type=int,
        default=15,
        help='Minimum average mapping quality of reads supporting the variant '
             'allele (default: 15)'
    )
    filter_group.add_argument(
        '--max-mapqual-diff',
        dest='max_mapqual_diff', type=int,
        default=50,
        help='Maximum average mapping quality difference (ref - var; '
             'default: 50)'
    )
    filter_group.add_argument(
        '--max-ref-mmqs',
        dest='max_ref_mmqs', type=int,
        default=100,
        help='Maximum mismatch quality sum of reads supporting the ref '
             'allele (default: 100)'
    )
    filter_group.add_argument(
        '--max-var-mmqs',
        dest='max_var_mmqs', type=int,
        default=100,
        help='Maximum mismatch quality sum of reads supporting the variant '
             'allele (default: 100)'
    )
    filter_group.add_argument(
        '--min-mmqs-diff',
        dest='min_mmqs_diff', type=int,
        default=0,
        help='Minimum mismatch quality sum difference (var - ref; default: 0)'
    )
    filter_group.add_argument(
        '--max-mmqs-diff',
        dest='max_mmqs_diff', type=int,
        default=50,
        help='Maximum mismatch quality sum difference (var - ref; default: 50)'
    )
    args = vars(p.parse_args())
    varscan_call(**args)