Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/builder.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import copy | |
2 import logging | |
3 import math | |
4 from typing import ( | |
5 IO, | |
6 Any, | |
7 Callable, | |
8 Dict, | |
9 List, | |
10 MutableMapping, | |
11 MutableSequence, | |
12 Optional, | |
13 Set, | |
14 Tuple, | |
15 Union, | |
16 cast, | |
17 ) | |
18 | |
19 from rdflib import Graph, URIRef | |
20 from rdflib.namespace import OWL, RDFS | |
21 from ruamel.yaml.comments import CommentedMap | |
22 from schema_salad.avro.schema import Names, Schema, make_avsc_object | |
23 from schema_salad.exceptions import ValidationException | |
24 from schema_salad.sourceline import SourceLine | |
25 from schema_salad.utils import convert_to_dict, json_dumps | |
26 from schema_salad.validate import validate | |
27 from typing_extensions import TYPE_CHECKING, Type # pylint: disable=unused-import | |
28 | |
29 from . import expression | |
30 from .errors import WorkflowException | |
31 from .loghandler import _logger | |
32 from .mutation import MutationManager | |
33 from .software_requirements import DependenciesConfiguration | |
34 from .stdfsaccess import StdFsAccess | |
35 from .utils import ( | |
36 CONTENT_LIMIT, | |
37 CWLObjectType, | |
38 CWLOutputType, | |
39 aslist, | |
40 docker_windows_path_adjust, | |
41 get_listing, | |
42 normalizeFilesDirs, | |
43 onWindows, | |
44 visit_class, | |
45 ) | |
46 | |
47 if TYPE_CHECKING: | |
48 from .pathmapper import PathMapper | |
49 from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import | |
50 | |
51 | |
def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
    """Read the contents of ``f``, refusing anything above CONTENT_LIMIT bytes.

    One byte more than the limit is requested so that an oversized file can be
    detected without reading all of it.

    :raises WorkflowException: if ``f`` yields more than CONTENT_LIMIT bytes.
    """
    limit = CONTENT_LIMIT
    data = f.read(limit + 1)
    if len(data) <= limit:
        return data
    raise WorkflowException(
        "file is too large, loadContents limited to %d bytes" % limit
    )
59 | |
60 | |
def content_limit_respected_read(f):  # type: (IO[bytes]) -> str
    """Read ``f`` subject to CONTENT_LIMIT and decode the bytes as UTF-8."""
    raw = content_limit_respected_read_bytes(f)
    return raw.decode("utf-8")
63 | |
64 | |
def substitute(value, replace):  # type: (str, str) -> str
    """Apply a secondaryFiles suffix pattern ``replace`` to the name ``value``.

    Each leading ``^`` in ``replace`` removes the last extension (the final
    ``.`` and everything after it) from ``value``. Once no extension is left,
    any remaining carets are dropped. The rest of ``replace`` is then appended.
    """
    stem = value
    suffix = replace
    while suffix.startswith("^"):
        dot = stem.rfind(".")
        if dot == -1:
            # No extension to remove
            return stem + suffix.lstrip("^")
        stem = stem[:dot]
        suffix = suffix[1:]
    return stem + suffix
73 | |
74 | |
def formatSubclassOf(
    fmt: str, cls: str, ontology: Optional[Graph], visited: Set[str]
) -> bool:
    """Determine if `fmt` is a subclass of `cls`.

    Starting from ``fmt``, the search follows ``rdfs:subClassOf`` edges upward
    and ``owl:equivalentClass`` edges in both directions through ``ontology``.
    ``visited`` is mutated in place to record explored formats, so that cycles
    in the graph terminate.
    """
    if URIRef(fmt) == URIRef(cls):
        return True
    if ontology is None or fmt in visited:
        return False
    visited.add(fmt)

    node = URIRef(fmt)
    # (triple pattern, index of the neighbouring class within each result)
    searches = (
        ((node, RDFS.subClassOf, None), 2),  # parents of fmt: search upward
        ((node, OWL.equivalentClass, None), 2),  # equivalents: search sideways
        ((None, OWL.equivalentClass, node), 0),  # reverse equivalents
    )
    for pattern, position in searches:
        for triple in ontology.triples(pattern):
            if formatSubclassOf(triple[position], cls, ontology, visited):
                return True
    return False
