view env/lib/python3.9/site-packages/cwltool/process.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Classes and methods relevant for all CWL Process types."""
import abc
import copy
import functools
import hashlib
import json
import logging
import math
import os
import shutil
import stat
import textwrap
import urllib
import uuid
from os import scandir
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    Sized,
    Tuple,
    Type,
    Union,
    cast,
)

from pkg_resources import resource_stream
from rdflib import Graph
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.avro.schema import (
    Names,
    Schema,
    SchemaParseException,
    make_avsc_object,
)
from schema_salad.exceptions import ValidationException
from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro
from schema_salad.sourceline import SourceLine, strip_dup_lineno
from schema_salad.utils import convert_to_dict
from schema_salad.validate import validate_ex
from typing_extensions import TYPE_CHECKING

from . import expression
from .builder import Builder, HasReqsHints
from .context import LoadingContext, RuntimeContext, getdefault
from .errors import UnsupportedRequirement, WorkflowException
from .loghandler import _logger
from .mpi import MPIRequirementName
from .pathmapper import MapperEnt, PathMapper
from .secrets import SecretStore
from .stdfsaccess import StdFsAccess
from .update import INTERNAL_VERSION
from .utils import (
    CWLObjectType,
    CWLOutputAtomType,
    CWLOutputType,
    JobsGeneratorType,
    OutputCallbackType,
    adjustDirObjs,
    aslist,
    cmp_like_py2,
    copytree_with_merge,
    ensure_writable,
    get_listing,
    normalizeFilesDirs,
    onWindows,
    random_outdir,
    visit_class,
)
from .validate_js import validate_js_expressions

if TYPE_CHECKING:
    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import


class LogAsDebugFilter(logging.Filter):
    """Logging filter that lets records through only while ``parent`` has DEBUG enabled."""

    def __init__(self, name: str, parent: logging.Logger) -> None:
        """Store the logger whose DEBUG state gates this filter."""
        super().__init__(str(name))
        self.parent = parent

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True when the parent logger is enabled for DEBUG; the record is not inspected."""
        debug_enabled = self.parent.isEnabledFor(logging.DEBUG)
        return debug_enabled


# Dedicated logger for validation warnings. Its level is copied from the main
# cwltool logger's effective level at import time, and the attached filter only
# passes records while the main logger has DEBUG enabled.
_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
_logger_validation_warnings.addFilter(
    LogAsDebugFilter("cwltool.validation_warnings", _logger)
)

# Requirement class names accepted in a 'requirements' section. Process.__init__
# passes this list to checkRequirements, which raises UnsupportedRequirement for
# any class not listed here. Both short names and full cwltool extension URIs
# are listed.
supportedProcessRequirements = [
    "DockerRequirement",
    "SchemaDefRequirement",
    "EnvVarRequirement",
    "ScatterFeatureRequirement",
    "SubworkflowFeatureRequirement",
    "MultipleInputFeatureRequirement",
    "InlineJavascriptRequirement",
    "ShellCommandRequirement",
    "StepInputExpressionRequirement",
    "ResourceRequirement",
    "InitialWorkDirRequirement",
    "ToolTimeLimit",
    "WorkReuse",
    "NetworkAccess",
    "InplaceUpdateRequirement",
    "LoadListingRequirement",
    MPIRequirementName,
    "http://commonwl.org/cwltool#TimeLimit",
    "http://commonwl.org/cwltool#WorkReuse",
    "http://commonwl.org/cwltool#NetworkAccess",
    "http://commonwl.org/cwltool#LoadListingRequirement",
    "http://commonwl.org/cwltool#InplaceUpdateRequirement",
]

# File names that get_schema looks up under "schemas/<version>/" and preloads
# into the loader cache under "https://w3id.org/cwl/<name>". Files missing for
# a given version are skipped there.
cwl_files = (
    "Workflow.yml",
    "CommandLineTool.yml",
    "CommonWorkflowLanguage.yml",
    "Process.yml",
    "Operation.yml",
    "concepts.md",
    "contrib.md",
    "intro.md",
    "invocation.md",
)

# File names that get_schema looks up under
# "schemas/<version>/salad/schema_salad/metaschema/" and preloads into the
# loader cache. Each name is listed once; a repeated entry would only make
# get_schema read the same resource again into the same cache key.
salad_files = (
    "metaschema.yml",
    "metaschema_base.yml",
    "salad.md",
    "field_name.yml",
    "import_include.md",
    "link_res.yml",
    "ident_res.yml",
    "vocab_res.yml",
    "field_name_schema.yml",
    "field_name_src.yml",
    "field_name_proc.yml",
    "ident_res_schema.yml",
    "ident_res_src.yml",
    "ident_res_proc.yml",
    "link_res_schema.yml",
    "link_res_src.yml",
    "link_res_proc.yml",
    "vocab_res_schema.yml",
    "vocab_res_src.yml",
    "vocab_res_proc.yml",
)

# Loaded schemas, filled by get_schema and invalidated per version by
# use_standard_schema / use_custom_schema.
SCHEMA_CACHE = (
    {}
)  # type: Dict[str, Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]]
# Schema definitions for File, Directory and Any; populated on first
# Process.__init__ from the v1.0 schema.
SCHEMA_FILE = None  # type: Optional[CWLObjectType]
SCHEMA_DIR = None  # type: Optional[CWLObjectType]
SCHEMA_ANY = None  # type: Optional[CWLObjectType]

# Per-version schema overrides: version -> (schema name/URI, schema text).
custom_schemas = {}  # type: Dict[str, Tuple[str, str]]


def use_standard_schema(version: str) -> None:
    """Drop any custom schema registered for ``version`` and its cached load."""
    custom_schemas.pop(version, None)
    SCHEMA_CACHE.pop(version, None)


def use_custom_schema(version: str, name: str, text: str) -> None:
    """Register ``(name, text)`` as the schema for ``version`` and drop its cached load."""
    custom_schemas[version] = (name, text)
    # Force the next get_schema call for this version to reload.
    SCHEMA_CACHE.pop(version, None)


