comparison env/lib/python3.9/site-packages/cwltool/cwlrdf.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import urllib
2 from codecs import StreamWriter
3 from typing import Any, Dict, Optional, TextIO, Union, cast
4
5 from rdflib import Graph
6 from ruamel.yaml.comments import CommentedMap
7 from schema_salad.jsonld_context import makerdf
8 from schema_salad.utils import ContextType
9
10 from .cwlviewer import CWLViewer
11 from .process import Process
12
13
def gather(tool: Process, ctx: ContextType) -> Graph:
    """Build one RDF graph from every document node visited under *tool*.

    ``tool.visit`` is driven with a callback that passes each visited
    mapping, together with its ``"id"`` entry and *ctx*, to ``makerdf``.
    Every call is given the same ``Graph`` instance, which is returned.
    """
    graph = Graph()

    def add_node(node: CommentedMap) -> None:
        # Each visited node is handed to makerdf along with the shared graph.
        makerdf(node["id"], node, ctx, graph=graph)

    tool.visit(add_node)
    return graph
22
23
def printrdf(wflow: Process, ctx: ContextType, style: str) -> str:
    """Serialize the CWL document into a string, ready for printing.

    The graph produced by ``gather`` is serialized in the format named by
    *style* with UTF-8 encoding and decoded back to text. A falsy
    serialization result is reported as the empty string.
    """
    serialized = gather(wflow, ctx).serialize(format=style, encoding="utf-8")
    return cast(str, serialized.decode("utf-8")) if serialized else ""
30
31
def lastpart(uri: Any) -> str:
    """Return the text following the final "/" in ``str(uri)``.

    If the string contains no "/", the whole string is returned. If it ends
    with "/", the result is the empty string.
    """
    # rpartition returns ("", "", text) when the separator is absent, so the
    # third element is correct whether or not a "/" is present.
    return str(uri).rpartition("/")[2]
37
38
def dot_with_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None:
    """Write DOT node and edge statements for steps and their parameters.

    Five SPARQL queries are run against *g*, in order, and their rows are
    written to *stdout* as DOT statements:

    1. each step that has a ``cwl:run`` with an ``rdf:type`` becomes a node
       labelled ``"<step> (<run>)"``;
    2. each step input with a source becomes a box node, with edges
       source -> input and input -> step;
    3. each step output becomes a box node, with an edge step -> output;
    4. each ``cwl:outputs`` entry with a source becomes an octagon node,
       with an edge source -> output;
    5. each input of a ``cwl:Workflow`` becomes an octagon node.

    All names written are the ``lastpart`` of the corresponding URI.
    """
    box_node = '"%s" [shape=box]\n'
    octagon_node = '"%s" [shape=octagon]\n'

    def write_node(template: str, uri: Any) -> None:
        # Emit a single shaped node statement.
        stdout.write(template % (lastpart(uri)))

    def write_edge(tail: Any, head: Any) -> None:
        # Emit one edge; the label attribute is always the empty string.
        stdout.write(
            '"{}" -> "{}" [label="{}"]\n'.format(lastpart(tail), lastpart(head), "")
        )

    step_rows = g.query(
        """SELECT ?step ?run ?runtype
           WHERE {
              ?step cwl:run ?run .
              ?run rdf:type ?runtype .
           }"""
    )
    for step, run, _ in step_rows:
        step_name = lastpart(step)
        label = "{} ({})".format(step_name, lastpart(run))
        stdout.write('"%s" [label="%s"]\n' % (step_name, label))

    step_input_rows = g.query(
        """SELECT ?step ?inp ?source
           WHERE {
              ?wf Workflow:steps ?step .
              ?step cwl:inputs ?inp .
              ?inp cwl:source ?source .
           }"""
    )
    for step, inp, source in step_input_rows:
        write_node(box_node, inp)
        write_edge(source, inp)
        write_edge(inp, step)

    step_output_rows = g.query(
        """SELECT ?step ?out
           WHERE {
              ?wf Workflow:steps ?step .
              ?step cwl:outputs ?out .
           }"""
    )
    for step, out in step_output_rows:
        write_node(box_node, out)
        write_edge(step, out)

    workflow_output_rows = g.query(
        """SELECT ?out ?source
           WHERE {
              ?wf cwl:outputs ?out .
              ?out cwl:source ?source .
           }"""
    )
    for out, source in workflow_output_rows:
        write_node(octagon_node, out)
        write_edge(source, out)

    workflow_input_rows = g.query(
        """SELECT ?inp
           WHERE {
              ?wf rdf:type cwl:Workflow .
              ?wf cwl:inputs ?inp .
           }"""
    )
    for (inp,) in workflow_input_rows:
        write_node(octagon_node, inp)
