Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/planemo/galaxy/test/actions.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/planemo/galaxy/test/actions.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,399 @@ +"""Actions related to running and reporting on Galaxy-specific testing.""" + +import io +import json +import os +from distutils.dir_util import copy_tree + +import click +from galaxy.util import unicodify + +from planemo.exit_codes import ( + EXIT_CODE_GENERIC_FAILURE, + EXIT_CODE_NO_SUCH_TARGET, + EXIT_CODE_OK, +) +from planemo.galaxy.run import ( + run_galaxy_command, + setup_venv, +) +from planemo.io import error, info, shell_join, warn +from planemo.reports import ( + allure, + build_report, +) +from planemo.test.results import get_dict_value +from . import structures as test_structures + + +NO_XUNIT_REPORT_MESSAGE = ("Cannot locate xUnit report [%s] for tests - " + "required to build planemo report and summarize " + "tests.") +NO_JSON_REPORT_MESSAGE = ("Cannot locate JSON report [%s] for tests - " + "required to build planemo report and summarize " + "tests.") +REPORT_NOT_CHANGED = ("Galaxy failed to update test report [%s] for tests - " + "required to build planemo report and summarize " + "tests.") +NO_TESTS_MESSAGE = "No tests were executed - see Galaxy output for details." +ALL_TESTS_PASSED_MESSAGE = "All %d test(s) executed passed." +PROBLEM_COUNT_MESSAGE = ("There were problems with %d test(s) - out of %d " + "test(s) executed. See %s for detailed breakdown.") +GENERIC_PROBLEMS_MESSAGE = ("One or more tests failed. See %s for detailed " + "breakdown.") +GENERIC_TESTS_PASSED_MESSAGE = "No failing tests encountered." +TEST_DATA_UPDATED_MESSAGE = "Test data were updated and tests were rerun." +TEST_DATA_NOT_UPDATED_MESSAGE = "%s Therefore, no test data were updated." % ALL_TESTS_PASSED_MESSAGE + + +def run_in_config(ctx, config, run=run_galaxy_command, test_data_target_dir=None, **kwds): + """Run Galaxy tests with the run_tests.sh command. + + The specified `config` object describes the context for tool + execution. + """ + config_directory = config.config_directory + html_report_file = kwds["test_output"] + + job_output_files = kwds.get("job_output_files", None) + if job_output_files is None: + job_output_files = os.path.join(config_directory, "jobfiles") + + xunit_report_file = _xunit_state(kwds, config) + xunit_report_file_tracker = _FileChangeTracker(xunit_report_file) + structured_report_file = _structured_report_file(kwds, config) + structured_report_file_tracker = _FileChangeTracker(structured_report_file) + + info("Testing using galaxy_root %s", config.galaxy_root) + # TODO: Allow running dockerized Galaxy here instead. + server_ini = os.path.join(config_directory, "galaxy.ini") + config.env["GALAXY_CONFIG_FILE"] = server_ini + config.env["GALAXY_TEST_VERBOSE_ERRORS"] = "true" + config.env["GALAXY_TEST_SAVE"] = job_output_files + + cd_to_galaxy_command = ['cd', config.galaxy_root] + test_cmd = test_structures.GalaxyTestCommand( + html_report_file, + xunit_report_file, + structured_report_file, + failed=kwds.get("failed", False), + installed=kwds.get("installed", False), + ).build() + setup_common_startup_args = "" + if kwds.get("skip_venv", False): + setup_common_startup_args = shell_join( + 'COMMON_STARTUP_ARGS=--skip-venv', + 'export COMMON_STARTUP_ARGS', + 'echo "Set COMMON_STARTUP_ARGS to ${COMMON_STARTUP_ARGS}"', + ) + setup_venv_command = setup_venv(ctx, kwds) + cmd = shell_join( + cd_to_galaxy_command, + setup_common_startup_args, + setup_venv_command, + test_cmd, + ) + action = "Testing tools" + return_code = run( + ctx, + cmd, + config.env, + action + ) + if return_code != 0 and kwds.get('update_test_data', False): + for test_data_dir in [config.test_data_dir, test_data_target_dir]: + if test_data_dir: + copy_tree(job_output_files, test_data_dir) + kwds['test_data_updated'] = True + info('Test data updated. Rerunning...') + return_code = run( + ctx, + cmd, + config.env, + action + ) + + _check_test_outputs(xunit_report_file_tracker, structured_report_file_tracker) + test_results = test_structures.GalaxyTestResults( + structured_report_file, + xunit_report_file, + html_report_file, + return_code, + ) + + structured_data = test_results.structured_data + return handle_reports_and_summary( + ctx, + structured_data, + exit_code=test_results.exit_code, + kwds=kwds + ) + + +def handle_reports_and_summary(ctx, structured_data, exit_code=None, kwds=None): + """Produce reports and print summary, return 0 if tests passed. + + If ``exit_code`` is set - use underlying test source for return + code and test success determination, otherwise infer from supplied + test data. + """ + if kwds is None: + kwds = {} + handle_reports(ctx, structured_data, kwds) + summary_exit_code = _handle_summary( + structured_data, + **kwds + ) + return exit_code if exit_code is not None else summary_exit_code + + +def merge_reports(input_paths, output_path): + reports = [] + for path in input_paths: + with io.open(path, encoding='utf-8') as f: + reports.append(json.load(f)) + tests = [] + for report in reports: + tests.extend(report["tests"]) + tests = sorted(tests, key=lambda k: k['id']) + merged_report = {"tests": tests} + with io.open(output_path, mode="w", encoding='utf-8') as out: + out.write(unicodify(json.dumps(merged_report))) + + +def handle_reports(ctx, structured_data, kwds): + """Write reports based on user specified kwds.""" + exceptions = [] + structured_report_file = kwds.get("test_output_json", None) + if structured_report_file and not os.path.exists(structured_report_file): + try: + with io.open(structured_report_file, mode="w", encoding='utf-8') as f: + f.write(unicodify(json.dumps(structured_data))) + except Exception as e: + exceptions.append(e) + + for report_type in ["html", "markdown", "text", "xunit", "junit", "allure"]: + try: + _handle_test_output_file( + ctx, report_type, structured_data, kwds + ) + except Exception as e: + exceptions.append(e) + + if len(exceptions) > 0: + raise exceptions[0] + + +def _handle_test_output_file(ctx, report_type, test_data, kwds): + kwd_name = "test_output" + if report_type != "html": + kwd_name = "test_output_%s" % report_type + + path = kwds.get(kwd_name, None) + if path is None: + message = "No file specified for %s, skipping test output." % kwd_name + ctx.vlog(message) + return + + if report_type == "allure": + file_modication_datatime = kwds.get("file_modication_datatime") + allure.write_results(path, test_data, file_modication_datatime=file_modication_datatime) + return + + try: + contents = build_report.build_report( + test_data, report_type=report_type + ) + except Exception: + message = "Problem producing report file %s for %s" % ( + path, kwd_name + ) + ctx.vlog(message, exception=True) + raise + + try: + with io.open(path, mode='w', encoding='utf-8') as handle: + handle.write(unicodify(contents)) + except Exception: + message = "Problem writing output file %s for %s" % ( + kwd_name, path + ) + ctx.vlog(message, exception=True) + raise + + +def _handle_summary( + structured_data, + **kwds +): + summary_dict = get_dict_value("summary", structured_data) + num_tests = get_dict_value("num_tests", summary_dict) + num_failures = get_dict_value("num_failures", summary_dict) + num_errors = get_dict_value("num_errors", summary_dict) + num_problems = num_failures + num_errors + + summary_exit_code = EXIT_CODE_OK + if num_problems > 0: + summary_exit_code = EXIT_CODE_GENERIC_FAILURE + elif num_tests == 0: + summary_exit_code = EXIT_CODE_NO_SUCH_TARGET + + summary_style = kwds.get("summary") + if kwds.get('test_data_updated'): + info(TEST_DATA_UPDATED_MESSAGE) + if summary_style != "none": + if num_tests == 0: + warn(NO_TESTS_MESSAGE) + elif num_problems == 0: + if kwds.get('update_test_data') and not kwds.get('test_data_updated'): + info(TEST_DATA_NOT_UPDATED_MESSAGE % num_tests) + else: + info(ALL_TESTS_PASSED_MESSAGE % num_tests) + elif num_problems: + html_report_file = kwds.get("test_output") + message_args = (num_problems, num_tests, html_report_file) + message = PROBLEM_COUNT_MESSAGE % message_args + warn(message) + + _summarize_tests_full( + structured_data, + **kwds + ) + + return summary_exit_code + + +def _summarize_tests_full( + structured_data, + **kwds +): + tests = get_dict_value("tests", structured_data) + for test_case_data in tests: + _summarize_test_case(test_case_data, **kwds) + + +def passed(xunit_testcase_el): + did_pass = True + for child_el in list(xunit_testcase_el): + if child_el.tag in ["failure", "error"]: + did_pass = False + return did_pass + + +def _summarize_test_case(structured_data, **kwds): + summary_style = kwds.get("summary") + test_id = test_structures.case_id( + raw_id=get_dict_value("id", structured_data) + ) + status = get_dict_value( + "status", + get_dict_value("data", structured_data) + ) + if status != "success": + state = click.style("failed", bold=True, fg='red') + else: + state = click.style("passed", bold=True, fg='green') + click.echo(test_id.label + ": " + state) + if summary_style != "minimal": + _print_command_line(structured_data, test_id) + + +def _print_command_line(test, test_id): + execution_problem = test.get("execution_problem", None) + if execution_problem: + click.echo("| command: *could not execute job, no command generated* ") + return + + job = None + try: + job = test["job"] + except (KeyError, IndexError): + click.echo("| command: *failed to find job for test object [%s]" % test) + return + try: + command = job["command_line"] + except (KeyError, IndexError): + click.echo("| command: *failed to find command_line for job object [%s]" % job) + return + + click.echo("| command: %s" % command) + + +def _check_test_outputs( + xunit_report_file_tracker, + structured_report_file_tracker +): + if not os.path.exists(xunit_report_file_tracker.path): + message = NO_XUNIT_REPORT_MESSAGE % xunit_report_file_tracker.path + error(message) + raise Exception(message) + + if not os.path.exists(structured_report_file_tracker.path): + message = NO_JSON_REPORT_MESSAGE % structured_report_file_tracker.path + error(message) + raise Exception(message) + + if not xunit_report_file_tracker.changed(): + message = REPORT_NOT_CHANGED % xunit_report_file_tracker.path + error(message) + raise Exception(message) + + if not structured_report_file_tracker.changed(): + message = REPORT_NOT_CHANGED % structured_report_file_tracker.path + error(message) + raise Exception(message) + + +def _xunit_state(kwds, config): + # This has been supported in Galaxy for well over a year, just going to assume + # it from here on out. + # xunit_supported = True + # if shell("grep -q xunit '%s'/run_tests.sh" % config.galaxy_root): + # xunit_supported = False + + xunit_report_file = kwds.get("test_output_xunit", None) + if xunit_report_file is None: + xunit_report_file = os.path.join(config.config_directory, "xunit.xml") + + return xunit_report_file + + +def _structured_report_file(kwds, config): + # This has been supported in Galaxy for well over a year, just going to assume + # it from here on out. + # structured_data_supported = True + # if shell("grep -q structured_data '%s'/run_tests.sh" % config.galaxy_root): + # structured_data_supported = False + + structured_report_file = None + structured_report_file = kwds.get("test_output_json", None) + if structured_report_file is None: + conf_dir = config.config_directory + structured_report_file = os.path.join(conf_dir, "structured_data.json") + + return structured_report_file + + +class _FileChangeTracker(object): + + def __init__(self, path): + modification_time = None + if os.path.exists(path): + modification_time = os.path.getmtime(path) + + self.path = path + self.modification_time = modification_time + + def changed(self): + if self.modification_time: + new_modification_time = os.path.getmtime(self.path) + return self.modification_time != new_modification_time + else: + return os.path.exists(self.path) + + +__all__ = ( + "run_in_config", + "handle_reports", + "handle_reports_and_summary", +)