Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/workflow_job.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import copy | |
2 import datetime | |
3 import functools | |
4 import logging | |
5 import threading | |
6 from typing import ( | |
7 Dict, | |
8 List, | |
9 MutableMapping, | |
10 MutableSequence, | |
11 Optional, | |
12 Sized, | |
13 Tuple, | |
14 cast, | |
15 ) | |
16 | |
17 from schema_salad.sourceline import SourceLine | |
18 from schema_salad.utils import json_dumps | |
19 from typing_extensions import TYPE_CHECKING | |
20 | |
21 from . import expression | |
22 from .builder import content_limit_respected_read | |
23 from .checker import can_assign_src_to_sink | |
24 from .context import RuntimeContext, getdefault | |
25 from .errors import WorkflowException | |
26 from .loghandler import _logger | |
27 from .process import shortname, uniquename | |
28 from .stdfsaccess import StdFsAccess | |
29 from .utils import ( | |
30 CWLObjectType, | |
31 CWLOutputType, | |
32 JobsGeneratorType, | |
33 OutputCallbackType, | |
34 ParametersType, | |
35 ScatterDestinationsType, | |
36 ScatterOutputCallbackType, | |
37 SinkType, | |
38 WorkflowStateItem, | |
39 adjustDirObjs, | |
40 aslist, | |
41 get_listing, | |
42 ) | |
43 | |
44 if TYPE_CHECKING: | |
45 from .provenance_profile import ProvenanceProfile | |
46 from .workflow import Workflow, WorkflowStep | |
47 | |
48 | |
class WorkflowJobStep:
    """Wraps one WorkflowStep for execution; created per step in Workflow.steps()."""

    def __init__(self, step: "WorkflowStep") -> None:
        """Initialize this WorkflowJobStep."""
        self.step = step
        self.tool = step.tool
        self.id = step.id
        self.prov_obj = step.prov_obj
        self.parent_wf = step.parent_wf
        self.name = uniquename("step %s" % shortname(self.id))
        # Scheduling state driven by WorkflowJob.job().
        self.submitted = False
        self.completed = False
        self.iterable = None  # type: Optional[JobsGeneratorType]

    def job(
        self,
        joborder: CWLObjectType,
        output_callback: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Yield the jobs produced by the wrapped step for this input object."""
        step_context = runtimeContext.copy()
        step_context.part_of = self.name
        step_context.name = shortname(self.id)

        _logger.info("[%s] start", self.name)

        yield from self.step.job(joborder, output_callback, step_context)
77 | |
78 | |
class ReceiveScatterOutput:
    """Collects results coming back from the generators of a scatter."""

    def __init__(
        self,
        output_callback: ScatterOutputCallbackType,
        dest: ScatterDestinationsType,
        total: int,
    ) -> None:
        """Initialize."""
        self.output_callback = output_callback
        self.dest = dest
        self.total = total
        self.completed = 0
        self.processStatus = "success"
        self.steps = []  # type: List[Optional[JobsGeneratorType]]

    def receive_scatter_output(
        self, index: int, jobout: CWLObjectType, processStatus: str
    ) -> None:
        """Store one scatter slot's outputs; fire the callback once all arrive."""
        for key, val in jobout.items():
            self.dest[key][index] = val

        # Release the iterable related to this step to
        # reclaim memory.
        if self.steps:
            self.steps[index] = None

        # permanentFail is sticky; any other non-success status overwrites.
        if processStatus != "success" and self.processStatus != "permanentFail":
            self.processStatus = processStatus

        self.completed += 1

        if self.completed == self.total:
            self.output_callback(self.dest, self.processStatus)

    def setTotal(
        self,
        total: int,
        steps: List[Optional[JobsGeneratorType]],
    ) -> None:
        """
        Finish setup: record the expected output count and the job generators.

        If every slot has already reported back, the final callback fires
        immediately.
        """
        self.total = total
        self.steps = steps
        if self.completed == self.total:
            self.output_callback(self.dest, self.processStatus)
130 | |
131 | |
def parallel_steps(
    steps: List[Optional[JobsGeneratorType]],
    rc: ReceiveScatterOutput,
    runtimeContext: RuntimeContext,
) -> JobsGeneratorType:
    """Drain jobs from the scatter sub-generators until every slot reports back.

    Completion is tracked by ``rc``; the scatter's final result is delivered
    through ``rc``'s output callback, not by this generator.  Yields ``None``
    when no generator produced a job so the caller can idle/poll.
    """
    while rc.completed < rc.total:
        made_progress = False
        for index, step in enumerate(steps):
            # on_error == "stop": abandon the scan once any slot has failed.
            if getdefault(
                runtimeContext.on_error, "stop"
            ) == "stop" and rc.processStatus not in ("success", "skipped"):
                break
            if step is None:
                # Slot was skipped, or already drained and freed by
                # ReceiveScatterOutput.receive_scatter_output().
                continue
            try:
                for j in step:
                    if getdefault(
                        runtimeContext.on_error, "stop"
                    ) == "stop" and rc.processStatus not in ("success", "skipped"):
                        break
                    if j is not None:
                        made_progress = True
                        yield j
                    else:
                        # Generator is waiting on something; try the next one.
                        break
                if made_progress:
                    # Restart the outer while so earlier slots get first turn.
                    break
            except WorkflowException as exc:
                _logger.error("Cannot make scatter job: %s", str(exc))
                _logger.debug("", exc_info=True)
                rc.receive_scatter_output(index, {}, "permanentFail")
        if not made_progress and rc.completed < rc.total:
            # Nothing runnable this pass; signal the caller to pause.
            yield None
165 | |
166 | |
def nested_crossproduct_scatter(
    process: WorkflowJobStep,
    joborder: CWLObjectType,
    scatter_keys: MutableSequence[str],
    output_callback: ScatterOutputCallbackType,
    runtimeContext: RuntimeContext,
) -> JobsGeneratorType:
    """Scatter ``process`` over ``scatter_keys``, nesting one output level per key.

    The first key is expanded here; remaining keys are handled by recursing,
    so each output becomes a nested list with one dimension per scatter key.
    """
    scatter_key = scatter_keys[0]
    jobl = len(cast(Sized, joborder[scatter_key]))
    output = {}  # type: ScatterDestinationsType
    for i in process.tool["outputs"]:
        output[i["id"]] = [None] * jobl

    rc = ReceiveScatterOutput(output_callback, output, jobl)

    steps = []  # type: List[Optional[JobsGeneratorType]]
    for index in range(0, jobl):
        # Shallow copy: only the scattered key is replaced per slot.
        sjob = copy.copy(joborder)  # type: Optional[CWLObjectType]
        assert sjob is not None  # nosec
        sjob[scatter_key] = cast(
            MutableMapping[int, CWLObjectType], joborder[scatter_key]
        )[index]

        if len(scatter_keys) == 1:
            if runtimeContext.postScatterEval is not None:
                # May return None when the step's `when` guard rejects this
                # slot (see WorkflowJob.try_make_job).
                sjob = runtimeContext.postScatterEval(sjob)
            curriedcallback = functools.partial(rc.receive_scatter_output, index)
            if sjob is not None:
                steps.append(process.job(sjob, curriedcallback, runtimeContext))
            else:
                # Report the skipped slot so the scatter can still complete.
                curriedcallback({}, "skipped")
                steps.append(None)
        else:
            # Recurse over the remaining keys; the curried callback stores
            # the nested result at this slot's index.
            steps.append(
                nested_crossproduct_scatter(
                    process,
                    sjob,
                    scatter_keys[1:],
                    functools.partial(rc.receive_scatter_output, index),
                    runtimeContext,
                )
            )

    rc.setTotal(jobl, steps)
    return parallel_steps(steps, rc, runtimeContext)
212 | |
213 | |
def crossproduct_size(
    joborder: "CWLObjectType", scatter_keys: MutableSequence[str]
) -> int:
    """Compute how many jobs a crossproduct scatter over ``scatter_keys`` makes.

    This is simply the product of the lengths of the scattered input arrays.

    :param joborder: input object; each key in ``scatter_keys`` must map to a
        sized collection.
    :param scatter_keys: the ids of the inputs being scattered over.
    :return: product of the array lengths (1 for an empty key list).
    """
    # The previous recursive formulation summed len(joborder[key]) identical
    # sub-results per level, doing O(product) work; one multiplicative pass
    # over the keys yields the same value in O(len(scatter_keys)).
    size = 1
    for scatter_key in scatter_keys:
        size *= len(cast(Sized, joborder[scatter_key]))
    return size
225 | |
226 | |
def flat_crossproduct_scatter(
    process: WorkflowJobStep,
    joborder: CWLObjectType,
    scatter_keys: MutableSequence[str],
    output_callback: ScatterOutputCallbackType,
    runtimeContext: RuntimeContext,
) -> JobsGeneratorType:
    """Scatter ``process`` over ``scatter_keys``, flattening outputs to one list.

    Unlike nested_crossproduct_scatter, every combination of scattered values
    lands in a single flat output array per output parameter.
    """
    # crossproduct_size() is loop-invariant; the previous version recomputed
    # it once per output parameter.
    jobl = crossproduct_size(joborder, scatter_keys)
    output = {}  # type: ScatterDestinationsType
    for i in process.tool["outputs"]:
        output[i["id"]] = [None] * jobl
    # total starts at 0 and is filled in by setTotal() once all sub-jobs
    # have been created by the inner loop below.
    callback = ReceiveScatterOutput(output_callback, output, 0)
    (steps, total) = _flat_crossproduct_scatter(
        process, joborder, scatter_keys, callback, 0, runtimeContext
    )
    callback.setTotal(total, steps)
    return parallel_steps(steps, callback, runtimeContext)
243 | |
244 | |
def _flat_crossproduct_scatter(
    process: WorkflowJobStep,
    joborder: CWLObjectType,
    scatter_keys: MutableSequence[str],
    callback: ReceiveScatterOutput,
    startindex: int,
    runtimeContext: RuntimeContext,
) -> Tuple[List[Optional[JobsGeneratorType]], int,]:
    """Inner loop of flat_crossproduct_scatter.

    Expands the first scatter key here and recurses for the remaining keys,
    assigning each leaf job a position in the flat output list starting at
    ``startindex``.

    :return: the created job generators and the next free output index.
    """
    scatter_key = scatter_keys[0]
    jobl = len(cast(Sized, joborder[scatter_key]))
    steps = []  # type: List[Optional[JobsGeneratorType]]
    put = startindex
    for index in range(0, jobl):
        # Shallow copy: only the scattered key is replaced per iteration.
        sjob = copy.copy(joborder)  # type: Optional[CWLObjectType]
        assert sjob is not None  # nosec
        sjob[scatter_key] = cast(
            MutableMapping[int, CWLObjectType], joborder[scatter_key]
        )[index]

        if len(scatter_keys) == 1:
            # Leaf level: create the actual job (unless postScatterEval's
            # `when` guard skipped this combination).
            if runtimeContext.postScatterEval is not None:
                sjob = runtimeContext.postScatterEval(sjob)
            curriedcallback = functools.partial(callback.receive_scatter_output, put)
            if sjob is not None:
                steps.append(process.job(sjob, curriedcallback, runtimeContext))
            else:
                curriedcallback({}, "skipped")
                steps.append(None)
            put += 1
        else:
            # Recurse for the remaining keys; `put` advances by however many
            # leaf slots the recursion produced.
            (add, _) = _flat_crossproduct_scatter(
                process, sjob, scatter_keys[1:], callback, put, runtimeContext
            )
            put += len(add)
            steps.extend(add)

    return (steps, put)
283 | |
284 | |
def dotproduct_scatter(
    process: WorkflowJobStep,
    joborder: CWLObjectType,
    scatter_keys: MutableSequence[str],
    output_callback: ScatterOutputCallbackType,
    runtimeContext: RuntimeContext,
) -> JobsGeneratorType:
    """Scatter ``process`` in lockstep over the arrays named by ``scatter_keys``.

    All scattered arrays must share one length; slot ``i`` of every output
    holds the result of running the step on element ``i`` of each input.
    """
    jobl = None  # type: Optional[int]
    for key in scatter_keys:
        key_len = len(cast(Sized, joborder[key]))
        if jobl is None:
            jobl = key_len
        elif jobl != key_len:
            raise WorkflowException(
                "Length of input arrays must be equal when performing "
                "dotproduct scatter."
            )
    if jobl is None:
        raise Exception("Impossible codepath")

    output = {}  # type: ScatterDestinationsType
    for outparm in process.tool["outputs"]:
        output[outparm["id"]] = [None] * jobl

    rc = ReceiveScatterOutput(output_callback, output, jobl)

    steps = []  # type: List[Optional[JobsGeneratorType]]
    for index in range(jobl):
        # Shallow copy, then overwrite each scattered key with its slice.
        sliced = copy.copy(joborder)  # type: Optional[CWLObjectType]
        assert sliced is not None  # nosec
        for key in scatter_keys:
            sliced[key] = cast(MutableMapping[int, CWLObjectType], joborder[key])[index]

        if runtimeContext.postScatterEval is not None:
            sliced = runtimeContext.postScatterEval(sliced)
        report = functools.partial(rc.receive_scatter_output, index)
        if sliced is None:
            # `when` guard rejected this slot; record it as skipped.
            report({}, "skipped")
            steps.append(None)
        else:
            steps.append(process.job(sliced, report, runtimeContext))

    rc.setTotal(jobl, steps)
    return parallel_steps(steps, rc, runtimeContext)
328 | |
329 | |
def match_types(
    sinktype: Optional[SinkType],
    src: WorkflowStateItem,
    iid: str,
    inputobj: CWLObjectType,
    linkMerge: Optional[str],
    valueFrom: Optional[str],
) -> bool:
    """Try to connect the source value ``src`` to the sink input ``iid``.

    On a successful match the (possibly merged) value is written into
    ``inputobj[iid]`` and True is returned; False means the source and sink
    types could not be reconciled.
    """
    if isinstance(sinktype, MutableSequence):
        # Sink is union type: succeed if any alternative accepts the source.
        for st in sinktype:
            if match_types(st, src, iid, inputobj, linkMerge, valueFrom):
                return True
        # Falls through to the final `return False` when nothing matched.
    elif isinstance(src.parameter["type"], MutableSequence):
        # Source is union type
        # Check that at least one source type is compatible with the sink.
        # The source's type is narrowed in place for each recursive probe and
        # always restored before returning.
        original_types = src.parameter["type"]
        for source_type in original_types:
            src.parameter["type"] = source_type
            match = match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom)
            if match:
                src.parameter["type"] = original_types
                return True
        src.parameter["type"] = original_types
        return False
    elif linkMerge:
        # Multiple inbound links: accumulate values into a list under iid.
        if iid not in inputobj:
            inputobj[iid] = []
        sourceTypes = cast(List[Optional[CWLOutputType]], inputobj[iid])
        if linkMerge == "merge_nested":
            sourceTypes.append(src.value)
        elif linkMerge == "merge_flattened":
            if isinstance(src.value, MutableSequence):
                sourceTypes.extend(src.value)
            else:
                sourceTypes.append(src.value)
        else:
            raise WorkflowException("Unrecognized linkMerge enum '%s'" % linkMerge)
        return True
    elif (
        valueFrom is not None
        or can_assign_src_to_sink(cast(SinkType, src.parameter["type"]), sinktype)
        or sinktype == "Any"
    ):
        # simply assign the value from state to input
        inputobj[iid] = copy.deepcopy(src.value)
        return True
    return False
