diff env/lib/python3.9/site-packages/cwltool/workflow_job.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/workflow_job.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,881 @@
+import copy
+import datetime
+import functools
+import logging
+import threading
+from typing import (
+    Dict,
+    List,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    Sized,
+    Tuple,
+    cast,
+)
+
+from schema_salad.sourceline import SourceLine
+from schema_salad.utils import json_dumps
+from typing_extensions import TYPE_CHECKING
+
+from . import expression
+from .builder import content_limit_respected_read
+from .checker import can_assign_src_to_sink
+from .context import RuntimeContext, getdefault
+from .errors import WorkflowException
+from .loghandler import _logger
+from .process import shortname, uniquename
+from .stdfsaccess import StdFsAccess
+from .utils import (
+    CWLObjectType,
+    CWLOutputType,
+    JobsGeneratorType,
+    OutputCallbackType,
+    ParametersType,
+    ScatterDestinationsType,
+    ScatterOutputCallbackType,
+    SinkType,
+    WorkflowStateItem,
+    adjustDirObjs,
+    aslist,
+    get_listing,
+)
+
+if TYPE_CHECKING:
+    from .provenance_profile import ProvenanceProfile
+    from .workflow import Workflow, WorkflowStep
+
+
+class WorkflowJobStep:
+    """Generated for each step in Workflow.steps()."""
+
+    def __init__(self, step: "WorkflowStep") -> None:
+        """Initialize this WorkflowJobStep."""
+        self.step = step
+        self.tool = step.tool
+        self.id = step.id
+        self.submitted = False
+        self.iterable = None  # type: Optional[JobsGeneratorType]
+        self.completed = False
+        self.name = uniquename("step %s" % shortname(self.id))
+        self.prov_obj = step.prov_obj
+        self.parent_wf = step.parent_wf
+
+    def job(
+        self,
+        joborder: CWLObjectType,
+        output_callback: Optional[OutputCallbackType],
+        runtimeContext: RuntimeContext,
+    ) -> JobsGeneratorType:
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.part_of = self.name
+        runtimeContext.name = shortname(self.id)
+
+        _logger.info("[%s] start", self.name)
+
+        yield from self.step.job(joborder, output_callback, runtimeContext)
+
+
+class ReceiveScatterOutput:
+    """Produced by the scatter generators."""
+
+    def __init__(
+        self,
+        output_callback: ScatterOutputCallbackType,
+        dest: ScatterDestinationsType,
+        total: int,
+    ) -> None:
+        """Initialize."""
+        self.dest = dest
+        self.completed = 0
+        self.processStatus = "success"
+        self.total = total
+        self.output_callback = output_callback
+        self.steps = []  # type: List[Optional[JobsGeneratorType]]
+
+    def receive_scatter_output(
+        self, index: int, jobout: CWLObjectType, processStatus: str
+    ) -> None:
+        for key, val in jobout.items():
+            self.dest[key][index] = val
+
+        # Release the iterable related to this step to
+        # reclaim memory.
+        if self.steps:
+            self.steps[index] = None
+
+        if processStatus != "success":
+            if self.processStatus != "permanentFail":
+                self.processStatus = processStatus
+
+        self.completed += 1
+
+        if self.completed == self.total:
+            self.output_callback(self.dest, self.processStatus)
+
+    def setTotal(
+        self,
+        total: int,
+        steps: List[Optional[JobsGeneratorType]],
+    ) -> None:
+        """
+        Set the total number of expected outputs along with the steps.
+
+        This is necessary to finish the setup.
+        """
+        self.total = total
+        self.steps = steps
+        if self.completed == self.total:
+            self.output_callback(self.dest, self.processStatus)
+
+
+def parallel_steps(
+    steps: List[Optional[JobsGeneratorType]],
+    rc: ReceiveScatterOutput,
+    runtimeContext: RuntimeContext,
+) -> JobsGeneratorType:
+    while rc.completed < rc.total:
+        made_progress = False
+        for index, step in enumerate(steps):
+            if getdefault(
+                runtimeContext.on_error, "stop"
+            ) == "stop" and rc.processStatus not in ("success", "skipped"):
+                break
+            if step is None:
+                continue
+            try:
+                for j in step:
+                    if getdefault(
+                        runtimeContext.on_error, "stop"
+                    ) == "stop" and rc.processStatus not in ("success", "skipped"):
+                        break
+                    if j is not None:
+                        made_progress = True
+                        yield j
+                    else:
+                        break
+                if made_progress:
+                    break
+            except WorkflowException as exc:
+                _logger.error("Cannot make scatter job: %s", str(exc))
+                _logger.debug("", exc_info=True)
+                rc.receive_scatter_output(index, {}, "permanentFail")
+        if not made_progress and rc.completed < rc.total:
+            yield None
+
+
+def nested_crossproduct_scatter(
+    process: WorkflowJobStep,
+    joborder: CWLObjectType,
+    scatter_keys: MutableSequence[str],
+    output_callback: ScatterOutputCallbackType,
+    runtimeContext: RuntimeContext,
+) -> JobsGeneratorType:
+    scatter_key = scatter_keys[0]
+    jobl = len(cast(Sized, joborder[scatter_key]))
+    output = {}  # type: ScatterDestinationsType
+    for i in process.tool["outputs"]:
+        output[i["id"]] = [None] * jobl
+
+    rc = ReceiveScatterOutput(output_callback, output, jobl)
+
+    steps = []  # type: List[Optional[JobsGeneratorType]]
+    for index in range(0, jobl):
+        sjob = copy.copy(joborder)  # type: Optional[CWLObjectType]
+        assert sjob is not None  # nosec
+        sjob[scatter_key] = cast(
+            MutableMapping[int, CWLObjectType], joborder[scatter_key]
+        )[index]
+
+        if len(scatter_keys) == 1:
+            if runtimeContext.postScatterEval is not None:
+                sjob = runtimeContext.postScatterEval(sjob)
+            curriedcallback = functools.partial(rc.receive_scatter_output, index)
+            if sjob is not None:
+                steps.append(process.job(sjob, curriedcallback, runtimeContext))
+            else:
+                curriedcallback({}, "skipped")
+                steps.append(None)
+        else:
+            steps.append(
+                nested_crossproduct_scatter(
+                    process,
+                    sjob,
+                    scatter_keys[1:],
+                    functools.partial(rc.receive_scatter_output, index),
+                    runtimeContext,
+                )
+            )
+
+    rc.setTotal(jobl, steps)
+    return parallel_steps(steps, rc, runtimeContext)
+
+
+def crossproduct_size(
+    joborder: CWLObjectType, scatter_keys: MutableSequence[str]
+) -> int:
+    scatter_key = scatter_keys[0]
+    if len(scatter_keys) == 1:
+        ssum = len(cast(Sized, joborder[scatter_key]))
+    else:
+        ssum = 0
+        for _ in range(0, len(cast(Sized, joborder[scatter_key]))):
+            ssum += crossproduct_size(joborder, scatter_keys[1:])
+    return ssum
+
+
+def flat_crossproduct_scatter(
+    process: WorkflowJobStep,
+    joborder: CWLObjectType,
+    scatter_keys: MutableSequence[str],
+    output_callback: ScatterOutputCallbackType,
+    runtimeContext: RuntimeContext,
+) -> JobsGeneratorType:
+    output = {}  # type: ScatterDestinationsType
+    for i in process.tool["outputs"]:
+        output[i["id"]] = [None] * crossproduct_size(joborder, scatter_keys)
+    callback = ReceiveScatterOutput(output_callback, output, 0)
+    (steps, total) = _flat_crossproduct_scatter(
+        process, joborder, scatter_keys, callback, 0, runtimeContext
+    )
+    callback.setTotal(total, steps)
+    return parallel_steps(steps, callback, runtimeContext)
+
+
+def _flat_crossproduct_scatter(
+    process: WorkflowJobStep,
+    joborder: CWLObjectType,
+    scatter_keys: MutableSequence[str],
+    callback: ReceiveScatterOutput,
+    startindex: int,
+    runtimeContext: RuntimeContext,
+) -> Tuple[List[Optional[JobsGeneratorType]], int,]:
+    """Inner loop."""
+    scatter_key = scatter_keys[0]
+    jobl = len(cast(Sized, joborder[scatter_key]))
+    steps = []  # type: List[Optional[JobsGeneratorType]]
+    put = startindex
+    for index in range(0, jobl):
+        sjob = copy.copy(joborder)  # type: Optional[CWLObjectType]
+        assert sjob is not None  # nosec
+        sjob[scatter_key] = cast(
+            MutableMapping[int, CWLObjectType], joborder[scatter_key]
+        )[index]
+
+        if len(scatter_keys) == 1:
+            if runtimeContext.postScatterEval is not None:
+                sjob = runtimeContext.postScatterEval(sjob)
+            curriedcallback = functools.partial(callback.receive_scatter_output, put)
+            if sjob is not None:
+                steps.append(process.job(sjob, curriedcallback, runtimeContext))
+            else:
+                curriedcallback({}, "skipped")
+                steps.append(None)
+            put += 1
+        else:
+            (add, _) = _flat_crossproduct_scatter(
+                process, sjob, scatter_keys[1:], callback, put, runtimeContext
+            )
+            put += len(add)
+            steps.extend(add)
+
+    return (steps, put)
+
+
+def dotproduct_scatter(
+    process: WorkflowJobStep,
+    joborder: CWLObjectType,
+    scatter_keys: MutableSequence[str],
+    output_callback: ScatterOutputCallbackType,
+    runtimeContext: RuntimeContext,
+) -> JobsGeneratorType:
+    jobl = None  # type: Optional[int]
+    for key in scatter_keys:
+        if jobl is None:
+            jobl = len(cast(Sized, joborder[key]))
+        elif jobl != len(cast(Sized, joborder[key])):
+            raise WorkflowException(
+                "Length of input arrays must be equal when performing "
+                "dotproduct scatter."
+            )
+    if jobl is None:
+        raise Exception("Impossible codepath")
+
+    output = {}  # type: ScatterDestinationsType
+    for i in process.tool["outputs"]:
+        output[i["id"]] = [None] * jobl
+
+    rc = ReceiveScatterOutput(output_callback, output, jobl)
+
+    steps = []  # type: List[Optional[JobsGeneratorType]]
+    for index in range(0, jobl):
+        sjobo = copy.copy(joborder)  # type: Optional[CWLObjectType]
+        assert sjobo is not None  # nosec
+        for key in scatter_keys:
+            sjobo[key] = cast(MutableMapping[int, CWLObjectType], joborder[key])[index]
+
+        if runtimeContext.postScatterEval is not None:
+            sjobo = runtimeContext.postScatterEval(sjobo)
+        curriedcallback = functools.partial(rc.receive_scatter_output, index)
+        if sjobo is not None:
+            steps.append(process.job(sjobo, curriedcallback, runtimeContext))
+        else:
+            curriedcallback({}, "skipped")
+            steps.append(None)
+
+    rc.setTotal(jobl, steps)
+    return parallel_steps(steps, rc, runtimeContext)
+
+
+def match_types(
+    sinktype: Optional[SinkType],
+    src: WorkflowStateItem,
+    iid: str,
+    inputobj: CWLObjectType,
+    linkMerge: Optional[str],
+    valueFrom: Optional[str],
+) -> bool:
+    if isinstance(sinktype, MutableSequence):
+        # Sink is union type
+        for st in sinktype:
+            if match_types(st, src, iid, inputobj, linkMerge, valueFrom):
+                return True
+    elif isinstance(src.parameter["type"], MutableSequence):
+        # Source is union type
+        # Check that at least one source type is compatible with the sink.
+        original_types = src.parameter["type"]
+        for source_type in original_types:
+            src.parameter["type"] = source_type
+            match = match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom)
+            if match:
+                src.parameter["type"] = original_types
+                return True
+        src.parameter["type"] = original_types
+        return False
+    elif linkMerge:
+        if iid not in inputobj:
+            inputobj[iid] = []
+        sourceTypes = cast(List[Optional[CWLOutputType]], inputobj[iid])
+        if linkMerge == "merge_nested":
+            sourceTypes.append(src.value)
+        elif linkMerge == "merge_flattened":
+            if isinstance(src.value, MutableSequence):
+                sourceTypes.extend(src.value)
+            else:
+                sourceTypes.append(src.value)
+        else:
+            raise WorkflowException("Unrecognized linkMerge enum '%s'" % linkMerge)
+        return True
+    elif (
+        valueFrom is not None
+        or can_assign_src_to_sink(cast(SinkType, src.parameter["type"]), sinktype)
+        or sinktype == "Any"
+    ):
+        # simply assign the value from state to input
+        inputobj[iid] = copy.deepcopy(src.value)
+        return True
+    return False
+
+
+def object_from_state(
+    state: Dict[str, Optional[WorkflowStateItem]],
+    parms: ParametersType,
+    frag_only: bool,
+    supportsMultipleInput: bool,
+    sourceField: str,
+    incomplete: bool = False,
+) -> Optional[CWLObjectType]:
+    inputobj = {}  # type: CWLObjectType
+    for inp in parms:
+        iid = original_id = cast(str, inp["id"])
+        if frag_only:
+            iid = shortname(iid)
+        if sourceField in inp:
+            connections = aslist(inp[sourceField])
+            if len(connections) > 1 and not supportsMultipleInput:
+                raise WorkflowException(
+                    "Workflow contains multiple inbound links to a single "
+                    "parameter but MultipleInputFeatureRequirement is not "
+                    "declared."
+                )
+            for src in connections:
+                a_state = state.get(src, None)
+                if a_state is not None and (
+                    a_state.success in ("success", "skipped") or incomplete
+                ):
+                    if not match_types(
+                        inp["type"],
+                        a_state,
+                        iid,
+                        inputobj,
+                        cast(
+                            Optional[str],
+                            inp.get(
+                                "linkMerge",
+                                ("merge_nested" if len(connections) > 1 else None),
+                            ),
+                        ),
+                        valueFrom=cast(str, inp.get("valueFrom")),
+                    ):
+                        raise WorkflowException(
+                            "Type mismatch between source '%s' (%s) and "
+                            "sink '%s' (%s)"
+                            % (src, a_state.parameter["type"], original_id, inp["type"])
+                        )
+                elif src not in state:
+                    raise WorkflowException(
+                        "Connect source '%s' on parameter '%s' does not "
+                        "exist" % (src, original_id)
+                    )
+                elif not incomplete:
+                    return None
+
+        if "pickValue" in inp and isinstance(inputobj.get(iid), MutableSequence):
+            seq = cast(MutableSequence[Optional[CWLOutputType]], inputobj.get(iid))
+            if inp["pickValue"] == "first_non_null":
+                found = False
+                for v in seq:
+                    if v is not None:
+                        found = True
+                        inputobj[iid] = v
+                        break
+                if not found:
+                    raise WorkflowException(
+                        "All sources for '%s' are null" % (shortname(original_id))
+                    )
+            elif inp["pickValue"] == "the_only_non_null":
+                found = False
+                for v in seq:
+                    if v is not None:
+                        if found:
+                            raise WorkflowException(
+                                "Expected only one source for '%s' to be non-null, got %s"
+                                % (shortname(original_id), seq)
+                            )
+                        found = True
+                        inputobj[iid] = v
+                if not found:
+                    raise WorkflowException(
+                        "All sources for '%s' are null" % (shortname(original_id))
+                    )
+            elif inp["pickValue"] == "all_non_null":
+                inputobj[iid] = [v for v in seq if v is not None]
+
+        if inputobj.get(iid) is None and "default" in inp:
+            inputobj[iid] = inp["default"]
+
+        if iid not in inputobj and ("valueFrom" in inp or incomplete):
+            inputobj[iid] = None
+
+        if iid not in inputobj:
+            raise WorkflowException("Value for %s not specified" % original_id)
+    return inputobj
+
+
+class WorkflowJob:
+    """Generates steps from the Workflow."""
+
+    def __init__(self, workflow: "Workflow", runtimeContext: RuntimeContext) -> None:
+        """Initialize this WorkflowJob."""
+        self.workflow = workflow
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.parent_wf = None  # type: Optional[ProvenanceProfile]
+        self.tool = workflow.tool
+        if runtimeContext.research_obj is not None:
+            self.prov_obj = workflow.provenance_object
+            self.parent_wf = workflow.parent_wf
+        self.steps = [WorkflowJobStep(s) for s in workflow.steps]
+        self.state = {}  # type: Dict[str, Optional[WorkflowStateItem]]
+        self.processStatus = ""
+        self.did_callback = False
+        self.made_progress = None  # type: Optional[bool]
+        self.outdir = runtimeContext.get_outdir()
+
+        self.name = uniquename(
+            "workflow {}".format(
+                getdefault(
+                    runtimeContext.name,
+                    shortname(self.workflow.tool.get("id", "embedded")),
+                )
+            )
+        )
+
+        _logger.debug(
+            "[%s] initialized from %s",
+            self.name,
+            self.tool.get("id", "workflow embedded in %s" % runtimeContext.part_of),
+        )
+
+    def do_output_callback(self, final_output_callback: OutputCallbackType) -> None:
+
+        supportsMultipleInput = bool(
+            self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]
+        )
+
+        wo = None  # type: Optional[CWLObjectType]
+        try:
+            wo = object_from_state(
+                self.state,
+                self.tool["outputs"],
+                True,
+                supportsMultipleInput,
+                "outputSource",
+                incomplete=True,
+            )
+        except WorkflowException as err:
+            _logger.error(
+                "[%s] Cannot collect workflow output: %s", self.name, str(err)
+            )
+            self.processStatus = "permanentFail"
+        if (
+            self.prov_obj
+            and self.parent_wf
+            and self.prov_obj.workflow_run_uri != self.parent_wf.workflow_run_uri
+        ):
+            process_run_id = None  # type: Optional[str]
+            self.prov_obj.generate_output_prov(wo or {}, process_run_id, self.name)
+            self.prov_obj.document.wasEndedBy(
+                self.prov_obj.workflow_run_uri,
+                None,
+                self.prov_obj.engine_uuid,
+                datetime.datetime.now(),
+            )
+            prov_ids = self.prov_obj.finalize_prov_profile(self.name)
+            # Tell parent to associate our provenance files with our wf run
+            self.parent_wf.activity_has_provenance(
+                self.prov_obj.workflow_run_uri, prov_ids
+            )
+
+        _logger.info("[%s] completed %s", self.name, self.processStatus)
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug("[%s] outputs %s", self.name, json_dumps(wo, indent=4))
+
+        self.did_callback = True
+
+        final_output_callback(wo, self.processStatus)
+
+    def receive_output(
+        self,
+        step: WorkflowJobStep,
+        outputparms: List[CWLObjectType],
+        final_output_callback: OutputCallbackType,
+        jobout: CWLObjectType,
+        processStatus: str,
+    ) -> None:
+
+        for i in outputparms:
+            if "id" in i:
+                iid = cast(str, i["id"])
+                if iid in jobout:
+                    self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
+                else:
+                    _logger.error(
+                        "[%s] Output is missing expected field %s", step.name, iid
+                    )
+                    processStatus = "permanentFail"
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(
+                "[%s] produced output %s", step.name, json_dumps(jobout, indent=4)
+            )
+
+        if processStatus not in ("success", "skipped"):
+            if self.processStatus != "permanentFail":
+                self.processStatus = processStatus
+
+            _logger.warning("[%s] completed %s", step.name, processStatus)
+        else:
+            _logger.info("[%s] completed %s", step.name, processStatus)
+
+        step.completed = True
+        # Release the iterable related to this step to
+        # reclaim memory.
+        step.iterable = None
+        self.made_progress = True
+
+        completed = sum(1 for s in self.steps if s.completed)
+        if completed == len(self.steps):
+            self.do_output_callback(final_output_callback)
+
+    def try_make_job(
+        self,
+        step: WorkflowJobStep,
+        final_output_callback: Optional[OutputCallbackType],
+        runtimeContext: RuntimeContext,
+    ) -> JobsGeneratorType:
+
+        if step.submitted:
+            return
+
+        inputparms = step.tool["inputs"]
+        outputparms = step.tool["outputs"]
+
+        supportsMultipleInput = bool(
+            self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]
+        )
+
+        try:
+            inputobj = object_from_state(
+                self.state, inputparms, False, supportsMultipleInput, "source"
+            )
+            if inputobj is None:
+                _logger.debug("[%s] job step %s not ready", self.name, step.id)
+                return
+
+            if step.submitted:
+                return
+            _logger.info("[%s] starting %s", self.name, step.name)
+
+            callback = functools.partial(
+                self.receive_output, step, outputparms, final_output_callback
+            )
+
+            valueFrom = {
+                i["id"]: i["valueFrom"] for i in step.tool["inputs"] if "valueFrom" in i
+            }
+
+            loadContents = {
+                i["id"] for i in step.tool["inputs"] if i.get("loadContents")
+            }
+
+            if len(valueFrom) > 0 and not bool(
+                self.workflow.get_requirement("StepInputExpressionRequirement")[0]
+            ):
+                raise WorkflowException(
+                    "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements"
+                )
+
+            vfinputs = {shortname(k): v for k, v in inputobj.items()}
+
+            def postScatterEval(io: CWLObjectType) -> Optional[CWLObjectType]:
+                shortio = cast(CWLObjectType, {shortname(k): v for k, v in io.items()})
+
+                fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)("")
+                for k, v in io.items():
+                    if k in loadContents:
+                        val = cast(CWLObjectType, v)
+                        if val.get("contents") is None:
+                            with fs_access.open(cast(str, val["location"]), "rb") as f:
+                                val["contents"] = content_limit_respected_read(f)
+
+                def valueFromFunc(
+                    k: str, v: Optional[CWLOutputType]
+                ) -> Optional[CWLOutputType]:
+                    if k in valueFrom:
+                        adjustDirObjs(
+                            v, functools.partial(get_listing, fs_access, recursive=True)
+                        )
+                        return expression.do_eval(
+                            valueFrom[k],
+                            shortio,
+                            self.workflow.requirements,
+                            None,
+                            None,
+                            {},
+                            context=v,
+                            debug=runtimeContext.debug,
+                            js_console=runtimeContext.js_console,
+                            timeout=runtimeContext.eval_timeout,
+                        )
+                    return v
+
+                psio = {k: valueFromFunc(k, v) for k, v in io.items()}
+                if "when" in step.tool:
+                    evalinputs = {shortname(k): v for k, v in psio.items()}
+                    whenval = expression.do_eval(
+                        step.tool["when"],
+                        evalinputs,
+                        self.workflow.requirements,
+                        None,
+                        None,
+                        {},
+                        context=cast(Optional[CWLObjectType], v),
+                        debug=runtimeContext.debug,
+                        js_console=runtimeContext.js_console,
+                        timeout=runtimeContext.eval_timeout,
+                    )
+                    if whenval is True:
+                        pass
+                    elif whenval is False:
+                        _logger.debug(
+                            "[%s] conditional %s evaluated to %s",
+                            step.name,
+                            step.tool["when"],
+                            whenval,
+                        )
+                        _logger.debug(
+                            "[%s] inputs was %s",
+                            step.name,
+                            json_dumps(evalinputs, indent=2),
+                        )
+                        return None
+                    else:
+                        raise WorkflowException(
+                            "Conditional 'when' must evaluate to 'true' or 'false'"
+                        )
+                return psio
+
+            if "scatter" in step.tool:
+                scatter = cast(List[str], aslist(step.tool["scatter"]))
+                method = step.tool.get("scatterMethod")
+                if method is None and len(scatter) != 1:
+                    raise WorkflowException(
+                        "Must specify scatterMethod when scattering over multiple inputs"
+                    )
+                runtimeContext = runtimeContext.copy()
+                runtimeContext.postScatterEval = postScatterEval
+
+                emptyscatter = [
+                    shortname(s) for s in scatter if len(cast(Sized, inputobj[s])) == 0
+                ]
+                if emptyscatter:
+                    _logger.warning(
+                        "[job %s] Notice: scattering over empty input in "
+                        "'%s'.  All outputs will be empty.",
+                        step.name,
+                        "', '".join(emptyscatter),
+                    )
+
+                if method == "dotproduct" or method is None:
+                    jobs = dotproduct_scatter(
+                        step, inputobj, scatter, callback, runtimeContext
+                    )
+                elif method == "nested_crossproduct":
+                    jobs = nested_crossproduct_scatter(
+                        step, inputobj, scatter, callback, runtimeContext
+                    )
+                elif method == "flat_crossproduct":
+                    jobs = flat_crossproduct_scatter(
+                        step, inputobj, scatter, callback, runtimeContext
+                    )
+            else:
+                if _logger.isEnabledFor(logging.DEBUG):
+                    _logger.debug(
+                        "[%s] job input %s", step.name, json_dumps(inputobj, indent=4)
+                    )
+
+                inputobj = postScatterEval(inputobj)
+                if inputobj is not None:
+                    if _logger.isEnabledFor(logging.DEBUG):
+                        _logger.debug(
+                            "[%s] evaluated job input to %s",
+                            step.name,
+                            json_dumps(inputobj, indent=4),
+                        )
+                    jobs = step.job(inputobj, callback, runtimeContext)
+                else:
+                    _logger.info("[%s] will be skipped", step.name)
+                    callback({k["id"]: None for k in outputparms}, "skipped")
+                    step.completed = True
+                    jobs = (_ for _ in ())
+
+            step.submitted = True
+
+            yield from jobs
+        except WorkflowException:
+            raise
+        except Exception:
+            _logger.exception("Unhandled exception")
+            self.processStatus = "permanentFail"
+            step.completed = True
+
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+        """Log the start of each workflow."""
+        _logger.info("[%s] start", self.name)
+
+    def job(
+        self,
+        joborder: CWLObjectType,
+        output_callback: Optional[OutputCallbackType],
+        runtimeContext: RuntimeContext,
+    ) -> JobsGeneratorType:
+        self.state = {}
+        self.processStatus = "success"
+
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug("[%s] inputs %s", self.name, json_dumps(joborder, indent=4))
+
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.outdir = None
+
+        for index, inp in enumerate(self.tool["inputs"]):
+            with SourceLine(
+                self.tool["inputs"],
+                index,
+                WorkflowException,
+                _logger.isEnabledFor(logging.DEBUG),
+            ):
+                inp_id = shortname(inp["id"])
+                if inp_id in joborder:
+                    self.state[inp["id"]] = WorkflowStateItem(
+                        inp, joborder[inp_id], "success"
+                    )
+                elif "default" in inp:
+                    self.state[inp["id"]] = WorkflowStateItem(
+                        inp, inp["default"], "success"
+                    )
+                else:
+                    raise WorkflowException(
+                        "Input '%s' not in input object and does not have a "
+                        " default value." % (inp["id"])
+                    )
+
+        for step in self.steps:
+            for out in step.tool["outputs"]:
+                self.state[out["id"]] = None
+
+        completed = 0
+        while completed < len(self.steps):
+            self.made_progress = False
+
+            for step in self.steps:
+                if (
+                    getdefault(runtimeContext.on_error, "stop") == "stop"
+                    and self.processStatus != "success"
+                ):
+                    break
+
+                if not step.submitted:
+                    try:
+                        step.iterable = self.try_make_job(
+                            step, output_callback, runtimeContext
+                        )
+                    except WorkflowException as exc:
+                        _logger.error("[%s] Cannot make job: %s", step.name, str(exc))
+                        _logger.debug("", exc_info=True)
+                        self.processStatus = "permanentFail"
+
+                if step.iterable is not None:
+                    try:
+                        for newjob in step.iterable:
+                            if (
+                                getdefault(runtimeContext.on_error, "stop") == "stop"
+                                and self.processStatus != "success"
+                            ):
+                                break
+                            if newjob is not None:
+                                self.made_progress = True
+                                yield newjob
+                            else:
+                                break
+                    except WorkflowException as exc:
+                        _logger.error("[%s] Cannot make job: %s", step.name, str(exc))
+                        _logger.debug("", exc_info=True)
+                        self.processStatus = "permanentFail"
+
+            completed = sum(1 for s in self.steps if s.completed)
+
+            if not self.made_progress and completed < len(self.steps):
+                if self.processStatus != "success":
+                    break
+                else:
+                    yield None
+
+        if not self.did_callback and output_callback:
+            # could have called earlier on line 336;
+            self.do_output_callback(output_callback)
+            # depends which one comes first. All steps are completed
+            # or all outputs have been produced.