comparison env/lib/python3.9/site-packages/gxformat2/abstract.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Module for exporting Galaxy workflows to CWL abstract interface."""
2 import argparse
3 import sys
4 from typing import Any, Dict
5
6 from gxformat2._scripts import ensure_format2
7 from gxformat2.converter import steps_as_list
8 from gxformat2.normalize import NormalizedWorkflow, walk_id_list_or_dict
9 from gxformat2.yaml import ordered_dump_to_path, ordered_load
10
# CWL version stamped on top-level (non-subworkflow) abstract workflows.
CWL_VERSION = "v1.2"

# Help text shown by the command line parser (see ``_parser``).
SCRIPT_DESCRIPTION = """
This script converts an executable Galaxy workflow (in either format -
Format 2 or native .ga) into an abstract CWL representation.

In order to represent Galaxy tool executions in the Common Workflow Language
workflow language, they are serialized as v1.2+ abstract 'Operation' classes.
Because abstract 'Operation' classes are used, the resulting CWL workflow is
not executable - either in Galaxy or by CWL implementations. The resulting CWL
file should be thought of more as a common metadata specification describing
the workflow structure.
"""
24
25
def from_dict(workflow_dict: dict, subworkflow=False):
    """Build the abstract CWL description of a dictified Galaxy workflow.

    The input is passed through ``ensure_format2`` and ``NormalizedWorkflow``
    before conversion. When ``subworkflow`` is true the ``cwlVersion`` key is
    left out of the result. A ``requirements`` key is only emitted when at
    least one step registered a requirement.
    """
    # TODO: pass some sort of flag to ensure_format2 to make sure information
    # about step outputs that may be present in native format is not lost when
    # converting to Format2.
    format2_dict = NormalizedWorkflow(ensure_format2(workflow_dict)).normalized_workflow_dict

    # Keys are inserted in the order they should appear in the dumped document.
    abstract_dict: Dict[str, Any] = {"class": "Workflow"}
    if not subworkflow:
        abstract_dict["cwlVersion"] = CWL_VERSION

    # inputs and outputs already mostly in CWL format...
    # TODO: add test case where format2 input without inputs declaration is used
    abstract_inputs = _format2_inputs_to_abstract(format2_dict.get("inputs", {}))
    abstract_dict["inputs"] = abstract_inputs
    abstract_dict["outputs"] = _format2_outputs_to_abstract(format2_dict.get("outputs", {}))

    # Shared with every step conversion so steps can register requirements.
    requirements: Dict[str, Any] = {}
    abstract_steps = {}
    format2_steps = steps_as_list(
        format2_dict,
        add_ids=True,
        inputs_offset=len(abstract_inputs),
        mutate=False,
    )
    for step in format2_steps:
        step_key = step.get("label") or step.get("id")
        assert step_key is not None
        abstract_steps[str(step_key)] = _format2_step_to_abstract(step, requirements=requirements)

    abstract_dict["steps"] = abstract_steps
    if requirements:
        abstract_dict["requirements"] = requirements
    return abstract_dict
57
58
59 def _format2_step_to_abstract(format2_step, requirements):
60 """Convert Format2 step CWL 1.2+ abstract operation."""
61 abstract_step = {}
62 if "run" in format2_step:
63 # probably encountered in subworkflow.
64 format2_run = format2_step["run"]
65 format2_run_class = format2_run["class"]
66 requirements["SubworkflowFeatureRequirement"] = {}
67 if format2_run_class == "GalaxyWorkflow":
68 # preprocess to ensure it has outs - should the original call be recursive?
69 step_run = from_dict(format2_run, subworkflow=True)
70 abstract_step["run"] = step_run
71 else:
72 raise NotImplementedError("Unknown runnabled type encountered [%s]" % format2_run_class)
73 else:
74 step_run = {
75 "class": "Operation",
76 "doc": format2_step.get("doc", ""),
77 "inputs": {}, # TODO
78 "outputs": {}, # TODO
79 }
80 abstract_step["run"] = step_run
81 abstract_step["in"] = _format2_in_to_abstract(format2_step.get("in", []))
82 abstract_step["out"] = _format2_out_to_abstract(format2_step)
83 return abstract_step
84
85
86 def _format2_in_to_abstract(in_dict):
87 """Convert Format2 'in' dict for step into CWL abstract 'in' dict."""
88 return in_dict
89
90
91 def _format2_out_to_abstract(format2_step, run=None):
92 """Convert Format2 'out' list for step into CWL abstract 'out' list."""
93 cwl_out = []
94 if "out" in format2_step:
95 out = format2_step.get("out")
96 if isinstance(out, dict):
97 for out_name, out_def in out.items():
98 # discard PJA info when converting to abstract CWL
99 cwl_out.append(out_name)
100 else:
101 cwl_out = out
102
103 return cwl_out
104
105
def _format2_inputs_to_abstract(inputs):
    """Translate Format2 workflow inputs into abstract CWL inputs.

    Returns a new dict keyed by input name. Dict-style input definitions are
    modified in place (type rewritten, 'position' removed); a bare type value
    is wrapped in a fresh ``{"type": ...}`` dict.
    """
    abstract_inputs = {}

    for name, definition in walk_id_list_or_dict(inputs):
        if not isinstance(definition, dict):
            # Shorthand declaration: the value itself is the type.
            declared_type = definition
            definition = {"type": declared_type}
        else:
            declared_type = definition.get("type")

        if declared_type == "data":
            definition["type"] = "File"

        _format2_type_to_abstract(definition)

        # Strip off Galaxy extensions
        definition.pop("position", None)
        abstract_inputs[name] = definition

    return abstract_inputs
127
128
129 def _format2_type_to_abstract(has_type):
130 format2_type = has_type.pop("type")
131 if format2_type == "data":
132 cwl_type = "File"
133 elif format2_type == "collection":
134 # TODO: handled nested collections, pairs, etc...
135 cwl_type = "File[]"
136 else:
137 cwl_type = format2_type
138 optional = has_type.pop("optional", False)
139 if optional:
140 cwl_type += "?"
141 has_type["type"] = cwl_type
142
143
def _format2_outputs_to_abstract(outputs):
    """Default every untyped workflow output to type 'File'.

    Output definitions are updated in place and the same ``outputs`` object
    is returned.
    """
    for _name, output_def in walk_id_list_or_dict(outputs):
        has_declared_type = "type" in output_def
        if not has_declared_type:
            output_def["type"] = "File"
    return outputs
150
151
def main(argv=None):
    """Run the abstract CWL export from the command line.

    ``argv`` defaults to ``sys.argv[1:]``. The workflow is read from the
    given input path (or standard input when the path is "-"), converted
    with ``from_dict`` and written to the output path, which defaults to the
    input path plus ".abstract.cwl". Returns 0.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    args = _parser().parse_args(cli_args)

    source_path = args.input_path
    destination = args.output_path or (source_path + ".abstract.cwl")

    if source_path != "-":
        with open(source_path, "r") as handle:
            workflow = ordered_load(handle)
    else:
        workflow = ordered_load(sys.stdin)

    ordered_dump_to_path(from_dict(workflow), destination)
    return 0
171
172
def _parser():
    """Create the argument parser: a required INPUT and an optional OUTPUT path."""
    arg_parser = argparse.ArgumentParser(description=SCRIPT_DESCRIPTION)
    arg_parser.add_argument(
        'input_path',
        type=str,
        metavar='INPUT',
        help='input workflow path (.ga/gxwf.yml)',
    )
    # nargs="?" makes the output path optional.
    arg_parser.add_argument(
        'output_path',
        type=str,
        nargs="?",
        metavar='OUTPUT',
        help='output workflow path (.cwl)',
    )
    return arg_parser
180
181
# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())


# Public API of this module.
__all__ = ('main', 'from_dict')