Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/planemo/galaxy/workflows.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Utilities for Galaxy workflows."""
import json
import os
import tempfile
from collections import namedtuple
from urllib.parse import urlparse

import yaml
from ephemeris import generate_tool_list_from_ga_workflow_files
from ephemeris import shed_tools
from gxformat2.converter import python_to_workflow
from gxformat2.interface import BioBlendImporterGalaxyInterface
from gxformat2.interface import ImporterGalaxyInterface
from gxformat2.normalize import inputs_normalized, outputs_normalized

from planemo.io import warn

FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/"


def load_shed_repos(runnable):
    """Return the tool shed repository entries needed by a workflow runnable.

    Returns an empty list unless ``runnable.type.name`` is
    ``"galaxy_workflow"``.

    For ``.ga`` workflows the list is generated by ephemeris and read back
    from the ``tools`` key of the YAML file it writes. For any other path the
    file is parsed as YAML and its optional top-level ``tools`` key is
    returned (``[]`` when absent).
    """
    if runnable.type.name != "galaxy_workflow":
        return []

    path = runnable.path
    if path.endswith(".ga"):
        # ephemeris only knows how to write its result to a file. Use a
        # throwaway directory so a "tools.yml" in the caller's working
        # directory is neither overwritten nor left behind.
        with tempfile.TemporaryDirectory() as tool_list_dir:
            tool_list_path = os.path.join(tool_list_dir, "tools.yml")
            generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
                [path], "Tools from workflows", tool_list_path
            )
            with open(tool_list_path, "r") as f:
                tools = yaml.safe_load(f)["tools"]
    else:
        # It'd be better to just infer this from the tool shed ID somehow than
        # require explicit annotation like this... I think?
        with open(path, "r") as f:
            workflow = yaml.safe_load(f)

        tools = workflow.get("tools", [])

    return tools


def install_shed_repos(runnable,
                       admin_gi,
                       ignore_dependency_problems,
                       install_tool_dependencies=False,
                       install_resolver_dependencies=True,
                       install_repository_dependencies=True):
    """Install the tool shed repositories listed for ``runnable``.

    The repository list comes from :func:`load_shed_repos`; nothing happens
    when it is empty. Installation is delegated to ephemeris'
    ``InstallRepositoryManager`` built on ``admin_gi``, with the three
    ``install_*`` flags passed through as its defaults.

    If the install result reports errored repositories, a warning is emitted
    when ``ignore_dependency_problems`` is true; otherwise an ``Exception``
    carrying ``FAILED_REPOSITORIES_MESSAGE`` is raised.
    """
    tools_info = load_shed_repos(runnable)
    if not tools_info:
        return

    install_tool_manager = shed_tools.InstallRepositoryManager(admin_gi)
    install_results = install_tool_manager.install_repositories(
        tools_info,
        default_install_tool_dependencies=install_tool_dependencies,
        default_install_resolver_dependencies=install_resolver_dependencies,
        default_install_repository_dependencies=install_repository_dependencies,
    )
    if install_results.errored_repositories:
        if ignore_dependency_problems:
            warn(FAILED_REPOSITORIES_MESSAGE)
        else:
            raise Exception(FAILED_REPOSITORIES_MESSAGE)


def import_workflow(path, admin_gi, user_gi, from_path=False):
    """Import a workflow path to specified Galaxy instance.

    With ``from_path`` false (the default) the file is loaded locally via
    :func:`_raw_dict` and uploaded with ``import_workflow_dict``. With
    ``from_path`` true the absolute path is handed to
    ``import_workflow_from_local_path`` instead.

    Returns whatever the ``user_gi.workflows`` import call returns.
    """
    if not from_path:
        importer = BioBlendImporterGalaxyInterface(
            admin_gi=admin_gi,
            user_gi=user_gi
        )
        workflow = _raw_dict(path, importer)
        return user_gi.workflows.import_workflow_dict(workflow)
    else:
        path = os.path.abspath(path)
        workflow = user_gi.workflows.import_workflow_from_local_path(path)
        return workflow


def _raw_dict(path, importer=None):
    """Load the workflow at ``path`` as a dictionary.

    ``.ga`` files are parsed as JSON and returned as-is. Anything else is
    parsed as YAML and passed through gxformat2's ``python_to_workflow``,
    using ``importer`` (a :class:`DummyImporterGalaxyInterface` when omitted)
    and the absolute directory containing ``path``.
    """
    if path.endswith(".ga"):
        with open(path, "r") as f:
            workflow = json.load(f)
    else:
        if importer is None:
            importer = DummyImporterGalaxyInterface()

        workflow_directory = os.path.dirname(path)
        workflow_directory = os.path.abspath(workflow_directory)
        with open(path, "r") as f:
            workflow = yaml.safe_load(f)
            workflow = python_to_workflow(workflow, importer, workflow_directory)

    return workflow


def find_tool_ids(path):
    """Return the distinct tool IDs used by the workflow at ``path``.

    Steps with a truthy ``subworkflow`` entry are descended into recursively;
    other steps contribute their ``tool_id`` when it is truthy. The result is
    a list built from a set, so its order is not defined.
    """
    tool_ids = set()
    workflow = _raw_dict(path)

    def register_tool_ids(tool_ids, workflow):
        for step in workflow["steps"].values():
            if step.get('subworkflow'):
                register_tool_ids(tool_ids, step['subworkflow'])
            elif step.get("tool_id"):
                tool_ids.add(step['tool_id'])

    register_tool_ids(tool_ids, workflow)

    return list(tool_ids)


# One labelled workflow output: the index of the step producing it, the
# step-level output name, and the workflow-level label.
WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"])


def remote_runnable_to_workflow_id(runnable):
    """Return the workflow ID encoded in a remote workflow runnable's URI.

    The ID is the URI's path component with its first character removed.
    """
    assert runnable.is_remote_workflow_uri
    parse_result = urlparse(runnable.uri)
    return parse_result.path[1:]


def describe_outputs(runnable, gi=None):
    """Return a list of :class:`WorkflowOutput` objects for target workflow.

    When ``runnable.uri`` starts with ``GALAXY_WORKFLOWS_PREFIX`` the workflow
    is exported from Galaxy through ``gi``, which must then not be ``None``.
    Otherwise it is loaded from ``runnable.path``.
    """
    if runnable.uri.startswith(GALAXY_WORKFLOWS_PREFIX):
        workflow_id = remote_runnable_to_workflow_id(runnable)
        assert gi is not None
        workflow = get_dict_from_workflow(gi, workflow_id)
    else:
        workflow = _raw_dict(runnable.path)

    outputs = []
    for (order_index, step) in workflow["steps"].items():
        step_outputs = step.get("workflow_outputs", [])
        for step_output in step_outputs:
            output = WorkflowOutput(
                # Step keys are converted with int() before being stored.
                int(order_index),
                step_output["output_name"],
                step_output["label"],
            )
            outputs.append(output)
    return outputs


class DummyImporterGalaxyInterface(ImporterGalaxyInterface):
    """Importer that imports nothing.

    Used by :func:`_raw_dict` when no real importer is supplied.
    """

    def import_workflow(self, workflow, **kwds):
        return None


def input_labels(workflow_path):
    """Get normalized labels for workflow artifact regardless of format."""
    steps = inputs_normalized(workflow_path=workflow_path)
    labels = []
    for step in steps:
        step_id = input_label(step)
        if step_id:
            labels.append(step_id)
    return labels


def required_input_steps(workflow_path):
    """Return the normalized input steps that must be supplied by the user.

    A step is skipped when its ``optional`` entry is truthy or it has a
    truthy ``default``.

    Raises ``Exception`` (chained to the underlying error) when the workflow
    cannot be normalized.
    """
    try:
        steps = inputs_normalized(workflow_path=workflow_path)
    except Exception as e:
        raise Exception(
            "Input workflow could not be successfully normalized - try linting with planemo workflow_lint."
        ) from e

    required_steps = []
    for input_step in steps:
        if input_step.get("optional", False) or input_step.get("default"):
            continue
        required_steps.append(input_step)
    return required_steps


def required_input_labels(workflow_path):
    """Return the labels of all required input steps as a list.

    A list (rather than a lazy ``map``) is returned so the result can be
    iterated or membership-tested more than once.
    """
    return [input_label(step) for step in required_input_steps(workflow_path)]


def input_label(input_step):
    """Get the normalized label of a step returned from inputs_normalized."""
    step_id = input_step.get("id") or input_step.get("label")
    return step_id


def output_labels(workflow_path):
    """Return the ``id`` of every normalized output of the workflow."""
    outputs = outputs_normalized(workflow_path=workflow_path)
    return [o["id"] for o in outputs]


def output_stubs_for_workflow(workflow_path):
    """Return output labels and class.

    Maps each output label that does not start with ``_anonymous_`` to a
    stub dictionary ``{'class': ''}``.
    """
    outputs = {}
    for label in output_labels(workflow_path):
        if not label.startswith('_anonymous_'):
            outputs[label] = {'class': ''}
    return outputs


def job_template(workflow_path):
    """Return a job template for specified workflow.

    A dictionary describing non-optional inputs that must be specified to
    run the workflow.
    """
    template = {}
    for required_input_step in required_input_steps(workflow_path):
        i_label = input_label(required_input_step)
        input_type = required_input_step["type"]
        if input_type == "data":
            template[i_label] = {
                "class": "File",
                "path": "todo_test_data_path.ext",
            }
        elif input_type == "collection":
            template[i_label] = {
                "class": "Collection",
                "collection_type": "list",
                "elements": [
                    {
                        "class": "File",
                        "identifier": "todo_element_name",
                        "path": "todo_test_data_path.ext",
                    }
                ],
            }
        elif input_type in ['string', 'int', 'float', 'boolean', 'color']:
            template[i_label] = "todo_param_value"
        else:
            # NOTE(review): this is a *set* literal, not a string or mapping;
            # kept as-is because the intended placeholder is unclear.
            template[i_label] = {
                "TODO",  # Does this work yet?
            }
    return template


def new_workflow_associated_path(workflow_path, suffix="tests"):
    """Generate path for test or job YAML file next to workflow."""
    base, input_ext = os.path.splitext(workflow_path)
    # prefer -tests.yml but if the author uses underscores or .yaml respect that.
    sep = "-"
    if "_" in base and "-" not in base:
        sep = "_"
    ext = "yml"
    if "yaml" in input_ext:
        ext = "yaml"
    return base + sep + suffix + "." + ext


def get_dict_from_workflow(gi, workflow_id):
    """Export the workflow ``workflow_id`` from Galaxy as a dictionary."""
    return gi.workflows.export_workflow_dict(workflow_id)


__all__ = (
    "import_workflow",
    "describe_outputs",
)