comparison env/lib/python3.9/site-packages/cwltool/checker.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Static checking of CWL workflow connectivity."""
2 from collections import namedtuple
3 from typing import (
4 Any,
5 Dict,
6 List,
7 MutableMapping,
8 MutableSequence,
9 Optional,
10 Sized,
11 Union,
12 cast,
13 )
14
15 from schema_salad.exceptions import ValidationException
16 from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno
17 from schema_salad.utils import json_dumps
18
19 from .errors import WorkflowException
20 from .loghandler import _logger
21 from .process import shortname
22 from .utils import CWLObjectType, CWLOutputAtomType, CWLOutputType, SinkType, aslist
23
24
25 def _get_type(tp):
26 # type: (Any) -> Any
27 if isinstance(tp, MutableMapping):
28 if tp.get("type") not in ("array", "record", "enum"):
29 return tp["type"]
30 return tp
31
32
def check_types(
    srctype: SinkType,
    sinktype: SinkType,
    linkMerge: Optional[str],
    valueFrom: Optional[str],
) -> str:
    """
    Classify how well a source type fits a sink type.

    Returns one of "pass", "warning" or "exception":

    * a sink with a valueFrom expression always yields "pass";
    * without linkMerge, "pass" means the strict comparison succeeded,
      "warning" means only the non-strict comparison succeeded, and
      "exception" means neither did;
    * with linkMerge, the source type is first wrapped ("merge_nested") or
      flattened ("merge_flattened") into an array type and then compared
      without linkMerge.

    Raises WorkflowException for any other linkMerge value.
    """
    if valueFrom is not None:
        return "pass"

    if linkMerge is None:
        # Try the strict comparison first, then fall back to the lenient one.
        for strict, verdict in ((True, "pass"), (False, "warning")):
            if can_assign_src_to_sink(srctype, sinktype, strict=strict):
                return verdict
        return "exception"

    if linkMerge == "merge_nested":
        return check_types(
            {"items": _get_type(srctype), "type": "array"},
            _get_type(sinktype),
            linkMerge=None,
            valueFrom=None,
        )

    if linkMerge == "merge_flattened":
        return check_types(
            merge_flatten_type(_get_type(srctype)),
            _get_type(sinktype),
            linkMerge=None,
            valueFrom=None,
        )

    raise WorkflowException(f"Unrecognized linkMerge enum '{linkMerge}'")
64
65
def merge_flatten_type(src: SinkType) -> CWLOutputType:
    """
    Return the type that results from merge-flattening the source type.

    A list of types is flattened member by member.  A mapping whose "type"
    is "array" is returned as-is; anything else is wrapped as the item type
    of a new array type.
    """
    if isinstance(src, MutableSequence):
        return [merge_flatten_type(cast(SinkType, member)) for member in src]
    already_array = isinstance(src, MutableMapping) and src.get("type") == "array"
    return src if already_array else {"items": src, "type": "array"}
73
74
def can_assign_src_to_sink(
    src: SinkType, sink: Optional[SinkType], strict: bool = False
) -> bool:
    """
    Decide whether a value of type src may be fed to a parameter of type sink.

    Type specifications are compared structurally; keys other than the ones
    inspected here (for example inputBinding) are ignored.

    src: admissible source types
    sink: admissible sink types

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.
    """
    if src == "Any" or sink == "Any":
        return True

    if isinstance(src, MutableMapping) and isinstance(sink, MutableMapping):
        # A sink flagged "not_connected" never passes the strict comparison.
        if strict and sink.get("not_connected"):
            return False

        src_kind = src["type"]
        sink_kind = sink["type"]

        if src_kind == "array" and sink_kind == "array":
            return can_assign_src_to_sink(
                cast(MutableSequence[CWLOutputAtomType], src["items"]),
                cast(MutableSequence[CWLOutputAtomType], sink["items"]),
                strict,
            )

        if src_kind == "record" and sink_kind == "record":
            return _compare_records(src, sink, strict)

        if src_kind == "File" and sink_kind == "File":
            offered = cast(List[CWLObjectType], src.get("secondaryFiles", []))
            for sinksf in cast(List[CWLObjectType], sink.get("secondaryFiles", [])):
                # Only the strict comparison rejects a secondaryFiles entry
                # that the source does not declare.
                if strict and not any(sinksf == srcsf for srcsf in offered):
                    return False
            return True

        return can_assign_src_to_sink(
            cast(SinkType, src_kind), cast(Optional[SinkType], sink_kind), strict
        )

    if isinstance(src, MutableSequence):
        # NOTE(review): the per-member calls use the default (non-strict)
        # mode even when strict is True; confirm that this is intended.
        verdicts = (
            can_assign_src_to_sink(cast(SinkType, member), sink) for member in src
        )
        return all(verdicts) if strict else any(verdicts)

    if isinstance(sink, MutableSequence):
        return any(
            can_assign_src_to_sink(src, cast(SinkType, member)) for member in sink
        )

    return bool(src == sink)
131
132
def _compare_records(
    src: CWLObjectType, sink: CWLObjectType, strict: bool = False
) -> bool:
    """
    Check that a source record can supply every field of a sink record.

    Field names are reduced with shortname() before being matched, so the
    two records are compared by short field name.  A field missing from the
    source is compared as if it had type "null".  The first sink field that
    cannot be assigned is logged at INFO level and makes the result False.
    """

    def _rec_fields(
        rec,
    ):  # type: (MutableMapping[str, Any]) -> MutableMapping[str, Any]
        # Map each field's short name to its declared type.
        return {shortname(field["name"]): field["type"] for field in rec["fields"]}

    srcfields = _rec_fields(src)
    sinkfields = _rec_fields(sink)

    for key, sink_type in sinkfields.items():
        assignable = can_assign_src_to_sink(
            srcfields.get(key, "null"), sink_type, strict
        )
        if not assignable and sink_type is not None:
            _logger.info(
                "Record comparison failure for %s and %s\n"
                "Did not match fields for %s: %s and %s",
                src["name"],
                sink["name"],
                key,
                srcfields.get(key),
                sink_type,
            )
            return False
    return True
172
173
def missing_subset(fullset: List[Any], subset: List[Any]) -> List[Any]:
    """Return the members of subset absent from fullset, in subset order."""
    return [item for item in subset if item not in fullset]
180
181
def static_checker(
    workflow_inputs: List[CWLObjectType],
    workflow_outputs: MutableSequence[CWLObjectType],
    step_inputs: MutableSequence[CWLObjectType],
    step_outputs: List[CWLObjectType],
    param_to_step: Dict[str, CWLObjectType],
) -> None:
    """
    Check, before run time, that linked workflow parameters have compatible types.

    Sources are the workflow inputs and the step outputs; sinks are the step
    inputs and the workflow outputs.  Every questionable link reported by
    check_all_types is turned into a message: warnings are logged together
    in one log record, and a ValidationException carrying all error
    messages is raised when at least one incompatible link was reported.
    """
    # Index every source parameter by its "id" field.
    src_dict = {
        cast(str, parm["id"]): parm for parm in workflow_inputs + step_outputs
    }  # type: Dict[str, CWLObjectType]

    step_inputs_val = check_all_types(src_dict, step_inputs, "source", param_to_step)
    workflow_outputs_val = check_all_types(
        src_dict, workflow_outputs, "outputSource", param_to_step
    )

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    warning_msgs = []
    exception_msgs = []

    for warning in warnings:
        src, sink = warning.src, warning.sink
        linkMerge = warning.linkMerge

        # Patterns the sink insists on (entries default to required) ...
        required_patterns = sorted(
            p["pattern"]
            for p in sink.get("secondaryFiles", [])
            if p.get("required", True)
        )
        # ... versus every pattern the source declares.
        declared_patterns = sorted(p["pattern"] for p in src.get("secondaryFiles", []))
        missing = missing_subset(declared_patterns, required_patterns)

        if missing:
            # Every secondaryFile required by the sink should be declared by
            # the source; explain what is missing and how to fix it.
            headline = "Parameter '{}' requires secondaryFiles {} but".format(
                shortname(sink["id"]),
                missing,
            )
            not_provided = SourceLine(src, "id").makeError(
                "source '%s' does not provide those secondaryFiles."
                % (shortname(src["id"]))
            )
            fix_source = SourceLine(
                src.get("_tool_entry", src), "secondaryFiles"
            ).makeError(
                "To resolve, add missing secondaryFiles patterns to definition of '%s' or"
                % (shortname(src["id"]))
            )
            fix_sink = SourceLine(
                sink.get("_tool_entry", sink), "secondaryFiles"
            ).makeError(
                "mark missing secondaryFiles in definition of '%s' as optional."
                % shortname(sink["id"])
            )
            msg = SourceLine(sink).makeError(
                "{}\n{}".format(
                    headline, bullets([not_provided, fix_source, fix_sink], " ")
                )
            )
        elif sink.get("not_connected"):
            if sink.get("used_by_step"):
                msg = ""
            else:
                msg = SourceLine(sink, "type").makeError(
                    "'%s' is not an input parameter of %s, expected %s"
                    % (
                        shortname(sink["id"]),
                        param_to_step[sink["id"]]["run"],
                        ", ".join(
                            shortname(cast(str, s["id"]))
                            for s in cast(
                                List[Dict[str, Union[str, bool]]],
                                param_to_step[sink["id"]]["inputs"],
                            )
                            if not s.get("not_connected")
                        ),
                    )
                )
        else:
            source_part = SourceLine(src, "type").makeError(
                "Source '%s' of type %s may be incompatible"
                % (shortname(src["id"]), json_dumps(src["type"]))
            )
            sink_part = SourceLine(sink, "type").makeError(
                " with sink '%s' of type %s"
                % (shortname(sink["id"]), json_dumps(sink["type"]))
            )
            msg = source_part + "\n" + sink_part
            if linkMerge is not None:
                msg += "\n" + SourceLine(sink).makeError(
                    " source has linkMerge method %s" % linkMerge
                )

        if warning.message is not None:
            msg += "\n" + SourceLine(sink).makeError(" " + warning.message)

        # A "not_connected" sink that is used by the step and carries no
        # extra message produces nothing to report.
        if msg:
            warning_msgs.append(msg)

    for exception in exceptions:
        src, sink = exception.src, exception.sink
        linkMerge = exception.linkMerge
        extra_message = exception.message

        source_part = SourceLine(src, "type").makeError(
            "Source '%s' of type %s is incompatible"
            % (shortname(src["id"]), json_dumps(src["type"]))
        )
        sink_part = SourceLine(sink, "type").makeError(
            " with sink '%s' of type %s"
            % (shortname(sink["id"]), json_dumps(sink["type"]))
        )
        msg = source_part + "\n" + sink_part

        if extra_message is not None:
            msg += "\n" + SourceLine(sink).makeError(" " + extra_message)

        if linkMerge is not None:
            msg += "\n" + SourceLine(sink).makeError(
                " source has linkMerge method %s" % linkMerge
            )
        exception_msgs.append(msg)

    # A step input that cannot be null needs some way of obtaining a value.
    for sink in step_inputs:
        if (
            "null" != sink["type"]
            and "null" not in sink["type"]
            and not any(key in sink for key in ("source", "default", "valueFrom"))
        ):
            exception_msgs.append(
                SourceLine(sink).makeError(
                    "Required parameter '%s' does not have source, default, or valueFrom expression"
                    % shortname(sink["id"])
                )
            )

    all_warning_msg = strip_dup_lineno("\n".join(warning_msgs))
    all_exception_msg = strip_dup_lineno("\n" + "\n".join(exception_msgs))

    if all_warning_msg:
        _logger.warning("Workflow checker warning:\n%s", all_warning_msg)
    # NOTE(review): the raise is gated on type-mismatch results only, so
    # "Required parameter" messages alone do not raise; confirm intended.
    if exceptions:
        raise ValidationException(all_exception_msg)
334
335
# One reported link: the source parameter, the sink parameter, the linkMerge
# method in effect for the link, and an optional extra message.
SrcSink = namedtuple("SrcSink", "src sink linkMerge message")
337
338
def check_all_types(
    src_dict: Dict[str, CWLObjectType],
    sinks: MutableSequence[CWLObjectType],
    sourceField: str,
    param_to_step: Dict[str, CWLObjectType],
) -> Dict[str, List[SrcSink]]:
    """
    Compare the type of each sink with the types of the sources feeding it.

    sourceField is the sink key that names its source(s); static_checker
    passes "source" for step inputs and "outputSource" for workflow outputs.
    Sinks without that key are skipped.

    Returns a dictionary with the keys "warning" and "exception", each
    holding the SrcSink entries collected for that severity.
    """
    validation = {"warning": [], "exception": []}  # type: Dict[str, List[SrcSink]]

    for sink in sinks:
        if sourceField not in sink:
            continue

        valueFrom = cast(Optional[str], sink.get("valueFrom"))
        pickValue = cast(Optional[str], sink.get("pickValue"))
        extra_message = None if pickValue is None else "pickValue is: %s" % pickValue

        if isinstance(sink[sourceField], MutableSequence):
            # With several sources and no explicit linkMerge, merge_nested
            # is used as the default.
            default_merge = (
                "merge_nested" if len(cast(Sized, sink[sourceField])) > 1 else None
            )
            linkMerge = cast(
                Optional[str], sink.get("linkMerge", default_merge)
            )  # type: Optional[str]

            # These pickValue methods make the types get compared without
            # any linkMerge wrapping.
            if pickValue in ["first_non_null", "the_only_non_null"]:
                linkMerge = None

            srcs_of_sink = []  # type: List[CWLObjectType]
            for parm_id in cast(MutableSequence[str], sink[sourceField]):
                srcs_of_sink.append(src_dict[parm_id])
                if (
                    is_conditional_step(param_to_step, parm_id)
                    and pickValue is None
                ):
                    validation["warning"].append(
                        SrcSink(
                            src_dict[parm_id],
                            sink,
                            linkMerge,
                            message="Source is from conditional step, but pickValue is not used",
                        )
                    )
        else:
            parm_id = cast(str, sink[sourceField])
            srcs_of_sink = [src_dict[parm_id]]
            linkMerge = None

            if pickValue is not None:
                validation["warning"].append(
                    SrcSink(
                        src_dict[parm_id],
                        sink,
                        linkMerge,
                        message="pickValue is used but only a single input source is declared",
                    )
                )

            if is_conditional_step(param_to_step, parm_id):
                src_typ = aslist(srcs_of_sink[0]["type"])
                snk_typ = sink["type"]

                # Make sure the source type admits "null".
                if "null" not in src_typ:
                    src_typ = ["null"] + cast(List[Any], src_typ)

                if "null" not in cast(
                    Union[List[str], CWLObjectType], snk_typ
                ):  # Given our type names this works even if not a list
                    validation["warning"].append(
                        SrcSink(
                            src_dict[parm_id],
                            sink,
                            linkMerge,
                            message="Source is from conditional step and may produce `null`",
                        )
                    )

                # The widened type is written back onto the source entry.
                srcs_of_sink[0]["type"] = src_typ

        for src in srcs_of_sink:
            check_result = check_types(src, sink, linkMerge, valueFrom)
            if check_result in ("warning", "exception"):
                validation[check_result].append(
                    SrcSink(src, sink, linkMerge, message=extra_message)
                )

    return validation
440
441
def is_conditional_step(param_to_step: Dict[str, CWLObjectType], parm_id: str) -> bool:
    """Return True if parm_id maps to a step whose "when" entry is not None."""
    source_step = param_to_step.get(parm_id)
    return source_step is not None and source_step.get("when") is not None