comparison env/lib/python3.9/site-packages/cwltool/workflow_job.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
1 import copy
2 import datetime
3 import functools
4 import logging
5 import threading
6 from typing import (
7 Dict,
8 List,
9 MutableMapping,
10 MutableSequence,
11 Optional,
12 Sized,
13 Tuple,
14 cast,
15 )
16
17 from schema_salad.sourceline import SourceLine
18 from schema_salad.utils import json_dumps
19 from typing_extensions import TYPE_CHECKING
20
21 from . import expression
22 from .builder import content_limit_respected_read
23 from .checker import can_assign_src_to_sink
24 from .context import RuntimeContext, getdefault
25 from .errors import WorkflowException
26 from .loghandler import _logger
27 from .process import shortname, uniquename
28 from .stdfsaccess import StdFsAccess
29 from .utils import (
30 CWLObjectType,
31 CWLOutputType,
32 JobsGeneratorType,
33 OutputCallbackType,
34 ParametersType,
35 ScatterDestinationsType,
36 ScatterOutputCallbackType,
37 SinkType,
38 WorkflowStateItem,
39 adjustDirObjs,
40 aslist,
41 get_listing,
42 )
43
44 if TYPE_CHECKING:
45 from .provenance_profile import ProvenanceProfile
46 from .workflow import Workflow, WorkflowStep
47
48
49 class WorkflowJobStep:
50 """Generated for each step in Workflow.steps()."""
51
52 def __init__(self, step: "WorkflowStep") -> None:
53 """Initialize this WorkflowJobStep."""
54 self.step = step
55 self.tool = step.tool
56 self.id = step.id
57 self.submitted = False
58 self.iterable = None # type: Optional[JobsGeneratorType]
59 self.completed = False
60 self.name = uniquename("step %s" % shortname(self.id))
61 self.prov_obj = step.prov_obj
62 self.parent_wf = step.parent_wf
63
64 def job(
65 self,
66 joborder: CWLObjectType,
67 output_callback: Optional[OutputCallbackType],
68 runtimeContext: RuntimeContext,
69 ) -> JobsGeneratorType:
70 runtimeContext = runtimeContext.copy()
71 runtimeContext.part_of = self.name
72 runtimeContext.name = shortname(self.id)
73
74 _logger.info("[%s] start", self.name)
75
76 yield from self.step.job(joborder, output_callback, runtimeContext)
77
78
79 class ReceiveScatterOutput:
80 """Produced by the scatter generators."""
81
82 def __init__(
83 self,
84 output_callback: ScatterOutputCallbackType,
85 dest: ScatterDestinationsType,
86 total: int,
87 ) -> None:
88 """Initialize."""
89 self.dest = dest
90 self.completed = 0
91 self.processStatus = "success"
92 self.total = total
93 self.output_callback = output_callback
94 self.steps = [] # type: List[Optional[JobsGeneratorType]]
95
96 def receive_scatter_output(
97 self, index: int, jobout: CWLObjectType, processStatus: str
98 ) -> None:
99 for key, val in jobout.items():
100 self.dest[key][index] = val
101
102 # Release the iterable related to this step to
103 # reclaim memory.
104 if self.steps:
105 self.steps[index] = None
106
107 if processStatus != "success":
108 if self.processStatus != "permanentFail":
109 self.processStatus = processStatus
110
111 self.completed += 1
112
113 if self.completed == self.total:
114 self.output_callback(self.dest, self.processStatus)
115
116 def setTotal(
117 self,
118 total: int,
119 steps: List[Optional[JobsGeneratorType]],
120 ) -> None:
121 """
122 Set the total number of expected outputs along with the steps.
123
124 This finishes the setup and may fire the output callback immediately.
125 """
126 self.total = total
127 self.steps = steps
128 if self.completed == self.total:
129 self.output_callback(self.dest, self.processStatus)
130
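# A sketch of the collector protocol (hypothetical values): a scatter
# generator builds the collector, registers one curried callback per
# scatter slot, then calls setTotal() once every slot exists.
#
#   output = {"out": [None, None]}                  # one slot per job
#   rc = ReceiveScatterOutput(callback, output, 0)
#   cb0 = functools.partial(rc.receive_scatter_output, 0)
#   cb1 = functools.partial(rc.receive_scatter_output, 1)
#   rc.setTotal(2, steps)  # fires callback once both slots report in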
131
132 def parallel_steps(
133 steps: List[Optional[JobsGeneratorType]],
134 rc: ReceiveScatterOutput,
135 runtimeContext: RuntimeContext,
136 ) -> JobsGeneratorType:
137 while rc.completed < rc.total:
138 made_progress = False
139 for index, step in enumerate(steps):
140 if getdefault(
141 runtimeContext.on_error, "stop"
142 ) == "stop" and rc.processStatus not in ("success", "skipped"):
143 break
144 if step is None:
145 continue
146 try:
147 for j in step:
148 if getdefault(
149 runtimeContext.on_error, "stop"
150 ) == "stop" and rc.processStatus not in ("success", "skipped"):
151 break
152 if j is not None:
153 made_progress = True
154 yield j
155 else:
156 break
157 if made_progress:
158 break
159 except WorkflowException as exc:
160 _logger.error("Cannot make scatter job: %s", str(exc))
161 _logger.debug("", exc_info=True)
162 rc.receive_scatter_output(index, {}, "permanentFail")
163 if not made_progress and rc.completed < rc.total:
164 yield None
165
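# parallel_steps round-robins over the per-slot generators: it yields
# runnable jobs as they appear, yields None when no generator made
# progress (so the caller can wait on in-flight jobs), and returns once
# the collector reports every slot complete. With on_error == "stop" it
# bails out as soon as the collector status leaves success/skipped.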
166
167 def nested_crossproduct_scatter(
168 process: WorkflowJobStep,
169 joborder: CWLObjectType,
170 scatter_keys: MutableSequence[str],
171 output_callback: ScatterOutputCallbackType,
172 runtimeContext: RuntimeContext,
173 ) -> JobsGeneratorType:
174 scatter_key = scatter_keys[0]
175 jobl = len(cast(Sized, joborder[scatter_key]))
176 output = {} # type: ScatterDestinationsType
177 for i in process.tool["outputs"]:
178 output[i["id"]] = [None] * jobl
179
180 rc = ReceiveScatterOutput(output_callback, output, jobl)
181
182 steps = [] # type: List[Optional[JobsGeneratorType]]
183 for index in range(0, jobl):
184 sjob = copy.copy(joborder) # type: Optional[CWLObjectType]
185 assert sjob is not None # nosec
186 sjob[scatter_key] = cast(
187 MutableMapping[int, CWLObjectType], joborder[scatter_key]
188 )[index]
189
190 if len(scatter_keys) == 1:
191 if runtimeContext.postScatterEval is not None:
192 sjob = runtimeContext.postScatterEval(sjob)
193 curriedcallback = functools.partial(rc.receive_scatter_output, index)
194 if sjob is not None:
195 steps.append(process.job(sjob, curriedcallback, runtimeContext))
196 else:
197 curriedcallback({}, "skipped")
198 steps.append(None)
199 else:
200 steps.append(
201 nested_crossproduct_scatter(
202 process,
203 sjob,
204 scatter_keys[1:],
205 functools.partial(rc.receive_scatter_output, index),
206 runtimeContext,
207 )
208 )
209
210 rc.setTotal(jobl, steps)
211 return parallel_steps(steps, rc, runtimeContext)
212
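# Nested crossproduct yields one level of output nesting per scatter
# key. A sketch with hypothetical inputs:
#
#   joborder = {"a": [1, 2], "b": ["x", "y", "z"]}
#   scatter_keys = ["a", "b"]
#   # output[id] is a 2-element list of 3-element lists:
#   # output[id][i][j] holds the result for the pair (a[i], b[j]).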
213
214 def crossproduct_size(
215 joborder: CWLObjectType, scatter_keys: MutableSequence[str]
216 ) -> int:
217 scatter_key = scatter_keys[0]
218 if len(scatter_keys) == 1:
219 ssum = len(cast(Sized, joborder[scatter_key]))
220 else:
221 ssum = 0
222 for _ in range(0, len(cast(Sized, joborder[scatter_key]))):
223 ssum += crossproduct_size(joborder, scatter_keys[1:])
224 return ssum
225
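# Worked example: with joborder = {"a": [1, 2], "b": ["x", "y", "z"]}
# and scatter_keys = ["a", "b"], crossproduct_size returns 2 * 3 == 6.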
226
227 def flat_crossproduct_scatter(
228 process: WorkflowJobStep,
229 joborder: CWLObjectType,
230 scatter_keys: MutableSequence[str],
231 output_callback: ScatterOutputCallbackType,
232 runtimeContext: RuntimeContext,
233 ) -> JobsGeneratorType:
234 output = {} # type: ScatterDestinationsType
235 for i in process.tool["outputs"]:
236 output[i["id"]] = [None] * crossproduct_size(joborder, scatter_keys)
237 callback = ReceiveScatterOutput(output_callback, output, 0)
238 (steps, total) = _flat_crossproduct_scatter(
239 process, joborder, scatter_keys, callback, 0, runtimeContext
240 )
241 callback.setTotal(total, steps)
242 return parallel_steps(steps, callback, runtimeContext)
243
244
245 def _flat_crossproduct_scatter(
246 process: WorkflowJobStep,
247 joborder: CWLObjectType,
248 scatter_keys: MutableSequence[str],
249 callback: ReceiveScatterOutput,
250 startindex: int,
251 runtimeContext: RuntimeContext,
252 ) -> Tuple[List[Optional[JobsGeneratorType]], int]:
253 """Inner loop of flat_crossproduct_scatter; returns (steps, next free index)."""
254 scatter_key = scatter_keys[0]
255 jobl = len(cast(Sized, joborder[scatter_key]))
256 steps = [] # type: List[Optional[JobsGeneratorType]]
257 put = startindex
258 for index in range(0, jobl):
259 sjob = copy.copy(joborder) # type: Optional[CWLObjectType]
260 assert sjob is not None # nosec
261 sjob[scatter_key] = cast(
262 MutableMapping[int, CWLObjectType], joborder[scatter_key]
263 )[index]
264
265 if len(scatter_keys) == 1:
266 if runtimeContext.postScatterEval is not None:
267 sjob = runtimeContext.postScatterEval(sjob)
268 curriedcallback = functools.partial(callback.receive_scatter_output, put)
269 if sjob is not None:
270 steps.append(process.job(sjob, curriedcallback, runtimeContext))
271 else:
272 curriedcallback({}, "skipped")
273 steps.append(None)
274 put += 1
275 else:
276 (add, _) = _flat_crossproduct_scatter(
277 process, sjob, scatter_keys[1:], callback, put, runtimeContext
278 )
279 put += len(add)
280 steps.extend(add)
281
282 return (steps, put)
283
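# Flat crossproduct covers the same job space row-major: with the
# hypothetical joborder above, the job for (a[i], b[j]) lands at output
# index i * len(b) + j, so output[id] is a single flat 6-element list.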
284
285 def dotproduct_scatter(
286 process: WorkflowJobStep,
287 joborder: CWLObjectType,
288 scatter_keys: MutableSequence[str],
289 output_callback: ScatterOutputCallbackType,
290 runtimeContext: RuntimeContext,
291 ) -> JobsGeneratorType:
292 jobl = None # type: Optional[int]
293 for key in scatter_keys:
294 if jobl is None:
295 jobl = len(cast(Sized, joborder[key]))
296 elif jobl != len(cast(Sized, joborder[key])):
297 raise WorkflowException(
298 "Length of input arrays must be equal when performing "
299 "dotproduct scatter."
300 )
301 if jobl is None:
302 raise Exception("Impossible codepath")
303
304 output = {} # type: ScatterDestinationsType
305 for i in process.tool["outputs"]:
306 output[i["id"]] = [None] * jobl
307
308 rc = ReceiveScatterOutput(output_callback, output, jobl)
309
310 steps = [] # type: List[Optional[JobsGeneratorType]]
311 for index in range(0, jobl):
312 sjobo = copy.copy(joborder) # type: Optional[CWLObjectType]
313 assert sjobo is not None # nosec
314 for key in scatter_keys:
315 sjobo[key] = cast(MutableMapping[int, CWLObjectType], joborder[key])[index]
316
317 if runtimeContext.postScatterEval is not None:
318 sjobo = runtimeContext.postScatterEval(sjobo)
319 curriedcallback = functools.partial(rc.receive_scatter_output, index)
320 if sjobo is not None:
321 steps.append(process.job(sjobo, curriedcallback, runtimeContext))
322 else:
323 curriedcallback({}, "skipped")
324 steps.append(None)
325
326 rc.setTotal(jobl, steps)
327 return parallel_steps(steps, rc, runtimeContext)
328
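# Dotproduct pairs the scatter arrays elementwise, so all keys must
# have equal lengths. Sketch with hypothetical inputs:
#
#   joborder = {"a": [1, 2], "b": ["x", "y"]}
#   scatter_keys = ["a", "b"]
#   # job 0 runs with {"a": 1, "b": "x"}, job 1 with {"a": 2, "b": "y"}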
329
330 def match_types(
331 sinktype: Optional[SinkType],
332 src: WorkflowStateItem,
333 iid: str,
334 inputobj: CWLObjectType,
335 linkMerge: Optional[str],
336 valueFrom: Optional[str],
337 ) -> bool:
338 if isinstance(sinktype, MutableSequence):
339 # Sink is union type
340 for st in sinktype:
341 if match_types(st, src, iid, inputobj, linkMerge, valueFrom):
342 return True
343 elif isinstance(src.parameter["type"], MutableSequence):
344 # Source is union type
345 # Check that at least one source type is compatible with the sink.
346 original_types = src.parameter["type"]
347 for source_type in original_types:
348 src.parameter["type"] = source_type
349 match = match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom)
350 if match:
351 src.parameter["type"] = original_types
352 return True
353 src.parameter["type"] = original_types
354 return False
355 elif linkMerge:
356 if iid not in inputobj:
357 inputobj[iid] = []
358 sourceTypes = cast(List[Optional[CWLOutputType]], inputobj[iid])
359 if linkMerge == "merge_nested":
360 sourceTypes.append(src.value)
361 elif linkMerge == "merge_flattened":
362 if isinstance(src.value, MutableSequence):
363 sourceTypes.extend(src.value)
364 else:
365 sourceTypes.append(src.value)
366 else:
367 raise WorkflowException("Unrecognized linkMerge enum '%s'" % linkMerge)
368 return True
369 elif (
370 valueFrom is not None
371 or can_assign_src_to_sink(cast(SinkType, src.parameter["type"]), sinktype)
372 or sinktype == "Any"
373 ):
374 # simply assign the value from state to input
375 inputobj[iid] = copy.deepcopy(src.value)
376 return True
377 return False
378
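# linkMerge, by example (hypothetical source values): merging sources
# with values 1 and [2, 3] into one sink gives
#   merge_nested     -> [1, [2, 3]]  (each value appended as-is)
#   merge_flattened  -> [1, 2, 3]    (sequence values are extended)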
379
380 def object_from_state(
381 state: Dict[str, Optional[WorkflowStateItem]],
382 parms: ParametersType,
383 frag_only: bool,
384 supportsMultipleInput: bool,
385 sourceField: str,
386 incomplete: bool = False,
387 ) -> Optional[CWLObjectType]:
388 inputobj = {} # type: CWLObjectType
389 for inp in parms:
390 iid = original_id = cast(str, inp["id"])
391 if frag_only:
392 iid = shortname(iid)
393 if sourceField in inp:
394 connections = aslist(inp[sourceField])
395 if len(connections) > 1 and not supportsMultipleInput:
396 raise WorkflowException(
397 "Workflow contains multiple inbound links to a single "
398 "parameter but MultipleInputFeatureRequirement is not "
399 "declared."
400 )
401 for src in connections:
402 a_state = state.get(src, None)
403 if a_state is not None and (
404 a_state.success in ("success", "skipped") or incomplete
405 ):
406 if not match_types(
407 inp["type"],
408 a_state,
409 iid,
410 inputobj,
411 cast(
412 Optional[str],
413 inp.get(
414 "linkMerge",
415 ("merge_nested" if len(connections) > 1 else None),
416 ),
417 ),
418 valueFrom=cast(str, inp.get("valueFrom")),
419 ):
420 raise WorkflowException(
421 "Type mismatch between source '%s' (%s) and "
422 "sink '%s' (%s)"
423 % (src, a_state.parameter["type"], original_id, inp["type"])
424 )
425 elif src not in state:
426 raise WorkflowException(
427 "Connect source '%s' on parameter '%s' does not "
428 "exist" % (src, original_id)
429 )
430 elif not incomplete:
431 return None
432
433 if "pickValue" in inp and isinstance(inputobj.get(iid), MutableSequence):
434 seq = cast(MutableSequence[Optional[CWLOutputType]], inputobj.get(iid))
435 if inp["pickValue"] == "first_non_null":
436 found = False
437 for v in seq:
438 if v is not None:
439 found = True
440 inputobj[iid] = v
441 break
442 if not found:
443 raise WorkflowException(
444 "All sources for '%s' are null" % (shortname(original_id))
445 )
446 elif inp["pickValue"] == "the_only_non_null":
447 found = False
448 for v in seq:
449 if v is not None:
450 if found:
451 raise WorkflowException(
452 "Expected only one source for '%s' to be non-null, got %s"
453 % (shortname(original_id), seq)
454 )
455 found = True
456 inputobj[iid] = v
457 if not found:
458 raise WorkflowException(
459 "All sources for '%s' are null" % (shortname(original_id))
460 )
461 elif inp["pickValue"] == "all_non_null":
462 inputobj[iid] = [v for v in seq if v is not None]
463
464 if inputobj.get(iid) is None and "default" in inp:
465 inputobj[iid] = inp["default"]
466
467 if iid not in inputobj and ("valueFrom" in inp or incomplete):
468 inputobj[iid] = None
469
470 if iid not in inputobj:
471 raise WorkflowException("Value for %s not specified" % original_id)
472 return inputobj
473
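# pickValue, by example: given collected sources [None, 42, None],
# first_non_null and the_only_non_null both pick 42 (the latter raises
# if a second non-null shows up), while all_non_null yields [42].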
474
475 class WorkflowJob:
476 """Generates steps from the Workflow."""
477
478 def __init__(self, workflow: "Workflow", runtimeContext: RuntimeContext) -> None:
479 """Initialize this WorkflowJob."""
480 self.workflow = workflow
481 self.prov_obj = None # type: Optional[ProvenanceProfile]
482 self.parent_wf = None # type: Optional[ProvenanceProfile]
483 self.tool = workflow.tool
484 if runtimeContext.research_obj is not None:
485 self.prov_obj = workflow.provenance_object
486 self.parent_wf = workflow.parent_wf
487 self.steps = [WorkflowJobStep(s) for s in workflow.steps]
488 self.state = {} # type: Dict[str, Optional[WorkflowStateItem]]
489 self.processStatus = ""
490 self.did_callback = False
491 self.made_progress = None # type: Optional[bool]
492 self.outdir = runtimeContext.get_outdir()
493
494 self.name = uniquename(
495 "workflow {}".format(
496 getdefault(
497 runtimeContext.name,
498 shortname(self.workflow.tool.get("id", "embedded")),
499 )
500 )
501 )
502
503 _logger.debug(
504 "[%s] initialized from %s",
505 self.name,
506 self.tool.get("id", "workflow embedded in %s" % runtimeContext.part_of),
507 )
508
509 def do_output_callback(self, final_output_callback: OutputCallbackType) -> None:
510
511 supportsMultipleInput = bool(
512 self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]
513 )
514
515 wo = None # type: Optional[CWLObjectType]
516 try:
517 wo = object_from_state(
518 self.state,
519 self.tool["outputs"],
520 True,
521 supportsMultipleInput,
522 "outputSource",
523 incomplete=True,
524 )
525 except WorkflowException as err:
526 _logger.error(
527 "[%s] Cannot collect workflow output: %s", self.name, str(err)
528 )
529 self.processStatus = "permanentFail"
530 if (
531 self.prov_obj
532 and self.parent_wf
533 and self.prov_obj.workflow_run_uri != self.parent_wf.workflow_run_uri
534 ):
535 process_run_id = None # type: Optional[str]
536 self.prov_obj.generate_output_prov(wo or {}, process_run_id, self.name)
537 self.prov_obj.document.wasEndedBy(
538 self.prov_obj.workflow_run_uri,
539 None,
540 self.prov_obj.engine_uuid,
541 datetime.datetime.now(),
542 )
543 prov_ids = self.prov_obj.finalize_prov_profile(self.name)
544 # Tell parent to associate our provenance files with our wf run
545 self.parent_wf.activity_has_provenance(
546 self.prov_obj.workflow_run_uri, prov_ids
547 )
548
549 _logger.info("[%s] completed %s", self.name, self.processStatus)
550 if _logger.isEnabledFor(logging.DEBUG):
551 _logger.debug("[%s] outputs %s", self.name, json_dumps(wo, indent=4))
552
553 self.did_callback = True
554
555 final_output_callback(wo, self.processStatus)
556
557 def receive_output(
558 self,
559 step: WorkflowJobStep,
560 outputparms: List[CWLObjectType],
561 final_output_callback: OutputCallbackType,
562 jobout: CWLObjectType,
563 processStatus: str,
564 ) -> None:
565
566 for i in outputparms:
567 if "id" in i:
568 iid = cast(str, i["id"])
569 if iid in jobout:
570 self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
571 else:
572 _logger.error(
573 "[%s] Output is missing expected field %s", step.name, iid
574 )
575 processStatus = "permanentFail"
576 if _logger.isEnabledFor(logging.DEBUG):
577 _logger.debug(
578 "[%s] produced output %s", step.name, json_dumps(jobout, indent=4)
579 )
580
581 if processStatus not in ("success", "skipped"):
582 if self.processStatus != "permanentFail":
583 self.processStatus = processStatus
584
585 _logger.warning("[%s] completed %s", step.name, processStatus)
586 else:
587 _logger.info("[%s] completed %s", step.name, processStatus)
588
589 step.completed = True
590 # Release the iterable related to this step to
591 # reclaim memory.
592 step.iterable = None
593 self.made_progress = True
594
595 completed = sum(1 for s in self.steps if s.completed)
596 if completed == len(self.steps):
597 self.do_output_callback(final_output_callback)
598
599 def try_make_job(
600 self,
601 step: WorkflowJobStep,
602 final_output_callback: Optional[OutputCallbackType],
603 runtimeContext: RuntimeContext,
604 ) -> JobsGeneratorType:
605
606 if step.submitted:
607 return
608
609 inputparms = step.tool["inputs"]
610 outputparms = step.tool["outputs"]
611
612 supportsMultipleInput = bool(
613 self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]
614 )
615
616 try:
617 inputobj = object_from_state(
618 self.state, inputparms, False, supportsMultipleInput, "source"
619 )
620 if inputobj is None:
621 _logger.debug("[%s] job step %s not ready", self.name, step.id)
622 return
623
624 if step.submitted:
625 return
626 _logger.info("[%s] starting %s", self.name, step.name)
627
628 callback = functools.partial(
629 self.receive_output, step, outputparms, final_output_callback
630 )
631
632 valueFrom = {
633 i["id"]: i["valueFrom"] for i in step.tool["inputs"] if "valueFrom" in i
634 }
635
636 loadContents = {
637 i["id"] for i in step.tool["inputs"] if i.get("loadContents")
638 }
639
640 if len(valueFrom) > 0 and not bool(
641 self.workflow.get_requirement("StepInputExpressionRequirement")[0]
642 ):
643 raise WorkflowException(
644 "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements"
645 )
646
647 vfinputs = {shortname(k): v for k, v in inputobj.items()}
648
649 def postScatterEval(io: CWLObjectType) -> Optional[CWLObjectType]:
650 shortio = cast(CWLObjectType, {shortname(k): v for k, v in io.items()})
651
652 fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)("")
653 for k, v in io.items():
654 if k in loadContents:
655 val = cast(CWLObjectType, v)
656 if val.get("contents") is None:
657 with fs_access.open(cast(str, val["location"]), "rb") as f:
658 val["contents"] = content_limit_respected_read(f)
659
660 def valueFromFunc(
661 k: str, v: Optional[CWLOutputType]
662 ) -> Optional[CWLOutputType]:
663 if k in valueFrom:
664 adjustDirObjs(
665 v, functools.partial(get_listing, fs_access, recursive=True)
666 )
667 return expression.do_eval(
668 valueFrom[k],
669 shortio,
670 self.workflow.requirements,
671 None,
672 None,
673 {},
674 context=v,
675 debug=runtimeContext.debug,
676 js_console=runtimeContext.js_console,
677 timeout=runtimeContext.eval_timeout,
678 )
679 return v
680
681 psio = {k: valueFromFunc(k, v) for k, v in io.items()}
682 if "when" in step.tool:
683 evalinputs = {shortname(k): v for k, v in psio.items()}
684 whenval = expression.do_eval(
685 step.tool["when"],
686 evalinputs,
687 self.workflow.requirements,
688 None,
689 None,
690 {},
691 context=cast(Optional[CWLObjectType], v),
692 debug=runtimeContext.debug,
693 js_console=runtimeContext.js_console,
694 timeout=runtimeContext.eval_timeout,
695 )
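# NOTE: `context` above reuses `v`, the binding left over from the
# io.items() loop in postScatterEval; the evaluated inputs themselves
# travel via `evalinputs`.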
696 if whenval is True:
697 pass
698 elif whenval is False:
699 _logger.debug(
700 "[%s] conditional %s evaluated to %s",
701 step.name,
702 step.tool["when"],
703 whenval,
704 )
705 _logger.debug(
706 "[%s] inputs was %s",
707 step.name,
708 json_dumps(evalinputs, indent=2),
709 )
710 return None
711 else:
712 raise WorkflowException(
713 "Conditional 'when' must evaluate to 'true' or 'false'"
714 )
715 return psio
716
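# postScatterEval returns None when a step's `when` guard evaluates to
# false; callers treat a None job order as "skip this job".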
717 if "scatter" in step.tool:
718 scatter = cast(List[str], aslist(step.tool["scatter"]))
719 method = step.tool.get("scatterMethod")
720 if method is None and len(scatter) != 1:
721 raise WorkflowException(
722 "Must specify scatterMethod when scattering over multiple inputs"
723 )
724 runtimeContext = runtimeContext.copy()
725 runtimeContext.postScatterEval = postScatterEval
726
727 emptyscatter = [
728 shortname(s) for s in scatter if len(cast(Sized, inputobj[s])) == 0
729 ]
730 if emptyscatter:
731 _logger.warning(
732 "[job %s] Notice: scattering over empty input in "
733 "'%s'. All outputs will be empty.",
734 step.name,
735 "', '".join(emptyscatter),
736 )
737
738 if method == "dotproduct" or method is None:
739 jobs = dotproduct_scatter(
740 step, inputobj, scatter, callback, runtimeContext
741 )
742 elif method == "nested_crossproduct":
743 jobs = nested_crossproduct_scatter(
744 step, inputobj, scatter, callback, runtimeContext
745 )
746 elif method == "flat_crossproduct":
747 jobs = flat_crossproduct_scatter(
748 step, inputobj, scatter, callback, runtimeContext
749 )
750 else:
751 if _logger.isEnabledFor(logging.DEBUG):
752 _logger.debug(
753 "[%s] job input %s", step.name, json_dumps(inputobj, indent=4)
754 )
755
756 inputobj = postScatterEval(inputobj)
757 if inputobj is not None:
758 if _logger.isEnabledFor(logging.DEBUG):
759 _logger.debug(
760 "[%s] evaluated job input to %s",
761 step.name,
762 json_dumps(inputobj, indent=4),
763 )
764 jobs = step.job(inputobj, callback, runtimeContext)
765 else:
766 _logger.info("[%s] will be skipped", step.name)
767 callback({k["id"]: None for k in outputparms}, "skipped")
768 step.completed = True
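# a skipped step contributes no jobs, so hand back an empty generator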
769 jobs = (_ for _ in ())
770
771 step.submitted = True
772
773 yield from jobs
774 except WorkflowException:
775 raise
776 except Exception:
777 _logger.exception("Unhandled exception")
778 self.processStatus = "permanentFail"
779 step.completed = True
780
781 def run(
782 self,
783 runtimeContext: RuntimeContext,
784 tmpdir_lock: Optional[threading.Lock] = None,
785 ) -> None:
786 """Log the start of each workflow."""
787 _logger.info("[%s] start", self.name)
788
789 def job(
790 self,
791 joborder: CWLObjectType,
792 output_callback: Optional[OutputCallbackType],
793 runtimeContext: RuntimeContext,
794 ) -> JobsGeneratorType:
795 self.state = {}
796 self.processStatus = "success"
797
798 if _logger.isEnabledFor(logging.DEBUG):
799 _logger.debug("[%s] inputs %s", self.name, json_dumps(joborder, indent=4))
800
801 runtimeContext = runtimeContext.copy()
802 runtimeContext.outdir = None
803
804 for index, inp in enumerate(self.tool["inputs"]):
805 with SourceLine(
806 self.tool["inputs"],
807 index,
808 WorkflowException,
809 _logger.isEnabledFor(logging.DEBUG),
810 ):
811 inp_id = shortname(inp["id"])
812 if inp_id in joborder:
813 self.state[inp["id"]] = WorkflowStateItem(
814 inp, joborder[inp_id], "success"
815 )
816 elif "default" in inp:
817 self.state[inp["id"]] = WorkflowStateItem(
818 inp, inp["default"], "success"
819 )
820 else:
821 raise WorkflowException(
822 "Input '%s' not in input object and does not have a "
823 " default value." % (inp["id"])
824 )
825
826 for step in self.steps:
827 for out in step.tool["outputs"]:
828 self.state[out["id"]] = None
829
830 completed = 0
831 while completed < len(self.steps):
832 self.made_progress = False
833
834 for step in self.steps:
835 if (
836 getdefault(runtimeContext.on_error, "stop") == "stop"
837 and self.processStatus != "success"
838 ):
839 break
840
841 if not step.submitted:
842 try:
843 step.iterable = self.try_make_job(
844 step, output_callback, runtimeContext
845 )
846 except WorkflowException as exc:
847 _logger.error("[%s] Cannot make job: %s", step.name, str(exc))
848 _logger.debug("", exc_info=True)
849 self.processStatus = "permanentFail"
850
851 if step.iterable is not None:
852 try:
853 for newjob in step.iterable:
854 if (
855 getdefault(runtimeContext.on_error, "stop") == "stop"
856 and self.processStatus != "success"
857 ):
858 break
859 if newjob is not None:
860 self.made_progress = True
861 yield newjob
862 else:
863 break
864 except WorkflowException as exc:
865 _logger.error("[%s] Cannot make job: %s", step.name, str(exc))
866 _logger.debug("", exc_info=True)
867 self.processStatus = "permanentFail"
868
869 completed = sum(1 for s in self.steps if s.completed)
870
871 if not self.made_progress and completed < len(self.steps):
872 if self.processStatus != "success":
873 break
874 else:
875 yield None
876
877 if not self.did_callback and output_callback:
878 # do_output_callback may already have fired from receive_output;
879 # whichever comes first wins: all steps completed, or all
880 # outputs produced.
881 self.do_output_callback(output_callback)
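# A minimal sketch of how an executor might drive this generator
# (hypothetical names; cwltool's real drivers live in
# cwltool.executors):
#
#   results = []
#
#   def collect(out, status):
#       results.append((out, status))
#
#   wf_job = WorkflowJob(workflow, runtime_context)
#   for runnable in wf_job.job(joborder, collect, runtime_context):
#       if runnable is not None:
#           runnable.run(runtime_context)  # execute one step job
#       else:
#           pass  # no progress yet; a real driver waits on running jobs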