378 | |
379 | |
def object_from_state(
    state: Dict[str, Optional[WorkflowStateItem]],
    parms: ParametersType,
    frag_only: bool,
    supportsMultipleInput: bool,
    sourceField: str,
    incomplete: bool = False,
) -> Optional[CWLObjectType]:
    """Assemble an input object for ``parms`` from the current workflow state.

    :param state: value/status of every workflow parameter, keyed by id.
    :param parms: the parameters (step inputs or workflow outputs) to fill.
    :param frag_only: if True, key the result by the short id fragment.
    :param supportsMultipleInput: whether MultipleInputFeatureRequirement was
        declared (needed when a parameter has several inbound links).
    :param sourceField: "source" for step inputs, "outputSource" for
        workflow outputs.
    :param incomplete: if True, tolerate unfinished/missing upstream values
        instead of bailing out.
    :return: the assembled object, or None when some upstream value is not
        ready yet (and ``incomplete`` is False).
    :raises WorkflowException: on undeclared multiple links, type mismatch,
        a nonexistent source, pickValue failure, or a missing value.
    """
    inputobj = {}  # type: CWLObjectType
    for inp in parms:
        iid = original_id = cast(str, inp["id"])
        if frag_only:
            iid = shortname(iid)
        if sourceField in inp:
            connections = aslist(inp[sourceField])
            if len(connections) > 1 and not supportsMultipleInput:
                raise WorkflowException(
                    "Workflow contains multiple inbound links to a single "
                    "parameter but MultipleInputFeatureRequirement is not "
                    "declared."
                )
            for src in connections:
                a_state = state.get(src, None)
                if a_state is not None and (
                    a_state.success in ("success", "skipped") or incomplete
                ):
                    # Multiple connections default to merge_nested when no
                    # explicit linkMerge was given.
                    if not match_types(
                        inp["type"],
                        a_state,
                        iid,
                        inputobj,
                        cast(
                            Optional[str],
                            inp.get(
                                "linkMerge",
                                ("merge_nested" if len(connections) > 1 else None),
                            ),
                        ),
                        valueFrom=cast(str, inp.get("valueFrom")),
                    ):
                        raise WorkflowException(
                            "Type mismatch between source '%s' (%s) and "
                            "sink '%s' (%s)"
                            % (src, a_state.parameter["type"], original_id, inp["type"])
                        )
                elif src not in state:
                    raise WorkflowException(
                        "Connect source '%s' on parameter '%s' does not "
                        "exist" % (src, original_id)
                    )
                elif not incomplete:
                    # Upstream value not ready; caller should retry later.
                    return None

        # Apply CWL pickValue semantics when the gathered value is a list.
        if "pickValue" in inp and isinstance(inputobj.get(iid), MutableSequence):
            seq = cast(MutableSequence[Optional[CWLOutputType]], inputobj.get(iid))
            if inp["pickValue"] == "first_non_null":
                found = False
                for v in seq:
                    if v is not None:
                        found = True
                        inputobj[iid] = v
                        break
                if not found:
                    raise WorkflowException(
                        "All sources for '%s' are null" % (shortname(original_id))
                    )
            elif inp["pickValue"] == "the_only_non_null":
                found = False
                for v in seq:
                    if v is not None:
                        if found:
                            raise WorkflowException(
                                "Expected only one source for '%s' to be non-null, got %s"
                                % (shortname(original_id), seq)
                            )
                        found = True
                        inputobj[iid] = v
                if not found:
                    raise WorkflowException(
                        "All sources for '%s' are null" % (shortname(original_id))
                    )
            elif inp["pickValue"] == "all_non_null":
                inputobj[iid] = [v for v in seq if v is not None]

        if inputobj.get(iid) is None and "default" in inp:
            inputobj[iid] = inp["default"]

        # Parameters with valueFrom (or incomplete collection) may legally
        # remain null here; the expression is evaluated later.
        if iid not in inputobj and ("valueFrom" in inp or incomplete):
            inputobj[iid] = None

        if iid not in inputobj:
            raise WorkflowException("Value for %s not specified" % original_id)
    return inputobj