def get_schema(
    version: str,
) -> Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]:
    """
    Load the CWL schema for ``version``, memoizing the result in SCHEMA_CACHE.

    ``version`` may carry a URI prefix (everything up to the last "#" is
    dropped) and a trailing ".dev…" component (the last dot-separated part is
    dropped). The cache entry is stored under that normalized name.

    Bundled schema files that are missing for the version are skipped. If a
    custom schema is registered for the normalized version it is loaded instead
    of the standard CommonWorkflowLanguage.yml.

    :param version: CWL version identifier, e.g. "v1.0".
    :returns: the tuple produced by ``schema_salad.schema.load_schema``.
    """
    if version in SCHEMA_CACHE:
        return SCHEMA_CACHE[version]

    version = version.split("#")[-1]
    if ".dev" in version:
        version = ".".join(version.split(".")[:-1])

    # Entries are stored under the normalized name, so look it up again after
    # normalizing; otherwise "#"-qualified or ".dev" versions would reload the
    # schema on every call.
    if version in SCHEMA_CACHE:
        return SCHEMA_CACHE[version]

    cache = {}  # type: Dict[str, Union[str, Graph, bool]]
    sources = (
        ("https://w3id.org/cwl/", f"schemas/{version}/", cwl_files),
        (
            "https://w3id.org/cwl/salad/schema_salad/metaschema/",
            f"schemas/{version}/salad/schema_salad/metaschema/",
            salad_files,
        ),
    )
    for uri_base, resource_base, filenames in sources:
        for f in filenames:
            try:
                res = resource_stream(__name__, resource_base + f)
                try:
                    cache[uri_base + f] = res.read().decode("UTF-8")
                finally:
                    # Close the stream even if reading or decoding fails.
                    res.close()
            except OSError:
                # Not every schema version ships every file.
                pass

    if version in custom_schemas:
        cache[custom_schemas[version][0]] = custom_schemas[version][1]
        SCHEMA_CACHE[version] = load_schema(custom_schemas[version][0], cache=cache)
    else:
        SCHEMA_CACHE[version] = load_schema(
            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache
        )

    return SCHEMA_CACHE[version]


def shortname(inputid: str) -> str:
    """Return the last "/"-separated segment of the URI fragment, or of the path if there is no fragment."""
    parsed = urllib.parse.urlparse(inputid)
    source = parsed.fragment if parsed.fragment else parsed.path
    return source.rsplit("/", 1)[-1]


def checkRequirements(
    rec: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType, None],
    supported_process_requirements: Iterable[str],
) -> None:
    """
    Recursively verify that every listed requirement class is supported.

    Walks mappings and sequences in ``rec``; for each mapping that has a
    "requirements" key, every entry's "class" must be in
    ``supported_process_requirements``.

    :raises UnsupportedRequirement: for the first unsupported class found.
    """
    if isinstance(rec, MutableMapping):
        if "requirements" in rec:
            declared = cast(MutableSequence[CWLObjectType], rec["requirements"])
            for index, requirement in enumerate(declared):
                # SourceLine attaches the document position to the error.
                with SourceLine(rec["requirements"], index, UnsupportedRequirement):
                    req_class = cast(str, requirement["class"])
                    if req_class in supported_process_requirements:
                        continue
                    raise UnsupportedRequirement(
                        f"Unsupported requirement {requirement['class']}"
                    )
        for field in rec:
            checkRequirements(rec[field], supported_process_requirements)
    if isinstance(rec, MutableSequence):
        for element in rec:
            checkRequirements(element, supported_process_requirements)


def stage_files(
    pathmapper: PathMapper,
    stage_func: Optional[Callable[[str, str], None]] = None,
    ignore_writable: bool = False,
    symlink: bool = True,
    secret_store: Optional[SecretStore] = None,
    fix_conflicts: bool = False,
) -> None:
    """
    Link or copy files to their targets. Create them as needed.

    Works in two passes over ``pathmapper``. The first pass detects entries of
    a file type that map different sources to the same target; the second pass
    materializes every staged entry at its target.

    :param pathmapper: mapping of source keys to MapperEnt entries.
    :param stage_func: called as ``stage_func(resolved, target)`` for existing
        File/Directory entries; only used when ``symlink`` is False.
    :param ignore_writable: when True, WritableFile and WritableDirectory
        entries are not copied.
    :param symlink: when True, existing File/Directory entries are symlinked
        (copied on Windows) and ``stage_func`` is not consulted.
    :param secret_store: if given, the contents of CreateFile and
        CreateWritableFile entries are passed through its ``retrieve`` before
        being written.
    :param fix_conflicts: when True, a conflicting target is renamed by
        appending "_2", "_3", ... instead of raising.
    :raises WorkflowException: on a target conflict when ``fix_conflicts`` is
        False.
    """
    targets = {}  # type: Dict[str, MapperEnt]
    # Pass 1: conflict detection among entries whose type name contains "File".
    for key, entry in pathmapper.items():
        if "File" not in entry.type:
            continue
        if entry.target not in targets:
            targets[entry.target] = entry
        elif targets[entry.target].resolved != entry.resolved:
            # Same target, different source.
            if fix_conflicts:
                # find first key that does not clash with an existing entry in targets
                # start with entry.target + '_' + 2 and then keep incrementing the number till there is no clash
                i = 2
                tgt = f"{entry.target}_{i}"
                while tgt in targets:
                    i += 1
                    tgt = f"{entry.target}_{i}"
                targets[tgt] = pathmapper.update(
                    key, entry.resolved, tgt, entry.type, entry.staged
                )
            else:
                raise WorkflowException(
                    "File staging conflict, trying to stage both %s and %s to the same target %s"
                    % (targets[entry.target].resolved, entry.resolved, entry.target)
                )

    # Pass 2: materialize each staged entry at its target.
    for key, entry in pathmapper.items():
        if not entry.staged:
            continue
        # Make sure the target's parent directory exists.
        if not os.path.exists(os.path.dirname(entry.target)):
            os.makedirs(os.path.dirname(entry.target))
        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
            if symlink:  # Use symlink func if allowed
                if onWindows():
                    # On Windows the entry is copied instead of symlinked.
                    if entry.type == "File":
                        shutil.copy(entry.resolved, entry.target)
                    elif entry.type == "Directory":
                        if os.path.exists(entry.target) and os.path.isdir(entry.target):
                            shutil.rmtree(entry.target)
                        copytree_with_merge(entry.resolved, entry.target)
                else:
                    os.symlink(entry.resolved, entry.target)
            elif stage_func is not None:
                stage_func(entry.resolved, entry.target)
        elif (
            entry.type == "Directory"
            and not os.path.exists(entry.target)
            and entry.resolved.startswith("_:")
        ):
            # A "_:" source has nothing on disk to copy; create an empty directory.
            os.makedirs(entry.target)
        elif entry.type == "WritableFile" and not ignore_writable:
            shutil.copy(entry.resolved, entry.target)
            ensure_writable(entry.target)
        elif entry.type == "WritableDirectory" and not ignore_writable:
            if entry.resolved.startswith("_:"):
                os.makedirs(entry.target)
            else:
                shutil.copytree(entry.resolved, entry.target)
                ensure_writable(entry.target)
        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
            # For these types entry.resolved holds the file contents (or the
            # value handed to secret_store.retrieve), not a path.
            with open(entry.target, "wb") as new:
                if secret_store is not None:
                    new.write(
                        cast(str, secret_store.retrieve(entry.resolved)).encode("utf-8")
                    )
                else:
                    new.write(entry.resolved.encode("utf-8"))
            if entry.type == "CreateFile":
                os.chmod(entry.target, stat.S_IRUSR)  # Read only
            else:  # it is a "CreateWritableFile"
                ensure_writable(entry.target)
            # The created file is now its own source: point resolved at the target.
            pathmapper.update(key, entry.target, entry.target, entry.type, entry.staged)


