Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/load_tool.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """Loads a CWL document.""" | |
| 2 | |
| 3 import hashlib | |
| 4 import logging | |
| 5 import os | |
| 6 import re | |
| 7 import urllib | |
| 8 import uuid | |
| 9 from typing import ( | |
| 10 Any, | |
| 11 Dict, | |
| 12 List, | |
| 13 MutableMapping, | |
| 14 MutableSequence, | |
| 15 Optional, | |
| 16 Tuple, | |
| 17 Union, | |
| 18 cast, | |
| 19 ) | |
| 20 | |
| 21 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
| 22 from schema_salad.exceptions import ValidationException | |
| 23 from schema_salad.ref_resolver import Loader, file_uri | |
| 24 from schema_salad.schema import validate_doc | |
| 25 from schema_salad.sourceline import SourceLine, cmap | |
| 26 from schema_salad.utils import ( | |
| 27 ContextType, | |
| 28 FetcherCallableType, | |
| 29 IdxResultType, | |
| 30 ResolveType, | |
| 31 json_dumps, | |
| 32 ) | |
| 33 | |
| 34 from . import CWL_CONTENT_TYPES, process, update | |
| 35 from .context import LoadingContext | |
| 36 from .errors import WorkflowException | |
| 37 from .loghandler import _logger | |
| 38 from .process import Process, get_schema, shortname | |
| 39 from .update import ALLUPDATES | |
| 40 from .utils import CWLObjectType, ResolverType, visit_class | |
| 41 | |
# JSON-LD context for the loader that reads job/input objects and top-level
# documents. The "cwl" and "cwltool" prefixes expand to their namespaces;
# "path", "location" and "id" values are typed "@id", i.e. resolved as
# identifiers (URIs) rather than plain strings.
jobloaderctx: "ContextType" = {
    "cwl": "https://w3id.org/cwl/cwl#",
    "cwltool": "http://commonwl.org/cwltool#",
    "path": {"@type": "@id"},
    "location": {"@type": "@id"},
    "id": "@id",
}


# JSON-LD context for reading ``cwltool:overrides`` documents. The overrides
# list may be written as a map keyed by "overrideTarget", and each override's
# "requirements" as a map keyed by "class" (schema-salad ``mapSubject``).
overrides_ctx: "ContextType" = {
    "overrideTarget": {"@type": "@id"},
    "cwltool": "http://commonwl.org/cwltool#",
    "http://commonwl.org/cwltool#overrides": {
        "@id": "cwltool:overrides",
        "mapSubject": "overrideTarget",
    },
    "requirements": {
        "@id": "https://w3id.org/cwl/cwl#requirements",
        "mapSubject": "class",
    },
}
| 63 | |
| 64 | |
def default_loader(
    fetcher_constructor: Optional[FetcherCallableType] = None,
    enable_dev: bool = False,
    doc_cache: bool = True,
) -> Loader:
    """Build a schema-salad ``Loader`` using the ``jobloaderctx`` context.

    :param fetcher_constructor: optional fetcher factory, passed straight
        through to ``Loader``.
    :param enable_dev: value returned by the loader's ``allow_attachments``
        hook for every document it is asked about.
    :param doc_cache: passed through as the loader's ``doc_cache`` option.
    :returns: a newly constructed ``Loader``.
    """

    def _attachments_allowed(_document: Any) -> bool:
        # The answer never depends on the document, only on enable_dev.
        return enable_dev

    return Loader(
        jobloaderctx,
        fetcher_constructor=fetcher_constructor,
        allow_attachments=_attachments_allowed,
        doc_cache=doc_cache,
    )
