Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/gxformat2/normalize.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """Abstractions for uniform handling across workflow formats.""" | |
2 from gxformat2._scripts import ensure_format2 | |
3 from gxformat2._yaml import ordered_load | |
4 from gxformat2.converter import _outputs_as_list, convert_inputs_to_steps, steps_as_list | |
5 | |
# Step types that do not represent workflow inputs; inputs_normalized skips
# every step whose type appears in this list.
NON_INPUT_TYPES = [
    "tool",
    "subworkflow",
    "pause",
]
7 | |
8 | |
def steps_normalized(workflow_dict=None, workflow_path=None):
    """Return the workflow's steps as a list, independent of source format.

    The workflow is supplied either as an already-loaded ``workflow_dict`` or
    as a ``workflow_path`` to load from disk. It is first passed through
    ``_ensure_format2``; its steps are then obtained with ``steps_as_list``
    and handed to ``convert_inputs_to_steps`` together with the workflow.

    Returns the list produced by ``steps_as_list``.
    """
    format2_dict = _ensure_format2(
        workflow_dict=workflow_dict,
        workflow_path=workflow_path,
    )
    normalized_steps = steps_as_list(format2_dict)
    # Called for its effect on ``normalized_steps``; its return value is unused.
    # NOTE(review): presumably adds the workflow inputs as steps in place - confirm.
    convert_inputs_to_steps(format2_dict, normalized_steps)
    return normalized_steps
15 | |
16 | |
def inputs_normalized(**kwd):
    """Return only the input steps from ``steps_normalized(**kwd)``.

    A step counts as an input when its ``type`` is not listed in
    ``NON_INPUT_TYPES``. A missing or empty ``type`` is treated as ``'tool'``
    and is therefore excluded.
    """
    return [
        candidate
        for candidate in steps_normalized(**kwd)
        if (candidate.get("type") or 'tool') not in NON_INPUT_TYPES
    ]
29 | |
30 | |
def outputs_normalized(**kwd):
    """Return the workflow outputs as a list after ensuring Format2.

    Keyword arguments are forwarded unchanged to ``_ensure_format2``
    (``workflow_dict`` or ``workflow_path``).

    Note: this does not yet normalize ``source`` -> ``outputSource`` on the
    returned outputs, although it probably should.
    """
    return _outputs_as_list(_ensure_format2(**kwd))
39 | |
40 | |
def walk_id_list_or_dict(dict_or_list):
    """Yield ``(id, value)`` pairs from an idmap given as a list or a dict.

    For a list, each item is yielded as ``(item["id"], item)``. For anything
    else, the pairs of ``dict_or_list.items()`` are yielded.
    """
    if not isinstance(dict_or_list, list):
        # Mapping form: the key already is the id.
        for key, value in dict_or_list.items():
            yield key, value
        return

    # List form: every entry carries its own "id".
    for entry in dict_or_list:
        yield entry["id"], entry
49 | |
50 | |
def ensure_implicit_step_outs(workflow_dict):
    """Ensure implicit 'out' entries allowed by format2 are filled in for CWL.

    Every ``step/output`` reference found in the workflow ``outputs``
    (``outputSource``) and in each step's ``in`` connections is collected.
    Each referenced step is then updated in place so that its ``out``
    declares the referenced output. Nothing is returned.

    A source may be a plain ``"step/output"`` string, a list of sources, or a
    dict carrying the source under a ``"source"`` key. Existing ``out``
    declarations are never overwritten or duplicated.
    """
    outputs_by_label = {}

    def register_output_source(output_source):
        # Unwrap the dict form ({"source": ...}); a dict without a source
        # (e.g. only a default) references no step output.
        if isinstance(output_source, dict):
            output_source = output_source.get("source")
        if output_source is None:
            return
        # A connection may merge several sources.
        if isinstance(output_source, (list, tuple)):
            for single_source in output_source:
                register_output_source(single_source)
            return
        # References without "/" point at workflow inputs, not step outputs.
        if "/" in output_source:
            step_label, output_name = output_source.split("/", 1)
            outputs_by_label.setdefault(step_label, set()).add(output_name)

    for _, output in walk_id_list_or_dict(workflow_dict.get("outputs", {})):
        if "outputSource" in output:
            register_output_source(output["outputSource"])

    for step in steps_as_list(workflow_dict):
        for step_in_def in step.get("in", {}).values():
            register_output_source(step_in_def)

    for step in steps_as_list(workflow_dict):
        referenced = outputs_by_label.get(step["label"])
        if step.get("out") is None:
            step["out"] = []
        if not referenced:
            continue

        step_out = step["out"]
        if isinstance(step_out, list):
            # List entries are either bare names or dicts identified by "id";
            # compare by name so a dict-form declaration is not duplicated.
            declared = [
                entry.get("id") if isinstance(entry, dict) else entry
                for entry in step_out
            ]
            # Sorted so the appended order does not depend on set ordering.
            for output_name in sorted(referenced):
                if output_name not in declared:
                    step_out.append(output_name)
        else:
            for output_name in sorted(referenced):
                # Keep any existing definition instead of replacing it with {}.
                step_out.setdefault(output_name, {})
88 | |
89 | |
def _ensure_format2(workflow_dict=None, workflow_path=None):
    """Return the workflow after passing it through ``ensure_format2``.

    When ``workflow_path`` is given, the workflow is read from that file with
    ``ordered_load`` and ``workflow_dict`` must be ``None`` (checked with an
    assertion). Otherwise ``workflow_dict`` is used as supplied.
    """
    if workflow_path is None:
        return ensure_format2(workflow_dict)

    # The two sources are mutually exclusive.
    assert workflow_dict is None
    with open(workflow_path, "r") as stream:
        loaded = ordered_load(stream)
    return ensure_format2(loaded)