diff toolfactory/galaxy-tool-test @ 121:2050b2475ae5 draft

Uploaded
author fubar
date Thu, 07 Jan 2021 09:24:17 +0000
parents d4d88d393285
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/toolfactory/galaxy-tool-test	Thu Jan 07 09:24:17 2021 +0000
@@ -0,0 +1,457 @@
+#!/usr/bin/env python
+
+import argparse
+import datetime as dt
+import json
+import logging
+import os
+import sys
+import tempfile
+from collections import namedtuple
+from concurrent.futures import thread, ThreadPoolExecutor
+
+import yaml
+
+from galaxy.tool_util.verify.interactor import (
+    DictClientTestConfig,
+    GalaxyInteractorApi,
+    verify_tool,
+)
+
+DESCRIPTION = """Script to quickly run a tool test against a running Galaxy instance."""
+DEFAULT_SUITE_NAME = "Galaxy Tool Tests"
+ALL_TESTS = -1
+ALL_TOOLS = "*"
+ALL_VERSION = "*"
+LATEST_VERSION = None
+
+
+TestReference = namedtuple("TestReference", ["tool_id", "tool_version", "test_index"])
+TestException = namedtuple("TestException", ["tool_id", "exception", "was_recorded"])
+
+
+class Results:
+
+    def __init__(self, default_suitename, test_json, append=False):
+        self.test_json = test_json or "-"
+        test_results = []
+        test_exceptions = []
+        suitename = default_suitename
+        if append:
+            assert test_json != "-"
+            with open(test_json) as f:
+                previous_results = json.load(f)
+                test_results = previous_results["tests"]
+                if "suitename" in previous_results:
+                    suitename = previous_results["suitename"]
+        self.test_results = test_results
+        self.test_exceptions = test_exceptions
+        self.suitename = suitename
+
+    def register_result(self, result):
+        self.test_results.append(result)
+
+    def register_exception(self, test_exception):
+        self.test_exceptions.append(test_exception)
+
+    def already_successful(self, test_reference):
+        test_id = _test_id_for_reference(test_reference)
+        for test_result in self.test_results:
+            if test_result.get('id') != test_id:
+                continue
+
+            has_data = test_result.get('has_data', False)
+            if has_data:
+                test_data = test_result.get("data", {})
+                if 'status' in test_data and test_data['status'] == 'success':
+                    return True
+
+        return False
+
+    def write(self):
+        tests = sorted(self.test_results, key=lambda el: el['id'])
+        n_passed, n_failures, n_skips = 0, 0, 0
+        n_errors = len([e for e in self.test_exceptions if not e.was_recorded])
+        for test in tests:
+            has_data = test.get('has_data', False)
+            if has_data:
+                test_data = test.get("data", {})
+                if 'status' not in test_data:
+                    raise Exception(f"Test result data {test_data} doesn't contain a status key.")
+                status = test_data['status']
+                if status == "success":
+                    n_passed += 1
+                elif status == "error":
+                    n_errors += 1
+                elif status == "skip":
+                    n_skips += 1
+                elif status == "failure":
+                    n_failures += 1
+        report_obj = {
+            'version': '0.1',
+            'suitename': self.suitename,
+            'results': {
+                'total': n_passed + n_failures + n_skips + n_errors,
+                'errors': n_errors,
+                'failures': n_failures,
+                'skips': n_skips,
+            },
+            'tests': tests,
+        }
+        if self.test_json == "-":
+            print(json.dumps(report_obj))
+        else:
+            with open(self.test_json, "w") as f:
+                json.dump(report_obj, f)
+
+    def info_message(self):
+        messages = []
+        passed_tests = self._tests_with_status('success')
+        messages.append("Passed tool tests ({}): {}".format(
+            len(passed_tests),
+            [t["id"] for t in passed_tests]
+        ))
+        failed_tests = self._tests_with_status('failure')
+        messages.append("Failed tool tests ({}): {}".format(
+            len(failed_tests),
+            [t["id"] for t in failed_tests]
+        ))
+        skiped_tests = self._tests_with_status('skip')
+        messages.append("Skipped tool tests ({}): {}".format(
+            len(skiped_tests),
+            [t["id"] for t in skiped_tests]
+        ))
+        errored_tests = self._tests_with_status('error')
+        messages.append("Errored tool tests ({}): {}".format(
+            len(errored_tests),
+            [t["id"] for t in errored_tests]
+        ))
+        return "\n".join(messages)
+
+    @property
+    def success_count(self):
+        self._tests_with_status('success')
+
+    @property
+    def skip_count(self):
+        self._tests_with_status('skip')
+
+    @property
+    def error_count(self):
+        return self._tests_with_status('error') + len(self.test_exceptions)
+
+    @property
+    def failure_count(self):
+        return self._tests_with_status('failure')
+
+    def _tests_with_status(self, status):
+        return [t for t in self.test_results if t.get("data", {}).get("status") == status]
+
+
+def test_tools(
+    galaxy_interactor,
+    test_references,
+    results,
+    log=None,
+    parallel_tests=1,
+    history_per_test_case=False,
+    no_history_cleanup=False,
+    retries=0,
+    verify_kwds=None,
+):
+    """Run through tool tests and write report.
+
+    Refactor this into Galaxy in 21.01.
+    """
+    verify_kwds = (verify_kwds or {}).copy()
+    tool_test_start = dt.datetime.now()
+    history_created = False
+    if history_per_test_case:
+        test_history = None
+    else:
+        history_created = True
+        test_history = galaxy_interactor.new_history(history_name=f"History for {results.suitename}")
+    verify_kwds.update({
+        "no_history_cleanup": no_history_cleanup,
+        "test_history": test_history,
+    })
+    with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
+        try:
+            for test_reference in test_references:
+                _test_tool(
+                    executor=executor,
+                    test_reference=test_reference,
+                    results=results,
+                    galaxy_interactor=galaxy_interactor,
+                    log=log,
+                    retries=retries,
+                    verify_kwds=verify_kwds,
+                )
+        finally:
+            # Always write report, even if test was cancelled.
+            try:
+                executor.shutdown(wait=True)
+            except KeyboardInterrupt:
+                executor._threads.clear()
+                thread._threads_queues.clear()
+            results.write()
+            if log:
+                log.info("Report written to '%s'", os.path.abspath(results.test_json))
+                log.info(results.info_message())
+                log.info("Total tool test time: {}".format(dt.datetime.now() - tool_test_start))
+            if history_created and not no_history_cleanup:
+                galaxy_interactor.delete_history(test_history)
+
+
+def _test_id_for_reference(test_reference):
+    tool_id = test_reference.tool_id
+    tool_version = test_reference.tool_version
+    test_index = test_reference.test_index
+
+    if tool_version and tool_id.endswith("/" + tool_version):
+        tool_id = tool_id[:-len("/" + tool_version)]
+
+    label_base = tool_id
+    if tool_version:
+        label_base += "/" + str(tool_version)
+
+    test_id = label_base + "-" + str(test_index)
+    return test_id
+
+
+def _test_tool(
+    executor,
+    test_reference,
+    results,
+    galaxy_interactor,
+    log,
+    retries,
+    verify_kwds,
+):
+    tool_id = test_reference.tool_id
+    tool_version = test_reference.tool_version
+    test_index = test_reference.test_index
+    # If given a tool_id with a version suffix, strip it off so we can treat tool_version
+    # correctly at least in client_test_config.
+    if tool_version and tool_id.endswith("/" + tool_version):
+        tool_id = tool_id[:-len("/" + tool_version)]
+
+    test_id = _test_id_for_reference(test_reference)
+
+    def run_test():
+        run_retries = retries
+        job_data = None
+        job_exception = None
+
+        def register(job_data_):
+            nonlocal job_data
+            job_data = job_data_
+
+        try:
+            while run_retries >= 0:
+                job_exception = None
+                try:
+                    if log:
+                        log.info("Executing test '%s'", test_id)
+                    verify_tool(
+                        tool_id, galaxy_interactor, test_index=test_index, tool_version=tool_version,
+                        register_job_data=register, **verify_kwds
+                    )
+                    if log:
+                        log.info("Test '%s' passed", test_id)
+                    break
+                except Exception as e:
+                    if log:
+                        log.warning("Test '%s' failed", test_id, exc_info=True)
+
+                    job_exception = e
+                    run_retries -= 1
+        finally:
+            if job_data is not None:
+                results.register_result({
+                    "id": test_id,
+                    "has_data": True,
+                    "data": job_data,
+                })
+            if job_exception is not None:
+                was_recorded = job_data is not None
+                test_exception = TestException(tool_id, job_exception, was_recorded)
+                results.register_exception(test_exception)
+
+    executor.submit(run_test)
+
+
+def build_case_references(
+    galaxy_interactor,
+    tool_id=ALL_TOOLS,
+    tool_version=LATEST_VERSION,
+    test_index=ALL_TESTS,
+    page_size=0,
+    page_number=0,
+    check_against=None,
+    log=None,
+):
+    test_references = []
+    if tool_id == ALL_TOOLS:
+        tests_summary = galaxy_interactor.get_tests_summary()
+        for tool_id, tool_versions_dict in tests_summary.items():
+            for tool_version, summary in tool_versions_dict.items():
+                for test_index in range(summary["count"]):
+                    test_reference = TestReference(tool_id, tool_version, test_index)
+                    test_references.append(test_reference)
+    else:
+        assert tool_id
+        tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version) or {}
+        for i, tool_test_dict in enumerate(tool_test_dicts):
+            this_tool_version = tool_test_dict.get("tool_version", tool_version)
+            this_test_index = i
+            if test_index == ALL_TESTS or i == test_index:
+                test_reference = TestReference(tool_id, this_tool_version, this_test_index)
+                test_references.append(test_reference)
+
+    if check_against:
+        filtered_test_references = []
+        for test_reference in test_references:
+            if check_against.already_successful(test_reference):
+                if log is not None:
+                    log.debug(f"Found successful test for {test_reference}, skipping")
+                continue
+            filtered_test_references.append(test_reference)
+        log.info(f"Skipping {len(test_references)-len(filtered_test_references)} out of {len(test_references)} tests.")
+        test_references = filtered_test_references
+
+    if page_size > 0:
+        slice_start = page_size * page_number
+        slice_end = page_size * (page_number + 1)
+        test_references = test_references[slice_start:slice_end]
+
+    return test_references
+
+
+def main(argv=None):
+    if argv is None:
+        argv = sys.argv[1:]
+
+    args = _arg_parser().parse_args(argv)
+    log = setup_global_logger(__name__, verbose=args.verbose)
+    client_test_config_path = args.client_test_config
+    if client_test_config_path is not None:
+        log.debug(f"Reading client config path {client_test_config_path}")
+        with open(client_test_config_path) as f:
+            client_test_config = yaml.full_load(f)
+    else:
+        client_test_config = {}
+
+    def get_option(key):
+        arg_val = getattr(args, key, None)
+        if arg_val is None and key in client_test_config:
+            val = client_test_config.get(key)
+        else:
+            val = arg_val
+        return val
+
+    output_json_path = get_option("output_json")
+    galaxy_interactor_kwds = {
+        "galaxy_url": get_option("galaxy_url"),
+        "master_api_key": get_option("admin_key"),
+        "api_key": get_option("key"),
+        "keep_outputs_dir": args.output,
+        "download_attempts": get_option("download_attempts"),
+        "download_sleep": get_option("download_sleep"),
+    }
+    tool_id = args.tool_id
+    tool_version = args.tool_version
+    tools_client_test_config = DictClientTestConfig(client_test_config.get("tools"))
+    verbose = args.verbose
+
+    galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
+    results = Results(args.suite_name, output_json_path, append=args.append)
+    check_against = None if not args.skip_successful else results
+    test_references = build_case_references(
+        galaxy_interactor,
+        tool_id=tool_id,
+        tool_version=tool_version,
+        test_index=args.test_index,
+        page_size=args.page_size,
+        page_number=args.page_number,
+        check_against=check_against,
+        log=log,
+    )
+    log.debug(f"Built {len(test_references)} test references to executed.")
+    verify_kwds = dict(
+        client_test_config=tools_client_test_config,
+        force_path_paste=args.force_path_paste,
+        skip_with_reference_data=not args.with_reference_data,
+        quiet=not verbose,
+    )
+    test_tools(
+        galaxy_interactor,
+        test_references,
+        results,
+        log=log,
+        parallel_tests=args.parallel_tests,
+        history_per_test_case=args.history_per_test_case,
+        no_history_cleanup=args.no_history_cleanup,
+        verify_kwds=verify_kwds,
+    )
+    exceptions = results.test_exceptions
+    if exceptions:
+        exception = exceptions[0]
+        if hasattr(exception, "exception"):
+            exception = exception.exception
+        raise exception
+
+
+def setup_global_logger(name, log_file=None, verbose=False):
+    formatter = logging.Formatter('%(asctime)s %(levelname)-5s - %(message)s')
+    console = logging.StreamHandler()
+    console.setFormatter(formatter)
+
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
+    logger.addHandler(console)
+
+    if not log_file:
+        # delete = false is chosen here because it is always nice to have a log file
+        # ready if you need to debug. Not having the "if only I had set a log file"
+        # moment after the fact.
+        temp = tempfile.NamedTemporaryFile(prefix="ephemeris_", delete=False)
+        log_file = temp.name
+    file_handler = logging.FileHandler(log_file)
+    logger.addHandler(file_handler)
+    logger.info(f"Storing log file in: {log_file}")
+    return logger
+
+
+def _arg_parser():
+    parser = argparse.ArgumentParser(description=DESCRIPTION)
+    parser.add_argument('-u', '--galaxy-url', default="http://localhost:8080", help='Galaxy URL')
+    parser.add_argument('-k', '--key', default=None, help='Galaxy User API Key')
+    parser.add_argument('-a', '--admin-key', default=None, help='Galaxy Admin API Key')
+    parser.add_argument('--force_path_paste', default=False, action="store_true", help='This requires Galaxy-side config option "allow_path_paste" enabled. Allows for fetching test data locally. Only for admins.')
+    parser.add_argument('-t', '--tool-id', default=ALL_TOOLS, help='Tool ID')
+    parser.add_argument('--tool-version', default=None, help='Tool Version (if tool id supplied). Defaults to just latest version, use * to test all versions')
+    parser.add_argument('-i', '--test-index', default=ALL_TESTS, type=int, help='Tool Test Index (starting at 0) - by default all tests will run.')
+    parser.add_argument('-o', '--output', default=None, help='directory to dump outputs to')
+    parser.add_argument('--append', default=False, action="store_true", help="Extend a test record json (created with --output-json) with additional tests.")
+    parser.add_argument('--skip-successful', default=False, action="store_true", help="When used with --append, skip previously run successful tests.")
+    parser.add_argument('-j', '--output-json', default=None, help='output metadata json')
+    parser.add_argument('--verbose', default=False, action="store_true", help="Verbose logging.")
+    parser.add_argument('-c', '--client-test-config', default=None, help="Test config YAML to help with client testing")
+    parser.add_argument('--suite-name', default=DEFAULT_SUITE_NAME, help="Suite name for tool test output")
+    parser.add_argument('--with-reference-data', dest="with_reference_data", default=False, action="store_true")
+    parser.add_argument('--skip-with-reference-data', dest="with_reference_data", action="store_false", help="Skip tests the Galaxy server believes use data tables or loc files.")
+    parser.add_argument('--history-per-suite', dest="history_per_test_case", default=False, action="store_false", help="Create new history per test suite (all tests in same history).")
+    parser.add_argument('--history-per-test-case', dest="history_per_test_case", action="store_true", help="Create new history per test case.")
+    parser.add_argument('--no-history-cleanup', default=False, action="store_true", help="Perserve histories created for testing.")
+    parser.add_argument('--parallel-tests', default=1, type=int, help="Parallel tests.")
+    parser.add_argument('--retries', default=0, type=int, help="Retry failed tests.")
+    parser.add_argument('--page-size', default=0, type=int, help="If positive, use pagination and just run one 'page' to tool tests.")
+    parser.add_argument('--page-number', default=0, type=int, help="If page size is used, run this 'page' of tests - starts with 0.")
+    parser.add_argument('--download-attempts', default=1, type=int, help="Galaxy may return a transient 500 status code for download if test results are written but not yet accessible.")
+    parser.add_argument('--download-sleep', default=1, type=int, help="If download attempts is greater than 1, the amount to sleep between download attempts.")
+    return parser
+
+
+if __name__ == "__main__":
+    main()