Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/cwlrdf.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
import urllib from codecs import StreamWriter from typing import Any, Dict, Optional, TextIO, Union, cast from rdflib import Graph from ruamel.yaml.comments import CommentedMap from schema_salad.jsonld_context import makerdf from schema_salad.utils import ContextType from .cwlviewer import CWLViewer from .process import Process def gather(tool: Process, ctx: ContextType) -> Graph: g = Graph() def visitor(t: CommentedMap) -> None: makerdf(t["id"], t, ctx, graph=g) tool.visit(visitor) return g def printrdf(wflow: Process, ctx: ContextType, style: str) -> str: """Serialize the CWL document into a string, ready for printing.""" rdf = gather(wflow, ctx).serialize(format=style, encoding="utf-8") if not rdf: return "" return cast(str, rdf.decode("utf-8")) def lastpart(uri: Any) -> str: uri2 = str(uri) if "/" in uri2: return uri2[uri2.rindex("/") + 1 :] return uri2 def dot_with_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None: qres = g.query( """SELECT ?step ?run ?runtype WHERE { ?step cwl:run ?run . ?run rdf:type ?runtype . }""" ) for step, run, _ in qres: stdout.write( '"%s" [label="%s"]\n' % (lastpart(step), "{} ({})".format(lastpart(step), lastpart(run))) ) qres = g.query( """SELECT ?step ?inp ?source WHERE { ?wf Workflow:steps ?step . ?step cwl:inputs ?inp . ?inp cwl:source ?source . }""" ) for step, inp, source in qres: stdout.write('"%s" [shape=box]\n' % (lastpart(inp))) stdout.write( '"{}" -> "{}" [label="{}"]\n'.format(lastpart(source), lastpart(inp), "") ) stdout.write( '"{}" -> "{}" [label="{}"]\n'.format(lastpart(inp), lastpart(step), "") ) qres = g.query( """SELECT ?step ?out WHERE { ?wf Workflow:steps ?step . ?step cwl:outputs ?out . }""" ) for step, out in qres: stdout.write('"%s" [shape=box]\n' % (lastpart(out))) stdout.write( '"{}" -> "{}" [label="{}"]\n'.format(lastpart(step), lastpart(out), "") ) qres = g.query( """SELECT ?out ?source WHERE { ?wf cwl:outputs ?out . ?out cwl:source ?source . }""" ) for out, source in qres: stdout.write('"%s" [shape=octagon]\n' % (lastpart(out))) stdout.write( '"{}" -> "{}" [label="{}"]\n'.format(lastpart(source), lastpart(out), "") ) qres = g.query( """SELECT ?inp WHERE { ?wf rdf:type cwl:Workflow . ?wf cwl:inputs ?inp . }""" ) for (inp,) in qres: stdout.write('"%s" [shape=octagon]\n' % (lastpart(inp))) def dot_without_parameters(g: Graph, stdout: Union[TextIO, StreamWriter]) -> None: dotname = {} # type: Dict[str,str] clusternode = {} stdout.write("compound=true\n") subworkflows = set() qres = g.query( """SELECT ?run WHERE { ?wf rdf:type cwl:Workflow . ?wf Workflow:steps ?step . ?step cwl:run ?run . ?run rdf:type cwl:Workflow . } ORDER BY ?wf""" ) for (run,) in qres: subworkflows.add(run) qres = g.query( """SELECT ?wf ?step ?run ?runtype WHERE { ?wf rdf:type cwl:Workflow . ?wf Workflow:steps ?step . ?step cwl:run ?run . ?run rdf:type ?runtype . } ORDER BY ?wf""" ) currentwf = None # type: Optional[str] for wf, step, _run, runtype in qres: if step not in dotname: dotname[step] = lastpart(step) if wf != currentwf: if currentwf is not None: stdout.write("}\n") if wf in subworkflows: if wf not in dotname: dotname[wf] = "cluster_" + lastpart(wf) stdout.write( 'subgraph "{}" {{ label="{}"\n'.format(dotname[wf], lastpart(wf)) ) currentwf = wf clusternode[wf] = step else: currentwf = None if str(runtype) != "https://w3id.org/cwl/cwl#Workflow": stdout.write( '"%s" [label="%s"]\n' % (dotname[step], urllib.parse.urldefrag(str(step))[1]) ) if currentwf is not None: stdout.write("}\n") qres = g.query( """SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun WHERE { ?wf1 Workflow:steps ?src . ?wf2 Workflow:steps ?sink . ?src cwl:out ?out . ?inp cwl:source ?out . ?sink cwl:in ?inp . ?src cwl:run ?srcrun . ?sink cwl:run ?sinkrun . }""" ) for src, sink, srcrun, sinkrun in qres: attr = "" if srcrun in clusternode: attr += 'ltail="%s"' % dotname[srcrun] src = clusternode[srcrun] if sinkrun in clusternode: attr += ' lhead="%s"' % dotname[sinkrun] sink = clusternode[sinkrun] stdout.write('"{}" -> "{}" [{}]\n'.format(dotname[src], dotname[sink], attr)) def printdot( wf: Process, ctx: ContextType, stdout: Union[TextIO, StreamWriter], ) -> None: cwl_viewer = CWLViewer(printrdf(wf, ctx, "n3")) # type: CWLViewer stdout.write(cwl_viewer.dot())