comparison env/lib/python3.9/site-packages/cwltool/subgraph.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import urllib
2 from collections import namedtuple
3 from typing import (
4 Dict,
5 List,
6 MutableMapping,
7 MutableSequence,
8 Optional,
9 Set,
10 Tuple,
11 cast,
12 )
13
14 from ruamel.yaml.comments import CommentedMap
15
16 from .utils import CWLObjectType, aslist
17 from .workflow import Workflow, WorkflowStep
18
# Traversal directions understood by subgraph_visit; the values match the
# names of the Node fields holding the corresponding neighbour lists.
UP, DOWN = "up", "down"

# Kinds of graph node recorded in Node.type.
INPUT, OUTPUT, STEP = "input", "output", "step"

# Graph vertex: ``up`` lists the ids this node depends on, ``down`` lists the
# ids that depend on it, and ``type`` is INPUT, OUTPUT, STEP or None (None
# until the kind of the node has been declared).
Node = namedtuple("Node", ["up", "down", "type"])
25
26
def subgraph_visit(
    current: str,
    nodes: MutableMapping[str, Node],
    visited: Set[str],
    direction: str,
) -> None:
    """Mark every node reachable from ``current`` in one direction as visited.

    The walk is iterative (explicit stack) rather than recursive so that a
    long chain of dependent nodes cannot exhaust the interpreter's recursion
    limit.

    :param current: id of the node to start from.
    :param nodes: graph, mapping node id to its Node record.
    :param visited: set of node ids, updated in place; ids already present
        are treated as explored and are not expanded again.
    :param direction: UP to follow ``Node.up`` links, DOWN to follow
        ``Node.down`` links.
    :raises ValueError: if ``direction`` is neither UP nor DOWN.
    :raises KeyError: if a reachable id has no entry in ``nodes``.
    """
    if direction not in (UP, DOWN):
        # Previously an unknown direction surfaced as an UnboundLocalError.
        raise ValueError(f"Unknown traversal direction: {direction!r}")

    pending = [current]
    while pending:
        nodeid = pending.pop()
        if nodeid in visited:
            continue
        # Record the id before looking it up, so the set reflects every id
        # that was reached even if the lookup below fails.
        visited.add(nodeid)
        node = nodes[nodeid]
        pending.extend(node.down if direction == DOWN else node.up)
44
45
def declare_node(nodes: Dict[str, Node], nodeid: str, tp: Optional[str]) -> Node:
    """Ensure ``nodeid`` has an entry in ``nodes`` and return that entry.

    A missing id gets a fresh Node with empty ``up``/``down`` lists and type
    ``tp``. An existing entry whose type is still None is replaced by a Node
    of type ``tp`` that reuses the same ``up``/``down`` list objects. An
    existing entry that already has a type is left untouched, whatever ``tp``
    is.
    """
    if nodeid not in nodes:
        nodes[nodeid] = Node([], [], tp)
        return nodes[nodeid]

    existing = nodes[nodeid]
    if existing.type is None:
        # Keep the very same neighbour lists so links recorded earlier survive.
        nodes[nodeid] = Node(existing.up, existing.down, tp)
    return nodes[nodeid]
54
55
def find_step(steps: List[WorkflowStep], stepid: str) -> Optional[CWLObjectType]:
    """Return the ``tool`` of the first step whose ``tool["id"]`` is ``stepid``.

    Returns None when no step in ``steps`` has that id.
    """
    return next(
        (candidate.tool for candidate in steps if candidate.tool["id"] == stepid),
        None,
    )