| 76 | |
| 77 | |
def resolve_tool_uri(
    argsworkflow: str,
    resolver: Optional[ResolverType] = None,
    fetcher_constructor: Optional[FetcherCallableType] = None,
    document_loader: Optional[Loader] = None,
) -> Tuple[str, str]:
    """Turn a user-supplied tool reference into a URI.

    The reference is tried, in order, as:

    1. a URI with an ``http``, ``https`` or ``file`` scheme, used unchanged;
    2. an existing local path, converted to a ``file://`` URI of its
       absolute path;
    3. input to ``resolver`` (if given). The resolver receives
       ``document_loader``, or a fresh ``default_loader(fetcher_constructor)``
       when no loader was supplied.

    :returns: ``(uri, uri_without_fragment)``.
    :raises ValidationException: if none of the above produced a URI.
    """
    uri = None  # type: Optional[str]
    scheme = urllib.parse.urlsplit(argsworkflow).scheme
    # Only accept well-known schemes here: on Windows urlsplit reports a drive
    # letter (e.g. "C") as the scheme, and such paths must fall through to the
    # local-path check instead.
    if scheme in ("http", "https", "file"):
        uri = argsworkflow
    else:
        local_path = os.path.abspath(argsworkflow)
        if os.path.exists(local_path):
            uri = file_uri(str(local_path))
        elif resolver is not None:
            lookup_loader = document_loader or default_loader(fetcher_constructor)
            uri = resolver(lookup_loader, argsworkflow)

    if uri is None:
        raise ValidationException("Not found: '%s'" % argsworkflow)

    if argsworkflow != uri:
        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)

    document_uri, _fragment = urllib.parse.urldefrag(uri)
    return uri, document_uri
| 105 | |
| 106 | |
def fetch_document(
    argsworkflow: Union[str, CWLObjectType],
    loadingContext: Optional[LoadingContext] = None,
) -> Tuple[LoadingContext, CommentedMap, str]:
    """Retrieve a CWL document.

    ``argsworkflow`` is either a reference string or an already-loaded mapping.
    A string is resolved with :func:`resolve_tool_uri` and then fetched; a
    mapping is wrapped with ``cmap``.

    The function works on a copy of ``loadingContext``. When no context is
    given, a new ``LoadingContext`` with a ``default_loader()`` is used. When
    the copy has no loader, one is built from the context's
    ``fetcher_constructor``, ``enable_dev`` and ``doc_cache`` settings.

    :returns: ``(loading_context, document, uri)``. A mapping without a truthy
        ``"id"`` is given the blank-node URI ``"_:<uuid4>"``. Mappings are
        registered in the loader's index under their URI.
    :raises ValidationException: if ``argsworkflow`` is neither a string nor a
        mapping, or (via resolve_tool_uri) a string reference cannot be found.
    """
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
        if loadingContext.loader is None:
            loadingContext.loader = default_loader(
                loadingContext.fetcher_constructor,
                enable_dev=loadingContext.enable_dev,
                doc_cache=loadingContext.doc_cache,
            )
    else:
        loadingContext = LoadingContext()
        loadingContext.loader = default_loader()

    if isinstance(argsworkflow, MutableMapping):
        declared_id = argsworkflow.get("id")
        if declared_id:
            doc_uri = cast(str, declared_id)
        else:
            doc_uri = "_:" + str(uuid.uuid4())
        document = cast(
            CommentedMap, cmap(cast(Dict[str, Any], argsworkflow), fn=doc_uri)
        )
        loadingContext.loader.idx[doc_uri] = document
        return loadingContext, document, doc_uri

    if isinstance(argsworkflow, str):
        doc_uri, fetch_uri = resolve_tool_uri(
            argsworkflow,
            resolver=loadingContext.resolver,
            document_loader=loadingContext.loader,
        )
        fetched = loadingContext.loader.fetch(
            fetch_uri, content_types=CWL_CONTENT_TYPES
        )
        return loadingContext, cast(CommentedMap, fetched), doc_uri

    raise ValidationException("Must be URI or object: '%s'" % argsworkflow)
