Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/galaxy/tool_util/parser/xml.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/galaxy/tool_util/parser/xml.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,1173 @@ +import json +import logging +import re +import uuid +from math import isinf + +import packaging.version + +from galaxy.tool_util.deps import requirements +from galaxy.tool_util.parser.util import ( + DEFAULT_DELTA, + DEFAULT_DELTA_FRAC +) +from galaxy.util import ( + string_as_bool, + xml_text, + xml_to_string +) +from .interface import ( + InputSource, + PageSource, + PagesSource, + TestCollectionDef, + TestCollectionOutputDef, + ToolSource, +) +from .output_actions import ToolOutputActionGroup +from .output_collection_def import dataset_collector_descriptions_from_elem +from .output_objects import ( + ToolExpressionOutput, + ToolOutput, + ToolOutputCollection, + ToolOutputCollectionStructure +) +from .stdio import ( + aggressive_error_checks, + error_on_exit_code, + StdioErrorLevel, + ToolStdioExitCode, + ToolStdioRegex, +) + + +log = logging.getLogger(__name__) + + +class XmlToolSource(ToolSource): + """ Responsible for parsing a tool from classic Galaxy representation. 
+ """ + + def __init__(self, xml_tree, source_path=None, macro_paths=None): + self.xml_tree = xml_tree + self.root = xml_tree.getroot() + self._source_path = source_path + self._macro_paths = macro_paths or [] + self.legacy_defaults = self.parse_profile() == "16.01" + + def to_string(self): + return xml_to_string(self.root) + + def parse_version(self): + return self.root.get("version", None) + + def parse_id(self): + return self.root.get("id") + + def parse_tool_module(self): + root = self.root + if root.find("type") is not None: + type_elem = root.find("type") + module = type_elem.get('module', 'galaxy.tools') + cls = type_elem.get('class') + return module, cls + + return None + + def parse_action_module(self): + root = self.root + action_elem = root.find("action") + if action_elem is not None: + module = action_elem.get('module') + cls = action_elem.get('class') + return module, cls + else: + return None + + def parse_tool_type(self): + root = self.root + if root.get('tool_type', None) is not None: + return root.get('tool_type') + + def parse_name(self): + return self.root.get("name") + + def parse_edam_operations(self): + edam_ops = self.root.find("edam_operations") + if edam_ops is None: + return [] + return [edam_op.text for edam_op in edam_ops.findall("edam_operation")] + + def parse_edam_topics(self): + edam_topics = self.root.find("edam_topics") + if edam_topics is None: + return [] + return [edam_topic.text for edam_topic in edam_topics.findall("edam_topic")] + + def parse_xrefs(self): + xrefs = self.root.find("xrefs") + if xrefs is None: + return [] + return [dict(value=xref.text.strip(), reftype=xref.attrib['type']) for xref in xrefs.findall("xref") if xref.get("type")] + + def parse_description(self): + return xml_text(self.root, "description") + + def parse_is_multi_byte(self): + return self._get_attribute_as_bool("is_multi_byte", self.default_is_multi_byte) + + def parse_display_interface(self, default): + return 
self._get_attribute_as_bool("display_interface", default) + + def parse_require_login(self, default): + return self._get_attribute_as_bool("require_login", default) + + def parse_request_param_translation_elem(self): + return self.root.find("request_param_translation") + + def parse_command(self): + command_el = self._command_el + return ((command_el is not None) and command_el.text) or None + + def parse_expression(self): + """ Return string containing command to run. + """ + expression_el = self.root.find("expression") + if expression_el is not None: + expression_type = expression_el.get("type") + if expression_type != "ecma5.1": + raise Exception("Unknown expression type [%s] encountered" % expression_type) + return expression_el.text + return None + + def parse_environment_variables(self): + environment_variables_el = self.root.find("environment_variables") + if environment_variables_el is None: + return [] + + environment_variables = [] + for environment_variable_el in environment_variables_el.findall("environment_variable"): + template = environment_variable_el.text + inject = environment_variable_el.get("inject") + if inject: + assert not template, "Cannot specify inject and environment variable template." + assert inject in ["api_key"] + if template: + assert not inject, "Cannot specify inject and environment variable template." 
+ definition = { + "name": environment_variable_el.get("name"), + "template": template, + "inject": inject, + "strip": string_as_bool(environment_variable_el.get("strip", False)), + } + environment_variables.append( + definition + ) + return environment_variables + + def parse_home_target(self): + target = "job_home" if self.parse_profile() >= "18.01" else "shared_home" + command_el = self._command_el + command_legacy = (command_el is not None) and command_el.get("use_shared_home", None) + if command_legacy is not None: + target = "shared_home" if string_as_bool(command_legacy) else "job_home" + return target + + def parse_tmp_target(self): + # Default to not touching TMPDIR et. al. but if job_tmp is set + # in job_conf then do. This is a very conservative approach that shouldn't + # break or modify any configurations by default. + return "job_tmp_if_explicit" + + def parse_interpreter(self): + interpreter = None + command_el = self._command_el + if command_el is not None: + interpreter = command_el.get("interpreter", None) + if interpreter and not self.legacy_defaults: + log.warning("Deprecated interpreter attribute on command element is now ignored.") + interpreter = None + return interpreter + + def parse_version_command(self): + version_cmd = self.root.find("version_command") + if version_cmd is not None: + return version_cmd.text + else: + return None + + def parse_version_command_interpreter(self): + if self.parse_version_command() is not None: + version_cmd = self.root.find("version_command") + version_cmd_interpreter = version_cmd.get("interpreter", None) + if version_cmd_interpreter: + return version_cmd_interpreter + return None + + def parse_parallelism(self): + parallelism = self.root.find("parallelism") + parallelism_info = None + if parallelism is not None and parallelism.get("method"): + return ParallelismInfo(parallelism) + return parallelism_info + + def parse_interactivetool(self): + interactivetool_el = self.root.find("entry_points") + rtt = [] + 
if interactivetool_el is None: + return rtt + for ep_el in interactivetool_el.findall("entry_point"): + port = ep_el.find("port") + assert port is not None, ValueError('A port is required for InteractiveTools') + port = port.text.strip() + url = ep_el.find("url") + if url is not None: + url = url.text.strip() + name = ep_el.get('name', None) + if name: + name = name.strip() + requires_domain = string_as_bool(ep_el.attrib.get("requires_domain", False)) + rtt.append(dict(port=port, url=url, name=name, requires_domain=requires_domain)) + return rtt + + def parse_hidden(self): + hidden = xml_text(self.root, "hidden") + if hidden: + hidden = string_as_bool(hidden) + return hidden + + def parse_redirect_url_params_elem(self): + return self.root.find("redirect_url_params") + + def parse_sanitize(self): + return self._get_option_value("sanitize", True) + + def parse_refresh(self): + return self._get_option_value("refresh", False) + + def _get_option_value(self, key, default): + root = self.root + for option_elem in root.findall("options"): + if key in option_elem.attrib: + return string_as_bool(option_elem.get(key)) + return default + + @property + def _command_el(self): + return self.root.find("command") + + def _get_attribute_as_bool(self, attribute, default, elem=None): + if elem is None: + elem = self.root + return string_as_bool(elem.get(attribute, default)) + + def parse_requirements_and_containers(self): + return requirements.parse_requirements_from_xml(self.root) + + def parse_input_pages(self): + return XmlPagesSource(self.root) + + def parse_provided_metadata_style(self): + style = None + out_elem = self.root.find("outputs") + if out_elem is not None and "provided_metadata_style" in out_elem.attrib: + style = out_elem.attrib["provided_metadata_style"] + + if style is None: + style = "legacy" if self.parse_profile() < "17.09" else "default" + + assert style in ["legacy", "default"] + return style + + def parse_provided_metadata_file(self): + provided_metadata_file 
= "galaxy.json" + out_elem = self.root.find("outputs") + if out_elem is not None and "provided_metadata_file" in out_elem.attrib: + provided_metadata_file = out_elem.attrib["provided_metadata_file"] + + return provided_metadata_file + + def parse_outputs(self, tool): + out_elem = self.root.find("outputs") + outputs = {} + output_collections = {} + if out_elem is None: + return outputs, output_collections + + data_dict = {} + + def _parse(data_elem, **kwds): + output_def = self._parse_output(data_elem, tool, **kwds) + data_dict[output_def.name] = output_def + return output_def + + for _ in out_elem.findall("data"): + _parse(_) + + def _parse_expression(output_elem, **kwds): + output_def = self._parse_expression_output(output_elem, tool, **kwds) + output_def.filters = output_elem.findall('filter') + data_dict[output_def.name] = output_def + return output_def + + def _parse_collection(collection_elem): + name = collection_elem.get("name") + label = xml_text(collection_elem, "label") + default_format = collection_elem.get("format", "data") + collection_type = collection_elem.get("type", None) + collection_type_source = collection_elem.get("type_source", None) + collection_type_from_rules = collection_elem.get("type_from_rules", None) + structured_like = collection_elem.get("structured_like", None) + inherit_format = False + inherit_metadata = False + if structured_like: + inherit_format = string_as_bool(collection_elem.get("inherit_format", None)) + inherit_metadata = string_as_bool(collection_elem.get("inherit_metadata", None)) + default_format_source = collection_elem.get("format_source", None) + default_metadata_source = collection_elem.get("metadata_source", "") + filters = collection_elem.findall('filter') + + dataset_collector_descriptions = None + if collection_elem.find("discover_datasets") is not None: + dataset_collector_descriptions = dataset_collector_descriptions_from_elem(collection_elem, legacy=False) + structure = ToolOutputCollectionStructure( + 
collection_type=collection_type, + collection_type_source=collection_type_source, + collection_type_from_rules=collection_type_from_rules, + structured_like=structured_like, + dataset_collector_descriptions=dataset_collector_descriptions, + ) + output_collection = ToolOutputCollection( + name, + structure, + label=label, + filters=filters, + default_format=default_format, + inherit_format=inherit_format, + inherit_metadata=inherit_metadata, + default_format_source=default_format_source, + default_metadata_source=default_metadata_source, + ) + outputs[output_collection.name] = output_collection + + for data_elem in collection_elem.findall("data"): + _parse( + data_elem, + default_format=default_format, + default_format_source=default_format_source, + default_metadata_source=default_metadata_source, + ) + + for data_elem in collection_elem.findall("data"): + output_name = data_elem.get("name") + data = data_dict[output_name] + assert data + del data_dict[output_name] + output_collection.outputs[output_name] = data + output_collections[name] = output_collection + + for out_child in out_elem: + if out_child.tag == "data": + _parse(out_child) + elif out_child.tag == "collection": + _parse_collection(out_child) + elif out_child.tag == "output": + output_type = out_child.get("type") + if output_type == "data": + _parse(out_child) + elif output_type == "collection": + out_child.attrib["type"] = out_child.get("collection_type") + out_child.attrib["type_source"] = out_child.get("collection_type_source") + _parse_collection(out_child) + else: + _parse_expression(out_child) + else: + log.warning("Unknown output tag encountered [%s]" % out_child.tag) + + for output_def in data_dict.values(): + outputs[output_def.name] = output_def + return outputs, output_collections + + def _parse_output( + self, + data_elem, + tool, + default_format="data", + default_format_source=None, + default_metadata_source="", + expression_type=None, + ): + from_expression = data_elem.get("from") + 
output = ToolOutput(data_elem.get("name"), from_expression=from_expression) + output_format = data_elem.get("format", default_format) + auto_format = string_as_bool(data_elem.get("auto_format", "false")) + if auto_format and output_format != "data": + raise ValueError("Setting format and auto_format is not supported at this time.") + elif auto_format: + output_format = "_sniff_" + output.format = output_format + output.change_format = data_elem.findall("change_format") + output.format_source = data_elem.get("format_source", default_format_source) + output.default_identifier_source = data_elem.get("default_identifier_source", 'None') + output.metadata_source = data_elem.get("metadata_source", default_metadata_source) + output.parent = data_elem.get("parent", None) + output.label = xml_text(data_elem, "label") + output.count = int(data_elem.get("count", 1)) + output.filters = data_elem.findall('filter') + output.tool = tool + output.from_work_dir = data_elem.get("from_work_dir", None) + output.hidden = string_as_bool(data_elem.get("hidden", "")) + output.actions = ToolOutputActionGroup(output, data_elem.find('actions')) + output.dataset_collector_descriptions = dataset_collector_descriptions_from_elem(data_elem, legacy=self.legacy_defaults) + return output + + def _parse_expression_output(self, output_elem, tool, **kwds): + output_type = output_elem.get("type") + from_expression = output_elem.get("from") + output = ToolExpressionOutput( + output_elem.get("name"), + output_type, + from_expression, + ) + output.path = output_elem.get("value") + output.label = xml_text(output_elem, "label") + + output.hidden = string_as_bool(output_elem.get("hidden", "")) + output.actions = ToolOutputActionGroup(output, output_elem.find('actions')) + output.dataset_collector_descriptions = [] + return output + + def parse_stdio(self): + """ + parse error handling from command and stdio tag + + returns list of exit codes, list of regexes + + - exit_codes contain all non-zero exit codes 
(:-1 and 1:) if + detect_errors is default (if not legacy), exit_code, or aggressive + - the oom_exit_code if given and detect_errors is exit_code + - exit codes and regexes from the stdio tag + these are prepended to the list, i.e. are evaluated prior to regexes + and exit codes derived from the properties of the command tag. + thus more specific regexes of the same or more severe error level + are triggered first. + + """ + + command_el = self._command_el + detect_errors = None + if command_el is not None: + detect_errors = command_el.get("detect_errors") + + if detect_errors and detect_errors != "default": + if detect_errors == "exit_code": + oom_exit_code = None + if command_el is not None: + oom_exit_code = command_el.get("oom_exit_code", None) + if oom_exit_code is not None: + int(oom_exit_code) + exit_codes, regexes = error_on_exit_code(out_of_memory_exit_code=oom_exit_code) + elif detect_errors == "aggressive": + exit_codes, regexes = aggressive_error_checks() + else: + raise ValueError("Unknown detect_errors value encountered [%s]" % detect_errors) + elif len(self.root.findall('stdio')) == 0 and not self.legacy_defaults: + exit_codes, regexes = error_on_exit_code() + else: + exit_codes = [] + regexes = [] + + if len(self.root.findall('stdio')) > 0: + parser = StdioParser(self.root) + exit_codes = parser.stdio_exit_codes + exit_codes + regexes = parser.stdio_regexes + regexes + + return exit_codes, regexes + + def parse_strict_shell(self): + command_el = self._command_el + if packaging.version.parse(self.parse_profile()) < packaging.version.parse('20.09'): + default = "False" + else: + default = "True" + if command_el is not None: + return string_as_bool(command_el.get("strict", default)) + else: + return string_as_bool(default) + + def parse_help(self): + help_elem = self.root.find('help') + return help_elem.text if help_elem is not None else None + + @property + def macro_paths(self): + return self._macro_paths + + @property + def source_path(self): + 
return self._source_path + + def parse_tests_to_dict(self): + tests_elem = self.root.find("tests") + tests = [] + rval = dict( + tests=tests + ) + + if tests_elem is not None: + for i, test_elem in enumerate(tests_elem.findall("test")): + profile = self.parse_profile() + tests.append(_test_elem_to_dict(test_elem, i, profile)) + + return rval + + def parse_profile(self): + # Pre-16.04 or default XML defaults + # - Use standard error for error detection. + # - Don't run shells with -e + # - Auto-check for implicit multiple outputs. + # - Auto-check for $param_file. + # - Enable buggy interpreter attribute. + return self.root.get("profile", "16.01") + + def parse_license(self): + return self.root.get("license") + + def parse_python_template_version(self): + python_template_version = self.root.get("python_template_version", None) + if python_template_version is not None: + python_template_version = packaging.version.parse(python_template_version) + return python_template_version + + def parse_creator(self): + creators_el = self.root.find("creator") + if creators_el is None: + return None + + creators = [] + for creator_el in creators_el: + creator_as_dict = {} + if creator_el.tag == "person": + clazz = "Person" + elif creator_el.tag == "organization": + clazz = "Organization" + else: + continue + creator_as_dict["class"] = clazz + creator_as_dict.update(creator_el.attrib) + creators.append(creator_as_dict) + return creators + + +def _test_elem_to_dict(test_elem, i, profile=None): + rval = dict( + outputs=__parse_output_elems(test_elem), + output_collections=__parse_output_collection_elems(test_elem, profile=profile), + inputs=__parse_input_elems(test_elem, i), + expect_num_outputs=test_elem.get("expect_num_outputs"), + command=__parse_assert_list_from_elem(test_elem.find("assert_command")), + command_version=__parse_assert_list_from_elem(test_elem.find("assert_command_version")), + stdout=__parse_assert_list_from_elem(test_elem.find("assert_stdout")), + 
stderr=__parse_assert_list_from_elem(test_elem.find("assert_stderr")), + expect_exit_code=test_elem.get("expect_exit_code"), + expect_failure=string_as_bool(test_elem.get("expect_failure", False)), + maxseconds=test_elem.get("maxseconds", None), + ) + _copy_to_dict_if_present(test_elem, rval, ["num_outputs"]) + return rval + + +def __parse_input_elems(test_elem, i): + __expand_input_elems(test_elem) + return __parse_inputs_elems(test_elem, i) + + +def __parse_output_elems(test_elem): + outputs = [] + for output_elem in test_elem.findall("output"): + name, file, attributes = __parse_output_elem(output_elem) + outputs.append({"name": name, "value": file, "attributes": attributes}) + return outputs + + +def __parse_output_elem(output_elem): + attrib = dict(output_elem.attrib) + name = attrib.pop('name', None) + if name is None: + raise Exception("Test output does not have a 'name'") + + file, attributes = __parse_test_attributes(output_elem, attrib, parse_discovered_datasets=True) + return name, file, attributes + + +def __parse_command_elem(test_elem): + assert_elem = test_elem.find("command") + return __parse_assert_list_from_elem(assert_elem) + + +def __parse_output_collection_elems(test_elem, profile=None): + output_collections = [] + for output_collection_elem in test_elem.findall("output_collection"): + output_collection_def = __parse_output_collection_elem(output_collection_elem, profile=profile) + output_collections.append(output_collection_def) + return output_collections + + +def __parse_output_collection_elem(output_collection_elem, profile=None): + attrib = dict(output_collection_elem.attrib) + name = attrib.pop('name', None) + if name is None: + raise Exception("Test output collection does not have a 'name'") + element_tests = __parse_element_tests(output_collection_elem, profile=profile) + return TestCollectionOutputDef(name, attrib, element_tests).to_dict() + + +def __parse_element_tests(parent_element, profile=None): + element_tests = {} + for idx, 
element in enumerate(parent_element.findall("element")): + element_attrib = dict(element.attrib) + identifier = element_attrib.pop('name', None) + if identifier is None: + raise Exception("Test primary dataset does not have a 'identifier'") + element_tests[identifier] = __parse_test_attributes(element, element_attrib, parse_elements=True, profile=profile) + if profile and profile >= "20.09": + element_tests[identifier][1]["expected_sort_order"] = idx + + return element_tests + + +def __parse_test_attributes(output_elem, attrib, parse_elements=False, parse_discovered_datasets=False, profile=None): + assert_list = __parse_assert_list(output_elem) + + # Allow either file or value to specify a target file to compare result with + # file was traditionally used by outputs and value by extra files. + file = attrib.pop('file', attrib.pop('value', None)) + + # File no longer required if an list of assertions was present. + attributes = {} + + if 'value_json' in attrib: + attributes['object'] = json.loads(attrib.pop('value_json')) + + # Method of comparison + attributes['compare'] = attrib.pop('compare', 'diff').lower() + # Number of lines to allow to vary in logs (for dates, etc) + attributes['lines_diff'] = int(attrib.pop('lines_diff', '0')) + # Allow a file size to vary if sim_size compare + attributes['delta'] = int(attrib.pop('delta', DEFAULT_DELTA)) + attributes['delta_frac'] = float(attrib['delta_frac']) if 'delta_frac' in attrib else DEFAULT_DELTA_FRAC + attributes['sort'] = string_as_bool(attrib.pop('sort', False)) + attributes['decompress'] = string_as_bool(attrib.pop('decompress', False)) + extra_files = [] + if 'ftype' in attrib: + attributes['ftype'] = attrib['ftype'] + for extra in output_elem.findall('extra_files'): + extra_files.append(__parse_extra_files_elem(extra)) + metadata = {} + for metadata_elem in output_elem.findall('metadata'): + metadata[metadata_elem.get('name')] = metadata_elem.get('value') + md5sum = attrib.get("md5", None) + checksum = 
attrib.get("checksum", None) + element_tests = {} + if parse_elements: + element_tests = __parse_element_tests(output_elem, profile=profile) + + primary_datasets = {} + if parse_discovered_datasets: + for primary_elem in (output_elem.findall("discovered_dataset") or []): + primary_attrib = dict(primary_elem.attrib) + designation = primary_attrib.pop('designation', None) + if designation is None: + raise Exception("Test primary dataset does not have a 'designation'") + primary_datasets[designation] = __parse_test_attributes(primary_elem, primary_attrib) + + has_checksum = md5sum or checksum + has_nested_tests = extra_files or element_tests or primary_datasets + has_object = 'object' in attributes + if not (assert_list or file or metadata or has_checksum or has_nested_tests or has_object): + raise Exception("Test output defines nothing to check (e.g. must have a 'file' check against, assertions to check, metadata or checksum tests, etc...)") + attributes['assert_list'] = assert_list + attributes['extra_files'] = extra_files + attributes['metadata'] = metadata + attributes['md5'] = md5sum + attributes['checksum'] = checksum + attributes['elements'] = element_tests + attributes['primary_datasets'] = primary_datasets + return file, attributes + + +def __parse_assert_list(output_elem): + assert_elem = output_elem.find("assert_contents") + return __parse_assert_list_from_elem(assert_elem) + + +def __parse_assert_list_from_elem(assert_elem): + assert_list = None + + def convert_elem(elem): + """ Converts and XML element to a dictionary format, used by assertion checking code. 
""" + tag = elem.tag + attributes = dict(elem.attrib) + converted_children = [] + for child_elem in elem: + converted_children.append(convert_elem(child_elem)) + return {"tag": tag, "attributes": attributes, "children": converted_children} + if assert_elem is not None: + assert_list = [] + for assert_child in list(assert_elem): + assert_list.append(convert_elem(assert_child)) + + return assert_list + + +def __parse_extra_files_elem(extra): + # File or directory, when directory, compare basename + # by basename + attrib = dict(extra.attrib) + extra_type = attrib.pop('type', 'file') + extra_name = attrib.pop('name', None) + assert extra_type == 'directory' or extra_name is not None, \ + 'extra_files type (%s) requires a name attribute' % extra_type + extra_value, extra_attributes = __parse_test_attributes(extra, attrib) + return { + "value": extra_value, + "name": extra_name, + "type": extra_type, + "attributes": extra_attributes + } + + +def __expand_input_elems(root_elem, prefix=""): + __append_prefix_to_params(root_elem, prefix) + + repeat_elems = root_elem.findall('repeat') + indices = {} + for repeat_elem in repeat_elems: + name = repeat_elem.get("name") + if name not in indices: + indices[name] = 0 + index = 0 + else: + index = indices[name] + 1 + indices[name] = index + + new_prefix = __prefix_join(prefix, name, index=index) + __expand_input_elems(repeat_elem, new_prefix) + __pull_up_params(root_elem, repeat_elem) + + cond_elems = root_elem.findall('conditional') + for cond_elem in cond_elems: + new_prefix = __prefix_join(prefix, cond_elem.get("name")) + __expand_input_elems(cond_elem, new_prefix) + __pull_up_params(root_elem, cond_elem) + + section_elems = root_elem.findall('section') + for section_elem in section_elems: + new_prefix = __prefix_join(prefix, section_elem.get("name")) + __expand_input_elems(section_elem, new_prefix) + __pull_up_params(root_elem, section_elem) + + +def __append_prefix_to_params(elem, prefix): + for param_elem in 
elem.findall('param'): + param_elem.set("name", __prefix_join(prefix, param_elem.get("name"))) + + +def __pull_up_params(parent_elem, child_elem): + for param_elem in child_elem.findall('param'): + parent_elem.append(param_elem) + + +def __prefix_join(prefix, name, index=None): + name = name if index is None else "%s_%d" % (name, index) + return name if not prefix else f"{prefix}|{name}" + + +def _copy_to_dict_if_present(elem, rval, attributes): + for attribute in attributes: + if attribute in elem.attrib: + rval[attribute] = elem.get(attribute) + return rval + + +def __parse_inputs_elems(test_elem, i): + raw_inputs = [] + for param_elem in test_elem.findall("param"): + raw_inputs.append(__parse_param_elem(param_elem, i)) + + return raw_inputs + + +def __parse_param_elem(param_elem, i=0): + attrib = dict(param_elem.attrib) + if 'values' in attrib: + value = attrib['values'].split(',') + elif 'value' in attrib: + value = attrib['value'] + elif 'value_json' in attrib: + value = json.loads(attrib['value_json']) + else: + value = None + + children_elem = param_elem + if children_elem is not None: + # At this time, we can assume having children only + # occurs on DataToolParameter test items but this could + # change and would cause the below parsing to change + # based upon differences in children items + attrib['metadata'] = {} + attrib['composite_data'] = [] + attrib['edit_attributes'] = [] + # Composite datasets need to be renamed uniquely + composite_data_name = None + for child in children_elem: + if child.tag == 'composite_data': + file_name = child.get("value") + attrib['composite_data'].append(file_name) + if composite_data_name is None: + # Generate a unique name; each test uses a + # fresh history. 
def __parse_param_elem(param_elem, i=0):
    """Parse a test <param> element into a {name, value, attributes} dict.

    The value comes from 'values' (comma split), 'value', or 'value_json'
    (JSON decoded), in that order of precedence; otherwise None.  Child
    elements (composite_data, metadata, edit_attributes, collection) are
    folded into the attributes dict.
    """
    attrib = dict(param_elem.attrib)
    if 'values' in attrib:
        value = attrib['values'].split(',')
    elif 'value' in attrib:
        value = attrib['value']
    elif 'value_json' in attrib:
        value = json.loads(attrib['value_json'])
    else:
        value = None

    # FIX: removed dead `children_elem = param_elem; if children_elem is not None:`
    # guard — an Element is its own children container and is never None here.
    # At this time, we can assume having children only occurs on
    # DataToolParameter test items, but this could change and would cause the
    # below parsing to change based upon differences in children items.
    attrib['metadata'] = {}
    attrib['composite_data'] = []
    attrib['edit_attributes'] = []
    # Composite datasets need to be renamed uniquely.
    composite_data_name = None
    for child in param_elem:
        if child.tag == 'composite_data':
            attrib['composite_data'].append(child.get("value"))
            if composite_data_name is None:
                # Generate a unique name; each test uses a fresh history.
                composite_data_name = '_COMPOSITE_RENAMED_t%d_%s' \
                    % (i, uuid.uuid1().hex)
        elif child.tag == 'metadata':
            attrib['metadata'][child.get("name")] = child.get("value")
        elif child.tag == 'edit_attributes':
            attrib['edit_attributes'].append(child)
        elif child.tag == 'collection':
            attrib['collection'] = TestCollectionDef.from_xml(child, __parse_param_elem)
    if composite_data_name:
        # Composite datasets need implicit renaming; inserted at front of list
        # so explicit declarations take precedence.
        attrib['edit_attributes'].insert(0, {'type': 'name', 'value': composite_data_name})
    name = attrib.pop('name')
    return {
        "name": name,
        "value": value,
        "attributes": attrib
    }


