diff env/lib/python3.9/site-packages/cwltool/main.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/main.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,1346 @@
+#!/usr/bin/env python3
+# PYTHON_ARGCOMPLETE_OK
+"""Entry point for cwltool."""
+
+import argparse
+import functools
+import io
+import logging
+import os
+import signal
+import subprocess  # nosec
+import sys
+import time
+import urllib
+from codecs import StreamWriter, getwriter
+from collections.abc import MutableMapping, MutableSequence
+from typing import (
+    IO,
+    Any,
+    Callable,
+    Dict,
+    List,
+    Mapping,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    Sized,
+    TextIO,
+    Tuple,
+    Union,
+    cast,
+)
+
+import argcomplete
+import coloredlogs
+import pkg_resources  # part of setuptools
+from ruamel import yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
+from schema_salad.exceptions import ValidationException
+from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
+from schema_salad.sourceline import strip_dup_lineno
+from schema_salad.utils import ContextType, FetcherCallableType, json_dumps
+
+from . import CWL_CONTENT_TYPES, workflow
+from .argparser import arg_parser, generate_parser, get_default_args
+from .builder import HasReqsHints
+from .context import LoadingContext, RuntimeContext, getdefault
+from .cwlrdf import printdot, printrdf
+from .errors import UnsupportedRequirement, WorkflowException
+from .executors import JobExecutor, MultithreadedJobExecutor, SingleJobExecutor
+from .load_tool import (
+    default_loader,
+    fetch_document,
+    jobloaderctx,
+    load_overrides,
+    make_tool,
+    resolve_and_validate_document,
+    resolve_overrides,
+    resolve_tool_uri,
+)
+from .loghandler import _logger, defaultStreamHandler
+from .mpi import MpiConfig
+from .mutation import MutationManager
+from .pack import pack
+from .process import (
+    CWL_IANA,
+    Process,
+    add_sizes,
+    scandeps,
+    shortname,
+    use_custom_schema,
+    use_standard_schema,
+)
+from .procgenerator import ProcessGenerator
+from .provenance import ResearchObject
+from .resolver import ga4gh_tool_registries, tool_resolver
+from .secrets import SecretStore
+from .software_requirements import (
+    DependenciesConfiguration,
+    get_container_from_software_requirements,
+)
+from .stdfsaccess import StdFsAccess
+from .subgraph import get_step, get_subgraph
+from .update import ALLUPDATES, UPDATES
+from .utils import (
+    DEFAULT_TMP_PREFIX,
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+    adjustDirObjs,
+    normalizeFilesDirs,
+    onWindows,
+    processes_to_kill,
+    trim_listing,
+    versionstring,
+    visit_class,
+    windows_default_container_id,
+)
+from .workflow import Workflow
+
+
+def _terminate_processes() -> None:
+    """Kill all spawned processes.
+
+    Processes to be killed must be appended to `utils.processes_to_kill`
+    as they are spawned.
+
+    An important caveat: since there's no supported way to kill another
+    thread in Python, this function cannot stop other threads from
+    continuing to execute while it kills the processes that they've
+    spawned. This may occasionally lead to unexpected behaviour.
+    """
+    # It's possible that another thread will spawn a new task while
+    # we're executing, so it's not safe to use a for loop here.
+    while processes_to_kill:
+        process = processes_to_kill.popleft()
+        cidfile = [
+            str(arg).split("=")[1] for arg in process.args if "--cidfile" in str(arg)
+        ]
+        if cidfile:
+            try:
+                with open(cidfile[0]) as inp_stream:
+                    p = subprocess.Popen(  # nosec
+                        ["docker", "kill", inp_stream.read()], shell=False  # nosec
+                    )
+                    try:
+                        p.wait(timeout=10)
+                    except subprocess.TimeoutExpired:
+                        p.kill()
+            except FileNotFoundError:
+                pass
+
+
+def _signal_handler(signum: int, _: Any) -> None:
+    """Kill all spawned processes and exit.
+
+    Note that it's possible for another thread to spawn a process after
+    all processes have been killed, but before Python exits.
+
+    Refer to the docstring for _terminate_processes() for other caveats.
+    """
+    _terminate_processes()
+    sys.exit(signum)
+
+
+def generate_example_input(
+    inptype: Optional[CWLOutputType],
+    default: Optional[CWLOutputType],
+) -> Tuple[Any, str]:
+    """Convert a single input schema into an example."""
+    example = None
+    comment = ""
+    defaults = {
+        "null": "null",
+        "Any": "null",
+        "boolean": False,
+        "int": 0,
+        "long": 0,
+        "float": 0.1,
+        "double": 0.1,
+        "string": "a_string",
+        "File": yaml.comments.CommentedMap(
+            [("class", "File"), ("path", "a/file/path")]
+        ),
+        "Directory": yaml.comments.CommentedMap(
+            [("class", "Directory"), ("path", "a/directory/path")]
+        ),
+    }  # type: CWLObjectType
+    if isinstance(inptype, MutableSequence):
+        optional = False
+        if "null" in inptype:
+            inptype.remove("null")
+            optional = True
+        if len(inptype) == 1:
+            example, comment = generate_example_input(inptype[0], default)
+            if optional:
+                if comment:
+                    comment = f"{comment} (optional)"
+                else:
+                    comment = "optional"
+        else:
+            example = CommentedSeq()
+            for index, entry in enumerate(inptype):
+                value, e_comment = generate_example_input(entry, default)
+                example.append(value)
+                example.yaml_add_eol_comment(e_comment, index)
+            if optional:
+                comment = "optional"
+    elif isinstance(inptype, Mapping) and "type" in inptype:
+        if inptype["type"] == "array":
+            first_item = cast(MutableSequence[CWLObjectType], inptype["items"])[0]
+            items_len = len(cast(Sized, inptype["items"]))
+            if items_len == 1 and "type" in first_item and first_item["type"] == "enum":
+                # array of just an enum then list all the options
+                example = first_item["symbols"]
+                if "name" in first_item:
+                    comment = 'array of type "{}".'.format(first_item["name"])
+            else:
+                value, comment = generate_example_input(inptype["items"], None)
+                comment = "array of " + comment
+                if items_len == 1:
+                    example = [value]
+                else:
+                    example = value
+            if default is not None:
+                example = default
+        elif inptype["type"] == "enum":
+            symbols = cast(List[str], inptype["symbols"])
+            if default is not None:
+                example = default
+            elif "default" in inptype:
+                example = inptype["default"]
+            elif len(cast(Sized, inptype["symbols"])) == 1:
+                example = symbols[0]
+            else:
+                example = "{}_enum_value".format(inptype.get("name", "valid"))
+            comment = 'enum; valid values: "{}"'.format('", "'.join(symbols))
+        elif inptype["type"] == "record":
+            example = yaml.comments.CommentedMap()
+            if "name" in inptype:
+                comment = '"{}" record type.'.format(inptype["name"])
+            for field in cast(List[CWLObjectType], inptype["fields"]):
+                value, f_comment = generate_example_input(field["type"], None)
+                example.insert(0, shortname(cast(str, field["name"])), value, f_comment)
+        elif "default" in inptype:
+            example = inptype["default"]
+            comment = 'default value of type "{}".'.format(inptype["type"])
+        else:
+            example = defaults.get(cast(str, inptype["type"]), str(inptype))
+            comment = 'type "{}".'.format(inptype["type"])
+    else:
+        if not default:
+            example = defaults.get(str(inptype), str(inptype))
+            comment = f'type "{inptype}"'
+        else:
+            example = default
+            comment = f'default value of type "{inptype}".'
+    return example, comment
+
+
+def realize_input_schema(
+    input_types: MutableSequence[CWLObjectType],
+    schema_defs: MutableMapping[str, CWLObjectType],
+) -> MutableSequence[CWLObjectType]:
+    """Replace references to named typed with the actual types."""
+    for index, entry in enumerate(input_types):
+        if isinstance(entry, str):
+            if "#" in entry:
+                _, input_type_name = entry.split("#")
+            else:
+                input_type_name = entry
+            if input_type_name in schema_defs:
+                entry = input_types[index] = schema_defs[input_type_name]
+        if isinstance(entry, Mapping):
+            if isinstance(entry["type"], str) and "#" in entry["type"]:
+                _, input_type_name = entry["type"].split("#")
+                if input_type_name in schema_defs:
+                    input_types[index]["type"] = cast(
+                        CWLOutputAtomType,
+                        realize_input_schema(
+                            cast(
+                                MutableSequence[CWLObjectType],
+                                schema_defs[input_type_name],
+                            ),
+                            schema_defs,
+                        ),
+                    )
+            if isinstance(entry["type"], MutableSequence):
+                input_types[index]["type"] = cast(
+                    CWLOutputAtomType,
+                    realize_input_schema(
+                        cast(MutableSequence[CWLObjectType], entry["type"]), schema_defs
+                    ),
+                )
+            if isinstance(entry["type"], Mapping):
+                input_types[index]["type"] = cast(
+                    CWLOutputAtomType,
+                    realize_input_schema(
+                        [cast(CWLObjectType, input_types[index]["type"])], schema_defs
+                    ),
+                )
+            if entry["type"] == "array":
+                items = (
+                    entry["items"]
+                    if not isinstance(entry["items"], str)
+                    else [entry["items"]]
+                )
+                input_types[index]["items"] = cast(
+                    CWLOutputAtomType,
+                    realize_input_schema(
+                        cast(MutableSequence[CWLObjectType], items), schema_defs
+                    ),
+                )
+            if entry["type"] == "record":
+                input_types[index]["fields"] = cast(
+                    CWLOutputAtomType,
+                    realize_input_schema(
+                        cast(MutableSequence[CWLObjectType], entry["fields"]),
+                        schema_defs,
+                    ),
+                )
+    return input_types
+
+
+def generate_input_template(tool: Process) -> CWLObjectType:
+    """Generate an example input object for the given CWL process."""
+    template = yaml.comments.CommentedMap()
+    for inp in realize_input_schema(tool.tool["inputs"], tool.schemaDefs):
+        name = shortname(cast(str, inp["id"]))
+        value, comment = generate_example_input(inp["type"], inp.get("default", None))
+        template.insert(0, name, value, comment)
+    return template
+
+
+def load_job_order(
+    args: argparse.Namespace,
+    stdin: IO[Any],
+    fetcher_constructor: Optional[FetcherCallableType],
+    overrides_list: List[CWLObjectType],
+    tool_file_uri: str,
+) -> Tuple[Optional[CWLObjectType], str, Loader]:
+
+    job_order_object = None
+    job_order_file = None
+
+    _jobloaderctx = jobloaderctx.copy()
+    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)
+
+    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
+        job_order_file = args.job_order[0]
+    elif len(args.job_order) == 1 and args.job_order[0] == "-":
+        job_order_object = yaml.main.round_trip_load(stdin)
+        job_order_object, _ = loader.resolve_all(
+            job_order_object, file_uri(os.getcwd()) + "/"
+        )
+    else:
+        job_order_file = None
+
+    if job_order_object is not None:
+        input_basedir = args.basedir if args.basedir else os.getcwd()
+    elif job_order_file is not None:
+        input_basedir = (
+            args.basedir
+            if args.basedir
+            else os.path.abspath(os.path.dirname(job_order_file))
+        )
+        job_order_object, _ = loader.resolve_ref(
+            job_order_file,
+            checklinks=False,
+            content_types=CWL_CONTENT_TYPES,
+        )
+
+    if (
+        job_order_object is not None
+        and "http://commonwl.org/cwltool#overrides" in job_order_object
+    ):
+        ov_uri = file_uri(job_order_file or input_basedir)
+        overrides_list.extend(
+            resolve_overrides(job_order_object, ov_uri, tool_file_uri)
+        )
+        del job_order_object["http://commonwl.org/cwltool#overrides"]
+
+    if job_order_object is None:
+        input_basedir = args.basedir if args.basedir else os.getcwd()
+
+    if job_order_object is not None and not isinstance(
+        job_order_object, MutableMapping
+    ):
+        _logger.error(
+            "CWL input object at %s is not formatted correctly, it should be a "
+            "JSON/YAML dictionay, not %s.\n"
+            "Raw input object:\n%s",
+            job_order_file or "stdin",
+            type(job_order_object),
+            job_order_object,
+        )
+        sys.exit(1)
+    return (job_order_object, input_basedir, loader)
+
+
+def init_job_order(
+    job_order_object: Optional[CWLObjectType],
+    args: argparse.Namespace,
+    process: Process,
+    loader: Loader,
+    stdout: Union[TextIO, StreamWriter],
+    print_input_deps: bool = False,
+    relative_deps: str = "primary",
+    make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess,
+    input_basedir: str = "",
+    secret_store: Optional[SecretStore] = None,
+    input_required: bool = True,
+) -> CWLObjectType:
+    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
+    if job_order_object is None:
+        namemap = {}  # type: Dict[str, str]
+        records = []  # type: List[str]
+        toolparser = generate_parser(
+            argparse.ArgumentParser(prog=args.workflow),
+            process,
+            namemap,
+            records,
+            input_required,
+        )
+        if args.tool_help:
+            toolparser.print_help()
+            exit(0)
+        cmd_line = vars(toolparser.parse_args(args.job_order))
+        for record_name in records:
+            record = {}
+            record_items = {
+                k: v for k, v in cmd_line.items() if k.startswith(record_name)
+            }
+            for key, value in record_items.items():
+                record[key[len(record_name) + 1 :]] = value
+                del cmd_line[key]
+            cmd_line[str(record_name)] = record
+        if "job_order" in cmd_line and cmd_line["job_order"]:
+            try:
+                job_order_object = cast(
+                    CWLObjectType,
+                    loader.resolve_ref(cmd_line["job_order"])[0],
+                )
+            except Exception:
+                _logger.exception(
+                    "Failed to resolv job_order: %s", cmd_line["job_order"]
+                )
+                exit(1)
+        else:
+            job_order_object = {"id": args.workflow}
+
+        del cmd_line["job_order"]
+
+        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})
+
+        if secret_store and secrets_req:
+            secret_store.store(
+                [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
+                job_order_object,
+            )
+
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(
+                "Parsed job order from command line: %s",
+                json_dumps(job_order_object, indent=4),
+            )
+
+    for inp in process.tool["inputs"]:
+        if "default" in inp and (
+            not job_order_object or shortname(inp["id"]) not in job_order_object
+        ):
+            if not job_order_object:
+                job_order_object = {}
+            job_order_object[shortname(inp["id"])] = inp["default"]
+
+    if job_order_object is None:
+        if process.tool["inputs"]:
+            if toolparser is not None:
+                print(f"\nOptions for {args.workflow} ")
+                toolparser.print_help()
+            _logger.error("")
+            _logger.error("Input object required, use --help for details")
+            exit(1)
+        else:
+            job_order_object = {}
+
+    if print_input_deps:
+        basedir = None  # type: Optional[str]
+        uri = cast(str, job_order_object["id"])
+        if uri == args.workflow:
+            basedir = os.path.dirname(uri)
+            uri = ""
+        printdeps(
+            job_order_object,
+            loader,
+            stdout,
+            relative_deps,
+            uri,
+            basedir=basedir,
+            nestdirs=False,
+        )
+        exit(0)
+
+    def path_to_loc(p: CWLObjectType) -> None:
+        if "location" not in p and "path" in p:
+            p["location"] = p["path"]
+            del p["path"]
+
+    ns = {}  # type: ContextType
+    ns.update(cast(ContextType, job_order_object.get("$namespaces", {})))
+    ns.update(cast(ContextType, process.metadata.get("$namespaces", {})))
+    ld = Loader(ns)
+
+    def expand_formats(p: CWLObjectType) -> None:
+        if "format" in p:
+            p["format"] = ld.expand_url(cast(str, p["format"]), "")
+
+    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
+    visit_class(
+        job_order_object,
+        ("File",),
+        functools.partial(add_sizes, make_fs_access(input_basedir)),
+    )
+    visit_class(job_order_object, ("File",), expand_formats)
+    adjustDirObjs(job_order_object, trim_listing)
+    normalizeFilesDirs(job_order_object)
+
+    if secret_store and secrets_req:
+        secret_store.store(
+            [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
+            job_order_object,
+        )
+
+    if "cwl:tool" in job_order_object:
+        del job_order_object["cwl:tool"]
+    if "id" in job_order_object:
+        del job_order_object["id"]
+    return job_order_object
+
+
+def make_relative(base: str, obj: CWLObjectType) -> None:
+    """Relativize the location URI of a File or Directory object."""
+    uri = cast(str, obj.get("location", obj.get("path")))
+    if ":" in uri.split("/")[0] and not uri.startswith("file://"):
+        pass
+    else:
+        if uri.startswith("file://"):
+            uri = uri_file_path(uri)
+            obj["location"] = os.path.relpath(uri, base)
+
+
+def printdeps(
+    obj: CWLObjectType,
+    document_loader: Loader,
+    stdout: Union[TextIO, StreamWriter],
+    relative_deps: str,
+    uri: str,
+    basedir: Optional[str] = None,
+    nestdirs: bool = True,
+) -> None:
+    """Print a JSON representation of the dependencies of the CWL document."""
+    deps = find_deps(obj, document_loader, uri, basedir=basedir, nestdirs=nestdirs)
+    if relative_deps == "primary":
+        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
+    elif relative_deps == "cwd":
+        base = os.getcwd()
+    visit_class(deps, ("File", "Directory"), functools.partial(make_relative, base))
+    stdout.write(json_dumps(deps, indent=4))
+
+
+def prov_deps(
+    obj: CWLObjectType,
+    document_loader: Loader,
+    uri: str,
+    basedir: Optional[str] = None,
+) -> CWLObjectType:
+    deps = find_deps(obj, document_loader, uri, basedir=basedir)
+
+    def remove_non_cwl(deps: CWLObjectType) -> None:
+        if "secondaryFiles" in deps:
+            sec_files = cast(List[CWLObjectType], deps["secondaryFiles"])
+            for index, entry in enumerate(sec_files):
+                if not ("format" in entry and entry["format"] == CWL_IANA):
+                    del sec_files[index]
+                else:
+                    remove_non_cwl(entry)
+
+    remove_non_cwl(deps)
+    return deps
+
+
+def find_deps(
+    obj: CWLObjectType,
+    document_loader: Loader,
+    uri: str,
+    basedir: Optional[str] = None,
+    nestdirs: bool = True,
+) -> CWLObjectType:
+    """Find the dependencies of the CWL document."""
+    deps = {
+        "class": "File",
+        "location": uri,
+        "format": CWL_IANA,
+    }  # type: CWLObjectType
+
+    def loadref(base: str, uri: str) -> Union[CommentedMap, CommentedSeq, str, None]:
+        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))
+
+    sfs = scandeps(
+        basedir if basedir else uri,
+        obj,
+        {"$import", "run"},
+        {"$include", "$schemas", "location"},
+        loadref,
+        nestdirs=nestdirs,
+    )
+    if sfs is not None:
+        deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], sfs)
+
+    return deps
+
+
+def print_pack(
+    loadingContext: LoadingContext,
+    uri: str,
+) -> str:
+    """Return a CWL serialization of the CWL document in JSON."""
+    packed = pack(loadingContext, uri)
+    if len(cast(Sized, packed["$graph"])) > 1:
+        return json_dumps(packed, indent=4)
+    return json_dumps(
+        cast(MutableSequence[CWLObjectType], packed["$graph"])[0], indent=4
+    )
+
+
+def supported_cwl_versions(enable_dev: bool) -> List[str]:
+    # ALLUPDATES and UPDATES are dicts
+    if enable_dev:
+        versions = list(ALLUPDATES)
+    else:
+        versions = list(UPDATES)
+    versions.sort()
+    return versions
+
+
+def configure_logging(
+    args: argparse.Namespace,
+    stderr_handler: logging.Handler,
+    runtimeContext: RuntimeContext,
+) -> None:
+    rdflib_logger = logging.getLogger("rdflib.term")
+    rdflib_logger.addHandler(stderr_handler)
+    rdflib_logger.setLevel(logging.ERROR)
+    if args.quiet:
+        # Silence STDERR, not an eventual provenance log file
+        stderr_handler.setLevel(logging.WARN)
+    if runtimeContext.debug:
+        # Increase to debug for both stderr and provenance log file
+        _logger.setLevel(logging.DEBUG)
+        stderr_handler.setLevel(logging.DEBUG)
+        rdflib_logger.setLevel(logging.DEBUG)
+    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
+    formatter = fmtclass("%(levelname)s %(message)s")
+    if args.timestamps:
+        formatter = fmtclass(
+            "[%(asctime)s] %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S"
+        )
+    stderr_handler.setFormatter(formatter)
+
+
+def setup_schema(
+    args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]]
+) -> None:
+    if custom_schema_callback is not None:
+        custom_schema_callback()
+    elif args.enable_ext:
+        with pkg_resources.resource_stream(__name__, "extensions.yml") as res:
+            ext10 = res.read().decode("utf-8")
+        with pkg_resources.resource_stream(__name__, "extensions-v1.1.yml") as res:
+            ext11 = res.read().decode("utf-8")
+        use_custom_schema("v1.0", "http://commonwl.org/cwltool", ext10)
+        use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11)
+        use_custom_schema("v1.2.0-dev1", "http://commonwl.org/cwltool", ext11)
+        use_custom_schema("v1.2.0-dev2", "http://commonwl.org/cwltool", ext11)
+        use_custom_schema("v1.2.0-dev3", "http://commonwl.org/cwltool", ext11)
+    else:
+        use_standard_schema("v1.0")
+        use_standard_schema("v1.1")
+        use_standard_schema("v1.2.0-dev1")
+        use_standard_schema("v1.2.0-dev2")
+        use_standard_schema("v1.2.0-dev3")
+
+
+class ProvLogFormatter(logging.Formatter):
+    """Enforce ISO8601 with both T and Z."""
+
+    def __init__(self) -> None:
+        """Use the default formatter with our custom formatstring."""
+        super().__init__("[%(asctime)sZ] %(message)s")
+
+    def formatTime(
+        self, record: logging.LogRecord, datefmt: Optional[str] = None
+    ) -> str:
+        formatted_time = time.strftime(
+            "%Y-%m-%dT%H:%M:%S", time.gmtime(float(record.created))
+        )
+        with_msecs = f"{formatted_time},{record.msecs:03f}"
+        return with_msecs
+
+
+def setup_provenance(
+    args: argparse.Namespace,
+    argsl: List[str],
+    runtimeContext: RuntimeContext,
+) -> Optional[int]:
+    if not args.compute_checksum:
+        _logger.error("--provenance incompatible with --no-compute-checksum")
+        return 1
+    ro = ResearchObject(
+        getdefault(runtimeContext.make_fs_access, StdFsAccess)(""),
+        temp_prefix_ro=args.tmpdir_prefix,
+        orcid=args.orcid,
+        full_name=args.cwl_full_name,
+    )
+    runtimeContext.research_obj = ro
+    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
+    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
+
+    prov_log_handler.setFormatter(ProvLogFormatter())
+    _logger.addHandler(prov_log_handler)
+    _logger.debug("[provenance] Logging to %s", log_file_io)
+    if argsl is not None:
+        # Log cwltool command line options to provenance file
+        _logger.info("[cwltool] %s %s", sys.argv[0], " ".join(argsl))
+    _logger.debug("[cwltool] Arguments: %s", args)
+    return None
+
+
+def setup_loadingContext(
+    loadingContext: Optional[LoadingContext],
+    runtimeContext: RuntimeContext,
+    args: argparse.Namespace,
+) -> LoadingContext:
+    if loadingContext is None:
+        loadingContext = LoadingContext(vars(args))
+    else:
+        loadingContext = loadingContext.copy()
+    loadingContext.loader = default_loader(
+        loadingContext.fetcher_constructor,
+        enable_dev=args.enable_dev,
+        doc_cache=args.doc_cache,
+    )
+    loadingContext.research_obj = runtimeContext.research_obj
+    loadingContext.disable_js_validation = args.disable_js_validation or (
+        not args.do_validate
+    )
+    loadingContext.construct_tool_object = getdefault(
+        loadingContext.construct_tool_object, workflow.default_make_tool
+    )
+    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
+    if loadingContext.do_update is None:
+        loadingContext.do_update = not (args.pack or args.print_subgraph)
+
+    return loadingContext
+
+
+def make_template(
+    tool: Process,
+) -> None:
+    """Make a template CWL input object for the give Process."""
+
+    def my_represent_none(
+        self: Any, data: Any
+    ) -> Any:  # pylint: disable=unused-argument
+        """Force clean representation of 'null'."""
+        return self.represent_scalar("tag:yaml.org,2002:null", "null")
+
+    yaml.representer.RoundTripRepresenter.add_representer(type(None), my_represent_none)
+    yaml.main.round_trip_dump(
+        generate_input_template(tool),
+        sys.stdout,
+        default_flow_style=False,
+        indent=4,
+        block_seq_indent=2,
+    )
+
+
+def choose_target(
+    args: argparse.Namespace,
+    tool: Process,
+    loadingContext: LoadingContext,
+) -> Optional[Process]:
+    """Walk the Workflow, extract the subset matches all the args.targets."""
+    if loadingContext.loader is None:
+        raise Exception("loadingContext.loader cannot be None")
+
+    if isinstance(tool, Workflow):
+        url = urllib.parse.urlparse(tool.tool["id"])
+        if url.fragment:
+            extracted = get_subgraph(
+                [tool.tool["id"] + "/" + r for r in args.target], tool
+            )
+        else:
+            extracted = get_subgraph(
+                [
+                    loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
+                    for r in args.target
+                ],
+                tool,
+            )
+    else:
+        _logger.error("Can only use --target on Workflows")
+        return None
+    if isinstance(loadingContext.loader.idx, MutableMapping):
+        loadingContext.loader.idx[extracted["id"]] = extracted
+        tool = make_tool(extracted["id"], loadingContext)
+    else:
+        raise Exception("Missing loadingContext.loader.idx!")
+
+    return tool
+
+
+def choose_step(
+    args: argparse.Namespace,
+    tool: Process,
+    loadingContext: LoadingContext,
+) -> Optional[Process]:
+    """Walk the given Workflow and extract just args.single_step."""
+    if loadingContext.loader is None:
+        raise Exception("loadingContext.loader cannot be None")
+
+    if isinstance(tool, Workflow):
+        url = urllib.parse.urlparse(tool.tool["id"])
+        if url.fragment:
+            extracted = get_step(tool, tool.tool["id"] + "/" + args.singe_step)
+        else:
+            extracted = get_step(
+                tool,
+                loadingContext.loader.fetcher.urljoin(
+                    tool.tool["id"], "#" + args.single_step
+                ),
+            )
+    else:
+        _logger.error("Can only use --single-step on Workflows")
+        return None
+    if isinstance(loadingContext.loader.idx, MutableMapping):
+        loadingContext.loader.idx[extracted["id"]] = extracted
+        tool = make_tool(extracted["id"], loadingContext)
+    else:
+        raise Exception("Missing loadingContext.loader.idx!")
+
+    return tool
+
+
+def check_working_directories(
+    runtimeContext: RuntimeContext,
+) -> Optional[int]:
+    """Make any needed working directories."""
+    for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
+        if (
+            getattr(runtimeContext, dirprefix)
+            and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX
+        ):
+            sl = (
+                "/"
+                if getattr(runtimeContext, dirprefix).endswith("/")
+                or dirprefix == "cachedir"
+                else ""
+            )
+            setattr(
+                runtimeContext,
+                dirprefix,
+                os.path.abspath(getattr(runtimeContext, dirprefix)) + sl,
+            )
+            if not os.path.exists(os.path.dirname(getattr(runtimeContext, dirprefix))):
+                try:
+                    os.makedirs(os.path.dirname(getattr(runtimeContext, dirprefix)))
+                except Exception:
+                    _logger.exception("Failed to create directory.")
+                    return 1
+    return None
+
+
+def main(
+    argsl: Optional[List[str]] = None,
+    args: Optional[argparse.Namespace] = None,
+    job_order_object: Optional[CWLObjectType] = None,
+    stdin: IO[Any] = sys.stdin,
+    stdout: Optional[Union[TextIO, StreamWriter]] = None,
+    stderr: IO[Any] = sys.stderr,
+    versionfunc: Callable[[], str] = versionstring,
+    logger_handler: Optional[logging.Handler] = None,
+    custom_schema_callback: Optional[Callable[[], None]] = None,
+    executor: Optional[JobExecutor] = None,
+    loadingContext: Optional[LoadingContext] = None,
+    runtimeContext: Optional[RuntimeContext] = None,
+    input_required: bool = True,
+) -> int:
+    if not stdout:  # force UTF-8 even if the console is configured differently
+        if hasattr(sys.stdout, "encoding") and sys.stdout.encoding.upper() not in (
+            "UTF-8",
+            "UTF8",
+        ):
+            if hasattr(sys.stdout, "detach"):
+                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+            else:
+                stdout = getwriter("utf-8")(sys.stdout)  # type: ignore
+        else:
+            stdout = sys.stdout
+
+    _logger.removeHandler(defaultStreamHandler)
+    stderr_handler = logger_handler
+    if stderr_handler is not None:
+        _logger.addHandler(stderr_handler)
+    else:
+        coloredlogs.install(logger=_logger, stream=stderr)
+        stderr_handler = _logger.handlers[-1]
+    workflowobj = None
+    prov_log_handler = None  # type: Optional[logging.StreamHandler]
+    try:
+        if args is None:
+            if argsl is None:
+                argsl = sys.argv[1:]
+            addl = []  # type: List[str]
+            if "CWLTOOL_OPTIONS" in os.environ:
+                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
+            parser = arg_parser()
+            argcomplete.autocomplete(parser)
+            args = parser.parse_args(addl + argsl)
+            if args.record_container_id:
+                if not args.cidfile_dir:
+                    args.cidfile_dir = os.getcwd()
+                del args.record_container_id
+
+        if runtimeContext is None:
+            runtimeContext = RuntimeContext(vars(args))
+        else:
+            runtimeContext = runtimeContext.copy()
+
+        # If on Windows platform, a default Docker Container is used if not
+        # explicitely provided by user
+        if onWindows() and not runtimeContext.default_container:
+            # This docker image is a minimal alpine image with bash installed
+            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
+            runtimeContext.default_container = windows_default_container_id
+
+        # If caller parsed its own arguments, it may not include every
+        # cwltool option, so fill in defaults to avoid crashing when
+        # dereferencing them in args.
+        for key, val in get_default_args().items():
+            if not hasattr(args, key):
+                setattr(args, key, val)
+
+        configure_logging(args, stderr_handler, runtimeContext)
+
+        if args.version:
+            print(versionfunc())
+            return 0
+        _logger.info(versionfunc())
+
+        if args.print_supported_versions:
+            print("\n".join(supported_cwl_versions(args.enable_dev)))
+            return 0
+
+        if not args.workflow:
+            if os.path.isfile("CWLFile"):
+                args.workflow = "CWLFile"
+            else:
+                _logger.error("CWL document required, no input file was provided")
+                parser.print_help()
+                return 1
+
+        if args.ga4gh_tool_registries:
+            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
+        if not args.enable_ga4gh_tool_registry:
+            del ga4gh_tool_registries[:]
+
+        if args.mpi_config_file is not None:
+            runtimeContext.mpi_config = MpiConfig.load(args.mpi_config_file)
+
+        setup_schema(args, custom_schema_callback)
+
+        if args.provenance:
+            if argsl is None:
+                raise Exception("argsl cannot be None")
+            if setup_provenance(args, argsl, runtimeContext) is not None:
+                return 1
+
+        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)
+
+        uri, tool_file_uri = resolve_tool_uri(
+            args.workflow,
+            resolver=loadingContext.resolver,
+            fetcher_constructor=loadingContext.fetcher_constructor,
+        )
+
+        try_again_msg = (
+            "" if args.debug else ", try again with --debug for more information"
+        )
+
+        try:
+            job_order_object, input_basedir, jobloader = load_job_order(
+                args,
+                stdin,
+                loadingContext.fetcher_constructor,
+                loadingContext.overrides_list,
+                tool_file_uri,
+            )
+
+            if args.overrides:
+                loadingContext.overrides_list.extend(
+                    load_overrides(
+                        file_uri(os.path.abspath(args.overrides)), tool_file_uri
+                    )
+                )
+
+            loadingContext, workflowobj, uri = fetch_document(uri, loadingContext)
+
+            if args.print_deps and loadingContext.loader:
+                printdeps(
+                    workflowobj, loadingContext.loader, stdout, args.relative_deps, uri
+                )
+                return 0
+
+            loadingContext, uri = resolve_and_validate_document(
+                loadingContext,
+                workflowobj,
+                uri,
+                preprocess_only=(args.print_pre or args.pack),
+                skip_schemas=args.skip_schemas,
+            )
+
+            if loadingContext.loader is None:
+                raise Exception("Impossible code path.")
+            processobj, metadata = loadingContext.loader.resolve_ref(uri)
+            processobj = cast(CommentedMap, processobj)
+            if args.pack:
+                stdout.write(print_pack(loadingContext, uri))
+                return 0
+
+            if args.provenance and runtimeContext.research_obj:
+                # Can't really be combined with args.pack at same time
+                runtimeContext.research_obj.packed_workflow(
+                    print_pack(loadingContext, uri)
+                )
+
+            if args.print_pre:
+                stdout.write(
+                    json_dumps(
+                        processobj, indent=4, sort_keys=True, separators=(",", ": ")
+                    )
+                )
+                return 0
+
+            tool = make_tool(uri, loadingContext)
+            if args.make_template:
+                make_template(tool)
+                return 0
+
+            if args.validate:
+                print(f"{args.workflow} is valid CWL.")
+                return 0
+
+            if args.print_rdf:
+                stdout.write(
+                    printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer)
+                )
+                return 0
+
+            if args.print_dot:
+                printdot(tool, loadingContext.loader.ctx, stdout)
+                return 0
+
+            if args.print_targets:
+                for f in ("outputs", "steps", "inputs"):
+                    if tool.tool[f]:
+                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
+                        stdout.write(
+                            "  "
+                            + "\n  ".join([shortname(t["id"]) for t in tool.tool[f]])
+                            + "\n"
+                        )
+                return 0
+
+            if args.target:
+                ctool = choose_target(args, tool, loadingContext)
+                if ctool is None:
+                    return 1
+                else:
+                    tool = ctool
+
+            elif args.single_step:
+                ctool = choose_step(args, tool, loadingContext)
+                if ctool is None:
+                    return 1
+                else:
+                    tool = ctool
+
+            if args.print_subgraph:
+                if "name" in tool.tool:
+                    del tool.tool["name"]
+                stdout.write(
+                    json_dumps(
+                        tool.tool, indent=4, sort_keys=True, separators=(",", ": ")
+                    )
+                )
+                return 0
+
+        except (ValidationException) as exc:
+            _logger.error(
+                "Tool definition failed validation:\n%s", str(exc), exc_info=args.debug
+            )
+            return 1
+        except (RuntimeError, WorkflowException) as exc:
+            _logger.error(
+                "Tool definition failed initialization:\n%s",
+                str(exc),
+                exc_info=args.debug,
+            )
+            return 1
+        except Exception as exc:
+            _logger.error(
+                "I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
+                try_again_msg,
+                str(exc) if not args.debug else "",
+                exc_info=args.debug,
+            )
+            return 1
+
+        if isinstance(tool, int):
+            return tool
+
+        # If on MacOS platform, TMPDIR must be set to be under one of the
+        # shared volumes in Docker for Mac
+        # More info: https://dockstore.org/docs/faq
+        if sys.platform == "darwin":
+            default_mac_path = "/private/tmp/docker_tmp"
+            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
+                runtimeContext.tmp_outdir_prefix = default_mac_path
+            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
+                runtimeContext.tmpdir_prefix = default_mac_path
+
+        if check_working_directories(runtimeContext) is not None:
+            return 1
+
+        if args.cachedir:
+            if args.move_outputs == "move":
+                runtimeContext.move_outputs = "copy"
+            runtimeContext.tmp_outdir_prefix = args.cachedir
+
+        runtimeContext.secret_store = getdefault(
+            runtimeContext.secret_store, SecretStore()
+        )
+        runtimeContext.make_fs_access = getdefault(
+            runtimeContext.make_fs_access, StdFsAccess
+        )
+
+        if not executor:
+            if args.parallel:
+                temp_executor = MultithreadedJobExecutor()
+                runtimeContext.select_resources = temp_executor.select_resources
+                real_executor = temp_executor  # type: JobExecutor
+            else:
+                real_executor = SingleJobExecutor()
+        else:
+            real_executor = executor
+
+        try:
+            runtimeContext.basedir = input_basedir
+
+            if isinstance(tool, ProcessGenerator):
+                tfjob_order = {}  # type: CWLObjectType
+                if loadingContext.jobdefaults:
+                    tfjob_order.update(loadingContext.jobdefaults)
+                if job_order_object:
+                    tfjob_order.update(job_order_object)
+                tfout, tfstatus = real_executor(
+                    tool.embedded_tool, tfjob_order, runtimeContext
+                )
+                if not tfout or tfstatus != "success":
+                    raise WorkflowException(
+                        "ProcessGenerator failed to generate workflow"
+                    )
+                tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
+                if not job_order_object:
+                    job_order_object = None
+
+            try:
+                initialized_job_order_object = init_job_order(
+                    job_order_object,
+                    args,
+                    tool,
+                    jobloader,
+                    stdout,
+                    print_input_deps=args.print_input_deps,
+                    relative_deps=args.relative_deps,
+                    make_fs_access=runtimeContext.make_fs_access,
+                    input_basedir=input_basedir,
+                    secret_store=runtimeContext.secret_store,
+                    input_required=input_required,
+                )
+            except SystemExit as err:
+                return err.code
+
+            del args.workflow
+            del args.job_order
+
+            conf_file = getattr(
+                args, "beta_dependency_resolvers_configuration", None
+            )  # str
+            use_conda_dependencies = getattr(
+                args, "beta_conda_dependencies", None
+            )  # str
+
+            if conf_file or use_conda_dependencies:
+                runtimeContext.job_script_provider = DependenciesConfiguration(args)
+            else:
+                runtimeContext.find_default_container = functools.partial(
+                    find_default_container,
+                    default_container=runtimeContext.default_container,
+                    use_biocontainers=args.beta_use_biocontainers,
+                )
+
+            (out, status) = real_executor(
+                tool, initialized_job_order_object, runtimeContext, logger=_logger
+            )
+
+            if out is not None:
+                if runtimeContext.research_obj is not None:
+                    runtimeContext.research_obj.create_job(out, True)
+
+                    def remove_at_id(doc: CWLObjectType) -> None:
+                        for key in list(doc.keys()):
+                            if key == "@id":
+                                del doc[key]
+                            else:
+                                value = doc[key]
+                                if isinstance(value, MutableMapping):
+                                    remove_at_id(value)
+                                elif isinstance(value, MutableSequence):
+                                    for entry in value:
+                                        if isinstance(entry, MutableMapping):
+                                            remove_at_id(entry)
+
+                    remove_at_id(out)
+                    visit_class(
+                        out,
+                        ("File",),
+                        functools.partial(add_sizes, runtimeContext.make_fs_access("")),
+                    )
+
+                def loc_to_path(obj: CWLObjectType) -> None:
+                    for field in ("path", "nameext", "nameroot", "dirname"):
+                        if field in obj:
+                            del obj[field]
+                    if cast(str, obj["location"]).startswith("file://"):
+                        obj["path"] = uri_file_path(cast(str, obj["location"]))
+
+                visit_class(out, ("File", "Directory"), loc_to_path)
+
+                # Unsetting the Generation from final output object
+                visit_class(out, ("File",), MutationManager().unset_generation)
+
+                if isinstance(out, str):
+                    stdout.write(out)
+                else:
+                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
+                stdout.write("\n")
+                if hasattr(stdout, "flush"):
+                    stdout.flush()
+
+            if status != "success":
+                _logger.warning("Final process status is %s", status)
+                return 1
+            _logger.info("Final process status is %s", status)
+            return 0
+
+        except (ValidationException) as exc:
+            _logger.error(
+                "Input object failed validation:\n%s", str(exc), exc_info=args.debug
+            )
+            return 1
+        except UnsupportedRequirement as exc:
+            _logger.error(
+                "Workflow or tool uses unsupported feature:\n%s",
+                str(exc),
+                exc_info=args.debug,
+            )
+            return 33
+        except WorkflowException as exc:
+            _logger.error(
+                "Workflow error%s:\n%s",
+                try_again_msg,
+                strip_dup_lineno(str(exc)),
+                exc_info=args.debug,
+            )
+            return 1
+        except Exception as exc:  # pylint: disable=broad-except
+            _logger.error(
+                "Unhandled error%s:\n  %s",
+                try_again_msg,
+                str(exc),
+                exc_info=args.debug,
+            )
+            return 1
+
+    finally:
+        if (
+            args
+            and runtimeContext
+            and runtimeContext.research_obj
+            and workflowobj
+            and loadingContext
+        ):
+            research_obj = runtimeContext.research_obj
+            if loadingContext.loader is not None:
+                research_obj.generate_snapshot(
+                    prov_deps(workflowobj, loadingContext.loader, uri)
+                )
+            else:
+                _logger.warning(
+                    "Unable to generate provenance snapshot "
+                    " due to missing loadingContext.loader."
+                )
+            if prov_log_handler is not None:
+                # Stop logging so we won't half-log adding ourself to RO
+                _logger.debug(
+                    "[provenance] Closing provenance log file %s", prov_log_handler
+                )
+                _logger.removeHandler(prov_log_handler)
+                # Ensure last log lines are written out
+                prov_log_handler.flush()
+                # Underlying WritableBagFile will add the tagfile to the manifest
+                prov_log_handler.stream.close()
+                prov_log_handler.close()
+            research_obj.close(args.provenance)
+
+        _logger.removeHandler(stderr_handler)
+        _logger.addHandler(defaultStreamHandler)
+
+
+def find_default_container(
+    builder: HasReqsHints,
+    default_container: Optional[str] = None,
+    use_biocontainers: Optional[bool] = None,
+) -> Optional[str]:
+    """Find a container."""
+    if not default_container and use_biocontainers:
+        default_container = get_container_from_software_requirements(
+            use_biocontainers, builder
+        )
+    return default_container
+
+
+def run(*args, **kwargs):
+    # type: (*Any, **Any) -> None
+    """Run cwltool."""
+    signal.signal(signal.SIGTERM, _signal_handler)
+    try:
+        sys.exit(main(*args, **kwargs))
+    finally:
+        _terminate_processes()
+
+
+if __name__ == "__main__":
+    run(sys.argv[1:])