| 147 | |
| 148 | |
def _convert_stdstreams_to_files(
    workflowobj: Union[
        CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str
    ]
) -> None:
    """Expand the stdout/stderr/stdin type shortcuts into explicit ``File`` form, in place.

    Walks ``workflowobj`` recursively through mappings and sequences. For each
    mapping whose ``class`` is ``CommandLineTool`` (whose children are not
    descended into further):

    * an output of type ``stdout``/``stderr`` becomes a ``File`` output whose
      ``outputBinding.glob`` is the tool's ``stdout``/``stderr`` filename. If
      the tool names no such file, the SHA-1 hex digest of the tool's
      sorted-key JSON serialisation is used and stored on the tool.
    * an input of type ``stdin`` becomes a ``File`` input, and the tool's
      ``stdin`` is set to ``$(inputs.<input name>.path)``.

    :raises ValidationException: if ``outputs`` is not a CommentedSeq, an
        output is not a CommentedMap, or a shortcut is combined with an
        explicit ``outputBinding``, ``inputBinding`` or ``stdin``.
    """
    if isinstance(workflowobj, MutableSequence):
        for item in workflowobj:
            _convert_stdstreams_to_files(
                cast(
                    Union[
                        CWLObjectType,
                        MutableSequence[Union[CWLObjectType, str, int]],
                        str,
                    ],
                    item,
                )
            )
        return

    if not isinstance(workflowobj, MutableMapping):
        # Scalars carry nothing to convert.
        return

    if workflowobj.get("class") != "CommandLineTool":
        for value in workflowobj.values():
            _convert_stdstreams_to_files(
                cast(
                    Union[
                        CWLObjectType,
                        MutableSequence[Union[CWLObjectType, str, int]],
                        str,
                    ],
                    value,
                )
            )
        return

    # Errors raised while rewriting outputs are annotated with the source
    # position of the tool's "outputs" field.
    with SourceLine(
        workflowobj,
        "outputs",
        ValidationException,
        _logger.isEnabledFor(logging.DEBUG),
    ):
        outputs = workflowobj.get("outputs", [])
        if not isinstance(outputs, CommentedSeq):
            raise ValidationException('"outputs" section is not valid.')
        for output in cast(MutableSequence[CWLObjectType], outputs):
            if not isinstance(output, CommentedMap):
                raise ValidationException(
                    f"Output '{output}' is not a valid OutputParameter."
                )
            for stream in ("stdout", "stderr"):
                if output.get("type") != stream:
                    continue
                if "outputBinding" in output:
                    raise ValidationException(
                        "Not allowed to specify outputBinding when"
                        " using %s shortcut." % stream
                    )
                if stream in workflowobj:
                    filename = workflowobj[stream]
                else:
                    # No explicit filename: derive a deterministic one from
                    # the current state of the whole tool description.
                    serialized = json_dumps(workflowobj, sort_keys=True)
                    digest = hashlib.sha1(serialized.encode("utf-8"))  # nosec
                    filename = str(digest.hexdigest())
                    workflowobj[stream] = filename
                output["type"] = "File"
                output["outputBinding"] = cmap({"glob": filename})

    for inp in cast(MutableSequence[CWLObjectType], workflowobj.get("inputs", [])):
        if inp.get("type") != "stdin":
            continue
        if "inputBinding" in inp:
            raise ValidationException(
                "Not allowed to specify inputBinding when using stdin shortcut."
            )
        if "stdin" in workflowobj:
            raise ValidationException(
                "Not allowed to specify stdin path when using stdin type shortcut."
            )
        # The input's short name is whatever follows the last "#" of its id.
        input_name = cast(str, inp["id"]).rpartition("#")[2]
        workflowobj["stdin"] = "$(inputs.%s.path)" % input_name
        inp["type"] = "File"
| 236 | |
| 237 | |
def _add_blank_ids(
    workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]]
) -> None:
    """Give each inline ``run`` process that lacks an ``id`` a random UUID id, in place.

    Recurses through nested mappings and sequences. A ``run`` value is only
    modified when it is a mapping containing neither ``id`` nor ``$import``.
    """
    if isinstance(workflowobj, MutableMapping):
        inline_run = workflowobj.get("run")
        if isinstance(inline_run, MutableMapping) and not (
            "id" in inline_run or "$import" in inline_run
        ):
            inline_run["id"] = str(uuid.uuid4())
        for child in workflowobj.values():
            _add_blank_ids(
                cast(
                    Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]],
                    child,
                )
            )
    elif isinstance(workflowobj, MutableSequence):
        for child in workflowobj:
            _add_blank_ids(
                cast(
                    Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]],
                    child,
                )
            )
