comparison planemo/lib/python3.7/site-packages/gxformat2/normalize.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 """Abstractions for uniform across formats."""
2 from gxformat2._scripts import ensure_format2
3 from gxformat2._yaml import ordered_load
4 from gxformat2.converter import _outputs_as_list, convert_inputs_to_steps, steps_as_list
5
6 NON_INPUT_TYPES = ["tool", "subworkflow", "pause"]
7
8
9 def steps_normalized(workflow_dict=None, workflow_path=None):
10 """Walk over a normalized step rep. across workflow formats."""
11 workflow_dict = _ensure_format2(workflow_dict=workflow_dict, workflow_path=workflow_path)
12 steps = steps_as_list(workflow_dict)
13 convert_inputs_to_steps(workflow_dict, steps)
14 return steps
15
16
17 def inputs_normalized(**kwd):
18 """Call steps_normalized and retain just the input steps normalized."""
19 steps = steps_normalized(**kwd)
20 input_steps = []
21 for step in steps:
22 step_type = step.get("type") or 'tool'
23 if step_type in NON_INPUT_TYPES:
24 continue
25
26 input_steps.append(step)
27
28 return input_steps
29
30
31 def outputs_normalized(**kwd):
32 """Ensure Format2 and return outputs.
33
34 Probably should go farther and normalize source -> outputSource,
35 but doesn't yet do this.
36 """
37 workflow_dict = _ensure_format2(**kwd)
38 return _outputs_as_list(workflow_dict)
39
40
41 def walk_id_list_or_dict(dict_or_list):
42 """Walk over idmap regardless of list or dict representation."""
43 if isinstance(dict_or_list, list):
44 for item in dict_or_list:
45 yield item["id"], item
46 else:
47 for item in dict_or_list.items():
48 yield item
49
50
51 def ensure_implicit_step_outs(workflow_dict):
52 """Ensure implicit 'out' dicts allowed by format2 are filled in for CWL."""
53 outputs_by_label = {}
54
55 def register_step_output(step_label, output_name):
56 if step_label not in outputs_by_label:
57 outputs_by_label[step_label] = set()
58 outputs_by_label[step_label].add(output_name)
59
60 def register_output_source(output_source):
61 if "/" in output_source:
62 step, output_name = output_source.split("/", 1)
63 register_step_output(step, output_name)
64
65 for output_name, output in walk_id_list_or_dict(workflow_dict.get("outputs", {})):
66 if "outputSource" in output:
67 output_source = output["outputSource"]
68 if "/" in output_source:
69 step, output_name = output_source.split("/", 1)
70 register_step_output(step, output_name)
71
72 for step in steps_as_list(workflow_dict):
73 step_in = step.get("in", {})
74 for step_in_name, step_in_def in step_in.items():
75 register_output_source(step_in_def)
76
77 for step in steps_as_list(workflow_dict):
78 label = step["label"]
79 if "out" not in step:
80 step["out"] = []
81 for out in outputs_by_label.get(label, []):
82 step_out = step["out"]
83 if isinstance(step_out, list):
84 if out not in step_out:
85 step_out.append(out)
86 else:
87 step_out[out] = {}
88
89
90 def _ensure_format2(workflow_dict=None, workflow_path=None):
91 if workflow_path is not None:
92 assert workflow_dict is None
93 with open(workflow_path, "r") as f:
94 workflow_dict = ordered_load(f)
95
96 workflow_dict = ensure_format2(workflow_dict)
97 return workflow_dict