def relocateOutputs(
    outputObj: CWLObjectType,
    destination_path: str,
    source_directories: Set[str],
    action: str,
    fs_access: StdFsAccess,
    compute_checksum: bool = True,
    path_mapper: Type[PathMapper] = PathMapper,
) -> CWLObjectType:
    """
    Move or copy the File/Directory objects in ``outputObj`` to ``destination_path``.

    Directory listings are filled in first. If ``action`` is neither "move" nor
    "copy" the object is returned at that point. Otherwise each output is
    staged into ``destination_path``, its "location" is rewritten to the new
    file URI and any "contents" field is removed.

    :param outputObj: output object to relocate; modified in place.
    :param destination_path: directory that receives the outputs.
    :param source_directories: directories whose contents may be moved; a
        source outside all of them is copied even when ``action`` is "move".
    :param action: "move" or "copy"; any other value skips relocation.
    :param fs_access: filesystem accessor used for realpath/isdir/join.
    :param compute_checksum: when True, compute_checksums is applied to every
        File after relocation.
    :param path_mapper: PathMapper class used to map sources to targets.
    :returns: ``outputObj``.
    """
    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))

    if action not in ("move", "copy"):
        return outputObj

    def _collectDirEntries(
        obj: Union[CWLObjectType, MutableSequence[CWLObjectType], None]
    ) -> Iterator[CWLObjectType]:
        # Yield the outermost File/Directory objects; does not descend into them.
        if isinstance(obj, dict):
            if obj.get("class") in ("File", "Directory"):
                yield obj
            else:
                for sub_obj in obj.values():
                    yield from _collectDirEntries(sub_obj)
        elif isinstance(obj, MutableSequence):
            for sub_obj in obj:
                yield from _collectDirEntries(sub_obj)

    def _is_inside(parent: str, path: str) -> bool:
        # True when path is parent itself or lies beneath it. Compares whole
        # path components: a plain string-prefix test (os.path.commonprefix)
        # would treat "/tmp/out2/x" as being inside "/tmp/out".
        if path == parent:
            return True
        prefix = parent if parent.endswith(os.sep) else parent + os.sep
        return path.startswith(prefix)

    def _relocate(src: str, dst: str) -> None:
        if src == dst:
            return

        # If the source is not contained in source_directories we're not allowed to delete it
        src = fs_access.realpath(src)
        src_can_deleted = any(_is_inside(p, src) for p in source_directories)

        _action = "move" if action == "move" and src_can_deleted else "copy"

        if _action == "move":
            _logger.debug("Moving %s to %s", src, dst)
            if fs_access.isdir(src) and fs_access.isdir(dst):
                # merge directories
                for dir_entry in scandir(src):
                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
            else:
                shutil.move(src, dst)

        elif _action == "copy":
            _logger.debug("Copying %s to %s", src, dst)
            if fs_access.isdir(src):
                # copytree needs the destination to be absent, so clear it first.
                if os.path.isdir(dst):
                    shutil.rmtree(dst)
                elif os.path.isfile(dst):
                    os.unlink(dst)
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)

    def _realpath(
        ob: CWLObjectType,
    ) -> None:  # should be type Union[CWLFile, CWLDirectory]
        # Canonicalize the location so equal sources compare equal when staged.
        location = cast(str, ob["location"])
        if location.startswith("file:"):
            ob["location"] = file_uri(os.path.realpath(uri_file_path(location)))
        elif location.startswith("/"):
            ob["location"] = os.path.realpath(location)
        elif not location.startswith("_:") and ":" in location:
            ob["location"] = file_uri(fs_access.realpath(location))

    outfiles = list(_collectDirEntries(outputObj))
    visit_class(outfiles, ("File", "Directory"), _realpath)
    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
    # symlink=False makes stage_files call _relocate for each existing entry.
    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)

    def _check_adjust(a_file: CWLObjectType) -> CWLObjectType:
        # Point the object at its mapped target and drop inline contents.
        a_file["location"] = file_uri(pm.mapper(cast(str, a_file["location"]))[1])
        if "contents" in a_file:
            del a_file["contents"]
        return a_file

    visit_class(outputObj, ("File", "Directory"), _check_adjust)

    if compute_checksum:
        visit_class(
            outputObj, ("File",), functools.partial(compute_checksums, fs_access)
        )
    return outputObj


def cleanIntermediate(output_dirs: Iterable[str]) -> None:
    """Delete every existing path in ``output_dirs`` as a directory tree, ignoring removal errors."""
    for directory in output_dirs:
        if not os.path.exists(directory):
            continue
        _logger.debug("Removing intermediate output directory %s", directory)
        shutil.rmtree(directory, ignore_errors=True)


def add_sizes(fsaccess: StdFsAccess, obj: CWLObjectType) -> None:
    """
    Fill in ``obj["size"]`` when it can be determined.

    With a "location", the size comes from ``fsaccess.size`` unless a size is
    already present. Without one, the length of "contents" is used if present.
    """
    if "location" not in obj:
        if "contents" in obj:
            obj["size"] = len(cast(Sized, obj["contents"]))
        return

    try:
        if "size" in obj:
            return
        obj["size"] = fsaccess.size(cast(str, obj["location"]))
    except OSError:
        # best effort: leave the size unset if it cannot be read
        pass


def fill_in_defaults(
    inputs: List[CWLObjectType],
    job: CWLObjectType,
    fsaccess: StdFsAccess,
) -> None:
    """
    Complete ``job`` in place with defaults for inputs it does not provide.

    For each input whose value is missing or None: a deep copy of its
    "default" is used if declared; otherwise None is stored if "null" is among
    its types.

    :raises WorkflowException: if a required input has neither a value nor a
        default.
    """
    debug_enabled = _logger.isEnabledFor(logging.DEBUG)
    for index, param in enumerate(inputs):
        with SourceLine(inputs, index, WorkflowException, debug_enabled):
            fieldname = shortname(cast(str, param["id"]))
            if job.get(fieldname) is not None:
                # A value was supplied; nothing to fill in.
                continue
            if "default" in param:
                job[fieldname] = copy.deepcopy(param["default"])
            elif "null" in aslist(param["type"]):
                job[fieldname] = None
            else:
                raise WorkflowException(
                    "Missing required input parameter '%s'" % fieldname
                )


