view search_model_validation.py @ 40:96bb4c49e30e draft default tip

planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/sklearn commit 57a0433defa3cbc37ab34fbb0ebcfaeb680db8d5
author bgruening
date Sun, 05 Nov 2023 15:34:49 +0000
parents 61edd9e5c17f
children
line wrap: on
line source

import argparse
import json
import os
import sys
import warnings
from distutils.version import LooseVersion as Version

import imblearn
import joblib
import numpy as np
import pandas as pd
import skrebate
from galaxy_ml import __version__ as galaxy_ml_version
from galaxy_ml.binarize_target import IRAPSClassifier
from galaxy_ml.model_persist import dump_model_to_h5, load_model_from_h5
from galaxy_ml.utils import (
    clean_params,
    get_cv,
    get_main_estimator,
    get_module,
    get_scoring,
    read_columns,
    SafeEval,
    try_get_attr
)
from scipy.io import mmread
from sklearn import (
    cluster,
    decomposition,
    feature_selection,
    kernel_approximation,
    model_selection,
    preprocessing,
)
from sklearn.exceptions import FitFailedWarning
from sklearn.model_selection import _search, _validation
from sklearn.model_selection._validation import _score, cross_validate
from sklearn.preprocessing import LabelEncoder
from skopt import BayesSearchCV

# Worker count, read from the GALAXY_SLOTS environment variable (default 1).
N_JOBS = int(os.getenv("GALAXY_SLOTS", 1))

# Directory used for the on-disk cache, located under the working directory.
CACHE_DIR = os.path.join(os.getcwd(), "cached")

# Parameter-name suffixes that are skipped when building the search space.
# Kept as a tuple because it is handed directly to `str.endswith`.
NON_SEARCHABLE = (
    "n_jobs", "pre_dispatch", "memory", "_path", "_dir", "nthread", "callbacks",
)