| 264 | |
| 265 | |
def _read_cwl_version(
    loadingContext: LoadingContext,
    workflowobj: CommentedMap,
    uri: str,
    fileuri: str,
) -> str:
    """Return the document's cwlVersion, stripped of any ``cwl:`` prefix.

    Looks in ``loadingContext.metadata`` first, then in ``workflowobj``, then
    (when ``uri`` names a fragment of ``fileuri``) in the root of the
    containing file.

    :raises ValidationException: if no version is declared, it is not a
        string, or it is not a key of ``ALLUPDATES``.
    """
    cwlVersion = loadingContext.metadata.get("cwlVersion")
    if not cwlVersion:
        cwlVersion = workflowobj.get("cwlVersion")
    if not cwlVersion and fileuri != uri:
        # The tool we're loading is a fragment of a bigger file. Get
        # the document root element and look for cwlVersion there.
        root = cast(CWLObjectType, fetch_document(fileuri, loadingContext)[1])
        cwlVersion = cast(str, root.get("cwlVersion"))
    if not cwlVersion:
        raise ValidationException(
            "No cwlVersion found. "
            "Use the following syntax in your CWL document to declare "
            "the version: cwlVersion: <version>.\n"
            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
            "will need to be upgraded first."
        )

    if not isinstance(cwlVersion, str):
        with SourceLine(workflowobj, "cwlVersion", ValidationException):
            raise ValidationException(
                "'cwlVersion' must be a string, got {}".format(type(cwlVersion))
            )
    # strip out version
    cwlVersion = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion)
    if cwlVersion not in ALLUPDATES:
        # List every supported version, flagging the dev-only ones.
        versions = sorted(
            version + " (with --enable-dev flag only)" if "dev" in version else version
            for version in ALLUPDATES
        )
        raise ValidationException(
            "The CWL reference runner no longer supports pre CWL v1.0 "
            "documents. Supported versions are: "
            "\n{}".format("\n".join(versions))
        )
    return cwlVersion


def _apply_job_overrides(
    loadingContext: LoadingContext,
    jobobj: Optional[ResolveType],
    cwlVersion: str,
    uri: str,
) -> None:
    """Move ``cwltool:overrides`` / ``cwl:requirements`` from a job object into overrides.

    Both keys are removed from ``jobobj``, and their content is added to
    ``loadingContext.overrides_list``, targeting ``uri``. Does nothing unless
    ``jobobj`` is a CommentedMap.

    :raises ValidationException: if ``cwl:requirements`` is used with a
        CWL v1.0 document.
    """
    if not isinstance(jobobj, CommentedMap):
        return

    if "http://commonwl.org/cwltool#overrides" in jobobj:
        loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
        del jobobj["http://commonwl.org/cwltool#overrides"]

    if "https://w3id.org/cwl/cwl#requirements" in jobobj:
        # The error below promises support for "v1.1 or greater", so reject
        # only v1.0 rather than allow-listing v1.1 (an allow-list of v1.1
        # alone wrongly rejected v1.2 and its dev releases).
        if cwlVersion.startswith("v1.0"):
            raise ValidationException(
                "`cwl:requirements` in the input object is not part of CWL "
                "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                "can set the cwlVersion to v1.1 or greater."
            )
        loadingContext.overrides_list.append(
            {
                "overrideTarget": uri,
                "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"],
            }
        )
        del jobobj["https://w3id.org/cwl/cwl#requirements"]


