comparison env/lib/python3.9/site-packages/planemo/galaxy/workflows.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Utilities for Galaxy workflows."""
2 import json
3 import os
4 from collections import namedtuple
5 from urllib.parse import urlparse
6
7 import yaml
8 from ephemeris import generate_tool_list_from_ga_workflow_files
9 from ephemeris import shed_tools
10 from gxformat2.converter import python_to_workflow
11 from gxformat2.interface import BioBlendImporterGalaxyInterface
12 from gxformat2.interface import ImporterGalaxyInterface
13 from gxformat2.normalize import inputs_normalized, outputs_normalized
14
15 from planemo.io import warn
16
17 FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
18 GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/"
19
20
def load_shed_repos(runnable):
    """Return the list of Tool Shed repositories needed by *runnable*.

    Only Galaxy workflow runnables declare repositories; every other
    runnable type yields an empty list.
    """
    if runnable.type.name != "galaxy_workflow":
        return []

    workflow_path = runnable.path
    if workflow_path.endswith(".ga"):
        # Native .ga workflows carry no inline tool metadata, so let
        # ephemeris derive a tool list file and read it back.
        # NOTE(review): this writes "tools.yml" into the current working
        # directory and leaves it behind — confirm that is intended.
        generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
            [workflow_path], "Tools from workflows", "tools.yml"
        )
        with open("tools.yml", "r") as fh:
            return yaml.safe_load(fh)["tools"]

    # It'd be better to just infer this from the tool shed ID somehow than
    # require explicit annotation like this... I think?
    with open(workflow_path, "r") as fh:
        workflow = yaml.safe_load(fh)
    return workflow.get("tools", [])
39
40
def install_shed_repos(runnable, admin_gi,
                       ignore_dependency_problems,
                       install_tool_dependencies=False,
                       install_resolver_dependencies=True,
                       install_repository_dependencies=True):
    """Install the Tool Shed repositories required by *runnable* via *admin_gi*.

    When installation of any repository fails, either warn (if
    *ignore_dependency_problems* is set) or raise an ``Exception``.
    """
    tools_info = load_shed_repos(runnable)
    if not tools_info:
        # Nothing declared — nothing to install.
        return

    manager = shed_tools.InstallRepositoryManager(admin_gi)
    results = manager.install_repositories(
        tools_info,
        default_install_tool_dependencies=install_tool_dependencies,
        default_install_resolver_dependencies=install_resolver_dependencies,
        default_install_repository_dependencies=install_repository_dependencies,
    )
    if not results.errored_repositories:
        return
    if ignore_dependency_problems:
        warn(FAILED_REPOSITORIES_MESSAGE)
    else:
        raise Exception(FAILED_REPOSITORIES_MESSAGE)
58
59
def import_workflow(path, admin_gi, user_gi, from_path=False):
    """Import a workflow path to specified Galaxy instance."""
    if from_path:
        # Let Galaxy read the workflow file directly from disk.
        return user_gi.workflows.import_workflow_from_local_path(os.path.abspath(path))

    # Otherwise convert the artifact to a raw dictionary first, resolving
    # format 2 constructs through the BioBlend-backed importer, then upload.
    importer = BioBlendImporterGalaxyInterface(admin_gi=admin_gi, user_gi=user_gi)
    workflow = _raw_dict(path, importer)
    return user_gi.workflows.import_workflow_dict(workflow)
73
74
75 def _raw_dict(path, importer=None):
76 if path.endswith(".ga"):
77 with open(path, "r") as f:
78 workflow = json.load(f)
79 else:
80 if importer is None:
81 importer = DummyImporterGalaxyInterface()
82
83 workflow_directory = os.path.dirname(path)
84 workflow_directory = os.path.abspath(workflow_directory)
85 with open(path, "r") as f:
86 workflow = yaml.safe_load(f)
87 workflow = python_to_workflow(workflow, importer, workflow_directory)
88
89 return workflow
90
91
def find_tool_ids(path):
    """Return the unique tool ids referenced by the workflow at *path*.

    Subworkflow steps are traversed recursively; order of the returned
    list is unspecified (it comes from a set).
    """
    tool_ids = set()

    def _collect(workflow):
        for step in workflow["steps"].values():
            subworkflow = step.get("subworkflow")
            if subworkflow:
                _collect(subworkflow)
            elif step.get("tool_id"):
                tool_ids.add(step["tool_id"])

    _collect(_raw_dict(path))
    return list(tool_ids)
106
107
# Lightweight record describing one declared workflow output: the owning
# step's order index, the step-level output name, and its workflow label.
WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"])
109
110
def remote_runnable_to_workflow_id(runnable):
    """Extract the Galaxy workflow id from a remote workflow runnable's URI.

    The URI is expected to look like ``gxid://workflows/<id>``; the id is
    the URL path with its leading slash stripped.

    :raises ValueError: if *runnable* is not a remote workflow URI.
    """
    # Validate explicitly instead of using ``assert`` so the check still
    # runs when Python is invoked with -O (asserts are stripped).
    if not runnable.is_remote_workflow_uri:
        raise ValueError("Runnable is not a remote Galaxy workflow URI: %s" % (runnable.uri,))
    parse_result = urlparse(runnable.uri)
    return parse_result.path[1:]
115
116
def describe_outputs(runnable, gi=None):
    """Return a list of :class:`WorkflowOutput` objects for target workflow."""
    if runnable.uri.startswith(GALAXY_WORKFLOWS_PREFIX):
        # Remote workflow: export its dictionary through the Galaxy API.
        workflow_id = remote_runnable_to_workflow_id(runnable)
        assert gi is not None
        workflow = get_dict_from_workflow(gi, workflow_id)
    else:
        workflow = _raw_dict(runnable.path)

    # Flatten every step's declared workflow_outputs into output records.
    return [
        WorkflowOutput(int(order_index), step_output["output_name"], step_output["label"])
        for order_index, step in workflow["steps"].items()
        for step_output in step.get("workflow_outputs", [])
    ]
137
138
class DummyImporterGalaxyInterface(ImporterGalaxyInterface):
    """No-op importer used when converting workflows without a Galaxy instance."""

    def import_workflow(self, workflow, **kwds):
        # Conversion-only contexts never need the actual import side effect.
        return None
143
144
def input_labels(workflow_path):
    """Get normalized labels for workflow artifact regardless of format."""
    normalized_steps = inputs_normalized(workflow_path=workflow_path)
    # Keep only steps that yield a truthy normalized label.
    candidates = (input_label(step) for step in normalized_steps)
    return [label for label in candidates if label]
154
155
def required_input_steps(workflow_path):
    """Return the normalized input steps that must be supplied to run the workflow.

    Inputs marked optional or carrying a default value are excluded.

    :raises Exception: if the workflow cannot be normalized.
    """
    try:
        steps = inputs_normalized(workflow_path=workflow_path)
    except Exception as e:
        # Chain the underlying failure so the real normalization error is
        # not lost behind the generic advice message.
        raise Exception("Input workflow could not be successfully normalized - try linting with planemo workflow_lint.") from e
    # NOTE(review): a falsy default (0, "", False) is treated like no default
    # here — confirm that is intended before tightening this check.
    return [
        step for step in steps
        if not (step.get("optional", False) or step.get("default"))
    ]
167
168
def required_input_labels(workflow_path):
    """Iterate over normalized labels of the workflow's required inputs."""
    steps = required_input_steps(workflow_path)
    return map(input_label, steps)