def _eval_search_params(params_builder):
    search_params = {}

    for p in params_builder["param_set"]:
        search_list = p["sp_list"].strip()
        if search_list == "":
            continue

        param_name = p["sp_name"]
        if param_name.lower().endswith(NON_SEARCHABLE):
            print(
                "Warning: `%s` is not eligible for search and was "
                "omitted!" % param_name
            )
            continue

        if not search_list.startswith(":"):
            safe_eval = SafeEval(load_scipy=True, load_numpy=True)
            ev = safe_eval(search_list)
            search_params[param_name] = ev
        else:
            # Have `:` before search list, asks for estimator evaluatio
            safe_eval_es = SafeEval(load_estimators=True)
            search_list = search_list[1:].strip()
            # TODO maybe add regular express check
            ev = safe_eval_es(search_list)
            preprocessings = (
                preprocessing.StandardScaler(),
                preprocessing.Binarizer(),
                preprocessing.MaxAbsScaler(),
                preprocessing.Normalizer(),
                preprocessing.MinMaxScaler(),
                preprocessing.PolynomialFeatures(),
                preprocessing.RobustScaler(),
                feature_selection.SelectKBest(),
                feature_selection.GenericUnivariateSelect(),
                feature_selection.SelectPercentile(),
                feature_selection.SelectFpr(),
                feature_selection.SelectFdr(),
                feature_selection.SelectFwe(),
                feature_selection.VarianceThreshold(),
                decomposition.FactorAnalysis(random_state=0),
                decomposition.FastICA(random_state=0),
                decomposition.IncrementalPCA(),
                decomposition.KernelPCA(random_state=0, n_jobs=N_JOBS),
                decomposition.LatentDirichletAllocation(random_state=0, n_jobs=N_JOBS),
                decomposition.MiniBatchDictionaryLearning(
                    random_state=0, n_jobs=N_JOBS
                ),
                decomposition.MiniBatchSparsePCA(random_state=0, n_jobs=N_JOBS),
                decomposition.NMF(random_state=0),
                decomposition.PCA(random_state=0),
                decomposition.SparsePCA(random_state=0, n_jobs=N_JOBS),
                decomposition.TruncatedSVD(random_state=0),
                kernel_approximation.Nystroem(random_state=0),
                kernel_approximation.RBFSampler(random_state=0),
                kernel_approximation.AdditiveChi2Sampler(),
                kernel_approximation.SkewedChi2Sampler(random_state=0),
                cluster.FeatureAgglomeration(),
                skrebate.ReliefF(n_jobs=N_JOBS),
                skrebate.SURF(n_jobs=N_JOBS),
                skrebate.SURFstar(n_jobs=N_JOBS),
                skrebate.MultiSURF(n_jobs=N_JOBS),
                skrebate.MultiSURFstar(n_jobs=N_JOBS),
                imblearn.under_sampling.ClusterCentroids(random_state=0, n_jobs=N_JOBS),
                imblearn.under_sampling.CondensedNearestNeighbour(
                    random_state=0, n_jobs=N_JOBS
                ),
                imblearn.under_sampling.EditedNearestNeighbours(n_jobs=N_JOBS),
                imblearn.under_sampling.RepeatedEditedNearestNeighbours(n_jobs=N_JOBS),
                imblearn.under_sampling.AllKNN(n_jobs=N_JOBS),
                imblearn.under_sampling.InstanceHardnessThreshold(
                    random_state=0, n_jobs=N_JOBS
                ),
                imblearn.under_sampling.NearMiss(n_jobs=N_JOBS),
                imblearn.under_sampling.NeighbourhoodCleaningRule(n_jobs=N_JOBS),
                imblearn.under_sampling.OneSidedSelection(
                    random_state=0, n_jobs=N_JOBS
                ),
                imblearn.under_sampling.RandomUnderSampler(random_state=0),
                imblearn.under_sampling.TomekLinks(n_jobs=N_JOBS),
                imblearn.over_sampling.ADASYN(random_state=0, n_jobs=N_JOBS),
                imblearn.over_sampling.BorderlineSMOTE(random_state=0, n_jobs=N_JOBS),
                imblearn.over_sampling.KMeansSMOTE(random_state=0, n_jobs=N_JOBS),
                imblearn.over_sampling.RandomOverSampler(random_state=0),
                imblearn.over_sampling.SMOTE(random_state=0, n_jobs=N_JOBS),
                imblearn.over_sampling.SMOTEN(random_state=0, n_jobs=N_JOBS),
                imblearn.over_sampling.SMOTENC(
                    categorical_features=[], random_state=0, n_jobs=N_JOBS
                ),
                imblearn.over_sampling.SVMSMOTE(random_state=0, n_jobs=N_JOBS),
                imblearn.combine.SMOTEENN(random_state=0),
                imblearn.combine.SMOTETomek(random_state=0),
            )
            newlist = []
            for obj in ev:
                if obj is None:
                    newlist.append(None)
                elif obj == "all_0":
                    newlist.extend(preprocessings[0:35])
                elif obj == "sk_prep_all":  # no KernalCenter()
                    newlist.extend(preprocessings[0:7])
                elif obj == "fs_all":
                    newlist.extend(preprocessings[7:14])
                elif obj == "decomp_all":
                    newlist.extend(preprocessings[14:25])
                elif obj == "k_appr_all":
                    newlist.extend(preprocessings[25:29])
                elif obj == "reb_all":
                    newlist.extend(preprocessings[30:35])
                elif obj == "imb_all":
                    newlist.extend(preprocessings[35:54])
                elif type(obj) is int and -1 < obj < len(preprocessings):
                    newlist.append(preprocessings[obj])
                elif hasattr(obj, "get_params"):  # user uploaded object
                    if "n_jobs" in obj.get_params():
                        newlist.append(obj.set_params(n_jobs=N_JOBS))
                    else:
                        newlist.append(obj)
                else:
                    sys.exit("Unsupported estimator type: %r" % (obj))

            search_params[param_name] = newlist

    return search_params