def avroize_type(
    field_type: Union[
        CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None
    ],
    name_prefix: str = "",
) -> None:
    """
    Add missing information to a type so that CWL types are valid.

    Unnamed enum and record types get a generated name (``name_prefix`` plus a
    UUID). Record fields, array items and union members are processed
    recursively. The type is modified in place.
    """
    if isinstance(field_type, MutableSequence):
        for member in field_type:
            avroize_type(member, name_prefix)
        return
    if not isinstance(field_type, MutableMapping):
        return

    kind = field_type["type"]
    if kind in ("enum", "record") and "name" not in field_type:
        field_type["name"] = name_prefix + str(uuid.uuid4())
    if kind == "record":
        record_fields = cast(MutableSequence[CWLOutputType], field_type["fields"])
        avroize_type(record_fields, name_prefix)
    if kind == "array":
        array_items = cast(MutableSequence[CWLOutputType], field_type["items"])
        avroize_type(array_items, name_prefix)
    if isinstance(kind, MutableSequence):
        # "type" is itself a list of alternatives.
        for alternative in kind:
            avroize_type(cast(CWLOutputType, alternative), name_prefix)


def get_overrides(
    overrides: MutableSequence[CWLObjectType], toolid: str
) -> CWLObjectType:
    """
    Merge all override entries whose "overrideTarget" equals ``toolid``.

    Later entries overwrite keys of earlier ones. Returns an empty mapping if
    nothing matches.

    :raises ValidationException: if ``overrides`` is not a list.
    """
    if not isinstance(overrides, MutableSequence):
        raise ValidationException(
            f"Expected overrides to be a list, but was {type(overrides)}"
        )
    merged = {}  # type: CWLObjectType
    for override in overrides:
        if override["overrideTarget"] != toolid:
            continue
        merged.update(override)
    return merged


# Warning template logged by var_spool_cwl_detector; "{}" is replaced with the
# string in which the reference was found.
_VAR_SPOOL_ERROR = textwrap.dedent(
    """
    Non-portable reference to /var/spool/cwl detected: '{}'.
    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
    DockerRequirement to the 'requirements' section and declare
    'dockerOutputDirectory: /var/spool/cwl'.
    """
)


def var_spool_cwl_detector(
    obj: CWLOutputType,
    item: Optional[Any] = None,
    obj_key: Optional[Any] = None,
) -> bool:
    """
    Detect any textual reference to /var/spool/cwl.

    Walks ``obj`` recursively and logs a warning for every string containing
    "var/spool/cwl", except values stored under the key
    "dockerOutputDirectory". ``item`` and ``obj_key`` identify the container
    and key of ``obj`` for the warning's source location.

    :returns: True if at least one reference was reported.
    """
    if isinstance(obj, str):
        if "var/spool/cwl" not in obj or obj_key == "dockerOutputDirectory":
            return False
        _logger.warning(
            SourceLine(item=item, key=obj_key, raise_type=str).makeError(
                _VAR_SPOOL_ERROR.format(obj)
            )
        )
        return True

    found = False
    if isinstance(obj, MutableMapping):
        for child_key, child in obj.items():
            # Always recurse so every occurrence is reported, not just the first.
            if var_spool_cwl_detector(cast(CWLOutputType, child), obj, child_key):
                found = True
    elif isinstance(obj, MutableSequence):
        for position, child in enumerate(obj):
            if var_spool_cwl_detector(cast(CWLOutputType, child), obj, position):
                found = True
    return found


def eval_resource(
    builder: Builder, resource_req: Union[str, int, float]
) -> Optional[Union[str, int, float]]:
    """
    Evaluate a resource requirement value that may be an expression.

    Values that are not strings, or strings that need no parsing, are returned
    unchanged. Otherwise the expression is evaluated with ``builder.do_eval``.

    :raises WorkflowException: if the expression result is not a str, an int
        or None.
    """
    is_expression = isinstance(resource_req, str) and expression.needs_parsing(
        resource_req
    )
    if not is_expression:
        return resource_req

    evaluated = builder.do_eval(resource_req)
    if evaluated is None or isinstance(evaluated, (str, int)):
        return evaluated
    raise WorkflowException(
        "Got incorrect return type {} from resource expression evaluation of {}.".format(
            type(evaluated), resource_req
        )
    )


# Threshold where the "too many files" warning kicks in: with deep directory
# listing, an input holding more File objects than this is reported by _init_job.
FILE_COUNT_WARNING = 5000