108 | |
109 | |
def check_format(
    actual_file: Union[CWLObjectType, List[CWLObjectType]],
    input_formats: Union[List[str], str],
    ontology: Optional[Graph],
) -> None:
    """Confirm that the format present is valid for the allowed formats.

    Every non-empty File object in ``actual_file`` must carry a ``format`` that
    equals one of ``input_formats`` or is a subclass of one according to
    ``ontology`` (see ``formatSubclassOf``). Empty entries are skipped.

    :raises ValidationException: if a file has no ``format`` or its format is
        not compatible with any of ``input_formats``.
    """
    allowed = aslist(input_formats)
    for afile in aslist(actual_file):
        if not afile:
            continue
        if "format" not in afile:
            raise ValidationException(
                "File has no 'format' defined: {}".format(json_dumps(afile, indent=4))
            )
        # Each file is checked on its own. Returning on the first compatible
        # file would let later files in a list bypass validation.
        if not any(
            afile["format"] == inpf
            or formatSubclassOf(afile["format"], inpf, ontology, set())
            for inpf in allowed
        ):
            raise ValidationException(
                "File has an incompatible format: {}".format(
                    json_dumps(afile, indent=4)
                )
            )
131 | |
132 | |
class HasReqsHints:
    """Base class for get_requirement()."""

    def __init__(self) -> None:
        """Initialize this reqs decorator."""
        self.requirements = []  # type: List[CWLObjectType]
        self.hints = []  # type: List[CWLObjectType]

    def get_requirement(
        self, feature: str
    ) -> Tuple[Optional[CWLObjectType], Optional[bool]]:
        """Find the entry whose ``class`` equals ``feature``.

        Requirements are searched before hints. Within each list, the last
        matching entry wins.

        :returns: ``(entry, True)`` for a requirement, ``(entry, False)`` for a
            hint, or ``(None, None)`` when neither list has a match.
        """
        for is_requirement, entries in (
            (True, self.requirements),
            (False, self.hints),
        ):
            for entry in reversed(entries):
                if entry["class"] == feature:
                    return (entry, is_requirement)
        return (None, None)
