comparison env/lib/python3.9/site-packages/cwltool/builder.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import copy
2 import logging
3 import math
4 from typing import (
5 IO,
6 Any,
7 Callable,
8 Dict,
9 List,
10 MutableMapping,
11 MutableSequence,
12 Optional,
13 Set,
14 Tuple,
15 Union,
16 cast,
17 )
18
19 from rdflib import Graph, URIRef
20 from rdflib.namespace import OWL, RDFS
21 from ruamel.yaml.comments import CommentedMap
22 from schema_salad.avro.schema import Names, Schema, make_avsc_object
23 from schema_salad.exceptions import ValidationException
24 from schema_salad.sourceline import SourceLine
25 from schema_salad.utils import convert_to_dict, json_dumps
26 from schema_salad.validate import validate
27 from typing_extensions import TYPE_CHECKING, Type # pylint: disable=unused-import
28
29 from . import expression
30 from .errors import WorkflowException
31 from .loghandler import _logger
32 from .mutation import MutationManager
33 from .software_requirements import DependenciesConfiguration
34 from .stdfsaccess import StdFsAccess
35 from .utils import (
36 CONTENT_LIMIT,
37 CWLObjectType,
38 CWLOutputType,
39 aslist,
40 docker_windows_path_adjust,
41 get_listing,
42 normalizeFilesDirs,
43 onWindows,
44 visit_class,
45 )
46
47 if TYPE_CHECKING:
48 from .pathmapper import PathMapper
49 from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import
50
51
def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
    """
    Return the bytes read from `f`, rejecting more than CONTENT_LIMIT bytes.

    One byte beyond the limit is requested so that an oversized stream can be
    detected from the length of a single read.

    :raises WorkflowException: if more than CONTENT_LIMIT bytes were read.
    """
    data = f.read(CONTENT_LIMIT + 1)
    if len(data) <= CONTENT_LIMIT:
        return data
    raise WorkflowException(
        "file is too large, loadContents limited to %d bytes" % CONTENT_LIMIT
    )
59
60
def content_limit_respected_read(f):  # type: (IO[bytes]) -> str
    """Read `f` through the size-limited byte reader and decode it as UTF-8."""
    raw = content_limit_respected_read_bytes(f)
    return raw.decode("utf-8")
63
64
def substitute(value, replace):  # type: (str, str) -> str
    """
    Append `replace` to `value`, honouring leading "^" markers.

    Each leading "^" in `replace` removes the last "."-delimited extension
    from `value`. Once `value` has no extension left, any remaining carets
    are simply discarded.
    """
    while replace.startswith("^"):
        dot = value.rfind(".")
        if dot == -1:
            # No extension to remove
            return value + replace.lstrip("^")
        value = value[:dot]
        replace = replace[1:]
    return value + replace
73
74
def formatSubclassOf(
    fmt: str, cls: str, ontology: Optional[Graph], visited: Set[str]
) -> bool:
    """
    Determine if `fmt` is a subclass of `cls`.

    The search follows rdfs:subClassOf links upward and owl:equivalentClass
    links in both directions. `visited` is updated in place with every format
    that has been expanded, so a format is never expanded twice.
    """
    if URIRef(fmt) == URIRef(cls):
        return True
    if ontology is None or fmt in visited:
        return False

    visited.add(fmt)
    node = URIRef(fmt)

    # Each entry is (triple pattern to query, index of the term to follow).
    searches = (
        # Find parent classes of `fmt` and search upward
        ((node, RDFS.subClassOf, None), 2),
        # Find equivalent classes of `fmt` and search horizontally
        ((node, OWL.equivalentClass, None), 2),
        ((None, OWL.equivalentClass, node), 0),
    )
    for pattern, follow in searches:
        for triple in ontology.triples(pattern):
            if formatSubclassOf(triple[follow], cls, ontology, visited):
                return True
    return False
