Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/subgraph.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import urllib | |
2 from collections import namedtuple | |
3 from typing import ( | |
4 Dict, | |
5 List, | |
6 MutableMapping, | |
7 MutableSequence, | |
8 Optional, | |
9 Set, | |
10 Tuple, | |
11 cast, | |
12 ) | |
13 | |
14 from ruamel.yaml.comments import CommentedMap | |
15 | |
16 from .utils import CWLObjectType, aslist | |
17 from .workflow import Workflow, WorkflowStep | |
18 | |
# Graph vertex used while extracting a subgraph.  ``up`` and ``down`` are
# lists of neighbouring node ids (dependencies and dependents respectively);
# ``type`` is INPUT, OUTPUT, STEP, or None while the kind is still unknown.
Node = namedtuple("Node", "up down type")

# Traversal directions accepted by subgraph_visit().
UP, DOWN = "up", "down"

# Node kinds stored in Node.type.
INPUT, OUTPUT, STEP = "input", "output", "step"
25 | |
26 | |
def subgraph_visit(
    current: str,
    nodes: MutableMapping[str, Node],
    visited: Set[str],
    direction: str,
) -> None:
    """Add ``current`` and every node reachable from it to ``visited``.

    :param current: id of the node to start from.
    :param nodes: mapping of node id to its ``Node`` record.
    :param visited: set of node ids already seen; updated in place.
    :param direction: ``UP`` to follow ``Node.up`` links, ``DOWN`` to follow
        ``Node.down`` links.
    :raises ValueError: if ``direction`` is neither ``UP`` nor ``DOWN``.
    :raises KeyError: if a reached node id is missing from ``nodes``.
    """
    if current in visited:
        return
    if direction not in (UP, DOWN):
        # Previously this fell through to an unbound local variable.
        raise ValueError(
            "direction must be %r or %r, got %r" % (UP, DOWN, direction)
        )

    # Explicit stack instead of recursion so that long dependency chains
    # cannot exceed the interpreter's recursion limit.
    pending = [current]
    while pending:
        nodeid = pending.pop()
        if nodeid in visited:
            continue
        visited.add(nodeid)
        node = nodes[nodeid]
        neighbours = node.down if direction == DOWN else node.up
        pending.extend(n for n in neighbours if n not in visited)
44 | |
45 | |
def declare_node(nodes: Dict[str, Node], nodeid: str, tp: Optional[str]) -> Node:
    """Ensure ``nodeid`` has an entry in ``nodes`` and return that entry.

    A missing node is created with empty ``up``/``down`` lists and type
    ``tp``.  An existing node whose type is still ``None`` is replaced by a
    node that keeps the same ``up``/``down`` lists but carries type ``tp``.
    An existing node that already has a type is left untouched.
    """
    if nodeid not in nodes:
        nodes[nodeid] = Node([], [], tp)
        return nodes[nodeid]

    known = nodes[nodeid]
    if known.type is None:
        # _replace keeps the very same up/down list objects.
        nodes[nodeid] = known._replace(type=tp)
    return nodes[nodeid]
54 | |
55 | |
def find_step(steps: List[WorkflowStep], stepid: str) -> Optional[CWLObjectType]:
    """Return the ``tool`` mapping of the first step whose id is ``stepid``.

    Returns ``None`` when no step in ``steps`` has that id.
    """
    matching_tools = (
        candidate.tool for candidate in steps if candidate.tool["id"] == stepid
    )
    return next(matching_tools, None)
