Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/planemo/galaxy/workflows.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 """Utilities for Galaxy workflows.""" | |
| 2 import json | |
| 3 import os | |
| 4 from collections import namedtuple | |
| 5 | |
| 6 import yaml | |
| 7 from bioblend.galaxy.client import Client | |
| 8 from ephemeris import generate_tool_list_from_ga_workflow_files | |
| 9 from ephemeris import shed_tools | |
| 10 from gxformat2.converter import python_to_workflow | |
| 11 from gxformat2.interface import BioBlendImporterGalaxyInterface | |
| 12 from gxformat2.interface import ImporterGalaxyInterface | |
| 13 | |
| 14 from planemo.io import warn | |
| 15 | |
| 16 FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories." | |
| 17 | |
| 18 | |
| 19 def load_shed_repos(runnable): | |
| 20 if runnable.type.name != "galaxy_workflow": | |
| 21 return [] | |
| 22 | |
| 23 path = runnable.path | |
| 24 if path.endswith(".ga"): | |
| 25 generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow([path], "Tools from workflows", "tools.yml") | |
| 26 with open("tools.yml", "r") as f: | |
| 27 tools = yaml.safe_load(f)["tools"] | |
| 28 | |
| 29 else: | |
| 30 # It'd be better to just infer this from the tool shed ID somehow than | |
| 31 # require explicit annotation like this... I think? | |
| 32 with open(path, "r") as f: | |
| 33 workflow = yaml.safe_load(f) | |
| 34 | |
| 35 tools = workflow.get("tools", []) | |
| 36 | |
| 37 return tools | |
| 38 | |
| 39 | |
| 40 def install_shed_repos(runnable, admin_gi, ignore_dependency_problems): | |
| 41 tools_info = load_shed_repos(runnable) | |
| 42 if tools_info: | |
| 43 install_tool_manager = shed_tools.InstallRepositoryManager(admin_gi) | |
| 44 install_results = install_tool_manager.install_repositories(tools_info) | |
| 45 if install_results.errored_repositories: | |
| 46 if ignore_dependency_problems: | |
| 47 warn(FAILED_REPOSITORIES_MESSAGE) | |
| 48 else: | |
| 49 raise Exception(FAILED_REPOSITORIES_MESSAGE) | |
| 50 | |
| 51 | |
| 52 def import_workflow(path, admin_gi, user_gi, from_path=False): | |
| 53 """Import a workflow path to specified Galaxy instance.""" | |
| 54 if not from_path: | |
| 55 importer = BioBlendImporterGalaxyInterface( | |
| 56 admin_gi=admin_gi, | |
| 57 user_gi=user_gi | |
| 58 ) | |
| 59 workflow = _raw_dict(path, importer) | |
| 60 return importer.import_workflow(workflow) | |
| 61 else: | |
| 62 # TODO: Update bioblend to allow from_path. | |
| 63 path = os.path.abspath(path) | |
| 64 payload = dict( | |
| 65 from_path=path | |
| 66 ) | |
| 67 workflows_url = user_gi._make_url(user_gi.workflows) | |
| 68 workflow = Client._post(user_gi.workflows, payload, url=workflows_url) | |
| 69 return workflow | |
| 70 | |
| 71 | |
| 72 def _raw_dict(path, importer=None): | |
| 73 if path.endswith(".ga"): | |
| 74 with open(path, "r") as f: | |
| 75 workflow = json.load(f) | |
| 76 else: | |
| 77 if importer is None: | |
| 78 importer = DummyImporterGalaxyInterface() | |
| 79 | |
| 80 workflow_directory = os.path.dirname(path) | |
| 81 workflow_directory = os.path.abspath(workflow_directory) | |
| 82 with open(path, "r") as f: | |
| 83 workflow = yaml.safe_load(f) | |
| 84 workflow = python_to_workflow(workflow, importer, workflow_directory) | |
| 85 | |
| 86 return workflow | |
| 87 | |
| 88 | |
| 89 def find_tool_ids(path): | |
| 90 tool_ids = [] | |
| 91 workflow = _raw_dict(path) | |
| 92 for (order_index, step) in workflow["steps"].items(): | |
| 93 tool_id = step.get("tool_id") | |
| 94 tool_ids.append(tool_id) | |
| 95 | |
| 96 return tool_ids | |
| 97 | |
| 98 | |
| 99 WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"]) | |
| 100 | |
| 101 | |
| 102 def describe_outputs(path): | |
| 103 """Return a list of :class:`WorkflowOutput` objects for target workflow.""" | |
| 104 workflow = _raw_dict(path) | |
| 105 outputs = [] | |
| 106 for (order_index, step) in workflow["steps"].items(): | |
| 107 step_outputs = step.get("workflow_outputs", []) | |
| 108 for step_output in step_outputs: | |
| 109 output = WorkflowOutput( | |
| 110 int(order_index), | |
| 111 step_output["output_name"], | |
| 112 step_output["label"], | |
| 113 ) | |
| 114 outputs.append(output) | |
| 115 return outputs | |
| 116 | |
| 117 | |
| 118 class DummyImporterGalaxyInterface(ImporterGalaxyInterface): | |
| 119 | |
| 120 def import_workflow(self, workflow, **kwds): | |
| 121 return None | |
| 122 | |
| 123 | |
| 124 __all__ = ( | |
| 125 "import_workflow", | |
| 126 "describe_outputs", | |
| 127 ) |