def resolve_and_validate_document(
    loadingContext: LoadingContext,
    workflowobj: Union[CommentedMap, CommentedSeq],
    uri: str,
    preprocess_only: bool = False,
    skip_schemas: Optional[bool] = None,
) -> Tuple[LoadingContext, str]:
    """Validate a CWL document.

    Determines the document's cwlVersion and resolves the document against
    that version's schema. Stdout/stderr/stdin shortcuts are expanded. Unless
    ``preprocess_only`` is set, the document is optionally validated
    (``loadingContext.do_validate``) and updated
    (``loadingContext.do_update``, True or None meaning "update").

    If ``workflowobj`` is a job object containing ``cwl:tool``, the referenced
    tool is loaded instead. The job object's overrides/requirements are added
    to the overrides list, and its remaining content becomes
    ``jobdefaults``.

    :param loadingContext: must carry a ``loader``; it is copied, and the
        copy is returned.
    :param workflowobj: the loaded document (must be a mapping).
    :param uri: the document's URI.
    :param preprocess_only: stop after resolution, skipping validation and
        update.
    :param skip_schemas: passed to the schema-aware ``Loader``.
    :returns: ``(loading_context, uri)``, where the context holds the new
        document loader, schema names and metadata. ``uri`` is the
        resolved process id when the process is a single mapping.
    :raises ValueError: if the context has no loader or ``workflowobj`` is
        not a mapping.
    :raises ValidationException: for missing/unsupported cwlVersion or
        invalid documents.
    """
    if not loadingContext.loader:
        raise ValueError("loadingContext must have a loader.")
    loader = loadingContext.loader
    loadingContext = loadingContext.copy()

    if not isinstance(workflowobj, MutableMapping):
        raise ValueError(
            "workflowobj must be a dict, got '{}': {}".format(
                type(workflowobj), workflowobj
            )
        )

    jobobj = None
    if "cwl:tool" in workflowobj:
        # A job/input object naming the tool to run. The lookup of the
        # expanded cwl#tool key below relies on resolve_all having expanded
        # the compact "cwl:tool" key in workflowobj.
        jobobj, _ = loader.resolve_all(workflowobj, uri)
        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
        del cast(Dict[str, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]

        workflowobj = fetch_document(uri, loadingContext)[1]

    fileuri = urllib.parse.urldefrag(uri)[0]

    cwlVersion = _read_cwl_version(loadingContext, workflowobj, uri, fileuri)
    _apply_job_overrides(loadingContext, jobobj, cwlVersion, uri)

    (sch_document_loader, avsc_names) = process.get_schema(cwlVersion)[:2]

    if isinstance(avsc_names, Exception):
        raise avsc_names

    # Schema-aware loader that shares the caller loader's index.
    document_loader = Loader(
        sch_document_loader.ctx,
        schemagraph=sch_document_loader.graph,
        idx=loader.idx,
        cache=sch_document_loader.cache,
        fetcher_constructor=loadingContext.fetcher_constructor,
        skip_schemas=skip_schemas,
        doc_cache=loadingContext.doc_cache,
    )

    if cwlVersion == "v1.0":
        _add_blank_ids(workflowobj)

    document_loader.resolve_all(workflowobj, fileuri)
    processobj, metadata = document_loader.resolve_ref(uri)
    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")

    if not hasattr(processobj.lc, "filename"):
        processobj.lc.filename = fileuri

    if loadingContext.metadata:
        metadata = loadingContext.metadata

    if not isinstance(metadata, CommentedMap):
        raise ValidationException(
            "metadata must be a CommentedMap, was %s" % type(metadata)
        )

    if isinstance(processobj, CommentedMap):
        uri = processobj["id"]

    _convert_stdstreams_to_files(workflowobj)

    if isinstance(jobobj, CommentedMap):
        loadingContext.jobdefaults = jobobj

    loadingContext.loader = document_loader
    loadingContext.avsc_names = avsc_names
    loadingContext.metadata = metadata

    if preprocess_only:
        return loadingContext, uri

    if loadingContext.do_validate:
        validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)

    # None means default behavior (do update)
    if loadingContext.do_update in (True, None):
        if "cwlVersion" not in metadata:
            metadata["cwlVersion"] = cwlVersion
        processobj = update.update(
            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata
        )
        document_loader.idx[processobj["id"]] = processobj

        def update_index(pr: CommentedMap) -> None:
            # Re-register every updated process object under its id.
            if "id" in pr:
                document_loader.idx[pr["id"]] = pr

        visit_class(
            processobj, ("CommandLineTool", "Workflow", "ExpressionTool"), update_index
        )

    return loadingContext, uri