class Process(HasReqsHints, metaclass=abc.ABCMeta):
    def __init__(
        self, toolpath_object: CommentedMap, loadingContext: LoadingContext
    ) -> None:
        """
        Build a Process object from the provided dictionary.

        Collects the effective requirements and hints, checks that all
        requirements are supported, registers SchemaDefRequirement types,
        builds Avro record schemas for the inputs and outputs, optionally
        validates JavaScript expressions, and warns about non-portable
        /var/spool/cwl references.

        :param toolpath_object: the parsed process document; an "id" is added
            to it if missing.
        :param loadingContext: supplies inherited requirements/hints,
            overrides, the document loader and schema names, and validation
            options.
        :raises UnsupportedRequirement: via checkRequirements, for a
            requirement class that is not supported.
        :raises ValidationException: if an input or output parameter has no
            "type".
        """
        super().__init__()
        self.metadata = getdefault(loadingContext.metadata, {})  # type: CWLObjectType
        self.provenance_object = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        # The File, Directory and Any schema definitions are looked up once in
        # the v1.0 schema and kept in module globals for later instances.
        global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY  # pylint: disable=global-statement
        if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
            get_schema("v1.0")
            SCHEMA_ANY = cast(
                CWLObjectType,
                SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"],
            )
            SCHEMA_FILE = cast(
                CWLObjectType,
                SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"],
            )
            SCHEMA_DIR = cast(
                CWLObjectType,
                SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"],
            )

        # Per-process Avro names, seeded with File, Directory and Any.
        self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({}))
        self.tool = toolpath_object
        # Effective requirements, in order: those inherited from the loading
        # context, the tool's own, then overrides targeting this tool's id.
        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
        self.requirements.extend(self.tool.get("requirements", []))
        if "id" not in self.tool:
            # Processes without an id get a generated "_:<uuid>" identifier.
            self.tool["id"] = "_:" + str(uuid.uuid4())
        self.requirements.extend(
            cast(
                List[CWLObjectType],
                get_overrides(
                    getdefault(loadingContext.overrides_list, []), self.tool["id"]
                ).get("requirements", []),
            )
        )
        # Effective hints: inherited ones followed by the tool's own.
        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        self.hints.extend(self.tool.get("hints", []))
        # Versions of requirements and hints which aren't mutated.
        self.original_requirements = copy.deepcopy(self.requirements)
        self.original_hints = copy.deepcopy(self.hints)
        self.doc_loader = loadingContext.loader
        self.doc_schema = loadingContext.avsc_names

        self.formatgraph = None  # type: Optional[Graph]
        if self.doc_loader is not None:
            self.formatgraph = self.doc_loader.graph

        # Raises UnsupportedRequirement for any unsupported requirement class
        # found anywhere in the tool document.
        checkRequirements(self.tool, supportedProcessRequirements)
        # Only the tool's own hints are validated here, not the inherited ones.
        self.validate_hints(
            cast(Names, loadingContext.avsc_names),
            self.tool.get("hints", []),
            strict=getdefault(loadingContext.strict, False),
        )

        self.schemaDefs = {}  # type: MutableMapping[str, CWLObjectType]

        sd, _ = self.get_requirement("SchemaDefRequirement")

        if sd is not None:
            # Register the user-defined types: name unnamed ones, make them
            # valid Avro, index them by name and add them to self.names.
            sdtypes = cast(MutableSequence[CWLObjectType], sd["types"])
            avroize_type(cast(MutableSequence[CWLOutputType], sdtypes))
            av = make_valid_avro(
                sdtypes,
                {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes},
                set(),
            )
            for i in av:
                self.schemaDefs[i["name"]] = i  # type: ignore
            make_avsc_object(convert_to_dict(av), self.names)

        # Build record schema from inputs
        self.inputs_record_schema = {
            "name": "input_record_schema",
            "type": "record",
            "fields": [],
        }  # type: CWLObjectType
        self.outputs_record_schema = {
            "name": "outputs_record_schema",
            "type": "record",
            "fields": [],
        }  # type: CWLObjectType

        for key in ("inputs", "outputs"):
            for i in self.tool[key]:
                # Work on a copy so the tool document is not modified; the
                # record field is identified by the short "name", not the id.
                c = copy.deepcopy(i)
                c["name"] = shortname(c["id"])
                del c["id"]

                if "type" not in c:
                    raise ValidationException(
                        "Missing 'type' in parameter '{}'".format(c["name"])
                    )

                if "default" in c and "null" not in aslist(c["type"]):
                    # A parameter with a default may be omitted, so make its
                    # type nullable.
                    nullable = ["null"]
                    nullable.extend(aslist(c["type"]))
                    c["type"] = nullable
                else:
                    # No-op assignment: the type is left unchanged.
                    c["type"] = c["type"]
                avroize_type(c["type"], c["name"])
                if key == "inputs":
                    cast(
                        List[CWLObjectType], self.inputs_record_schema["fields"]
                    ).append(c)
                elif key == "outputs":
                    cast(
                        List[CWLObjectType], self.outputs_record_schema["fields"]
                    ).append(c)

        # Make both record schemas valid Avro and register them in self.names;
        # SourceLine ties validation errors to the matching document section.
        with SourceLine(toolpath_object, "inputs", ValidationException):
            self.inputs_record_schema = cast(
                CWLObjectType,
                make_valid_avro(self.inputs_record_schema, {}, set()),
            )
            make_avsc_object(convert_to_dict(self.inputs_record_schema), self.names)
        with SourceLine(toolpath_object, "outputs", ValidationException):
            self.outputs_record_schema = cast(
                CWLObjectType,
                make_valid_avro(self.outputs_record_schema, {}, set()),
            )
            make_avsc_object(convert_to_dict(self.outputs_record_schema), self.names)

        # JavaScript expression validation: skipped when the document has no
        # "class" or when the loading context disables it.
        if toolpath_object.get("class") is not None and not getdefault(
            loadingContext.disable_js_validation, False
        ):
            validate_js_options = (
                None
            )  # type: Optional[Dict[str, Union[List[str], str, int]]]
            if loadingContext.js_hint_options_file is not None:
                try:
                    with open(loadingContext.js_hint_options_file) as options_file:
                        validate_js_options = json.load(options_file)
                except (OSError, ValueError):
                    # Log which file failed, then let the error propagate.
                    _logger.error(
                        "Failed to read options file %s",
                        loadingContext.js_hint_options_file,
                    )
                    raise
            if self.doc_schema is not None:
                validate_js_expressions(
                    toolpath_object,
                    self.doc_schema.names[toolpath_object["class"]],
                    validate_js_options,
                )

        # NOTE(review): is_req is presumably True when the entry was found
        # under 'requirements' and False under 'hints' - confirm against
        # get_requirement.
        dockerReq, is_req = self.get_requirement("DockerRequirement")

        if (
            dockerReq is not None
            and "dockerOutputDirectory" in dockerReq
            and is_req is not None
            and not is_req
        ):
            _logger.warning(
                SourceLine(item=dockerReq, raise_type=str).makeError(
                    "When 'dockerOutputDirectory' is declared, DockerRequirement "
                    "should go in the 'requirements' section, not 'hints'."
                    ""
                )
            )

        # Warn about /var/spool/cwl references unless DockerRequirement is a
        # requirement that declares that exact output directory. The
        # detector's boolean result is not used here; it only logs.
        if (
            dockerReq is not None
            and is_req is not None
            and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl"
        ):
            if is_req:
                # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
                pass
            else:
                # Must be a requirement
                var_spool_cwl_detector(self.tool)
        else:
            var_spool_cwl_detector(self.tool)

    def _init_job(
        self, joborder: CWLObjectType, runtime_context: RuntimeContext
    ) -> Builder:

        if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
            raise WorkflowException(
                "Process object loaded with version '%s', must update to '%s' in order to execute."
                % (self.metadata.get("cwlVersion"), INTERNAL_VERSION)
            )

        job = copy.deepcopy(joborder)

        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
        fs_access = make_fs_access(runtime_context.basedir)

        load_listing_req, _ = self.get_requirement("LoadListingRequirement")

        load_listing = (
            cast(str, load_listing_req.get("loadListing"))
            if load_listing_req is not None
            else "no_listing"
        )

        # Validate job order
        try:
            fill_in_defaults(self.tool["inputs"], job, fs_access)

            normalizeFilesDirs(job)
            schema = self.names.get_name("input_record_schema", None)
            if schema is None:
                raise WorkflowException(
                    "Missing input record schema: " "{}".format(self.names)
                )
            validate_ex(schema, job, strict=False, logger=_logger_validation_warnings)

            if load_listing and load_listing != "no_listing":
                get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))

            visit_class(job, ("File",), functools.partial(add_sizes, fs_access))

            if load_listing == "deep_listing":
                for i, inparm in enumerate(self.tool["inputs"]):
                    k = shortname(inparm["id"])
                    if k not in job:
                        continue
                    v = job[k]
                    dircount = [0]

                    def inc(d):  # type: (List[int]) -> None
                        d[0] += 1

                    visit_class(v, ("Directory",), lambda x: inc(dircount))
                    if dircount[0] == 0:
                        continue
                    filecount = [0]
                    visit_class(v, ("File",), lambda x: inc(filecount))
                    if filecount[0] > FILE_COUNT_WARNING:
                        # Long lines in this message are okay, will be reflowed based on terminal columns.
                        _logger.warning(
                            strip_dup_lineno(
                                SourceLine(self.tool["inputs"], i, str).makeError(
                                    """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'.  This may negatively affect workflow performance and memory use.

If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:

$namespaces:
  cwltool: "http://commonwl.org/cwltool#"
hints:
  cwltool:LoadListingRequirement:
    loadListing: shallow_listing

"""
                                    % (filecount[0], k)
                                )
                            )
                        )

        except (ValidationException, WorkflowException) as err:
            raise WorkflowException("Invalid job input record:\n" + str(err)) from err

        files = []  # type: List[CWLObjectType]
        bindings = CommentedSeq()
        outdir = ""
        tmpdir = ""
        stagedir = ""

        docker_req, _ = self.get_requirement("DockerRequirement")
        default_docker = None

        if docker_req is None and runtime_context.default_container:
            default_docker = runtime_context.default_container

        if (docker_req or default_docker) and runtime_context.use_container:
            if docker_req is not None:
                # Check if docker output directory is absolute
                if docker_req.get("dockerOutputDirectory") and cast(
                    str, docker_req.get("dockerOutputDirectory")
                ).startswith("/"):
                    outdir = cast(str, docker_req.get("dockerOutputDirectory"))
                else:
                    outdir = cast(
                        str,
                        docker_req.get("dockerOutputDirectory")
                        or runtime_context.docker_outdir
                        or random_outdir(),
                    )
            elif default_docker is not None:
                outdir = runtime_context.docker_outdir or random_outdir()
            tmpdir = runtime_context.docker_tmpdir or "/tmp"  # nosec
            stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
        else:
            if self.tool["class"] == "CommandLineTool":
                outdir = fs_access.realpath(runtime_context.get_outdir())
                tmpdir = fs_access.realpath(runtime_context.get_tmpdir())
                stagedir = fs_access.realpath(runtime_context.get_stagedir())

        cwl_version = cast(
            str,
            self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None),
        )
        builder = Builder(
            job,
            files,
            bindings,
            self.schemaDefs,
            self.names,
            self.requirements,
            self.hints,
            {},
            runtime_context.mutation_manager,
            self.formatgraph,
            make_fs_access,
            fs_access,
            runtime_context.job_script_provider,
            runtime_context.eval_timeout,
            runtime_context.debug,
            runtime_context.js_console,
            runtime_context.force_docker_pull,
            load_listing,
            outdir,
            tmpdir,
            stagedir,
            cwl_version,
        )

        bindings.extend(
            builder.bind_input(
                self.inputs_record_schema,
                job,
                discover_secondaryFiles=getdefault(runtime_context.toplevel, False),
            )
        )

        if self.tool.get("baseCommand"):
            for index, command in enumerate(aslist(self.tool["baseCommand"])):
                bindings.append({"position": [-1000000, index], "datum": command})

        if self.tool.get("arguments"):
            for i, arg in enumerate(self.tool["arguments"]):
                lc = self.tool["arguments"].lc.data[i]
                filename = self.tool["arguments"].lc.filename
                bindings.lc.add_kv_line_col(len(bindings), lc)
                if isinstance(arg, MutableMapping):
                    arg = copy.deepcopy(arg)
                    if arg.get("position"):
                        position = arg.get("position")
                        if isinstance(position, str):  # no need to test the
                            # CWLVersion as the v1.0
                            # schema only allows ints
                            position = builder.do_eval(position)
                            if position is None:
                                position = 0
                        arg["position"] = [position, i]
                    else:
                        arg["position"] = [0, i]
                    bindings.append(arg)
                elif ("$(" in arg) or ("${" in arg):
                    cm = CommentedMap((("position", [0, i]), ("valueFrom", arg)))
                    cm.lc.add_kv_line_col("valueFrom", lc)
                    cm.lc.filename = filename
                    bindings.append(cm)
                else:
                    cm = CommentedMap((("position", [0, i]), ("datum", arg)))
                    cm.lc.add_kv_line_col("datum", lc)
                    cm.lc.filename = filename
                    bindings.append(cm)

        # use python2 like sorting of heterogeneous lists
        # (containing str and int types),
        key = functools.cmp_to_key(cmp_like_py2)

        # This awkward construction replaces the contents of
        # "bindings" in place (because Builder expects it to be
        # mutated in place, sigh, I'm sorry) with its contents sorted,
        # supporting different versions of Python and ruamel.yaml with
        # different behaviors/bugs in CommentedSeq.
        bindings_copy = copy.deepcopy(bindings)
        del bindings[:]
        bindings.extend(sorted(bindings_copy, key=key))

        if self.tool["class"] != "Workflow":
            builder.resources = self.evalResources(builder, runtime_context)
        return builder

    def evalResources(
        self, builder: Builder, runtimeContext: RuntimeContext
    ) -> Dict[str, Union[int, float, str]]:
        """
        Work out the resource request for this process.

        Defaults are one core and 1024 for the tmpdir and outdir sizes; the
        default RAM is 1024 when the document's original cwlVersion is
        ``v1.0`` and 256 otherwise.  Any truthy ``<kind>Min`` / ``<kind>Max``
        field of the ResourceRequirement is evaluated with ``eval_resource``
        and replaces the default; when only one bound is given, it is used
        for both.

        :param builder: passed through to ``eval_resource``.
        :param runtimeContext: if its ``select_resources`` is set, that
            callable receives the full min/max request and its result is
            returned as-is.
        :return: otherwise a mapping with ``cores``, ``ram``, ``tmpdirSize``
            and ``outdirSize`` taken from the minimums; ram and the two sizes
            are rounded up unless they are strings.
        """
        declared, _ = self.get_requirement("ResourceRequirement")
        requirement = {} if declared is None else declared

        original_version = self.metadata.get(
            "http://commonwl.org/cwltool#original_cwlVersion", None
        )
        default_ram = 1024 if original_version == "v1.0" else 256

        request: Dict[str, Union[int, float, str]] = {
            "coresMin": 1,
            "coresMax": 1,
            "ramMin": default_ram,
            "ramMax": default_ram,
            "tmpdirMin": 1024,
            "tmpdirMax": 1024,
            "outdirMin": 1024,
            "outdirMax": 1024,
        }

        def evaluated(field: str) -> Optional[Union[int, float]]:
            # A missing or falsy field counts as "not specified".
            if not requirement.get(field):
                return None
            return cast(
                Union[int, float],
                eval_resource(
                    builder, cast(Union[str, int, float], requirement[field])
                ),
            )

        for kind in ("cores", "ram", "tmpdir", "outdir"):
            min_key, max_key = kind + "Min", kind + "Max"
            lower = evaluated(min_key)
            upper = evaluated(max_key)

            # Mirror whichever bound is present onto the missing one.
            if lower is None:
                lower = upper
            elif upper is None:
                upper = lower

            if lower is None:
                # Neither bound was given: keep the defaults.
                continue
            request[min_key] = lower
            request[max_key] = cast(Union[int, float], upper)

        chooser = runtimeContext.select_resources
        if chooser is not None:
            return chooser(request, runtimeContext)

        def whole(amount: Union[int, float, str]) -> Union[int, float, str]:
            # Strings pass through untouched; numbers are rounded up.
            return amount if isinstance(amount, str) else math.ceil(amount)

        return {
            "cores": request["coresMin"],
            "ram": whole(request["ramMin"]),
            "tmpdirSize": whole(request["tmpdirMin"]),
            "outdirSize": whole(request["outdirMin"]),
        }

    def validate_hints(
        self, avsc_names: Names, hints: List[CWLObjectType], strict: bool
    ) -> None:
        """
        Validate every hint whose class is known to the schema.

        A hint is validated with ``validate_ex`` when ``avsc_names`` has an
        entry for its ``class`` and a document loader is available; the
        loader's identifier fields are stripped from the hint first.
        ``NetworkAccess`` and ``LoadListingRequirement`` hints are accepted
        without validation, and any other unrecognised class is only logged
        at INFO level.

        :param avsc_names: schema names used to look up each hint class.
        :param hints: the hints to check; each one is processed inside its
            own ``SourceLine`` context.
        :param strict: forwarded to ``validate_ex``.
        """
        for index, hint in enumerate(hints):
            where = SourceLine(hints, index, ValidationException)
            with where:
                schema_known = (
                    avsc_names.get_name(cast(str, hint["class"]), None) is not None
                )
                if schema_known and self.doc_loader is not None:
                    identifiers = self.doc_loader.identifiers
                    # strip identifiers
                    stripped = {
                        field: value
                        for field, value in hint.items()
                        if field not in identifiers
                    }
                    hint_schema = cast(
                        Schema,
                        avsc_names.get_name(cast(str, stripped["class"]), None),
                    )
                    validate_ex(hint_schema, stripped, strict=strict)
                elif hint["class"] not in ("NetworkAccess", "LoadListingRequirement"):
                    _logger.info(
                        str(where.makeError("Unknown hint %s" % (hint["class"])))
                    )

    def visit(self, op: Callable[[CommentedMap], None]) -> None:
        """Call ``op`` once with ``self.tool``; the result of ``op`` is discarded."""
        document = self.tool
        op(document)

    @abc.abstractmethod
    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """
        Abstract hook with no behaviour of its own.

        Concrete subclasses must override it; the signature declares that
        implementations produce a ``JobsGeneratorType``.
        """