108
109
def check_format(
    actual_file: Union[CWLObjectType, List[CWLObjectType]],
    input_formats: Union[List[str], str],
    ontology: Optional[Graph],
) -> None:
    """
    Confirm that the format present is valid for the allowed formats.

    `actual_file` may be a single file object or a list of them; empty
    entries are skipped. Every remaining file must carry a "format" that is
    equal to, or (according to `ontology`) a subclass of, at least one of
    `input_formats`.

    :raises ValidationException: if a file has no "format" field, or its
        format matches none of the allowed formats.
    """
    allowed_formats = aslist(input_formats)
    for afile in aslist(actual_file):
        if not afile:
            continue
        if "format" not in afile:
            raise ValidationException(
                "File has no 'format' defined: {}".format(json_dumps(afile, indent=4))
            )
        actual_format = afile["format"]
        # Keep checking the remaining files after a match: returning on the
        # first match would leave every later file in a list unvalidated.
        if not any(
            actual_format == inpf
            or formatSubclassOf(actual_format, inpf, ontology, set())
            for inpf in allowed_formats
        ):
            raise ValidationException(
                "File has an incompatible format: {}".format(
                    json_dumps(afile, indent=4)
                )
            )
131
132
class HasReqsHints:
    """Base class providing requirement and hint lookup via get_requirement()."""

    def __init__(self) -> None:
        """Start with empty requirement and hint lists."""
        self.requirements = []  # type: List[CWLObjectType]
        self.hints = []  # type: List[CWLObjectType]

    def get_requirement(
        self, feature: str
    ) -> Tuple[Optional[CWLObjectType], Optional[bool]]:
        """
        Look up the entry whose "class" equals `feature`.

        Requirements are searched before hints, and within each list the
        last matching entry wins. Returns (entry, True) for a requirement,
        (entry, False) for a hint, and (None, None) when nothing matches.
        """
        for attr, is_requirement in (("requirements", True), ("hints", False)):
            for entry in reversed(getattr(self, attr)):
                if entry["class"] == feature:
                    return (entry, is_requirement)
        return (None, None)
151
152
153 class Builder(HasReqsHints):
def __init__(
    self,
    job: CWLObjectType,
    files: List[CWLObjectType],
    bindings: List[CWLObjectType],
    schemaDefs: MutableMapping[str, CWLObjectType],
    names: Names,
    requirements: List[CWLObjectType],
    hints: List[CWLObjectType],
    resources: Dict[str, Union[int, float, str]],
    mutation_manager: Optional[MutationManager],
    formatgraph: Optional[Graph],
    make_fs_access: Type[StdFsAccess],
    fs_access: StdFsAccess,
    job_script_provider: Optional[DependenciesConfiguration],
    timeout: float,
    debug: bool,
    js_console: bool,
    force_docker_pull: bool,
    loadListing: str,
    outdir: str,
    tmpdir: str,
    stagedir: str,
    cwlVersion: str,
) -> None:
    """
    Store the job, its schema context and the runtime settings.

    Every argument is kept unchanged as an instance attribute of the same
    name. `pathmapper`, `prov_obj` and `find_default_container` are not
    arguments; they start out as None.
    """
    # The job order and what is accumulated while binding it
    self.job, self.files, self.bindings = job, files, bindings

    # Schema lookup tables
    self.schemaDefs, self.names = schemaDefs, names

    # Process requirements and hints (searched by get_requirement())
    self.requirements, self.hints = requirements, hints

    self.resources = resources
    self.mutation_manager = mutation_manager
    self.formatgraph = formatgraph

    # File system access
    self.make_fs_access, self.fs_access = make_fs_access, fs_access

    self.job_script_provider = job_script_provider

    # Settings forwarded to expression evaluation by do_eval()
    self.timeout = timeout
    self.debug, self.js_console = debug, js_console
    self.force_docker_pull = force_docker_pull

    # One of "no_listing", "shallow_listing", "deep_listing"
    self.loadListing = loadListing

    self.outdir, self.tmpdir, self.stagedir = outdir, tmpdir, stagedir

    self.cwlVersion = cwlVersion

    # Not supplied at construction time
    self.pathmapper = None  # type: Optional[PathMapper]
    self.prov_obj = None  # type: Optional[ProvenanceProfile]
    self.find_default_container = None  # type: Optional[Callable[[], str]]
214
def build_job_script(self, commands: List[str]) -> Optional[str]:
    """
    Delegate job script creation to the configured job_script_provider.

    Returns None when no provider is configured.
    """
    provider = self.job_script_provider
    if provider is None:
        return None
    return provider.build_job_script(self, commands)
