Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/load_tool.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Loads a CWL document.""" | |
2 | |
3 import hashlib | |
4 import logging | |
5 import os | |
6 import re | |
7 import urllib | |
8 import uuid | |
9 from typing import ( | |
10 Any, | |
11 Dict, | |
12 List, | |
13 MutableMapping, | |
14 MutableSequence, | |
15 Optional, | |
16 Tuple, | |
17 Union, | |
18 cast, | |
19 ) | |
20 | |
21 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
22 from schema_salad.exceptions import ValidationException | |
23 from schema_salad.ref_resolver import Loader, file_uri | |
24 from schema_salad.schema import validate_doc | |
25 from schema_salad.sourceline import SourceLine, cmap | |
26 from schema_salad.utils import ( | |
27 ContextType, | |
28 FetcherCallableType, | |
29 IdxResultType, | |
30 ResolveType, | |
31 json_dumps, | |
32 ) | |
33 | |
34 from . import CWL_CONTENT_TYPES, process, update | |
35 from .context import LoadingContext | |
36 from .errors import WorkflowException | |
37 from .loghandler import _logger | |
38 from .process import Process, get_schema, shortname | |
39 from .update import ALLUPDATES | |
40 from .utils import CWLObjectType, ResolverType, visit_class | |
41 | |
# JSON-LD context for job (input object) documents: binds the ``cwl`` and
# ``cwltool`` prefixes and marks ``path``, ``location`` and ``id`` as
# identifier-valued fields.
jobloaderctx = {
    "cwl": "https://w3id.org/cwl/cwl#",
    "cwltool": "http://commonwl.org/cwltool#",
    # Each field gets its own dict so that no two entries share a mutable value.
    **{field: {"@type": "@id"} for field in ("path", "location")},
    "id": "@id",
}  # type: ContextType


# JSON-LD context for override documents (see ``resolve_overrides``). The
# ``mapSubject`` entries let ``cwltool:overrides`` be keyed by
# ``overrideTarget`` and ``requirements`` be keyed by ``class``
# (schema-salad identifier maps).
overrides_ctx = {
    "overrideTarget": {"@type": "@id"},
    "cwltool": "http://commonwl.org/cwltool#",
    "http://commonwl.org/cwltool#overrides": dict(
        {"@id": "cwltool:overrides"}, mapSubject="overrideTarget"
    ),
    "requirements": dict(
        {"@id": "https://w3id.org/cwl/cwl#requirements"}, mapSubject="class"
    ),
}  # type: ContextType
63 | |
64 | |
def default_loader(
    fetcher_constructor: Optional[FetcherCallableType] = None,
    enable_dev: bool = False,
    doc_cache: bool = True,
) -> Loader:
    """Create a schema-salad ``Loader`` using the job-document context.

    :param fetcher_constructor: passed through unchanged to ``Loader``.
    :param enable_dev: when true, the loader is told that attachments are
        allowed for every document; otherwise for none.
    :param doc_cache: passed through unchanged to ``Loader``.
    :returns: a new ``Loader`` built on ``jobloaderctx``.
    """

    def _allow_attachments(_document: Any) -> bool:
        # The decision ignores the document and depends only on dev mode.
        return enable_dev

    return Loader(
        jobloaderctx,
        doc_cache=doc_cache,
        allow_attachments=_allow_attachments,
        fetcher_constructor=fetcher_constructor,
    )
76 | |
77 | |
def resolve_tool_uri(
    argsworkflow: str,
    resolver: Optional[ResolverType] = None,
    fetcher_constructor: Optional[FetcherCallableType] = None,
    document_loader: Optional[Loader] = None,
) -> Tuple[str, str]:
    """Turn a user-supplied tool reference into an absolute URI.

    Strategies are tried in order:

    1. an ``http``, ``https`` or ``file`` URL is used verbatim;
    2. an existing local path is converted to a ``file://`` URI;
    3. otherwise ``resolver`` (if given) is asked, with ``document_loader``
       or a fresh ``default_loader(fetcher_constructor)``.

    :returns: ``(uri, fileuri)``, where ``fileuri`` is ``uri`` with any
        fragment removed.
    :raises ValidationException: if no strategy yields a URI.
    """
    scheme = urllib.parse.urlsplit(argsworkflow).scheme
    uri = None  # type: Optional[str]

    # Only well-known schemes count. On Windows, urlsplit misreads a drive
    # letter ("C:") as a scheme, so such paths must reach the filesystem check.
    if scheme in ("http", "https", "file"):
        uri = argsworkflow
    elif os.path.exists(os.path.abspath(argsworkflow)):
        uri = file_uri(str(os.path.abspath(argsworkflow)))
    elif resolver is not None:
        loader = document_loader or default_loader(fetcher_constructor)
        uri = resolver(loader, argsworkflow)

    if uri is None:
        raise ValidationException("Not found: '%s'" % argsworkflow)

    if uri != argsworkflow:
        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)

    fileuri, _fragment = urllib.parse.urldefrag(uri)
    return uri, fileuri
105 | |
106 | |
def fetch_document(
    argsworkflow: Union[str, CWLObjectType],
    loadingContext: Optional[LoadingContext] = None,
) -> Tuple[LoadingContext, CommentedMap, str]:
    """Retrieve a CWL document.

    :param argsworkflow: either a reference string (resolved with
        ``resolve_tool_uri`` and fetched through the context's loader) or an
        in-memory mapping (registered in the loader's index).
    :param loadingContext: copied before use; when it has no loader, a
        ``default_loader`` built from its settings is attached to the copy.
        When omitted, a fresh ``LoadingContext`` with a default loader is used.
    :returns: ``(loadingContext, document, uri)``.
    :raises ValidationException: if ``argsworkflow`` is neither a string nor
        a mapping.
    """
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
        if loadingContext.loader is None:
            loadingContext.loader = default_loader(
                loadingContext.fetcher_constructor,
                enable_dev=loadingContext.enable_dev,
                doc_cache=loadingContext.doc_cache,
            )
    else:
        loadingContext = LoadingContext()
        loadingContext.loader = default_loader()

    loader = loadingContext.loader

    if isinstance(argsworkflow, str):
        uri, fileuri = resolve_tool_uri(
            argsworkflow,
            resolver=loadingContext.resolver,
            document_loader=loader,
        )
        fetched = loader.fetch(fileuri, content_types=CWL_CONTENT_TYPES)
        return loadingContext, cast(CommentedMap, fetched), uri

    if not isinstance(argsworkflow, MutableMapping):
        raise ValidationException("Must be URI or object: '%s'" % argsworkflow)

    # In-memory document: keep a truthy "id" if present, otherwise mint a
    # blank-node identifier so the document can be indexed.
    if argsworkflow.get("id"):
        uri = cast(str, argsworkflow["id"])
    else:
        uri = "_:" + str(uuid.uuid4())
    workflowobj = cast(
        CommentedMap, cmap(cast(Dict[str, Any], argsworkflow), fn=uri)
    )
    loader.idx[uri] = workflowobj
    return loadingContext, workflowobj, uri
147 | |
148 | |
def _convert_stdstreams_to_files(
    workflowobj: Union[
        CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str
    ]
) -> None:
    """Expand the ``stdout``/``stderr``/``stdin`` type shortcuts into ``File``.

    Walks ``workflowobj`` recursively (mapping values and sequence items).
    Every mapping whose ``class`` is ``CommandLineTool`` is rewritten in place:

    * an output of type ``stdout`` or ``stderr`` becomes a ``File`` output
      whose ``outputBinding`` globs the tool's ``stdout``/``stderr`` filename.
      If the tool does not set that filename, the SHA-1 hex digest of the
      tool's JSON serialisation is used and stored on the tool;
    * an input of type ``stdin`` becomes a ``File`` input, and the tool's
      ``stdin`` is set to ``$(inputs.<name>.path)``, where ``<name>`` is the
      part of the input id after the last ``#``.

    The contents of a ``CommandLineTool`` mapping are not searched further.
    Strings and other scalars are ignored.

    :raises ValidationException: if ``outputs`` is not a ``CommentedSeq``, an
        output is not a ``CommentedMap``, a stream-typed output already has an
        ``outputBinding``, or a ``stdin``-typed input has an ``inputBinding``
        or the tool already declares ``stdin``.
    """
    if isinstance(workflowobj, MutableMapping):
        if workflowobj.get("class") == "CommandLineTool":
            # NOTE(review): SourceLine presumably attaches the source position
            # of the "outputs" key to ValidationExceptions raised inside
            # (with extra detail when debug logging is on) — confirm in
            # schema_salad.sourceline.
            with SourceLine(
                workflowobj,
                "outputs",
                ValidationException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                outputs = workflowobj.get("outputs", [])
                # The fallback [] is a plain list, not a CommentedSeq, so a
                # tool with no "outputs" key is also rejected here.
                if not isinstance(outputs, CommentedSeq):
                    raise ValidationException('"outputs" section is not ' "valid.")
                for out in cast(
                    MutableSequence[CWLObjectType], workflowobj.get("outputs", [])
                ):
                    if not isinstance(out, CommentedMap):
                        raise ValidationException(
                            f"Output '{out}' is not a valid OutputParameter."
                        )
                    for streamtype in ["stdout", "stderr"]:
                        if out.get("type") == streamtype:
                            if "outputBinding" in out:
                                raise ValidationException(
                                    "Not allowed to specify outputBinding when"
                                    " using %s shortcut." % streamtype
                                )
                            if streamtype in workflowobj:
                                filename = workflowobj[streamtype]
                            else:
                                # Derive a deterministic filename from the
                                # tool's current content. Any stream filename
                                # set earlier in this loop is part of that
                                # content. SHA-1 only names the file, hence
                                # the nosec marker.
                                filename = str(
                                    hashlib.sha1(  # nosec
                                        json_dumps(workflowobj, sort_keys=True).encode(
                                            "utf-8"
                                        )
                                    ).hexdigest()
                                )
                                workflowobj[streamtype] = filename
                            out["type"] = "File"
                            out["outputBinding"] = cmap({"glob": filename})
            for inp in cast(
                MutableSequence[CWLObjectType], workflowobj.get("inputs", [])
            ):
                if inp.get("type") == "stdin":
                    if "inputBinding" in inp:
                        raise ValidationException(
                            "Not allowed to specify inputBinding when"
                            " using stdin shortcut."
                        )
                    if "stdin" in workflowobj:
                        raise ValidationException(
                            "Not allowed to specify stdin path when"
                            " using stdin type shortcut."
                        )
                    else:
                        # Feed the file bound to this input to the tool's
                        # stdin, using the input's short name.
                        workflowobj["stdin"] = (
                            "$(inputs.%s.path)"
                            % cast(str, inp["id"]).rpartition("#")[2]
                        )
                        inp["type"] = "File"
        else:
            # Not a CommandLineTool: search nested values (e.g. embedded
            # processes) for tools to convert.
            for entry in workflowobj.values():
                _convert_stdstreams_to_files(
                    cast(
                        Union[
                            CWLObjectType,
                            MutableSequence[Union[CWLObjectType, str, int]],
                            str,
                        ],
                        entry,
                    )
                )
    if isinstance(workflowobj, MutableSequence):
        for entry in workflowobj:
            _convert_stdstreams_to_files(
                cast(
                    Union[
                        CWLObjectType,
                        MutableSequence[Union[CWLObjectType, str, int]],
                        str,
                    ],
                    entry,
                )
            )
236 | |
237 | |
def _add_blank_ids(
    workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]]
) -> None:
    """Give each inline ``run`` process that lacks an identifier a random one.

    Recursively visits mapping values and sequence items. Whenever a mapping
    has a ``run`` entry that is itself a mapping containing neither ``id`` nor
    ``$import``, a fresh UUID4 string is stored as that entry's ``id``. The
    structure is modified in place; nothing is returned.
    """
    node_type = Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]]

    if isinstance(workflowobj, MutableMapping):
        run_doc = workflowobj.get("run")
        needs_id = isinstance(run_doc, MutableMapping) and not (
            "id" in run_doc or "$import" in run_doc
        )
        if needs_id:
            run_doc["id"] = str(uuid.uuid4())
        for child in workflowobj.values():
            _add_blank_ids(cast(node_type, child))

    if isinstance(workflowobj, MutableSequence):
        for child in workflowobj:
            _add_blank_ids(cast(node_type, child))
264 | |
265 | |
def resolve_and_validate_document(
    loadingContext: LoadingContext,
    workflowobj: Union[CommentedMap, CommentedSeq],
    uri: str,
    preprocess_only: bool = False,
    skip_schemas: Optional[bool] = None,
) -> Tuple[LoadingContext, str]:
    """Validate a CWL document.

    Resolves ``workflowobj`` (loaded from ``uri``) against the CWL schema for
    its ``cwlVersion``. Unless ``preprocess_only`` is set, it then validates
    the result (if ``loadingContext.do_validate``) and, by default, upgrades it
    with ``update.update``.

    If ``workflowobj`` is a job (input) object with a ``cwl:tool`` entry, the
    referenced tool is fetched and processed instead. The job's
    ``cwltool:overrides`` and ``cwl:requirements`` are moved into
    ``overrides_list``, and the rest of the job becomes
    ``loadingContext.jobdefaults``.

    :param loadingContext: must carry a ``loader``. The returned context is a
        copy carrying the schema-aware loader, ``avsc_names`` and metadata.
    :param workflowobj: the already-fetched document; must be a mapping.
    :param uri: the URI the document was loaded from.
    :param preprocess_only: stop after resolution, before validation/update.
    :param skip_schemas: forwarded to the schema-aware ``Loader``.
    :returns: ``(loadingContext, uri)``. ``uri`` becomes the process's ``id``
        when the resolved process is a mapping.
    :raises ValueError: if the context has no loader or ``workflowobj`` is
        not a mapping.
    :raises ValidationException: on a missing, non-string or unsupported
        ``cwlVersion``, on ``cwl:requirements`` in a v1.0 job object, or when
        resolution yields an unexpected structure.
    """
    if not loadingContext.loader:
        raise ValueError("loadingContext must have a loader.")
    loader = loadingContext.loader
    loadingContext = loadingContext.copy()

    if not isinstance(workflowobj, MutableMapping):
        raise ValueError(
            "workflowobj must be a dict, got '{}': {}".format(
                type(workflowobj), workflowobj
            )
        )

    jobobj = None
    if "cwl:tool" in workflowobj:
        # The document is a job object that names the tool to run.
        # NOTE(review): resolve_all appears to expand "cwl:tool" to its full
        # IRI in place, which is why the expanded key is read below.
        jobobj, _ = loader.resolve_all(workflowobj, uri)
        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
        del cast(Dict[str, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]

        workflowobj = fetch_document(uri, loadingContext)[1]

    fileuri = urllib.parse.urldefrag(uri)[0]

    cwlVersion = loadingContext.metadata.get("cwlVersion")
    if not cwlVersion:
        cwlVersion = workflowobj.get("cwlVersion")
    if not cwlVersion and fileuri != uri:
        # The tool we're loading is a fragment of a bigger file. Get
        # the document root element and look for cwlVersion there.
        metadata = cast(CWLObjectType, fetch_document(fileuri, loadingContext)[1])
        cwlVersion = cast(str, metadata.get("cwlVersion"))
    if not cwlVersion:
        raise ValidationException(
            "No cwlVersion found. "
            "Use the following syntax in your CWL document to declare "
            "the version: cwlVersion: <version>.\n"
            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
            "will need to be upgraded first."
        )

    if not isinstance(cwlVersion, str):
        with SourceLine(workflowobj, "cwlVersion", ValidationException):
            raise ValidationException(
                "'cwlVersion' must be a string, got {}".format(type(cwlVersion))
            )
    # Strip any "cwl:" or full-IRI prefix, leaving e.g. "v1.2".
    cwlVersion = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion)
    if cwlVersion not in ALLUPDATES:
        # List every supported version, flagging the development ones.
        versions = sorted(
            version + " (with --enable-dev flag only)" if "dev" in version else version
            for version in ALLUPDATES
        )
        raise ValidationException(
            "The CWL reference runner no longer supports pre CWL v1.0 "
            "documents. Supported versions are: "
            "\n{}".format("\n".join(versions))
        )

    if isinstance(jobobj, CommentedMap):
        if "http://commonwl.org/cwltool#overrides" in jobobj:
            loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
            del jobobj["http://commonwl.org/cwltool#overrides"]

        if "https://w3id.org/cwl/cwl#requirements" in jobobj:
            # cwl:requirements in the input object exists from v1.1 onwards.
            # Pre-v1.0 versions were rejected above, so only v1.0 is invalid
            # here. The old allow-list ("v1.1.0-dev1", "v1.1") wrongly
            # rejected v1.2 and its dev versions.
            if cwlVersion == "v1.0":
                raise ValidationException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater."
                )
            loadingContext.overrides_list.append(
                {
                    "overrideTarget": uri,
                    "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"],
                }
            )
            del jobobj["https://w3id.org/cwl/cwl#requirements"]

    (sch_document_loader, avsc_names) = process.get_schema(cwlVersion)[:2]

    if isinstance(avsc_names, Exception):
        raise avsc_names

    processobj = None  # type: Optional[ResolveType]
    # Schema-aware loader for this cwlVersion. It shares the original
    # loader's index so that documents fetched earlier are reused.
    document_loader = Loader(
        sch_document_loader.ctx,
        schemagraph=sch_document_loader.graph,
        idx=loader.idx,
        cache=sch_document_loader.cache,
        fetcher_constructor=loadingContext.fetcher_constructor,
        skip_schemas=skip_schemas,
        doc_cache=loadingContext.doc_cache,
    )

    if cwlVersion == "v1.0":
        _add_blank_ids(workflowobj)

    document_loader.resolve_all(workflowobj, fileuri)
    processobj, metadata = document_loader.resolve_ref(uri)
    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")

    if not hasattr(processobj.lc, "filename"):
        processobj.lc.filename = fileuri

    # Explicit metadata on the context takes precedence over the resolved one.
    if loadingContext.metadata:
        metadata = loadingContext.metadata

    if not isinstance(metadata, CommentedMap):
        raise ValidationException(
            "metadata must be a CommentedMap, was %s" % type(metadata)
        )

    if isinstance(processobj, CommentedMap):
        uri = processobj["id"]

    _convert_stdstreams_to_files(workflowobj)

    if isinstance(jobobj, CommentedMap):
        loadingContext.jobdefaults = jobobj

    loadingContext.loader = document_loader
    loadingContext.avsc_names = avsc_names
    loadingContext.metadata = metadata

    if preprocess_only:
        return loadingContext, uri

    if loadingContext.do_validate:
        validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)

    # None means default behavior (do update)
    if loadingContext.do_update in (True, None):
        if "cwlVersion" not in metadata:
            metadata["cwlVersion"] = cwlVersion
        processobj = update.update(
            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata
        )
        document_loader.idx[processobj["id"]] = processobj

        def update_index(pr: CommentedMap) -> None:
            # Re-index every (possibly upgraded) process object by its id.
            if "id" in pr:
                document_loader.idx[pr["id"]] = pr

        visit_class(
            processobj, ("CommandLineTool", "Workflow", "ExpressionTool"), update_index
        )

    return loadingContext, uri
431 | |
432 | |
def make_tool(
    uri: Union[str, CommentedMap, CommentedSeq], loadingContext: LoadingContext
) -> Process:
    """Make a Python CWL object.

    Resolves ``uri`` with ``loadingContext.loader`` and builds a ``Process``
    via ``loadingContext.construct_tool_object``. If the reference resolves
    to a list of process objects, the entry whose ``id`` ends in ``#main`` is
    used. Values in ``loadingContext.jobdefaults`` (keyed by input short name)
    are copied onto the matching tool inputs as their ``default``.

    :raises ValueError: if ``loadingContext`` has no loader.
    :raises WorkflowException: if a list of processes has no ``#main`` entry.
    :raises Exception: if ``uri`` resolves to neither a list nor a mapping.
    """
    if loadingContext.loader is None:
        raise ValueError("loadingContext must have a loader")
    resolveduri, _metadata = loadingContext.loader.resolve_ref(uri)

    processobj = None
    if isinstance(resolveduri, MutableSequence):
        for obj in resolveduri:
            # Entries without a string "id" cannot be "#main". Skip them
            # instead of failing with KeyError. The error message below
            # already tolerates id-less entries.
            obj_id = obj.get("id") if isinstance(obj, MutableMapping) else None
            if isinstance(obj_id, str) and obj_id.endswith("#main"):
                processobj = obj
                break
        if processobj is None:
            raise WorkflowException(
                "Tool file contains graph of multiple objects, must specify "
                "one of #%s"
                % ", #".join(
                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri if "id" in i
                )
            )
    elif isinstance(resolveduri, MutableMapping):
        processobj = resolveduri
    else:
        raise Exception("Must resolve to list or dict")

    tool = loadingContext.construct_tool_object(processobj, loadingContext)

    if loadingContext.jobdefaults:
        jobobj = loadingContext.jobdefaults
        for inp in tool.tool["inputs"]:
            input_name = shortname(inp["id"])
            if input_name in jobobj:
                inp["default"] = jobobj[input_name]

    return tool
469 | |
470 | |
def load_tool(
    argsworkflow: Union[str, CWLObjectType],
    loadingContext: Optional[LoadingContext] = None,
) -> Process:
    """Fetch, resolve, validate and instantiate a CWL process in one call.

    :param argsworkflow: a tool reference string or an in-memory document,
        as accepted by ``fetch_document``.
    :param loadingContext: optional loading settings; ``fetch_document``
        works on a copy.
    :returns: the ``Process`` built by ``make_tool``.
    """
    context, document, tool_uri = fetch_document(argsworkflow, loadingContext)
    context, tool_uri = resolve_and_validate_document(context, document, tool_uri)
    return make_tool(tool_uri, context)
483 | |
484 | |
def resolve_overrides(
    ov: IdxResultType,
    ov_uri: str,
    baseurl: str,
) -> List[CWLObjectType]:
    """Return the ``cwltool:overrides`` entries of an overrides document.

    ``ov`` is first resolved against ``baseurl`` using ``overrides_ctx``. The
    result is then resolved again by the CWL v1.0 schema loader, relative to
    ``ov_uri``.

    :raises Exception: if the first resolution does not produce a
        ``CommentedMap``.
    """
    resolved, _metadata = Loader(overrides_ctx).resolve_all(ov, baseurl)
    if not isinstance(resolved, CommentedMap):
        raise Exception("Expected CommentedMap, got %s" % type(resolved))
    schema_loader = get_schema("v1.0")[0]
    schema_loader.resolve_all(resolved, ov_uri)
    overrides = resolved["http://commonwl.org/cwltool#overrides"]
    return cast(List[CWLObjectType], overrides)
497 | |
498 | |
def load_overrides(ov: str, base_url: str) -> List[CWLObjectType]:
    """Fetch the overrides document at ``ov`` and return its override entries.

    The document is fetched with a loader built on ``overrides_ctx``. It is
    then handed to ``resolve_overrides`` with ``ov`` as its own URI and
    ``base_url`` as the resolution base.
    """
    document = Loader(overrides_ctx).fetch(ov)
    return resolve_overrides(document, ov, base_url)
502 | |
503 | |
def recursive_resolve_and_validate_document(
    loadingContext: LoadingContext,
    workflowobj: Union[CommentedMap, CommentedSeq],
    uri: str,
    preprocess_only: bool = False,
    skip_schemas: Optional[bool] = None,
) -> Tuple[LoadingContext, str, Process]:
    """Validate a CWL document, checking that a tool object can be built.

    Runs ``resolve_and_validate_document`` with the given options. It then
    calls ``make_tool`` on the resulting URI, so construction errors surface
    here as well.

    :returns: ``(loadingContext, uri, tool)`` — the updated context, the
        resolved URI and the constructed ``Process``.
    """
    resolved_context, resolved_uri = resolve_and_validate_document(
        loadingContext,
        workflowobj,
        uri,
        preprocess_only=preprocess_only,
        skip_schemas=skip_schemas,
    )
    built_tool = make_tool(resolved_uri, resolved_context)
    return resolved_context, resolved_uri, built_tool