# Module-wide registry of the names already handed out by uniquename();
# it is used whenever a caller does not pass its own set.
_names = set()  # type: Set[str]


def uniquename(stem: str, names: Optional[Set[str]] = None) -> str:
    """
    Return a name based on ``stem`` that is not yet in the registry.

    ``stem`` itself is used when free; otherwise ``stem_2``, ``stem_3``, ...
    are tried in turn.  The chosen name is added to the registry before it
    is returned.

    :param stem: the preferred name.
    :param names: set of names already taken; the module-level ``_names``
        set is used when omitted.  The set is updated in place.
    """
    registry = _names if names is None else names
    candidate = stem
    suffix = 2
    while candidate in registry:
        candidate = f"{stem}_{suffix}"
        suffix += 1
    registry.add(candidate)
    return candidate


def nestdir(base: str, deps: CWLObjectType) -> CWLObjectType:
    """
    Wrap ``deps`` in Directory objects mirroring its path below ``base``.

    When the ``location`` of ``deps`` starts with the directory part of
    ``base`` (plus a trailing ``/``), every intermediate path component
    between that directory and the entry becomes a
    ``{"class": "Directory", "basename": ..., "listing": [...]}`` wrapper,
    innermost first.  Otherwise ``deps`` is returned unchanged.
    """
    prefix = os.path.dirname(base) + "/"
    location = cast(str, deps["location"])
    if not location.startswith(prefix):
        return deps

    # Everything but the last component (the entry itself) is a parent dir.
    parents = location[len(prefix) :].split("/")[:-1]
    wrapped = deps
    for name in reversed(parents):
        wrapped = {"class": "Directory", "basename": name, "listing": [wrapped]}
    return wrapped


