shellac / sam_consensus_v3: env/lib/python3.9/site-packages/cwltool/checker.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"

| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| 1 """Static checking of CWL workflow connectivity.""" | |
| 2 from collections import namedtuple | |
| 3 from typing import ( | |
| 4 Any, | |
| 5 Dict, | |
| 6 List, | |
| 7 MutableMapping, | |
| 8 MutableSequence, | |
| 9 Optional, | |
| 10 Sized, | |
| 11 Union, | |
| 12 cast, | |
| 13 ) | |
| 14 | |
| 15 from schema_salad.exceptions import ValidationException | |
| 16 from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno | |
| 17 from schema_salad.utils import json_dumps | |
| 18 | |
| 19 from .errors import WorkflowException | |
| 20 from .loghandler import _logger | |
| 21 from .process import shortname | |
| 22 from .utils import CWLObjectType, CWLOutputAtomType, CWLOutputType, SinkType, aslist | |
| 23 | |
| 24 | |
def _get_type(tp):
    # type: (Any) -> Any
    # For a plain type dictionary (e.g. {"type": "File"}) return the bare type
    # name; arrays, records, enums, and non-mapping values are returned unchanged.
    if isinstance(tp, MutableMapping):
        if tp.get("type") not in ("array", "record", "enum"):
            return tp["type"]
    return tp


def check_types(
    srctype: SinkType,
    sinktype: SinkType,
    linkMerge: Optional[str],
    valueFrom: Optional[str],
) -> str:
    """
    Check whether the source and sink types are compatible.

    Return "pass", "warning", or "exception".
    """
    if valueFrom is not None:
        return "pass"
    if linkMerge is None:
        if can_assign_src_to_sink(srctype, sinktype, strict=True):
            return "pass"
        if can_assign_src_to_sink(srctype, sinktype, strict=False):
            return "warning"
        return "exception"
    if linkMerge == "merge_nested":
        return check_types(
            {"items": _get_type(srctype), "type": "array"},
            _get_type(sinktype),
            None,
            None,
        )
    if linkMerge == "merge_flattened":
        return check_types(
            merge_flatten_type(_get_type(srctype)), _get_type(sinktype), None, None
        )
    raise WorkflowException(f"Unrecognized linkMerge enum '{linkMerge}'")


def merge_flatten_type(src: SinkType) -> CWLOutputType:
    """Return the merge flattened type of the source type."""
    if isinstance(src, MutableSequence):
        return [merge_flatten_type(cast(SinkType, t)) for t in src]
    if isinstance(src, MutableMapping) and src.get("type") == "array":
        return src
    return {"items": src, "type": "array"}


def can_assign_src_to_sink(
    src: SinkType, sink: Optional[SinkType], strict: bool = False
) -> bool:
    """
    Check for identical type specifications, ignoring extra keys like inputBinding.

    src: admissible source types
    sink: admissible sink types

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.
    """
    if src == "Any" or sink == "Any":
        return True
    if isinstance(src, MutableMapping) and isinstance(sink, MutableMapping):
        if sink.get("not_connected") and strict:
            return False
        if src["type"] == "array" and sink["type"] == "array":
            return can_assign_src_to_sink(
                cast(MutableSequence[CWLOutputAtomType], src["items"]),
                cast(MutableSequence[CWLOutputAtomType], sink["items"]),
                strict,
            )
        if src["type"] == "record" and sink["type"] == "record":
            return _compare_records(src, sink, strict)
        if src["type"] == "File" and sink["type"] == "File":
            for sinksf in cast(List[CWLObjectType], sink.get("secondaryFiles", [])):
                if not [
                    1
                    for srcsf in cast(
                        List[CWLObjectType], src.get("secondaryFiles", [])
                    )
                    if sinksf == srcsf
                ]:
                    if strict:
                        return False
            return True
        return can_assign_src_to_sink(
            cast(SinkType, src["type"]), cast(Optional[SinkType], sink["type"]), strict
        )
    if isinstance(src, MutableSequence):
        if strict:
            for this_src in src:
                if not can_assign_src_to_sink(cast(SinkType, this_src), sink):
                    return False
            return True
        for this_src in src:
            if can_assign_src_to_sink(cast(SinkType, this_src), sink):
                return True
        return False
    if isinstance(sink, MutableSequence):
        for this_sink in sink:
            if can_assign_src_to_sink(src, cast(SinkType, this_sink)):
                return True
        return False
    return bool(src == sink)


def _compare_records(
    src: CWLObjectType, sink: CWLObjectType, strict: bool = False
) -> bool:
    """
    Compare two records, ensuring they have compatible fields.

    This handles normalizing the record field names, which will be relative to
    the workflow step, so that they can be compared.
    """

    def _rec_fields(
        rec,
    ):  # type: (MutableMapping[str, Any]) -> MutableMapping[str, Any]
        out = {}
        for field in rec["fields"]:
            name = shortname(field["name"])
            out[name] = field["type"]
        return out

    srcfields = _rec_fields(src)
    sinkfields = _rec_fields(sink)
    for key in sinkfields.keys():
        if (
            not can_assign_src_to_sink(
                srcfields.get(key, "null"), sinkfields.get(key, "null"), strict
            )
            and sinkfields.get(key) is not None
        ):
            _logger.info(
                "Record comparison failure for %s and %s\n"
                "Did not match fields for %s: %s and %s",
                src["name"],
                sink["name"],
                key,
                srcfields.get(key),
                sinkfields.get(key),
            )
            return False
    return True


def missing_subset(fullset: List[Any], subset: List[Any]) -> List[Any]:
    """Return the members of subset that are missing from fullset."""
    missing = []
    for i in subset:
        if i not in fullset:
            missing.append(i)
    return missing


def static_checker(
    workflow_inputs: List[CWLObjectType],
    workflow_outputs: MutableSequence[CWLObjectType],
    step_inputs: MutableSequence[CWLObjectType],
    step_outputs: List[CWLObjectType],
    param_to_step: Dict[str, CWLObjectType],
) -> None:
    """Check if all source and sink types of a workflow are compatible before run time."""
    # source parameters: workflow_inputs and step_outputs
    # sink parameters: step_inputs and workflow_outputs

    # make a dictionary of source parameters, indexed by the "id" field
    src_parms = workflow_inputs + step_outputs
    src_dict = {}  # type: Dict[str, CWLObjectType]
    for parm in src_parms:
        src_dict[cast(str, parm["id"])] = parm

    step_inputs_val = check_all_types(src_dict, step_inputs, "source", param_to_step)
    workflow_outputs_val = check_all_types(
        src_dict, workflow_outputs, "outputSource", param_to_step
    )

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    warning_msgs = []
    exception_msgs = []
    for warning in warnings:
        src = warning.src
        sink = warning.sink
        linkMerge = warning.linkMerge
        sinksf = sorted(
            [
                p["pattern"]
                for p in sink.get("secondaryFiles", [])
                if p.get("required", True)
            ]
        )
        srcsf = sorted([p["pattern"] for p in src.get("secondaryFiles", [])])
        # Every secondaryFile required by the sink should be declared
        # by the source
        missing = missing_subset(srcsf, sinksf)
        if missing:
            msg1 = "Parameter '{}' requires secondaryFiles {} but".format(
                shortname(sink["id"]),
                missing,
            )
            msg3 = SourceLine(src, "id").makeError(
                "source '%s' does not provide those secondaryFiles."
                % (shortname(src["id"]))
            )
            msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError(
                "To resolve, add missing secondaryFiles patterns to definition of '%s' or"
                % (shortname(src["id"]))
            )
            msg5 = SourceLine(
                sink.get("_tool_entry", sink), "secondaryFiles"
            ).makeError(
                "mark missing secondaryFiles in definition of '%s' as optional."
                % shortname(sink["id"])
            )
            msg = SourceLine(sink).makeError(
                "{}\n{}".format(msg1, bullets([msg3, msg4, msg5], " "))
            )
        elif sink.get("not_connected"):
            if not sink.get("used_by_step"):
                msg = SourceLine(sink, "type").makeError(
                    "'%s' is not an input parameter of %s, expected %s"
                    % (
                        shortname(sink["id"]),
                        param_to_step[sink["id"]]["run"],
                        ", ".join(
                            shortname(cast(str, s["id"]))
                            for s in cast(
                                List[Dict[str, Union[str, bool]]],
                                param_to_step[sink["id"]]["inputs"],
                            )
                            if not s.get("not_connected")
                        ),
                    )
                )
            else:
                msg = ""
        else:
            msg = (
                SourceLine(src, "type").makeError(
                    "Source '%s' of type %s may be incompatible"
                    % (shortname(src["id"]), json_dumps(src["type"]))
                )
                + "\n"
                + SourceLine(sink, "type").makeError(
                    " with sink '%s' of type %s"
                    % (shortname(sink["id"]), json_dumps(sink["type"]))
                )
            )
            if linkMerge is not None:
                msg += "\n" + SourceLine(sink).makeError(
                    " source has linkMerge method %s" % linkMerge
                )

        if warning.message is not None:
            msg += "\n" + SourceLine(sink).makeError(" " + warning.message)

        if msg:
            warning_msgs.append(msg)

    for exception in exceptions:
        src = exception.src
        sink = exception.sink
        linkMerge = exception.linkMerge
        extra_message = exception.message
        msg = (
            SourceLine(src, "type").makeError(
                "Source '%s' of type %s is incompatible"
                % (shortname(src["id"]), json_dumps(src["type"]))
            )
            + "\n"
            + SourceLine(sink, "type").makeError(
                " with sink '%s' of type %s"
                % (shortname(sink["id"]), json_dumps(sink["type"]))
            )
        )
        if extra_message is not None:
            msg += "\n" + SourceLine(sink).makeError(" " + extra_message)

        if linkMerge is not None:
            msg += "\n" + SourceLine(sink).makeError(
                " source has linkMerge method %s" % linkMerge
            )
        exception_msgs.append(msg)

    for sink in step_inputs:
        if (
            "null" != sink["type"]
            and "null" not in sink["type"]
            and "source" not in sink
            and "default" not in sink
            and "valueFrom" not in sink
        ):
            msg = SourceLine(sink).makeError(
                "Required parameter '%s' does not have source, default, or valueFrom expression"
                % shortname(sink["id"])
            )
            exception_msgs.append(msg)

    all_warning_msg = strip_dup_lineno("\n".join(warning_msgs))
    all_exception_msg = strip_dup_lineno("\n" + "\n".join(exception_msgs))

    if all_warning_msg:
        _logger.warning("Workflow checker warning:\n%s", all_warning_msg)
    if exceptions:
        raise ValidationException(all_exception_msg)


SrcSink = namedtuple("SrcSink", ["src", "sink", "linkMerge", "message"])


def check_all_types(
    src_dict: Dict[str, CWLObjectType],
    sinks: MutableSequence[CWLObjectType],
    sourceField: str,
    param_to_step: Dict[str, CWLObjectType],
) -> Dict[str, List[SrcSink]]:
    """
    Given a list of sinks, check if their types match with the types of their sources.

    sourceField is either "source" or "outputSource"
    """
| 350 validation = {"warning": [], "exception": []} # type: Dict[str, List[SrcSink]] | |
| 351 for sink in sinks: | |
| 352 if sourceField in sink: | |
| 353 | |
| 354 valueFrom = cast(Optional[str], sink.get("valueFrom")) | |
| 355 pickValue = cast(Optional[str], sink.get("pickValue")) | |
| 356 | |
| 357 extra_message = None | |
| 358 if pickValue is not None: | |
| 359 extra_message = "pickValue is: %s" % pickValue | |
| 360 | |
| 361 if isinstance(sink[sourceField], MutableSequence): | |
| 362 linkMerge = cast( | |
| 363 Optional[str], | |
| 364 sink.get( | |
| 365 "linkMerge", | |
| 366 ( | |
| 367 "merge_nested" | |
| 368 if len(cast(Sized, sink[sourceField])) > 1 | |
| 369 else None | |
| 370 ), | |
| 371 ), | |
| 372 ) # type: Optional[str] | |
| 373 | |
| 374 if pickValue in ["first_non_null", "the_only_non_null"]: | |
| 375 linkMerge = None | |
| 376 | |
| 377 srcs_of_sink = [] # type: List[CWLObjectType] | |
| 378 for parm_id in cast(MutableSequence[str], sink[sourceField]): | |
| 379 srcs_of_sink += [src_dict[parm_id]] | |
| 380 if ( | |
| 381 is_conditional_step(param_to_step, parm_id) | |
| 382 and pickValue is None | |
| 383 ): | |
| 384 validation["warning"].append( | |
| 385 SrcSink( | |
| 386 src_dict[parm_id], | |
| 387 sink, | |
| 388 linkMerge, | |
| 389 message="Source is from conditional step, but pickValue is not used", | |
| 390 ) | |
| 391 ) | |
| 392 else: | |
| 393 parm_id = cast(str, sink[sourceField]) | |
| 394 srcs_of_sink = [src_dict[parm_id]] | |
| 395 linkMerge = None | |
| 396 | |
| 397 if pickValue is not None: | |
| 398 validation["warning"].append( | |
| 399 SrcSink( | |
| 400 src_dict[parm_id], | |
| 401 sink, | |
| 402 linkMerge, | |
| 403 message="pickValue is used but only a single input source is declared", | |
| 404 ) | |
| 405 ) | |
| 406 | |
                if is_conditional_step(param_to_step, parm_id):
                    src_typ = aslist(srcs_of_sink[0]["type"])
                    snk_typ = sink["type"]

                    if "null" not in src_typ:
                        src_typ = ["null"] + cast(List[Any], src_typ)

                    if "null" not in cast(
                        Union[List[str], CWLObjectType], snk_typ
                    ):  # Given our type names this works even if not a list
                        validation["warning"].append(
                            SrcSink(
                                src_dict[parm_id],
                                sink,
                                linkMerge,
                                message="Source is from conditional step and may produce `null`",
                            )
                        )

                    srcs_of_sink[0]["type"] = src_typ

            for src in srcs_of_sink:
                check_result = check_types(src, sink, linkMerge, valueFrom)
                if check_result == "warning":
                    validation["warning"].append(
                        SrcSink(src, sink, linkMerge, message=extra_message)
                    )
                elif check_result == "exception":
                    validation["exception"].append(
                        SrcSink(src, sink, linkMerge, message=extra_message)
                    )

    return validation


def is_conditional_step(param_to_step: Dict[str, CWLObjectType], parm_id: str) -> bool:
    """Return True if the step associated with parm_id has a `when` (conditional execution) field."""
    source_step = param_to_step.get(parm_id)
    if source_step is not None:
        if source_step.get("when") is not None:
            return True
    return False
```
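For orientation, here is a minimal usage sketch (not part of the repository file above, and assuming `cwltool` is importable) showing how the link classification in `check_types` and `can_assign_src_to_sink` behaves for a few simple CWL type shapes; the expected results follow directly from the definitions in the listing:

```python
from cwltool.checker import can_assign_src_to_sink, check_types

# Identical scalar types connect cleanly.
assert check_types("string", "string", None, None) == "pass"

# A union source that only partially matches the sink is downgraded to a warning.
assert check_types(["string", "null"], "string", None, None) == "warning"

# Fully incompatible types are classified as an exception.
assert check_types("string", "int", None, None) == "exception"

# With linkMerge "merge_nested", a scalar source is wrapped in an array
# before being compared against an array sink.
array_of_file = {"type": "array", "items": "File"}
assert check_types("File", array_of_file, "merge_nested", None) == "pass"

# Strict comparison requires every admissible source type to match the sink;
# non-strict comparison only needs one match.
assert can_assign_src_to_sink(["string", "null"], "string", strict=True) is False
assert can_assign_src_to_sink(["string", "null"], "string", strict=False) is True
```

`static_checker` aggregates these per-link classifications across all workflow connections, logging the warnings and raising a `ValidationException` when any link is classified as an exception.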
