diff env/lib/python3.9/site-packages/cwltool/pack.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/pack.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,284 @@
+"""Reformat a CWL document and all its references to be a single stream."""
+
+import copy
+import urllib
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    Set,
+    Union,
+    cast,
+)
+
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
+from schema_salad.ref_resolver import Loader, SubLoader
+from schema_salad.utils import ResolveType
+
+from .context import LoadingContext
+from .load_tool import fetch_document, resolve_and_validate_document
+from .process import shortname, uniquename
+from .update import ORDERED_VERSIONS, update
+from .utils import CWLObjectType, CWLOutputType
+
+LoadRefType = Callable[[Optional[str], str], ResolveType]
+
+
+def find_run(
+    d: Union[CWLObjectType, ResolveType],
+    loadref: LoadRefType,
+    runs: Set[str],
+) -> None:
+    if isinstance(d, MutableSequence):
+        for s in d:
+            find_run(s, loadref, runs)
+    elif isinstance(d, MutableMapping):
+        if "run" in d and isinstance(d["run"], str):
+            if d["run"] not in runs:
+                runs.add(d["run"])
+                find_run(loadref(None, d["run"]), loadref, runs)
+        for s in d.values():
+            find_run(s, loadref, runs)
+
+
+def find_ids(
+    d: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType], None],
+    ids: Set[str],
+) -> None:
+    if isinstance(d, MutableSequence):
+        for s in d:
+            find_ids(cast(CWLObjectType, s), ids)
+    elif isinstance(d, MutableMapping):
+        for i in ("id", "name"):
+            if i in d and isinstance(d[i], str):
+                ids.add(cast(str, d[i]))
+        for s2 in d.values():
+            find_ids(cast(CWLOutputType, s2), ids)
+
+
+def replace_refs(d: Any, rewrite: Dict[str, str], stem: str, newstem: str) -> None:
+    if isinstance(d, MutableSequence):
+        for s, v in enumerate(d):
+            if isinstance(v, str):
+                if v in rewrite:
+                    d[s] = rewrite[v]
+                elif v.startswith(stem):
+                    d[s] = newstem + v[len(stem) :]
+                    rewrite[v] = d[s]
+            else:
+                replace_refs(v, rewrite, stem, newstem)
+    elif isinstance(d, MutableMapping):
+        for key, val in d.items():
+            if isinstance(val, str):
+                if val in rewrite:
+                    d[key] = rewrite[val]
+                elif val.startswith(stem):
+                    id_ = val[len(stem) :]
+                    # prevent appending newstems if tool is already packed
+                    if id_.startswith(newstem.strip("#")):
+                        d[key] = "#" + id_
+                    else:
+                        d[key] = newstem + id_
+                    rewrite[val] = d[key]
+            replace_refs(val, rewrite, stem, newstem)
+
+
+def import_embed(
+    d: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType],
+    seen: Set[str],
+) -> None:
+    if isinstance(d, MutableSequence):
+        for v in d:
+            import_embed(cast(CWLOutputType, v), seen)
+    elif isinstance(d, MutableMapping):
+        for n in ("id", "name"):
+            if n in d:
+                if isinstance(d[n], str):
+                    ident = cast(str, d[n])
+                    if ident in seen:
+                        this = ident
+                        d.clear()
+                        d["$import"] = this
+                    else:
+                        this = ident
+                        seen.add(this)
+                        break
+
+        for k in sorted(d.keys()):
+            import_embed(cast(CWLOutputType, d[k]), seen)
+
+
+def pack(
+    loadingContext: LoadingContext,
+    uri: str,
+    rewrite_out: Optional[Dict[str, str]] = None,
+    loader: Optional[Loader] = None,
+) -> CWLObjectType:
+
+    # The workflow document we have in memory right now may have been
+    # updated to the internal CWL version.  We need to reload the
+    # document to go back to its original version.
+    #
+    # What's going on here is that the updater replaces the
+    # documents/fragments in the index with updated ones, the
+    # index is also used as a cache, so we need to go through the
+    # loading process with an empty index and updating turned off
+    # so we have the original un-updated documents.
+    #
+    loadingContext = loadingContext.copy()
+    document_loader = SubLoader(loader or loadingContext.loader or Loader({}))
+    loadingContext.do_update = False
+    loadingContext.loader = document_loader
+    loadingContext.loader.idx = {}
+    loadingContext.metadata = {}
+    loadingContext, docobj, uri = fetch_document(uri, loadingContext)
+    loadingContext, fileuri = resolve_and_validate_document(
+        loadingContext, docobj, uri, preprocess_only=True
+    )
+    if loadingContext.loader is None:
+        raise Exception("loadingContext.loader cannot be none")
+    processobj, metadata = loadingContext.loader.resolve_ref(uri)
+    document_loader = loadingContext.loader
+
+    if isinstance(processobj, MutableMapping):
+        document_loader.idx[processobj["id"]] = CommentedMap(processobj.items())
+    elif isinstance(processobj, MutableSequence):
+        _, frag = urllib.parse.urldefrag(uri)
+        for po in processobj:
+            if not frag:
+                if po["id"].endswith("#main"):
+                    uri = po["id"]
+            document_loader.idx[po["id"]] = CommentedMap(po.items())
+        document_loader.idx[metadata["id"]] = CommentedMap(metadata.items())
+
+    found_versions = {
+        cast(str, loadingContext.metadata["cwlVersion"])
+    }  # type: Set[str]
+
+    def loadref(base: Optional[str], lr_uri: str) -> ResolveType:
+        lr_loadingContext = loadingContext.copy()
+        lr_loadingContext.metadata = {}
+        lr_loadingContext, lr_workflowobj, lr_uri = fetch_document(
+            lr_uri, lr_loadingContext
+        )
+        lr_loadingContext, lr_uri = resolve_and_validate_document(
+            lr_loadingContext, lr_workflowobj, lr_uri
+        )
+        found_versions.add(cast(str, lr_loadingContext.metadata["cwlVersion"]))
+        if lr_loadingContext.loader is None:
+            raise Exception("loader should not be None")
+        return lr_loadingContext.loader.resolve_ref(lr_uri, base_url=base)[0]
+
+    ids = set()  # type: Set[str]
+    find_ids(processobj, ids)
+
+    runs = {uri}
+    find_run(processobj, loadref, runs)
+
+    # Figure out the highest version, everything needs to be updated
+    # to it.
+    m = 0
+    for fv in found_versions:
+        m = max(m, ORDERED_VERSIONS.index(fv))
+    update_to_version = ORDERED_VERSIONS[m]
+
+    for f in runs:
+        find_ids(document_loader.resolve_ref(f)[0], ids)
+
+    names = set()  # type: Set[str]
+    if rewrite_out is None:
+        rewrite = {}  # type: Dict[str, str]
+    else:
+        rewrite = rewrite_out
+
+    mainpath, _ = urllib.parse.urldefrag(uri)
+
+    def rewrite_id(r: str, mainuri: str) -> None:
+        if r == mainuri:
+            rewrite[r] = "#main"
+        elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"):
+            if r[len(mainuri) :].startswith("#main/"):
+                rewrite[r] = "#" + uniquename(r[len(mainuri) + 1 :], names)
+            else:
+                rewrite[r] = "#" + uniquename("main/" + r[len(mainuri) + 1 :], names)
+        else:
+            path, frag = urllib.parse.urldefrag(r)
+            if path == mainpath:
+                rewrite[r] = "#" + uniquename(frag, names)
+            else:
+                if path not in rewrite:
+                    rewrite[path] = "#" + uniquename(shortname(path), names)
+
+    sortedids = sorted(ids)
+
+    for r in sortedids:
+        rewrite_id(r, uri)
+
+    packed = CommentedMap(
+        (("$graph", CommentedSeq()), ("cwlVersion", update_to_version))
+    )
+    namespaces = metadata.get("$namespaces", None)
+
+    schemas = set()  # type: Set[str]
+    if "$schemas" in metadata:
+        for each_schema in metadata["$schemas"]:
+            schemas.add(each_schema)
+    for r in sorted(runs):
+        dcr, metadata = document_loader.resolve_ref(r)
+        if isinstance(dcr, CommentedSeq):
+            dcr = dcr[0]
+            dcr = cast(CommentedMap, dcr)
+        if not isinstance(dcr, MutableMapping):
+            continue
+
+        dcr = update(
+            dcr,
+            document_loader,
+            r,
+            loadingContext.enable_dev,
+            metadata,
+            update_to_version,
+        )
+
+        if "http://commonwl.org/cwltool#original_cwlVersion" in metadata:
+            del metadata["http://commonwl.org/cwltool#original_cwlVersion"]
+        if "http://commonwl.org/cwltool#original_cwlVersion" in dcr:
+            del dcr["http://commonwl.org/cwltool#original_cwlVersion"]
+
+        if "$schemas" in metadata:
+            for s in metadata["$schemas"]:
+                schemas.add(s)
+        if dcr.get("class") not in ("Workflow", "CommandLineTool", "ExpressionTool"):
+            continue
+        dc = cast(Dict[str, Any], copy.deepcopy(dcr))
+        v = rewrite[r]
+        dc["id"] = v
+        for n in ("name", "cwlVersion", "$namespaces", "$schemas"):
+            if n in dc:
+                del dc[n]
+        packed["$graph"].append(dc)
+
+    if schemas:
+        packed["$schemas"] = list(schemas)
+
+    for r in list(rewrite.keys()):
+        v = rewrite[r]
+        replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/")
+
+    import_embed(packed, set())
+
+    if len(packed["$graph"]) == 1:
+        # duplicate 'cwlVersion' and $schemas inside $graph when there is only
+        # a single item because we will print the contents inside '$graph'
+        # rather than whole dict
+        packed["$graph"][0]["cwlVersion"] = packed["cwlVersion"]
+        if schemas:
+            packed["$graph"][0]["$schemas"] = list(schemas)
+    # always include $namespaces in the #main
+    if namespaces:
+        packed["$graph"][0]["$namespaces"] = namespaces
+
+    return packed