Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/gxformat2/normalize.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/gxformat2/normalize.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,97 @@ +"""Abstractions for uniform across formats.""" +from gxformat2._scripts import ensure_format2 +from gxformat2._yaml import ordered_load +from gxformat2.converter import _outputs_as_list, convert_inputs_to_steps, steps_as_list + +NON_INPUT_TYPES = ["tool", "subworkflow", "pause"] + + +def steps_normalized(workflow_dict=None, workflow_path=None): + """Walk over a normalized step rep. across workflow formats.""" + workflow_dict = _ensure_format2(workflow_dict=workflow_dict, workflow_path=workflow_path) + steps = steps_as_list(workflow_dict) + convert_inputs_to_steps(workflow_dict, steps) + return steps + + +def inputs_normalized(**kwd): + """Call steps_normalized and retain just the input steps normalized.""" + steps = steps_normalized(**kwd) + input_steps = [] + for step in steps: + step_type = step.get("type") or 'tool' + if step_type in NON_INPUT_TYPES: + continue + + input_steps.append(step) + + return input_steps + + +def outputs_normalized(**kwd): + """Ensure Format2 and return outputs. + + Probably should go farther and normalize source -> outputSource, + but doesn't yet do this. + """ + workflow_dict = _ensure_format2(**kwd) + return _outputs_as_list(workflow_dict) + + +def walk_id_list_or_dict(dict_or_list): + """Walk over idmap regardless of list or dict representation.""" + if isinstance(dict_or_list, list): + for item in dict_or_list: + yield item["id"], item + else: + for item in dict_or_list.items(): + yield item + + +def ensure_implicit_step_outs(workflow_dict): + """Ensure implicit 'out' dicts allowed by format2 are filled in for CWL.""" + outputs_by_label = {} + + def register_step_output(step_label, output_name): + if step_label not in outputs_by_label: + outputs_by_label[step_label] = set() + outputs_by_label[step_label].add(output_name) + + def register_output_source(output_source): + if "/" in output_source: + step, output_name = output_source.split("/", 1) + register_step_output(step, output_name) + + for output_name, output in walk_id_list_or_dict(workflow_dict.get("outputs", {})): + if "outputSource" in output: + output_source = output["outputSource"] + if "/" in output_source: + step, output_name = output_source.split("/", 1) + register_step_output(step, output_name) + + for step in steps_as_list(workflow_dict): + step_in = step.get("in", {}) + for step_in_name, step_in_def in step_in.items(): + register_output_source(step_in_def) + + for step in steps_as_list(workflow_dict): + label = step["label"] + if "out" not in step: + step["out"] = [] + for out in outputs_by_label.get(label, []): + step_out = step["out"] + if isinstance(step_out, list): + if out not in step_out: + step_out.append(out) + else: + step_out[out] = {} + + +def _ensure_format2(workflow_dict=None, workflow_path=None): + if workflow_path is not None: + assert workflow_dict is None + with open(workflow_path, "r") as f: + workflow_dict = ordered_load(f) + + workflow_dict = ensure_format2(workflow_dict) + return workflow_dict