| 431 | |
| 432 | |
def make_tool(
    uri: Union[str, CommentedMap, CommentedSeq], loadingContext: LoadingContext
) -> Process:
    """Make a Python CWL object.

    Resolves ``uri`` with ``loadingContext.loader``. A list result (a graph
    of processes) must contain an entry whose ``id`` ends in ``#main``, and
    that entry is used. A mapping result is used directly. The process object
    is built with ``loadingContext.construct_tool_object``. If
    ``loadingContext.jobdefaults`` is set, any input whose short name appears
    there gets that value as its ``default``.

    :raises ValueError: if ``loadingContext`` has no loader.
    :raises WorkflowException: if a graph has no ``#main`` entry, or the
        reference resolves to neither a list nor a mapping.
    """
    if loadingContext.loader is None:
        raise ValueError("loadingContext must have a loader")
    resolveduri, _metadata = loadingContext.loader.resolve_ref(uri)

    processobj = None
    if isinstance(resolveduri, MutableSequence):
        for obj in resolveduri:
            # Entries without an "id" cannot be "#main"; skip them rather than
            # failing with a KeyError (the error message below skips them too).
            if "id" in obj and obj["id"].endswith("#main"):
                processobj = obj
                break
        if processobj is None:
            raise WorkflowException(
                "Tool file contains graph of multiple objects, must specify "
                "one of #%s"
                % ", #".join(
                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri if "id" in i
                )
            )
    elif isinstance(resolveduri, MutableMapping):
        processobj = resolveduri
    else:
        # WorkflowException subclasses Exception, so existing handlers still match.
        raise WorkflowException("Must resolve to list or dict")

    tool = loadingContext.construct_tool_object(processobj, loadingContext)

    jobobj = loadingContext.jobdefaults
    if jobobj:
        for inp in tool.tool["inputs"]:
            input_name = shortname(inp["id"])
            if input_name in jobobj:
                inp["default"] = jobobj[input_name]

    return tool
| 469 | |
| 470 | |
def load_tool(
    argsworkflow: Union[str, CWLObjectType],
    loadingContext: Optional[LoadingContext] = None,
) -> Process:
    """Fetch, resolve/validate and construct a CWL process in one call.

    :param argsworkflow: a reference string or an already-loaded mapping,
        as accepted by :func:`fetch_document`.
    :param loadingContext: optional context; it is copied, not modified.
    :returns: the constructed ``Process``.
    """
    fetched_context, document, document_uri = fetch_document(
        argsworkflow, loadingContext
    )
    validated_context, process_uri = resolve_and_validate_document(
        fetched_context, document, document_uri
    )
    return make_tool(process_uri, validated_context)
| 483 | |
| 484 | |
def resolve_overrides(
    ov: IdxResultType,
    ov_uri: str,
    baseurl: str,
) -> List[CWLObjectType]:
    """Return the ``cwltool:overrides`` entries of an already-loaded document.

    ``ov`` is resolved against ``baseurl`` using ``overrides_ctx``. The result
    is then resolved in place against ``ov_uri`` with the CWL v1.0 schema
    loader.

    :raises Exception: if the first resolution does not yield a CommentedMap.
    """
    overrides_loader = Loader(overrides_ctx)
    resolved, _resolved_meta = overrides_loader.resolve_all(ov, baseurl)
    if not isinstance(resolved, CommentedMap):
        raise Exception("Expected CommentedMap, got %s" % type(resolved))
    schema_loader = get_schema("v1.0")[0]
    schema_loader.resolve_all(resolved, ov_uri)
    overrides = resolved["http://commonwl.org/cwltool#overrides"]
    return cast(List[CWLObjectType], overrides)
| 497 | |
| 498 | |
def load_overrides(ov: str, base_url: str) -> List[CWLObjectType]:
    """Fetch the overrides document at ``ov`` and return its override entries.

    The document is fetched with a ``Loader`` built on ``overrides_ctx``. It is
    then handed to :func:`resolve_overrides` with ``ov`` as its URI and
    ``base_url`` as the resolution base.
    """
    fetched = Loader(overrides_ctx).fetch(ov)
    return resolve_overrides(fetched, ov, base_url)
| 502 | |
| 503 | |
def recursive_resolve_and_validate_document(
    loadingContext: LoadingContext,
    workflowobj: Union[CommentedMap, CommentedSeq],
    uri: str,
    preprocess_only: bool = False,
    skip_schemas: Optional[bool] = None,
) -> Tuple[LoadingContext, str, Process]:
    """Validate a CWL document, checking that a tool object can be built.

    Runs :func:`resolve_and_validate_document` with the given options, then
    builds the process with :func:`make_tool`.

    :returns: ``(loading_context, uri, tool)`` as produced by those two steps.
    """
    validated_context, resolved_uri = resolve_and_validate_document(
        loadingContext,
        workflowobj,
        uri,
        preprocess_only=preprocess_only,
        skip_schemas=skip_schemas,
    )
    built_tool = make_tool(resolved_uri, validated_context)
    return validated_context, resolved_uri, built_tool
