comparison env/lib/python3.9/site-packages/cwltool/load_tool.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Loads a CWL document."""
2
3 import hashlib
4 import logging
5 import os
6 import re
7 import urllib
8 import uuid
9 from typing import (
10 Any,
11 Dict,
12 List,
13 MutableMapping,
14 MutableSequence,
15 Optional,
16 Tuple,
17 Union,
18 cast,
19 )
20
21 from ruamel.yaml.comments import CommentedMap, CommentedSeq
22 from schema_salad.exceptions import ValidationException
23 from schema_salad.ref_resolver import Loader, file_uri
24 from schema_salad.schema import validate_doc
25 from schema_salad.sourceline import SourceLine, cmap
26 from schema_salad.utils import (
27 ContextType,
28 FetcherCallableType,
29 IdxResultType,
30 ResolveType,
31 json_dumps,
32 )
33
34 from . import CWL_CONTENT_TYPES, process, update
35 from .context import LoadingContext
36 from .errors import WorkflowException
37 from .loghandler import _logger
38 from .process import Process, get_schema, shortname
39 from .update import ALLUPDATES
40 from .utils import CWLObjectType, ResolverType, visit_class
41
# Context mapping passed to the ``Loader`` that ``default_loader`` builds.
# It declares the ``cwl`` and ``cwltool`` namespace prefixes, marks ``path``
# and ``location`` as "@id"-typed fields and maps ``id`` onto "@id".
jobloaderctx = {
    "cwl": "https://w3id.org/cwl/cwl#",
    "cwltool": "http://commonwl.org/cwltool#",
    "path": {"@type": "@id"},
    "location": {"@type": "@id"},
    "id": "@id",
}  # type: ContextType
49
50
# Context mapping used to build the ``Loader`` instances that
# ``resolve_overrides`` and ``load_overrides`` create for override documents.
# ``overrideTarget`` is an "@id"-typed field; the ``cwltool:overrides`` entries
# are keyed by ``overrideTarget`` and ``requirements`` entries by ``class``
# (both via ``mapSubject``).
overrides_ctx = {
    "overrideTarget": {"@type": "@id"},
    "cwltool": "http://commonwl.org/cwltool#",
    "http://commonwl.org/cwltool#overrides": {
        "@id": "cwltool:overrides",
        "mapSubject": "overrideTarget",
    },
    "requirements": {
        "@id": "https://w3id.org/cwl/cwl#requirements",
        "mapSubject": "class",
    },
}  # type: ContextType
63
64
def default_loader(
    fetcher_constructor: Optional[FetcherCallableType] = None,
    enable_dev: bool = False,
    doc_cache: bool = True,
) -> Loader:
    """Build a ``Loader`` configured with the job-loading context.

    :param fetcher_constructor: forwarded unchanged to the ``Loader``.
    :param enable_dev: the answer given by the ``allow_attachments``
        callback, whatever argument it is called with.
    :param doc_cache: forwarded unchanged to the ``Loader``.
    :returns: a new ``Loader`` created from ``jobloaderctx``.
    """

    def attachments_allowed(r: Any) -> bool:
        # The callback argument is deliberately ignored; only the flag counts.
        return enable_dev

    return Loader(
        jobloaderctx,
        fetcher_constructor=fetcher_constructor,
        allow_attachments=attachments_allowed,
        doc_cache=doc_cache,
    )
76
77
def resolve_tool_uri(
    argsworkflow: str,
    resolver: Optional[ResolverType] = None,
    fetcher_constructor: Optional[FetcherCallableType] = None,
    document_loader: Optional[Loader] = None,
) -> Tuple[str, str]:
    """Turn a user-supplied tool reference into a URI.

    The reference is tried, in order, as an http/https/file URL, as a path
    that exists on the local filesystem, and finally through ``resolver``
    (when one is given).

    :param argsworkflow: URL, local path or resolver-specific reference.
    :param resolver: optional callable invoked as ``resolver(loader, ref)``.
    :param fetcher_constructor: used to build a default loader for the
        resolver when ``document_loader`` is not supplied.
    :param document_loader: loader handed to the resolver, if provided.
    :returns: ``(uri, fileuri)`` where ``fileuri`` is ``uri`` without its
        fragment.
    :raises ValidationException: if none of the strategies yields a URI.
    """
    uri = None  # type: Optional[str]

    # Only these schemes are trusted: on Windows, urlsplit misreads a drive
    # letter as a scheme, so any other "scheme" is treated as a path.
    scheme = urllib.parse.urlsplit(argsworkflow).scheme
    if scheme in ("http", "https", "file"):
        uri = argsworkflow
    else:
        local_path = os.path.abspath(argsworkflow)
        if os.path.exists(local_path):
            uri = file_uri(str(local_path))
        elif resolver is not None:
            loader = document_loader or default_loader(fetcher_constructor)
            uri = resolver(loader, argsworkflow)

    if uri is None:
        raise ValidationException("Not found: '%s'" % argsworkflow)

    if uri != argsworkflow:
        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)

    fileuri, _fragment = urllib.parse.urldefrag(uri)
    return uri, fileuri
105
106
def fetch_document(
    argsworkflow: Union[str, CWLObjectType],
    loadingContext: Optional[LoadingContext] = None,
) -> Tuple[LoadingContext, CommentedMap, str]:
    """Retrieve a CWL document.

    :param argsworkflow: either a reference resolvable by
        ``resolve_tool_uri`` or an already-loaded document mapping.
    :param loadingContext: optional context; it is copied, never modified.
        A loader is created on the copy when it has none.
    :returns: ``(context, document, uri)``. For a mapping input the uri is
        its ``id`` or, failing that, a freshly generated ``_:<uuid>``.
    :raises ValidationException: if ``argsworkflow`` is neither a string
        nor a mutable mapping.
    """
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
        if loadingContext.loader is None:
            loadingContext.loader = default_loader(
                loadingContext.fetcher_constructor,
                enable_dev=loadingContext.enable_dev,
                doc_cache=loadingContext.doc_cache,
            )
    else:
        loadingContext = LoadingContext()
        loadingContext.loader = default_loader()

    if isinstance(argsworkflow, str):
        uri, fileuri = resolve_tool_uri(
            argsworkflow,
            resolver=loadingContext.resolver,
            document_loader=loadingContext.loader,
        )
        fetched = loadingContext.loader.fetch(fileuri, content_types=CWL_CONTENT_TYPES)
        return loadingContext, cast(CommentedMap, fetched), uri

    if not isinstance(argsworkflow, MutableMapping):
        raise ValidationException("Must be URI or object: '%s'" % argsworkflow)

    declared_id = argsworkflow.get("id")
    if declared_id:
        uri = cast(str, declared_id)
    else:
        # Anonymous in-memory document: give it a blank-node style identifier.
        uri = "_:" + str(uuid.uuid4())
    workflowobj = cast(CommentedMap, cmap(cast(Dict[str, Any], argsworkflow), fn=uri))
    # Register the object so later lookups of ``uri`` find it in the index.
    loadingContext.loader.idx[uri] = workflowobj
    return loadingContext, workflowobj, uri
147
148
def _convert_stdstreams_to_files(
    workflowobj: Union[
        CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str
    ]
) -> None:
    """Expand the stdin/stdout/stderr type shortcuts of CommandLineTools in place.

    Walks ``workflowobj`` recursively. For every mapping whose ``class`` is
    "CommandLineTool":

    * an output of type ``stdout``/``stderr`` becomes a ``File`` output with a
      ``glob`` binding on the tool's stream file name; when the tool declares
      no such name one is derived from a SHA-1 of the tool's JSON dump and
      stored on the tool;
    * an input of type ``stdin`` becomes a ``File`` input and the tool's
      ``stdin`` is set to an expression referencing that input's path.

    :raises ValidationException: for a malformed ``outputs`` section, or when
        a shortcut is combined with an explicit binding / ``stdin`` entry.
    """
    if isinstance(workflowobj, MutableMapping):
        if workflowobj.get("class") != "CommandLineTool":
            # Not a tool itself: keep looking inside its values.
            for child in workflowobj.values():
                _convert_stdstreams_to_files(
                    cast(
                        Union[
                            CWLObjectType,
                            MutableSequence[Union[CWLObjectType, str, int]],
                            str,
                        ],
                        child,
                    )
                )
        else:
            with SourceLine(
                workflowobj,
                "outputs",
                ValidationException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                declared_outputs = workflowobj.get("outputs", [])
                if not isinstance(declared_outputs, CommentedSeq):
                    raise ValidationException('"outputs" section is not valid.')
                for out in cast(MutableSequence[CWLObjectType], declared_outputs):
                    if not isinstance(out, CommentedMap):
                        raise ValidationException(
                            f"Output '{out}' is not a valid OutputParameter."
                        )
                    for streamtype in ("stdout", "stderr"):
                        if out.get("type") != streamtype:
                            continue
                        if "outputBinding" in out:
                            raise ValidationException(
                                "Not allowed to specify outputBinding when"
                                " using %s shortcut." % streamtype
                            )
                        if streamtype not in workflowobj:
                            # No file name given: derive one from the tool's
                            # content and record it on the tool.
                            serialized = json_dumps(workflowobj, sort_keys=True)
                            filename = str(
                                hashlib.sha1(  # nosec
                                    serialized.encode("utf-8")
                                ).hexdigest()
                            )
                            workflowobj[streamtype] = filename
                        else:
                            filename = workflowobj[streamtype]
                        out["type"] = "File"
                        out["outputBinding"] = cmap({"glob": filename})

            for inp in cast(
                MutableSequence[CWLObjectType], workflowobj.get("inputs", [])
            ):
                if inp.get("type") != "stdin":
                    continue
                if "inputBinding" in inp:
                    raise ValidationException(
                        "Not allowed to specify inputBinding when"
                        " using stdin shortcut."
                    )
                if "stdin" in workflowobj:
                    raise ValidationException(
                        "Not allowed to specify stdin path when"
                        " using stdin type shortcut."
                    )
                # Only the part of the id after the last "#" names the input.
                input_name = cast(str, inp["id"]).rpartition("#")[2]
                workflowobj["stdin"] = "$(inputs.%s.path)" % input_name
                inp["type"] = "File"

    if isinstance(workflowobj, MutableSequence):
        for child in workflowobj:
            _convert_stdstreams_to_files(
                cast(
                    Union[
                        CWLObjectType,
                        MutableSequence[Union[CWLObjectType, str, int]],
                        str,
                    ],
                    child,
                )
            )
236
237
def _add_blank_ids(
    workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]]
) -> None:
    """Assign a random UUID ``id`` to every inline ``run`` mapping lacking one.

    The structure is walked recursively and modified in place. A ``run``
    mapping is left alone when it already has an ``id`` or when it is a
    ``$import`` directive.
    """
    if isinstance(workflowobj, MutableMapping):
        embedded = workflowobj["run"] if "run" in workflowobj else None
        needs_id = (
            isinstance(embedded, MutableMapping)
            and "id" not in embedded
            and "$import" not in embedded
        )
        if needs_id:
            embedded["id"] = str(uuid.uuid4())
        for child in workflowobj.values():
            _add_blank_ids(
                cast(
                    Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]],
                    child,
                )
            )

    if isinstance(workflowobj, MutableSequence):
        for child in workflowobj:
            _add_blank_ids(
                cast(
                    Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]],
                    child,
                )
            )
264
265
def resolve_and_validate_document(
    loadingContext: LoadingContext,
    workflowobj: Union[CommentedMap, CommentedSeq],
    uri: str,
    preprocess_only: bool = False,
    skip_schemas: Optional[bool] = None,
) -> Tuple[LoadingContext, str]:
    """Validate a CWL document.

    Determines the document's ``cwlVersion``, resolves it with a loader built
    from that version's schema and, unless ``preprocess_only`` is set,
    validates and updates the resulting process object.

    :param loadingContext: context supplying the loader. It is copied; the
        loader, schema names, metadata and job defaults are set on the copy.
    :param workflowobj: the loaded document; must be a mapping. When it
        contains a ``cwl:tool`` key it is treated as a job object: the tool it
        points at is fetched and used as the document, and the remaining
        entries become the job defaults.
    :param uri: URI of ``workflowobj`` (possibly with a fragment).
    :param preprocess_only: return right after resolution, skipping
        validation and update.
    :param skip_schemas: forwarded to the document ``Loader``.
    :returns: ``(context_copy, uri)`` where ``uri`` is the ``id`` of the
        resolved process object when that object is a mapping.
    :raises ValueError: if the context has no loader or ``workflowobj`` is
        not a mapping.
    :raises ValidationException: if ``cwlVersion`` is missing, not a string
        or unsupported, if ``cwl:requirements`` is used with a version that
        does not allow it, or if resolution yields unexpected types.
    """
    if not loadingContext.loader:
        raise ValueError("loadingContext must have a loader.")
    loader = loadingContext.loader
    loadingContext = loadingContext.copy()

    if not isinstance(workflowobj, MutableMapping):
        raise ValueError(
            "workflowobj must be a dict, got '{}': {}".format(
                type(workflowobj), workflowobj
            )
        )

    jobobj = None
    if "cwl:tool" in workflowobj:
        # The document is a job object pointing at its tool: resolve it,
        # remember it as the job, and continue with the referenced tool.
        jobobj, _ = loader.resolve_all(workflowobj, uri)
        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
        del cast(Dict[str, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]

        workflowobj = fetch_document(uri, loadingContext)[1]

    fileuri = urllib.parse.urldefrag(uri)[0]

    cwlVersion = loadingContext.metadata.get("cwlVersion")
    if not cwlVersion:
        cwlVersion = workflowobj.get("cwlVersion")
    if not cwlVersion and fileuri != uri:
        # The tool we're loading is a fragment of a bigger file.  Get
        # the document root element and look for cwlVersion there.
        metadata = cast(CWLObjectType, fetch_document(fileuri, loadingContext)[1])
        cwlVersion = cast(str, metadata.get("cwlVersion"))
    if not cwlVersion:
        raise ValidationException(
            "No cwlVersion found. "
            "Use the following syntax in your CWL document to declare "
            "the version: cwlVersion: <version>.\n"
            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
            "will need to be upgraded first."
        )

    if not isinstance(cwlVersion, str):
        with SourceLine(workflowobj, "cwlVersion", ValidationException):
            raise ValidationException(
                "'cwlVersion' must be a string, got {}".format(type(cwlVersion))
            )
    # strip out version
    cwlVersion = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion)
    if cwlVersion not in ALLUPDATES:
        # print out all the Supported Versions of cwlVersion
        versions = sorted(
            version + " (with --enable-dev flag only)" if "dev" in version else version
            for version in ALLUPDATES
        )
        raise ValidationException(
            "The CWL reference runner no longer supports pre CWL v1.0 "
            "documents. Supported versions are: "
            "\n{}".format("\n".join(versions))
        )

    if (
        isinstance(jobobj, CommentedMap)
        and "http://commonwl.org/cwltool#overrides" in jobobj
    ):
        loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
        del jobobj["http://commonwl.org/cwltool#overrides"]

    if (
        isinstance(jobobj, CommentedMap)
        and "https://w3id.org/cwl/cwl#requirements" in jobobj
    ):
        # NOTE(review): the message promises "v1.1 or greater" but this check
        # rejects every version other than the two listed, including newer
        # ones. Left unchanged here; confirm the intended set of versions.
        if cwlVersion not in ("v1.1.0-dev1", "v1.1"):
            raise ValidationException(
                "`cwl:requirements` in the input object is not part of CWL "
                "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                "can set the cwlVersion to v1.1 or greater."
            )
        loadingContext.overrides_list.append(
            {
                "overrideTarget": uri,
                "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"],
            }
        )
        del jobobj["https://w3id.org/cwl/cwl#requirements"]

    (sch_document_loader, avsc_names) = process.get_schema(cwlVersion)[:2]

    if isinstance(avsc_names, Exception):
        raise avsc_names

    processobj = None  # type: Optional[ResolveType]
    # Schema-aware loader that shares the index of the incoming loader, so
    # documents that were already fetched stay reachable.
    document_loader = Loader(
        sch_document_loader.ctx,
        schemagraph=sch_document_loader.graph,
        idx=loader.idx,
        cache=sch_document_loader.cache,
        fetcher_constructor=loadingContext.fetcher_constructor,
        skip_schemas=skip_schemas,
        doc_cache=loadingContext.doc_cache,
    )

    if cwlVersion == "v1.0":
        _add_blank_ids(workflowobj)

    document_loader.resolve_all(workflowobj, fileuri)
    processobj, metadata = document_loader.resolve_ref(uri)
    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")

    if not hasattr(processobj.lc, "filename"):
        processobj.lc.filename = fileuri

    # Metadata already present on the context wins over the resolved one.
    if loadingContext.metadata:
        metadata = loadingContext.metadata

    if not isinstance(metadata, CommentedMap):
        raise ValidationException(
            "metadata must be a CommentedMap, was %s" % type(metadata)
        )

    if isinstance(processobj, CommentedMap):
        uri = processobj["id"]

    _convert_stdstreams_to_files(workflowobj)

    if isinstance(jobobj, CommentedMap):
        loadingContext.jobdefaults = jobobj

    loadingContext.loader = document_loader
    loadingContext.avsc_names = avsc_names
    loadingContext.metadata = metadata

    if preprocess_only:
        return loadingContext, uri

    if loadingContext.do_validate:
        validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)

    # None means default behavior (do update)
    if loadingContext.do_update in (True, None):
        if "cwlVersion" not in metadata:
            metadata["cwlVersion"] = cwlVersion
        processobj = update.update(
            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata
        )
        document_loader.idx[processobj["id"]] = processobj

        def update_index(pr: CommentedMap) -> None:
            # Re-register each process so the index points at updated objects.
            if "id" in pr:
                document_loader.idx[pr["id"]] = pr

        visit_class(
            processobj, ("CommandLineTool", "Workflow", "ExpressionTool"), update_index
        )

    return loadingContext, uri
431
432
def make_tool(
    uri: Union[str, CommentedMap, CommentedSeq], loadingContext: LoadingContext
) -> Process:
    """Make a Python CWL object.

    :param uri: reference resolved through ``loadingContext.loader``.
    :param loadingContext: context providing the loader, the tool
        constructor and, optionally, job defaults.
    :returns: the object built by ``loadingContext.construct_tool_object``.
        When ``loadingContext.jobdefaults`` is set, matching values are
        stored as the ``default`` of the tool's inputs.
    :raises ValueError: if the context has no loader.
    :raises WorkflowException: if ``uri`` resolves to a list that has no
        entry whose id ends in ``#main``.
    :raises Exception: if ``uri`` resolves to neither a list nor a mapping.
    """
    if loadingContext.loader is None:
        raise ValueError("loadingContext must have a loader")
    resolveduri, _ = loadingContext.loader.resolve_ref(uri)

    processobj = None
    if isinstance(resolveduri, MutableSequence):
        # Entries without an "id" can neither be "#main" nor be offered as an
        # alternative, so they are skipped instead of raising KeyError.
        identified = [
            obj
            for obj in resolveduri
            if isinstance(obj, MutableMapping) and "id" in obj
        ]
        processobj = next(
            (obj for obj in identified if obj["id"].endswith("#main")), None
        )
        if not processobj:
            raise WorkflowException(
                "Tool file contains graph of multiple objects, must specify "
                "one of #%s"
                % ", #".join(urllib.parse.urldefrag(obj["id"])[1] for obj in identified)
            )
    elif isinstance(resolveduri, MutableMapping):
        processobj = resolveduri
    else:
        raise Exception("Must resolve to list or dict")

    tool = loadingContext.construct_tool_object(processobj, loadingContext)

    if loadingContext.jobdefaults:
        jobobj = loadingContext.jobdefaults
        for inp in tool.tool["inputs"]:
            name = shortname(inp["id"])
            if name in jobobj:
                inp["default"] = jobobj[name]

    return tool
469
470
def load_tool(
    argsworkflow: Union[str, CWLObjectType],
    loadingContext: Optional[LoadingContext] = None,
) -> Process:
    """Fetch, validate and instantiate the process named by ``argsworkflow``.

    Chains ``fetch_document``, ``resolve_and_validate_document`` and
    ``make_tool``; any exception they raise propagates unchanged.
    """
    fetched_context, document, document_uri = fetch_document(
        argsworkflow, loadingContext
    )
    validated_context, process_uri = resolve_and_validate_document(
        fetched_context, document, document_uri
    )
    return make_tool(process_uri, validated_context)
483
484
def resolve_overrides(
    ov: IdxResultType,
    ov_uri: str,
    baseurl: str,
) -> List[CWLObjectType]:
    """Resolve an overrides document and return its list of overrides.

    :param ov: the overrides document.
    :param ov_uri: base URI for the second resolution pass, which uses the
        loader returned by ``get_schema("v1.0")``.
    :param baseurl: base URI for the first pass, which uses a loader built
        from ``overrides_ctx``.
    :returns: the value stored under the ``cwltool#overrides`` key.
    :raises Exception: if the first pass does not yield a ``CommentedMap``.
    """
    resolved, _metadata = Loader(overrides_ctx).resolve_all(ov, baseurl)
    if not isinstance(resolved, CommentedMap):
        raise Exception("Expected CommentedMap, got %s" % type(resolved))

    schema_loader = get_schema("v1.0")[0]
    schema_loader.resolve_all(resolved, ov_uri)

    overrides = resolved["http://commonwl.org/cwltool#overrides"]
    return cast(List[CWLObjectType], overrides)
497
498
def load_overrides(ov: str, base_url: str) -> List[CWLObjectType]:
    """Fetch the overrides document at ``ov`` and resolve it.

    The document is fetched with a loader built from ``overrides_ctx`` and
    then handed to ``resolve_overrides`` with ``ov`` as its own URI and
    ``base_url`` as the base.
    """
    fetched = Loader(overrides_ctx).fetch(ov)
    return resolve_overrides(fetched, ov, base_url)
502
503
def recursive_resolve_and_validate_document(
    loadingContext: LoadingContext,
    workflowobj: Union[CommentedMap, CommentedSeq],
    uri: str,
    preprocess_only: bool = False,
    skip_schemas: Optional[bool] = None,
) -> Tuple[LoadingContext, str, Process]:
    """Validate a CWL document, checking that a tool object can be built.

    Runs ``resolve_and_validate_document`` with the given arguments, then
    ``make_tool`` on its result.

    :returns: ``(context, uri, tool)`` — the context and uri returned by the
        validation step plus the constructed tool.
    """
    validated = resolve_and_validate_document(
        loadingContext,
        workflowobj,
        uri,
        preprocess_only=preprocess_only,
        skip_schemas=skip_schemas,
    )
    validated_context, process_uri = validated
    return validated_context, process_uri, make_tool(process_uri, validated_context)