def mergedirs(listing: List[CWLObjectType]) -> List[CWLObjectType]:
    """
    Merge the entries of ``listing`` that share a basename.

    The first entry seen for a basename is kept.  For a later entry with
    the same basename:

    - a Directory has its ``listing`` appended to the kept entry, and hands
      over its ``location`` when the kept one starts with ``_:``;
    - anything else with a different ``location`` is a collision: both
      entries are renamed to their URL-quoted location (``nameroot`` and
      ``nameext`` are recomputed) and both are kept;
    - anything else with the same ``location`` is dropped.

    Directory listings are merged recursively.  Entries are modified in
    place; the returned list is a new list.
    """
    by_name = {}  # type: Dict[str, CWLObjectType]
    clashing = set()  # type: Set[str]

    for entry in listing:
        name = cast(str, entry["basename"])
        if name not in by_name:
            by_name[name] = entry
            continue

        existing = by_name[name]
        if entry["class"] == "Directory":
            if entry.get("listing"):
                merged = cast(
                    List[CWLObjectType], existing.setdefault("listing", [])
                )
                merged.extend(cast(List[CWLObjectType], entry["listing"]))
            if cast(str, existing["location"]).startswith("_:"):
                existing["location"] = entry["location"]
        elif entry["location"] != existing["location"]:
            # same basename, different location, collision,
            # rename both.
            clashing.add(name)
            for item in (entry, existing):
                quoted = urllib.parse.quote(cast(str, item["location"]), safe="")
                item["basename"] = quoted
                item["nameroot"], item["nameext"] = os.path.splitext(quoted)
                by_name[quoted] = item

    # The original basename of a collision no longer names any entry.
    for name in clashing:
        del by_name[name]

    for entry in by_name.values():
        if entry["class"] == "Directory" and "listing" in entry:
            entry["listing"] = cast(
                MutableSequence[CWLOutputAtomType],
                mergedirs(cast(List[CWLObjectType], entry["listing"])),
            )
    return list(by_name.values())