110
111
def dot_without_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None:
    """Write DOT statements for workflow steps and the links between them.

    Output written to *stdout*, in order:

    * the graph attribute ``compound=true``;
    * for every step of a ``cwl:Workflow`` whose run has an ``rdf:type``: a
      labelled node, unless the run type is itself the CWL Workflow class.
      Steps of a workflow that is also the ``run`` of some step are wrapped
      in a ``subgraph "cluster_<name>"`` block;
    * one edge per distinct (source step, sink step) pair linked through
      ``cwl:out`` / ``cwl:source`` / ``cwl:in``. When either end runs a
      clustered workflow, the edge is attached to that cluster's recorded
      step and carries ``ltail`` / ``lhead``.
    """
    # The module only does ``import urllib``; that does not guarantee the
    # ``urllib.parse`` submodule is loaded, so bind the function explicitly.
    from urllib.parse import urldefrag

    dotname = {}  # type: Dict[str,str]
    # Maps a clustered workflow to the first of its steps seen below; that
    # step stands in for the cluster as an edge endpoint.
    clusternode = {}

    stdout.write("compound=true\n")

    # Workflows that are themselves the ``run`` target of some step.
    subworkflows = set()
    qres = g.query(
        """SELECT ?run
           WHERE {
              ?wf rdf:type cwl:Workflow .
              ?wf Workflow:steps ?step .
              ?step cwl:run ?run .
              ?run rdf:type cwl:Workflow .
           } ORDER BY ?wf"""
    )
    for (run,) in qres:
        subworkflows.add(run)

    qres = g.query(
        """SELECT ?wf ?step ?run ?runtype
           WHERE {
              ?wf rdf:type cwl:Workflow .
              ?wf Workflow:steps ?step .
              ?step cwl:run ?run .
              ?run rdf:type ?runtype .
           } ORDER BY ?wf"""
    )

    # The cluster currently open in the output, if any. The open/close logic
    # relies on ORDER BY ?wf keeping rows of one workflow adjacent.
    currentwf = None  # type: Optional[str]
    for wf, step, _run, runtype in qres:
        if step not in dotname:
            dotname[step] = lastpart(step)

        if wf != currentwf:
            if currentwf is not None:
                # Close the previously opened subgraph.
                stdout.write("}\n")
            if wf in subworkflows:
                if wf not in dotname:
                    dotname[wf] = "cluster_" + lastpart(wf)
                stdout.write(
                    'subgraph "{}" {{ label="{}"\n'.format(dotname[wf], lastpart(wf))
                )
                currentwf = wf
                clusternode[wf] = step
            else:
                currentwf = None

        if str(runtype) != "https://w3id.org/cwl/cwl#Workflow":
            # Label the node with the fragment part of the step URI.
            stdout.write(
                '"%s" [label="%s"]\n' % (dotname[step], urldefrag(str(step))[1])
            )

    if currentwf is not None:
        stdout.write("}\n")

    qres = g.query(
        """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun
           WHERE {
              ?wf1 Workflow:steps ?src .
              ?wf2 Workflow:steps ?sink .
              ?src cwl:out ?out .
              ?inp cwl:source ?out .
              ?sink cwl:in ?inp .
              ?src cwl:run ?srcrun .
              ?sink cwl:run ?sinkrun .
           }"""
    )

    for src, sink, srcrun, sinkrun in qres:
        attr = ""
        if srcrun in clusternode:
            # Tail is a clustered workflow: anchor on its recorded step.
            attr += 'ltail="%s"' % dotname[srcrun]
            src = clusternode[srcrun]
        if sinkrun in clusternode:
            # Head is a clustered workflow: anchor on its recorded step.
            attr += ' lhead="%s"' % dotname[sinkrun]
            sink = clusternode[sinkrun]
        stdout.write('"{}" -> "{}" [{}]\n'.format(dotname[src], dotname[sink], attr))
191
192
def printdot(
    wf: Process,
    ctx: ContextType,
    stdout: Union[TextIO, StreamWriter],
) -> None:
    """Write the DOT rendering of *wf* to *stdout*.

    The workflow is serialized by ``printrdf`` using the ``"n3"`` format,
    the resulting text is passed to ``CWLViewer``, and the value returned
    by its ``dot()`` method is written in a single ``write`` call.
    """
    n3_text = printrdf(wf, ctx, "n3")
    viewer = CWLViewer(n3_text)  # type: CWLViewer
    stdout.write(viewer.dot())