view env/lib/python3.9/site-packages/cwltool/pack.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Reformat a CWL document and all its references to be a single stream."""

import copy
import urllib
import urllib.parse
from typing import (
    Any,
    Callable,
    Dict,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    Union,
    cast,
)

from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.ref_resolver import Loader, SubLoader
from schema_salad.utils import ResolveType

from .context import LoadingContext
from .load_tool import fetch_document, resolve_and_validate_document
from .process import shortname, uniquename
from .update import ORDERED_VERSIONS, update
from .utils import CWLObjectType, CWLOutputType

# Signature of the ``loadref`` callback handed to ``find_run``: it is called
# with an optional base URI and a reference URI and returns the resolved
# document for that reference.
LoadRefType = Callable[[Optional[str], str], ResolveType]


def find_run(
    d: Union[CWLObjectType, ResolveType],
    loadref: LoadRefType,
    runs: Set[str],
) -> None:
    """Collect, into *runs*, every string ``run`` reference reachable from *d*.

    Nested sequences and mappings are searched recursively.  When a mapping
    has a string ``run`` value that is not yet in *runs*, the value is added
    and the document returned by ``loadref(None, value)`` is searched too, so
    references are followed transitively.  Values already present in *runs*
    are never loaded again, which also stops reference cycles.
    """
    if isinstance(d, MutableSequence):
        for item in d:
            find_run(item, loadref, runs)
        return
    if not isinstance(d, MutableMapping):
        return

    target = d["run"] if "run" in d else None
    if isinstance(target, str) and target not in runs:
        runs.add(target)
        find_run(loadref(None, target), loadref, runs)

    for value in d.values():
        find_run(value, loadref, runs)


def find_ids(
    d: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType], None],
    ids: Set[str],
) -> None:
    """Add to *ids* every string value of an ``id`` or ``name`` key in *d*.

    Nested sequences and mappings are searched recursively; any other value
    (strings, numbers, ``None``) is ignored.
    """
    if isinstance(d, MutableSequence):
        for item in d:
            find_ids(cast(CWLObjectType, item), ids)
        return
    if not isinstance(d, MutableMapping):
        return

    ids.update(
        cast(str, d[field])
        for field in ("id", "name")
        if field in d and isinstance(d[field], str)
    )
    for child in d.values():
        find_ids(cast(CWLOutputType, child), ids)


def _rewrite_ref(
    ref: str, rewrite: Dict[str, str], stem: str, newstem: str
) -> Optional[str]:
    """Return the packed form of *ref*, or ``None`` if it should stay as is.

    A known reference is looked up in *rewrite*.  Otherwise a reference
    starting with *stem* gets that prefix replaced by *newstem*; if the
    remainder already begins with *newstem* minus its ``#`` characters (the
    document was already packed) only a ``#`` is prepended, so the stem is
    not doubled.  Newly computed forms are recorded in *rewrite*.
    """
    if ref in rewrite:
        return rewrite[ref]
    if not ref.startswith(stem):
        return None
    id_ = ref[len(stem) :]
    # prevent appending newstems if tool is already packed
    if id_.startswith(newstem.strip("#")):
        new_ref = "#" + id_
    else:
        new_ref = newstem + id_
    rewrite[ref] = new_ref
    return new_ref


def replace_refs(d: Any, rewrite: Dict[str, str], stem: str, newstem: str) -> None:
    """Rewrite, in place, every string reference found anywhere inside *d*.

    Strings held in lists and strings held as mapping values are treated
    identically (see ``_rewrite_ref``).  Previously the list branch skipped
    the "already packed" check, so the same reference could be rewritten to
    two different ids depending on whether it sat in a list or a field.
    Non-string values are searched recursively; *rewrite* is extended with
    every new mapping computed along the way.
    """
    if isinstance(d, MutableSequence):
        for index, item in enumerate(d):
            if isinstance(item, str):
                new_ref = _rewrite_ref(item, rewrite, stem, newstem)
                if new_ref is not None:
                    d[index] = new_ref
            else:
                replace_refs(item, rewrite, stem, newstem)
    elif isinstance(d, MutableMapping):
        # Only existing keys are reassigned, so iterating items() is safe.
        for key, val in d.items():
            if isinstance(val, str):
                new_ref = _rewrite_ref(val, rewrite, stem, newstem)
                if new_ref is not None:
                    d[key] = new_ref
            else:
                replace_refs(val, rewrite, stem, newstem)


def import_embed(
    d: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType],
    seen: Set[str],
) -> None:
    """Replace repeated definitions inside *d* with ``$import`` stubs, in place.

    Sequences and mappings are walked recursively (mapping keys in sorted
    order).  A mapping's identifier is its string ``id``, or, when it has no
    string ``id``, its string ``name``.  The first mapping carrying a given
    identifier keeps its content and the identifier is added to *seen*; any
    later mapping whose identifier is already in *seen* is emptied and left
    holding only ``{"$import": identifier}``.
    """
    if isinstance(d, MutableSequence):
        for element in d:
            import_embed(cast(CWLOutputType, element), seen)
        return
    if not isinstance(d, MutableMapping):
        return

    for field in ("id", "name"):
        if field not in d or not isinstance(d[field], str):
            continue
        ident = cast(str, d[field])
        if ident not in seen:
            seen.add(ident)
            break
        # Duplicate definition: keep only a reference to the first one.
        d.clear()
        d["$import"] = ident

    for key in sorted(d):
        import_embed(cast(CWLOutputType, d[key]), seen)


def pack(
    loadingContext: LoadingContext,
    uri: str,
    rewrite_out: Optional[Dict[str, str]] = None,
    loader: Optional[Loader] = None,
) -> CWLObjectType:
    """Pack the process at *uri* and every process it ``run``s into one document.

    :param loadingContext: Loading settings; the function works on a copy
        with updating disabled and a fresh, empty index.
    :param uri: URI of the top-level process document to pack.
    :param rewrite_out: Optional dict used as the rewrite table instead of a
        fresh one, so after the call it holds the mapping from each original
        identifier to its packed (``#...``) identifier.
    :param loader: Optional loader to base the fresh ``SubLoader`` on;
        defaults to ``loadingContext.loader`` or a new empty ``Loader``.
    :returns: A map with a ``$graph`` list of the packed processes, the
        ``cwlVersion`` they were all updated to, and, when any were declared,
        the collected ``$schemas``.  The top-level ``$namespaces`` (if any)
        are attached to the first ``$graph`` entry.
    :raises Exception: if no loader is available after loading.
    """
    # The workflow document we have in memory right now may have been
    # updated to the internal CWL version.  We need to reload the
    # document to go back to its original version.
    #
    # What's going on here is that the updater replaces the
    # documents/fragments in the index with updated ones, the
    # index is also used as a cache, so we need to go through the
    # loading process with an empty index and updating turned off
    # so we have the original un-updated documents.
    #
    loadingContext = loadingContext.copy()
    document_loader = SubLoader(loader or loadingContext.loader or Loader({}))
    loadingContext.do_update = False
    loadingContext.loader = document_loader
    loadingContext.loader.idx = {}
    loadingContext.metadata = {}
    loadingContext, docobj, uri = fetch_document(uri, loadingContext)
    loadingContext, _ = resolve_and_validate_document(
        loadingContext, docobj, uri, preprocess_only=True
    )
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be none")
    processobj, metadata = loadingContext.loader.resolve_ref(uri)
    document_loader = loadingContext.loader

    # Seed the (emptied) index with the un-updated top-level process(es).
    if isinstance(processobj, MutableMapping):
        document_loader.idx[processobj["id"]] = CommentedMap(processobj.items())
    elif isinstance(processobj, MutableSequence):
        # A multi-process document: unless the URI already names a fragment,
        # the process whose id ends in "#main" becomes the root.
        _, frag = urllib.parse.urldefrag(uri)
        for po in processobj:
            if not frag and po["id"].endswith("#main"):
                uri = po["id"]
            document_loader.idx[po["id"]] = CommentedMap(po.items())
        document_loader.idx[metadata["id"]] = CommentedMap(metadata.items())

    # cwlVersion of every document loaded; loadref() adds to it.
    found_versions = {
        cast(str, loadingContext.metadata["cwlVersion"])
    }  # type: Set[str]

    def loadref(base: Optional[str], lr_uri: str) -> ResolveType:
        """Load and validate *lr_uri*, record its cwlVersion, and resolve it."""
        lr_loadingContext = loadingContext.copy()
        lr_loadingContext.metadata = {}
        lr_loadingContext, lr_workflowobj, lr_uri = fetch_document(
            lr_uri, lr_loadingContext
        )
        lr_loadingContext, lr_uri = resolve_and_validate_document(
            lr_loadingContext, lr_workflowobj, lr_uri
        )
        found_versions.add(cast(str, lr_loadingContext.metadata["cwlVersion"]))
        if lr_loadingContext.loader is None:
            raise Exception("loader should not be None")
        return lr_loadingContext.loader.resolve_ref(lr_uri, base_url=base)[0]

    ids = set()  # type: Set[str]
    find_ids(processobj, ids)

    runs = {uri}
    find_run(processobj, loadref, runs)

    # Everything is updated to the newest cwlVersion seen; this must run
    # after find_run() so every referenced document has been loaded.
    update_to_version = ORDERED_VERSIONS[
        max((ORDERED_VERSIONS.index(fv) for fv in found_versions), default=0)
    ]

    for f in runs:
        find_ids(document_loader.resolve_ref(f)[0], ids)

    names = set()  # type: Set[str]
    if rewrite_out is None:
        rewrite = {}  # type: Dict[str, str]
    else:
        rewrite = rewrite_out

    mainpath, _ = urllib.parse.urldefrag(uri)

    def rewrite_id(r: str, mainuri: str) -> None:
        """Record in ``rewrite`` the packed identifier for *r*.

        The root becomes ``#main`` and ids nested under it ``#main/...``;
        other ids in the root's file keep their fragment; for any other file
        only the file path itself is given a short ``#name`` here (its
        fragments are rewritten later by ``replace_refs``).
        """
        if r == mainuri:
            rewrite[r] = "#main"
        elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"):
            if r[len(mainuri) :].startswith("#main/"):
                rewrite[r] = "#" + uniquename(r[len(mainuri) + 1 :], names)
            else:
                rewrite[r] = "#" + uniquename("main/" + r[len(mainuri) + 1 :], names)
        else:
            path, frag = urllib.parse.urldefrag(r)
            if path == mainpath:
                rewrite[r] = "#" + uniquename(frag, names)
            else:
                if path not in rewrite:
                    rewrite[path] = "#" + uniquename(shortname(path), names)

    # Sorted so the generated unique names are deterministic.
    for r in sorted(ids):
        rewrite_id(r, uri)

    packed = CommentedMap(
        (("$graph", CommentedSeq()), ("cwlVersion", update_to_version))
    )
    namespaces = metadata.get("$namespaces", None)

    schemas = set()  # type: Set[str]
    if "$schemas" in metadata:
        for each_schema in metadata["$schemas"]:
            schemas.add(each_schema)

    # Process classes copied into $graph.  "Operation" (CWL v1.2) must be
    # included, otherwise steps that run one are left with dangling refs.
    packable_classes = ("Workflow", "CommandLineTool", "ExpressionTool", "Operation")
    original_version_key = "http://commonwl.org/cwltool#original_cwlVersion"

    for r in sorted(runs):
        dcr, metadata = document_loader.resolve_ref(r)
        if isinstance(dcr, CommentedSeq):
            dcr = cast(CommentedMap, dcr[0])
        if not isinstance(dcr, MutableMapping):
            continue

        dcr = update(
            dcr,
            document_loader,
            r,
            loadingContext.enable_dev,
            metadata,
            update_to_version,
        )

        if original_version_key in metadata:
            del metadata[original_version_key]
        if original_version_key in dcr:
            del dcr[original_version_key]

        if "$schemas" in metadata:
            for s in metadata["$schemas"]:
                schemas.add(s)
        if dcr.get("class") not in packable_classes:
            continue
        dc = cast(Dict[str, Any], copy.deepcopy(dcr))
        dc["id"] = rewrite[r]
        # Strip document-level keys from the entry; cwlVersion, $schemas and
        # $namespaces are placed on the packed document further down.
        for n in ("name", "cwlVersion", "$namespaces", "$schemas"):
            if n in dc:
                del dc[n]
        packed["$graph"].append(dc)

    if schemas:
        packed["$schemas"] = list(schemas)

    # Snapshot the keys: replace_refs() adds new entries to rewrite.
    for r in list(rewrite.keys()):
        v = rewrite[r]
        # Ids with a fragment prefix their children as "<id>/..."; plain
        # file URIs prefix their fragments as "<uri>#...".
        stem = (r + "/") if "#" in r else (r + "#")
        replace_refs(packed, rewrite, stem, v + "/")

    import_embed(packed, set())

    if len(packed["$graph"]) == 1:
        # duplicate 'cwlVersion' and $schemas inside $graph when there is only
        # a single item because we will print the contents inside '$graph'
        # rather than whole dict
        packed["$graph"][0]["cwlVersion"] = packed["cwlVersion"]
        if schemas:
            packed["$graph"][0]["$schemas"] = list(schemas)
    # The top-level document's $namespaces go on the first $graph entry.
    # NOTE(review): entries are emitted in sorted(runs) order, so this is not
    # necessarily the "#main" process — confirm this is intended.
    if namespaces:
        packed["$graph"][0]["$namespaces"] = namespaces

    return packed