Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/subgraph.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 import urllib | |
| 2 from collections import namedtuple | |
| 3 from typing import ( | |
| 4 Dict, | |
| 5 List, | |
| 6 MutableMapping, | |
| 7 MutableSequence, | |
| 8 Optional, | |
| 9 Set, | |
| 10 Tuple, | |
| 11 cast, | |
| 12 ) | |
| 13 | |
| 14 from ruamel.yaml.comments import CommentedMap | |
| 15 | |
| 16 from .utils import CWLObjectType, aslist | |
| 17 from .workflow import Workflow, WorkflowStep | |
| 18 | |
# Graph node record: ``up`` and ``down`` are lists holding the ids of the
# adjacent upstream and downstream nodes; ``type`` is one of INPUT, OUTPUT,
# STEP, or None while the kind of the node is not yet known.
Node = namedtuple("Node", ["up", "down", "type"])

# Traversal directions understood by subgraph_visit.
UP, DOWN = "up", "down"

# Node kinds stored in ``Node.type``.
INPUT, OUTPUT, STEP = "input", "output", "step"
| 25 | |
| 26 | |
def subgraph_visit(
    current: str,
    nodes: MutableMapping[str, Node],
    visited: Set[str],
    direction: str,
) -> None:
    """Add every node reachable from ``current`` to ``visited``.

    The ``down`` links of each node are followed when ``direction`` is
    ``DOWN`` and the ``up`` links when it is ``UP``.  Nodes that are already
    in ``visited`` are not expanded again, so cycles and shared dependencies
    terminate.

    :param current: id of the node to start from
    :param nodes: mapping from node id to its ``Node`` record
    :param visited: set updated in place with the id of every node reached
    :param direction: either ``UP`` or ``DOWN``
    :raises ValueError: if ``direction`` is neither ``UP`` nor ``DOWN``
    :raises KeyError: if a reached node id is missing from ``nodes``
    """
    # Fail fast with a clear message instead of hitting an unbound local
    # further down when an unknown direction is supplied.
    if direction not in (UP, DOWN):
        raise ValueError(
            "direction must be %r or %r, got %r" % (UP, DOWN, direction)
        )

    # An explicit stack replaces recursion so that long dependency chains
    # cannot exhaust the interpreter's recursion limit.  The order in which
    # nodes are reached does not matter because the result is a set.
    pending = [current]
    while pending:
        nodeid = pending.pop()
        if nodeid in visited:
            continue
        visited.add(nodeid)
        node = nodes[nodeid]
        pending.extend(node.down if direction == DOWN else node.up)
| 44 | |
| 45 | |
def declare_node(nodes: Dict[str, Node], nodeid: str, tp: Optional[str]) -> Node:
    """Ensure ``nodeid`` has an entry in ``nodes`` and return that entry.

    A missing id gets a fresh node with empty ``up``/``down`` lists and type
    ``tp``.  An existing node whose type is still ``None`` is replaced by a
    node that shares the same ``up``/``down`` lists but carries ``tp``.  An
    existing node that already has a type is left untouched.

    :param nodes: node table, updated in place
    :param nodeid: id of the node to declare
    :param tp: node kind to record, or ``None`` when it is not known yet
    :return: the record stored under ``nodeid`` after the update
    """
    if nodeid not in nodes:
        nodes[nodeid] = Node([], [], tp)
        return nodes[nodeid]

    existing = nodes[nodeid]
    if existing.type is None:
        # Keep the very same link lists so references held elsewhere still
        # observe later appends.
        nodes[nodeid] = Node(existing.up, existing.down, tp)
    return nodes[nodeid]
| 54 | |
| 55 | |
def find_step(steps: List[WorkflowStep], stepid: str) -> Optional[CWLObjectType]:
    """Return the ``tool`` document of the first step whose id is ``stepid``.

    :param steps: workflow steps to search, in order
    :param stepid: value compared against each step's ``tool["id"]``
    :return: the matching step's ``tool`` mapping, or ``None`` if none matches
    """
    matching_tools = (
        candidate.tool for candidate in steps if candidate.tool["id"] == stepid
    )
    return next(matching_tools, None)