# IANA media-type URL for CWL; scandeps() records it as the "format" of the
# File entries it creates for CWL documents.
CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"


def scandeps(
    base: str,
    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
    reffields: Set[str],
    urlfields: Set[str],
    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
    nestdirs: bool = True,
) -> MutableSequence[CWLObjectType]:
    """
    Collect the File and Directory dependencies referenced by a document.

    :param base: URI that references are joined against.  For a mapping whose
        ``id`` starts with ``file://``, the defragmented ``id`` is recorded
        as a dependency and becomes the new base when it differs.
    :param doc: mapping or sequence that is walked recursively; any other
        value yields no dependencies.
    :param reffields: keys whose values refer to other documents.  Mapping
        values are scanned directly; other values are loaded with ``loadref``
        and scanned, unless they point into the current base document.
    :param urlfields: keys whose values are recorded as File locations;
        ``location`` is only honoured on File and Directory objects.
    :param loadref: called as ``loadref(base, reference)`` to fetch a
        referenced document.
    :param urljoin: resolves a reference against ``base``.
    :param nestdirs: when true, each recorded entry is passed through
        ``nestdir``.
    :return: the dependencies found; a non-empty result has been passed
        through ``normalizeFilesDirs`` and ``mergedirs``.
    """
    # Recursive calls carry every option of this call over unchanged.
    rescan = functools.partial(
        scandeps,
        reffields=reffields,
        urlfields=urlfields,
        loadref=loadref,
        urljoin=urljoin,
        nestdirs=nestdirs,
    )

    found = []  # type: MutableSequence[CWLObjectType]
    if isinstance(doc, MutableMapping):
        if "id" in doc and cast(str, doc["id"]).startswith("file://"):
            doc_uri, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
            if base != doc_uri:
                found.append(
                    {"class": "File", "location": doc_uri, "format": CWL_IANA}
                )
                base = doc_uri

        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
            target = cast(Optional[str], doc.get("location", doc.get("path")))
            if target and not target.startswith("_:"):
                entry = {
                    "class": doc["class"],
                    "location": urljoin(base, target),
                }  # type: CWLObjectType
                if "basename" in doc:
                    entry["basename"] = doc["basename"]
                if doc["class"] == "Directory" and "listing" in doc:
                    entry["listing"] = doc["listing"]
                if doc["class"] == "File" and "secondaryFiles" in doc:
                    entry["secondaryFiles"] = cast(
                        CWLOutputAtomType,
                        rescan(
                            base,
                            cast(
                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
                                doc["secondaryFiles"],
                            ),
                        ),
                    )
                found.append(nestdir(base, entry) if nestdirs else entry)
            elif doc["class"] == "Directory" and "listing" in doc:
                # No usable location: report the directory's contents instead.
                found.extend(
                    rescan(base, cast(MutableSequence[CWLObjectType], doc["listing"]))
                )
            elif doc["class"] == "File" and "secondaryFiles" in doc:
                # No usable location: report the secondary files instead.
                found.extend(
                    rescan(
                        base,
                        cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
                    )
                )

        for key, value in doc.items():
            if key in reffields:
                for ref in aslist(value):
                    if isinstance(ref, MutableMapping):
                        found.extend(rescan(base, ref))
                        continue

                    subid = urljoin(base, ref)
                    basedf, _ = urllib.parse.urldefrag(base)
                    subiddf, _ = urllib.parse.urldefrag(subid)
                    if basedf == subiddf:
                        # Reference into the current document: nothing new.
                        continue

                    sub = cast(
                        Union[MutableSequence[CWLObjectType], CWLObjectType],
                        loadref(base, ref),
                    )
                    ref_entry = {
                        "class": "File",
                        "location": subid,
                        "format": CWL_IANA,
                    }  # type: CWLObjectType
                    nested = rescan(subid, sub)
                    if nested:
                        ref_entry["secondaryFiles"] = cast(
                            MutableSequence[CWLOutputAtomType], nested
                        )
                    found.append(nestdir(base, ref_entry) if nestdirs else ref_entry)
            elif key in urlfields and key != "location":
                for url in aslist(value):
                    url_entry = {
                        "class": "File",
                        "location": urljoin(base, url),
                    }  # type: CWLObjectType
                    found.append(nestdir(base, url_entry) if nestdirs else url_entry)
            elif doc.get("class") in ("File", "Directory") and key in (
                "listing",
                "secondaryFiles",
            ):
                # should be handled earlier.
                continue
            else:
                found.extend(
                    rescan(
                        base,
                        cast(
                            Union[MutableSequence[CWLObjectType], CWLObjectType], value
                        ),
                    )
                )
    elif isinstance(doc, MutableSequence):
        for item in doc:
            found.extend(rescan(base, item))

    if found:
        normalizeFilesDirs(found)
        found = mergedirs(cast(List[CWLObjectType], found))

    return found


def compute_checksums(fs_access: StdFsAccess, fileobj: CWLObjectType) -> None:
    """
    Fill in ``checksum`` and ``size`` on ``fileobj`` when no checksum is set.

    The file at ``fileobj["location"]`` is opened through ``fs_access`` and
    hashed with SHA-1 in chunks of 1024 * 1024 bytes; the result is stored
    as ``sha1$<hexdigest>`` and the size is taken from ``fs_access.size``.
    A ``fileobj`` that already has a ``checksum`` is left untouched.
    """
    if "checksum" in fileobj:
        return

    digest = hashlib.sha1()  # nosec
    location = cast(str, fileobj["location"])
    with fs_access.open(location, "rb") as stream:
        # Read until the stream returns an empty bytes object.
        for chunk in iter(lambda: stream.read(1024 * 1024), b""):
            digest.update(chunk)
    fileobj["checksum"] = "sha1$%s" % digest.hexdigest()
    fileobj["size"] = fs_access.size(location)