473 | |
474 | |
class WorkflowJob:
    """Generates steps from the Workflow."""

    def __init__(self, workflow: "Workflow", runtimeContext: RuntimeContext) -> None:
        """Initialize this WorkflowJob."""
        self.workflow = workflow
        # Provenance objects are only attached when a research object is
        # being generated for this run.
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.tool = workflow.tool
        if runtimeContext.research_obj is not None:
            self.prov_obj = workflow.provenance_object
            self.parent_wf = workflow.parent_wf
        self.steps = [WorkflowJobStep(s) for s in workflow.steps]
        # Maps parameter id -> WorkflowStateItem, or None while pending.
        self.state = {}  # type: Dict[str, Optional[WorkflowStateItem]]
        self.processStatus = ""
        self.did_callback = False
        self.made_progress = None  # type: Optional[bool]
        self.outdir = runtimeContext.get_outdir()

        self.name = uniquename(
            "workflow {}".format(
                getdefault(
                    runtimeContext.name,
                    shortname(self.workflow.tool.get("id", "embedded")),
                )
            )
        )

        _logger.debug(
            "[%s] initialized from %s",
            self.name,
            self.tool.get("id", "workflow embedded in %s" % runtimeContext.part_of),
        )

    def do_output_callback(self, final_output_callback: OutputCallbackType) -> None:
        """Collect the workflow-level outputs and report them upstream."""

        supportsMultipleInput = bool(
            self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]
        )

        wo = None  # type: Optional[CWLObjectType]
        try:
            # incomplete=True: deliver whatever outputs exist even when some
            # steps failed or were skipped.
            wo = object_from_state(
                self.state,
                self.tool["outputs"],
                True,
                supportsMultipleInput,
                "outputSource",
                incomplete=True,
            )
        except WorkflowException as err:
            _logger.error(
                "[%s] Cannot collect workflow output: %s", self.name, str(err)
            )
            self.processStatus = "permanentFail"
        # When this job carries a provenance profile distinct from its
        # parent's, close out the profile and hand it to the parent.
        if (
            self.prov_obj
            and self.parent_wf
            and self.prov_obj.workflow_run_uri != self.parent_wf.workflow_run_uri
        ):
            process_run_id = None  # type: Optional[str]
            self.prov_obj.generate_output_prov(wo or {}, process_run_id, self.name)
            self.prov_obj.document.wasEndedBy(
                self.prov_obj.workflow_run_uri,
                None,
                self.prov_obj.engine_uuid,
                datetime.datetime.now(),
            )
            prov_ids = self.prov_obj.finalize_prov_profile(self.name)
            # Tell parent to associate our provenance files with our wf run
            self.parent_wf.activity_has_provenance(
                self.prov_obj.workflow_run_uri, prov_ids
            )

        _logger.info("[%s] completed %s", self.name, self.processStatus)
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug("[%s] outputs %s", self.name, json_dumps(wo, indent=4))

        self.did_callback = True

        final_output_callback(wo, self.processStatus)

    def receive_output(
        self,
        step: WorkflowJobStep,
        outputparms: List[CWLObjectType],
        final_output_callback: OutputCallbackType,
        jobout: CWLObjectType,
        processStatus: str,
    ) -> None:
        """Record a finished step's outputs into the workflow state.

        Bound with functools.partial in try_make_job; the job runner supplies
        the final two arguments.
        """

        for i in outputparms:
            if "id" in i:
                iid = cast(str, i["id"])
                if iid in jobout:
                    self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
                else:
                    _logger.error(
                        "[%s] Output is missing expected field %s", step.name, iid
                    )
                    processStatus = "permanentFail"
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "[%s] produced output %s", step.name, json_dumps(jobout, indent=4)
            )

        if processStatus not in ("success", "skipped"):
            # permanentFail is sticky and is never downgraded.
            if self.processStatus != "permanentFail":
                self.processStatus = processStatus

            _logger.warning("[%s] completed %s", step.name, processStatus)
        else:
            _logger.info("[%s] completed %s", step.name, processStatus)

        step.completed = True
        # Release the iterable related to this step to
        # reclaim memory.
        step.iterable = None
        self.made_progress = True

        completed = sum(1 for s in self.steps if s.completed)
        if completed == len(self.steps):
            # Last step just finished: deliver the workflow outputs now.
            self.do_output_callback(final_output_callback)

    def try_make_job(
        self,
        step: WorkflowJobStep,
        final_output_callback: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Attempt to create (and yield jobs from) a runnable job for ``step``."""

        if step.submitted:
            return

        inputparms = step.tool["inputs"]
        outputparms = step.tool["outputs"]

        supportsMultipleInput = bool(
            self.workflow.get_requirement("MultipleInputFeatureRequirement")[0]
        )

        try:
            inputobj = object_from_state(
                self.state, inputparms, False, supportsMultipleInput, "source"
            )
            if inputobj is None:
                # Some upstream value is not ready yet; retry on a later pass.
                _logger.debug("[%s] job step %s not ready", self.name, step.id)
                return

            if step.submitted:
                return
            _logger.info("[%s] starting %s", self.name, step.name)

            callback = functools.partial(
                self.receive_output, step, outputparms, final_output_callback
            )

            # Per-input valueFrom expressions and loadContents flags.
            valueFrom = {
                i["id"]: i["valueFrom"] for i in step.tool["inputs"] if "valueFrom" in i
            }

            loadContents = {
                i["id"] for i in step.tool["inputs"] if i.get("loadContents")
            }

            if len(valueFrom) > 0 and not bool(
                self.workflow.get_requirement("StepInputExpressionRequirement")[0]
            ):
                raise WorkflowException(
                    "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements"
                )

            # NOTE(review): vfinputs appears unused below; retained as-is.
            vfinputs = {shortname(k): v for k, v in inputobj.items()}

            def postScatterEval(io: CWLObjectType) -> Optional[CWLObjectType]:
                # Evaluate valueFrom expressions and the `when` conditional
                # for one (possibly scatter-sliced) input object.  Returns
                # None when the `when` guard evaluates false.
                shortio = cast(CWLObjectType, {shortname(k): v for k, v in io.items()})

                fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)("")
                for k, v in io.items():
                    if k in loadContents:
                        val = cast(CWLObjectType, v)
                        if val.get("contents") is None:
                            with fs_access.open(cast(str, val["location"]), "rb") as f:
                                val["contents"] = content_limit_respected_read(f)

                def valueFromFunc(
                    k: str, v: Optional[CWLOutputType]
                ) -> Optional[CWLOutputType]:
                    if k in valueFrom:
                        adjustDirObjs(
                            v, functools.partial(get_listing, fs_access, recursive=True)
                        )
                        return expression.do_eval(
                            valueFrom[k],
                            shortio,
                            self.workflow.requirements,
                            None,
                            None,
                            {},
                            context=v,
                            debug=runtimeContext.debug,
                            js_console=runtimeContext.js_console,
                            timeout=runtimeContext.eval_timeout,
                        )
                    return v

                psio = {k: valueFromFunc(k, v) for k, v in io.items()}
                if "when" in step.tool:
                    evalinputs = {shortname(k): v for k, v in psio.items()}
                    # NOTE(review): `v` below leaks from the loadContents loop
                    # above rather than being a per-item value — looks
                    # suspicious; confirm the intended `context` argument.
                    whenval = expression.do_eval(
                        step.tool["when"],
                        evalinputs,
                        self.workflow.requirements,
                        None,
                        None,
                        {},
                        context=cast(Optional[CWLObjectType], v),
                        debug=runtimeContext.debug,
                        js_console=runtimeContext.js_console,
                        timeout=runtimeContext.eval_timeout,
                    )
                    if whenval is True:
                        pass
                    elif whenval is False:
                        _logger.debug(
                            "[%s] conditional %s evaluated to %s",
                            step.name,
                            step.tool["when"],
                            whenval,
                        )
                        _logger.debug(
                            "[%s] inputs was %s",
                            step.name,
                            json_dumps(evalinputs, indent=2),
                        )
                        return None
                    else:
                        raise WorkflowException(
                            "Conditional 'when' must evaluate to 'true' or 'false'"
                        )
                return psio

            if "scatter" in step.tool:
                scatter = cast(List[str], aslist(step.tool["scatter"]))
                method = step.tool.get("scatterMethod")
                if method is None and len(scatter) != 1:
                    raise WorkflowException(
                        "Must specify scatterMethod when scattering over multiple inputs"
                    )
                # The scatter helpers call postScatterEval per slice, so it is
                # passed along via the context rather than applied here.
                runtimeContext = runtimeContext.copy()
                runtimeContext.postScatterEval = postScatterEval

                emptyscatter = [
                    shortname(s) for s in scatter if len(cast(Sized, inputobj[s])) == 0
                ]
                if emptyscatter:
                    _logger.warning(
                        "[job %s] Notice: scattering over empty input in "
                        "'%s'. All outputs will be empty.",
                        step.name,
                        "', '".join(emptyscatter),
                    )

                if method == "dotproduct" or method is None:
                    jobs = dotproduct_scatter(
                        step, inputobj, scatter, callback, runtimeContext
                    )
                elif method == "nested_crossproduct":
                    jobs = nested_crossproduct_scatter(
                        step, inputobj, scatter, callback, runtimeContext
                    )
                elif method == "flat_crossproduct":
                    jobs = flat_crossproduct_scatter(
                        step, inputobj, scatter, callback, runtimeContext
                    )
            else:
                if _logger.isEnabledFor(logging.DEBUG):
                    _logger.debug(
                        "[%s] job input %s", step.name, json_dumps(inputobj, indent=4)
                    )

                inputobj = postScatterEval(inputobj)
                if inputobj is not None:
                    if _logger.isEnabledFor(logging.DEBUG):
                        _logger.debug(
                            "[%s] evaluated job input to %s",
                            step.name,
                            json_dumps(inputobj, indent=4),
                        )
                    jobs = step.job(inputobj, callback, runtimeContext)
                else:
                    # `when` guard rejected the step entirely: mark every
                    # output null/skipped and produce no jobs.
                    _logger.info("[%s] will be skipped", step.name)
                    callback({k["id"]: None for k in outputparms}, "skipped")
                    step.completed = True
                    jobs = (_ for _ in ())

            step.submitted = True

            yield from jobs
        except WorkflowException:
            raise
        except Exception:
            # Unexpected failure: record it but keep the workflow loop alive.
            _logger.exception("Unhandled exception")
            self.processStatus = "permanentFail"
            step.completed = True

    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        """Log the start of each workflow."""
        _logger.info("[%s] start", self.name)

    def job(
        self,
        joborder: CWLObjectType,
        output_callback: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Repeatedly yield runnable jobs until every step has completed."""
        self.state = {}
        self.processStatus = "success"

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug("[%s] inputs %s", self.name, json_dumps(joborder, indent=4))

        runtimeContext = runtimeContext.copy()
        runtimeContext.outdir = None

        # Seed the state with the workflow-level inputs (or their defaults).
        for index, inp in enumerate(self.tool["inputs"]):
            with SourceLine(
                self.tool["inputs"],
                index,
                WorkflowException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                inp_id = shortname(inp["id"])
                if inp_id in joborder:
                    self.state[inp["id"]] = WorkflowStateItem(
                        inp, joborder[inp_id], "success"
                    )
                elif "default" in inp:
                    self.state[inp["id"]] = WorkflowStateItem(
                        inp, inp["default"], "success"
                    )
                else:
                    raise WorkflowException(
                        "Input '%s' not in input object and does not have a "
                        " default value." % (inp["id"])
                    )

        # Every step output starts out pending (None).
        for step in self.steps:
            for out in step.tool["outputs"]:
                self.state[out["id"]] = None

        completed = 0
        while completed < len(self.steps):
            self.made_progress = False

            for step in self.steps:
                if (
                    getdefault(runtimeContext.on_error, "stop") == "stop"
                    and self.processStatus != "success"
                ):
                    break

                if not step.submitted:
                    try:
                        step.iterable = self.try_make_job(
                            step, output_callback, runtimeContext
                        )
                    except WorkflowException as exc:
                        _logger.error("[%s] Cannot make job: %s", step.name, str(exc))
                        _logger.debug("", exc_info=True)
                        self.processStatus = "permanentFail"

                if step.iterable is not None:
                    try:
                        for newjob in step.iterable:
                            if (
                                getdefault(runtimeContext.on_error, "stop") == "stop"
                                and self.processStatus != "success"
                            ):
                                break
                            if newjob is not None:
                                self.made_progress = True
                                yield newjob
                            else:
                                # Step is waiting on an upstream value; move
                                # on to the next step.
                                break
                    except WorkflowException as exc:
                        _logger.error("[%s] Cannot make job: %s", step.name, str(exc))
                        _logger.debug("", exc_info=True)
                        self.processStatus = "permanentFail"

            completed = sum(1 for s in self.steps if s.completed)

            if not self.made_progress and completed < len(self.steps):
                if self.processStatus != "success":
                    break
                else:
                    # Nothing runnable right now; signal the caller to idle.
                    yield None

        if not self.did_callback and output_callback:
            # receive_output() may already have delivered the outputs when
            # the final step completed; otherwise deliver them here.
            self.do_output_callback(output_callback)