Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/subgraph.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/subgraph.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,193 @@ +import urllib +from collections import namedtuple +from typing import ( + Dict, + List, + MutableMapping, + MutableSequence, + Optional, + Set, + Tuple, + cast, +) + +from ruamel.yaml.comments import CommentedMap + +from .utils import CWLObjectType, aslist +from .workflow import Workflow, WorkflowStep + +Node = namedtuple("Node", ("up", "down", "type")) +UP = "up" +DOWN = "down" +INPUT = "input" +OUTPUT = "output" +STEP = "step" + + +def subgraph_visit( + current: str, + nodes: MutableMapping[str, Node], + visited: Set[str], + direction: str, +) -> None: + + if current in visited: + return + visited.add(current) + + if direction == DOWN: + d = nodes[current].down + if direction == UP: + d = nodes[current].up + for c in d: + subgraph_visit(c, nodes, visited, direction) + + +def declare_node(nodes: Dict[str, Node], nodeid: str, tp: Optional[str]) -> Node: + if nodeid in nodes: + n = nodes[nodeid] + if n.type is None: + nodes[nodeid] = Node(n.up, n.down, tp) + else: + nodes[nodeid] = Node([], [], tp) + return nodes[nodeid] + + +def find_step(steps: List[WorkflowStep], stepid: str) -> Optional[CWLObjectType]: + for st in steps: + if st.tool["id"] == stepid: + return st.tool + return None + + +def get_subgraph(roots: MutableSequence[str], tool: Workflow) -> CommentedMap: + if tool.tool["class"] != "Workflow": + raise Exception("Can only extract subgraph from workflow") + + nodes: Dict[str, Node] = {} + + for inp in tool.tool["inputs"]: + declare_node(nodes, inp["id"], INPUT) + + for out in tool.tool["outputs"]: + declare_node(nodes, out["id"], OUTPUT) + for i in aslist(out.get("outputSource", [])): + # source is upstream from output (dependency) + nodes[out["id"]].up.append(i) + # output is downstream from source + declare_node(nodes, i, None) + nodes[i].down.append(out["id"]) + + for st in tool.tool["steps"]: + step = declare_node(nodes, st["id"], STEP) + for i in st["in"]: + if "source" not in i: + continue + for src in aslist(i["source"]): + # source is upstream from step (dependency) + step.up.append(src) + # step is downstream from source + declare_node(nodes, src, None) + nodes[src].down.append(st["id"]) + for out in st["out"]: + # output is downstream from step + step.down.append(out) + # step is upstream from output + declare_node(nodes, out, None) + nodes[out].up.append(st["id"]) + + # Find all the downstream nodes from the starting points + visited_down: Set[str] = set() + for r in roots: + if nodes[r].type == OUTPUT: + subgraph_visit(r, nodes, visited_down, UP) + else: + subgraph_visit(r, nodes, visited_down, DOWN) + + # Now make sure all the nodes are connected to upstream inputs + visited: Set[str] = set() + rewire: Dict[str, Tuple[str, CWLObjectType]] = {} + for v in visited_down: + visited.add(v) + if nodes[v].type in (STEP, OUTPUT): + for u in nodes[v].up: + if u in visited_down: + continue + if nodes[u].type == INPUT: + visited.add(u) + else: + # rewire + df = urllib.parse.urldefrag(u) + rn = str(df[0] + "#" + df[1].replace("/", "_")) + if nodes[v].type == STEP: + wfstep = find_step(tool.steps, v) + if wfstep is not None: + for inp in cast( + MutableSequence[CWLObjectType], wfstep["inputs"] + ): + if "source" in inp and u in cast( + CWLObjectType, inp["source"] + ): + rewire[u] = (rn, cast(CWLObjectType, inp["type"])) + break + else: + raise Exception("Could not find step %s" % v) + + extracted = CommentedMap() + for f in tool.tool: + if f in ("steps", "inputs", "outputs"): + extracted[f] = [] + for i in tool.tool[f]: + if i["id"] in visited: + if f == "steps": + for inport in i["in"]: + if "source" not in inport: + continue + if isinstance(inport["source"], MutableSequence): + inport["source"] = [ + rewire[s][0] + for s in inport["source"] + if s in rewire + ] + elif inport["source"] in rewire: + inport["source"] = rewire[inport["source"]][0] + extracted[f].append(i) + else: + extracted[f] = tool.tool[f] + + for rv in rewire.values(): + extracted["inputs"].append({"id": rv[0], "type": rv[1]}) + + return extracted + + +def get_step(tool: Workflow, step_id: str) -> CommentedMap: + + extracted = CommentedMap() + + step = find_step(tool.steps, step_id) + if step is None: + raise Exception(f"Step {step_id} was not found") + + extracted["steps"] = [step] + extracted["inputs"] = [] + extracted["outputs"] = [] + + for inport in cast(List[CWLObjectType], step["in"]): + name = cast(str, inport["id"]).split("#")[-1].split("/")[-1] + extracted["inputs"].append({"id": name, "type": "Any"}) + inport["source"] = name + if "linkMerge" in inport: + del inport["linkMerge"] + + for outport in cast(List[str], step["out"]): + name = outport.split("#")[-1].split("/")[-1] + extracted["outputs"].append( + {"id": name, "type": "Any", "outputSource": f"{step_id}/{name}"} + ) + + for f in tool.tool: + if f not in ("steps", "inputs", "outputs"): + extracted[f] = tool.tool[f] + + return extracted