Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/cwlrdf.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/cwlrdf.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,199 @@ +import urllib +from codecs import StreamWriter +from typing import Any, Dict, Optional, TextIO, Union, cast + +from rdflib import Graph +from ruamel.yaml.comments import CommentedMap +from schema_salad.jsonld_context import makerdf +from schema_salad.utils import ContextType + +from .cwlviewer import CWLViewer +from .process import Process + + +def gather(tool: Process, ctx: ContextType) -> Graph: + g = Graph() + + def visitor(t: CommentedMap) -> None: + makerdf(t["id"], t, ctx, graph=g) + + tool.visit(visitor) + return g + + +def printrdf(wflow: Process, ctx: ContextType, style: str) -> str: + """Serialize the CWL document into a string, ready for printing.""" + rdf = gather(wflow, ctx).serialize(format=style, encoding="utf-8") + if not rdf: + return "" + return cast(str, rdf.decode("utf-8")) + + +def lastpart(uri: Any) -> str: + uri2 = str(uri) + if "/" in uri2: + return uri2[uri2.rindex("/") + 1 :] + return uri2 + + +def dot_with_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None: + qres = g.query( + """SELECT ?step ?run ?runtype + WHERE { + ?step cwl:run ?run . + ?run rdf:type ?runtype . + }""" + ) + + for step, run, _ in qres: + stdout.write( + '"%s" [label="%s"]\n' + % (lastpart(step), "{} ({})".format(lastpart(step), lastpart(run))) + ) + + qres = g.query( + """SELECT ?step ?inp ?source + WHERE { + ?wf Workflow:steps ?step . + ?step cwl:inputs ?inp . + ?inp cwl:source ?source . + }""" + ) + + for step, inp, source in qres: + stdout.write('"%s" [shape=box]\n' % (lastpart(inp))) + stdout.write( + '"{}" -> "{}" [label="{}"]\n'.format(lastpart(source), lastpart(inp), "") + ) + stdout.write( + '"{}" -> "{}" [label="{}"]\n'.format(lastpart(inp), lastpart(step), "") + ) + + qres = g.query( + """SELECT ?step ?out + WHERE { + ?wf Workflow:steps ?step . + ?step cwl:outputs ?out . + }""" + ) + + for step, out in qres: + stdout.write('"%s" [shape=box]\n' % (lastpart(out))) + stdout.write( + '"{}" -> "{}" [label="{}"]\n'.format(lastpart(step), lastpart(out), "") + ) + + qres = g.query( + """SELECT ?out ?source + WHERE { + ?wf cwl:outputs ?out . + ?out cwl:source ?source . + }""" + ) + + for out, source in qres: + stdout.write('"%s" [shape=octagon]\n' % (lastpart(out))) + stdout.write( + '"{}" -> "{}" [label="{}"]\n'.format(lastpart(source), lastpart(out), "") + ) + + qres = g.query( + """SELECT ?inp + WHERE { + ?wf rdf:type cwl:Workflow . + ?wf cwl:inputs ?inp . + }""" + ) + + for (inp,) in qres: + stdout.write('"%s" [shape=octagon]\n' % (lastpart(inp))) + + +def dot_without_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None: + dotname = {} # type: Dict[str,str] + clusternode = {} + + stdout.write("compound=true\n") + + subworkflows = set() + qres = g.query( + """SELECT ?run + WHERE { + ?wf rdf:type cwl:Workflow . + ?wf Workflow:steps ?step . + ?step cwl:run ?run . + ?run rdf:type cwl:Workflow . + } ORDER BY ?wf""" + ) + for (run,) in qres: + subworkflows.add(run) + + qres = g.query( + """SELECT ?wf ?step ?run ?runtype + WHERE { + ?wf rdf:type cwl:Workflow . + ?wf Workflow:steps ?step . + ?step cwl:run ?run . + ?run rdf:type ?runtype . + } ORDER BY ?wf""" + ) + + currentwf = None # type: Optional[str] + for wf, step, _run, runtype in qres: + if step not in dotname: + dotname[step] = lastpart(step) + + if wf != currentwf: + if currentwf is not None: + stdout.write("}\n") + if wf in subworkflows: + if wf not in dotname: + dotname[wf] = "cluster_" + lastpart(wf) + stdout.write( + 'subgraph "{}" {{ label="{}"\n'.format(dotname[wf], lastpart(wf)) + ) + currentwf = wf + clusternode[wf] = step + else: + currentwf = None + + if str(runtype) != "https://w3id.org/cwl/cwl#Workflow": + stdout.write( + '"%s" [label="%s"]\n' + % (dotname[step], urllib.parse.urldefrag(str(step))[1]) + ) + + if currentwf is not None: + stdout.write("}\n") + + qres = g.query( + """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun + WHERE { + ?wf1 Workflow:steps ?src . + ?wf2 Workflow:steps ?sink . + ?src cwl:out ?out . + ?inp cwl:source ?out . + ?sink cwl:in ?inp . + ?src cwl:run ?srcrun . + ?sink cwl:run ?sinkrun . + }""" + ) + + for src, sink, srcrun, sinkrun in qres: + attr = "" + if srcrun in clusternode: + attr += 'ltail="%s"' % dotname[srcrun] + src = clusternode[srcrun] + if sinkrun in clusternode: + attr += ' lhead="%s"' % dotname[sinkrun] + sink = clusternode[sinkrun] + stdout.write('"{}" -> "{}" [{}]\n'.format(dotname[src], dotname[sink], attr)) + + +def printdot( + wf: Process, + ctx: ContextType, + stdout: Union[TextIO, StreamWriter], +) -> None: + cwl_viewer = CWLViewer(printrdf(wf, ctx, "n3")) # type: CWLViewer + stdout.write(cwl_viewer.dot())