class StdioParser:
    """Parses a tool's <stdio> element(s) into exit-code and regex error rules."""

    def __init__(self, root):
        try:
            self.stdio_exit_codes = list()
            self.stdio_regexes = list()

            # We should have a single <stdio> element, but handle the case for
            # multiples.  For every stdio element, add all of the exit_code and
            # regex subelements that we find:
            for stdio_elem in root.findall('stdio'):
                self.parse_stdio_exit_codes(stdio_elem)
                self.parse_stdio_regexes(stdio_elem)
        except Exception:
            log.exception("Exception in parse_stdio!")

    def parse_stdio_exit_codes(self, stdio_elem):
        """
        Parse the tool's <stdio> element's <exit_code> subelements.
        This will add all of those elements, if any, to self.stdio_exit_codes.
        """
        try:
            # Each exit_code element must have a range or value; range has
            # precedence.  If neither is present, warn and skip it.
            for exit_code_elem in stdio_elem.findall("exit_code"):
                exit_code = ToolStdioExitCode()
                # Each exit code has an optional description that can be
                # part of the "desc" or "description" attributes:
                exit_code.desc = exit_code_elem.get("desc")
                if exit_code.desc is None:
                    exit_code.desc = exit_code_elem.get("description")
                # Parse the error level:
                exit_code.error_level = (
                    self.parse_error_level(exit_code_elem.get("level")))
                # FIX: default must be None (not "") or the fallbacks below are
                # dead code and a bare value= attribute is never honored.
                code_range = exit_code_elem.get("range")
                if code_range is None:
                    code_range = exit_code_elem.get("value")
                if code_range is None:
                    log.warning("Tool stdio exit codes must have a range or value")
                    continue
                # Parse the range. We look for:
                #   :Y
                #   X:
                #   X:Y  - split on the colon; a colon without a beginning or
                #          end is allowed.  Whitespace is eliminated first.
                code_range = re.sub(r"\s", "", code_range)
                code_ranges = code_range.split(":")
                if len(code_ranges) == 2:
                    # Missing side of the colon means an open-ended range.
                    if code_ranges[0] == '':
                        exit_code.range_start = float("-inf")
                    else:
                        exit_code.range_start = int(code_ranges[0])
                    if code_ranges[1] == '':
                        exit_code.range_end = float("inf")
                    else:
                        exit_code.range_end = int(code_ranges[1])
                elif len(code_ranges) > 2:
                    # More than one colon: ignore this exit code.
                    log.warning("Invalid tool exit_code range %s - ignored"
                                % code_range)
                    continue
                else:
                    # Singular value; skip with a warning if not an integer.
                    try:
                        exit_code.range_start = int(code_range)
                    except Exception:
                        log.error(code_range)
                        log.warning("Invalid range start for tool's exit_code %s: exit_code ignored" % code_range)
                        continue
                    exit_code.range_end = exit_code.range_start
                # Sanity check: a doubly-infinite range matches everything, so
                # at least warn about that situation:
                if isinf(exit_code.range_start) and isinf(exit_code.range_end):
                    log.warning("Tool exit_code range %s will match on all exit codes" % code_range)
                self.stdio_exit_codes.append(exit_code)
        except Exception:
            log.exception("Exception in parse_stdio_exit_codes!")

    def parse_stdio_regexes(self, stdio_elem):
        """
        Look in the tool's <stdio> elem for all <regex> subelements
        that define how to look for warnings and fatal errors in
        stdout and stderr. This will add all such regex elements
        to the tool's stdio_regexes list.
        """
        try:
            # Every <regex> subelement has "match" and "source" (or "src")
            # attributes.
            for regex_elem in stdio_elem.findall("regex"):
                regex = ToolStdioRegex()
                # Each regex has an optional description that can be
                # part of the "desc" or "description" attributes:
                regex.desc = regex_elem.get("desc")
                if regex.desc is None:
                    regex.desc = regex_elem.get("description")
                # Parse the error level:
                regex.error_level = (
                    self.parse_error_level(regex_elem.get("level")))
                # FIX: default must be None (not "") — the previous default made
                # the check below dead and an omitted match= became an empty
                # pattern that matches every line of output.
                regex.match = regex_elem.get("match")
                if regex.match is None:
                    # FIX: message previously had an unfilled %s placeholder.
                    log.warning("Ignoring tool's stdio regex element - "
                                "the 'match' attribute must exist")
                    continue
                # Parse the output sources: "src", "source", then "sources",
                # in that order; default to scanning both stdout and stderr.
                output_srcs = regex_elem.get("src")
                if output_srcs is None:
                    output_srcs = regex_elem.get("source")
                if output_srcs is None:
                    output_srcs = regex_elem.get("sources")
                if output_srcs is None:
                    output_srcs = "output,error"
                output_srcs = re.sub(r"\s", "", output_srcs)
                src_list = output_srcs.split(",")
                # Anything to do with "out" (stdout, output, ...) scans stdout;
                # anything with "err" scans stderr; "both" scans both.  If
                # neither was recognized, warn and scan both.
                for src in src_list:
                    if re.search("both", src, re.IGNORECASE):
                        regex.stdout_match = True
                        regex.stderr_match = True
                    if re.search("out", src, re.IGNORECASE):
                        regex.stdout_match = True
                    if re.search("err", src, re.IGNORECASE):
                        regex.stderr_match = True
                if not regex.stdout_match and not regex.stderr_match:
                    # FIX: message previously interpolated self.id, which does
                    # not exist on StdioParser (AttributeError aborted parsing).
                    log.warning("Unable to determine if tool stream source "
                                "scanning is output, error, or both. "
                                "Defaulting to use both.")
                    regex.stdout_match = True
                    regex.stderr_match = True
                self.stdio_regexes.append(regex)
        except Exception:
            # FIX: previously logged the wrong method name (parse_stdio_exit_codes!).
            log.exception("Exception in parse_stdio_regexes!")

    def parse_error_level(self, err_level):
        """
        Parse an error level string and return the corresponding
        StdioErrorLevel enumeration value; unparsable input yields FATAL.
        """
        return_level = StdioErrorLevel.FATAL
        try:
            if err_level:
                if re.search("log", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.LOG
                elif re.search("qc", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.QC
                elif re.search("warning", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.WARNING
                elif re.search("fatal_oom", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.FATAL_OOM
                elif re.search("fatal", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.FATAL
                else:
                    # FIX: previously referenced nonexistent self.id here.
                    log.debug("Error level %s did not match log/warning/fatal"
                              % err_level)
        except Exception:
            log.exception("Exception in parse_error_level")
        return return_level


class XmlPagesSource(PagesSource):
    """Pages source backed by a tool XML <inputs> element (one page per <page>, or a single implicit page)."""

    def __init__(self, root):
        self.input_elem = root.find("inputs")
        page_sources = []
        if self.input_elem is not None:
            pages_elem = self.input_elem.findall("page")
            for page in (pages_elem or [self.input_elem]):
                page_sources.append(XmlPageSource(page))
        super().__init__(page_sources)

    @property
    def inputs_defined(self):
        return self.input_elem is not None


class XmlPageSource(PageSource):
    """A single page of tool inputs backed by an XML element."""

    def __init__(self, parent_elem):
        self.parent_elem = parent_elem

    def parse_display(self):
        """Return the page's <display> markup as a string, or None when absent."""
        display_elem = self.parent_elem.find("display")
        if display_elem is not None:
            display = xml_to_string(display_elem)
        else:
            display = None
        return display

    def parse_input_sources(self):
        return map(XmlInputSource, self.parent_elem)


class XmlInputSource(InputSource):
    """Input source wrapping a single tool-input XML element (e.g. <param>)."""

    def __init__(self, input_elem):
        self.input_elem = input_elem
        self.input_type = self.input_elem.tag

    def parse_input_type(self):
        return self.input_type

    def elem(self):
        return self.input_elem

    def get(self, key, value=None):
        return self.input_elem.get(key, value)

    def get_bool(self, key, default):
        return string_as_bool(self.get(key, default))

    # NOTE(review): the scraped source is truncated mid-definition after
    # get_bool; the remaining XmlInputSource methods are not visible here and
    # are intentionally not reconstructed.
parse_label(self): + return xml_text(self.input_elem, "label") + + def parse_help(self): + return xml_text(self.input_elem, "help") + + def parse_sanitizer_elem(self): + return self.input_elem.find("sanitizer") + + def parse_validator_elems(self): + return self.input_elem.findall("validator") + + def parse_dynamic_options_elem(self): + """ Return a galaxy.tools.parameters.dynamic_options.DynamicOptions + if appropriate. + """ + options_elem = self.input_elem.find('options') + return options_elem + + def parse_static_options(self): + static_options = list() + elem = self.input_elem + for option in elem.findall("option"): + value = option.get("value") + selected = string_as_bool(option.get("selected", False)) + static_options.append((option.text or value, value, selected)) + return static_options + + def parse_optional(self, default=None): + """ Return boolean indicating whether parameter is optional. """ + elem = self.input_elem + if self.get('type') == "data_column": + # Allow specifing force_select for backward compat., but probably + # should use optional going forward for consistency with other + # parameters. 
+ if "force_select" in elem.attrib: + force_select = string_as_bool(elem.get("force_select")) + else: + force_select = not string_as_bool(elem.get("optional", False)) + return not force_select + + if default is None: + default = self.default_optional + return self.get_bool("optional", default) + + def parse_conversion_tuples(self): + elem = self.input_elem + conversions = [] + for conv_elem in elem.findall("conversion"): + name = conv_elem.get("name") # name for commandline substitution + conv_extensions = conv_elem.get("type") # target datatype extension + conversions.append((name, conv_extensions)) + return conversions + + def parse_nested_inputs_source(self): + elem = self.input_elem + return XmlPageSource(elem) + + def parse_test_input_source(self): + elem = self.input_elem + input_elem = elem.find("param") + assert input_elem is not None, "<conditional> must have a child <param>" + return XmlInputSource(input_elem) + + def parse_when_input_sources(self): + elem = self.input_elem + + sources = [] + for case_elem in elem.findall("when"): + value = case_elem.get("value") + case_page_source = XmlPageSource(case_elem) + sources.append((value, case_page_source)) + return sources + + +class ParallelismInfo: + """ + Stores the information (if any) for running multiple instances of the tool in parallel + on the same set of inputs. + """ + + def __init__(self, tag): + self.method = tag.get('method') + if isinstance(tag, dict): + items = tag.items() + else: + items = tag.attrib.items() + self.attributes = dict([item for item in items if item[0] != 'method']) + if len(self.attributes) == 0: + # legacy basic mode - provide compatible defaults + self.attributes['split_size'] = 20 + self.attributes['split_mode'] = 'number_of_parts'