# HG changeset patch
# User fubar
# Date 1606098121 0
# Node ID d4d88d3932854c3ffe7161e25991614a67ecffe2
# Parent 67628c7dc9f3e8f15e482f57870fd4e047ac99aa
Uploaded

diff -r 67628c7dc9f3 -r d4d88d393285 toolfactory/galaxy-tool-test
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/toolfactory/galaxy-tool-test Mon Nov 23 02:22:01 2020 +0000
@@ -0,0 +1,457 @@
+#!/usr/bin/env python
+
+import argparse
+import datetime as dt
+import json
+import logging
+import os
+import sys
+import tempfile
+from collections import namedtuple
+from concurrent.futures import thread, ThreadPoolExecutor
+
+import yaml
+
+from galaxy.tool_util.verify.interactor import (
+    DictClientTestConfig,
+    GalaxyInteractorApi,
+    verify_tool,
+)
+
+DESCRIPTION = """Script to quickly run a tool test against a running Galaxy instance."""
+DEFAULT_SUITE_NAME = "Galaxy Tool Tests"
+ALL_TESTS = -1
+ALL_TOOLS = "*"
+ALL_VERSION = "*"
+LATEST_VERSION = None
+
+
+TestReference = namedtuple("TestReference", ["tool_id", "tool_version", "test_index"])
+TestException = namedtuple("TestException", ["tool_id", "exception", "was_recorded"])
+
+
+class Results:
+
+    def __init__(self, default_suitename, test_json, append=False):
+        self.test_json = test_json or "-"
+        test_results = []
+        test_exceptions = []
+        suitename = default_suitename
+        if append:
+            assert test_json != "-"
+            with open(test_json) as f:
+                previous_results = json.load(f)
+                test_results = previous_results["tests"]
+                if "suitename" in previous_results:
+                    suitename = previous_results["suitename"]
+        self.test_results = test_results
+        self.test_exceptions = test_exceptions
+        self.suitename = suitename
+
+    def register_result(self, result):
+        self.test_results.append(result)
+
+    def register_exception(self, test_exception):
+        self.test_exceptions.append(test_exception)
+
+    def already_successful(self, test_reference):
+        test_id = _test_id_for_reference(test_reference)
+        for test_result in self.test_results:
+            if test_result.get('id') != test_id:
+                continue
+
+            has_data = test_result.get('has_data', False)
+            if has_data:
+                test_data = test_result.get("data", {})
+                if 'status' in test_data and test_data['status'] == 'success':
+                    return True
+
+        return False
+
+    def write(self):
+        tests = sorted(self.test_results, key=lambda el: el['id'])
+        n_passed, n_failures, n_skips = 0, 0, 0
+        n_errors = len([e for e in self.test_exceptions if not e.was_recorded])
+        for test in tests:
+            has_data = test.get('has_data', False)
+            if has_data:
+                test_data = test.get("data", {})
+                if 'status' not in test_data:
+                    raise Exception(f"Test result data {test_data} doesn't contain a status key.")
+                status = test_data['status']
+                if status == "success":
+                    n_passed += 1
+                elif status == "error":
+                    n_errors += 1
+                elif status == "skip":
+                    n_skips += 1
+                elif status == "failure":
+                    n_failures += 1
+        report_obj = {
+            'version': '0.1',
+            'suitename': self.suitename,
+            'results': {
+                'total': n_passed + n_failures + n_skips + n_errors,
+                'errors': n_errors,
+                'failures': n_failures,
+                'skips': n_skips,
+            },
+            'tests': tests,
+        }
+        if self.test_json == "-":
+            print(json.dumps(report_obj))
+        else:
+            with open(self.test_json, "w") as f:
+                json.dump(report_obj, f)
+
+    def info_message(self):
+        messages = []
+        passed_tests = self._tests_with_status('success')
+        messages.append("Passed tool tests ({}): {}".format(
+            len(passed_tests),
+            [t["id"] for t in passed_tests]
+        ))
+        failed_tests = self._tests_with_status('failure')
+        messages.append("Failed tool tests ({}): {}".format(
+            len(failed_tests),
+            [t["id"] for t in failed_tests]
+        ))
+        skipped_tests = self._tests_with_status('skip')
+        messages.append("Skipped tool tests ({}): {}".format(
+            len(skipped_tests),
+            [t["id"] for t in skipped_tests]
+        ))
+        errored_tests = self._tests_with_status('error')
+        messages.append("Errored tool tests ({}): {}".format(
+            len(errored_tests),
+            [t["id"] for t in errored_tests]
+        ))
+        return "\n".join(messages)
+
+    @property
+    def success_count(self):
+        return len(self._tests_with_status('success'))
+
+    @property
+    def skip_count(self):
+        return len(self._tests_with_status('skip'))
+
+    @property
+    def error_count(self):
+        return len(self._tests_with_status('error')) + len(self.test_exceptions)
+
+    @property
+    def failure_count(self):
+        return len(self._tests_with_status('failure'))
+
+    def _tests_with_status(self, status):
+        return [t for t in self.test_results if t.get("data", {}).get("status") == status]
+
+
+def test_tools(
+    galaxy_interactor,
+    test_references,
+    results,
+    log=None,
+    parallel_tests=1,
+    history_per_test_case=False,
+    no_history_cleanup=False,
+    retries=0,
+    verify_kwds=None,
+):
+    """Run through tool tests and write report.
+
+    Refactor this into Galaxy in 21.01.
+    """
+    verify_kwds = (verify_kwds or {}).copy()
+    tool_test_start = dt.datetime.now()
+    history_created = False
+    if history_per_test_case:
+        test_history = None
+    else:
+        history_created = True
+        test_history = galaxy_interactor.new_history(history_name=f"History for {results.suitename}")
+    verify_kwds.update({
+        "no_history_cleanup": no_history_cleanup,
+        "test_history": test_history,
+    })
+    with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
+        try:
+            for test_reference in test_references:
+                _test_tool(
+                    executor=executor,
+                    test_reference=test_reference,
+                    results=results,
+                    galaxy_interactor=galaxy_interactor,
+                    log=log,
+                    retries=retries,
+                    verify_kwds=verify_kwds,
+                )
+        finally:
+            # Always write report, even if test was cancelled.
+            try:
+                executor.shutdown(wait=True)
+            except KeyboardInterrupt:
+                executor._threads.clear()
+                thread._threads_queues.clear()
+            results.write()
+            if log:
+                log.info("Report written to '%s'", os.path.abspath(results.test_json))
+                log.info(results.info_message())
+                log.info("Total tool test time: {}".format(dt.datetime.now() - tool_test_start))
+    if history_created and not no_history_cleanup:
+        galaxy_interactor.delete_history(test_history)
+
+
+def _test_id_for_reference(test_reference):
+    tool_id = test_reference.tool_id
+    tool_version = test_reference.tool_version
+    test_index = test_reference.test_index
+
+    if tool_version and tool_id.endswith("/" + tool_version):
+        tool_id = tool_id[:-len("/" + tool_version)]
+
+    label_base = tool_id
+    if tool_version:
+        label_base += "/" + str(tool_version)
+
+    test_id = label_base + "-" + str(test_index)
+    return test_id
+
+
+def _test_tool(
+    executor,
+    test_reference,
+    results,
+    galaxy_interactor,
+    log,
+    retries,
+    verify_kwds,
+):
+    tool_id = test_reference.tool_id
+    tool_version = test_reference.tool_version
+    test_index = test_reference.test_index
+    # If given a tool_id with a version suffix, strip it off so we can treat tool_version
+    # correctly at least in client_test_config.
+    if tool_version and tool_id.endswith("/" + tool_version):
+        tool_id = tool_id[:-len("/" + tool_version)]
+
+    test_id = _test_id_for_reference(test_reference)
+
+    def run_test():
+        run_retries = retries
+        job_data = None
+        job_exception = None
+
+        def register(job_data_):
+            nonlocal job_data
+            job_data = job_data_
+
+        try:
+            while run_retries >= 0:
+                job_exception = None
+                try:
+                    if log:
+                        log.info("Executing test '%s'", test_id)
+                    verify_tool(
+                        tool_id, galaxy_interactor, test_index=test_index, tool_version=tool_version,
+                        register_job_data=register, **verify_kwds
+                    )
+                    if log:
+                        log.info("Test '%s' passed", test_id)
+                    break
+                except Exception as e:
+                    if log:
+                        log.warning("Test '%s' failed", test_id, exc_info=True)
+
+                    job_exception = e
+                    run_retries -= 1
+        finally:
+            if job_data is not None:
+                results.register_result({
+                    "id": test_id,
+                    "has_data": True,
+                    "data": job_data,
+                })
+            if job_exception is not None:
+                was_recorded = job_data is not None
+                test_exception = TestException(tool_id, job_exception, was_recorded)
+                results.register_exception(test_exception)
+
+    executor.submit(run_test)
+
+
+def build_case_references(
+    galaxy_interactor,
+    tool_id=ALL_TOOLS,
+    tool_version=LATEST_VERSION,
+    test_index=ALL_TESTS,
+    page_size=0,
+    page_number=0,
+    check_against=None,
+    log=None,
+):
+    test_references = []
+    if tool_id == ALL_TOOLS:
+        tests_summary = galaxy_interactor.get_tests_summary()
+        for tool_id, tool_versions_dict in tests_summary.items():
+            for tool_version, summary in tool_versions_dict.items():
+                for test_index in range(summary["count"]):
+                    test_reference = TestReference(tool_id, tool_version, test_index)
+                    test_references.append(test_reference)
+    else:
+        assert tool_id
+        tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version) or {}
+        for i, tool_test_dict in enumerate(tool_test_dicts):
+            this_tool_version = tool_test_dict.get("tool_version", tool_version)
+            this_test_index = i
+            if test_index == ALL_TESTS or i == test_index:
+                test_reference = TestReference(tool_id, this_tool_version, this_test_index)
+                test_references.append(test_reference)
+
+    if check_against:
+        filtered_test_references = []
+        for test_reference in test_references:
+            if check_against.already_successful(test_reference):
+                if log is not None:
+                    log.debug(f"Found successful test for {test_reference}, skipping")
+                continue
+            filtered_test_references.append(test_reference)
+        log.info(f"Skipping {len(test_references)-len(filtered_test_references)} out of {len(test_references)} tests.")
+        test_references = filtered_test_references
+
+    if page_size > 0:
+        slice_start = page_size * page_number
+        slice_end = page_size * (page_number + 1)
+        test_references = test_references[slice_start:slice_end]
+
+    return test_references
+
+
+def main(argv=None):
+    if argv is None:
+        argv = sys.argv[1:]
+
+    args = _arg_parser().parse_args(argv)
+    log = setup_global_logger(__name__, verbose=args.verbose)
+    client_test_config_path = args.client_test_config
+    if client_test_config_path is not None:
+        log.debug(f"Reading client config path {client_test_config_path}")
+        with open(client_test_config_path) as f:
+            client_test_config = yaml.full_load(f)
+    else:
+        client_test_config = {}
+
+    def get_option(key):
+        arg_val = getattr(args, key, None)
+        if arg_val is None and key in client_test_config:
+            val = client_test_config.get(key)
+        else:
+            val = arg_val
+        return val
+
+    output_json_path = get_option("output_json")
+    galaxy_interactor_kwds = {
+        "galaxy_url": get_option("galaxy_url"),
+        "master_api_key": get_option("admin_key"),
+        "api_key": get_option("key"),
+        "keep_outputs_dir": args.output,
+        "download_attempts": get_option("download_attempts"),
+        "download_sleep": get_option("download_sleep"),
+    }
+    tool_id = args.tool_id
+    tool_version = args.tool_version
+    tools_client_test_config = DictClientTestConfig(client_test_config.get("tools"))
+    verbose = args.verbose
+
+    galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
+    results = Results(args.suite_name, output_json_path, append=args.append)
+    check_against = None if not args.skip_successful else results
+    test_references = build_case_references(
+        galaxy_interactor,
+        tool_id=tool_id,
+        tool_version=tool_version,
+        test_index=args.test_index,
+        page_size=args.page_size,
+        page_number=args.page_number,
+        check_against=check_against,
+        log=log,
+    )
+    log.debug(f"Built {len(test_references)} test references to be executed.")
+    verify_kwds = dict(
+        client_test_config=tools_client_test_config,
+        force_path_paste=args.force_path_paste,
+        skip_with_reference_data=not args.with_reference_data,
+        quiet=not verbose,
+    )
+    test_tools(
+        galaxy_interactor,
+        test_references,
+        results,
+        log=log,
+        parallel_tests=args.parallel_tests,
+        history_per_test_case=args.history_per_test_case,
+        no_history_cleanup=args.no_history_cleanup,
+        verify_kwds=verify_kwds,
+    )
+    exceptions = results.test_exceptions
+    if exceptions:
+        exception = exceptions[0]
+        if hasattr(exception, "exception"):
+            exception = exception.exception
+        raise exception
+
+
+def setup_global_logger(name, log_file=None, verbose=False):
+    formatter = logging.Formatter('%(asctime)s %(levelname)-5s - %(message)s')
+    console = logging.StreamHandler()
+    console.setFormatter(formatter)
+
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
+    logger.addHandler(console)
+
+    if not log_file:
+        # delete = false is chosen here because it is always nice to have a log file
+        # ready if you need to debug. Not having the "if only I had set a log file"
+        # moment after the fact.
+        temp = tempfile.NamedTemporaryFile(prefix="ephemeris_", delete=False)
+        log_file = temp.name
+    file_handler = logging.FileHandler(log_file)
+    logger.addHandler(file_handler)
+    logger.info(f"Storing log file in: {log_file}")
+    return logger
+
+
+def _arg_parser():
+    parser = argparse.ArgumentParser(description=DESCRIPTION)
+    parser.add_argument('-u', '--galaxy-url', default="http://localhost:8080", help='Galaxy URL')
+    parser.add_argument('-k', '--key', default=None, help='Galaxy User API Key')
+    parser.add_argument('-a', '--admin-key', default=None, help='Galaxy Admin API Key')
+    parser.add_argument('--force_path_paste', default=False, action="store_true", help='This requires the Galaxy-side config option "allow_path_paste" to be enabled. Allows for fetching test data locally. Only for admins.')
+    parser.add_argument('-t', '--tool-id', default=ALL_TOOLS, help='Tool ID')
+    parser.add_argument('--tool-version', default=None, help='Tool Version (if tool id supplied). Defaults to the latest version; use * to test all versions.')
+    parser.add_argument('-i', '--test-index', default=ALL_TESTS, type=int, help='Tool Test Index (starting at 0) - by default all tests will run.')
+    parser.add_argument('-o', '--output', default=None, help='Directory to dump outputs to.')
+    parser.add_argument('--append', default=False, action="store_true", help="Extend a test record json (created with --output-json) with additional tests.")
+    parser.add_argument('--skip-successful', default=False, action="store_true", help="When used with --append, skip previously run successful tests.")
+    parser.add_argument('-j', '--output-json', default=None, help='Output metadata JSON.')
+    parser.add_argument('--verbose', default=False, action="store_true", help="Verbose logging.")
+    parser.add_argument('-c', '--client-test-config', default=None, help="Test config YAML to help with client testing.")
+    parser.add_argument('--suite-name', default=DEFAULT_SUITE_NAME, help="Suite name for tool test output.")
+    parser.add_argument('--with-reference-data', dest="with_reference_data", default=False, action="store_true")
+    parser.add_argument('--skip-with-reference-data', dest="with_reference_data", action="store_false", help="Skip tests the Galaxy server believes use data tables or loc files.")
+    parser.add_argument('--history-per-suite', dest="history_per_test_case", default=False, action="store_false", help="Create one history per test suite (all tests in the same history).")
+    parser.add_argument('--history-per-test-case', dest="history_per_test_case", action="store_true", help="Create a new history per test case.")
+    parser.add_argument('--no-history-cleanup', default=False, action="store_true", help="Preserve histories created for testing.")
+    parser.add_argument('--parallel-tests', default=1, type=int, help="Number of tests to run in parallel.")
+    parser.add_argument('--retries', default=0, type=int, help="Number of times to retry failed tests.")
+    parser.add_argument('--page-size', default=0, type=int, help="If positive, use pagination and run just one 'page' of tool tests.")
+    parser.add_argument('--page-number', default=0, type=int, help="If page size is used, run this 'page' of tests - starts with 0.")
+    parser.add_argument('--download-attempts', default=1, type=int, help="Galaxy may return a transient 500 status code for downloads if test results are written but not yet accessible.")
+    parser.add_argument('--download-sleep', default=1, type=int, help="If download attempts is greater than 1, the amount of time to sleep between download attempts.")
+    return parser
+
+
+if __name__ == "__main__":
+    main()
diff -r 67628c7dc9f3 -r d4d88d393285 toolfactory/galaxyxml/__init__.py
--- a/toolfactory/galaxyxml/__init__.py Sun Nov 22 06:29:33 2020 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,64 +0,0 @@
-from builtins import object
-from builtins import str
-
-from lxml import etree
-
-
-class GalaxyXML(object):
-    def __init__(self):
-        self.root = etree.Element("root")
-
-    def export(self):
-        return etree.tostring(self.root, pretty_print=True, encoding="unicode")
-
-
-class Util(object):
-    @classmethod
-    def coerce(cls, data, kill_lists=False):
-        """Recursive data sanitisation
-        """
-        if isinstance(data, dict):
-            return {k: cls.coerce(v, kill_lists=kill_lists) for k, v in list(data.items()) if v is not None}
-        elif isinstance(data, list):
-            if kill_lists:
-                return cls.coerce(data[0])
-            else:
-                return [cls.coerce(v, kill_lists=kill_lists) for v in data]
-        else:
-            return cls.coerce_value(data)
-
-    @classmethod
-    def coerce_value(cls, obj):
-        """Make everything a string!
-        """
-        if isinstance(obj, bool):
-            if obj:
-                return "true"
-            else:
-                return "false"
-        elif isinstance(obj, str):
-            return obj
-        else:
-            return str(obj)
-
-    @classmethod
-    def clean_kwargs(cls, params, final=False):
-        if "kwargs" in params:
-            kwargs = params["kwargs"]
-            for k in kwargs:
-                params[k] = kwargs[k]
-            del params["kwargs"]
-        if "self" in params:
-            del params["self"]
-
-        if "__class__" in params:
-            del params["__class__"]
-
-        # There will be more params, it would be NICE to use a whitelist
-        # instead of a blacklist, but until we have more data let's just
-        # blacklist stuff we see commonly.
-        if final:
-            for blacklist in ("positional",):
-                if blacklist in params:
-                    del params[blacklist]
-        return params
diff -r 67628c7dc9f3 -r d4d88d393285 toolfactory/galaxyxml/tool/__init__.py
--- a/toolfactory/galaxyxml/tool/__init__.py Sun Nov 22 06:29:33 2020 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,184 +0,0 @@
-import copy
-import logging
-
-from galaxyxml import GalaxyXML, Util
-from galaxyxml.tool.parameters import XMLParam
-
-from lxml import etree
-
-VALID_TOOL_TYPES = ("data_source", "data_source_async")
-VALID_URL_METHODS = ("get", "post")
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-class Tool(GalaxyXML):
-
-    def __init__(
-        self,
-        name,
-        id,
-        version,
-        description,
-        executable,
-        hidden=False,
-        tool_type=None,
-        URL_method=None,
-        workflow_compatible=True,
-        interpreter=None,
-        version_command="interpreter filename.exe --version",
-        command_override=None,
-    ):
-
-        self.executable = executable
-        self.interpreter = interpreter
-        self.command_override = command_override
-        kwargs = {
-            "name": name,
-            "id": id,
-            "version": version,
-            "hidden": hidden,
-            "workflow_compatible": workflow_compatible,
-        }
-        self.version_command = version_command
-
-        # Remove some of the default values to make tools look a bit nicer
-        if not hidden:
-            del kwargs["hidden"]
-        if workflow_compatible:
-            del kwargs["workflow_compatible"]
-
-        kwargs = Util.coerce(kwargs)
-        self.root = etree.Element("tool", **kwargs)
-
-        if tool_type is not None:
-            if tool_type not in VALID_TOOL_TYPES:
-                raise Exception("Tool type must be one of %s" % ",".join(VALID_TOOL_TYPES))
-            else:
-                kwargs["tool_type"] = tool_type
-
-                if URL_method is not None:
-                    if URL_method in VALID_URL_METHODS:
-                        kwargs["URL_method"] = URL_method
-                    else:
-                        raise Exception("URL_method must be one of %s" % ",".join(VALID_URL_METHODS))
-
-        description_node = etree.SubElement(self.root, "description")
-        description_node.text = description
-
-    def add_comment(self, comment_txt):
-        comment = etree.Comment(comment_txt)
-        self.root.insert(0, comment)
-
-    def append_version_command(self):
-        version_command = etree.SubElement(self.root, "version_command")
-        try:
-            version_command.text = etree.CDATA(self.version_command)
-        except Exception:
-            pass
-
-    def append(self, sub_node):
-        if issubclass(type(sub_node), XMLParam):
-            self.root.append(sub_node.node)
-        else:
-            self.root.append(sub_node)
-
-    def clean_command_string(self, command_line):
-        clean = []
-        for x in command_line:
-            if x is not [] and x is not [""]:
-                clean.append(x)
-
-        return "\n".join(clean)
-
-    def export(self, keep_old_command=False):  # noqa
-
-        export_xml = copy.deepcopy(self)
-
-        try:
-            export_xml.append(export_xml.edam_operations)
-        except Exception:
-            pass
-
-        try:
-            export_xml.append(export_xml.edam_topics)
-        except Exception:
-            pass
-
-        try:
-            export_xml.append(export_xml.requirements)
-        except Exception:
-            pass
-
-        try:
-            export_xml.append(export_xml.configfiles)
-        except Exception:
-            pass
-
-        if self.command_override:
-            command_line = self.command_override
-        else:
-            command_line = []
-            try:
-                command_line.append(export_xml.inputs.cli())
-            except Exception as e:
-                logger.warning(str(e))
-
-            try:
-                command_line.append(export_xml.outputs.cli())
-            except Exception:
-                pass
-
-        # Add stdio section
-        stdio = etree.SubElement(export_xml.root, "stdio")
-        etree.SubElement(stdio, "exit_code", range="1:", level="fatal")
-
-        # Append version command
-        export_xml.append_version_command()
-
-        # Steal interpreter from kwargs
-        command_kwargs = {}
-        if export_xml.interpreter is not None:
-            command_kwargs["interpreter"] = export_xml.interpreter
-
-        # Add command section
-        command_node = etree.SubElement(export_xml.root, "command", **command_kwargs)
-
-        if keep_old_command:
-            if getattr(self, "command", None):
-                command_node.text = etree.CDATA(export_xml.command)
-            else:
-                logger.warning("The tool does not have any old command stored. " + "Only the command line is written.")
-                command_node.text = export_xml.executable
-        else:
-            if self.command_override:
-                actual_cli = export_xml.clean_command_string(command_line)
-            else:
-                actual_cli = "%s %s" % (export_xml.executable, export_xml.clean_command_string(command_line))
-            command_node.text = etree.CDATA(actual_cli.strip())
-
-        try:
-            export_xml.append(export_xml.inputs)
-        except Exception:
-            pass
-
-        try:
-            export_xml.append(export_xml.outputs)
-        except Exception:
-            pass
-
-        try:
-            export_xml.append(export_xml.tests)
-        except Exception:
-            pass
-
-        help_element = etree.SubElement(export_xml.root, "help")
-        help_element.text = etree.CDATA(export_xml.help)
-
-        try:
-            export_xml.append(export_xml.citations)
-        except Exception:
-            pass
-
-        return super(Tool, export_xml).export()
diff -r 67628c7dc9f3 -r d4d88d393285 toolfactory/galaxyxml/tool/__pycache__/__init__.cpython-36.pyc
Binary file toolfactory/galaxyxml/tool/__pycache__/__init__.cpython-36.pyc has changed
diff -r 67628c7dc9f3 -r d4d88d393285 toolfactory/galaxyxml/tool/__pycache__/import_xml.cpython-36.pyc
Binary file toolfactory/galaxyxml/tool/__pycache__/import_xml.cpython-36.pyc has changed
diff -r 67628c7dc9f3 -r d4d88d393285 toolfactory/galaxyxml/tool/import_xml.py
--- a/toolfactory/galaxyxml/tool/import_xml.py Sun Nov 22 06:29:33 2020 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,713 +0,0 @@
-import logging
-import xml.etree.ElementTree as ET
-
-import galaxyxml.tool as gxt
-import galaxyxml.tool.parameters as gxtp
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-class GalaxyXmlParser(object):
-    """
-    Class to import content from an existing Galaxy XML wrapper.
-    """
-
-    def _init_tool(self, xml_root):
-        """
-        Init tool from existing xml tool.
-
-        :param xml_root: root of the galaxy xml file.
-        :type xml_root: :class:`xml.etree._Element`
-        """
-        version_cmd = None
-        description = None
-        for child in xml_root:
-            if child.tag == "description":
-                description = child.text
-            elif child.tag == "command":
-                executable = child.text.split()[0]
-                command = child.text
-            elif child.tag == "version_command":
-                version_cmd = child.text
-
-        tool = gxt.Tool(
-            xml_root.attrib["name"],
-            xml_root.attrib["id"],
-            xml_root.attrib.get("version", None),
-            description,
-            executable,
-            hidden=xml_root.attrib.get("hidden", False),
-            tool_type=xml_root.attrib.get("tool_type", None),
-            URL_method=xml_root.attrib.get("URL_method", None),
-            workflow_compatible=xml_root.attrib.get("workflow_compatible", True),
-            version_command=version_cmd,
-        )
-        tool.command = command
-        return tool
-
-    def _load_description(self, tool, desc_root):
-        """
-        <description> is already loaded during initiation.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param desc_root: root of <description> tag.
-        :type desc_root: :class:`xml.etree._Element`
-        """
-        logger.info("<description> is loaded during initiation of the object.")
-
-    def _load_version_command(self, tool, vers_root):
-        """
-        <version_command> is already loaded during initiation.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param vers_root: root of <version_command> tag.
-        :type vers_root: :class:`xml.etree._Element`
-        """
-        logger.info("<version_command> is loaded during initiation of the object.")
-
-    def _load_stdio(self, tool, stdio_root):
-        """
-        So far, <stdio> is automatically generated by galaxyxml.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param desc_root: root of <stdio> tag.
-        :type desc_root: :class:`xml.etree._Element`
-        """
-        logger.info("<stdio> is not loaded but automatically generated by galaxyxml.")
-
-    def _load_command(self, tool, desc_root):
-        """
-        <command> is already loaded during initiation.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param desc_root: root of <command> tag.
-        :type desc_root: :class:`xml.etree._Element`
-        """
-        logger.info("<command> is loaded during initiation of the object.")
-
-    def _load_help(self, tool, help_root):
-        """
-        Load the content of the <help> into the tool.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param requirements_root: root of <help> tag.
-        :type requirements_root: :class:`xml.etree._Element`
-        """
-        tool.help = help_root.text
-
-    def _load_requirements(self, tool, requirements_root):
-        """
-        Add <requirements> to the tool.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param requirements_root: root of <requirements> tag.
-        :type requirements_root: :class:`xml.etree._Element`
-        """
-        tool.requirements = gxtp.Requirements()
-        for req in requirements_root:
-            req_type = req.attrib["type"]
-            value = req.text
-            if req.tag == "requirement":
-                version = req.attrib.get("version", None)
-                tool.requirements.append(gxtp.Requirement(req_type, value, version=version))
-            elif req.tag == "container":
-                tool.requirements.append(gxtp.Container(req_type, value))
-            else:
-                logger.warning(req.tag + " is not a valid tag for requirements child")
-
-    def _load_edam_topics(self, tool, topics_root):
-        """
-        Add <edam_topics> to the tool.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param topics_root: root of <edam_topics> tag.
-        :type topics_root: :class:`xml.etree._Element`
-        """
-        tool.edam_topics = gxtp.EdamTopics()
-        for edam_topic in topics_root:
-            tool.edam_topics.append(gxtp.EdamTopic(edam_topic.text))
-
-    def _load_edam_operations(self, tool, operations_root):
-        """
-        Add <edam_operations> to the tool.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param operations_root: root of <edam_operations> tag.
-        :type operations_root: :class:`xml.etree._Element`
-        """
-        tool.edam_operations = gxtp.EdamOperations()
-        for edam_op in operations_root:
-            tool.edam_operations.append(gxtp.EdamOperation(edam_op.text))
-
-    def _load_configfiles(self, tool, configfiles_root):
-        """
-        Add <configfiles> to the tool.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param configfiles_root: root of <configfiles> tag.
-        :type configfiles_root: :class:`xml.etree._Element`
-        """
-        tool.configfiles = gxtp.Configfiles()
-        for conf in configfiles_root:
-            name = conf.attrib["name"]
-            value = conf.text
-            tool.configfiles.append(gxtp.Configfile(name, value))
-
-    def _load_citations(self, tool, citations_root):
-        """
-        Add <citations> to the tool.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param citations_root: root of <citations> tag.
-        :type citations_root: :class:`xml.etree._Element`
-        """
-        tool.citations = gxtp.Citations()
-        for cit in citations_root:
-            cit_type = cit.attrib["type"]
-            value = cit.text
-            tool.citations.append(gxtp.Citation(cit_type, value))
-
-    def _load_inputs(self, tool, inputs_root):
-        """
-        Add <inputs> to the tool using the :class:`galaxyxml.tool.import_xml.InputsParser` object.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param inputs_root: root of <inputs> tag.
-        :type inputs_root: :class:`xml.etree._Element`
-        """
-        tool.inputs = gxtp.Inputs()
-        inp_parser = InputsParser()
-        inp_parser.load_inputs(tool.inputs, inputs_root)
-
-    def _load_outputs(self, tool, outputs_root):
-        """
-        Add <outputs> to the tool using the :class:`galaxyxml.tool.import_xml.OutputsParser` object.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param outputs_root: root of <outputs> tag.
-        :type outputs_root: :class:`xml.etree._Element`
-        """
-        tool.outputs = gxtp.Outputs()
-        out_parser = OutputsParser()
-        out_parser.load_outputs(tool.outputs, outputs_root)
-
-    def _load_tests(self, tool, tests_root):
-        """
-        Add <tests> to the tool using the :class:`galaxyxml.tool.import_xml.TestsParser` object.
-
-        :param tool: Tool object from galaxyxml.
-        :type tool: :class:`galaxyxml.tool.Tool`
-        :param tests_root: root of <tests> tag.
-        :type tests_root: :class:`xml.etree._Element`
-        """
-        tool.tests = gxtp.Tests()
-        tests_parser = TestsParser()
-        tests_parser.load_tests(tool.tests, tests_root)
-
-    def import_xml(self, xml_path):
-        """
-        Load existing xml into the :class:`galaxyxml.tool.Tool` object.
-
-        :param xml_path: Path of the XML to be loaded.
-        :type xml_path: STRING
-        :return: XML content in the galaxyxml model.
-        :rtype: :class:`galaxyxml.tool.Tool`
-        """
-        xml_root = ET.parse(xml_path).getroot()
-        tool = self._init_tool(xml_root)
-        # Now we import each tag's field
-        for child in xml_root:
-            try:
-                getattr(self, "_load_{}".format(child.tag))(tool, child)
-            except AttributeError:
-                logger.warning(child.tag + " tag is not processed.")
-        return tool
-
-
-class InputsParser(object):
-    """
-    Class to parse content of the <inputs> tag from a Galaxy XML wrapper.
-    """
-
-    def _load_text_param(self, root, text_param):
-        """
-        Add <param type="text"> to the root.
-
-        :param root: root to append the param to.
-        :param text_param: root of <param type="text"> tag.
-        :type text_param: :class:`xml.etree._Element`
-        """
-        root.append(
-            gxtp.TextParam(
-                text_param.attrib["name"],
-                optional=text_param.get("optional", None),
-                label=text_param.get("label", None),
-                help=text_param.get("help", None),
-                value=text_param.get("value", None),
-            )
-        )
-
-    def _load_data_param(self, root, data_param):
-        """
-        Add <param type="data"> to the root.
-
-        :param root: root to append the param to.
-        :param data_param: root of <param type="data"> tag.
-        :type data_param: :class:`xml.etree._Element`
-        """
-        root.append(
-            gxtp.DataParam(
-                data_param.attrib["name"],
-                optional=data_param.attrib.get("optional", None),
-                label=data_param.attrib.get("label", None),
-                help=data_param.attrib.get("help", None),
-                format=data_param.attrib.get("format", None),
-                multiple=data_param.attrib.get("multiple", None),
-            )
-        )
-
-    def _load_boolean_param(self, root, bool_param):
-        """
-        Add <param type="boolean"> to the root.
-
-        :param root: root to append the param to.
-        :param bool_param: root of <param type="boolean"> tag.
-        :type bool_param: :class:`xml.etree._Element`
-        """
-        root.append(
-            gxtp.BooleanParam(
-                bool_param.attrib["name"],
-                optional=bool_param.attrib.get("optional", None),
-                label=bool_param.attrib.get("label", None),
-                help=bool_param.attrib.get("help", None),
-                checked=bool_param.attrib.get("checked", False),
-                truevalue=bool_param.attrib.get("truevalue", None),
-                falsevalue=bool_param.attrib.get("falsevalue", None),
-            )
-        )
-
-    def _load_integer_param(self, root, int_param):
-        """
-        Add <param type="integer"> to the root.
-
-        :param root: root to append the param to.
-        :param int_param: root of <param type="integer"> tag.
-        :type int_param: :class:`xml.etree._Element`
-        """
-        root.append(
-            gxtp.IntegerParam(
-                int_param.attrib["name"],
-                int_param.attrib.get("value", None),
-                optional=int_param.attrib.get("optional", None),
-                label=int_param.attrib.get("label", None),
-                help=int_param.attrib.get("help", None),
-                min=int_param.attrib.get("min", None),
-                max=int_param.attrib.get("max", None),
-            )
-        )
-
-    def _load_float_param(self, root, float_param):
-        """
-        Add <param type="float"> to the root.
-
-        :param root: root to append the param to.
-        :param float_param: root of <param type="float"> tag.
-        :type float_param: :class:`xml.etree._Element`
-        """
-        root.append(
-            gxtp.FloatParam(
-                float_param.attrib["name"],
-                float_param.attrib.get("value", None),
-                optional=float_param.attrib.get("optional", None),
-                label=float_param.attrib.get("label", None),
-                help=float_param.attrib.get("help", None),
-                min=float_param.attrib.get("min", None),
-                max=float_param.attrib.get("max", None),
-            )
-        )
-
-    def _load_option_select(self, root, option):
-        """
-        Add