diff env/lib/python3.9/site-packages/cwltool/load_tool.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/load_tool.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,520 @@
+"""Loads a CWL document."""
+
+import hashlib
+import logging
+import os
+import re
+import urllib
+import uuid
+from typing import (
+    Any,
+    Dict,
+    List,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
+
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
+from schema_salad.exceptions import ValidationException
+from schema_salad.ref_resolver import Loader, file_uri
+from schema_salad.schema import validate_doc
+from schema_salad.sourceline import SourceLine, cmap
+from schema_salad.utils import (
+    ContextType,
+    FetcherCallableType,
+    IdxResultType,
+    ResolveType,
+    json_dumps,
+)
+
+from . import CWL_CONTENT_TYPES, process, update
+from .context import LoadingContext
+from .errors import WorkflowException
+from .loghandler import _logger
+from .process import Process, get_schema, shortname
+from .update import ALLUPDATES
+from .utils import CWLObjectType, ResolverType, visit_class
+
+jobloaderctx = {
+    "cwl": "https://w3id.org/cwl/cwl#",
+    "cwltool": "http://commonwl.org/cwltool#",
+    "path": {"@type": "@id"},
+    "location": {"@type": "@id"},
+    "id": "@id",
+}  # type: ContextType
+
+
+overrides_ctx = {
+    "overrideTarget": {"@type": "@id"},
+    "cwltool": "http://commonwl.org/cwltool#",
+    "http://commonwl.org/cwltool#overrides": {
+        "@id": "cwltool:overrides",
+        "mapSubject": "overrideTarget",
+    },
+    "requirements": {
+        "@id": "https://w3id.org/cwl/cwl#requirements",
+        "mapSubject": "class",
+    },
+}  # type: ContextType
+
+
+def default_loader(
+    fetcher_constructor: Optional[FetcherCallableType] = None,
+    enable_dev: bool = False,
+    doc_cache: bool = True,
+) -> Loader:
+    return Loader(
+        jobloaderctx,
+        fetcher_constructor=fetcher_constructor,
+        allow_attachments=lambda r: enable_dev,
+        doc_cache=doc_cache,
+    )
+
+
+def resolve_tool_uri(
+    argsworkflow: str,
+    resolver: Optional[ResolverType] = None,
+    fetcher_constructor: Optional[FetcherCallableType] = None,
+    document_loader: Optional[Loader] = None,
+) -> Tuple[str, str]:
+
+    uri = None  # type: Optional[str]
+    split = urllib.parse.urlsplit(argsworkflow)
+    # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
+    if split.scheme and split.scheme in ["http", "https", "file"]:
+        uri = argsworkflow
+    elif os.path.exists(os.path.abspath(argsworkflow)):
+        uri = file_uri(str(os.path.abspath(argsworkflow)))
+    elif resolver is not None:
+        uri = resolver(
+            document_loader or default_loader(fetcher_constructor), argsworkflow
+        )
+
+    if uri is None:
+        raise ValidationException("Not found: '%s'" % argsworkflow)
+
+    if argsworkflow != uri:
+        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)
+
+    fileuri = urllib.parse.urldefrag(uri)[0]
+    return uri, fileuri
+
+
+def fetch_document(
+    argsworkflow: Union[str, CWLObjectType],
+    loadingContext: Optional[LoadingContext] = None,
+) -> Tuple[LoadingContext, CommentedMap, str]:
+    """Retrieve a CWL document."""
+    if loadingContext is None:
+        loadingContext = LoadingContext()
+        loadingContext.loader = default_loader()
+    else:
+        loadingContext = loadingContext.copy()
+        if loadingContext.loader is None:
+            loadingContext.loader = default_loader(
+                loadingContext.fetcher_constructor,
+                enable_dev=loadingContext.enable_dev,
+                doc_cache=loadingContext.doc_cache,
+            )
+
+    if isinstance(argsworkflow, str):
+        uri, fileuri = resolve_tool_uri(
+            argsworkflow,
+            resolver=loadingContext.resolver,
+            document_loader=loadingContext.loader,
+        )
+        workflowobj = cast(
+            CommentedMap,
+            loadingContext.loader.fetch(fileuri, content_types=CWL_CONTENT_TYPES),
+        )
+        return loadingContext, workflowobj, uri
+    if isinstance(argsworkflow, MutableMapping):
+        uri = (
+            cast(str, argsworkflow["id"])
+            if argsworkflow.get("id")
+            else "_:" + str(uuid.uuid4())
+        )
+        workflowobj = cast(
+            CommentedMap, cmap(cast(Dict[str, Any], argsworkflow), fn=uri)
+        )
+        loadingContext.loader.idx[uri] = workflowobj
+        return loadingContext, workflowobj, uri
+    raise ValidationException("Must be URI or object: '%s'" % argsworkflow)
+
+
+def _convert_stdstreams_to_files(
+    workflowobj: Union[
+        CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str
+    ]
+) -> None:
+    if isinstance(workflowobj, MutableMapping):
+        if workflowobj.get("class") == "CommandLineTool":
+            with SourceLine(
+                workflowobj,
+                "outputs",
+                ValidationException,
+                _logger.isEnabledFor(logging.DEBUG),
+            ):
+                outputs = workflowobj.get("outputs", [])
+                if not isinstance(outputs, CommentedSeq):
+                    raise ValidationException('"outputs" section is not ' "valid.")
+                for out in cast(
+                    MutableSequence[CWLObjectType], workflowobj.get("outputs", [])
+                ):
+                    if not isinstance(out, CommentedMap):
+                        raise ValidationException(
+                            f"Output '{out}' is not a valid OutputParameter."
+                        )
+                    for streamtype in ["stdout", "stderr"]:
+                        if out.get("type") == streamtype:
+                            if "outputBinding" in out:
+                                raise ValidationException(
+                                    "Not allowed to specify outputBinding when"
+                                    " using %s shortcut." % streamtype
+                                )
+                            if streamtype in workflowobj:
+                                filename = workflowobj[streamtype]
+                            else:
+                                filename = str(
+                                    hashlib.sha1(  # nosec
+                                        json_dumps(workflowobj, sort_keys=True).encode(
+                                            "utf-8"
+                                        )
+                                    ).hexdigest()
+                                )
+                                workflowobj[streamtype] = filename
+                            out["type"] = "File"
+                            out["outputBinding"] = cmap({"glob": filename})
+            for inp in cast(
+                MutableSequence[CWLObjectType], workflowobj.get("inputs", [])
+            ):
+                if inp.get("type") == "stdin":
+                    if "inputBinding" in inp:
+                        raise ValidationException(
+                            "Not allowed to specify inputBinding when"
+                            " using stdin shortcut."
+                        )
+                    if "stdin" in workflowobj:
+                        raise ValidationException(
+                            "Not allowed to specify stdin path when"
+                            " using stdin type shortcut."
+                        )
+                    else:
+                        workflowobj["stdin"] = (
+                            "$(inputs.%s.path)"
+                            % cast(str, inp["id"]).rpartition("#")[2]
+                        )
+                        inp["type"] = "File"
+        else:
+            for entry in workflowobj.values():
+                _convert_stdstreams_to_files(
+                    cast(
+                        Union[
+                            CWLObjectType,
+                            MutableSequence[Union[CWLObjectType, str, int]],
+                            str,
+                        ],
+                        entry,
+                    )
+                )
+    if isinstance(workflowobj, MutableSequence):
+        for entry in workflowobj:
+            _convert_stdstreams_to_files(
+                cast(
+                    Union[
+                        CWLObjectType,
+                        MutableSequence[Union[CWLObjectType, str, int]],
+                        str,
+                    ],
+                    entry,
+                )
+            )
+
+
+def _add_blank_ids(
+    workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]]
+) -> None:
+    if isinstance(workflowobj, MutableMapping):
+        if (
+            "run" in workflowobj
+            and isinstance(workflowobj["run"], MutableMapping)
+            and "id" not in workflowobj["run"]
+            and "$import" not in workflowobj["run"]
+        ):
+            workflowobj["run"]["id"] = str(uuid.uuid4())
+        for entry in workflowobj.values():
+            _add_blank_ids(
+                cast(
+                    Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]],
+                    entry,
+                )
+            )
+    if isinstance(workflowobj, MutableSequence):
+        for entry in workflowobj:
+            _add_blank_ids(
+                cast(
+                    Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]],
+                    entry,
+                )
+            )
+
+
+def resolve_and_validate_document(
+    loadingContext: LoadingContext,
+    workflowobj: Union[CommentedMap, CommentedSeq],
+    uri: str,
+    preprocess_only: bool = False,
+    skip_schemas: Optional[bool] = None,
+) -> Tuple[LoadingContext, str]:
+    """Validate a CWL document."""
+    if not loadingContext.loader:
+        raise ValueError("loadingContext must have a loader.")
+    else:
+        loader = loadingContext.loader
+    loadingContext = loadingContext.copy()
+
+    if not isinstance(workflowobj, MutableMapping):
+        raise ValueError(
+            "workflowjobj must be a dict, got '{}': {}".format(
+                type(workflowobj), workflowobj
+            )
+        )
+
+    jobobj = None
+    if "cwl:tool" in workflowobj:
+        jobobj, _ = loader.resolve_all(workflowobj, uri)
+        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
+        del cast(Dict[str, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]
+
+        workflowobj = fetch_document(uri, loadingContext)[1]
+
+    fileuri = urllib.parse.urldefrag(uri)[0]
+
+    cwlVersion = loadingContext.metadata.get("cwlVersion")
+    if not cwlVersion:
+        cwlVersion = workflowobj.get("cwlVersion")
+    if not cwlVersion and fileuri != uri:
+        # The tool we're loading is a fragment of a bigger file.  Get
+        # the document root element and look for cwlVersion there.
+        metadata = cast(CWLObjectType, fetch_document(fileuri, loadingContext)[1])
+        cwlVersion = cast(str, metadata.get("cwlVersion"))
+    if not cwlVersion:
+        raise ValidationException(
+            "No cwlVersion found. "
+            "Use the following syntax in your CWL document to declare "
+            "the version: cwlVersion: <version>.\n"
+            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
+            "will need to be upgraded first."
+        )
+
+    if not isinstance(cwlVersion, str):
+        with SourceLine(workflowobj, "cwlVersion", ValidationException):
+            raise ValidationException(
+                "'cwlVersion' must be a string, got {}".format(type(cwlVersion))
+            )
+    # strip out version
+    cwlVersion = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion)
+    if cwlVersion not in list(ALLUPDATES):
+        # print out all the Supported Versions of cwlVersion
+        versions = []
+        for version in list(ALLUPDATES):
+            if "dev" in version:
+                version += " (with --enable-dev flag only)"
+            versions.append(version)
+        versions.sort()
+        raise ValidationException(
+            "The CWL reference runner no longer supports pre CWL v1.0 "
+            "documents. Supported versions are: "
+            "\n{}".format("\n".join(versions))
+        )
+
+    if (
+        isinstance(jobobj, CommentedMap)
+        and "http://commonwl.org/cwltool#overrides" in jobobj
+    ):
+        loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
+        del jobobj["http://commonwl.org/cwltool#overrides"]
+
+    if (
+        isinstance(jobobj, CommentedMap)
+        and "https://w3id.org/cwl/cwl#requirements" in jobobj
+    ):
+        if cwlVersion not in ("v1.1.0-dev1", "v1.1"):
+            raise ValidationException(
+                "`cwl:requirements` in the input object is not part of CWL "
+                "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+                "can set the cwlVersion to v1.1 or greater."
+            )
+        loadingContext.overrides_list.append(
+            {
+                "overrideTarget": uri,
+                "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"],
+            }
+        )
+        del jobobj["https://w3id.org/cwl/cwl#requirements"]
+
+    (sch_document_loader, avsc_names) = process.get_schema(cwlVersion)[:2]
+
+    if isinstance(avsc_names, Exception):
+        raise avsc_names
+
+    processobj = None  # type: Optional[ResolveType]
+    document_loader = Loader(
+        sch_document_loader.ctx,
+        schemagraph=sch_document_loader.graph,
+        idx=loader.idx,
+        cache=sch_document_loader.cache,
+        fetcher_constructor=loadingContext.fetcher_constructor,
+        skip_schemas=skip_schemas,
+        doc_cache=loadingContext.doc_cache,
+    )
+
+    if cwlVersion == "v1.0":
+        _add_blank_ids(workflowobj)
+
+    document_loader.resolve_all(workflowobj, fileuri)
+    processobj, metadata = document_loader.resolve_ref(uri)
+    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
+        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")
+
+    if not hasattr(processobj.lc, "filename"):
+        processobj.lc.filename = fileuri
+
+    if loadingContext.metadata:
+        metadata = loadingContext.metadata
+
+    if not isinstance(metadata, CommentedMap):
+        raise ValidationException(
+            "metadata must be a CommentedMap, was %s" % type(metadata)
+        )
+
+    if isinstance(processobj, CommentedMap):
+        uri = processobj["id"]
+
+    _convert_stdstreams_to_files(workflowobj)
+
+    if isinstance(jobobj, CommentedMap):
+        loadingContext.jobdefaults = jobobj
+
+    loadingContext.loader = document_loader
+    loadingContext.avsc_names = avsc_names
+    loadingContext.metadata = metadata
+
+    if preprocess_only:
+        return loadingContext, uri
+
+    if loadingContext.do_validate:
+        validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)
+
+    # None means default behavior (do update)
+    if loadingContext.do_update in (True, None):
+        if "cwlVersion" not in metadata:
+            metadata["cwlVersion"] = cwlVersion
+        processobj = update.update(
+            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata
+        )
+        document_loader.idx[processobj["id"]] = processobj
+
+        def update_index(pr: CommentedMap) -> None:
+            if "id" in pr:
+                document_loader.idx[pr["id"]] = pr
+
+        visit_class(
+            processobj, ("CommandLineTool", "Workflow", "ExpressionTool"), update_index
+        )
+
+    return loadingContext, uri
+
+
+def make_tool(
+    uri: Union[str, CommentedMap, CommentedSeq], loadingContext: LoadingContext
+) -> Process:
+    """Make a Python CWL object."""
+    if loadingContext.loader is None:
+        raise ValueError("loadingContext must have a loader")
+    resolveduri, metadata = loadingContext.loader.resolve_ref(uri)
+
+    processobj = None
+    if isinstance(resolveduri, MutableSequence):
+        for obj in resolveduri:
+            if obj["id"].endswith("#main"):
+                processobj = obj
+                break
+        if not processobj:
+            raise WorkflowException(
+                "Tool file contains graph of multiple objects, must specify "
+                "one of #%s"
+                % ", #".join(
+                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri if "id" in i
+                )
+            )
+    elif isinstance(resolveduri, MutableMapping):
+        processobj = resolveduri
+    else:
+        raise Exception("Must resolve to list or dict")
+
+    tool = loadingContext.construct_tool_object(processobj, loadingContext)
+
+    if loadingContext.jobdefaults:
+        jobobj = loadingContext.jobdefaults
+        for inp in tool.tool["inputs"]:
+            if shortname(inp["id"]) in jobobj:
+                inp["default"] = jobobj[shortname(inp["id"])]
+
+    return tool
+
+
+def load_tool(
+    argsworkflow: Union[str, CWLObjectType],
+    loadingContext: Optional[LoadingContext] = None,
+) -> Process:
+
+    loadingContext, workflowobj, uri = fetch_document(argsworkflow, loadingContext)
+
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext, workflowobj, uri
+    )
+
+    return make_tool(uri, loadingContext)
+
+
+def resolve_overrides(
+    ov: IdxResultType,
+    ov_uri: str,
+    baseurl: str,
+) -> List[CWLObjectType]:
+    ovloader = Loader(overrides_ctx)
+    ret, _ = ovloader.resolve_all(ov, baseurl)
+    if not isinstance(ret, CommentedMap):
+        raise Exception("Expected CommentedMap, got %s" % type(ret))
+    cwl_docloader = get_schema("v1.0")[0]
+    cwl_docloader.resolve_all(ret, ov_uri)
+    return cast(List[CWLObjectType], ret["http://commonwl.org/cwltool#overrides"])
+
+
+def load_overrides(ov: str, base_url: str) -> List[CWLObjectType]:
+    ovloader = Loader(overrides_ctx)
+    return resolve_overrides(ovloader.fetch(ov), ov, base_url)
+
+
+def recursive_resolve_and_validate_document(
+    loadingContext: LoadingContext,
+    workflowobj: Union[CommentedMap, CommentedSeq],
+    uri: str,
+    preprocess_only: bool = False,
+    skip_schemas: Optional[bool] = None,
+) -> Tuple[LoadingContext, str, Process]:
+    """Validate a CWL document, checking that a tool object can be built."""
+    loadingContext, uri = resolve_and_validate_document(
+        loadingContext,
+        workflowobj,
+        uri,
+        preprocess_only=preprocess_only,
+        skip_schemas=skip_schemas,
+    )
+    tool = make_tool(uri, loadingContext)
+    return loadingContext, uri, tool