diff env/lib/python3.9/site-packages/cwltool/cwlrdf.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/cwlrdf.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,199 @@
+import urllib
+from codecs import StreamWriter
+from typing import Any, Dict, Optional, TextIO, Union, cast
+
+from rdflib import Graph
+from ruamel.yaml.comments import CommentedMap
+from schema_salad.jsonld_context import makerdf
+from schema_salad.utils import ContextType
+
+from .cwlviewer import CWLViewer
+from .process import Process
+
+
+def gather(tool: Process, ctx: ContextType) -> Graph:
+    g = Graph()
+
+    def visitor(t: CommentedMap) -> None:
+        makerdf(t["id"], t, ctx, graph=g)
+
+    tool.visit(visitor)
+    return g
+
+
+def printrdf(wflow: Process, ctx: ContextType, style: str) -> str:
+    """Serialize the CWL document into a string, ready for printing."""
+    rdf = gather(wflow, ctx).serialize(format=style, encoding="utf-8")
+    if not rdf:
+        return ""
+    return cast(str, rdf.decode("utf-8"))
+
+
+def lastpart(uri: Any) -> str:
+    uri2 = str(uri)
+    if "/" in uri2:
+        return uri2[uri2.rindex("/") + 1 :]
+    return uri2
+
+
+def dot_with_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None:
+    qres = g.query(
+        """SELECT ?step ?run ?runtype
+           WHERE {
+              ?step cwl:run ?run .
+              ?run rdf:type ?runtype .
+           }"""
+    )
+
+    for step, run, _ in qres:
+        stdout.write(
+            '"%s" [label="%s"]\n'
+            % (lastpart(step), "{} ({})".format(lastpart(step), lastpart(run)))
+        )
+
+    qres = g.query(
+        """SELECT ?step ?inp ?source
+           WHERE {
+              ?wf Workflow:steps ?step .
+              ?step cwl:inputs ?inp .
+              ?inp cwl:source ?source .
+           }"""
+    )
+
+    for step, inp, source in qres:
+        stdout.write('"%s" [shape=box]\n' % (lastpart(inp)))
+        stdout.write(
+            '"{}" -> "{}" [label="{}"]\n'.format(lastpart(source), lastpart(inp), "")
+        )
+        stdout.write(
+            '"{}" -> "{}" [label="{}"]\n'.format(lastpart(inp), lastpart(step), "")
+        )
+
+    qres = g.query(
+        """SELECT ?step ?out
+           WHERE {
+              ?wf Workflow:steps ?step .
+              ?step cwl:outputs ?out .
+           }"""
+    )
+
+    for step, out in qres:
+        stdout.write('"%s" [shape=box]\n' % (lastpart(out)))
+        stdout.write(
+            '"{}" -> "{}" [label="{}"]\n'.format(lastpart(step), lastpart(out), "")
+        )
+
+    qres = g.query(
+        """SELECT ?out ?source
+           WHERE {
+              ?wf cwl:outputs ?out .
+              ?out cwl:source ?source .
+           }"""
+    )
+
+    for out, source in qres:
+        stdout.write('"%s" [shape=octagon]\n' % (lastpart(out)))
+        stdout.write(
+            '"{}" -> "{}" [label="{}"]\n'.format(lastpart(source), lastpart(out), "")
+        )
+
+    qres = g.query(
+        """SELECT ?inp
+           WHERE {
+              ?wf rdf:type cwl:Workflow .
+              ?wf cwl:inputs ?inp .
+           }"""
+    )
+
+    for (inp,) in qres:
+        stdout.write('"%s" [shape=octagon]\n' % (lastpart(inp)))
+
+
+def dot_without_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None:
+    dotname = {}  # type: Dict[str,str]
+    clusternode = {}
+
+    stdout.write("compound=true\n")
+
+    subworkflows = set()
+    qres = g.query(
+        """SELECT ?run
+           WHERE {
+              ?wf rdf:type cwl:Workflow .
+              ?wf Workflow:steps ?step .
+              ?step cwl:run ?run .
+              ?run rdf:type cwl:Workflow .
+           } ORDER BY ?wf"""
+    )
+    for (run,) in qres:
+        subworkflows.add(run)
+
+    qres = g.query(
+        """SELECT ?wf ?step ?run ?runtype
+           WHERE {
+              ?wf rdf:type cwl:Workflow .
+              ?wf Workflow:steps ?step .
+              ?step cwl:run ?run .
+              ?run rdf:type ?runtype .
+           } ORDER BY ?wf"""
+    )
+
+    currentwf = None  # type: Optional[str]
+    for wf, step, _run, runtype in qres:
+        if step not in dotname:
+            dotname[step] = lastpart(step)
+
+        if wf != currentwf:
+            if currentwf is not None:
+                stdout.write("}\n")
+            if wf in subworkflows:
+                if wf not in dotname:
+                    dotname[wf] = "cluster_" + lastpart(wf)
+                stdout.write(
+                    'subgraph "{}" {{ label="{}"\n'.format(dotname[wf], lastpart(wf))
+                )
+                currentwf = wf
+                clusternode[wf] = step
+            else:
+                currentwf = None
+
+        if str(runtype) != "https://w3id.org/cwl/cwl#Workflow":
+            stdout.write(
+                '"%s" [label="%s"]\n'
+                % (dotname[step], urllib.parse.urldefrag(str(step))[1])
+            )
+
+    if currentwf is not None:
+        stdout.write("}\n")
+
+    qres = g.query(
+        """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun
+           WHERE {
+              ?wf1 Workflow:steps ?src .
+              ?wf2 Workflow:steps ?sink .
+              ?src cwl:out ?out .
+              ?inp cwl:source ?out .
+              ?sink cwl:in ?inp .
+              ?src cwl:run ?srcrun .
+              ?sink cwl:run ?sinkrun .
+           }"""
+    )
+
+    for src, sink, srcrun, sinkrun in qres:
+        attr = ""
+        if srcrun in clusternode:
+            attr += 'ltail="%s"' % dotname[srcrun]
+            src = clusternode[srcrun]
+        if sinkrun in clusternode:
+            attr += ' lhead="%s"' % dotname[sinkrun]
+            sink = clusternode[sinkrun]
+        stdout.write('"{}" -> "{}" [{}]\n'.format(dotname[src], dotname[sink], attr))
+
+
+def printdot(
+    wf: Process,
+    ctx: ContextType,
+    stdout: Union[TextIO, StreamWriter],
+) -> None:
+    cwl_viewer = CWLViewer(printrdf(wf, ctx, "n3"))  # type: CWLViewer
+    stdout.write(cwl_viewer.dot())