151 | |
152 | |
class Builder(HasReqsHints):
    """Hold the state used to bind a job's inputs and evaluate its expressions."""

    def __init__(
        self,
        job: CWLObjectType,
        files: List[CWLObjectType],
        bindings: List[CWLObjectType],
        schemaDefs: MutableMapping[str, CWLObjectType],
        names: Names,
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        resources: Dict[str, Union[int, float, str]],
        mutation_manager: Optional[MutationManager],
        formatgraph: Optional[Graph],
        make_fs_access: Type[StdFsAccess],
        fs_access: StdFsAccess,
        job_script_provider: Optional[DependenciesConfiguration],
        timeout: float,
        debug: bool,
        js_console: bool,
        force_docker_pull: bool,
        loadListing: str,
        outdir: str,
        tmpdir: str,
        stagedir: str,
        cwlVersion: str,
    ) -> None:
        """Initialize this Builder."""
        self.job = job
        self.files = files
        self.bindings = bindings
        self.schemaDefs = schemaDefs
        self.names = names
        self.requirements = requirements
        self.hints = hints
        self.resources = resources
        self.mutation_manager = mutation_manager
        self.formatgraph = formatgraph

        self.make_fs_access = make_fs_access
        self.fs_access = fs_access

        self.job_script_provider = job_script_provider

        self.timeout = timeout

        self.debug = debug
        self.js_console = js_console
        self.force_docker_pull = force_docker_pull

        # One of "no_listing", "shallow_listing", "deep_listing"
        self.loadListing = loadListing

        self.outdir = outdir
        self.tmpdir = tmpdir
        self.stagedir = stagedir

        self.cwlVersion = cwlVersion

        self.pathmapper = None  # type: Optional[PathMapper]
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.find_default_container = None  # type: Optional[Callable[[], str]]

    def build_job_script(self, commands: List[str]) -> Optional[str]:
        """Delegate to the job script provider, or return None if there is none."""
        if self.job_script_provider is not None:
            return self.job_script_provider.build_job_script(self, commands)
        return None

    def bind_input(
        self,
        schema: CWLObjectType,
        datum: Union[CWLObjectType, List[CWLObjectType]],
        discover_secondaryFiles: bool,
        lead_pos: Optional[Union[int, List[int]]] = None,
        tail_pos: Optional[Union[str, List[int]]] = None,
    ) -> List[MutableMapping[str, Union[str, List[int]]]]:
        """Compute command-line bindings for ``datum`` according to ``schema``.

        Union, record and array types are handled by recursion. Side effects:

        - File and Directory objects that are encountered are appended to
          ``self.files``.
        - ``loadContents`` is honoured.
        - Secondary files are resolved. When ``discover_secondaryFiles`` is
          true, they are also looked up with ``self.fs_access.exists``.
        - Formats are checked against ``self.formatgraph``.
        - Directory listings are loaded.
        - Missing record fields in ``datum`` are filled with their defaults.

        :param lead_pos: sort-key prefix (a list, or the index in a parent array).
        :param tail_pos: sort-key suffix (a list, or a record field name).
        :returns: bindings, each carrying a ``position`` sort key and a ``datum``.
        :raises ValidationException: if ``datum`` matches no member of a union.
        :raises WorkflowException: on a missing required secondary file, an
            invalid secondaryFiles expression result, or a format mismatch.
        """
        if tail_pos is None:
            tail_pos = []
        if lead_pos is None:
            lead_pos = []

        bindings = []  # type: List[MutableMapping[str, Union[str, List[int]]]]
        binding = (
            {}
        )  # type: Union[MutableMapping[str, Union[str, List[int]]], CommentedMap]
        value_from_expression = False
        if "inputBinding" in schema and isinstance(
            schema["inputBinding"], MutableMapping
        ):
            binding = CommentedMap(schema["inputBinding"].items())

            bp = list(aslist(lead_pos))
            if "position" in binding:
                position = binding["position"]
                if isinstance(position, str):  # no need to test the CWL Version
                    # the schema for v1.0 only allow ints
                    binding["position"] = self.do_eval(position, context=datum)
                    bp.append(binding["position"])
                else:
                    bp.extend(aslist(binding["position"]))
            else:
                bp.append(0)
            bp.extend(aslist(tail_pos))
            binding["position"] = bp

            binding["datum"] = datum
            if "valueFrom" in binding:
                value_from_expression = True

        # Handle union types
        if isinstance(schema["type"], MutableSequence):
            bound_input = False
            for t in schema["type"]:
                avsc = None  # type: Optional[Schema]
                if isinstance(t, str) and self.names.has_name(t, None):
                    avsc = self.names.get_name(t, None)
                elif (
                    isinstance(t, MutableMapping)
                    and "name" in t
                    and self.names.has_name(cast(str, t["name"]), None)
                ):
                    avsc = self.names.get_name(cast(str, t["name"]), None)
                if not avsc:
                    avsc = make_avsc_object(convert_to_dict(t), self.names)
                if validate(avsc, datum):
                    schema = copy.deepcopy(schema)
                    schema["type"] = t
                    if not value_from_expression:
                        return self.bind_input(
                            schema,
                            datum,
                            lead_pos=lead_pos,
                            tail_pos=tail_pos,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                    else:
                        # With valueFrom, the outer binding (appended below)
                        # is the one that counts. Recurse only for side
                        # effects such as file capture.
                        self.bind_input(
                            schema,
                            datum,
                            lead_pos=lead_pos,
                            tail_pos=tail_pos,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                        bound_input = True
            if not bound_input:
                raise ValidationException(
                    "'{}' is not a valid union {}".format(datum, schema["type"])
                )
        elif isinstance(schema["type"], MutableMapping):
            st = copy.deepcopy(schema["type"])
            if (
                binding
                and "inputBinding" not in st
                and "type" in st
                and st["type"] == "array"
                and "itemSeparator" not in binding
            ):
                st["inputBinding"] = {}
            for k in ("secondaryFiles", "format", "streamable"):
                if k in schema:
                    st[k] = schema[k]
            if value_from_expression:
                self.bind_input(
                    st,
                    datum,
                    lead_pos=lead_pos,
                    tail_pos=tail_pos,
                    discover_secondaryFiles=discover_secondaryFiles,
                )
            else:
                bindings.extend(
                    self.bind_input(
                        st,
                        datum,
                        lead_pos=lead_pos,
                        tail_pos=tail_pos,
                        discover_secondaryFiles=discover_secondaryFiles,
                    )
                )
        else:
            if schema["type"] in self.schemaDefs:
                schema = self.schemaDefs[cast(str, schema["type"])]

            if schema["type"] == "record":
                datum = cast(CWLObjectType, datum)
                for f in cast(List[CWLObjectType], schema["fields"]):
                    name = cast(str, f["name"])
                    if name in datum and datum[name] is not None:
                        bindings.extend(
                            self.bind_input(
                                f,
                                cast(CWLObjectType, datum[name]),
                                lead_pos=lead_pos,
                                tail_pos=name,
                                discover_secondaryFiles=discover_secondaryFiles,
                            )
                        )
                    else:
                        datum[name] = f.get("default")

            if schema["type"] == "array":
                for n, item in enumerate(cast(MutableSequence[CWLObjectType], datum)):
                    b2 = None
                    if binding:
                        b2 = cast(CWLObjectType, copy.deepcopy(binding))
                        b2["datum"] = item
                    itemschema = {
                        "type": schema["items"],
                        "inputBinding": b2,
                    }  # type: CWLObjectType
                    for k in ("secondaryFiles", "format", "streamable"):
                        if k in schema:
                            itemschema[k] = schema[k]
                    bindings.extend(
                        self.bind_input(
                            itemschema,
                            item,
                            lead_pos=n,
                            tail_pos=tail_pos,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                    )
                # Items carry their own copies of the binding; the array
                # itself contributes none.
                binding = {}

            def _capture_files(f: CWLObjectType) -> CWLObjectType:
                self.files.append(f)
                return f

            if schema["type"] == "File":
                datum = cast(CWLObjectType, datum)
                self.files.append(datum)

                loadContents_sourceline = (
                    None
                )  # type: Union[None, MutableMapping[str, Union[str, List[int]]], CWLObjectType]
                if binding and binding.get("loadContents"):
                    loadContents_sourceline = binding
                elif schema.get("loadContents"):
                    loadContents_sourceline = schema

                if loadContents_sourceline and loadContents_sourceline["loadContents"]:
                    with SourceLine(
                        loadContents_sourceline, "loadContents", WorkflowException
                    ):
                        try:
                            with self.fs_access.open(
                                cast(str, datum["location"]), "rb"
                            ) as f2:
                                datum["contents"] = content_limit_respected_read(f2)
                        except Exception as e:
                            # Chain the original error so its traceback survives.
                            raise Exception(
                                "Reading {}\n{}".format(datum["location"], e)
                            ) from e

                if "secondaryFiles" in schema:
                    if "secondaryFiles" not in datum:
                        datum["secondaryFiles"] = []
                    for sf in aslist(schema["secondaryFiles"]):
                        if "required" in sf:
                            sf_required = self.do_eval(sf["required"], context=datum)
                        else:
                            sf_required = True

                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                            sfpath = self.do_eval(sf["pattern"], context=datum)
                        else:
                            sfpath = substitute(
                                cast(str, datum["basename"]), sf["pattern"]
                            )

                        for sfname in aslist(sfpath):
                            if not sfname:
                                continue
                            found = False

                            if isinstance(sfname, str):
                                d_location = cast(str, datum["location"])
                                if "/" in d_location:
                                    sf_location = (
                                        d_location[0 : d_location.rindex("/") + 1]
                                        + sfname
                                    )
                                else:
                                    sf_location = d_location + sfname
                                sfbasename = sfname
                            elif isinstance(sfname, MutableMapping):
                                sf_location = sfname["location"]
                                sfbasename = sfname["basename"]
                            else:
                                raise WorkflowException(
                                    "Expected secondaryFile expression to return type 'str' or 'MutableMapping', received '%s'"
                                    % (type(sfname))
                                )

                            for d in cast(
                                MutableSequence[MutableMapping[str, str]],
                                datum["secondaryFiles"],
                            ):
                                if not d.get("basename"):
                                    # Text after the last "/". rpartition also
                                    # copes with a location containing no "/",
                                    # where rindex would raise ValueError.
                                    d["basename"] = d["location"].rpartition("/")[2]
                                if d["basename"] == sfbasename:
                                    found = True

                            if not found:

                                def addsf(
                                    files: MutableSequence[CWLObjectType],
                                    newsf: CWLObjectType,
                                ) -> None:
                                    # Update the basename of an entry with the
                                    # same location; otherwise append.
                                    for f in files:
                                        if f["location"] == newsf["location"]:
                                            f["basename"] = newsf["basename"]
                                            return
                                    files.append(newsf)

                                if isinstance(sfname, MutableMapping):
                                    addsf(
                                        cast(
                                            MutableSequence[CWLObjectType],
                                            datum["secondaryFiles"],
                                        ),
                                        sfname,
                                    )
                                elif discover_secondaryFiles and self.fs_access.exists(
                                    sf_location
                                ):
                                    addsf(
                                        cast(
                                            MutableSequence[CWLObjectType],
                                            datum["secondaryFiles"],
                                        ),
                                        {
                                            "location": sf_location,
                                            "basename": sfname,
                                            "class": "File",
                                        },
                                    )
                                elif sf_required:
                                    raise WorkflowException(
                                        "Missing required secondary file '%s' from file object: %s"
                                        % (sfname, json_dumps(datum, indent=4))
                                    )

                    normalizeFilesDirs(
                        cast(MutableSequence[CWLObjectType], datum["secondaryFiles"])
                    )

                if "format" in schema:
                    try:
                        check_format(
                            datum,
                            cast(Union[List[str], str], self.do_eval(schema["format"])),
                            self.formatgraph,
                        )
                    except ValidationException as ve:
                        raise WorkflowException(
                            "Expected value of '%s' to have format %s but\n "
                            " %s" % (schema["name"], schema["format"], ve)
                        ) from ve

                visit_class(
                    datum.get("secondaryFiles", []),
                    ("File", "Directory"),
                    _capture_files,
                )

            if schema["type"] == "Directory":
                datum = cast(CWLObjectType, datum)
                ll = schema.get("loadListing") or self.loadListing
                if ll and ll != "no_listing":
                    get_listing(
                        self.fs_access,
                        datum,
                        (ll == "deep_listing"),
                    )
                self.files.append(datum)

            if schema["type"] == "Any":
                visit_class(datum, ("File", "Directory"), _capture_files)

        # Position to front of the sort key
        if binding:
            for bi in bindings:
                bi["position"] = cast(List[int], binding["position"]) + cast(
                    List[int], bi["position"]
                )
            bindings.append(binding)

        return bindings

    def tostr(self, value: Union[MutableMapping[str, str], Any]) -> str:
        """Render ``value`` as a command-line string.

        File and Directory objects become their ``path``. On Windows, when a
        DockerRequirement is present, the path is adjusted for Docker. Any
        other value goes through ``str()``.

        :raises WorkflowException: if a File/Directory object has no ``path``.
        """
        if isinstance(value, MutableMapping) and value.get("class") in (
            "File",
            "Directory",
        ):
            if "path" not in value:
                raise WorkflowException(
                    '{} object missing "path": {}'.format(value["class"], value)
                )

            # Path adjust for windows file path when passing to docker, docker accepts unix like path only
            docker_req, _ = self.get_requirement("DockerRequirement")
            if onWindows() and docker_req is not None:
                # docker_req is none only when there is no dockerRequirement
                # mentioned in hints and Requirement
                path = docker_windows_path_adjust(value["path"])
                return path
            return value["path"]
        else:
            return str(value)

    def generate_arg(self, binding: CWLObjectType) -> List[str]:
        """Turn one binding into its list of command-line argument strings.

        Applies ``valueFrom``, ``prefix``, ``separate`` and ``itemSeparator``.
        A ``None`` prefix is dropped from the result.

        :raises WorkflowException: if ``separate`` is false but no ``prefix``
            is given, or if evaluating ``valueFrom`` fails.
        """
        value = binding.get("datum")
        if "valueFrom" in binding:
            with SourceLine(
                binding,
                "valueFrom",
                WorkflowException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                value = self.do_eval(cast(str, binding["valueFrom"]), context=value)

        prefix = cast(Optional[str], binding.get("prefix"))
        sep = binding.get("separate", True)
        if prefix is None and not sep:
            with SourceLine(
                binding,
                "separate",
                WorkflowException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                raise WorkflowException(
                    "'separate' option can not be specified without prefix"
                )

        argl = []  # type: MutableSequence[CWLOutputType]
        if isinstance(value, MutableSequence):
            if binding.get("itemSeparator") and value:
                itemSeparator = cast(str, binding["itemSeparator"])
                argl = [itemSeparator.join([self.tostr(v) for v in value])]
            elif binding.get("valueFrom"):
                value = [self.tostr(v) for v in value]
                return cast(List[str], ([prefix] if prefix else [])) + cast(
                    List[str], value
                )
            elif prefix and value:
                # Array items are emitted by their own bindings; only the
                # prefix belongs to this one.
                return [prefix]
            else:
                return []
        elif isinstance(value, MutableMapping) and value.get("class") in (
            "File",
            "Directory",
        ):
            argl = cast(MutableSequence[CWLOutputType], [value])
        elif isinstance(value, MutableMapping):
            return [prefix] if prefix else []
        elif value is True and prefix:
            return [prefix]
        elif value is False or value is None or (value is True and not prefix):
            return []
        else:
            argl = [value]

        args = []
        for j in argl:
            if sep:
                args.extend([prefix, self.tostr(j)])
            else:
                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))

        return [a for a in args if a is not None]

    def do_eval(
        self,
        ex: Optional[CWLOutputType],
        context: Optional[Any] = None,
        recursive: bool = False,
        strip_whitespace: bool = True,
    ) -> Optional[CWLOutputType]:
        """Evaluate ``ex`` against this builder's job, requirements and resources.

        With ``recursive``, mappings and sequences are evaluated element by
        element. ``context`` and ``strip_whitespace`` apply at every level.
        A non-string ``cores`` resource is rounded up to an integer on a copy
        of the resources before evaluation.
        """
        if recursive:
            # Pass strip_whitespace down so nested values are evaluated the
            # same way as the top-level call requested.
            if isinstance(ex, MutableMapping):
                return {
                    k: self.do_eval(v, context, recursive, strip_whitespace)
                    for k, v in ex.items()
                }
            if isinstance(ex, MutableSequence):
                return [
                    self.do_eval(v, context, recursive, strip_whitespace) for v in ex
                ]

        resources = self.resources
        if self.resources and "cores" in self.resources:
            cores = resources["cores"]
            if not isinstance(cores, str):
                resources = copy.copy(resources)
                resources["cores"] = int(math.ceil(cores))

        return expression.do_eval(
            ex,
            self.job,
            self.requirements,
            self.outdir,
            self.tmpdir,
            resources,
            context=context,
            timeout=self.timeout,
            debug=self.debug,
            js_console=self.js_console,
            force_docker_pull=self.force_docker_pull,
            strip_whitespace=strip_whitespace,
            cwlVersion=self.cwlVersion,
        )