def _handle_X_y(
    estimator,
    params,
    infile1,
    infile2,
    loaded_df=None,
    ref_seq=None,
    intervals=None,
    targets=None,
    fasta_path=None,
):
    """Read the feature and target inputs and wire file paths into estimator.

    Parameters
    ----------
    estimator : estimator object
    params : dict
        Galaxy tool parameter inputs
    infile1 : str
        File path to dataset containing features
    infile2 : str
        File path to dataset containing target values
    loaded_df : dict, optional
        Cache of already loaded DataFrame objects keyed by file path plus
        ``repr(header)``. It is read and updated in place; a new dict is
        used when omitted.
    ref_seq : str
        File path to dataset containing genome sequence file
    intervals : str
        File path to dataset containing interval file
    targets : str
        File path to dataset compressed target bed file
    fasta_path : str
        File path to dataset containing fasta file

    Returns
    -------
    estimator : estimator object after setting new attributes
    X : numpy array
    y : numpy array
        ``None`` for the ``refseq_and_interval`` input type, where the target
        values are stored on the estimator instead.

    Raises
    ------
    ValueError
        If the selected input type is unknown, or if fasta input is chosen
        for an estimator without a ``*fasta_path`` parameter.
    """
    # A fresh dict per call; a `{}` default would be shared between calls.
    if loaded_df is None:
        loaded_df = {}

    column_modes = (
        "by_index_number",
        "all_but_by_index_number",
        "by_header_name",
        "all_but_by_header_name",
    )
    estimator_params = estimator.get_params()

    input_type = params["input_options"]["selected_input"]
    # tabular input
    if input_type == "tabular":
        header = "infer" if params["input_options"]["header1"] else None
        column_option = params["input_options"]["column_selector_options_1"][
            "selected_column_selector_option"
        ]
        if column_option in column_modes:
            c = params["input_options"]["column_selector_options_1"]["col1"]
        else:
            c = None

        df_key = infile1 + repr(header)

        # Reuse the cached frame; it must not be handed to read_csv as a path.
        if df_key in loaded_df:
            df = loaded_df[df_key]
        else:
            df = pd.read_csv(infile1, sep="\t", header=header, parse_dates=True)
            loaded_df[df_key] = df

        X = read_columns(df, c=c, c_option=column_option).astype(float)
    # sparse input
    elif input_type == "sparse":
        with open(infile1, "r") as sparse_handle:
            X = mmread(sparse_handle)

    # fasta_file input
    elif input_type == "seq_fasta":
        pyfaidx = get_module("pyfaidx")
        sequences = pyfaidx.Fasta(fasta_path)
        n_seqs = len(sequences.keys())
        # X only carries row indices for this input type.
        X = np.arange(n_seqs)[:, np.newaxis]
        for param in estimator_params.keys():
            if param.endswith("fasta_path"):
                estimator.set_params(**{param: fasta_path})
                break
        else:
            raise ValueError(
                "The selected estimator doesn't support "
                "fasta file input! Please consider using "
                "KerasGBatchClassifier with "
                "FastaDNABatchGenerator/FastaProteinBatchGenerator "
                "or having GenomeOneHotEncoder/ProteinOneHotEncoder "
                "in pipeline!"
            )

    elif input_type == "refseq_and_interval":
        path_params = {
            "data_batch_generator__ref_genome_path": ref_seq,
            "data_batch_generator__intervals_path": intervals,
            "data_batch_generator__target_path": targets,
        }
        estimator.set_params(**path_params)
        with open(intervals) as interval_handle:
            n_intervals = sum(1 for _ in interval_handle)
        # X only carries row indices for this input type.
        X = np.arange(n_intervals)[:, np.newaxis]

    else:
        # Fail here with a clear message instead of an unbound `X` at return.
        raise ValueError("Unsupported input type: %r" % (input_type,))

    # Get target y
    header = "infer" if params["input_options"]["header2"] else None
    column_option = params["input_options"]["column_selector_options_2"][
        "selected_column_selector_option2"
    ]
    if column_option in column_modes:
        c = params["input_options"]["column_selector_options_2"]["col2"]
    else:
        c = None

    df_key = infile2 + repr(header)
    if df_key in loaded_df:
        infile2 = loaded_df[df_key]
    else:
        infile2 = pd.read_csv(infile2, sep="\t", header=header, parse_dates=True)
        loaded_df[df_key] = infile2

    y = read_columns(
        infile2,
        c=c,
        c_option=column_option,
        sep="\t",
        header=header,
        parse_dates=True,
    )
    # Flatten a single-column 2-D result to 1-D.
    if len(y.shape) == 2 and y.shape[1] == 1:
        y = y.ravel()
    if input_type == "refseq_and_interval":
        estimator.set_params(data_batch_generator__features=y.ravel().tolist())
        y = None
    # end y

    return estimator, X, y


