comparison env/lib/python3.9/site-packages/cwltool/process.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
1 """Classes and methods relevant for all CWL Proccess types."""
2 import abc
3 import copy
4 import functools
5 import hashlib
6 import json
7 import logging
8 import math
9 import os
10 import shutil
11 import stat
12 import textwrap
13 import urllib
14 import uuid
15 from os import scandir
16 from typing import (
17 Any,
18 Callable,
19 Dict,
20 Iterable,
21 Iterator,
22 List,
23 MutableMapping,
24 MutableSequence,
25 Optional,
26 Set,
27 Sized,
28 Tuple,
29 Type,
30 Union,
31 cast,
32 )
33
34 from pkg_resources import resource_stream
35 from rdflib import Graph
36 from ruamel.yaml.comments import CommentedMap, CommentedSeq
37 from schema_salad.avro.schema import (
38 Names,
39 Schema,
40 SchemaParseException,
41 make_avsc_object,
42 )
43 from schema_salad.exceptions import ValidationException
44 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
45 from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro
46 from schema_salad.sourceline import SourceLine, strip_dup_lineno
47 from schema_salad.utils import convert_to_dict
48 from schema_salad.validate import validate_ex
49 from typing_extensions import TYPE_CHECKING
50
51 from . import expression
52 from .builder import Builder, HasReqsHints
53 from .context import LoadingContext, RuntimeContext, getdefault
54 from .errors import UnsupportedRequirement, WorkflowException
55 from .loghandler import _logger
56 from .mpi import MPIRequirementName
57 from .pathmapper import MapperEnt, PathMapper
58 from .secrets import SecretStore
59 from .stdfsaccess import StdFsAccess
60 from .update import INTERNAL_VERSION
61 from .utils import (
62 CWLObjectType,
63 CWLOutputAtomType,
64 CWLOutputType,
65 JobsGeneratorType,
66 OutputCallbackType,
67 adjustDirObjs,
68 aslist,
69 cmp_like_py2,
70 copytree_with_merge,
71 ensure_writable,
72 get_listing,
73 normalizeFilesDirs,
74 onWindows,
75 random_outdir,
76 visit_class,
77 )
78 from .validate_js import validate_js_expressions
79
80 if TYPE_CHECKING:
81 from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import
82
83
84 class LogAsDebugFilter(logging.Filter):
85 def __init__(self, name: str, parent: logging.Logger) -> None:
86 """Initialize."""
87 name = str(name)
88 super().__init__(name)
89 self.parent = parent
90
91 def filter(self, record: logging.LogRecord) -> bool:
92 return self.parent.isEnabledFor(logging.DEBUG)
93
94
95 _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
96 _logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
97 _logger_validation_warnings.addFilter(
98 LogAsDebugFilter("cwltool.validation_warnings", _logger)
99 )
100
101 supportedProcessRequirements = [
102 "DockerRequirement",
103 "SchemaDefRequirement",
104 "EnvVarRequirement",
105 "ScatterFeatureRequirement",
106 "SubworkflowFeatureRequirement",
107 "MultipleInputFeatureRequirement",
108 "InlineJavascriptRequirement",
109 "ShellCommandRequirement",
110 "StepInputExpressionRequirement",
111 "ResourceRequirement",
112 "InitialWorkDirRequirement",
113 "ToolTimeLimit",
114 "WorkReuse",
115 "NetworkAccess",
116 "InplaceUpdateRequirement",
117 "LoadListingRequirement",
118 MPIRequirementName,
119 "http://commonwl.org/cwltool#TimeLimit",
120 "http://commonwl.org/cwltool#WorkReuse",
121 "http://commonwl.org/cwltool#NetworkAccess",
122 "http://commonwl.org/cwltool#LoadListingRequirement",
123 "http://commonwl.org/cwltool#InplaceUpdateRequirement",
124 ]
125
126 cwl_files = (
127 "Workflow.yml",
128 "CommandLineTool.yml",
129 "CommonWorkflowLanguage.yml",
130 "Process.yml",
131 "Operation.yml",
132 "concepts.md",
133 "contrib.md",
134 "intro.md",
135 "invocation.md",
136 )
137
138 salad_files = (
139 "metaschema.yml",
140 "metaschema_base.yml",
141 "salad.md",
142 "field_name.yml",
143 "import_include.md",
144 "link_res.yml",
145 "ident_res.yml",
146 "vocab_res.yml",
147 "vocab_res.yml",
148 "field_name_schema.yml",
149 "field_name_src.yml",
150 "field_name_proc.yml",
151 "ident_res_schema.yml",
152 "ident_res_src.yml",
153 "ident_res_proc.yml",
154 "link_res_schema.yml",
155 "link_res_src.yml",
156 "link_res_proc.yml",
157 "vocab_res_schema.yml",
158 "vocab_res_src.yml",
159 "vocab_res_proc.yml",
160 )
161
162 SCHEMA_CACHE = (
163 {}
164 ) # type: Dict[str, Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]]
165 SCHEMA_FILE = None # type: Optional[CWLObjectType]
166 SCHEMA_DIR = None # type: Optional[CWLObjectType]
167 SCHEMA_ANY = None # type: Optional[CWLObjectType]
168
169 custom_schemas = {} # type: Dict[str, Tuple[str, str]]
170
171
172 def use_standard_schema(version: str) -> None:
173 if version in custom_schemas:
174 del custom_schemas[version]
175 if version in SCHEMA_CACHE:
176 del SCHEMA_CACHE[version]
177
178
179 def use_custom_schema(version: str, name: str, text: str) -> None:
180 custom_schemas[version] = (name, text)
181 if version in SCHEMA_CACHE:
182 del SCHEMA_CACHE[version]
183
184
185 def get_schema(
186 version: str,
187 ) -> Tuple[Loader, Union[Names, SchemaParseException], CWLObjectType, Loader]:
188
189 if version in SCHEMA_CACHE:
190 return SCHEMA_CACHE[version]
191
192 cache = {} # type: Dict[str, Union[str, Graph, bool]]
193 version = version.split("#")[-1]
194 if ".dev" in version:
195 version = ".".join(version.split(".")[:-1])
196 for f in cwl_files:
197 try:
198 res = resource_stream(__name__, f"schemas/{version}/{f}")
199 cache["https://w3id.org/cwl/" + f] = res.read().decode("UTF-8")
200 res.close()
201 except OSError:
202 pass
203
204 for f in salad_files:
205 try:
206 res = resource_stream(
207 __name__,
208 f"schemas/{version}/salad/schema_salad/metaschema/{f}",
209 )
210 cache[
211 "https://w3id.org/cwl/salad/schema_salad/metaschema/" + f
212 ] = res.read().decode("UTF-8")
213 res.close()
214 except OSError:
215 pass
216
217 if version in custom_schemas:
218 cache[custom_schemas[version][0]] = custom_schemas[version][1]
219 SCHEMA_CACHE[version] = load_schema(custom_schemas[version][0], cache=cache)
220 else:
221 SCHEMA_CACHE[version] = load_schema(
222 "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache
223 )
224
225 return SCHEMA_CACHE[version]
226
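# A minimal usage sketch of get_schema; load_schema returns the four-tuple given in
# the annotation above, and results are memoized in SCHEMA_CACHE so repeated calls are cheap:
#
#     document_loader, avsc_names, schema_metadata, metaschema_loader = get_schema("v1.0")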
227
228 def shortname(inputid: str) -> str:
229 d = urllib.parse.urlparse(inputid)
230 if d.fragment:
231 return d.fragment.split("/")[-1]
232 return d.path.split("/")[-1]
233
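# A minimal sketch of shortname; the IDs below are hypothetical. The fragment
# (after '#') wins when present, otherwise the last path component is used:
#
#     shortname("file:///repo/tool.cwl#main/reads")  # -> "reads"
#     shortname("file:///repo/inputs/sample.txt")    # -> "sample.txt"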
234
235 def checkRequirements(
236 rec: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType, None],
237 supported_process_requirements: Iterable[str],
238 ) -> None:
239 if isinstance(rec, MutableMapping):
240 if "requirements" in rec:
241 for i, entry in enumerate(
242 cast(MutableSequence[CWLObjectType], rec["requirements"])
243 ):
244 with SourceLine(rec["requirements"], i, UnsupportedRequirement):
245 if cast(str, entry["class"]) not in supported_process_requirements:
246 raise UnsupportedRequirement(
247 "Unsupported requirement {}".format(entry["class"])
248 )
249 for key in rec:
250 checkRequirements(rec[key], supported_process_requirements)
251 if isinstance(rec, MutableSequence):
252 for entry2 in rec:
253 checkRequirements(entry2, supported_process_requirements)
254
255
256 def stage_files(
257 pathmapper: PathMapper,
258 stage_func: Optional[Callable[[str, str], None]] = None,
259 ignore_writable: bool = False,
260 symlink: bool = True,
261 secret_store: Optional[SecretStore] = None,
262 fix_conflicts: bool = False,
263 ) -> None:
264 """Link or copy files to their targets. Create them as needed."""
265 targets = {} # type: Dict[str, MapperEnt]
266 for key, entry in pathmapper.items():
267 if "File" not in entry.type:
268 continue
269 if entry.target not in targets:
270 targets[entry.target] = entry
271 elif targets[entry.target].resolved != entry.resolved:
272 if fix_conflicts:
273 # Find the first target name that does not clash with an existing entry in targets:
274 # start with entry.target + "_2" and keep incrementing the suffix until there is no clash.
275 i = 2
276 tgt = f"{entry.target}_{i}"
277 while tgt in targets:
278 i += 1
279 tgt = f"{entry.target}_{i}"
280 targets[tgt] = pathmapper.update(
281 key, entry.resolved, tgt, entry.type, entry.staged
282 )
283 else:
284 raise WorkflowException(
285 "File staging conflict, trying to stage both %s and %s to the same target %s"
286 % (targets[entry.target].resolved, entry.resolved, entry.target)
287 )
288
289 for key, entry in pathmapper.items():
290 if not entry.staged:
291 continue
292 if not os.path.exists(os.path.dirname(entry.target)):
293 os.makedirs(os.path.dirname(entry.target))
294 if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
295 if symlink: # Use symlink func if allowed
296 if onWindows():
297 if entry.type == "File":
298 shutil.copy(entry.resolved, entry.target)
299 elif entry.type == "Directory":
300 if os.path.exists(entry.target) and os.path.isdir(entry.target):
301 shutil.rmtree(entry.target)
302 copytree_with_merge(entry.resolved, entry.target)
303 else:
304 os.symlink(entry.resolved, entry.target)
305 elif stage_func is not None:
306 stage_func(entry.resolved, entry.target)
307 elif (
308 entry.type == "Directory"
309 and not os.path.exists(entry.target)
310 and entry.resolved.startswith("_:")
311 ):
312 os.makedirs(entry.target)
313 elif entry.type == "WritableFile" and not ignore_writable:
314 shutil.copy(entry.resolved, entry.target)
315 ensure_writable(entry.target)
316 elif entry.type == "WritableDirectory" and not ignore_writable:
317 if entry.resolved.startswith("_:"):
318 os.makedirs(entry.target)
319 else:
320 shutil.copytree(entry.resolved, entry.target)
321 ensure_writable(entry.target)
322 elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
323 with open(entry.target, "wb") as new:
324 if secret_store is not None:
325 new.write(
326 cast(str, secret_store.retrieve(entry.resolved)).encode("utf-8")
327 )
328 else:
329 new.write(entry.resolved.encode("utf-8"))
330 if entry.type == "CreateFile":
331 os.chmod(entry.target, stat.S_IRUSR) # Read only
332 else: # it is a "CreateWritableFile"
333 ensure_writable(entry.target)
334 pathmapper.update(key, entry.target, entry.target, entry.type, entry.staged)
335
336
337 def relocateOutputs(
338 outputObj: CWLObjectType,
339 destination_path: str,
340 source_directories: Set[str],
341 action: str,
342 fs_access: StdFsAccess,
343 compute_checksum: bool = True,
344 path_mapper: Type[PathMapper] = PathMapper,
345 ) -> CWLObjectType:
346 adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))
347
348 if action not in ("move", "copy"):
349 return outputObj
350
351 def _collectDirEntries(
352 obj: Union[CWLObjectType, MutableSequence[CWLObjectType], None]
353 ) -> Iterator[CWLObjectType]:
354 if isinstance(obj, dict):
355 if obj.get("class") in ("File", "Directory"):
356 yield obj
357 else:
358 for sub_obj in obj.values():
359 yield from _collectDirEntries(sub_obj)
360 elif isinstance(obj, MutableSequence):
361 for sub_obj in obj:
362 yield from _collectDirEntries(sub_obj)
363
364 def _relocate(src: str, dst: str) -> None:
365 if src == dst:
366 return
367
368 # If the source is not contained in source_directories we're not allowed to delete it
369 src = fs_access.realpath(src)
370 src_can_deleted = any(
371 os.path.commonprefix([p, src]) == p for p in source_directories
372 )
373
374 _action = "move" if action == "move" and src_can_deleted else "copy"
375
376 if _action == "move":
377 _logger.debug("Moving %s to %s", src, dst)
378 if fs_access.isdir(src) and fs_access.isdir(dst):
379 # merge directories
380 for dir_entry in scandir(src):
381 _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
382 else:
383 shutil.move(src, dst)
384
385 elif _action == "copy":
386 _logger.debug("Copying %s to %s", src, dst)
387 if fs_access.isdir(src):
388 if os.path.isdir(dst):
389 shutil.rmtree(dst)
390 elif os.path.isfile(dst):
391 os.unlink(dst)
392 shutil.copytree(src, dst)
393 else:
394 shutil.copy2(src, dst)
395
396 def _realpath(
397 ob: CWLObjectType,
398 ) -> None: # should be type Union[CWLFile, CWLDirectory]
399 location = cast(str, ob["location"])
400 if location.startswith("file:"):
401 ob["location"] = file_uri(os.path.realpath(uri_file_path(location)))
402 elif location.startswith("/"):
403 ob["location"] = os.path.realpath(location)
404 elif not location.startswith("_:") and ":" in location:
405 ob["location"] = file_uri(fs_access.realpath(location))
406
407 outfiles = list(_collectDirEntries(outputObj))
408 visit_class(outfiles, ("File", "Directory"), _realpath)
409 pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
410 stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)
411
412 def _check_adjust(a_file: CWLObjectType) -> CWLObjectType:
413 a_file["location"] = file_uri(pm.mapper(cast(str, a_file["location"]))[1])
414 if "contents" in a_file:
415 del a_file["contents"]
416 return a_file
417
418 visit_class(outputObj, ("File", "Directory"), _check_adjust)
419
420 if compute_checksum:
421 visit_class(
422 outputObj, ("File",), functools.partial(compute_checksums, fs_access)
423 )
424 return outputObj
425
426
427 def cleanIntermediate(output_dirs: Iterable[str]) -> None:
428 for a in output_dirs:
429 if os.path.exists(a):
430 _logger.debug("Removing intermediate output directory %s", a)
431 shutil.rmtree(a, True)
432
433
434 def add_sizes(fsaccess: StdFsAccess, obj: CWLObjectType) -> None:
435 if "location" in obj:
436 try:
437 if "size" not in obj:
438 obj["size"] = fsaccess.size(cast(str, obj["location"]))
439 except OSError:
440 pass
441 elif "contents" in obj:
442 obj["size"] = len(cast(Sized, obj["contents"]))
443 return # best effort
444
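# A minimal sketch of add_sizes with a hypothetical literal File; when there is
# no "location", the size falls back to the length of "contents":
#
#     literal = {"class": "File", "contents": "hello"}
#     add_sizes(StdFsAccess(""), literal)  # literal["size"] == 5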
445
446 def fill_in_defaults(
447 inputs: List[CWLObjectType],
448 job: CWLObjectType,
449 fsaccess: StdFsAccess,
450 ) -> None:
451 for e, inp in enumerate(inputs):
452 with SourceLine(
453 inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)
454 ):
455 fieldname = shortname(cast(str, inp["id"]))
456 if job.get(fieldname) is not None:
457 pass
458 elif job.get(fieldname) is None and "default" in inp:
459 job[fieldname] = copy.deepcopy(inp["default"])
460 elif job.get(fieldname) is None and "null" in aslist(inp["type"]):
461 job[fieldname] = None
462 else:
463 raise WorkflowException(
464 "Missing required input parameter '%s'"
465 % shortname(cast(str, inp["id"]))
466 )
467
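# A minimal sketch of fill_in_defaults with hypothetical parameters; defaults are
# copied in, and optional ("null") inputs fall back to None:
#
#     params = [{"id": "#main/threads", "type": "int", "default": 4},
#               {"id": "#main/verbose", "type": ["null", "boolean"]}]
#     job = {}  # type: CWLObjectType
#     fill_in_defaults(params, job, StdFsAccess(""))
#     # job == {"threads": 4, "verbose": None}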
468
469 def avroize_type(
470 field_type: Union[
471 CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None
472 ],
473 name_prefix: str = "",
474 ) -> None:
475 """Add missing information to a type so that CWL types are valid."""
476 if isinstance(field_type, MutableSequence):
477 for field in field_type:
478 avroize_type(field, name_prefix)
479 elif isinstance(field_type, MutableMapping):
480 if field_type["type"] in ("enum", "record"):
481 if "name" not in field_type:
482 field_type["name"] = name_prefix + str(uuid.uuid4())
483 if field_type["type"] == "record":
484 avroize_type(
485 cast(MutableSequence[CWLOutputType], field_type["fields"]), name_prefix
486 )
487 if field_type["type"] == "array":
488 avroize_type(
489 cast(MutableSequence[CWLOutputType], field_type["items"]), name_prefix
490 )
491 if isinstance(field_type["type"], MutableSequence):
492 for ctype in field_type["type"]:
493 avroize_type(cast(CWLOutputType, ctype), name_prefix)
494
495
496 def get_overrides(
497 overrides: MutableSequence[CWLObjectType], toolid: str
498 ) -> CWLObjectType:
499 req = {} # type: CWLObjectType
500 if not isinstance(overrides, MutableSequence):
501 raise ValidationException(
502 "Expected overrides to be a list, but was %s" % type(overrides)
503 )
504 for ov in overrides:
505 if ov["overrideTarget"] == toolid:
506 req.update(ov)
507 return req
508
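# A minimal sketch of get_overrides with hypothetical override entries; every entry
# whose overrideTarget matches the tool id is merged into the result:
#
#     ovs = [{"overrideTarget": "#main/step1", "requirements": []},
#            {"overrideTarget": "#main/step2", "hints": []}]
#     get_overrides(ovs, "#main/step1")
#     # -> {"overrideTarget": "#main/step1", "requirements": []}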
509
510 _VAR_SPOOL_ERROR = textwrap.dedent(
511 """
512 Non-portable reference to /var/spool/cwl detected: '{}'.
513 To fix, replace /var/spool/cwl with $(runtime.outdir) or add
514 DockerRequirement to the 'requirements' section and declare
515 'dockerOutputDirectory: /var/spool/cwl'.
516 """
517 )
518
519
520 def var_spool_cwl_detector(
521 obj: CWLOutputType,
522 item: Optional[Any] = None,
523 obj_key: Optional[Any] = None,
524 ) -> bool:
525 """Detect any textual reference to /var/spool/cwl."""
526 r = False
527 if isinstance(obj, str):
528 if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
529 _logger.warning(
530 SourceLine(item=item, key=obj_key, raise_type=str).makeError(
531 _VAR_SPOOL_ERROR.format(obj)
532 )
533 )
534 r = True
535 elif isinstance(obj, MutableMapping):
536 for mkey, mvalue in obj.items():
537 r = var_spool_cwl_detector(cast(CWLOutputType, mvalue), obj, mkey) or r
538 elif isinstance(obj, MutableSequence):
539 for lkey, lvalue in enumerate(obj):
540 r = var_spool_cwl_detector(cast(CWLOutputType, lvalue), obj, lkey) or r
541 return r
542
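# A minimal sketch of var_spool_cwl_detector with hypothetical fragments; any string
# mentioning var/spool/cwl triggers a warning unless it is the value of
# 'dockerOutputDirectory':
#
#     var_spool_cwl_detector({"glob": "/var/spool/cwl/out.txt"})           # -> True (warns)
#     var_spool_cwl_detector({"dockerOutputDirectory": "/var/spool/cwl"})  # -> False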
543
544 def eval_resource(
545 builder: Builder, resource_req: Union[str, int, float]
546 ) -> Optional[Union[str, int, float]]:
547 if isinstance(resource_req, str) and expression.needs_parsing(resource_req):
548 result = builder.do_eval(resource_req)
549 if isinstance(result, (str, int)) or result is None:
550 return result
551 raise WorkflowException(
552 "Got incorrect return type {} from resource expression evaluation of {}.".format(
553 type(result), resource_req
554 )
555 )
556 return resource_req
557
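# A minimal sketch of eval_resource; 'builder' is assumed to be an already
# constructed Builder, and the evaluated value shown is hypothetical:
#
#     eval_resource(builder, 2)                        # -> 2 (returned unchanged)
#     eval_resource(builder, "$(inputs.threads * 2)")  # -> evaluated result, e.g. 8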
558
559 # Threshold where the "too many files" warning kicks in
560 FILE_COUNT_WARNING = 5000
561
562
563 class Process(HasReqsHints, metaclass=abc.ABCMeta):
564 def __init__(
565 self, toolpath_object: CommentedMap, loadingContext: LoadingContext
566 ) -> None:
567 """Build a Process object from the provided dictionary."""
568 super().__init__()
569 self.metadata = getdefault(loadingContext.metadata, {}) # type: CWLObjectType
570 self.provenance_object = None # type: Optional[ProvenanceProfile]
571 self.parent_wf = None # type: Optional[ProvenanceProfile]
572 global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement
573 if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
574 get_schema("v1.0")
575 SCHEMA_ANY = cast(
576 CWLObjectType,
577 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"],
578 )
579 SCHEMA_FILE = cast(
580 CWLObjectType,
581 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"],
582 )
583 SCHEMA_DIR = cast(
584 CWLObjectType,
585 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"],
586 )
587
588 self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({}))
589 self.tool = toolpath_object
590 self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
591 self.requirements.extend(self.tool.get("requirements", []))
592 if "id" not in self.tool:
593 self.tool["id"] = "_:" + str(uuid.uuid4())
594 self.requirements.extend(
595 cast(
596 List[CWLObjectType],
597 get_overrides(
598 getdefault(loadingContext.overrides_list, []), self.tool["id"]
599 ).get("requirements", []),
600 )
601 )
602 self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
603 self.hints.extend(self.tool.get("hints", []))
604 # Versions of requirements and hints which aren't mutated.
605 self.original_requirements = copy.deepcopy(self.requirements)
606 self.original_hints = copy.deepcopy(self.hints)
607 self.doc_loader = loadingContext.loader
608 self.doc_schema = loadingContext.avsc_names
609
610 self.formatgraph = None # type: Optional[Graph]
611 if self.doc_loader is not None:
612 self.formatgraph = self.doc_loader.graph
613
614 checkRequirements(self.tool, supportedProcessRequirements)
615 self.validate_hints(
616 cast(Names, loadingContext.avsc_names),
617 self.tool.get("hints", []),
618 strict=getdefault(loadingContext.strict, False),
619 )
620
621 self.schemaDefs = {} # type: MutableMapping[str, CWLObjectType]
622
623 sd, _ = self.get_requirement("SchemaDefRequirement")
624
625 if sd is not None:
626 sdtypes = cast(MutableSequence[CWLObjectType], sd["types"])
627 avroize_type(cast(MutableSequence[CWLOutputType], sdtypes))
628 av = make_valid_avro(
629 sdtypes,
630 {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes},
631 set(),
632 )
633 for i in av:
634 self.schemaDefs[i["name"]] = i # type: ignore
635 make_avsc_object(convert_to_dict(av), self.names)
636
637 # Build record schemas from the tool's inputs and outputs
638 self.inputs_record_schema = {
639 "name": "input_record_schema",
640 "type": "record",
641 "fields": [],
642 } # type: CWLObjectType
643 self.outputs_record_schema = {
644 "name": "outputs_record_schema",
645 "type": "record",
646 "fields": [],
647 } # type: CWLObjectType
648
649 for key in ("inputs", "outputs"):
650 for i in self.tool[key]:
651 c = copy.deepcopy(i)
652 c["name"] = shortname(c["id"])
653 del c["id"]
654
655 if "type" not in c:
656 raise ValidationException(
657 "Missing 'type' in parameter '{}'".format(c["name"])
658 )
659
660 if "default" in c and "null" not in aslist(c["type"]):
661 nullable = ["null"]
662 nullable.extend(aslist(c["type"]))
663 c["type"] = nullable
664 else:
665 c["type"] = c["type"]
666 avroize_type(c["type"], c["name"])
667 if key == "inputs":
668 cast(
669 List[CWLObjectType], self.inputs_record_schema["fields"]
670 ).append(c)
671 elif key == "outputs":
672 cast(
673 List[CWLObjectType], self.outputs_record_schema["fields"]
674 ).append(c)
675
676 with SourceLine(toolpath_object, "inputs", ValidationException):
677 self.inputs_record_schema = cast(
678 CWLObjectType,
679 make_valid_avro(self.inputs_record_schema, {}, set()),
680 )
681 make_avsc_object(convert_to_dict(self.inputs_record_schema), self.names)
682 with SourceLine(toolpath_object, "outputs", ValidationException):
683 self.outputs_record_schema = cast(
684 CWLObjectType,
685 make_valid_avro(self.outputs_record_schema, {}, set()),
686 )
687 make_avsc_object(convert_to_dict(self.outputs_record_schema), self.names)
688
689 if toolpath_object.get("class") is not None and not getdefault(
690 loadingContext.disable_js_validation, False
691 ):
692 validate_js_options = (
693 None
694 ) # type: Optional[Dict[str, Union[List[str], str, int]]]
695 if loadingContext.js_hint_options_file is not None:
696 try:
697 with open(loadingContext.js_hint_options_file) as options_file:
698 validate_js_options = json.load(options_file)
699 except (OSError, ValueError):
700 _logger.error(
701 "Failed to read options file %s",
702 loadingContext.js_hint_options_file,
703 )
704 raise
705 if self.doc_schema is not None:
706 validate_js_expressions(
707 toolpath_object,
708 self.doc_schema.names[toolpath_object["class"]],
709 validate_js_options,
710 )
711
712 dockerReq, is_req = self.get_requirement("DockerRequirement")
713
714 if (
715 dockerReq is not None
716 and "dockerOutputDirectory" in dockerReq
717 and is_req is not None
718 and not is_req
719 ):
720 _logger.warning(
721 SourceLine(item=dockerReq, raise_type=str).makeError(
722 "When 'dockerOutputDirectory' is declared, DockerRequirement "
723 "should go in the 'requirements' section, not 'hints'."
724 ""
725 )
726 )
727
728 if (
729 dockerReq is not None
730 and is_req is not None
731 and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl"
732 ):
733 if is_req:
734 # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
735 pass
736 else:
737 # Declared as a hint: /var/spool/cwl is only permitted when DockerRequirement is a requirement, so flag it.
738 var_spool_cwl_detector(self.tool)
739 else:
740 var_spool_cwl_detector(self.tool)
741
742 def _init_job(
743 self, joborder: CWLObjectType, runtime_context: RuntimeContext
744 ) -> Builder:
745
746 if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
747 raise WorkflowException(
748 "Process object loaded with version '%s', must update to '%s' in order to execute."
749 % (self.metadata.get("cwlVersion"), INTERNAL_VERSION)
750 )
751
752 job = copy.deepcopy(joborder)
753
754 make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
755 fs_access = make_fs_access(runtime_context.basedir)
756
757 load_listing_req, _ = self.get_requirement("LoadListingRequirement")
758
759 load_listing = (
760 cast(str, load_listing_req.get("loadListing"))
761 if load_listing_req is not None
762 else "no_listing"
763 )
764
765 # Validate job order
766 try:
767 fill_in_defaults(self.tool["inputs"], job, fs_access)
768
769 normalizeFilesDirs(job)
770 schema = self.names.get_name("input_record_schema", None)
771 if schema is None:
772 raise WorkflowException(
773 "Missing input record schema: " "{}".format(self.names)
774 )
775 validate_ex(schema, job, strict=False, logger=_logger_validation_warnings)
776
777 if load_listing and load_listing != "no_listing":
778 get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))
779
780 visit_class(job, ("File",), functools.partial(add_sizes, fs_access))
781
782 if load_listing == "deep_listing":
783 for i, inparm in enumerate(self.tool["inputs"]):
784 k = shortname(inparm["id"])
785 if k not in job:
786 continue
787 v = job[k]
788 dircount = [0]
789
790 def inc(d): # type: (List[int]) -> None
791 d[0] += 1
792
793 visit_class(v, ("Directory",), lambda x: inc(dircount))
794 if dircount[0] == 0:
795 continue
796 filecount = [0]
797 visit_class(v, ("File",), lambda x: inc(filecount))
798 if filecount[0] > FILE_COUNT_WARNING:
799 # Long lines in this message are okay, will be reflowed based on terminal columns.
800 _logger.warning(
801 strip_dup_lineno(
802 SourceLine(self.tool["inputs"], i, str).makeError(
803 """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use.
804
805 If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:
806
807 $namespaces:
808 cwltool: "http://commonwl.org/cwltool#"
809 hints:
810 cwltool:LoadListingRequirement:
811 loadListing: shallow_listing
812
813 """
814 % (filecount[0], k)
815 )
816 )
817 )
818
819 except (ValidationException, WorkflowException) as err:
820 raise WorkflowException("Invalid job input record:\n" + str(err)) from err
821
822 files = [] # type: List[CWLObjectType]
823 bindings = CommentedSeq()
824 outdir = ""
825 tmpdir = ""
826 stagedir = ""
827
828 docker_req, _ = self.get_requirement("DockerRequirement")
829 default_docker = None
830
831 if docker_req is None and runtime_context.default_container:
832 default_docker = runtime_context.default_container
833
834 if (docker_req or default_docker) and runtime_context.use_container:
835 if docker_req is not None:
836 # Check if docker output directory is absolute
837 if docker_req.get("dockerOutputDirectory") and cast(
838 str, docker_req.get("dockerOutputDirectory")
839 ).startswith("/"):
840 outdir = cast(str, docker_req.get("dockerOutputDirectory"))
841 else:
842 outdir = cast(
843 str,
844 docker_req.get("dockerOutputDirectory")
845 or runtime_context.docker_outdir
846 or random_outdir(),
847 )
848 elif default_docker is not None:
849 outdir = runtime_context.docker_outdir or random_outdir()
850 tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec
851 stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
852 else:
853 if self.tool["class"] == "CommandLineTool":
854 outdir = fs_access.realpath(runtime_context.get_outdir())
855 tmpdir = fs_access.realpath(runtime_context.get_tmpdir())
856 stagedir = fs_access.realpath(runtime_context.get_stagedir())
857
858 cwl_version = cast(
859 str,
860 self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None),
861 )
862 builder = Builder(
863 job,
864 files,
865 bindings,
866 self.schemaDefs,
867 self.names,
868 self.requirements,
869 self.hints,
870 {},
871 runtime_context.mutation_manager,
872 self.formatgraph,
873 make_fs_access,
874 fs_access,
875 runtime_context.job_script_provider,
876 runtime_context.eval_timeout,
877 runtime_context.debug,
878 runtime_context.js_console,
879 runtime_context.force_docker_pull,
880 load_listing,
881 outdir,
882 tmpdir,
883 stagedir,
884 cwl_version,
885 )
886
887 bindings.extend(
888 builder.bind_input(
889 self.inputs_record_schema,
890 job,
891 discover_secondaryFiles=getdefault(runtime_context.toplevel, False),
892 )
893 )
894
895 if self.tool.get("baseCommand"):
896 for index, command in enumerate(aslist(self.tool["baseCommand"])):
897 bindings.append({"position": [-1000000, index], "datum": command})
898
899 if self.tool.get("arguments"):
900 for i, arg in enumerate(self.tool["arguments"]):
901 lc = self.tool["arguments"].lc.data[i]
902 filename = self.tool["arguments"].lc.filename
903 bindings.lc.add_kv_line_col(len(bindings), lc)
904 if isinstance(arg, MutableMapping):
905 arg = copy.deepcopy(arg)
906 if arg.get("position"):
907 position = arg.get("position")
908 if isinstance(position, str): # no need to test the
909 # CWLVersion as the v1.0
910 # schema only allows ints
911 position = builder.do_eval(position)
912 if position is None:
913 position = 0
914 arg["position"] = [position, i]
915 else:
916 arg["position"] = [0, i]
917 bindings.append(arg)
918 elif ("$(" in arg) or ("${" in arg):
919 cm = CommentedMap((("position", [0, i]), ("valueFrom", arg)))
920 cm.lc.add_kv_line_col("valueFrom", lc)
921 cm.lc.filename = filename
922 bindings.append(cm)
923 else:
924 cm = CommentedMap((("position", [0, i]), ("datum", arg)))
925 cm.lc.add_kv_line_col("datum", lc)
926 cm.lc.filename = filename
927 bindings.append(cm)
928
929 # Use Python 2-like sorting of heterogeneous lists
930 # (containing both str and int types).
931 key = functools.cmp_to_key(cmp_like_py2)
932
933 # This awkward construction replaces the contents of
934 # "bindings" in place (because Builder expects it to be
935 # mutated in place, sigh, I'm sorry) with its contents sorted,
936 # supporting different versions of Python and ruamel.yaml with
937 # different behaviors/bugs in CommentedSeq.
938 bindings_copy = copy.deepcopy(bindings)
939 del bindings[:]
940 bindings.extend(sorted(bindings_copy, key=key))
941
942 if self.tool["class"] != "Workflow":
943 builder.resources = self.evalResources(builder, runtime_context)
944 return builder
945
946 def evalResources(
947 self, builder: Builder, runtimeContext: RuntimeContext
948 ) -> Dict[str, Union[int, float, str]]:
949 resourceReq, _ = self.get_requirement("ResourceRequirement")
950 if resourceReq is None:
951 resourceReq = {}
952 cwl_version = self.metadata.get(
953 "http://commonwl.org/cwltool#original_cwlVersion", None
954 )
955 if cwl_version == "v1.0":
956 ram = 1024
957 else:
958 ram = 256
959 request: Dict[str, Union[int, float, str]] = {
960 "coresMin": 1,
961 "coresMax": 1,
962 "ramMin": ram,
963 "ramMax": ram,
964 "tmpdirMin": 1024,
965 "tmpdirMax": 1024,
966 "outdirMin": 1024,
967 "outdirMax": 1024,
968 }
969 for a in ("cores", "ram", "tmpdir", "outdir"):
970 mn = mx = None # type: Optional[Union[int, float]]
971 if resourceReq.get(a + "Min"):
972 mn = cast(
973 Union[int, float],
974 eval_resource(
975 builder, cast(Union[str, int, float], resourceReq[a + "Min"])
976 ),
977 )
978 if resourceReq.get(a + "Max"):
979 mx = cast(
980 Union[int, float],
981 eval_resource(
982 builder, cast(Union[str, int, float], resourceReq[a + "Max"])
983 ),
984 )
985 if mn is None:
986 mn = mx
987 elif mx is None:
988 mx = mn
989
990 if mn is not None:
991 request[a + "Min"] = mn
992 request[a + "Max"] = cast(Union[int, float], mx)
993
994 if runtimeContext.select_resources is not None:
995 return runtimeContext.select_resources(request, runtimeContext)
996 return {
997 "cores": request["coresMin"],
998 "ram": math.ceil(request["ramMin"])
999 if not isinstance(request["ramMin"], str)
1000 else request["ramMin"],
1001 "tmpdirSize": math.ceil(request["tmpdirMin"])
1002 if not isinstance(request["tmpdirMin"], str)
1003 else request["tmpdirMin"],
1004 "outdirSize": math.ceil(request["outdirMin"])
1005 if not isinstance(request["outdirMin"], str)
1006 else request["outdirMin"],
1007 }
1008
1009 def validate_hints(
1010 self, avsc_names: Names, hints: List[CWLObjectType], strict: bool
1011 ) -> None:
1012 for i, r in enumerate(hints):
1013 sl = SourceLine(hints, i, ValidationException)
1014 with sl:
1015 if (
1016 avsc_names.get_name(cast(str, r["class"]), None) is not None
1017 and self.doc_loader is not None
1018 ):
1019 plain_hint = {
1020 key: r[key]
1021 for key in r
1022 if key not in self.doc_loader.identifiers
1023 } # strip identifiers
1024 validate_ex(
1025 cast(
1026 Schema,
1027 avsc_names.get_name(cast(str, plain_hint["class"]), None),
1028 ),
1029 plain_hint,
1030 strict=strict,
1031 )
1032 elif r["class"] in ("NetworkAccess", "LoadListingRequirement"):
1033 pass
1034 else:
1035 _logger.info(str(sl.makeError("Unknown hint %s" % (r["class"]))))
1036
1037 def visit(self, op: Callable[[CommentedMap], None]) -> None:
1038 op(self.tool)
1039
1040 @abc.abstractmethod
1041 def job(
1042 self,
1043 job_order: CWLObjectType,
1044 output_callbacks: Optional[OutputCallbackType],
1045 runtimeContext: RuntimeContext,
1046 ) -> JobsGeneratorType:
1047 pass
1048
1049
1050 _names = set() # type: Set[str]
1051
1052
1053 def uniquename(stem: str, names: Optional[Set[str]] = None) -> str:
1054 global _names
1055 if names is None:
1056 names = _names
1057 c = 1
1058 u = stem
1059 while u in names:
1060 c += 1
1061 u = f"{stem}_{c}"
1062 names.add(u)
1063 return u
1064
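# A minimal sketch of uniquename with a hypothetical name set:
#
#     seen = set()  # type: Set[str]
#     uniquename("sort", seen)  # -> "sort"
#     uniquename("sort", seen)  # -> "sort_2"
#     uniquename("sort", seen)  # -> "sort_3"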
1065
1066 def nestdir(base: str, deps: CWLObjectType) -> CWLObjectType:
1067 dirname = os.path.dirname(base) + "/"
1068 subid = cast(str, deps["location"])
1069 if subid.startswith(dirname):
1070 s2 = subid[len(dirname) :]
1071 sp = s2.split("/")
1072 sp.pop()
1073 while sp:
1074 nx = sp.pop()
1075 deps = {"class": "Directory", "basename": nx, "listing": [deps]}
1076 return deps
1077
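# A minimal sketch of nestdir with hypothetical locations; dependencies below the
# base document's directory get wrapped in intermediate Directory objects:
#
#     nestdir("file:///data/run.cwl",
#             {"class": "File", "location": "file:///data/inputs/a.txt"})
#     # -> {"class": "Directory", "basename": "inputs",
#     #     "listing": [{"class": "File", "location": "file:///data/inputs/a.txt"}]}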
1078
1079 def mergedirs(listing: List[CWLObjectType]) -> List[CWLObjectType]:
1080 r = [] # type: List[CWLObjectType]
1081 ents = {} # type: Dict[str, CWLObjectType]
1082 collided = set() # type: Set[str]
1083 for e in listing:
1084 basename = cast(str, e["basename"])
1085 if basename not in ents:
1086 ents[basename] = e
1087 elif e["class"] == "Directory":
1088 if e.get("listing"):
1089 cast(
1090 List[CWLObjectType], ents[basename].setdefault("listing", [])
1091 ).extend(cast(List[CWLObjectType], e["listing"]))
1092 if cast(str, ents[basename]["location"]).startswith("_:"):
1093 ents[basename]["location"] = e["location"]
1094 elif e["location"] != ents[basename]["location"]:
1095 # same basename, different location, collision,
1096 # rename both.
1097 collided.add(basename)
1098 e2 = ents[basename]
1099
1100 e["basename"] = urllib.parse.quote(cast(str, e["location"]), safe="")
1101 e2["basename"] = urllib.parse.quote(cast(str, e2["location"]), safe="")
1102
1103 e["nameroot"], e["nameext"] = os.path.splitext(cast(str, e["basename"]))
1104 e2["nameroot"], e2["nameext"] = os.path.splitext(cast(str, e2["basename"]))
1105
1106 ents[cast(str, e["basename"])] = e
1107 ents[cast(str, e2["basename"])] = e2
1108 for c in collided:
1109 del ents[c]
1110 for e in ents.values():
1111 if e["class"] == "Directory" and "listing" in e:
1112 e["listing"] = cast(
1113 MutableSequence[CWLOutputAtomType],
1114 mergedirs(cast(List[CWLObjectType], e["listing"])),
1115 )
1116 r.extend(ents.values())
1117 return r
1118
1119
1120 CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
1121
1122
1123 def scandeps(
1124 base: str,
1125 doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
1126 reffields: Set[str],
1127 urlfields: Set[str],
1128 loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
1129 urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
1130 nestdirs: bool = True,
1131 ) -> MutableSequence[CWLObjectType]:
1132 r = [] # type: MutableSequence[CWLObjectType]
1133 if isinstance(doc, MutableMapping):
1134 if "id" in doc:
1135 if cast(str, doc["id"]).startswith("file://"):
1136 df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
1137 if base != df:
1138 r.append({"class": "File", "location": df, "format": CWL_IANA})
1139 base = df
1140
1141 if doc.get("class") in ("File", "Directory") and "location" in urlfields:
1142 u = cast(Optional[str], doc.get("location", doc.get("path")))
1143 if u and not u.startswith("_:"):
1144 deps = {
1145 "class": doc["class"],
1146 "location": urljoin(base, u),
1147 } # type: CWLObjectType
1148 if "basename" in doc:
1149 deps["basename"] = doc["basename"]
1150 if doc["class"] == "Directory" and "listing" in doc:
1151 deps["listing"] = doc["listing"]
1152 if doc["class"] == "File" and "secondaryFiles" in doc:
1153 deps["secondaryFiles"] = cast(
1154 CWLOutputAtomType,
1155 scandeps(
1156 base,
1157 cast(
1158 Union[CWLObjectType, MutableSequence[CWLObjectType]],
1159 doc["secondaryFiles"],
1160 ),
1161 reffields,
1162 urlfields,
1163 loadref,
1164 urljoin=urljoin,
1165 nestdirs=nestdirs,
1166 ),
1167 )
1168 if nestdirs:
1169 deps = nestdir(base, deps)
1170 r.append(deps)
1171 else:
1172 if doc["class"] == "Directory" and "listing" in doc:
1173 r.extend(
1174 scandeps(
1175 base,
1176 cast(MutableSequence[CWLObjectType], doc["listing"]),
1177 reffields,
1178 urlfields,
1179 loadref,
1180 urljoin=urljoin,
1181 nestdirs=nestdirs,
1182 )
1183 )
1184 elif doc["class"] == "File" and "secondaryFiles" in doc:
1185 r.extend(
1186 scandeps(
1187 base,
1188 cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
1189 reffields,
1190 urlfields,
1191 loadref,
1192 urljoin=urljoin,
1193 nestdirs=nestdirs,
1194 )
1195 )
1196
1197 for k, v in doc.items():
1198 if k in reffields:
1199 for u2 in aslist(v):
1200 if isinstance(u2, MutableMapping):
1201 r.extend(
1202 scandeps(
1203 base,
1204 u2,
1205 reffields,
1206 urlfields,
1207 loadref,
1208 urljoin=urljoin,
1209 nestdirs=nestdirs,
1210 )
1211 )
1212 else:
1213 subid = urljoin(base, u2)
1214 basedf, _ = urllib.parse.urldefrag(base)
1215 subiddf, _ = urllib.parse.urldefrag(subid)
1216 if basedf == subiddf:
1217 continue
1218 sub = cast(
1219 Union[MutableSequence[CWLObjectType], CWLObjectType],
1220 loadref(base, u2),
1221 )
1222 deps2 = {
1223 "class": "File",
1224 "location": subid,
1225 "format": CWL_IANA,
1226 } # type: CWLObjectType
1227 sf = scandeps(
1228 subid,
1229 sub,
1230 reffields,
1231 urlfields,
1232 loadref,
1233 urljoin=urljoin,
1234 nestdirs=nestdirs,
1235 )
1236 if sf:
1237 deps2["secondaryFiles"] = cast(
1238 MutableSequence[CWLOutputAtomType], sf
1239 )
1240 if nestdirs:
1241 deps2 = nestdir(base, deps2)
1242 r.append(deps2)
1243 elif k in urlfields and k != "location":
1244 for u3 in aslist(v):
1245 deps = {"class": "File", "location": urljoin(base, u3)}
1246 if nestdirs:
1247 deps = nestdir(base, deps)
1248 r.append(deps)
1249 elif doc.get("class") in ("File", "Directory") and k in (
1250 "listing",
1251 "secondaryFiles",
1252 ):
1253 # should be handled earlier.
1254 pass
1255 else:
1256 r.extend(
1257 scandeps(
1258 base,
1259 cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
1260 reffields,
1261 urlfields,
1262 loadref,
1263 urljoin=urljoin,
1264 nestdirs=nestdirs,
1265 )
1266 )
1267 elif isinstance(doc, MutableSequence):
1268 for d in doc:
1269 r.extend(
1270 scandeps(
1271 base,
1272 d,
1273 reffields,
1274 urlfields,
1275 loadref,
1276 urljoin=urljoin,
1277 nestdirs=nestdirs,
1278 )
1279 )
1280
1281 if r:
1282 normalizeFilesDirs(r)
1283 r = mergedirs(cast(List[CWLObjectType], r))
1284
1285 return r
1286
1287
1288 def compute_checksums(fs_access: StdFsAccess, fileobj: CWLObjectType) -> None:
1289 if "checksum" not in fileobj:
1290 checksum = hashlib.sha1() # nosec
1291 location = cast(str, fileobj["location"])
1292 with fs_access.open(location, "rb") as f:
1293 contents = f.read(1024 * 1024)
1294 while contents != b"":
1295 checksum.update(contents)
1296 contents = f.read(1024 * 1024)
1297 fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
1298 fileobj["size"] = fs_access.size(location)