Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/galaxy/workflows.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Utilities for Galaxy workflows.""" | |
2 import json | |
3 import os | |
4 from collections import namedtuple | |
5 from urllib.parse import urlparse | |
6 | |
7 import yaml | |
8 from ephemeris import generate_tool_list_from_ga_workflow_files | |
9 from ephemeris import shed_tools | |
10 from gxformat2.converter import python_to_workflow | |
11 from gxformat2.interface import BioBlendImporterGalaxyInterface | |
12 from gxformat2.interface import ImporterGalaxyInterface | |
13 from gxformat2.normalize import inputs_normalized, outputs_normalized | |
14 | |
15 from planemo.io import warn | |
16 | |
17 FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories." | |
18 GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/" | |
19 | |
20 | |
def load_shed_repos(runnable):
    """Return the tool shed repository list a workflow runnable depends on.

    Non-workflow runnables yield an empty list. Native ``.ga`` workflows are
    passed through ephemeris, which writes a ``tools.yml`` into the working
    directory as a side effect; Format 2 workflows must annotate their
    repositories explicitly under a top-level ``tools`` key.
    """
    if runnable.type.name != "galaxy_workflow":
        return []

    workflow_path = runnable.path
    if workflow_path.endswith(".ga"):
        # Let ephemeris extract the tool list from the native workflow file.
        generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
            [workflow_path], "Tools from workflows", "tools.yml"
        )
        with open("tools.yml", "r") as fh:
            return yaml.safe_load(fh)["tools"]

    # It'd be better to just infer this from the tool shed ID somehow than
    # require explicit annotation like this... I think?
    with open(workflow_path, "r") as fh:
        workflow_dict = yaml.safe_load(fh)
    return workflow_dict.get("tools", [])
39 | |
40 | |
def install_shed_repos(runnable, admin_gi,
                       ignore_dependency_problems,
                       install_tool_dependencies=False,
                       install_resolver_dependencies=True,
                       install_repository_dependencies=True):
    """Install the tool shed repositories ``runnable`` needs via ``admin_gi``.

    Raises a generic ``Exception`` when any repository fails to install,
    unless ``ignore_dependency_problems`` is set — then only a warning is
    emitted. Does nothing when the runnable declares no repositories.
    """
    tools_info = load_shed_repos(runnable)
    if not tools_info:
        return

    manager = shed_tools.InstallRepositoryManager(admin_gi)
    results = manager.install_repositories(
        tools_info,
        default_install_tool_dependencies=install_tool_dependencies,
        default_install_resolver_dependencies=install_resolver_dependencies,
        default_install_repository_dependencies=install_repository_dependencies,
    )
    if not results.errored_repositories:
        return
    if ignore_dependency_problems:
        warn(FAILED_REPOSITORIES_MESSAGE)
    else:
        raise Exception(FAILED_REPOSITORIES_MESSAGE)
58 | |
59 | |
def import_workflow(path, admin_gi, user_gi, from_path=False):
    """Import a workflow path to specified Galaxy instance."""
    if from_path:
        # Let Galaxy read the file directly from an absolute local path.
        return user_gi.workflows.import_workflow_from_local_path(os.path.abspath(path))

    # Otherwise parse/convert locally and upload the resulting dict.
    importer = BioBlendImporterGalaxyInterface(admin_gi=admin_gi, user_gi=user_gi)
    workflow_dict = _raw_dict(path, importer)
    return user_gi.workflows.import_workflow_dict(workflow_dict)
73 | |
74 | |
75 def _raw_dict(path, importer=None): | |
76 if path.endswith(".ga"): | |
77 with open(path, "r") as f: | |
78 workflow = json.load(f) | |
79 else: | |
80 if importer is None: | |
81 importer = DummyImporterGalaxyInterface() | |
82 | |
83 workflow_directory = os.path.dirname(path) | |
84 workflow_directory = os.path.abspath(workflow_directory) | |
85 with open(path, "r") as f: | |
86 workflow = yaml.safe_load(f) | |
87 workflow = python_to_workflow(workflow, importer, workflow_directory) | |
88 | |
89 return workflow | |
90 | |
91 | |
def find_tool_ids(path):
    """Return the unique tool ids referenced by the workflow at ``path``.

    Subworkflow steps are walked recursively. The result is sorted so the
    output is deterministic — previously ``list(set)`` returned the ids in
    arbitrary (hash-dependent) order, which made downstream output unstable.
    """
    tool_ids = set()
    workflow = _raw_dict(path)

    def register_tool_ids(tool_ids, workflow):
        # Collect tool ids depth-first; subworkflow steps carry a nested
        # workflow dict instead of a tool_id.
        for step in workflow["steps"].values():
            if step.get('subworkflow'):
                register_tool_ids(tool_ids, step['subworkflow'])
            elif step.get("tool_id"):
                tool_ids.add(step['tool_id'])

    register_tool_ids(tool_ids, workflow)

    return sorted(tool_ids)
106 | |
107 | |
108 WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"]) | |
109 | |
110 | |
def remote_runnable_to_workflow_id(runnable):
    """Extract the Galaxy workflow id from a remote ``gxid://`` runnable URI."""
    assert runnable.is_remote_workflow_uri
    # The id is the URI path with its leading "/" stripped.
    return urlparse(runnable.uri).path[1:]
