Mercurial > repos > guerler > springsuite
view planemo/lib/python3.7/site-packages/gxformat2/normalize.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line source
"""Abstractions for uniform handling of workflows across formats."""
from gxformat2._scripts import ensure_format2
from gxformat2._yaml import ordered_load
from gxformat2.converter import _outputs_as_list, convert_inputs_to_steps, steps_as_list

# Step types that are not workflow inputs; every other type is treated as an
# input step by inputs_normalized().
NON_INPUT_TYPES = ["tool", "subworkflow", "pause"]


def steps_normalized(workflow_dict=None, workflow_path=None):
    """Walk over a normalized step rep. across workflow formats.

    The workflow is supplied either as an already loaded ``workflow_dict`` or
    as a ``workflow_path`` to load it from. It is passed through
    ``_ensure_format2``, its steps are obtained with ``steps_as_list`` and
    ``convert_inputs_to_steps`` is then called with the workflow and that
    list before the list is returned.
    """
    workflow_dict = _ensure_format2(workflow_dict=workflow_dict, workflow_path=workflow_path)
    steps = steps_as_list(workflow_dict)
    # The return value is ignored, so this call is relied on to update
    # ``steps`` in place (presumably prepending the workflow inputs as steps;
    # confirm against gxformat2.converter).
    convert_inputs_to_steps(workflow_dict, steps)
    return steps


def inputs_normalized(**kwd):
    """Call steps_normalized and retain just the input steps normalized.

    Accepts the same keyword arguments as ``steps_normalized``. A step whose
    ``type`` is missing or empty is treated as a ``tool`` step and is
    therefore excluded.
    """
    steps = steps_normalized(**kwd)
    return [
        step for step in steps
        if (step.get("type") or "tool") not in NON_INPUT_TYPES
    ]


def outputs_normalized(**kwd):
    """Ensure Format2 and return outputs.

    Accepts the same keyword arguments as ``_ensure_format2``.

    Probably should go farther and normalize source -> outputSource,
    but doesn't yet do this.
    """
    workflow_dict = _ensure_format2(**kwd)
    return _outputs_as_list(workflow_dict)


def walk_id_list_or_dict(dict_or_list):
    """Walk over idmap regardless of list or dict representation.

    Yields ``(id, item)`` pairs: for a list, each item's ``"id"`` entry is
    used as the id; for anything else, the pairs of ``.items()`` are yielded.
    """
    if isinstance(dict_or_list, list):
        for item in dict_or_list:
            yield item["id"], item
    else:
        for key, value in dict_or_list.items():
            yield key, value


def ensure_implicit_step_outs(workflow_dict):
    """Ensure implicit 'out' dicts allowed by format2 are filled in for CWL.

    Every ``step/output`` reference found in a workflow output's
    ``outputSource`` or in a step's ``in`` mapping is collected, and each
    referenced output is then added to the ``out`` of the step carrying that
    label. ``workflow_dict`` is modified in place (through the step objects
    returned by ``steps_as_list``); nothing is returned.

    Outputs a step already declares are left untouched, and missing ones are
    added in sorted order so the result does not depend on set iteration
    order.
    """
    outputs_by_label = {}

    def register_step_output(step_label, output_name):
        outputs_by_label.setdefault(step_label, set()).add(output_name)

    def register_output_source(output_source):
        # Expanded form: the reference lives under the "source" key. An entry
        # without one (e.g. only a default) references nothing.
        if isinstance(output_source, dict):
            output_source = output_source.get("source")
        # Several sources may be given at once.
        if isinstance(output_source, (list, tuple)):
            for source in output_source:
                register_output_source(source)
            return
        # Anything that is not string-like (None, numbers, ...) cannot name a
        # step output; skip it instead of failing on the "/" test.
        if not hasattr(output_source, "split"):
            return
        if "/" in output_source:
            step_label, output_name = output_source.split("/", 1)
            register_step_output(step_label, output_name)

    def declares_output(step_out_list, output_name):
        # List entries are either bare names or dicts identified by "id".
        return any(
            item == output_name
            or (isinstance(item, dict) and item.get("id") == output_name)
            for item in step_out_list
        )

    for _, output in walk_id_list_or_dict(workflow_dict.get("outputs", {})):
        if "outputSource" in output:
            register_output_source(output["outputSource"])

    for step in steps_as_list(workflow_dict):
        for step_in_def in step.get("in", {}).values():
            register_output_source(step_in_def)

    for step in steps_as_list(workflow_dict):
        label = step["label"]
        if "out" not in step:
            step["out"] = []
        step_out = step["out"]
        # Sorted so that the order of appended outputs is reproducible.
        for out in sorted(outputs_by_label.get(label, ())):
            if isinstance(step_out, list):
                if not declares_output(step_out, out):
                    step_out.append(out)
            else:
                # Keep any explicit definition; only fill in what is missing.
                step_out.setdefault(out, {})


def _ensure_format2(workflow_dict=None, workflow_path=None):
    """Return the workflow as Format2, loading it from a path if given.

    When ``workflow_path`` is given the file is read with ``ordered_load``
    and ``workflow_dict`` must be None. The resulting dictionary is always
    passed through ``ensure_format2`` and that function's result is returned.
    """
    if workflow_path is not None:
        assert workflow_dict is None, "Specify workflow_dict or workflow_path, not both."
        with open(workflow_path, "r") as f:
            workflow_dict = ordered_load(f)

    workflow_dict = ensure_format2(workflow_dict)
    return workflow_dict