219
def bind_input(
    self,
    schema: CWLObjectType,
    datum: Union[CWLObjectType, List[CWLObjectType]],
    discover_secondaryFiles: bool,
    lead_pos: Optional[Union[int, List[int]]] = None,
    tail_pos: Optional[Union[str, List[int]]] = None,
) -> List[MutableMapping[str, Union[str, List[int]]]]:
    """
    Walk `schema` and `datum` together and collect command line bindings.

    If `schema` has an "inputBinding" mapping, a copy of it becomes this
    level's binding: its "position" is replaced by the sort key
    `lead_pos + own position + tail_pos` (own position defaults to 0 and is
    evaluated first when it is a string expression) and the value is stored
    under "datum".

    Handling then depends on `schema["type"]`:

    * list (union): each member type is validated against `datum`; the
      first that validates is bound. When the binding has "valueFrom",
      every validating member is traversed but only this level's binding
      is kept.
    * mapping (nested schema): recurse into it, carrying over
      "secondaryFiles", "format" and "streamable".
    * "record": recurse into each field present in `datum`; absent or None
      fields are set to the field's "default".
    * "array": recurse into each item with `lead_pos` set to its index.
    * "File": record the file, optionally load its contents, resolve
      secondary files and check its format.
    * "Directory": optionally load its listing and record it.
    * "Any": record any File or Directory objects found inside.

    Side effects: File and Directory objects are appended to `self.files`,
    and `datum` may be modified in place ("contents", "secondaryFiles",
    record defaults).

    :returns: the collected bindings, with this level's binding (if any)
        appended last and its position prepended to the nested ones.
    :raises ValidationException: if `datum` matches no member of a union.
    :raises WorkflowException: for a missing required secondary file, a
        secondaryFiles expression of the wrong type, or a format mismatch.
    """
    if tail_pos is None:
        tail_pos = []
    if lead_pos is None:
        lead_pos = []

    bindings = []  # type: List[MutableMapping[str, Union[str, List[int]]]]
    binding = (
        {}
    )  # type: Union[MutableMapping[str, Union[str, List[int]]], CommentedMap]
    value_from_expression = False
    if "inputBinding" in schema and isinstance(
        schema["inputBinding"], MutableMapping
    ):
        # Work on a copy so the schema's own inputBinding is left untouched.
        binding = CommentedMap(schema["inputBinding"].items())

        bp = list(aslist(lead_pos))
        if "position" in binding:
            position = binding["position"]
            if isinstance(position, str):  # no need to test the CWL Version
                # the schema for v1.0 only allow ints
                binding["position"] = self.do_eval(position, context=datum)
                bp.append(binding["position"])
            else:
                bp.extend(aslist(binding["position"]))
        else:
            bp.append(0)
        bp.extend(aslist(tail_pos))
        binding["position"] = bp

        binding["datum"] = datum
        if "valueFrom" in binding:
            value_from_expression = True

    # Handle union types
    if isinstance(schema["type"], MutableSequence):
        bound_input = False
        for t in schema["type"]:
            avsc = None  # type: Optional[Schema]
            if isinstance(t, str) and self.names.has_name(t, None):
                avsc = self.names.get_name(t, None)
            elif (
                isinstance(t, MutableMapping)
                and "name" in t
                and self.names.has_name(cast(str, t["name"]), None)
            ):
                avsc = self.names.get_name(cast(str, t["name"]), None)
            if not avsc:
                avsc = make_avsc_object(convert_to_dict(t), self.names)
            if validate(avsc, datum):
                schema = copy.deepcopy(schema)
                schema["type"] = t
                if not value_from_expression:
                    return self.bind_input(
                        schema,
                        datum,
                        lead_pos=lead_pos,
                        tail_pos=tail_pos,
                        discover_secondaryFiles=discover_secondaryFiles,
                    )
                else:
                    # With valueFrom the nested bindings are discarded; the
                    # call is made for its side effects on self.files/datum.
                    self.bind_input(
                        schema,
                        datum,
                        lead_pos=lead_pos,
                        tail_pos=tail_pos,
                        discover_secondaryFiles=discover_secondaryFiles,
                    )
                    bound_input = True
        if not bound_input:
            raise ValidationException(
                "'{}' is not a valid union {}".format(datum, schema["type"])
            )
    elif isinstance(schema["type"], MutableMapping):
        st = copy.deepcopy(schema["type"])
        if (
            binding
            and "inputBinding" not in st
            and "type" in st
            and st["type"] == "array"
            and "itemSeparator" not in binding
        ):
            st["inputBinding"] = {}
        for k in ("secondaryFiles", "format", "streamable"):
            if k in schema:
                st[k] = schema[k]
        if value_from_expression:
            self.bind_input(
                st,
                datum,
                lead_pos=lead_pos,
                tail_pos=tail_pos,
                discover_secondaryFiles=discover_secondaryFiles,
            )
        else:
            bindings.extend(
                self.bind_input(
                    st,
                    datum,
                    lead_pos=lead_pos,
                    tail_pos=tail_pos,
                    discover_secondaryFiles=discover_secondaryFiles,
                )
            )
    else:
        if schema["type"] in self.schemaDefs:
            schema = self.schemaDefs[cast(str, schema["type"])]

        if schema["type"] == "record":
            datum = cast(CWLObjectType, datum)
            for f in cast(List[CWLObjectType], schema["fields"]):
                name = cast(str, f["name"])
                if name in datum and datum[name] is not None:
                    bindings.extend(
                        self.bind_input(
                            f,
                            cast(CWLObjectType, datum[name]),
                            lead_pos=lead_pos,
                            tail_pos=name,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                    )
                else:
                    datum[name] = f.get("default")

        if schema["type"] == "array":
            for n, item in enumerate(cast(MutableSequence[CWLObjectType], datum)):
                b2 = None
                if binding:
                    b2 = cast(CWLObjectType, copy.deepcopy(binding))
                    b2["datum"] = item
                itemschema = {
                    "type": schema["items"],
                    "inputBinding": b2,
                }  # type: CWLObjectType
                for k in ("secondaryFiles", "format", "streamable"):
                    if k in schema:
                        itemschema[k] = schema[k]
                bindings.extend(
                    self.bind_input(
                        itemschema,
                        item,
                        lead_pos=n,
                        tail_pos=tail_pos,
                        discover_secondaryFiles=discover_secondaryFiles,
                    )
                )
            # Each item received its own copy of the binding above, so the
            # array-level binding itself is dropped.
            binding = {}

        def _capture_files(f: CWLObjectType) -> CWLObjectType:
            self.files.append(f)
            return f

        if schema["type"] == "File":
            datum = cast(CWLObjectType, datum)
            self.files.append(datum)

            loadContents_sourceline = (
                None
            )  # type: Union[None, MutableMapping[str, Union[str, List[int]]], CWLObjectType]
            if binding and binding.get("loadContents"):
                loadContents_sourceline = binding
            elif schema.get("loadContents"):
                loadContents_sourceline = schema

            if loadContents_sourceline and loadContents_sourceline["loadContents"]:
                with SourceLine(
                    loadContents_sourceline, "loadContents", WorkflowException
                ):
                    try:
                        with self.fs_access.open(
                            cast(str, datum["location"]), "rb"
                        ) as f2:
                            datum["contents"] = content_limit_respected_read(f2)
                    except Exception as e:
                        # Chain explicitly so the original failure stays
                        # attached as the cause of the re-raised error.
                        raise Exception(
                            "Reading {}\n{}".format(datum["location"], e)
                        ) from e

            if "secondaryFiles" in schema:
                if "secondaryFiles" not in datum:
                    datum["secondaryFiles"] = []

                def addsf(
                    files: MutableSequence[CWLObjectType],
                    newsf: CWLObjectType,
                ) -> None:
                    # An entry with the same location only has its basename
                    # updated; otherwise the new entry is appended.
                    for f in files:
                        if f["location"] == newsf["location"]:
                            f["basename"] = newsf["basename"]
                            return
                    files.append(newsf)

                for sf in aslist(schema["secondaryFiles"]):
                    if "required" in sf:
                        sf_required = self.do_eval(sf["required"], context=datum)
                    else:
                        sf_required = True

                    if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                        sfpath = self.do_eval(sf["pattern"], context=datum)
                    else:
                        sfpath = substitute(
                            cast(str, datum["basename"]), sf["pattern"]
                        )

                    for sfname in aslist(sfpath):
                        if not sfname:
                            continue
                        found = False

                        if isinstance(sfname, str):
                            d_location = cast(str, datum["location"])
                            if "/" in d_location:
                                # Sibling of the primary file
                                sf_location = (
                                    d_location[0 : d_location.rindex("/") + 1]
                                    + sfname
                                )
                            else:
                                sf_location = d_location + sfname
                            sfbasename = sfname
                        elif isinstance(sfname, MutableMapping):
                            sf_location = sfname["location"]
                            sfbasename = sfname["basename"]
                        else:
                            raise WorkflowException(
                                "Expected secondaryFile expression to return type 'str' or 'MutableMapping', received '%s'"
                                % (type(sfname))
                            )

                        for d in cast(
                            MutableSequence[MutableMapping[str, str]],
                            datum["secondaryFiles"],
                        ):
                            if not d.get("basename"):
                                # rpartition yields the whole location when
                                # it has no "/", where rindex would raise
                                # ValueError.
                                d["basename"] = d["location"].rpartition("/")[2]
                            if d["basename"] == sfbasename:
                                found = True

                        if not found:
                            if isinstance(sfname, MutableMapping):
                                addsf(
                                    cast(
                                        MutableSequence[CWLObjectType],
                                        datum["secondaryFiles"],
                                    ),
                                    sfname,
                                )
                            elif discover_secondaryFiles and self.fs_access.exists(
                                sf_location
                            ):
                                addsf(
                                    cast(
                                        MutableSequence[CWLObjectType],
                                        datum["secondaryFiles"],
                                    ),
                                    {
                                        "location": sf_location,
                                        "basename": sfname,
                                        "class": "File",
                                    },
                                )
                            elif sf_required:
                                raise WorkflowException(
                                    "Missing required secondary file '%s' from file object: %s"
                                    % (sfname, json_dumps(datum, indent=4))
                                )

                normalizeFilesDirs(
                    cast(MutableSequence[CWLObjectType], datum["secondaryFiles"])
                )

            if "format" in schema:
                try:
                    check_format(
                        datum,
                        cast(Union[List[str], str], self.do_eval(schema["format"])),
                        self.formatgraph,
                    )
                except ValidationException as ve:
                    raise WorkflowException(
                        "Expected value of '%s' to have format %s but\n "
                        " %s" % (schema["name"], schema["format"], ve)
                    ) from ve

            visit_class(
                datum.get("secondaryFiles", []),
                ("File", "Directory"),
                _capture_files,
            )

        if schema["type"] == "Directory":
            datum = cast(CWLObjectType, datum)
            # A loadListing on the schema takes precedence over the default.
            ll = schema.get("loadListing") or self.loadListing
            if ll and ll != "no_listing":
                get_listing(
                    self.fs_access,
                    datum,
                    (ll == "deep_listing"),
                )
            self.files.append(datum)

        if schema["type"] == "Any":
            visit_class(datum, ("File", "Directory"), _capture_files)

    # Position to front of the sort key
    if binding:
        for bi in bindings:
            bi["position"] = cast(List[int], binding["position"]) + cast(
                List[int], bi["position"]
            )
        bindings.append(binding)

    return bindings
544
def tostr(self, value: Union[MutableMapping[str, str], Any]) -> str:
    """
    Render `value` as a command line string.

    File and Directory objects are rendered as their "path"; anything else
    is passed through str().

    :raises WorkflowException: if a File or Directory object has no "path".
    """
    is_file_or_dir = isinstance(value, MutableMapping) and value.get("class") in (
        "File",
        "Directory",
    )
    if not is_file_or_dir:
        return str(value)

    if "path" not in value:
        raise WorkflowException(
            '{} object missing "path": {}'.format(value["class"], value)
        )

    # Path adjust for windows file path when passing to docker, docker accepts unix like path only
    docker_req, _ = self.get_requirement("DockerRequirement")
    # docker_req is none only when there is no dockerRequirement
    # mentioned in hints and Requirement
    if onWindows() and docker_req is not None:
        return docker_windows_path_adjust(value["path"])
    return value["path"]
565
def generate_arg(self, binding: CWLObjectType) -> List[str]:
    """
    Turn one binding into its list of command line arguments.

    The value is the binding's "datum", replaced by the evaluated
    "valueFrom" expression when there is one. How it is rendered depends on
    its type:

    * list: joined with "itemSeparator" if set and the list is non-empty;
      otherwise, with "valueFrom", the prefix (if any) followed by each
      item; otherwise just the prefix when both prefix and list are
      non-empty.
    * File or Directory object: rendered through tostr().
    * other mapping, or True: the prefix alone (nothing without a prefix).
    * False or None: nothing.
    * anything else: rendered through tostr().

    :raises WorkflowException: if "separate" is false but no prefix is given.
    """
    value = binding.get("datum")
    if "valueFrom" in binding:
        with SourceLine(
            binding,
            "valueFrom",
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            value = self.do_eval(cast(str, binding["valueFrom"]), context=value)

    prefix = cast(Optional[str], binding.get("prefix"))
    separate = binding.get("separate", True)
    if not separate and prefix is None:
        with SourceLine(
            binding,
            "separate",
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            raise WorkflowException(
                "'separate' option can not be specified without prefix"
            )

    # What to emit when only the flag itself is wanted.
    prefix_only = [prefix] if prefix else []  # type: List[str]

    pending = []  # type: MutableSequence[CWLOutputType]
    if isinstance(value, MutableSequence):
        if binding.get("itemSeparator") and value:
            joiner = cast(str, binding["itemSeparator"])
            pending = [joiner.join([self.tostr(v) for v in value])]
        elif binding.get("valueFrom"):
            return prefix_only + cast(List[str], [self.tostr(v) for v in value])
        else:
            return prefix_only if value else []
    elif isinstance(value, MutableMapping):
        if value.get("class") not in ("File", "Directory"):
            return prefix_only
        pending = cast(MutableSequence[CWLOutputType], [value])
    elif value is True:
        return prefix_only
    elif value is False or value is None:
        return []
    else:
        pending = [value]

    rendered = []
    for item in pending:
        text = self.tostr(item)
        if separate:
            rendered += [prefix, text]
        elif prefix is None:
            rendered.append(text)
        else:
            rendered.append(prefix + text)

    # A missing prefix shows up as None above; drop it here.
    return [piece for piece in rendered if piece is not None]
626
def do_eval(
    self,
    ex: Optional[CWLOutputType],
    context: Optional[Any] = None,
    recursive: bool = False,
    strip_whitespace: bool = True,
) -> Optional[CWLOutputType]:
    """
    Evaluate `ex` with this builder's job, requirements and settings.

    With `recursive` set, mappings and sequences are walked and each
    contained value is evaluated with the same `context` and
    `strip_whitespace` setting; the result has the same shape. Anything
    else is handed to `expression.do_eval`.

    A non-string "cores" entry in `self.resources` is rounded up to an
    integer on a copy, so `self.resources` itself is not modified.
    """
    if recursive:
        # Forward strip_whitespace too; otherwise nested values would
        # always be evaluated with the default rather than the caller's
        # choice.
        if isinstance(ex, MutableMapping):
            return {
                k: self.do_eval(
                    v, context, recursive, strip_whitespace=strip_whitespace
                )
                for k, v in ex.items()
            }
        if isinstance(ex, MutableSequence):
            return [
                self.do_eval(
                    v, context, recursive, strip_whitespace=strip_whitespace
                )
                for v in ex
            ]

    resources = self.resources
    if self.resources and "cores" in self.resources:
        cores = resources["cores"]
        if not isinstance(cores, str):
            resources = copy.copy(resources)
            resources["cores"] = int(math.ceil(cores))

    return expression.do_eval(
        ex,
        self.job,
        self.requirements,
        self.outdir,
        self.tmpdir,
        resources,
        context=context,
        timeout=self.timeout,
        debug=self.debug,
        js_console=self.js_console,
        force_docker_pull=self.force_docker_pull,
        strip_whitespace=strip_whitespace,
        cwlVersion=self.cwlVersion,
    )