171
172
def input_label(input_step):
    """Get the normalized label of a step returned from inputs_normalized."""
    # Prefer the explicit "id"; fall back to "label" when id is missing/falsy.
    return input_step.get("id") or input_step.get("label")
177
178
def output_labels(workflow_path):
    """Return the ids of all normalized outputs of the workflow artifact."""
    normalized_outputs = outputs_normalized(workflow_path=workflow_path)
    return [output["id"] for output in normalized_outputs]
182
183
def output_stubs_for_workflow(workflow_path):
    """
    Return output labels and class.
    """
    # Anonymous (unlabeled) outputs are skipped; each kept label maps to a
    # stub with an empty class for the author to fill in.
    return {
        label: {'class': ''}
        for label in output_labels(workflow_path)
        if not label.startswith('_anonymous_')
    }
193
194
def job_template(workflow_path):
    """Return a job template for specified workflow.

    A dictionary describing non-optional inputs that must be specified to
    run the workflow.
    """
    template = {}
    for required_input_step in required_input_steps(workflow_path):
        i_label = input_label(required_input_step)
        input_type = required_input_step["type"]
        if input_type == "data":
            template[i_label] = {
                "class": "File",
                "path": "todo_test_data_path.ext",
            }
        elif input_type == "collection":
            template[i_label] = {
                "class": "Collection",
                "collection_type": "list",
                "elements": [
                    {
                        "class": "File",
                        "identifier": "todo_element_name",
                        "path": "todo_test_data_path.ext",
                    }
                ],
            }
        elif input_type in ['string', 'int', 'float', 'boolean', 'color']:
            template[i_label] = "todo_param_value"
        else:
            # Fix: the previous fallback accidentally built a *set* literal
            # ({"TODO",}), which is not YAML/JSON serializable. Emit a string
            # placeholder like the scalar-parameter branch instead.
            template[i_label] = "todo_param_value"
    return template
229
230
def new_workflow_associated_path(workflow_path, suffix="tests"):
    """Generate path for test or job YAML file next to workflow."""
    base, input_ext = os.path.splitext(workflow_path)
    # Prefer "-tests.yml", but respect the author's existing conventions:
    # underscore separators and the ".yaml" spelling are kept when in use.
    uses_underscores = "_" in base and "-" not in base
    sep = "_" if uses_underscores else "-"
    ext = "yaml" if "yaml" in input_ext else "yml"
    return base + sep + suffix + "." + ext
242
243
def get_dict_from_workflow(gi, workflow_id):
    """Export the dictionary form of *workflow_id* from Galaxy instance *gi*."""
    workflows_client = gi.workflows
    return workflows_client.export_workflow_dict(workflow_id)
246
247
# Public API of this module; all other names are implementation details.
__all__ = (
    "import_workflow",
    "describe_outputs",
)