115 | |
116 | |
def describe_outputs(runnable, gi=None):
    """Return a list of :class:`WorkflowOutput` objects for target workflow."""
    if runnable.uri.startswith(GALAXY_WORKFLOWS_PREFIX):
        # Remote workflow: export its dict from the Galaxy instance.
        assert gi is not None
        workflow = get_dict_from_workflow(gi, remote_runnable_to_workflow_id(runnable))
    else:
        workflow = _raw_dict(runnable.path)

    return [
        WorkflowOutput(
            int(order_index),
            step_output["output_name"],
            step_output["label"],
        )
        for order_index, step in workflow["steps"].items()
        for step_output in step.get("workflow_outputs", [])
    ]
137 | |
138 | |
class DummyImporterGalaxyInterface(ImporterGalaxyInterface):
    """No-op gxformat2 importer used when only local conversion is needed.

    ``_raw_dict`` falls back to this when no real importer is supplied, so
    Format 2 conversion can run without importing anything into Galaxy.
    """

    def import_workflow(self, workflow, **kwds):
        # Intentionally does nothing — the caller only wants the converted dict.
        return None
143 | |
144 | |
def input_labels(workflow_path):
    """Get normalized labels for workflow artifact regardless of format."""
    normalized_steps = inputs_normalized(workflow_path=workflow_path)
    # Keep only steps that actually resolve to an id/label.
    return [label for step in normalized_steps if (label := input_label(step))]
154 | |
155 | |
def required_input_steps(workflow_path):
    """Return the normalized input steps that must be supplied to run the workflow.

    Inputs marked ``optional`` or carrying a ``default`` are excluded.

    :raises Exception: when the workflow cannot be normalized (e.g. it would
        fail linting). The underlying error is chained as ``__cause__`` so the
        root cause is not lost — the original code re-raised without chaining
        context explicitly.
    """
    try:
        steps = inputs_normalized(workflow_path=workflow_path)
    except Exception as e:
        raise Exception("Input workflow could not be successfully normalized - try linting with planemo workflow_lint.") from e
    return [
        step for step in steps
        if not (step.get("optional", False) or step.get("default"))
    ]
167 | |
168 | |
def required_input_labels(workflow_path):
    """Return labels of the workflow's required (non-optional, no-default) inputs.

    Returns a list rather than the previous lazy ``map`` object: a ``map`` is
    single-use and not subscriptable, which is fragile for callers; a list is
    backward compatible for any code that merely iterates the result.
    """
    return [input_label(step) for step in required_input_steps(workflow_path)]
171 | |
172 | |
def input_label(input_step):
    """Get the normalized label of a step returned from inputs_normalized.

    Prefers the step's ``id``; falls back to ``label``. Returns ``None`` when
    neither is present (or both are falsy).
    """
    return input_step.get("id") or input_step.get("label")
177 | |
178 | |
def output_labels(workflow_path):
    """Return the ids of every normalized output of the workflow artifact."""
    normalized_outputs = outputs_normalized(workflow_path=workflow_path)
    return [output["id"] for output in normalized_outputs]
182 | |
183 | |
def output_stubs_for_workflow(workflow_path):
    """
    Return output labels and class.
    """
    # Anonymous (auto-generated) output labels are internal; skip them.
    return {
        label: {'class': ''}
        for label in output_labels(workflow_path)
        if not label.startswith('_anonymous_')
    }
193 | |
194 | |
def job_template(workflow_path):
    """Return a job template for specified workflow.

    A dictionary describing non-optional inputs that must be specified to
    run the workflow. Each value is a placeholder the user fills in before
    running the job.
    """
    template = {}
    for required_input_step in required_input_steps(workflow_path):
        i_label = input_label(required_input_step)
        input_type = required_input_step["type"]
        if input_type == "data":
            template[i_label] = {
                "class": "File",
                "path": "todo_test_data_path.ext",
            }
        elif input_type == "collection":
            template[i_label] = {
                "class": "Collection",
                "collection_type": "list",
                "elements": [
                    {
                        "class": "File",
                        "identifier": "todo_element_name",
                        "path": "todo_test_data_path.ext",
                    }
                ],
            }
        elif input_type in ['string', 'int', 'float', 'boolean', 'color']:
            template[i_label] = "todo_param_value"
        else:
            # BUG FIX: the original assigned ``{"TODO",}`` — an accidental
            # *set* literal, which is not representable as a YAML/JSON mapping.
            # Use a plain placeholder string for unrecognized input types.
            template[i_label] = "TODO"
    return template
229 | |
230 | |
def new_workflow_associated_path(workflow_path, suffix="tests"):
    """Generate path for test or job YAML file next to workflow."""
    base, input_ext = os.path.splitext(workflow_path)
    # Prefer "-tests.yml", but if the author uses underscores or ".yaml",
    # respect that convention.
    separator = "_" if ("_" in base and "-" not in base) else "-"
    extension = "yaml" if "yaml" in input_ext else "yml"
    return "{}{}{}.{}".format(base, separator, suffix, extension)
242 | |
243 | |
def get_dict_from_workflow(gi, workflow_id):
    """Export the workflow with ``workflow_id`` from Galaxy instance ``gi`` as a dict."""
    return gi.workflows.export_workflow_dict(workflow_id)
246 | |
247 | |
# Explicit public API of this module; everything else is an internal helper.
__all__ = (
    "import_workflow",
    "describe_outputs",
)