Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/gxformat2/converter.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """Functionality for converting a Format 2 workflow into a standard Galaxy workflow.""" | |
2 from __future__ import print_function | |
3 | |
4 import copy | |
5 import json | |
6 import logging | |
7 import os | |
8 import sys | |
9 import uuid | |
10 from collections import OrderedDict | |
11 | |
12 from ._labels import Labels | |
13 from ._yaml import ordered_load | |
14 | |
# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
# Step types recognized by the native Galaxy (.ga) workflow representation.
STEP_TYPES = [
    "subworkflow",
    "data_input",
    "data_collection_input",
    "tool",
    "pause",
    "parameter_input",
]

# Format 2 shorthand step-type names mapped to their native equivalents.
STEP_TYPE_ALIASES = {
    'input': 'data_input',
    'input_collection': 'data_collection_input',
    'parameter': 'parameter_input',
}

# Maps the "class" of an inline ``run`` action to the name of the converter
# function that turns it into a native step.
RUN_ACTIONS_TO_STEPS = {
    'GalaxyWorkflow': 'run_workflow_to_step',
    'GalaxyTool': 'run_tool_to_step',
}

# Output attributes that translate to Galaxy post-job actions. Each entry
# supplies the native action class, the default value (falsy results are
# skipped), and a callable building the action's arguments from the value.
POST_JOB_ACTIONS = {
    'hide': {
        'action_class': "HideDatasetAction",
        'default': False,
        'arguments': lambda x: x,
    },
    'rename': {
        'action_class': 'RenameDatasetAction',
        'default': {},
        'arguments': lambda x: {'newname': x},
    },
    'delete_intermediate_datasets': {
        'action_class': 'DeleteIntermediatesAction',
        'default': False,
        'arguments': lambda x: x,
    },
    'change_datatype': {
        'action_class': 'ChangeDatatypeAction',
        'default': {},
        'arguments': lambda x: {'newtype': x},
    },
    'set_columns': {
        'action_class': 'ColumnSetAction',
        'default': {},
        'arguments': lambda x: x,
    },
    'add_tags': {
        'action_class': 'TagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
    'remove_tags': {
        'action_class': 'RemoveTagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
}

log = logging.getLogger(__name__)
76 | |
77 | |
def rename_arg(argument):
    """Return *argument* unchanged (identity placeholder for argument renaming)."""
    return argument
80 | |
81 | |
def clean_connection(value):
    """Normalize a connection reference, translating legacy ``#`` to ``/``.

    Returns ``value`` unchanged unless legacy-connection support is enabled
    and the value contains a ``#`` separator, in which case the first ``#``
    is replaced with ``/``. ``None`` and other falsy values pass through.
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        # logging: use warning() - warn() is deprecated; lazy %-args instead of %-format.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)
    # BUG FIX: the legacy branch previously fell through without returning,
    # so the converted value was silently lost (function returned None).
    return value
89 | |
90 | |
class ImportOptions(object):
    """Options controlling how Format 2 workflows are converted."""

    def __init__(self):
        # When True, subworkflows declared in a $graph are converted once and
        # referenced by graph id rather than inlined at each use site.
        self.deduplicate_subworkflows = False
95 | |
96 | |
def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
    # Parse the YAML preserving key order, then delegate to the dict-based converter.
    as_python = ordered_load(has_yaml)
    return python_to_workflow(as_python, galaxy_interface, workflow_directory, import_options=import_options)
101 | |
102 | |
def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary."""
    # Unwrap documents that embed the workflow as a raw YAML string.
    if "yaml_content" in as_python:
        as_python = ordered_load(as_python["yaml_content"])

    # Relative @import paths are resolved against this directory.
    if workflow_directory is None:
        workflow_directory = os.path.abspath(".")

    conversion_context = ConversionContext(
        galaxy_interface,
        workflow_directory,
        import_options,
    )
    # Resolve $graph documents: registers each entry by id and selects "main".
    as_python = _preprocess_graphs(as_python, conversion_context)
    subworkflows = None
    if conversion_context.import_options.deduplicate_subworkflows:
        # TODO: import only required workflows...
        # TODO: dag sort these...
        # Convert every non-main graph entry once, keyed by graph id.
        subworkflows = OrderedDict()
        for graph_id, subworkflow_content in conversion_context.graph_ids.items():
            if graph_id == "main":
                continue
            subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context_graph("#" + graph_id)
            subworkflows[graph_id] = _python_to_workflow(copy.deepcopy(subworkflow_content), subworkflow_conversion_context)
    converted = _python_to_workflow(as_python, conversion_context)
    if subworkflows is not None:
        converted["subworkflows"] = subworkflows
    return converted
131 | |
132 | |
133 # move to a utils file? | |
def steps_as_list(format2_workflow, add_label=True):
    """Return steps as a list, converting ID map to list representation if needed.

    When steps are supplied as an id->definition mapping, ``add_label``
    controls whether each map key is stored on the step as ``label``
    (True) or ``id`` (False).
    """
    steps = format2_workflow["steps"]
    # BUG FIX: add_label was previously hard-coded to True, silently
    # ignoring the caller-supplied parameter.
    steps = _convert_dict_to_id_list_if_needed(steps, add_label=add_label)
    return steps
139 | |
140 | |
def ensure_step_position(step, order_index):
    """Ensure step contains a position definition."""
    # Galaxy's API currently requires coordinates; synthesize a simple
    # diagonal layout from the step's order when none was supplied.
    step.setdefault("position", {"left": 10 * order_index, "top": 10 * order_index})
148 | |
149 | |
def _outputs_as_list(as_python):
    """Return workflow outputs as a list, converting an id map if needed."""
    return _convert_dict_to_id_list_if_needed(as_python.get("outputs", []))
154 | |
155 | |
def _python_to_workflow(as_python, conversion_context):
    """Convert a single Format 2 workflow dictionary into native (.ga) form.

    Mutates and returns ``as_python``. ``conversion_context`` accumulates
    step labels, subworkflow contexts, and import options for this workflow.
    """
    if "class" not in as_python:
        # FIX: removed doubled "not a not a" in the error message.
        raise Exception("This is not a valid Galaxy workflow definition, must define a class.")

    if as_python["class"] != "GalaxyWorkflow":
        raise Exception("This is not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")

    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
    as_python.pop("class")

    _ensure_defaults(as_python, {
        "a_galaxy_workflow": "true",
        "format-version": "0.1",
        "name": "Workflow",
        "uuid": str(uuid.uuid4()),
    })
    _populate_annotation(as_python)

    steps = steps_as_list(as_python)

    convert_inputs_to_steps(as_python, steps)

    if isinstance(steps, list):
        # Native Galaxy keys steps by stringified index; record label->index
        # so connections can reference steps by label later.
        steps_as_dict = OrderedDict()
        for i, step in enumerate(steps):
            steps_as_dict[str(i)] = step
            if "id" not in step:
                step["id"] = i

            if "label" in step:
                label = step["label"]
                conversion_context.labels[label] = i

            # TODO: this really should be optional in Galaxy API.
            ensure_step_position(step, i)

        as_python["steps"] = steps_as_dict
        steps = steps_as_dict

    # First pass: resolve inline "run" actions into concrete step types.
    for step in steps.values():
        step_type = step.get("type", None)
        if "run" in step:
            if step_type is not None:
                raise Exception("Steps specified as run actions cannot specify a type.")
            run_action = step.get("run")
            run_action = conversion_context.get_runnable_description(run_action)
            if isinstance(run_action, dict):
                run_class = run_action["class"]
                # FIX: dispatch by module-global lookup instead of eval().
                run_to_step_function = globals()[RUN_ACTIONS_TO_STEPS[run_class]]

                run_to_step_function(conversion_context, step, run_action)
            else:
                # Graph-id reference - resolved later via content_id.
                step["content_id"] = run_action
                step["type"] = "subworkflow"
            del step["run"]

    # Second pass: normalize types and apply per-type transformations.
    for step in steps.values():
        step_type = step.get("type", "tool")
        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
        if step_type not in STEP_TYPES:
            raise Exception("Unknown step type encountered %s" % step_type)
        step["type"] = step_type
        # FIX: dispatch by module-global lookup instead of eval().
        globals()["transform_%s" % step_type](conversion_context, step)

    # Consistency: reuse the existing helper rather than inlining its body.
    outputs = _outputs_as_list(as_python)

    for output in outputs:
        assert isinstance(output, dict), "Output definition must be dictionary"
        assert "source" in output or "outputSource" in output, "Output definition must specify source"

        if "label" in output and "id" in output:
            raise Exception("label and id are aliases for outputs, may only define one")
        # NOTE: a previously present dead `label = ""` assignment for the
        # neither-label-nor-id case was removed; label is computed below.

        raw_label = output.pop("label", None)
        raw_id = output.pop("id", None)
        label = raw_label or raw_id
        if Labels.is_anonymous_output_label(label):
            label = None
        source = clean_connection(output.get("outputSource"))
        if source is None and SUPPORT_LEGACY_CONNECTIONS:
            # Legacy "step#output" syntax on the old "source" key.
            source = output.get("source").replace("#", "/", 1)
        id, output_name = conversion_context.step_output(source)
        step = steps[str(id)]
        workflow_output = {
            "output_name": output_name,
            "label": label,
            "uuid": output.get("uuid", None)
        }
        if "workflow_outputs" not in step:
            step["workflow_outputs"] = []
        step["workflow_outputs"].append(workflow_output)

    return as_python
253 | |
254 | |
255 def _preprocess_graphs(as_python, conversion_context): | |
256 if not isinstance(as_python, dict): | |
257 raise Exception("This is not a not a valid Galaxy workflow definition.") | |
258 | |
259 format_version = as_python.get("format-version", "v2.0") | |
260 assert format_version == "v2.0" | |
261 | |
262 if "class" not in as_python and "$graph" in as_python: | |
263 for subworkflow in as_python["$graph"]: | |
264 if not isinstance(subworkflow, dict): | |
265 raise Exception("Malformed workflow content in $graph") | |
266 if "id" not in subworkflow: | |
267 raise Exception("No subworkflow ID found for entry in $graph.") | |
268 subworkflow_id = subworkflow["id"] | |
269 if subworkflow_id == "main": | |
270 as_python = subworkflow | |
271 | |
272 conversion_context.register_runnable(subworkflow) | |
273 | |
274 return as_python | |
275 | |
276 | |
def convert_inputs_to_steps(workflow_dict, steps):
    """Convert workflow inputs to a steps in array - like in native Galaxy."""
    if "inputs" not in workflow_dict:
        return

    raw_inputs = _convert_dict_to_id_list_if_needed(workflow_dict.pop("inputs", []))
    input_steps = []
    for raw_input in raw_inputs:
        definition = raw_input.copy()

        has_label = "label" in definition
        has_id = "id" in definition
        if has_label and has_id:
            raise Exception("label and id are aliases for inputs, may only define one")
        if not (has_label or has_id):
            raise Exception("Input must define a label.")

        # At most one of label/id exists, so short-circuiting is safe here.
        label = definition.pop("label", None) or definition.pop("id", None)
        if not label:
            raise Exception("Input label must not be empty.")

        declared_type = definition.pop("type", "data")
        if declared_type in ("File", "data", "data_input"):
            step_type = "data_input"
        elif declared_type in ("collection", "data_collection", "data_collection_input"):
            step_type = "data_collection_input"
        elif declared_type in ("text", "integer", "float", "color", "boolean"):
            step_type = "parameter_input"
            definition["parameter_type"] = declared_type
        else:
            raise Exception("Input type must be a data file or collection.")

        definition["type"] = step_type
        definition["label"] = label
        input_steps.append(definition)

    # Prepend the synthesized input steps, preserving their declared order.
    steps[0:0] = input_steps
320 | |
321 | |
def run_workflow_to_step(conversion_context, step, run_action):
    """Convert a step with a nested GalaxyWorkflow ``run`` action into a subworkflow step."""
    step["type"] = "subworkflow"
    if conversion_context.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
        # Deduplicated mode: just reference the shared subworkflow by graph id.
        step["content_id"] = run_action
    else:
        # Inline mode: recursively convert the subworkflow and embed it.
        subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context(step)
        step["subworkflow"] = _python_to_workflow(
            copy.deepcopy(run_action),
            subworkflow_conversion_context,
        )
332 | |
333 | |
334 def _is_graph_id_reference(run_action): | |
335 return run_action and not isinstance(run_action, dict) | |
336 | |
337 | |
def transform_data_input(context, step):
    """Fill in native Galaxy state for a dataset input step."""
    transform_input(context, step, default_name="Input dataset")
340 | |
341 | |
def transform_data_collection_input(context, step):
    """Fill in native Galaxy state for a dataset collection input step."""
    transform_input(context, step, default_name="Input dataset collection")
344 | |
345 | |
def transform_parameter_input(context, step):
    """Fill in native Galaxy state for a (non-data) parameter input step."""
    transform_input(context, step, default_name="input_parameter")
348 | |
349 | |
def transform_input(context, step, default_name):
    """Populate annotation, inputs metadata, and tool state for an input-style step."""
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    inputs = step.setdefault("inputs", [{}])
    primary_input = inputs[0]
    # Prefer an explicitly named input, falling back to the step label/default.
    name = primary_input.get("name", default_name)

    _ensure_defaults(primary_input, {
        "name": name,
        "description": "",
    })

    state = {"name": name}
    # Pass through optional input-step attributes understood by Galaxy.
    for attribute in ("collection_type", "parameter_type", "optional", "default",
                      "format", "restrictions", "restrictOnConnections", "suggestions"):
        if attribute in step:
            state[attribute] = step[attribute]

    _populate_tool_state(step, state)
376 | |
377 | |
def transform_pause(context, step, default_name="Pause for dataset review"):
    """Fill in native Galaxy state and connections for a pause step."""
    # A step label overrides the default display name.
    default_name = step.get("label", default_name)
    _populate_annotation(step)

    _ensure_inputs_connections(step)

    if "inputs" not in step:
        step["inputs"] = [{}]

    step_inputs = step["inputs"][0]
    if "name" in step_inputs:
        name = step_inputs["name"]
    else:
        name = default_name

    _ensure_defaults(step_inputs, {
        "name": name,
    })
    tool_state = {
        "name": name
    }

    # Pause steps carry real input connections, unlike plain input steps.
    connect = _init_connect_dict(step)
    _populate_input_connections(context, step, connect)
    _populate_tool_state(step, tool_state)
403 | |
404 | |
def transform_subworkflow(context, step):
    """Fill in annotation, input connections, and (empty) tool state for a subworkflow step."""
    _populate_annotation(step)

    _ensure_inputs_connections(step)

    # Subworkflow steps have no tool state of their own.
    tool_state = {
    }

    connect = _init_connect_dict(step)
    _populate_input_connections(context, step, connect)
    _populate_tool_state(step, tool_state)
416 | |
417 | |
418 def _runtime_value(): | |
419 return {"__class__": "RuntimeValue"} | |
420 | |
421 | |
def transform_tool(context, step):
    """Convert a Format 2 tool step into its native Galaxy representation.

    Fills in defaults, translates ``$link`` references in the step state into
    input connections plus RuntimeValue placeholders, serializes tool state,
    and converts recognized output attributes into post-job actions.
    """
    if "tool_id" not in step:
        raise Exception("Tool steps must define a tool_id.")

    _ensure_defaults(step, {
        "name": step['tool_id'],
        "post_job_actions": {},
        "tool_version": None,
    })
    post_job_actions = step["post_job_actions"]
    _populate_annotation(step)

    tool_state = {
        # TODO: Galaxy should not require tool state actually specify a __page__.
        "__page__": 0,
    }

    connect = _init_connect_dict(step)

    def append_link(key, value):
        # Record a $link connection for this (possibly nested) state key.
        if key not in connect:
            connect[key] = []
        assert "$link" in value
        link_value = value["$link"]
        connect[key].append(clean_connection(link_value))

    def replace_links(value, key=""):
        # Recursively walk the state tree, recording $link dicts as
        # connections and substituting runtime-value markers for them.
        if _is_link(value):
            append_link(key, value)
            # Filled in by the connection, so to force late
            # validation of the field just mark as RuntimeValue.
            # It would be better I guess if this were some other
            # value dedicated to this purpose (e.g. a ficitious
            # {"__class__": "ConnectedValue"}) that could be further
            # validated by Galaxy.
            return _runtime_value()
        if isinstance(value, dict):
            new_values = {}
            for k, v in value.items():
                new_key = _join_prefix(key, k)
                new_values[k] = replace_links(v, new_key)
            return new_values
        elif isinstance(value, list):
            new_values = []
            for i, v in enumerate(value):
                # If we are a repeat we need to modify the key
                # but not if values are actually $links.
                if _is_link(v):
                    append_link(key, v)
                    new_values.append(None)
                else:
                    new_key = "%s_%d" % (key, i)
                    new_values.append(replace_links(v, new_key))
            return new_values
        else:
            return value

    # TODO: handle runtime inputs and state together.
    runtime_inputs = step.get("runtime_inputs", [])
    if "state" in step or runtime_inputs:
        step_state = step.pop("state", {})
        step_state = replace_links(step_state)

        # Native tool_state stores each top-level value JSON-encoded.
        for key, value in step_state.items():
            tool_state[key] = json.dumps(value)
        for runtime_input in runtime_inputs:
            tool_state[runtime_input] = json.dumps(_runtime_value())
    elif "tool_state" in step:
        # Pre-encoded native tool state supplied directly; merge it in.
        tool_state.update(step.get("tool_state"))

    # Fill in input connections
    _populate_input_connections(context, step, connect)

    _populate_tool_state(step, tool_state)

    # Handle outputs.
    out = step.pop("out", None)
    if out is None:
        # Handle LEGACY 19.XX outputs key.
        out = step.pop("outputs", [])
    out = _convert_dict_to_id_list_if_needed(out)
    for output in out:
        name = output["id"]
        # Each recognized output attribute (hide, rename, tags, ...) becomes
        # a post-job action keyed by action class + output name.
        for action_key, action_dict in POST_JOB_ACTIONS.items():
            action_argument = output.get(action_key, action_dict['default'])
            if action_argument:
                action_class = action_dict['action_class']
                action_name = action_class + name
                action = _action(
                    action_class,
                    name,
                    arguments=action_dict['arguments'](action_argument)
                )
                post_job_actions[action_name] = action
516 | |
517 | |
def run_tool_to_step(conversion_context, step, run_action):
    """Convert a step with an inline GalaxyTool ``run`` action into a tool step.

    The tool is imported through the context's Galaxy interface and the step
    is pointed at the resulting tool id/version/hash/uuid.
    """
    tool_description = conversion_context.galaxy_interface.import_tool(
        run_action
    )
    step["type"] = "tool"
    step["tool_id"] = tool_description["tool_id"]
    step["tool_version"] = tool_description["tool_version"]
    # Optional fields - may be absent depending on the Galaxy version.
    step["tool_hash"] = tool_description.get("tool_hash")
    step["tool_uuid"] = tool_description.get("uuid")
527 | |
528 | |
class BaseConversionContext(object):
    """Shared state and helpers used while converting one (sub)workflow.

    Tracks step label -> index mappings and caches per-step subworkflow
    conversion contexts. Subclasses supply ``import_options``, ``graph_ids``,
    ``workflow_directory`` and ``galaxy_interface``.
    """

    def __init__(self):
        # Maps step label -> integer step index for connection resolution.
        self.labels = {}
        # Caches SubworkflowConversionContext objects keyed by step id.
        self.subworkflow_conversion_contexts = {}

    def step_id(self, label_or_id):
        """Resolve a step label (or raw id) to its integer step index."""
        if label_or_id in self.labels:
            id = self.labels[label_or_id]
        else:
            id = label_or_id
        return int(id)

    def step_output(self, value):
        """Split a connection like ``step/output`` into (step index, output name).

        A bare step reference defaults its output name to "output".
        """
        value_parts = str(value).split("/")
        if len(value_parts) == 1:
            value_parts.append("output")
        id = self.step_id(value_parts[0])
        return id, value_parts[1]

    def get_subworkflow_conversion_context(self, step):
        """Return (creating and caching if needed) the context for a subworkflow step."""
        # TODO: sometimes this method takes format2 steps and some times converted native ones
        # (for input connections) - redo this so the type signature is stronger.
        step_id = step.get("id")
        run_action = step.get("run")
        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            # Deduplicated graph reference: contexts are cached by graph id.
            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(run_action)
            return subworkflow_conversion_context
        if "content_id" in step:
            # Already-converted native step referencing a graph workflow.
            subworkflow_conversion_context = self.get_subworkflow_conversion_context_graph(step["content_id"])
            return subworkflow_conversion_context

        if step_id not in self.subworkflow_conversion_contexts:

            subworkflow_conversion_context = SubworkflowConversionContext(
                self
            )
            self.subworkflow_conversion_contexts[step_id] = subworkflow_conversion_context
        return self.subworkflow_conversion_contexts[step_id]

    def get_runnable_description(self, run_action):
        """Resolve a step ``run`` action, loading ``@import`` files and graph references."""
        if "@import" in run_action:
            if len(run_action) > 1:
                raise Exception("@import must be only key if present.")

            # Load the referenced document relative to the workflow directory.
            run_action_path = run_action["@import"]
            runnable_path = os.path.join(self.workflow_directory, run_action_path)
            with open(runnable_path, "r") as f:
                runnable_description = ordered_load(f)
                run_action = runnable_description

        if not self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            # Inline the registered graph content ("#id" -> definition).
            run_action = self.graph_ids[run_action[1:]]

        return run_action
584 | |
585 | |
class ConversionContext(BaseConversionContext):
    """Top-level conversion context for the main workflow document."""

    def __init__(self, galaxy_interface, workflow_directory, import_options=None):
        super(ConversionContext, self).__init__()
        self.import_options = import_options or ImportOptions()
        # Registered $graph entries keyed by graph id (includes "main").
        self.graph_ids = OrderedDict()
        # Caches conversion contexts for graph-referenced subworkflows.
        self.graph_id_subworkflow_conversion_contexts = {}
        self.workflow_directory = workflow_directory
        self.galaxy_interface = galaxy_interface

    def register_runnable(self, run_action):
        """Record a $graph entry so it can later be referenced by its id."""
        assert "id" in run_action
        self.graph_ids[run_action["id"]] = run_action

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Return (creating and caching if needed) the context for a graph id."""
        if graph_id not in self.graph_id_subworkflow_conversion_contexts:
            subworkflow_conversion_context = SubworkflowConversionContext(
                self
            )
            self.graph_id_subworkflow_conversion_contexts[graph_id] = subworkflow_conversion_context
        return self.graph_id_subworkflow_conversion_contexts[graph_id]
607 | |
608 | |
class SubworkflowConversionContext(BaseConversionContext):
    """Conversion context for a nested subworkflow; defers shared state to its parent."""

    def __init__(self, parent_context):
        super(SubworkflowConversionContext, self).__init__()
        self.parent_context = parent_context

    @property
    def graph_ids(self):
        # Graph registrations live on the root context.
        return self.parent_context.graph_ids

    @property
    def workflow_directory(self):
        return self.parent_context.workflow_directory

    @property
    def import_options(self):
        return self.parent_context.import_options

    @property
    def galaxy_interface(self):
        return self.parent_context.galaxy_interface

    def get_subworkflow_conversion_context_graph(self, graph_id):
        # Delegate upward so graph contexts are cached once, at the root.
        return self.parent_context.get_subworkflow_conversion_context_graph(graph_id)
633 | |
634 | |
635 def _action(type, name, arguments={}): | |
636 return { | |
637 "action_arguments": arguments, | |
638 "action_type": type, | |
639 "output_name": name, | |
640 } | |
641 | |
642 | |
643 def _is_link(value): | |
644 return isinstance(value, dict) and "$link" in value | |
645 | |
646 | |
647 def _join_prefix(prefix, key): | |
648 if prefix: | |
649 new_key = "%s|%s" % (prefix, key) | |
650 else: | |
651 new_key = key | |
652 return new_key | |
653 | |
654 | |
655 def _init_connect_dict(step): | |
656 if "connect" not in step: | |
657 step["connect"] = {} | |
658 | |
659 connect = step["connect"] | |
660 del step["connect"] | |
661 | |
662 # handle CWL-style in dict connections. | |
663 if "in" in step: | |
664 step_in = step["in"] | |
665 assert isinstance(step_in, dict) | |
666 connection_keys = set() | |
667 for key, value in step_in.items(): | |
668 # TODO: this can be a list right? | |
669 if isinstance(value, dict) and 'source' in value: | |
670 value = value["source"] | |
671 elif isinstance(value, dict) and 'default' in value: | |
672 continue | |
673 elif isinstance(value, dict): | |
674 raise KeyError('step input must define either source or default %s' % value) | |
675 connect[key] = [value] | |
676 connection_keys.add(key) | |
677 | |
678 for key in connection_keys: | |
679 del step_in[key] | |
680 | |
681 if len(step_in) == 0: | |
682 del step['in'] | |
683 | |
684 return connect | |
685 | |
686 | |
def _populate_input_connections(context, step, connect):
    """Translate a ``connect`` map into native Galaxy ``input_connections``."""
    _ensure_inputs_connections(step)
    input_connections = step["input_connections"]
    is_subworkflow_step = step.get("type") == "subworkflow"

    for key, values in connect.items():
        input_connection_value = []
        if not isinstance(values, list):
            values = [values]
        for value in values:
            if not isinstance(value, dict):
                if key == "$step":
                    # Order-only dependency: no actual output is consumed.
                    value += "/__NO_INPUT_OUTPUT_NAME__"
                id, output_name = context.step_output(value)
                value = {"id": id, "output_name": output_name}
                if is_subworkflow_step:
                    # Native format also records which input step inside the
                    # subworkflow this connection feeds.
                    subworkflow_conversion_context = context.get_subworkflow_conversion_context(step)
                    input_subworkflow_step_id = subworkflow_conversion_context.step_id(key)
                    value["input_subworkflow_step_id"] = input_subworkflow_step_id
            input_connection_value.append(value)
        if key == "$step":
            key = "__NO_INPUT_OUTPUT_NAME__"
        input_connections[key] = input_connection_value
710 | |
711 | |
712 def _populate_annotation(step): | |
713 if "annotation" not in step and "doc" in step: | |
714 annotation = step.pop("doc") | |
715 step["annotation"] = annotation | |
716 elif "annotation" not in step: | |
717 step["annotation"] = "" | |
718 | |
719 | |
720 def _ensure_inputs_connections(step): | |
721 if "input_connections" not in step: | |
722 step["input_connections"] = {} | |
723 | |
724 | |
725 def _ensure_defaults(in_dict, defaults): | |
726 for key, value in defaults.items(): | |
727 if key not in in_dict: | |
728 in_dict[key] = value | |
729 | |
730 | |
def _populate_tool_state(step, tool_state):
    """Serialize ``tool_state`` onto the step - Galaxy expects a JSON string, not a dict."""
    step["tool_state"] = json.dumps(tool_state)
733 | |
734 | |
735 def _convert_dict_to_id_list_if_needed(dict_or_list, add_label=False): | |
736 rval = dict_or_list | |
737 if isinstance(dict_or_list, dict): | |
738 rval = [] | |
739 for key, value in dict_or_list.items(): | |
740 if not isinstance(value, dict): | |
741 value = {"type": value} | |
742 if add_label: | |
743 value["label"] = key | |
744 else: | |
745 value["id"] = key | |
746 rval.append(value) | |
747 return rval | |
748 | |
749 | |
def main(argv):
    """Convert the Format 2 workflow named by ``argv[0]`` and print native JSON."""
    # BUG FIX: yaml_to_workflow requires galaxy_interface and workflow_directory
    # positional arguments; they were previously omitted, raising a TypeError.
    # None is acceptable for workflows without inline tool imports; the
    # workflow directory then defaults to the current directory.
    print(json.dumps(yaml_to_workflow(argv[0], None, None)))
752 | |
753 | |
if __name__ == "__main__":
    # BUG FIX: pass only the real arguments; sys.argv[0] is the script name,
    # which main() would otherwise try to convert as a workflow.
    main(sys.argv[1:])

__all__ = (
    'yaml_to_workflow',
    'python_to_workflow',
)