61
62
def get_subgraph(roots: MutableSequence[str], tool: Workflow) -> CommentedMap:
    """Extract the part of a workflow that is connected to ``roots``.

    A dependency graph is built from the workflow's inputs, outputs and
    steps. Each root of type OUTPUT is followed upstream; every other root is
    followed downstream. For every step or output reached this way, each
    direct upstream node that was not reached is handled as follows:

    * a workflow input is simply included in the result;
    * anything else feeding a step is "rewired": a new workflow input is
      created for it (id derived from the original id, with ``/`` in the URL
      fragment replaced by ``_``; type copied from the matching step input)
      and the step's ``source`` is pointed at that new input.

    NOTE: the ``source`` fields of the selected steps are rewritten in place
    on the objects held in ``tool.tool["steps"]``; the returned map shares
    those objects.

    :param roots: ids of the nodes to start from.
    :param tool: the workflow to extract from.
    :returns: a new CommentedMap holding the selected ``steps``, ``inputs``
        and ``outputs`` (plus one input per rewired source) and every other
        top-level field of ``tool.tool`` unchanged.
    :raises Exception: if ``tool`` is not a Workflow, or a selected step
        cannot be found in ``tool.steps``.
    :raises KeyError: if a root id is not a node of the workflow graph.
    """
    # A bare ``import urllib`` does not guarantee that the ``parse``
    # submodule is loaded; import it explicitly before using it below.
    import urllib.parse

    if tool.tool["class"] != "Workflow":
        raise Exception("Can only extract subgraph from workflow")

    nodes: Dict[str, Node] = {}

    for inp in tool.tool["inputs"]:
        declare_node(nodes, inp["id"], INPUT)

    for out in tool.tool["outputs"]:
        declare_node(nodes, out["id"], OUTPUT)
        for i in aslist(out.get("outputSource", [])):
            # source is upstream from output (dependency)
            nodes[out["id"]].up.append(i)
            # output is downstream from source
            declare_node(nodes, i, None)
            nodes[i].down.append(out["id"])

    for st in tool.tool["steps"]:
        step = declare_node(nodes, st["id"], STEP)
        for i in st["in"]:
            if "source" not in i:
                continue
            for src in aslist(i["source"]):
                # source is upstream from step (dependency)
                step.up.append(src)
                # step is downstream from source
                declare_node(nodes, src, None)
                nodes[src].down.append(st["id"])
        for out in st["out"]:
            # output is downstream from step
            step.down.append(out)
            # step is upstream from output
            declare_node(nodes, out, None)
            nodes[out].up.append(st["id"])

    # Find all the downstream nodes from the starting points
    # (workflow outputs have nothing downstream, so they are followed
    # upstream instead).
    visited_down: Set[str] = set()
    for r in roots:
        if nodes[r].type == OUTPUT:
            subgraph_visit(r, nodes, visited_down, UP)
        else:
            subgraph_visit(r, nodes, visited_down, DOWN)

    # Now make sure all the nodes are connected to upstream inputs
    visited: Set[str] = set()
    # Maps an original source id to (new workflow input id, input type).
    rewire: Dict[str, Tuple[str, CWLObjectType]] = {}
    for v in visited_down:
        visited.add(v)
        if nodes[v].type in (STEP, OUTPUT):
            for u in nodes[v].up:
                if u in visited_down:
                    continue
                if nodes[u].type == INPUT:
                    visited.add(u)
                else:
                    # rewire
                    df = urllib.parse.urldefrag(u)
                    rn = str(df[0] + "#" + df[1].replace("/", "_"))
                    if nodes[v].type == STEP:
                        wfstep = find_step(tool.steps, v)
                        if wfstep is not None:
                            for inp in cast(
                                MutableSequence[CWLObjectType], wfstep["inputs"]
                            ):
                                # Compare whole ids: a plain ``in`` on a
                                # string source would be a substring test and
                                # could match an id that merely starts with u.
                                if "source" in inp and u in aslist(inp["source"]):
                                    rewire[u] = (rn, cast(CWLObjectType, inp["type"]))
                                    break
                        else:
                            raise Exception("Could not find step %s" % v)

    extracted = CommentedMap()
    for f in tool.tool:
        if f in ("steps", "inputs", "outputs"):
            extracted[f] = []
            for i in tool.tool[f]:
                if i["id"] in visited:
                    if f == "steps":
                        for inport in i["in"]:
                            if "source" not in inport:
                                continue
                            if isinstance(inport["source"], MutableSequence):
                                # Replace rewired ids and keep the others, so
                                # links that need no rewiring are not lost.
                                inport["source"] = [
                                    rewire[s][0] if s in rewire else s
                                    for s in inport["source"]
                                ]
                            elif inport["source"] in rewire:
                                inport["source"] = rewire[inport["source"]][0]
                    extracted[f].append(i)
        else:
            extracted[f] = tool.tool[f]

    # Every rewired source becomes a new workflow-level input.
    for rv in rewire.values():
        extracted["inputs"].append({"id": rv[0], "type": rv[1]})

    return extracted
162
163
def get_step(tool: Workflow, step_id: str) -> CommentedMap:
    """Build a workflow description containing only the step ``step_id``.

    Each entry of the step's ``in`` list gets a matching workflow input of
    type "Any", named after the last ``/``-separated component of its id
    (after any ``#``). The entry's ``source`` is set to that name and any
    ``linkMerge`` key is removed; these changes are made in place on the
    step object obtained from ``tool.steps``. Each entry of the step's
    ``out`` list gets a workflow output of type "Any" whose ``outputSource``
    is ``<step_id>/<name>``. All other top-level fields of ``tool.tool`` are
    copied through unchanged.

    :raises Exception: if no step with id ``step_id`` exists.
    """
    found = find_step(tool.steps, step_id)
    if found is None:
        raise Exception(f"Step {step_id} was not found")

    def short_name(identifier: str) -> str:
        # Text after the last "#" and then after the last "/".
        return identifier.rpartition("#")[2].rpartition("/")[2]

    workflow_inputs = []
    for port in cast(List[CWLObjectType], found["in"]):
        port_name = short_name(cast(str, port["id"]))
        workflow_inputs.append({"id": port_name, "type": "Any"})
        port["source"] = port_name
        if "linkMerge" in port:
            del port["linkMerge"]

    workflow_outputs = [
        {"id": out_name, "type": "Any", "outputSource": f"{step_id}/{out_name}"}
        for out_name in map(short_name, cast(List[str], found["out"]))
    ]

    result = CommentedMap()
    result["steps"] = [found]
    result["inputs"] = workflow_inputs
    result["outputs"] = workflow_outputs

    for key in tool.tool:
        if key in ("steps", "inputs", "outputs"):
            continue
        result[key] = tool.tool[key]

    return result
192
193 return extracted