61 | |
62 | |
def get_subgraph(roots: MutableSequence[str], tool: Workflow) -> CommentedMap:
    """Extract the part of a workflow that is connected to ``roots``.

    A dependency graph is built from the workflow's inputs, outputs and
    steps.  Starting from each root, the graph is walked upstream when the
    root is a workflow output and downstream otherwise.  Step inputs that
    depend on something outside the selected region are rewired to newly
    created workflow inputs.

    Note: the ``source`` fields of the selected steps are rewritten in place,
    so the step entries inside ``tool.tool`` are modified.

    :param roots: ids of the nodes to start the extraction from.
    :param tool: the workflow to extract from.
    :returns: a new mapping with the selected ``steps``, ``inputs`` and
        ``outputs``; every other top-level field of ``tool.tool`` is carried
        over unchanged.
    :raises Exception: if ``tool`` is not a Workflow, or if a selected step
        cannot be found in ``tool.steps``.
    :raises KeyError: if a root id is not a node of the workflow.
    """
    # Imported here because the file-level ``import urllib`` does not
    # guarantee that the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urldefrag

    if tool.tool["class"] != "Workflow":
        raise Exception("Can only extract subgraph from workflow")

    nodes: Dict[str, Node] = {}

    for inp in tool.tool["inputs"]:
        declare_node(nodes, inp["id"], INPUT)

    for out in tool.tool["outputs"]:
        declare_node(nodes, out["id"], OUTPUT)
        for i in aslist(out.get("outputSource", [])):
            # source is upstream from output (dependency)
            nodes[out["id"]].up.append(i)
            # output is downstream from source
            declare_node(nodes, i, None)
            nodes[i].down.append(out["id"])

    for st in tool.tool["steps"]:
        step = declare_node(nodes, st["id"], STEP)
        for i in st["in"]:
            if "source" not in i:
                continue
            for src in aslist(i["source"]):
                # source is upstream from step (dependency)
                step.up.append(src)
                # step is downstream from source
                declare_node(nodes, src, None)
                nodes[src].down.append(st["id"])
        for out in st["out"]:
            # A step output may be written as a bare id or as a mapping that
            # carries the id; a mapping cannot be used as a node key.
            if isinstance(out, MutableMapping) and "id" in out:
                out = out["id"]
            # output is downstream from step
            step.down.append(out)
            # step is upstream from output
            declare_node(nodes, out, None)
            nodes[out].up.append(st["id"])

    # Find all the downstream nodes from the starting points
    visited_down: Set[str] = set()
    for r in roots:
        if nodes[r].type == OUTPUT:
            subgraph_visit(r, nodes, visited_down, UP)
        else:
            subgraph_visit(r, nodes, visited_down, DOWN)

    # Now make sure all the nodes are connected to upstream inputs
    visited: Set[str] = set()
    # Maps an out-of-subgraph source id to (new input id, input type).
    rewire: Dict[str, Tuple[str, CWLObjectType]] = {}
    for v in visited_down:
        visited.add(v)
        if nodes[v].type not in (STEP, OUTPUT):
            continue
        for u in nodes[v].up:
            if u in visited_down:
                continue
            if nodes[u].type == INPUT:
                visited.add(u)
                continue
            # Only step inputs are rewired to new workflow inputs.
            if nodes[v].type != STEP:
                continue
            wfstep = find_step(tool.steps, v)
            if wfstep is None:
                raise Exception("Could not find step %s" % v)
            # New input id: same document, fragment flattened with "_".
            df = urldefrag(u)
            rn = str(df[0] + "#" + df[1].replace("/", "_"))
            for inp in cast(MutableSequence[CWLObjectType], wfstep["inputs"]):
                # aslist() makes this an exact id match; testing ``u in``
                # a plain string would be a substring match.
                if "source" in inp and u in aslist(inp["source"]):
                    rewire[u] = (rn, cast(CWLObjectType, inp["type"]))
                    break

    extracted = CommentedMap()
    for f in tool.tool:
        if f in ("steps", "inputs", "outputs"):
            extracted[f] = []
            for i in tool.tool[f]:
                if i["id"] in visited:
                    if f == "steps":
                        for inport in i["in"]:
                            if "source" not in inport:
                                continue
                            if isinstance(inport["source"], MutableSequence):
                                # Replace rewired sources and keep the rest;
                                # sources that stay inside the subgraph must
                                # not be dropped.
                                inport["source"] = [
                                    rewire[s][0] if s in rewire else s
                                    for s in inport["source"]
                                ]
                            elif inport["source"] in rewire:
                                inport["source"] = rewire[inport["source"]][0]
                    extracted[f].append(i)
        else:
            extracted[f] = tool.tool[f]

    for rv in rewire.values():
        extracted["inputs"].append({"id": rv[0], "type": rv[1]})

    return extracted
162 | |
163 | |
def get_step(tool: Workflow, step_id: str) -> CommentedMap:
    """Build a workflow document that contains only the step ``step_id``.

    Every input port of the step becomes a workflow input of type ``Any``
    and every output port becomes a workflow output of type ``Any``.  The
    short name of a port is the part of its id after the last ``#`` and the
    last ``/``.

    Note: the step's input ports are modified in place: ``source`` is
    pointed at the new workflow input and ``linkMerge`` is removed.

    :param tool: the workflow that contains the step.
    :param step_id: id of the step to extract.
    :returns: a new mapping with ``steps``, ``inputs`` and ``outputs`` for
        the single step, followed by every other top-level field of
        ``tool.tool``.
    :raises Exception: if no step with id ``step_id`` exists.
    """
    extracted = CommentedMap()

    step = find_step(tool.steps, step_id)
    if step is None:
        raise Exception(f"Step {step_id} was not found")

    extracted["steps"] = [step]
    extracted["inputs"] = []
    extracted["outputs"] = []

    for inport in cast(List[CWLObjectType], step["in"]):
        name = cast(str, inport["id"]).split("#")[-1].split("/")[-1]
        new_input: CWLObjectType = {"id": name, "type": "Any"}
        # Carry the step input's default over, otherwise a previously
        # optional value would become a required workflow input.
        if "default" in inport:
            new_input["default"] = inport["default"]
        extracted["inputs"].append(new_input)
        inport["source"] = name
        if "linkMerge" in inport:
            del inport["linkMerge"]

    for outport in cast(List[object], step["out"]):
        # A step output may be a bare id or a mapping carrying the id.
        if isinstance(outport, MutableMapping):
            outport_id = cast(str, outport["id"])
        else:
            outport_id = cast(str, outport)
        name = outport_id.split("#")[-1].split("/")[-1]
        extracted["outputs"].append(
            {"id": name, "type": "Any", "outputSource": f"{step_id}/{name}"}
        )

    for f in tool.tool:
        if f not in ("steps", "inputs", "outputs"):
            extracted[f] = tool.tool[f]

    return extracted