def _do_outer_cv(searcher, X, y, outer_cv, scoring, error_score="raise", outfile=None):
    """Do outer cross-validation for nested CV and write the scores to file.

    Parameters
    ----------
    searcher : object
        SearchCV object
    X : numpy array
        Containing features
    y : numpy array
        Target values or labels
    outer_cv : int or CV splitter
        Control the cv splitting
    scoring : object
        Scorer
    error_score: str, float or numpy float
        Whether to raise fit error or return an value
    outfile : str
        File path to store the results

    Notes
    -----
    The output is a tab separated table with the per-split ``test*``
    columns plus their ``mean_`` / ``std_`` summaries; columns whose name
    ends in ``time`` are dropped. When `error_score` is not ``"raise"``,
    recorded warnings are printed and any exception raised by
    ``cross_validate`` still propagates.
    """
    cv_kwargs = dict(
        scoring=scoring,
        cv=outer_cv,
        n_jobs=N_JOBS,
        verbose=0,
        error_score=error_score,
    )

    if error_score == "raise":
        rval = cross_validate(searcher, X, y, **cv_kwargs)
    else:
        warnings.simplefilter("always", FitFailedWarning)
        with warnings.catch_warnings(record=True) as w:
            try:
                rval = cross_validate(searcher, X, y, **cv_kwargs)
            finally:
                # Print what was recorded, then let a failure propagate:
                # swallowing it would leave `rval` undefined below.
                for warning in w:
                    print(repr(warning.message))

    # Snapshot the keys because the dict is modified inside the loop.
    keys = list(rval.keys())
    for k in keys:
        if k.startswith("test"):
            rval["mean_" + k] = np.mean(rval[k])
            rval["std_" + k] = np.std(rval[k])
        if k.endswith("time"):
            rval.pop(k)
    rval = pd.DataFrame(rval)
    rval = rval[sorted(rval.columns)]
    rval.to_csv(path_or_buf=outfile, sep="\t", header=True, index=False)


def _do_train_test_split_val(
    searcher,
    X,
    y,
    params,
    error_score="raise",
    primary_scoring=None,
    groups=None,
    outfile=None,
):
    """Fit the search on a train split and score the best model on the test.

    Parameters
    ----------
    searcher : object
        SearchCV object
    X : numpy array
        Containing features
    y : numpy array or None
        Target values or labels
    params : dict
        Galaxy tool parameter inputs; ``params["outer_split"]`` is passed on
        to ``train_test_split`` as keyword arguments. The dict itself is not
        modified.
    error_score : str, float or numpy float
        ``"raise"`` fits directly; any other value records warnings during
        the fit and prints them.
    primary_scoring : str
        Column name used for the result when the scorer is not a dict
    groups : array-like or None
        Group labels, required when ``shuffle`` is ``"group"``
    outfile : str
        File path to store the test scores (tab separated)

    Returns
    --------
    Fitted SearchCV object

    Raises
    ------
    ValueError
        If group shuffle is requested without `groups`.
    """
    train_test_split = try_get_attr("galaxy_ml.model_validations", "train_test_split")
    # Work on a copy so the caller's parameter dict is left untouched.
    split_options = dict(params["outer_split"])
    # Stays None when only X and groups are split (no target available).
    y_test = None

    # splits
    shuffle_mode = split_options["shuffle"]
    if shuffle_mode == "stratified":
        split_options["labels"] = y
        X, X_test, y, y_test = train_test_split(X, y, **split_options)
    elif shuffle_mode == "group":
        if groups is None:
            raise ValueError(
                "No group based CV option was choosen for " "group shuffle!"
            )
        split_options["labels"] = groups
        if y is None:
            X, X_test, groups, _ = train_test_split(X, groups, **split_options)
        else:
            X, X_test, y, y_test, groups, _ = train_test_split(
                X, y, groups, **split_options
            )
    else:
        # The tool passes the literal string "None" for "no shuffle".
        if shuffle_mode == "None":
            split_options["shuffle"] = None
        X, X_test, y, y_test = train_test_split(X, y, **split_options)

    if error_score == "raise":
        searcher.fit(X, y, groups=groups)
    else:
        warnings.simplefilter("always", FitFailedWarning)
        with warnings.catch_warnings(record=True) as w:
            try:
                searcher.fit(X, y, groups=groups)
            finally:
                # Print what was recorded, then let a failure propagate:
                # an unfitted searcher has nothing to evaluate below.
                for warning in w:
                    print(repr(warning.message))

    scorer_ = searcher.scorer_

    best_estimator_ = getattr(searcher, "best_estimator_")

    # TODO Solve deep learning models in pipeline
    if best_estimator_.__class__.__name__ == "KerasGBatchClassifier":
        test_score = best_estimator_.evaluate(
            X_test,
            scorer=scorer_,
        )
    else:
        test_score = _score(best_estimator_, X_test, y_test, scorer_)

    if not isinstance(scorer_, dict):
        test_score = {primary_scoring: test_score}
    # Wrap each score in a list so the DataFrame gets exactly one row.
    test_score = {key: [value] for key, value in test_score.items()}
    result_df = pd.DataFrame(test_score)
    result_df.to_csv(path_or_buf=outfile, sep="\t", header=True, index=False)

    return searcher


