Mercurial > repos > fubar > tool_factory_2
changeset 39:2cd6555baf44 draft
Uploaded
author | fubar |
---|---|
date | Wed, 12 Aug 2020 19:00:44 -0400 |
parents | a30536c100bf |
children | 51fa77152988 |
files | toolfactory/__init__.py toolfactory/galaxyxml/__init__.py toolfactory/galaxyxml/__pycache__/__init__.cpython-36.pyc toolfactory/galaxyxml/tool/__init__.py toolfactory/galaxyxml/tool/__pycache__/__init__.cpython-36.pyc toolfactory/galaxyxml/tool/import_xml.py toolfactory/galaxyxml/tool/parameters/__init__.py toolfactory/galaxyxml/tool/parameters/__pycache__/__init__.cpython-36.pyc toolfactory/rgToolFactory2.py toolfactory/rgToolFactory2.xml |
diffstat | 9 files changed, 9 insertions(+), 1701 deletions(-) [+] |
line wrap: on
line diff
--- a/toolfactory/galaxyxml/__init__.py Wed Aug 12 01:43:46 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,64 +0,0 @@ -from builtins import object -from builtins import str - -from lxml import etree - - -class GalaxyXML(object): - def __init__(self): - self.root = etree.Element("root") - - def export(self): - return etree.tostring(self.root, pretty_print=True, encoding="unicode") - - -class Util(object): - @classmethod - def coerce(cls, data, kill_lists=False): - """Recursive data sanitisation - """ - if isinstance(data, dict): - return {k: cls.coerce(v, kill_lists=kill_lists) for k, v in list(data.items()) if v is not None} - elif isinstance(data, list): - if kill_lists: - return cls.coerce(data[0]) - else: - return [cls.coerce(v, kill_lists=kill_lists) for v in data] - else: - return cls.coerce_value(data) - - @classmethod - def coerce_value(cls, obj): - """Make everything a string! - """ - if isinstance(obj, bool): - if obj: - return "true" - else: - return "false" - elif isinstance(obj, str): - return obj - else: - return str(obj) - - @classmethod - def clean_kwargs(cls, params, final=False): - if "kwargs" in params: - kwargs = params["kwargs"] - for k in kwargs: - params[k] = kwargs[k] - del params["kwargs"] - if "self" in params: - del params["self"] - - if "__class__" in params: - del params["__class__"] - - # There will be more params, it would be NICE to use a whitelist - # instead of a blacklist, but until we have more data let's just - # blacklist stuff we see commonly. - if final: - for blacklist in ("positional",): - if blacklist in params: - del params[blacklist] - return params
--- a/toolfactory/galaxyxml/tool/__init__.py Wed Aug 12 01:43:46 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,181 +0,0 @@ -import copy -import logging - -from galaxyxml import GalaxyXML, Util -from galaxyxml.tool.parameters import XMLParam - -from lxml import etree - -VALID_TOOL_TYPES = ("data_source", "data_source_async") -VALID_URL_METHODS = ("get", "post") - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class Tool(GalaxyXML): - - def __init__( - self, - name, - id, - version, - description, - executable, - hidden=False, - tool_type=None, - URL_method=None, - workflow_compatible=True, - interpreter=None, - version_command="interpreter filename.exe --version", - command_line_override=None, - ): - - self.executable = executable - self.interpreter = interpreter - self.command_line_override = command_line_override - kwargs = { - "name": name, - "id": id, - "version": version, - "hidden": hidden, - "workflow_compatible": workflow_compatible, - } - self.version_command = version_command - - # Remove some of the default values to make tools look a bit nicer - if not hidden: - del kwargs["hidden"] - if workflow_compatible: - del kwargs["workflow_compatible"] - - kwargs = Util.coerce(kwargs) - self.root = etree.Element("tool", **kwargs) - - if tool_type is not None: - if tool_type not in VALID_TOOL_TYPES: - raise Exception("Tool type must be one of %s" % ",".join(VALID_TOOL_TYPES)) - else: - kwargs["tool_type"] = tool_type - - if URL_method is not None: - if URL_method in VALID_URL_METHODS: - kwargs["URL_method"] = URL_method - else: - raise Exception("URL_method must be one of %s" % ",".join(VALID_URL_METHODS)) - - description_node = etree.SubElement(self.root, "description") - description_node.text = description - - def add_comment(self, comment_txt): - comment = etree.Comment(comment_txt) - self.root.insert(0, comment) - - def append_version_command(self): - version_command = etree.SubElement(self.root, "version_command") - try: - version_command.text = etree.CDATA(self.version_command) - except Exception: - pass - - def append(self, sub_node): - if issubclass(type(sub_node), XMLParam): - self.root.append(sub_node.node) - else: - self.root.append(sub_node) - - def clean_command_string(self, command_line): - clean = [] - for x in command_line: - if x is not [] and x is not [""]: - clean.append(x) - - return "\n".join(clean) - - def export(self, keep_old_command=False): # noqa - - export_xml = copy.deepcopy(self) - - try: - export_xml.append(export_xml.edam_operations) - except Exception: - pass - - try: - export_xml.append(export_xml.edam_topics) - except Exception: - pass - - try: - export_xml.append(export_xml.requirements) - except Exception: - pass - - try: - export_xml.append(export_xml.configfiles) - except Exception: - pass - - if self.command_line_override: - command_line = self.command_line_override - else: - command_line = [] - try: - command_line.append(export_xml.inputs.cli()) - except Exception as e: - logger.warning(str(e)) - - try: - command_line.append(export_xml.outputs.cli()) - except Exception: - pass - - # Add stdio section - stdio = etree.SubElement(export_xml.root, "stdio") - etree.SubElement(stdio, "exit_code", range="1:", level="fatal") - - # Append version command - export_xml.append_version_command() - - # Steal interpreter from kwargs - command_kwargs = {} - if export_xml.interpreter is not None: - command_kwargs["interpreter"] = export_xml.interpreter - - # Add command section - command_node = etree.SubElement(export_xml.root, "command", **command_kwargs) - - if keep_old_command: - if getattr(self, "command", None): - command_node.text = etree.CDATA(export_xml.command) - else: - logger.warning("The tool does not have any old command stored. " + "Only the command line is written.") - command_node.text = export_xml.executable - else: - actual_cli = "%s %s" % (export_xml.executable, export_xml.clean_command_string(command_line)) - command_node.text = etree.CDATA(actual_cli.strip()) - - try: - export_xml.append(export_xml.inputs) - except Exception: - pass - - try: - export_xml.append(export_xml.outputs) - except Exception: - pass - - try: - export_xml.append(export_xml.tests) - except Exception: - pass - - help_element = etree.SubElement(export_xml.root, "help") - help_element.text = etree.CDATA(export_xml.help) - - try: - export_xml.append(export_xml.citations) - except Exception: - pass - - return super(Tool, export_xml).export()
--- a/toolfactory/galaxyxml/tool/import_xml.py Wed Aug 12 01:43:46 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,713 +0,0 @@ -import logging -import xml.etree.ElementTree as ET - -import galaxyxml.tool as gxt -import galaxyxml.tool.parameters as gxtp - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class GalaxyXmlParser(object): - """ - Class to import content from an existing Galaxy XML wrapper. - """ - - def _init_tool(self, xml_root): - """ - Init tool from existing xml tool. - - :param xml_root: root of the galaxy xml file. - :type xml_root: :class:`xml.etree._Element` - """ - version_cmd = None - description = None - for child in xml_root: - if child.tag == "description": - description = child.text - elif child.tag == "command": - executable = child.text.split()[0] - command = child.text - elif child.tag == "version_command": - version_cmd = child.text - - tool = gxt.Tool( - xml_root.attrib["name"], - xml_root.attrib["id"], - xml_root.attrib.get("version", None), - description, - executable, - hidden=xml_root.attrib.get("hidden", False), - tool_type=xml_root.attrib.get("tool_type", None), - URL_method=xml_root.attrib.get("URL_method", None), - workflow_compatible=xml_root.attrib.get("workflow_compatible", True), - version_command=version_cmd, - ) - tool.command = command - return tool - - def _load_description(self, tool, desc_root): - """ - <description> is already loaded during initiation. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param desc_root: root of <description> tag. - :type desc_root: :class:`xml.etree._Element` - """ - logger.info("<description> is loaded during initiation of the object.") - - def _load_version_command(self, tool, vers_root): - """ - <version_command> is already loaded during initiation. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param vers_root: root of <version_command> tag. - :type vers_root: :class:`xml.etree._Element` - """ - logger.info("<version_command> is loaded during initiation of the object.") - - def _load_stdio(self, tool, stdio_root): - """ - So far, <stdio> is automatically generated by galaxyxml. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param desc_root: root of <stdio> tag. - :type desc_root: :class:`xml.etree._Element` - """ - logger.info("<stdio> is not loaded but automatically generated by galaxyxml.") - - def _load_command(self, tool, desc_root): - """ - <command> is already loaded during initiation. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param desc_root: root of <command> tag. - :type desc_root: :class:`xml.etree._Element` - """ - logger.info("<command> is loaded during initiation of the object.") - - def _load_help(self, tool, help_root): - """ - Load the content of the <help> into the tool. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param requirements_root: root of <help> tag. - :type requirements_root: :class:`xml.etree._Element` - """ - tool.help = help_root.text - - def _load_requirements(self, tool, requirements_root): - """ - Add <requirements> to the tool. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param requirements_root: root of <requirements> tag. - :type requirements_root: :class:`xml.etree._Element` - """ - tool.requirements = gxtp.Requirements() - for req in requirements_root: - req_type = req.attrib["type"] - value = req.text - if req.tag == "requirement": - version = req.attrib.get("version", None) - tool.requirements.append(gxtp.Requirement(req_type, value, version=version)) - elif req.tag == "container": - tool.requirements.append(gxtp.Container(req_type, value)) - else: - logger.warning(req.tag + " is not a valid tag for requirements child") - - def _load_edam_topics(self, tool, topics_root): - """ - Add <edam_topics> to the tool. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param topics_root: root of <edam_topics> tag. - :type topics_root: :class:`xml.etree._Element` - """ - tool.edam_topics = gxtp.EdamTopics() - for edam_topic in topics_root: - tool.edam_topics.append(gxtp.EdamTopic(edam_topic.text)) - - def _load_edam_operations(self, tool, operations_root): - """ - Add <edam_operations> to the tool. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param operations_root: root of <edam_operations> tag. - :type operations_root: :class:`xml.etree._Element` - """ - tool.edam_operations = gxtp.EdamOperations() - for edam_op in operations_root: - tool.edam_operations.append(gxtp.EdamOperation(edam_op.text)) - - def _load_configfiles(self, tool, configfiles_root): - """ - Add <configfiles> to the tool. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param configfiles_root: root of <configfiles> tag. - :type configfiles_root: :class:`xml.etree._Element` - """ - tool.configfiles = gxtp.Configfiles() - for conf in configfiles_root: - name = conf.attrib["name"] - value = conf.text - tool.configfiles.append(gxtp.Configfile(name, value)) - - def _load_citations(self, tool, citations_root): - """ - Add <citations> to the tool. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param citations_root: root of <citations> tag. - :type citations_root: :class:`xml.etree._Element` - """ - tool.citations = gxtp.Citations() - for cit in citations_root: - cit_type = cit.attrib["type"] - value = cit.text - tool.citations.append(gxtp.Citation(cit_type, value)) - - def _load_inputs(self, tool, inputs_root): - """ - Add <inputs> to the tool using the :class:`galaxyxml.tool.import_xml.InputsParser` object. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param inputs_root: root of <inputs> tag. - :type inputs_root: :class:`xml.etree._Element` - """ - tool.inputs = gxtp.Inputs() - inp_parser = InputsParser() - inp_parser.load_inputs(tool.inputs, inputs_root) - - def _load_outputs(self, tool, outputs_root): - """ - Add <outputs> to the tool using the :class:`galaxyxml.tool.import_xml.OutputsParser` object. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param outputs_root: root of <outputs> tag. - :type outputs_root: :class:`xml.etree._Element` - """ - tool.outputs = gxtp.Outputs() - out_parser = OutputsParser() - out_parser.load_outputs(tool.outputs, outputs_root) - - def _load_tests(self, tool, tests_root): - """ - Add <tests> to the tool using the :class:`galaxyxml.tool.import_xml.TestsParser` object. - - :param tool: Tool object from galaxyxml. - :type tool: :class:`galaxyxml.tool.Tool` - :param tests_root: root of <tests> tag. - :type tests_root: :class:`xml.etree._Element` - """ - tool.tests = gxtp.Tests() - tests_parser = TestsParser() - tests_parser.load_tests(tool.tests, tests_root) - - def import_xml(self, xml_path): - """ - Load existing xml into the :class:`galaxyxml.tool.Tool` object. - - :param xml_path: Path of the XML to be loaded. - :type xml_path: STRING - :return: XML content in the galaxyxml model. - :rtype: :class:`galaxyxml.tool.Tool` - """ - xml_root = ET.parse(xml_path).getroot() - tool = self._init_tool(xml_root) - # Now we import each tag's field - for child in xml_root: - try: - getattr(self, "_load_{}".format(child.tag))(tool, child) - except AttributeError: - logger.warning(child.tag + " tag is not processed.") - return tool - - -class InputsParser(object): - """ - Class to parse content of the <inputs> tag from a Galaxy XML wrapper. - """ - - def _load_text_param(self, root, text_param): - """ - Add <param type="text" /> to the root. - - :param root: root to append the param to. - :param text_param: root of <param> tag. - :type text_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.TextParam( - text_param.attrib["name"], - optional=text_param.get("optional", None), - label=text_param.get("label", None), - help=text_param.get("help", None), - value=text_param.get("value", None), - ) - ) - - def _load_data_param(self, root, data_param): - """ - Add <param type="data" /> to the root. - - :param root: root to append the param to. - :param data_param: root of <param> tag. - :type data_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.DataParam( - data_param.attrib["name"], - optional=data_param.attrib.get("optional", None), - label=data_param.attrib.get("label", None), - help=data_param.attrib.get("help", None), - format=data_param.attrib.get("format", None), - multiple=data_param.attrib.get("multiple", None), - ) - ) - - def _load_boolean_param(self, root, bool_param): - """ - Add <param type="boolean" /> to the root. - - :param root: root to append the param to. - :param bool_param: root of <param> tag. - :type bool_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.BooleanParam( - bool_param.attrib["name"], - optional=bool_param.attrib.get("optional", None), - label=bool_param.attrib.get("label", None), - help=bool_param.attrib.get("help", None), - checked=bool_param.attrib.get("checked", False), - truevalue=bool_param.attrib.get("truevalue", None), - falsevalue=bool_param.attrib.get("falsevalue", None), - ) - ) - - def _load_integer_param(self, root, int_param): - """ - Add <param type="integer" /> to the root. - - :param root: root to append the param to. - :param int_param: root of <param> tag. - :type int_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.IntegerParam( - int_param.attrib["name"], - int_param.attrib.get("value", None), - optional=int_param.attrib.get("optional", None), - label=int_param.attrib.get("label", None), - help=int_param.attrib.get("help", None), - min=int_param.attrib.get("min", None), - max=int_param.attrib.get("max", None), - ) - ) - - def _load_float_param(self, root, float_param): - """ - Add <param type="float" /> to the root. - - :param root: root to append the param to. - :param float_param: root of <param> tag. - :type float_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.FloatParam( - float_param.attrib["name"], - float_param.attrib.get("value", None), - optional=float_param.attrib.get("optional", None), - label=float_param.attrib.get("label", None), - help=float_param.attrib.get("help", None), - min=float_param.attrib.get("min", None), - max=float_param.attrib.get("max", None), - ) - ) - - def _load_option_select(self, root, option): - """ - Add <option> to the root (usually <param type="select" />). - - :param root: root to append the param to. - :param option: root of <option> tag. - :type float_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.SelectOption( - option.attrib.get("value", None), option.text, selected=option.attrib.get("selected", False) - ) - ) - - def _load_column_options(self, root, column): - """ - Add <column> to the root (usually <options>). - - :param root: root to append the param to. - :param option: root of <column> tag. - :type float_param: :class:`xml.etree._Element` - """ - root.append(gxtp.Column(column.attrib["name"], column.attrib["index"])) - - def _load_filter_options(self, root, filter): - """ - Add <filter> to the root (usually <options>). - - :param root: root to append the param to. - :param option: root of <filter> tag. - :type float_param: :class:`xml.etree._Element` - """ - root.append( - gxtp.Filter( - filter.attrib["type"], - column=filter.attrib.get("column", None), - name=filter.attrib.get("name", None), - ref=filter.attrib.get("ref", None), - key=filter.attrib.get("key", None), - multiple=filter.attrib.get("multiple", None), - separator=filter.attrib.get("separator", None), - keep=filter.attrib.get("keep", None), - value=filter.attrib.get("value", None), - ref_attribute=filter.attrib.get("ref_attribute", None), - index=filter.attrib.get("index", None), - ) - ) - - def _load_options_select(self, root, options): - """ - Add <options> to the root (usually <param type="select" />). - - :param root: root to append the param to. - :param option: root of <options> tag. - :type float_param: :class:`xml.etree._Element` - """ - opts = gxtp.Options( - from_dataset=options.attrib.get("from_dataset", None), - from_file=options.attrib.get("from_file", None), - from_data_table=options.attrib.get("from_data_table", None), - from_parameter=options.attrib.get("from_parameter", None), - ) - # Deal with child nodes (usually filter and column) - for opt_child in options: - try: - getattr(self, "_load_{}_options".format(opt_child.tag))(opts, opt_child) - except AttributeError: - logger.warning(opt_child.tag + " tag is not processed for <options>.") - root.append(opts) - - def _load_select_param(self, root, sel_param): - """ - Add <param type="select" /> to the root. - - :param root: root to append the param to. - :param sel_param: root of <param> tag. - :type sel_param: :class:`xml.etree._Element` - """ - select_param = gxtp.SelectParam( - sel_param.attrib["name"], - optional=sel_param.attrib.get("optional", None), - label=sel_param.attrib.get("label", None), - help=sel_param.attrib.get("help", None), - data_ref=sel_param.attrib.get("data_ref", None), - display=sel_param.attrib.get("display", None), - multiple=sel_param.attrib.get("multiple", None), - ) - # Deal with child nodes (usually option and options) - for sel_child in sel_param: - try: - getattr(self, "_load_{}_select".format(sel_child.tag))(select_param, sel_child) - except AttributeError: - logger.warning(sel_child.tag + " tag is not processed for <param type='select'>.") - root.append(select_param) - - def _load_param(self, root, param_root): - """ - Method to select which type of <param> is being added to the root. - - :param root: root to attach param to. - :param param_root: root of <param> tag. - :type param_root: :class:`xml.etree._Element` - """ - param_type = param_root.attrib["type"] - try: - getattr(self, "_load_{}_param".format(param_type))(root, param_root) - except AttributeError: - logger.warning(param_type + " tag is not processed for <param>.") - - def _load_when(self, root, when_root): - """ - Add <when> to the root (usually <conditional>). - - :param root: root to append when to. - :param when_root: root of <when> tag. - :type when_root: :class:`xml.etree._Element` - """ - when = gxtp.When(when_root.attrib["value"]) - # Deal with child nodes - self.load_inputs(when, when_root) - root.append(when) - - def _load_conditional(self, root, conditional_root): - """ - Add <conditional> to the root. - - :param root: root to append conditional to. - :param conditional_root: root of <conditional> tag. - :type conditional_root: :class:`xml.etree._Element` - """ - value_ref_in_group = conditional_root.attrib.get("value_ref_in_group", None) - # Other optional parameters need to be added to conditional object - conditional = gxtp.Conditional( - conditional_root.attrib["name"], - value_from=conditional_root.attrib.get("value_from", None), - value_ref=conditional_root.attrib.get("value_ref", None), - value_ref_in_group=value_ref_in_group, - label=conditional_root.attrib.get("label", None), - ) - # Deal with child nodes - self.load_inputs(conditional, conditional_root) - root.append(conditional) - - def _load_section(self, root, section_root): - """ - Add <section> to the root. - - :param root: root to append conditional to. - :param section_root: root of <section> tag. - :type section_root: :class:`xml.etree._Element` - """ - section = gxtp.Section( - section_root.attrib["name"], - section_root.attrib["title"], - expanded=section_root.attrib.get("expanded", None), - help=section_root.attrib.get("help", None), - ) - # Deal with child nodes - self.load_inputs(section, section_root) - root.append(section) - - def _load_repeat(self, root, repeat_root): - """ - Add <repeat> to the root. - - :param root: root to append repeat to. - :param repeat_root: root of <repeat> tag. - :param repeat_root: :class:`xml.etree._Element` - """ - repeat = gxtp.Repeat( - repeat_root.attrib["name"], - repeat_root.attrib["title"], - min=repeat_root.attrib.get("min", None), - max=repeat_root.attrib.get("max", None), - default=repeat_root.attrib.get("default", None), - ) - # Deal with child nodes - self.load_inputs(repeat, repeat_root) - root.append(repeat) - - def load_inputs(self, root, inputs_root): - """ - Add <inputs.tag> to the root (it can be any tags with children such as - <inputs>, <repeat>, <section> ...) - - :param root: root to attach inputs to (either <inputs> or <when>). - :param inputs_root: root of <inputs> tag. - :type inputs_root: :class:`xml.etree._Element` - """ - for inp_child in inputs_root: - try: - getattr(self, "_load_{}".format(inp_child.tag))(root, inp_child) - except AttributeError: - logger.warning(inp_child.tag + " tag is not processed for <" + inputs_root.tag + "> tag.") - - -class OutputsParser(object): - """ - Class to parse content of the <outputs> tag from a Galaxy XML wrapper. - """ - - def _load_data(self, outputs_root, data_root): - """ - Add <data> to <outputs>. - - :param outputs_root: <outputs> root to append <data> to. - :param data_root: root of <data> tag. - :param data_root: :class:`xml.etree._Element` - """ - data = gxtp.OutputData( - data_root.attrib.get("name", None), - data_root.attrib.get("format", None), - format_source=data_root.attrib.get("format_source", None), - metadata_source=data_root.attrib.get("metadata_source", None), - label=data_root.attrib.get("label", None), - from_work_dir=data_root.attrib.get("from_work_dir", None), - hidden=data_root.attrib.get("hidden", False), - ) - # Deal with child nodes - for data_child in data_root: - try: - getattr(self, "_load_{}".format(data_child.tag))(data, data_child) - except AttributeError: - logger.warning(data_child.tag + " tag is not processed for <data>.") - outputs_root.append(data) - - def _load_change_format(self, root, chfmt_root): - """ - Add <change_format> to root (<data>). - - :param root: root to append <change_format> to. - :param chfm_root: root of <change_format> tag. - :param chfm_root: :class:`xml.etree._Element` - """ - change_format = gxtp.ChangeFormat() - for chfmt_child in chfmt_root: - change_format.append( - gxtp.ChangeFormatWhen( - chfmt_child.attrib["input"], chfmt_child.attrib["format"], chfmt_child.attrib["value"] - ) - ) - root.append(change_format) - - def _load_collection(self, outputs_root, coll_root): - """ - Add <collection> to <outputs>. - - :param outputs_root: <outputs> root to append <collection> to. - :param coll_root: root of <collection> tag. - :param coll_root: :class:`xml.etree._Element` - """ - collection = gxtp.OutputCollection( - coll_root.attrib["name"], - type=coll_root.attrib.get("type", None), - label=coll_root.attrib.get("label", None), - format_source=coll_root.attrib.get("format_source", None), - type_source=coll_root.attrib.get("type_source", None), - structured_like=coll_root.attrib.get("structured_like", None), - inherit_format=coll_root.attrib.get("inherit_format", None), - ) - # Deal with child nodes - for coll_child in coll_root: - try: - getattr(self, "_load_{}".format(coll_child.tag))(collection, coll_child) - except AttributeError: - logger.warning(coll_child.tag + " tag is not processed for <collection>.") - outputs_root.append(collection) - - def _load_discover_datasets(self, root, disc_root): - """ - Add <discover_datasets> to root (<collection>). - - :param root: root to append <collection> to. - :param disc_root: root of <discover_datasets> tag. - :param disc_root: :class:`xml.etree._Element` - """ - root.append( - gxtp.DiscoverDatasets( - disc_root.attrib["pattern"], - directory=disc_root.attrib.get("directory", None), - format=disc_root.attrib.get("format", None), - ext=disc_root.attrib.get("ext", None), - visible=disc_root.attrib.get("visible", None), - ) - ) - - def _load_filter(self, root, filter_root): - """ - Add <filter> to root (<collection> or <data>). - - :param root: root to append <collection> to. - :param coll_root: root of <filter> tag. - :param coll_root: :class:`xml.etree._Element` - """ - root.append(gxtp.OutputFilter(filter_root.text)) - - def load_outputs(self, root, outputs_root): - """ - Add <outputs> to the root. - - :param root: root to attach <outputs> to (<tool>). - :param tests_root: root of <outputs> tag. - :type tests_root: :class:`xml.etree._Element` - """ - for out_child in outputs_root: - try: - getattr(self, "_load_{}".format(out_child.tag))(root, out_child) - except AttributeError: - logger.warning(out_child.tag + " tag is not processed for <outputs>.") - - -class TestsParser(object): - """ - Class to parse content of the <tests> tag from a Galaxy XML wrapper. - """ - - def _load_param(self, test_root, param_root): - """ - Add <param> to the <test>. - - :param root: <test> root to append <param> to. - :param repeat_root: root of <param> tag. - :param repeat_root: :class:`xml.etree._Element` - """ - test_root.append( - gxtp.TestParam( - param_root.attrib["name"], - value=param_root.attrib.get("value", None), - ftype=param_root.attrib.get("ftype", None), - dbkey=param_root.attrib.get("dbkey", None), - ) - ) - - def _load_output(self, test_root, output_root): - """ - Add <output> to the <test>. - - :param root: <test> root to append <output> to. - :param repeat_root: root of <output> tag. - :param repeat_root: :class:`xml.etree._Element` - """ - test_root.append( - gxtp.TestOutput( - name=output_root.attrib.get("name", None), - file=output_root.attrib.get("file", None), - ftype=output_root.attrib.get("ftype", None), - sort=output_root.attrib.get("sort", None), - value=output_root.attrib.get("value", None), - md5=output_root.attrib.get("md5", None), - checksum=output_root.attrib.get("checksum", None), - compare=output_root.attrib.get("compare", None), - lines_diff=output_root.attrib.get("lines_diff", None), - delta=output_root.attrib.get("delta", None), - ) - ) - - def load_tests(self, root, tests_root): - """ - Add <tests> to the root. - - :param root: root to attach <tests> to (<tool>). - :param tests_root: root of <tests> tag. - :type tests_root: :class:`xml.etree._Element` - """ - for test_root in tests_root: - test = gxtp.Test() - for test_child in test_root: - try: - getattr(self, "_load_{}".format(test_child.tag))(test, test_child) - except AttributeError: - logger.warning(test_child.tag + " tag is not processed within <test>.") - root.append(test)
--- a/toolfactory/galaxyxml/tool/parameters/__init__.py Wed Aug 12 01:43:46 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,739 +0,0 @@ -from builtins import object -from builtins import str - -from galaxyxml import Util - -from lxml import etree - - - -class XMLParam(object): - name = "node" - - def __init__(self, *args, **kwargs): - # http://stackoverflow.com/a/12118700 - self.children = [] - kwargs = {k: v for k, v in list(kwargs.items()) if v is not None} - kwargs = Util.coerce(kwargs, kill_lists=True) - kwargs = Util.clean_kwargs(kwargs, final=True) - self.node = etree.Element(self.name, **kwargs) - - def append(self, sub_node): - if self.acceptable_child(sub_node): - # If one of ours, they aren't etree nodes, they're custom objects - if issubclass(type(sub_node), XMLParam): - self.node.append(sub_node.node) - self.children.append(sub_node) - else: - raise Exception( - "Child was unacceptable to parent (%s is not appropriate for %s)" % (type(self), type(sub_node)) - ) - else: - raise Exception( - "Child was unacceptable to parent (%s is not appropriate for %s)" % (type(self), type(sub_node)) - ) - - def validate(self): - # Very few need validation, but some nodes we may want to have - # validation routines on. Should only be called when DONE. - for child in self.children: - # If any child fails to validate return false. - if not child.validate(): - return False - return True - - def cli(self): - lines = [] - for child in self.children: - lines.append(child.command_line()) - # lines += child.command_line() - return "\n".join(lines) - - def command_line(self): - return None - - -class RequestParamTranslation(XMLParam): - name = "request_param_translation" - - def __init__(self, **kwargs): - self.node = etree.Element(self.name) - - def acceptable_child(self, child): - return isinstance(child, RequestParamTranslation) - - -class RequestParam(XMLParam): - name = "request_param" - - def __init__(self, galaxy_name, remote_name, missing, **kwargs): - # TODO: bulk copy locals into self.attr? - self.galaxy_name = galaxy_name - # http://stackoverflow.com/a/1408860 - params = Util.clean_kwargs(locals().copy()) - super(RequestParam, self).__init__(**params) - - def acceptable_child(self, child): - return isinstance(child, AppendParam) and self.galaxy_name == "URL" - - -class AppendParam(XMLParam): - name = "append_param" - - def __init__(self, separator="&", first_separator="?", join="=", **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(AppendParam, self).__init__(**params) - - def acceptable_child(self, child): - return isinstance(child, AppendParamValue) - - -class AppendParamValue(XMLParam): - name = "value" - - def __init__(self, name="_export", missing="1", **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(AppendParamValue, self).__init__(**params) - - def acceptable_child(self, child): - return False - - -class EdamOperations(XMLParam): - name = "edam_operations" - - def acceptable_child(self, child): - return issubclass(type(child), EdamOperation) - - def has_operation(self, edam_operation): - """ - Check the presence of a given edam_operation. - - :type edam_operation: STRING - """ - for operation in self.children: - if operation.node.text == edam_operation: - return True - return False - - -class EdamOperation(XMLParam): - name = "edam_operation" - - def __init__(self, value): - super(EdamOperation, self).__init__() - self.node.text = str(value) - - -class EdamTopics(XMLParam): - name = "edam_topics" - - def acceptable_child(self, child): - return issubclass(type(child), EdamTopic) - - def has_topic(self, edam_topic): - """ - Check the presence of a given edam_topic. - - :type edam_topic: STRING - """ - for topic in self.children: - if topic.node.text == edam_topic: - return True - return False - - -class EdamTopic(XMLParam): - name = "edam_topic" - - def __init__(self, value): - super(EdamTopic, self).__init__() - self.node.text = str(value) - - -class Requirements(XMLParam): - name = "requirements" - # This bodes to be an issue -__- - - def acceptable_child(self, child): - return issubclass(type(child), Requirement) or issubclass(type(child), Container) - - -class Requirement(XMLParam): - name = "requirement" - - def __init__(self, type, value, version=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - passed_kwargs = {} - passed_kwargs["version"] = params["version"] - passed_kwargs["type"] = params["type"] - super(Requirement, self).__init__(**passed_kwargs) - self.node.text = str(value) - - -class Container(XMLParam): - name = "container" - - def __init__(self, type, value, **kwargs): - params = Util.clean_kwargs(locals().copy()) - passed_kwargs = {} - passed_kwargs["type"] = params["type"] - super(Container, self).__init__(**passed_kwargs) - self.node.text = str(value) - - -class Configfiles(XMLParam): - name = "configfiles" - - def acceptable_child(self, child): - return issubclass(type(child), Configfile) or issubclass(type(child), ConfigfileDefaultInputs) - - -class Configfile(XMLParam): - name = "configfile" - - def __init__(self, name, text, **kwargs): - params = Util.clean_kwargs(locals().copy()) - passed_kwargs = {} - passed_kwargs["name"] = params["name"] - super(Configfile, self).__init__(**passed_kwargs) - self.node.text = etree.CDATA(str(text)) - - -class ConfigfileDefaultInputs(XMLParam): - name = "inputs" - - def __init__(self, name, **kwargs): - params = Util.clean_kwargs(locals().copy()) - passed_kwargs = {} - passed_kwargs["name"] = params["name"] - super(ConfigfileDefaultInputs, self).__init__(**passed_kwargs) - - -class Inputs(XMLParam): - name = "inputs" - # This bodes to be an issue -__- - - def __init__(self, action=None, check_value=None, method=None, target=None, nginx_upload=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(Inputs, self).__init__(**params) - - def acceptable_child(self, child): - return issubclass(type(child), InputParameter) - - -class InputParameter(XMLParam): - def __init__(self, name, **kwargs): - # TODO: look at - self.mako_identifier = name - # We use kwargs instead of the usual locals(), so manually copy the - # name to kwargs - if name is not None: - kwargs["name"] = name - - # Handle positional parameters - if "positional" in kwargs and kwargs["positional"]: - self.positional = True - else: - self.positional = False - - if "num_dashes" in kwargs: - self.num_dashes = kwargs["num_dashes"] - del kwargs["num_dashes"] - else: - self.num_dashes = 0 - - self.space_between_arg = " " - - # Not sure about this :( - # https://wiki.galaxyproject.org/Tools/BestPractices#Parameter_help - if "label" in kwargs: - # TODO: replace with positional attribute - if len(self.flag()) > 0: - if kwargs["label"] is None: - kwargs["label"] = "Author did not provide help for this parameter... " - if not self.positional: - kwargs["argument"] = self.flag() - - super(InputParameter, self).__init__(**kwargs) - - def command_line(self): - before = self.command_line_before() - cli = self.command_line_actual() - after = self.command_line_after() - - complete = [x for x in (before, cli, after) if x is not None] - return "\n".join(complete) - - def command_line_before(self): - try: - return self.command_line_before_override - except Exception: - return None - - def command_line_after(self): - try: - return self.command_line_after_override - except Exception: - return None - - def command_line_actual(self): - try: - return self.command_line_override - except Exception: - if self.positional: - return self.mako_name() - else: - return "%s%s%s" % (self.flag(), self.space_between_arg, self.mako_name()) - - def mako_name(self): - # TODO: enhance logic to check up parents for things like - # repeat>condotion>param - return "$" + self.mako_identifier - - def flag(self): - flag = "-" * self.num_dashes - return flag + self.mako_identifier - - -class Section(InputParameter): - name = "section" - - def __init__(self, name, title, expanded=None, help=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(Section, self).__init__(**params) - - def acceptable_child(self, child): - return issubclass(type(child), InputParameter) - - -class Repeat(InputParameter): - name = "repeat" - - def __init__(self, name, title, min=None, max=None, default=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - # Allow overriding - self.command_line_before_override = "#for $i in $%s:" % name - self.command_line_after_override = "#end for" - # self.command_line_override - super(Repeat, self).__init__(**params) - - def acceptable_child(self, child): - return issubclass(type(child), InputParameter) - - def command_line_actual(self): - if hasattr(self, "command_line_override"): - return self.command_line_override - else: - return "%s" % self.mako_name() - - -class Conditional(InputParameter): - name = "conditional" - - def __init__(self, name, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(Conditional, self).__init__(**params) - - def acceptable_child(self, child): - return issubclass(type(child), InputParameter) and not isinstance(child, Conditional) - - def validate(self): - # Find a way to check if one of the kids is a WHEN - pass - - -class When(InputParameter): - name = "when" - - def __init__(self, value): - params = Util.clean_kwargs(locals().copy()) - super(When, self).__init__(None, **params) - - def acceptable_child(self, child): - return issubclass(type(child), InputParameter) - - -class Param(InputParameter): - name = "param" - - # This...isn't really valid as-is, and shouldn't be used. - def __init__(self, name, optional=None, label=None, help=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - params["type"] = self.type - super(Param, self).__init__(**params) - - if type(self) == Param: - raise Exception("Param class is not an actual parameter type, use a subclass of Param") - - def acceptable_child(self, child): - return issubclass(type(child, InputParameter) or isinstance(child), ValidatorParam) - - -class TextParam(Param): - type = "text" - - def __init__(self, name, optional=None, label=None, help=None, value=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(TextParam, self).__init__(**params) - - def command_line_actual(self): - try: - return self.command_line_override - except Exception: - if self.positional: - return self.mako_name() - else: - return f"{self.flag}{self.space_between_arg}'{self.mako_name()}'" - - -class _NumericParam(Param): - def __init__(self, name, value, optional=None, label=None, help=None, min=None, max=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(_NumericParam, self).__init__(**params) - - -class IntegerParam(_NumericParam): - type = "integer" - - -class FloatParam(_NumericParam): - type = "float" - - -class BooleanParam(Param): - type = "boolean" - - def __init__( - self, name, optional=None, label=None, help=None, checked=False, truevalue=None, falsevalue=None, **kwargs - ): - params = Util.clean_kwargs(locals().copy()) - - super(BooleanParam, self).__init__(**params) - if truevalue is None: - # If truevalue and falsevalue are None, then we use "auto", the IUC - # recommended default. - # - # truevalue is set to the parameter's value, and falsevalue is not. - # - # Unfortunately, mako_identifier is set as a result of the super - # call, which we shouldn't call TWICE, so we'll just hack around this :( - # params['truevalue'] = '%s%s' % (self.) - self.node.attrib["truevalue"] = self.flag() - - if falsevalue is None: - self.node.attrib["falsevalue"] = "" - - def command_line_actual(self): - if hasattr(self, "command_line_override"): - return self.command_line_override - else: - return "%s" % self.mako_name() - - -class DataParam(Param): - type = "data" - - def __init__(self, name, optional=None, label=None, help=None, format=None, multiple=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(DataParam, self).__init__(**params) - - -class SelectParam(Param): - type = "select" - - def __init__( - self, - name, - optional=None, - label=None, - help=None, - data_ref=None, - display=None, - multiple=None, - options=None, - default=None, - **kwargs - ): - params = Util.clean_kwargs(locals().copy()) - del params["options"] - del params["default"] - - super(SelectParam, self).__init__(**params) - - if options is not None and default is not None: - if default not in options: - raise Exception("Specified a default that isn't in options") - - if options: - for k, v in list(sorted(options.items())): - selected = k == default - self.append(SelectOption(k, v, selected=selected)) - - def acceptable_child(self, child): - return issubclass(type(child), SelectOption) or issubclass(type(child), Options) - - -class SelectOption(InputParameter): - name = "option" - - def __init__(self, value, text, selected=False, **kwargs): - params = Util.clean_kwargs(locals().copy()) - - passed_kwargs = {} - if selected: - passed_kwargs["selected"] = "true" - passed_kwargs["value"] = params["value"] - - super(SelectOption, self).__init__(None, **passed_kwargs) - self.node.text = str(text) - - -class Options(InputParameter): - name = "options" - - def __init__(self, from_dataset=None, from_file=None, from_data_table=None, from_parameter=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(Options, self).__init__(None, **params) - - def acceptable_child(self, child): - return issubclass(type(child), Column) or issubclass(type(child), Filter) - - -class Column(InputParameter): - name = "column" - - def __init__(self, name, index, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(Column, self).__init__(**params) - - -class Filter(InputParameter): - name = "filter" - - def __init__( - self, - type, - column=None, - name=None, - ref=None, - key=None, - multiple=None, - separator=None, - keep=None, - value=None, - ref_attribute=None, - index=None, - **kwargs - ): - params = Util.clean_kwargs(locals().copy()) - super(Filter, self).__init__(**params) - - -class ValidatorParam(InputParameter): - name = "validator" - - def __init__( - self, - type, - message=None, - filename=None, - metadata_name=None, - metadata_column=None, - line_startswith=None, - min=None, - max=None, - **kwargs - ): - params = Util.clean_kwargs(locals().copy()) - super(ValidatorParam, self).__init__(**params) - - -class Outputs(XMLParam): - name = "outputs" - - def acceptable_child(self, child): - return isinstance(child, OutputData) or isinstance(child, OutputCollection) - - -class OutputData(XMLParam): - """Copypasta of InputParameter, needs work - """ - - name = "data" - - def __init__( - self, - name, - format, - format_source=None, - metadata_source=None, - label=None, - from_work_dir=None, - hidden=False, - **kwargs - ): - # TODO: validate format_source&metadata_source against something in the - # XMLParam children tree. - self.mako_identifier = name - if "num_dashes" in kwargs: - self.num_dashes = kwargs["num_dashes"] - del kwargs["num_dashes"] - else: - self.num_dashes = 0 - self.space_between_arg = " " - params = Util.clean_kwargs(locals().copy()) - - super(OutputData, self).__init__(**params) - - def command_line(self): - if hasattr(self, "command_line_override"): - return self.command_line_override - else: - return "%s%s%s" % (self.flag(), self.space_between_arg, self.mako_name()) - - def mako_name(self): - return "$" + self.mako_identifier - - def flag(self): - flag = "-" * self.num_dashes - return flag + self.mako_identifier - - def acceptable_child(self, child): - return isinstance(child, OutputFilter) or isinstance(child, ChangeFormat) or isinstance(child, DiscoverDatasets) - - -class OutputFilter(XMLParam): - name = "filter" - - def __init__(self, text, **kwargs): - params = Util.clean_kwargs(locals().copy()) - del params["text"] - super(OutputFilter, self).__init__(**params) - self.node.text = text - - def acceptable_child(self, child): - return False - - -class ChangeFormat(XMLParam): - name = "change_format" - - def __init__(self, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(ChangeFormat, self).__init__(**params) - - def acceptable_child(self, child): - return isinstance(child, ChangeFormatWhen) - - -class ChangeFormatWhen(XMLParam): - name = "when" - - def __init__(self, input, format, value, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(ChangeFormatWhen, self).__init__(**params) - - def acceptable_child(self, child): - return False - - -class OutputCollection(XMLParam): - name = "collection" - - def __init__( - self, - name, - type=None, - label=None, - format_source=None, - type_source=None, - structured_like=None, - inherit_format=None, - **kwargs - ): - params = Util.clean_kwargs(locals().copy()) - super(OutputCollection, self).__init__(**params) - - def acceptable_child(self, child): - return isinstance(child, OutputData) or isinstance(child, OutputFilter) or isinstance(child, DiscoverDatasets) - - -class DiscoverDatasets(XMLParam): - name = "discover_datasets" - - def __init__(self, pattern, directory=None, format=None, ext=None, visible=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(DiscoverDatasets, self).__init__(**params) - - -class Tests(XMLParam): - name = "tests" - - def acceptable_child(self, child): - return issubclass(type(child), Test) - - -class Test(XMLParam): - name = "test" - - def acceptable_child(self, child): - return isinstance(child, TestParam) or isinstance(child, TestOutput) - - -class TestParam(XMLParam): - name = "param" - - def __init__(self, name, value=None, ftype=None, dbkey=None, **kwargs): - params = Util.clean_kwargs(locals().copy()) - super(TestParam, self).__init__(**params) - - -class TestOutput(XMLParam): - name = "output" - - def __init__( - self, - name=None, - file=None, - ftype=None, - sort=None, - value=None, - md5=None, - checksum=None, - compare=None, - lines_diff=None, - delta=None, - **kwargs - ): - params = Util.clean_kwargs(locals().copy()) - super(TestOutput, self).__init__(**params) - - -class Citations(XMLParam): - name = "citations" - - def acceptable_child(self, child): - return issubclass(type(child), Citation) - - def has_citation(self, type, value): - """ - Check the presence of a given citation. - - :type type: STRING - :type value: STRING - """ - for citation in self.children: - if citation.node.attrib["type"] == type and citation.node.text == value: - return True - return False - - -class Citation(XMLParam): - name = "citation" - - def __init__(self, type, value): - passed_kwargs = {} - passed_kwargs["type"] = type - super(Citation, self).__init__(**passed_kwargs) - self.node.text = str(value)
--- a/toolfactory/rgToolFactory2.py Wed Aug 12 01:43:46 2020 -0400 +++ b/toolfactory/rgToolFactory2.py Wed Aug 12 19:00:44 2020 -0400 @@ -17,8 +17,10 @@ # removed all the old complications including making the new tool use this same script # galaxyxml now generates the tool xml https://github.com/hexylena/galaxyxml # No support for automatic HTML file creation from arbitrary outputs -# TODO: add option to run that code as a post execution hook -# TODO: add additional history input parameters - currently only one +# essential problem is to create two command lines - one for the tool xml and a different +# one to run the executable with the supplied test data and settings +# Be simpler to write the tool, then run it with planemo and soak up the test outputs. + import argparse @@ -249,6 +251,8 @@ ) art = "%s.%s" % (self.tool_name, self.args.interpreter_name) artifact = open(art, "wb") + if self.args.interpreter_name == "python": + artifact.write(bytes("#!/usr/bin/env python\n", "utf8")) artifact.write(bytes(self.script, "utf8")) artifact.close() aCL(self.args.interpreter_name) @@ -541,7 +545,8 @@ ) self.tool.add_comment("Source in git at: %s" % (toolFactoryURL)) self.tool.add_comment( - "Cite: Creating re-usable tools from scripts doi: 10.1093/bioinformatics/bts573" + "Cite: Creating re-usable tools from scripts doi: \ + 10.1093/bioinformatics/bts573" ) exml = self.tool.export() xf = open('%s.xml' % self.tool_name, "w")
--- a/toolfactory/rgToolFactory2.xml Wed Aug 12 01:43:46 2020 -0400 +++ b/toolfactory/rgToolFactory2.xml Wed Aug 12 19:00:44 2020 -0400 @@ -122,7 +122,7 @@ </macros> <requirements> <requirement type="package">python</requirement> - <requirement type="package">lxml</requirement> + <requirement type="package">galaxyxml</requirement> </requirements> <command interpreter="python"><![CDATA[ #import os