| 61 | |
| 62 | |
def get_subgraph(roots: MutableSequence[str], tool: Workflow) -> CommentedMap:
    """Extract the part of a workflow that is connected to ``roots``.

    A dependency graph of the workflow's inputs, outputs, steps and step
    ports is built first.  Each root that is a workflow output is followed
    upstream; every other root is followed downstream.  Steps and outputs
    reached this way are kept, together with the workflow inputs they read
    directly.  Any other upstream source of a kept step is "rewired": it is
    replaced by a new workflow input whose id is derived from the source id
    (``/`` in the fragment becomes ``_``) and whose type is copied from the
    step input that consumed it.

    Note that the ``source`` fields of the kept steps are rewritten in place
    on the step documents held by ``tool.tool``, and those same documents are
    placed in the result.

    :param roots: ids of the nodes to start from
    :param tool: the workflow to extract from
    :return: a new ``CommentedMap`` holding the filtered ``steps``,
        ``inputs`` and ``outputs`` plus every other field of ``tool.tool``
    :raises Exception: if ``tool`` is not a ``Workflow`` document, or a kept
        step cannot be found in ``tool.steps``
    :raises KeyError: if a root id does not name a node of the workflow
    """
    # Imported here so the submodule is guaranteed to be loaded; a bare
    # ``import urllib`` does not make ``urllib.parse`` available by itself.
    from urllib.parse import urldefrag

    if tool.tool["class"] != "Workflow":
        raise Exception("Can only extract subgraph from workflow")

    nodes: Dict[str, Node] = {}

    for inp in tool.tool["inputs"]:
        declare_node(nodes, inp["id"], INPUT)

    for out in tool.tool["outputs"]:
        declare_node(nodes, out["id"], OUTPUT)
        for i in aslist(out.get("outputSource", [])):
            # source is upstream from output (dependency)
            nodes[out["id"]].up.append(i)
            # output is downstream from source
            declare_node(nodes, i, None)
            nodes[i].down.append(out["id"])

    for st in tool.tool["steps"]:
        step = declare_node(nodes, st["id"], STEP)
        for i in st["in"]:
            if "source" not in i:
                continue
            for src in aslist(i["source"]):
                # source is upstream from step (dependency)
                step.up.append(src)
                # step is downstream from source
                declare_node(nodes, src, None)
                nodes[src].down.append(st["id"])
        for out in st["out"]:
            # A step output may be given as a mapping with an "id" instead
            # of a bare id string; use the id as the node key in both cases.
            if isinstance(out, MutableMapping) and "id" in out:
                out = out["id"]
            # output is downstream from step
            step.down.append(out)
            # step is upstream from output
            declare_node(nodes, out, None)
            nodes[out].up.append(st["id"])

    # Find all the downstream nodes from the starting points
    visited_down: Set[str] = set()
    for r in roots:
        if nodes[r].type == OUTPUT:
            subgraph_visit(r, nodes, visited_down, UP)
        else:
            subgraph_visit(r, nodes, visited_down, DOWN)

    # Now make sure all the nodes are connected to upstream inputs
    visited: Set[str] = set()
    rewire: Dict[str, Tuple[str, CWLObjectType]] = {}
    for v in visited_down:
        visited.add(v)
        if nodes[v].type not in (STEP, OUTPUT):
            continue
        for u in nodes[v].up:
            if u in visited_down:
                continue
            if nodes[u].type == INPUT:
                visited.add(u)
                continue
            # rewire: the source lies outside the subgraph, so it will be
            # replaced by a new workflow input named after it
            df = urldefrag(u)
            rn = str(df[0] + "#" + df[1].replace("/", "_"))
            if nodes[v].type != STEP:
                continue
            wfstep = find_step(tool.steps, v)
            if wfstep is None:
                raise Exception("Could not find step %s" % v)
            for inp in cast(MutableSequence[CWLObjectType], wfstep["inputs"]):
                # aslist() gives exact id matching; testing ``u in source``
                # on a plain string would be a substring match.
                if "source" in inp and u in aslist(inp["source"]):
                    rewire[u] = (rn, cast(CWLObjectType, inp["type"]))
                    break

    extracted = CommentedMap()
    for f in tool.tool:
        if f in ("steps", "inputs", "outputs"):
            extracted[f] = []
            for i in tool.tool[f]:
                if i["id"] in visited:
                    if f == "steps":
                        for inport in i["in"]:
                            if "source" not in inport:
                                continue
                            if isinstance(inport["source"], MutableSequence):
                                # Replace rewired sources and keep the others
                                # unchanged, as the scalar branch does.
                                inport["source"] = [
                                    rewire[s][0] if s in rewire else s
                                    for s in inport["source"]
                                ]
                            elif inport["source"] in rewire:
                                inport["source"] = rewire[inport["source"]][0]
                    extracted[f].append(i)
        else:
            extracted[f] = tool.tool[f]

    # Expose every rewired source as a new input of the extracted workflow.
    for rv in rewire.values():
        extracted["inputs"].append({"id": rv[0], "type": rv[1]})

    return extracted
| 162 | |
| 163 | |
def get_step(tool: Workflow, step_id: str) -> CommentedMap:
    """Build a workflow document that contains only the step ``step_id``.

    Every input port of the step becomes a workflow input of type ``Any``
    named after the last path component of the port id, and the port is
    pointed at that input.  Every output port becomes a workflow output of
    type ``Any`` whose ``outputSource`` is ``<step_id>/<name>``.  All other
    fields of ``tool.tool`` are copied over unchanged.

    Note that the step's input ports are modified in place: ``source`` is
    overwritten and ``linkMerge`` is removed.

    :param tool: the workflow that owns the step
    :param step_id: id of the step to extract
    :return: a new ``CommentedMap`` describing the single-step workflow
    :raises Exception: if no step with id ``step_id`` exists
    """
    step = find_step(tool.steps, step_id)
    if step is None:
        raise Exception(f"Step {step_id} was not found")

    extracted = CommentedMap()
    extracted["steps"] = [step]
    extracted["inputs"] = []
    extracted["outputs"] = []

    for inport in cast(List[CWLObjectType], step["in"]):
        name = cast(str, inport["id"]).split("#")[-1].split("/")[-1]
        extracted["inputs"].append({"id": name, "type": "Any"})
        inport["source"] = name
        # The port now has a single source, so a merge method no longer applies.
        if "linkMerge" in inport:
            del inport["linkMerge"]

    for outport in cast(List[object], step["out"]):
        # An output port may be a bare id string or a mapping with an "id".
        if isinstance(outport, MutableMapping):
            outport_id = cast(str, outport["id"])
        else:
            outport_id = cast(str, outport)
        name = outport_id.split("#")[-1].split("/")[-1]
        extracted["outputs"].append(
            {"id": name, "type": "Any", "outputSource": f"{step_id}/{name}"}
        )

    for f in tool.tool:
        if f not in ("steps", "inputs", "outputs"):
            extracted[f] = tool.tool[f]

    return extracted