def _set_memory(estimator, memory):
    """Attach a memory cache to the IRAPS classifier(s) of an estimator.

    Parameters
    ----------
    estimator : python object
        Either an ``IRAPSClassifier`` or an object whose ``get_params()``
        may expose parameters ending in ``irapsclassifier__memory``.
    memory : joblib.Memory object

    Returns
    -------
    estimator : the same estimator object after setting new attributes
    """
    # A bare IRAPSClassifier takes the cache directly.
    if isinstance(estimator, IRAPSClassifier):
        estimator.set_params(memory=memory)
        return estimator

    # Otherwise update every nested parameter that targets an IRAPS memory.
    memory_updates = {
        name: memory
        for name in estimator.get_params()
        if name.endswith("irapsclassifier__memory")
    }
    estimator.set_params(**memory_updates)

    return estimator


def main(
    inputs,
    infile_estimator,
    infile1,
    infile2,
    outfile_result,
    outfile_object=None,
    groups=None,
    ref_seq=None,
    intervals=None,
    targets=None,
    fasta_path=None,
):
    """Run a hyperparameter search as configured by the Galaxy tool inputs.

    Parameter
    ---------
    inputs : str
        File path to galaxy tool parameter.

    infile_estimator : str
        File path to estimator.

    infile1 : str
        File path to dataset containing features

    infile2 : str
        File path to dataset containing target values

    outfile_result : str
        File path to save the results, either cv_results or test result

    outfile_object : str, optional
        File path to save searchCV object

    groups : str
        File path to dataset containing groups labels

    ref_seq : str
        File path to dataset containing genome sequence file

    intervals : str
        File path to dataset containing interval file

    targets : str
        File path to dataset compressed target bed file

    fasta_path : str
        File path to dataset containing fasta file

    Returns
    -------
    0 when only the unfitted searcher is saved or when nested CV was run,
    otherwise None.
    """
    warnings.simplefilter("ignore")

    # store read dataframe object
    loaded_df = {}

    with open(inputs, "r") as param_handler:
        params = json.load(param_handler)

    # Override the refit parameter
    params["options"]["refit"] = (
        params["save"] != "nope"
        or params["outer_split"]["split_mode"] == "nested_cv"
    )

    estimator = load_model_from_h5(infile_estimator)

    estimator = clean_params(estimator)

    if estimator.__class__.__name__ == "KerasGBatchClassifier":
        _fit_and_score = try_get_attr(
            "galaxy_ml.model_validations",
            "_fit_and_score",
        )

        # Swap in galaxy_ml's _fit_and_score for scikit-learn's own.
        setattr(_search, "_fit_and_score", _fit_and_score)
        setattr(_validation, "_fit_and_score", _fit_and_score)

    search_algos_and_options = params["search_algos"]
    optimizer_name = search_algos_and_options.pop("selected_search_algo")
    # Remember the choice: `optimizer` is a class from here on, so it can no
    # longer be compared against the algorithm name.
    use_bayes_search = optimizer_name == "skopt.BayesSearchCV"
    if use_bayes_search:
        optimizer = BayesSearchCV
    else:
        optimizer = getattr(model_selection, optimizer_name)

    # handle gridsearchcv options
    options = params["options"]
    options.update(search_algos_and_options)

    if groups:
        groups_selector = options["cv_selector"]["groups_selector"]
        header = "infer" if groups_selector["header_g"] else None
        column_selector = groups_selector["column_selector_options_g"]
        column_option = column_selector["selected_column_selector_option_g"]
        if column_option in [
            "by_index_number",
            "all_but_by_index_number",
            "by_header_name",
            "all_but_by_header_name",
        ]:
            c = column_selector["col_g"]
        else:
            c = None

        df_key = groups + repr(header)

        groups = pd.read_csv(groups, sep="\t", header=header, parse_dates=True)
        loaded_df[df_key] = groups

        groups = read_columns(
            groups,
            c=c,
            c_option=column_option,
            sep="\t",
            header=header,
            parse_dates=True,
        )
        groups = groups.ravel()
        options["cv_selector"]["groups_selector"] = groups

    cv_selector = options.pop("cv_selector")
    if Version(galaxy_ml_version) < Version("0.8.3"):
        cv_selector.pop("n_stratification_bins", None)
    splitter, groups = get_cv(cv_selector)
    options["cv"] = splitter
    primary_scoring = options["scoring"]["primary_scoring"]
    options["scoring"] = get_scoring(options["scoring"])
    # TODO make BayesSearchCV support multiple scoring
    if use_bayes_search and isinstance(options["scoring"], dict):
        options["scoring"] = options["scoring"][primary_scoring]
        warnings.warn(
            "BayesSearchCV doesn't support multiple "
            "scorings! Primary scoring is used."
        )
    if options["error_score"]:
        options["error_score"] = "raise"
    else:
        # `np.nan` is the spelling available in every NumPy release.
        options["error_score"] = np.nan
    if options["refit"] and isinstance(options["scoring"], dict):
        options["refit"] = primary_scoring
    if "pre_dispatch" in options and options["pre_dispatch"] == "":
        options["pre_dispatch"] = None

    params_builder = params["search_params_builder"]
    param_grid = _eval_search_params(params_builder)

    # save the SearchCV object without fit
    if params["save"] == "save_no_fit":
        searcher = optimizer(estimator, param_grid, **options)
        dump_model_to_h5(searcher, outfile_object)
        return 0

    # read inputs and loads new attributes, like paths
    estimator, X, y = _handle_X_y(
        estimator,
        params,
        infile1,
        infile2,
        loaded_df=loaded_df,
        ref_seq=ref_seq,
        intervals=intervals,
        targets=targets,
        fasta_path=fasta_path,
    )

    label_encoder = LabelEncoder()
    if get_main_estimator(estimator).__class__.__name__ == "XGBClassifier":
        y = label_encoder.fit_transform(y)

    # cache iraps_core fits could increase search speed significantly
    memory = joblib.Memory(location=CACHE_DIR, verbose=0)
    estimator = _set_memory(estimator, memory)

    searcher = optimizer(estimator, param_grid, **options)

    split_mode = params["outer_split"].pop("split_mode")

    # Nested CV
    if split_mode == "nested_cv":
        cv_selector = params["outer_split"]["cv_selector"]
        if Version(galaxy_ml_version) < Version("0.8.3"):
            cv_selector.pop("n_stratification_bins", None)
        outer_cv, _ = get_cv(cv_selector)
        # nested CV, outer cv using cross_validate
        cv_kwargs = dict(
            groups=groups,
            scoring=options["scoring"],
            cv=outer_cv,
            n_jobs=N_JOBS,
            verbose=options["verbose"],
            fit_params={"groups": groups},
            return_estimator=(params["save"] == "save_estimator"),
            error_score=options["error_score"],
            return_train_score=True,
        )
        if options["error_score"] == "raise":
            rval = cross_validate(searcher, X, y, **cv_kwargs)
        else:
            warnings.simplefilter("always", FitFailedWarning)
            with warnings.catch_warnings(record=True) as w:
                try:
                    rval = cross_validate(searcher, X, y, **cv_kwargs)
                finally:
                    # Print what was recorded, then let a failure propagate:
                    # swallowing it would leave `rval` undefined below.
                    for warning in w:
                        print(repr(warning.message))

        fitted_searchers = rval.pop("estimator", [])
        if fitted_searchers:
            pwd = os.getcwd()
            save_dir = os.path.join(pwd, "cv_results_in_folds")
            # Best effort: a failure to save per-fold results is only printed.
            try:
                os.mkdir(save_dir)
                for idx, obj in enumerate(fitted_searchers):
                    target_name = "cv_results_" + "_" + "split%d" % idx
                    target_path = os.path.join(pwd, save_dir, target_name)
                    cv_results_ = getattr(obj, "cv_results_", None)
                    if not cv_results_:
                        print("%s is not available" % target_name)
                        continue
                    cv_results_ = pd.DataFrame(cv_results_)
                    cv_results_ = cv_results_[sorted(cv_results_.columns)]
                    cv_results_.to_csv(target_path, sep="\t", header=True, index=False)
            except Exception as e:
                print(e)

        # Snapshot the keys because the dict is modified inside the loop.
        keys = list(rval.keys())
        for k in keys:
            if k.startswith("test"):
                rval["mean_" + k] = np.mean(rval[k])
                rval["std_" + k] = np.std(rval[k])
            if k.endswith("time"):
                rval.pop(k)
        rval = pd.DataFrame(rval)
        rval = rval[sorted(rval.columns)]
        rval.to_csv(path_or_buf=outfile_result, sep="\t", header=True, index=False)

        # Drop the on-disk cache on this path too before returning.
        memory.clear(warn=False)

        return 0

    # no outer split (the train test split mode is deprecated)
    searcher.set_params(n_jobs=N_JOBS)
    if options["error_score"] == "raise":
        searcher.fit(X, y, groups=groups)
    else:
        warnings.simplefilter("always", FitFailedWarning)
        with warnings.catch_warnings(record=True) as w:
            try:
                searcher.fit(X, y, groups=groups)
            finally:
                # Print what was recorded, then let a failure propagate:
                # an unfitted searcher has no `cv_results_` to report.
                for warning in w:
                    print(repr(warning.message))

    cv_results = pd.DataFrame(searcher.cv_results_)
    cv_results = cv_results[sorted(cv_results.columns)]
    cv_results.to_csv(
        path_or_buf=outfile_result, sep="\t", header=True, index=False
    )

    memory.clear(warn=False)

    # output best estimator, and weights if applicable
    if outfile_object:
        best_estimator_ = getattr(searcher, "best_estimator_", None)
        # Compare against None: some estimators define their own truthiness.
        if best_estimator_ is None:
            warnings.warn(
                "GridSearchCV object has no attribute "
                "'best_estimator_', because either it's "
                "nested gridsearch or `refit` is False!"
            )
            return

        dump_model_to_h5(best_estimator_, outfile_object)


if __name__ == "__main__":
    # Command line entry point: every option maps onto a keyword of `main`.
    cli = argparse.ArgumentParser()

    # The tool parameter file is the only mandatory option.
    cli.add_argument("-i", "--inputs", dest="inputs", required=True)

    # (short flag, long flag, destination) for each optional argument, in the
    # order they should appear in the generated help.
    optional_arguments = (
        ("-e", "--estimator", "infile_estimator"),
        ("-X", "--infile1", "infile1"),
        ("-y", "--infile2", "infile2"),
        ("-O", "--outfile_result", "outfile_result"),
        ("-o", "--outfile_object", "outfile_object"),
        ("-g", "--groups", "groups"),
        ("-r", "--ref_seq", "ref_seq"),
        ("-b", "--intervals", "intervals"),
        ("-t", "--targets", "targets"),
        ("-f", "--fasta_path", "fasta_path"),
    )
    for short_flag, long_flag, destination in optional_arguments:
        cli.add_argument(short_flag, long_flag, dest=destination)

    parsed = cli.parse_args()

    main(**vars(parsed))