comparison planemo/lib/python3.7/site-packages/gxformat2/converter.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 """Functionality for converting a Format 2 workflow into a standard Galaxy workflow."""
2 from __future__ import print_function
3
4 import copy
5 import json
6 import logging
7 import os
8 import sys
9 import uuid
10 from collections import OrderedDict
11
12 from ._labels import Labels
13 from ._yaml import ordered_load
14
# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
# Step types the converter knows how to transform (dispatched to transform_<type> functions).
STEP_TYPES = [
    "subworkflow",
    "data_input",
    "data_collection_input",
    "tool",
    "pause",
    "parameter_input",
]

# Short Format 2 names accepted as aliases for the native step types above.
STEP_TYPE_ALIASES = {
    'input': 'data_input',
    'input_collection': 'data_collection_input',
    'parameter': 'parameter_input',
}

# Maps a run action's "class" to the name of the function that converts it to a step.
RUN_ACTIONS_TO_STEPS = {
    'GalaxyWorkflow': 'run_workflow_to_step',
    'GalaxyTool': 'run_tool_to_step',
}

# Declarative output modifiers understood on tool step outputs. Each entry maps a
# Format 2 key to the native post job action class, the default used when the key
# is absent, and a callable translating the Format 2 value into action arguments.
POST_JOB_ACTIONS = {
    'hide': {
        'action_class': "HideDatasetAction",
        'default': False,
        'arguments': lambda x: x,
    },
    'rename': {
        'action_class': 'RenameDatasetAction',
        'default': {},
        'arguments': lambda x: {'newname': x},
    },
    'delete_intermediate_datasets': {
        'action_class': 'DeleteIntermediatesAction',
        'default': False,
        'arguments': lambda x: x,
    },
    'change_datatype': {
        'action_class': 'ChangeDatatypeAction',
        'default': {},
        'arguments': lambda x: {'newtype': x},
    },
    'set_columns': {
        'action_class': 'ColumnSetAction',
        'default': {},
        'arguments': lambda x: x,
    },
    'add_tags': {
        'action_class': 'TagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
    'remove_tags': {
        'action_class': 'RemoveTagDatasetAction',
        'default': [],
        'arguments': lambda x: {'tags': ",".join(x)},
    },
}
74
75 log = logging.getLogger(__name__)
76
77
def rename_arg(argument):
    """Return *argument* unchanged (identity placeholder for argument renaming)."""
    return argument
80
81
def clean_connection(value):
    """Normalize a connection reference, translating legacy ``#`` separators.

    Returns *value* unchanged unless legacy connection support is enabled and
    the value uses the old ``step#output`` syntax, in which case the first
    ``#`` is rewritten to ``/``. Falsy values (``None``, ``""``) pass through.
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)
    # Bug fix: the legacy branch previously fell through without a return,
    # so rewritten connections were silently dropped (function returned None).
    return value
89
90
class ImportOptions(object):
    """Options controlling Format 2 workflow import behavior."""

    def __init__(self):
        # When True, subworkflows referenced via $graph ids are imported once
        # and shared; when False each reference is expanded inline.
        self.deduplicate_subworkflows = False
95
96
def yaml_to_workflow(has_yaml, galaxy_interface, workflow_directory, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied stream."""
    workflow_dict = ordered_load(has_yaml)
    return python_to_workflow(
        workflow_dict, galaxy_interface, workflow_directory, import_options=import_options
    )
101
102
def python_to_workflow(as_python, galaxy_interface, workflow_directory=None, import_options=None):
    """Convert a Format 2 workflow into standard Galaxy format from supplied dictionary.

    :param as_python: parsed Format 2 workflow dictionary; may instead wrap raw
        YAML text under a ``yaml_content`` key.
    :param galaxy_interface: interface used to import tools for ``run`` actions.
    :param workflow_directory: base directory for resolving ``@import``
        references; defaults to the current working directory.
    :param import_options: optional ImportOptions instance controlling e.g.
        subworkflow deduplication.
    """
    if "yaml_content" in as_python:
        as_python = ordered_load(as_python["yaml_content"])

    if workflow_directory is None:
        workflow_directory = os.path.abspath(".")

    conversion_context = ConversionContext(
        galaxy_interface,
        workflow_directory,
        import_options,
    )
    # Resolve $graph documents down to the "main" workflow, registering all
    # graph entries with the context for later lookup.
    as_python = _preprocess_graphs(as_python, conversion_context)
    subworkflows = None
    if conversion_context.import_options.deduplicate_subworkflows:
        # TODO: import only required workflows...
        # TODO: dag sort these...
        subworkflows = OrderedDict()
        for graph_id, subworkflow_content in conversion_context.graph_ids.items():
            if graph_id == "main":
                continue
            subworkflow_conversion_context = conversion_context.get_subworkflow_conversion_context_graph("#" + graph_id)
            subworkflows[graph_id] = _python_to_workflow(copy.deepcopy(subworkflow_content), subworkflow_conversion_context)
    converted = _python_to_workflow(as_python, conversion_context)
    if subworkflows is not None:
        converted["subworkflows"] = subworkflows
    return converted
131
132
133 # move to a utils file?
def steps_as_list(format2_workflow, add_label=True):
    """Return steps as a list, converting ID map to list representation if needed.

    :param add_label: when steps are supplied as an ID map, record each key as
        the step ``label`` (otherwise as its ``id``).
    """
    steps = format2_workflow["steps"]
    # Bug fix: honor the add_label parameter instead of hard-coding True.
    steps = _convert_dict_to_id_list_if_needed(steps, add_label=add_label)
    return steps
139
140
def ensure_step_position(step, order_index):
    """Ensure step contains a position definition."""
    if "position" in step:
        return
    # Stagger steps diagonally so the graphical editor doesn't stack them.
    offset = 10 * order_index
    step["position"] = {"left": offset, "top": offset}
148
149
def _outputs_as_list(as_python):
    # Normalize the workflow's outputs to list form (an ID map is converted).
    return _convert_dict_to_id_list_if_needed(as_python.get("outputs", []))
154
155
def _python_to_workflow(as_python, conversion_context):
    """Convert one Format 2 workflow dictionary into the native .ga structure.

    Mutates and returns ``as_python``. ``conversion_context`` supplies label
    resolution, subworkflow contexts, and import options.
    """
    if "class" not in as_python:
        # Error message fix: dropped the duplicated "not a not a".
        raise Exception("This is not a valid Galaxy workflow definition, must define a class.")

    if as_python["class"] != "GalaxyWorkflow":
        raise Exception("This is not a valid Galaxy workflow definition, 'class' must be 'GalaxyWorkflow'.")

    # .ga files don't have this, drop it so it isn't interpreted as a format 2 workflow.
    as_python.pop("class")

    _ensure_defaults(as_python, {
        "a_galaxy_workflow": "true",
        "format-version": "0.1",
        "name": "Workflow",
        "uuid": str(uuid.uuid4()),
    })
    _populate_annotation(as_python)

    steps = steps_as_list(as_python)

    convert_inputs_to_steps(as_python, steps)

    if isinstance(steps, list):
        # Native format keys steps by their stringified order index.
        steps_as_dict = OrderedDict()
        for i, step in enumerate(steps):
            steps_as_dict[str(i)] = step
            if "id" not in step:
                step["id"] = i

            if "label" in step:
                label = step["label"]
                conversion_context.labels[label] = i

            # TODO: this really should be optional in Galaxy API.
            ensure_step_position(step, i)

        as_python["steps"] = steps_as_dict
        steps = steps_as_dict

    for step in steps.values():
        step_type = step.get("type", None)
        if "run" in step:
            if step_type is not None:
                raise Exception("Steps specified as run actions cannot specify a type.")
            run_action = step.get("run")
            run_action = conversion_context.get_runnable_description(run_action)
            if isinstance(run_action, dict):
                run_class = run_action["class"]
                # Dispatch relies on RUN_ACTIONS_TO_STEPS values naming
                # functions defined in this module.
                run_to_step_function = eval(RUN_ACTIONS_TO_STEPS[run_class])

                run_to_step_function(conversion_context, step, run_action)
            else:
                # A graph-id reference to a deduplicated subworkflow.
                step["content_id"] = run_action
                step["type"] = "subworkflow"
            del step["run"]

    for step in steps.values():
        step_type = step.get("type", "tool")
        step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
        if step_type not in STEP_TYPES:
            raise Exception("Unknown step type encountered %s" % step_type)
        step["type"] = step_type
        eval("transform_%s" % step_type)(conversion_context, step)

    # Consistency: reuse the module helper instead of inlining its body.
    outputs = _outputs_as_list(as_python)

    for output in outputs:
        assert isinstance(output, dict), "Output definition must be dictionary"
        assert "source" in output or "outputSource" in output, "Output definition must specify source"

        if "label" in output and "id" in output:
            raise Exception("label and id are aliases for outputs, may only define one")

        # (Removed a dead "label = ''" assignment - label is unconditionally
        # recomputed from the popped label/id just below.)
        raw_label = output.pop("label", None)
        raw_id = output.pop("id", None)
        label = raw_label or raw_id
        if Labels.is_anonymous_output_label(label):
            label = None
        source = clean_connection(output.get("outputSource"))
        if source is None and SUPPORT_LEGACY_CONNECTIONS:
            source = output.get("source").replace("#", "/", 1)
        id, output_name = conversion_context.step_output(source)
        step = steps[str(id)]
        workflow_output = {
            "output_name": output_name,
            "label": label,
            "uuid": output.get("uuid", None)
        }
        if "workflow_outputs" not in step:
            step["workflow_outputs"] = []
        step["workflow_outputs"].append(workflow_output)

    return as_python
253
254
255 def _preprocess_graphs(as_python, conversion_context):
256 if not isinstance(as_python, dict):
257 raise Exception("This is not a not a valid Galaxy workflow definition.")
258
259 format_version = as_python.get("format-version", "v2.0")
260 assert format_version == "v2.0"
261
262 if "class" not in as_python and "$graph" in as_python:
263 for subworkflow in as_python["$graph"]:
264 if not isinstance(subworkflow, dict):
265 raise Exception("Malformed workflow content in $graph")
266 if "id" not in subworkflow:
267 raise Exception("No subworkflow ID found for entry in $graph.")
268 subworkflow_id = subworkflow["id"]
269 if subworkflow_id == "main":
270 as_python = subworkflow
271
272 conversion_context.register_runnable(subworkflow)
273
274 return as_python
275
276
def convert_inputs_to_steps(workflow_dict, steps):
    """Convert workflow ``inputs`` into input steps - like in native Galaxy.

    Removes the ``inputs`` key from *workflow_dict* and prepends one input
    step per entry to *steps* (both mutated in place).
    """
    if "inputs" not in workflow_dict:
        return

    inputs = workflow_dict.pop("inputs", [])
    new_steps = []
    inputs = _convert_dict_to_id_list_if_needed(inputs)
    for input_def_raw in inputs:
        input_def = input_def_raw.copy()

        if "label" in input_def and "id" in input_def:
            raise Exception("label and id are aliases for inputs, may only define one")
        if "label" not in input_def and "id" not in input_def:
            raise Exception("Input must define a label.")

        raw_label = input_def.pop("label", None)
        raw_id = input_def.pop("id", None)
        label = raw_label or raw_id

        if not label:
            raise Exception("Input label must not be empty.")

        input_type = input_def.pop("type", "data")
        if input_type in ["File", "data", "data_input"]:
            step_type = "data_input"
        elif input_type in ["collection", "data_collection", "data_collection_input"]:
            step_type = "data_collection_input"
        elif input_type in ["text", "integer", "float", "color", "boolean"]:
            step_type = "parameter_input"
            input_def["parameter_type"] = input_type
        else:
            # Error message fix: simple parameter inputs are also accepted.
            raise Exception("Input type must be a data file, collection, or parameter type.")

        step_def = input_def
        step_def.update({
            "type": step_type,
            "label": label,
        })
        new_steps.append(step_def)

    # Prepend the generated input steps, preserving their relative order.
    steps[0:0] = new_steps
320
321
def run_workflow_to_step(conversion_context, step, run_action):
    """Fill in *step* as a subworkflow step from a ``run`` workflow action."""
    step["type"] = "subworkflow"
    deduplicate = conversion_context.import_options.deduplicate_subworkflows
    if deduplicate and _is_graph_id_reference(run_action):
        # Reference the shared subworkflow by its graph id.
        step["content_id"] = run_action
    else:
        # Expand the subworkflow inline, converting it with its own context.
        sub_context = conversion_context.get_subworkflow_conversion_context(step)
        step["subworkflow"] = _python_to_workflow(copy.deepcopy(run_action), sub_context)
332
333
334 def _is_graph_id_reference(run_action):
335 return run_action and not isinstance(run_action, dict)
336
337
def transform_data_input(context, step):
    """Transform a Format 2 ``data_input`` step into native form."""
    transform_input(context, step, default_name="Input dataset")
340
341
def transform_data_collection_input(context, step):
    """Transform a Format 2 ``data_collection_input`` step into native form."""
    transform_input(context, step, default_name="Input dataset collection")
344
345
def transform_parameter_input(context, step):
    """Transform a Format 2 ``parameter_input`` step into native form."""
    transform_input(context, step, default_name="input_parameter")
348
349
def transform_input(context, step, default_name):
    """Fill in native Galaxy state for an input (data/collection/parameter) step."""
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    # Native input steps carry a single "inputs" entry describing themselves.
    step_inputs = step.setdefault("inputs", [{}])[0]
    name = step_inputs.get("name", default_name)

    _ensure_defaults(step_inputs, {
        "name": name,
        "description": "",
    })

    tool_state = {"name": name}
    optional_attributes = (
        "collection_type",
        "parameter_type",
        "optional",
        "default",
        "format",
        "restrictions",
        "restrictOnConnections",
        "suggestions",
    )
    for attribute in optional_attributes:
        if attribute in step:
            tool_state[attribute] = step[attribute]

    _populate_tool_state(step, tool_state)
376
377
def transform_pause(context, step, default_name="Pause for dataset review"):
    """Fill in native Galaxy state and connections for a pause step."""
    default_name = step.get("label", default_name)
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    # Pause steps also carry a single self-describing "inputs" entry.
    step_inputs = step.setdefault("inputs", [{}])[0]
    name = step_inputs.get("name", default_name)

    _ensure_defaults(step_inputs, {"name": name})
    tool_state = {"name": name}

    connect = _init_connect_dict(step)
    _populate_input_connections(context, step, connect)
    _populate_tool_state(step, tool_state)
403
404
def transform_subworkflow(context, step):
    """Fill in connection state for a subworkflow step."""
    _populate_annotation(step)
    _ensure_inputs_connections(step)

    connect = _init_connect_dict(step)
    _populate_input_connections(context, step, connect)
    # Subworkflow steps carry no tool state of their own.
    _populate_tool_state(step, {})
416
417
418 def _runtime_value():
419 return {"__class__": "RuntimeValue"}
420
421
def transform_tool(context, step):
    """Convert a Format 2 tool step into native Galaxy representation.

    Populates serialized tool state (extracting ``$link`` connections and
    replacing them with runtime-value placeholders), input connections, and
    post job actions derived from the step's ``out``/``outputs`` definitions.
    """
    if "tool_id" not in step:
        raise Exception("Tool steps must define a tool_id.")

    _ensure_defaults(step, {
        "name": step['tool_id'],
        "post_job_actions": {},
        "tool_version": None,
    })
    post_job_actions = step["post_job_actions"]
    _populate_annotation(step)

    tool_state = {
        # TODO: Galaxy should not require tool state actually specify a __page__.
        "__page__": 0,
    }

    connect = _init_connect_dict(step)

    def append_link(key, value):
        # Record a $link connection under ``key`` for later conversion to
        # native input_connections.
        if key not in connect:
            connect[key] = []
        assert "$link" in value
        link_value = value["$link"]
        connect[key].append(clean_connection(link_value))

    def replace_links(value, key=""):
        # Recursively walk tool state, pulling $link values into ``connect``
        # and substituting placeholders in their place.
        if _is_link(value):
            append_link(key, value)
            # Filled in by the connection, so to force late
            # validation of the field just mark as RuntimeValue.
            # It would be better I guess if this were some other
            # value dedicated to this purpose (e.g. a ficitious
            # {"__class__": "ConnectedValue"}) that could be further
            # validated by Galaxy.
            return _runtime_value()
        if isinstance(value, dict):
            new_values = {}
            for k, v in value.items():
                new_key = _join_prefix(key, k)
                new_values[k] = replace_links(v, new_key)
            return new_values
        elif isinstance(value, list):
            new_values = []
            for i, v in enumerate(value):
                # If we are a repeat we need to modify the key
                # but not if values are actually $links.
                if _is_link(v):
                    append_link(key, v)
                    new_values.append(None)
                else:
                    new_key = "%s_%d" % (key, i)
                    new_values.append(replace_links(v, new_key))
            return new_values
        else:
            return value

    # TODO: handle runtime inputs and state together.
    runtime_inputs = step.get("runtime_inputs", [])
    if "state" in step or runtime_inputs:
        step_state = step.pop("state", {})
        step_state = replace_links(step_state)

        # Native tool state JSON-encodes each top-level value individually.
        for key, value in step_state.items():
            tool_state[key] = json.dumps(value)
        for runtime_input in runtime_inputs:
            tool_state[runtime_input] = json.dumps(_runtime_value())
    elif "tool_state" in step:
        tool_state.update(step.get("tool_state"))

    # Fill in input connections
    _populate_input_connections(context, step, connect)

    _populate_tool_state(step, tool_state)

    # Handle outputs.
    out = step.pop("out", None)
    if out is None:
        # Handle LEGACY 19.XX outputs key.
        out = step.pop("outputs", [])
    out = _convert_dict_to_id_list_if_needed(out)
    for output in out:
        name = output["id"]
        # Map declarative output modifiers (hide, rename, tags, ...) onto
        # native Galaxy post job actions.
        for action_key, action_dict in POST_JOB_ACTIONS.items():
            action_argument = output.get(action_key, action_dict['default'])
            if action_argument:
                action_class = action_dict['action_class']
                action_name = action_class + name
                action = _action(
                    action_class,
                    name,
                    arguments=action_dict['arguments'](action_argument)
                )
                post_job_actions[action_name] = action
516
517
def run_tool_to_step(conversion_context, step, run_action):
    """Fill in *step* as a tool step by importing the tool described by *run_action*."""
    tool_description = conversion_context.galaxy_interface.import_tool(run_action)
    step.update({
        "type": "tool",
        "tool_id": tool_description["tool_id"],
        "tool_version": tool_description["tool_version"],
        "tool_hash": tool_description.get("tool_hash"),
        "tool_uuid": tool_description.get("uuid"),
    })
527
528
class BaseConversionContext(object):
    """Shared state and helpers for converting one (sub)workflow.

    Tracks label -> step-index mappings and caches per-step subworkflow
    conversion contexts. Subclasses provide ``import_options``,
    ``workflow_directory``, ``galaxy_interface`` and ``graph_ids``.
    """

    def __init__(self):
        self.labels = {}
        self.subworkflow_conversion_contexts = {}

    def step_id(self, label_or_id):
        """Resolve a step label (or raw id) to an integer step index."""
        resolved = self.labels.get(label_or_id, label_or_id)
        return int(resolved)

    def step_output(self, value):
        """Split ``step/output`` into ``(step_index, output_name)``.

        A bare step reference defaults the output name to ``output``.
        """
        parts = str(value).split("/")
        if len(parts) == 1:
            parts.append("output")
        return self.step_id(parts[0]), parts[1]

    def get_subworkflow_conversion_context(self, step):
        # TODO: sometimes this method takes format2 steps and some times converted native ones
        # (for input connections) - redo this so the type signature is stronger.
        run_action = step.get("run")
        if self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            return self.get_subworkflow_conversion_context_graph(run_action)
        if "content_id" in step:
            return self.get_subworkflow_conversion_context_graph(step["content_id"])

        # Inline subworkflows get one cached context per step id.
        step_id = step.get("id")
        if step_id not in self.subworkflow_conversion_contexts:
            self.subworkflow_conversion_contexts[step_id] = SubworkflowConversionContext(self)
        return self.subworkflow_conversion_contexts[step_id]

    def get_runnable_description(self, run_action):
        """Resolve a ``run`` action, loading ``@import`` files and graph ids."""
        if "@import" in run_action:
            if len(run_action) > 1:
                raise Exception("@import must be only key if present.")
            import_path = os.path.join(self.workflow_directory, run_action["@import"])
            with open(import_path, "r") as f:
                run_action = ordered_load(f)

        if not self.import_options.deduplicate_subworkflows and _is_graph_id_reference(run_action):
            # Strip the leading '#' and expand the referenced graph entry inline.
            run_action = self.graph_ids[run_action[1:]]

        return run_action
584
585
class ConversionContext(BaseConversionContext):
    """Top-level conversion context for the main workflow document."""

    def __init__(self, galaxy_interface, workflow_directory, import_options=None):
        super(ConversionContext, self).__init__()
        self.import_options = import_options or ImportOptions()
        self.graph_ids = OrderedDict()
        self.graph_id_subworkflow_conversion_contexts = {}
        self.workflow_directory = workflow_directory
        self.galaxy_interface = galaxy_interface

    def register_runnable(self, run_action):
        """Record a ``$graph`` entry by its id for later resolution."""
        assert "id" in run_action
        self.graph_ids[run_action["id"]] = run_action

    def get_subworkflow_conversion_context_graph(self, graph_id):
        """Return the context for *graph_id*, creating it on first use."""
        contexts = self.graph_id_subworkflow_conversion_contexts
        if graph_id not in contexts:
            contexts[graph_id] = SubworkflowConversionContext(self)
        return contexts[graph_id]
607
608
class SubworkflowConversionContext(BaseConversionContext):
    """Conversion context that delegates shared state to its parent context."""

    def __init__(self, parent_context):
        super(SubworkflowConversionContext, self).__init__()
        self.parent_context = parent_context

    @property
    def galaxy_interface(self):
        return self.parent_context.galaxy_interface

    @property
    def graph_ids(self):
        return self.parent_context.graph_ids

    @property
    def import_options(self):
        return self.parent_context.import_options

    @property
    def workflow_directory(self):
        return self.parent_context.workflow_directory

    def get_subworkflow_conversion_context_graph(self, graph_id):
        # Graph-id contexts are shared document-wide, so defer to the parent.
        return self.parent_context.get_subworkflow_conversion_context_graph(graph_id)
633
634
635 def _action(type, name, arguments={}):
636 return {
637 "action_arguments": arguments,
638 "action_type": type,
639 "output_name": name,
640 }
641
642
643 def _is_link(value):
644 return isinstance(value, dict) and "$link" in value
645
646
647 def _join_prefix(prefix, key):
648 if prefix:
649 new_key = "%s|%s" % (prefix, key)
650 else:
651 new_key = key
652 return new_key
653
654
655 def _init_connect_dict(step):
656 if "connect" not in step:
657 step["connect"] = {}
658
659 connect = step["connect"]
660 del step["connect"]
661
662 # handle CWL-style in dict connections.
663 if "in" in step:
664 step_in = step["in"]
665 assert isinstance(step_in, dict)
666 connection_keys = set()
667 for key, value in step_in.items():
668 # TODO: this can be a list right?
669 if isinstance(value, dict) and 'source' in value:
670 value = value["source"]
671 elif isinstance(value, dict) and 'default' in value:
672 continue
673 elif isinstance(value, dict):
674 raise KeyError('step input must define either source or default %s' % value)
675 connect[key] = [value]
676 connection_keys.add(key)
677
678 for key in connection_keys:
679 del step_in[key]
680
681 if len(step_in) == 0:
682 del step['in']
683
684 return connect
685
686
def _populate_input_connections(context, step, connect):
    """Translate a connect mapping into native ``input_connections`` on *step*."""
    _ensure_inputs_connections(step)
    input_connections = step["input_connections"]
    is_subworkflow_step = step.get("type") == "subworkflow"

    for key, values in connect.items():
        if not isinstance(values, list):
            values = [values]
        connection_values = []
        for value in values:
            if not isinstance(value, dict):
                if key == "$step":
                    # Pure ordering dependency - no actual data flows over it.
                    value += "/__NO_INPUT_OUTPUT_NAME__"
                source_id, output_name = context.step_output(value)
                value = {"id": source_id, "output_name": output_name}
                if is_subworkflow_step:
                    sub_context = context.get_subworkflow_conversion_context(step)
                    value["input_subworkflow_step_id"] = sub_context.step_id(key)
            connection_values.append(value)
        if key == "$step":
            key = "__NO_INPUT_OUTPUT_NAME__"
        input_connections[key] = connection_values
710
711
712 def _populate_annotation(step):
713 if "annotation" not in step and "doc" in step:
714 annotation = step.pop("doc")
715 step["annotation"] = annotation
716 elif "annotation" not in step:
717 step["annotation"] = ""
718
719
720 def _ensure_inputs_connections(step):
721 if "input_connections" not in step:
722 step["input_connections"] = {}
723
724
725 def _ensure_defaults(in_dict, defaults):
726 for key, value in defaults.items():
727 if key not in in_dict:
728 in_dict[key] = value
729
730
731 def _populate_tool_state(step, tool_state):
732 step["tool_state"] = json.dumps(tool_state)
733
734
735 def _convert_dict_to_id_list_if_needed(dict_or_list, add_label=False):
736 rval = dict_or_list
737 if isinstance(dict_or_list, dict):
738 rval = []
739 for key, value in dict_or_list.items():
740 if not isinstance(value, dict):
741 value = {"type": value}
742 if add_label:
743 value["label"] = key
744 else:
745 value["id"] = key
746 rval.append(value)
747 return rval
748
749
def main(argv):
    """CLI entry point: convert the Format 2 workflow file named by ``argv[1]``.

    Prints the converted native workflow as JSON on stdout. The previous
    implementation passed ``argv[0]`` (the program name, given the
    ``main(sys.argv)`` call below) straight to ``yaml_to_workflow`` and
    omitted the required ``galaxy_interface``/``workflow_directory``
    arguments, so it always raised a ``TypeError``.
    """
    workflow_path = argv[1]
    # No Galaxy interface is available from the CLI; tool "run" actions that
    # need one will fail, but plain workflows convert fine.
    with open(workflow_path, "r") as f:
        workflow = yaml_to_workflow(
            f, None, os.path.dirname(os.path.abspath(workflow_path))
        )
    print(json.dumps(workflow))
752
753
# Public API of this module.
__all__ = (
    'yaml_to_workflow',
    'python_to_workflow',
)

if __name__ == "__main__":
    main(sys.argv)