comparison env/lib/python3.9/site-packages/cwltool/main.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 #!/usr/bin/env python3
2 # PYTHON_ARGCOMPLETE_OK
3 """Entry point for cwltool."""
4
5 import argparse
6 import functools
7 import io
8 import logging
9 import os
10 import signal
11 import subprocess # nosec
12 import sys
13 import time
14 import urllib
15 from codecs import StreamWriter, getwriter
16 from collections.abc import MutableMapping, MutableSequence
17 from typing import (
18 IO,
19 Any,
20 Callable,
21 Dict,
22 List,
23 Mapping,
24 MutableMapping,
25 MutableSequence,
26 Optional,
27 Sized,
28 TextIO,
29 Tuple,
30 Union,
31 cast,
32 )
33
34 import argcomplete
35 import coloredlogs
36 import pkg_resources # part of setuptools
37 from ruamel import yaml
38 from ruamel.yaml.comments import CommentedMap, CommentedSeq
39 from schema_salad.exceptions import ValidationException
40 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
41 from schema_salad.sourceline import strip_dup_lineno
42 from schema_salad.utils import ContextType, FetcherCallableType, json_dumps
43
44 from . import CWL_CONTENT_TYPES, workflow
45 from .argparser import arg_parser, generate_parser, get_default_args
46 from .builder import HasReqsHints
47 from .context import LoadingContext, RuntimeContext, getdefault
48 from .cwlrdf import printdot, printrdf
49 from .errors import UnsupportedRequirement, WorkflowException
50 from .executors import JobExecutor, MultithreadedJobExecutor, SingleJobExecutor
51 from .load_tool import (
52 default_loader,
53 fetch_document,
54 jobloaderctx,
55 load_overrides,
56 make_tool,
57 resolve_and_validate_document,
58 resolve_overrides,
59 resolve_tool_uri,
60 )
61 from .loghandler import _logger, defaultStreamHandler
62 from .mpi import MpiConfig
63 from .mutation import MutationManager
64 from .pack import pack
65 from .process import (
66 CWL_IANA,
67 Process,
68 add_sizes,
69 scandeps,
70 shortname,
71 use_custom_schema,
72 use_standard_schema,
73 )
74 from .procgenerator import ProcessGenerator
75 from .provenance import ResearchObject
76 from .resolver import ga4gh_tool_registries, tool_resolver
77 from .secrets import SecretStore
78 from .software_requirements import (
79 DependenciesConfiguration,
80 get_container_from_software_requirements,
81 )
82 from .stdfsaccess import StdFsAccess
83 from .subgraph import get_step, get_subgraph
84 from .update import ALLUPDATES, UPDATES
85 from .utils import (
86 DEFAULT_TMP_PREFIX,
87 CWLObjectType,
88 CWLOutputAtomType,
89 CWLOutputType,
90 adjustDirObjs,
91 normalizeFilesDirs,
92 onWindows,
93 processes_to_kill,
94 trim_listing,
95 versionstring,
96 visit_class,
97 windows_default_container_id,
98 )
99 from .workflow import Workflow
100
101
def _terminate_processes() -> None:
    """Kill all spawned processes.

    Processes to be killed must be appended to `utils.processes_to_kill`
    as they are spawned.

    An important caveat: since there's no supported way to kill another
    thread in Python, this function cannot stop other threads from
    continuing to execute while it kills the processes that they've
    spawned. This may occasionally lead to unexpected behaviour.
    """
    # Another thread may still be appending entries while we run, so keep
    # popping until the deque is observed empty rather than iterating it.
    while processes_to_kill:
        process = processes_to_kill.popleft()
        cidfiles = [
            str(arg).split("=")[1] for arg in process.args if "--cidfile" in str(arg)
        ]
        if not cidfiles:
            continue
        try:
            # The cidfile holds the container id; ask Docker to kill it.
            with open(cidfiles[0]) as cid_stream:
                killer = subprocess.Popen(  # nosec
                    ["docker", "kill", cid_stream.read()], shell=False  # nosec
                )
                try:
                    killer.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    killer.kill()
        except FileNotFoundError:
            # Container never wrote its cidfile; nothing to kill.
            pass
132
133
def _signal_handler(signum: int, _: Any) -> None:
    """Terminate all spawned processes, then exit with the signal number.

    It remains possible for another thread to spawn a process after the
    cleanup pass but before Python actually exits; see the docstring of
    _terminate_processes() for the other caveats.
    """
    _terminate_processes()
    sys.exit(signum)
144
145
def generate_example_input(
    inptype: Optional[CWLOutputType],
    default: Optional[CWLOutputType],
) -> Tuple[Any, str]:
    """Convert a single input schema into an example.

    :param inptype: a CWL type: a type name string, a union given as a
        list, or a type schema given as a mapping.
    :param default: the declared default value for this input, if any;
        where applicable it overrides the synthesized example.
    :return: an ``(example value, comment)`` pair for the input template.
    """
    example = None
    comment = ""
    defaults = {
        "null": "null",
        "Any": "null",
        "boolean": False,
        "int": 0,
        "long": 0,
        "float": 0.1,
        "double": 0.1,
        "string": "a_string",
        "File": yaml.comments.CommentedMap(
            [("class", "File"), ("path", "a/file/path")]
        ),
        "Directory": yaml.comments.CommentedMap(
            [("class", "Directory"), ("path", "a/directory/path")]
        ),
    }  # type: CWLObjectType
    if isinstance(inptype, MutableSequence):
        # Fix: work on a copy — previously "null" was removed from the
        # caller's own list, silently mutating the loaded CWL document.
        inptype = list(inptype)
        optional = False
        if "null" in inptype:
            inptype.remove("null")
            optional = True
        if len(inptype) == 1:
            example, comment = generate_example_input(inptype[0], default)
            if optional:
                if comment:
                    comment = f"{comment} (optional)"
                else:
                    comment = "optional"
        else:
            # True union: show one example per alternative, each annotated.
            example = CommentedSeq()
            for index, entry in enumerate(inptype):
                value, e_comment = generate_example_input(entry, default)
                example.append(value)
                example.yaml_add_eol_comment(e_comment, index)
            if optional:
                comment = "optional"
    elif isinstance(inptype, Mapping) and "type" in inptype:
        if inptype["type"] == "array":
            first_item = cast(MutableSequence[CWLObjectType], inptype["items"])[0]
            items_len = len(cast(Sized, inptype["items"]))
            if items_len == 1 and "type" in first_item and first_item["type"] == "enum":
                # array of just an enum then list all the options
                example = first_item["symbols"]
                if "name" in first_item:
                    comment = 'array of type "{}".'.format(first_item["name"])
            else:
                value, comment = generate_example_input(inptype["items"], None)
                comment = "array of " + comment
                if items_len == 1:
                    example = [value]
                else:
                    example = value
            if default is not None:
                example = default
        elif inptype["type"] == "enum":
            symbols = cast(List[str], inptype["symbols"])
            if default is not None:
                example = default
            elif "default" in inptype:
                example = inptype["default"]
            elif len(cast(Sized, inptype["symbols"])) == 1:
                example = symbols[0]
            else:
                example = "{}_enum_value".format(inptype.get("name", "valid"))
            comment = 'enum; valid values: "{}"'.format('", "'.join(symbols))
        elif inptype["type"] == "record":
            example = yaml.comments.CommentedMap()
            if "name" in inptype:
                comment = '"{}" record type.'.format(inptype["name"])
            # NOTE: insert(0, ...) lists the fields in reverse declaration
            # order, matching the behaviour of generate_input_template().
            for field in cast(List[CWLObjectType], inptype["fields"]):
                value, f_comment = generate_example_input(field["type"], None)
                example.insert(0, shortname(cast(str, field["name"])), value, f_comment)
        elif "default" in inptype:
            example = inptype["default"]
            comment = 'default value of type "{}".'.format(inptype["type"])
        else:
            example = defaults.get(cast(str, inptype["type"]), str(inptype))
            comment = 'type "{}".'.format(inptype["type"])
    else:
        # Plain type name (or anything unrecognized, echoed back verbatim).
        if not default:
            example = defaults.get(str(inptype), str(inptype))
            comment = f'type "{inptype}"'
        else:
            example = default
            comment = f'default value of type "{inptype}".'
    return example, comment
239
240
def realize_input_schema(
    input_types: MutableSequence[CWLObjectType],
    schema_defs: MutableMapping[str, CWLObjectType],
) -> MutableSequence[CWLObjectType]:
    """Replace references to named types with the actual types.

    Rewrites ``input_types`` in place (and also returns it): every string
    reference of the form ``name`` or ``file#name`` found in an entry's
    type, items, or fields is looked up in ``schema_defs`` and substituted,
    recursively.
    """
    for index, entry in enumerate(input_types):
        if isinstance(entry, str):
            # Bare reference: strip an optional "file#" prefix, then
            # substitute the named schema in place if we know it.
            if "#" in entry:
                _, input_type_name = entry.split("#")
            else:
                input_type_name = entry
            if input_type_name in schema_defs:
                entry = input_types[index] = schema_defs[input_type_name]
        if isinstance(entry, Mapping):
            # NOTE(review): assumes every mapping entry carries a "type"
            # key; a KeyError here would indicate a malformed schema.
            if isinstance(entry["type"], str) and "#" in entry["type"]:
                # "type" is itself a "file#name" reference.
                _, input_type_name = entry["type"].split("#")
                if input_type_name in schema_defs:
                    input_types[index]["type"] = cast(
                        CWLOutputAtomType,
                        realize_input_schema(
                            cast(
                                MutableSequence[CWLObjectType],
                                schema_defs[input_type_name],
                            ),
                            schema_defs,
                        ),
                    )
            if isinstance(entry["type"], MutableSequence):
                # Union type: realize each alternative.
                input_types[index]["type"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], entry["type"]), schema_defs
                    ),
                )
            if isinstance(entry["type"], Mapping):
                # Nested anonymous schema: realize it as a one-element list.
                input_types[index]["type"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        [cast(CWLObjectType, input_types[index]["type"])], schema_defs
                    ),
                )
            if entry["type"] == "array":
                # A lone string item is normalized to a one-element list
                # before recursing.
                items = (
                    entry["items"]
                    if not isinstance(entry["items"], str)
                    else [entry["items"]]
                )
                input_types[index]["items"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], items), schema_defs
                    ),
                )
            if entry["type"] == "record":
                input_types[index]["fields"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], entry["fields"]),
                        schema_defs,
                    ),
                )
    return input_types
303
304
def generate_input_template(tool: Process) -> CWLObjectType:
    """Generate an example input object for the given CWL process."""
    template = yaml.comments.CommentedMap()
    realized = realize_input_schema(tool.tool["inputs"], tool.schemaDefs)
    for field in realized:
        field_name = shortname(cast(str, field["id"]))
        example, note = generate_example_input(field["type"], field.get("default", None))
        template.insert(0, field_name, example, note)
    return template
313
314
def load_job_order(
    args: argparse.Namespace,
    stdin: IO[Any],
    fetcher_constructor: Optional[FetcherCallableType],
    overrides_list: List[CWLObjectType],
    tool_file_uri: str,
) -> Tuple[Optional[CWLObjectType], str, Loader]:
    """Load the job order (CWL input object) from a file, stdin, or neither.

    :param args: parsed cwltool arguments; ``args.job_order`` selects the
        source ("-" means stdin, a single path means a file, anything else
        defers to per-input command line options).
    :param overrides_list: extended in place with any
        ``cwltool:overrides`` found in the job order.
    :return: the job order object (or None if one must be built from the
        command line later), the input base directory, and the Loader used.
    """
    job_order_object = None
    job_order_file = None

    _jobloaderctx = jobloaderctx.copy()
    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)

    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
        job_order_file = args.job_order[0]
    elif len(args.job_order) == 1 and args.job_order[0] == "-":
        # Read the input object from stdin.
        job_order_object = yaml.main.round_trip_load(stdin)
        job_order_object, _ = loader.resolve_all(
            job_order_object, file_uri(os.getcwd()) + "/"
        )
    else:
        job_order_file = None

    if job_order_object is not None:
        input_basedir = args.basedir if args.basedir else os.getcwd()
    elif job_order_file is not None:
        input_basedir = (
            args.basedir
            if args.basedir
            else os.path.abspath(os.path.dirname(job_order_file))
        )
        job_order_object, _ = loader.resolve_ref(
            job_order_file,
            checklinks=False,
            content_types=CWL_CONTENT_TYPES,
        )

    if (
        job_order_object is not None
        and "http://commonwl.org/cwltool#overrides" in job_order_object
    ):
        ov_uri = file_uri(job_order_file or input_basedir)
        overrides_list.extend(
            resolve_overrides(job_order_object, ov_uri, tool_file_uri)
        )
        del job_order_object["http://commonwl.org/cwltool#overrides"]

    if job_order_object is None:
        input_basedir = args.basedir if args.basedir else os.getcwd()

    if job_order_object is not None and not isinstance(
        job_order_object, MutableMapping
    ):
        _logger.error(
            # Fix: error message said "dictionay".
            "CWL input object at %s is not formatted correctly, it should be a "
            "JSON/YAML dictionary, not %s.\n"
            "Raw input object:\n%s",
            job_order_file or "stdin",
            type(job_order_object),
            job_order_object,
        )
        sys.exit(1)
    return (job_order_object, input_basedir, loader)
379
380
def init_job_order(
    job_order_object: Optional[CWLObjectType],
    args: argparse.Namespace,
    process: Process,
    loader: Loader,
    stdout: Union[TextIO, StreamWriter],
    print_input_deps: bool = False,
    relative_deps: str = "primary",
    make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess,
    input_basedir: str = "",
    secret_store: Optional[SecretStore] = None,
    input_required: bool = True,
) -> CWLObjectType:
    """Finalize the job order: fill it from the command line and normalize it.

    If ``job_order_object`` is None, a per-tool argument parser is generated
    and the remaining command line is parsed into an input object. Defaults
    from the tool, secrets, file sizes, formats, and locations are then
    applied/normalized in place.

    May call ``exit()`` for ``--tool-help``, ``--print-input-deps``, missing
    required inputs, or an unresolvable job order reference.

    :return: the completed job order object.
    """
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    if job_order_object is None:
        namemap = {}  # type: Dict[str, str]
        records = []  # type: List[str]
        toolparser = generate_parser(
            argparse.ArgumentParser(prog=args.workflow),
            process,
            namemap,
            records,
            input_required,
        )
        if args.tool_help:
            toolparser.print_help()
            exit(0)
        cmd_line = vars(toolparser.parse_args(args.job_order))
        for record_name in records:
            # Regroup flattened "record.field" options into nested records.
            record = {}
            record_items = {
                k: v for k, v in cmd_line.items() if k.startswith(record_name)
            }
            for key, value in record_items.items():
                record[key[len(record_name) + 1 :]] = value
                del cmd_line[key]
            cmd_line[str(record_name)] = record
        if "job_order" in cmd_line and cmd_line["job_order"]:
            try:
                job_order_object = cast(
                    CWLObjectType,
                    loader.resolve_ref(cmd_line["job_order"])[0],
                )
            except Exception:
                # Fix: log message said "Failed to resolv".
                _logger.exception(
                    "Failed to resolve job_order: %s", cmd_line["job_order"]
                )
                exit(1)
        else:
            job_order_object = {"id": args.workflow}

        del cmd_line["job_order"]

        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})

        if secret_store and secrets_req:
            secret_store.store(
                [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
                job_order_object,
            )

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "Parsed job order from command line: %s",
                json_dumps(job_order_object, indent=4),
            )

    # Apply tool-declared defaults for any inputs not supplied.
    for inp in process.tool["inputs"]:
        if "default" in inp and (
            not job_order_object or shortname(inp["id"]) not in job_order_object
        ):
            if not job_order_object:
                job_order_object = {}
            job_order_object[shortname(inp["id"])] = inp["default"]

    if job_order_object is None:
        if process.tool["inputs"]:
            if toolparser is not None:
                print(f"\nOptions for {args.workflow} ")
                toolparser.print_help()
            _logger.error("")
            _logger.error("Input object required, use --help for details")
            exit(1)
        else:
            job_order_object = {}

    if print_input_deps:
        basedir = None  # type: Optional[str]
        uri = cast(str, job_order_object["id"])
        if uri == args.workflow:
            basedir = os.path.dirname(uri)
            uri = ""
        printdeps(
            job_order_object,
            loader,
            stdout,
            relative_deps,
            uri,
            basedir=basedir,
            nestdirs=False,
        )
        exit(0)

    def path_to_loc(p: CWLObjectType) -> None:
        """Rename a legacy 'path' key to 'location'."""
        if "location" not in p and "path" in p:
            p["location"] = p["path"]
            del p["path"]

    ns = {}  # type: ContextType
    ns.update(cast(ContextType, job_order_object.get("$namespaces", {})))
    ns.update(cast(ContextType, process.metadata.get("$namespaces", {})))
    ld = Loader(ns)

    def expand_formats(p: CWLObjectType) -> None:
        """Expand a possibly-prefixed format identifier to a full URL."""
        if "format" in p:
            p["format"] = ld.expand_url(cast(str, p["format"]), "")

    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
    visit_class(
        job_order_object,
        ("File",),
        functools.partial(add_sizes, make_fs_access(input_basedir)),
    )
    visit_class(job_order_object, ("File",), expand_formats)
    adjustDirObjs(job_order_object, trim_listing)
    normalizeFilesDirs(job_order_object)

    if secret_store and secrets_req:
        secret_store.store(
            [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
            job_order_object,
        )

    # Strip bookkeeping keys that are not actual inputs.
    if "cwl:tool" in job_order_object:
        del job_order_object["cwl:tool"]
    if "id" in job_order_object:
        del job_order_object["id"]
    return job_order_object
519
520
def make_relative(base: str, obj: CWLObjectType) -> None:
    """Relativize the location URI of a File or Directory object."""
    uri = cast(str, obj.get("location", obj.get("path")))
    first_segment = uri.split("/")[0]
    if ":" in first_segment and not uri.startswith("file://"):
        # Leave remote URIs (http://, s3://, ...) untouched.
        return
    if uri.startswith("file://"):
        uri = uri_file_path(uri)
    obj["location"] = os.path.relpath(uri, base)
530
531
def printdeps(
    obj: CWLObjectType,
    document_loader: Loader,
    stdout: Union[TextIO, StreamWriter],
    relative_deps: str,
    uri: str,
    basedir: Optional[str] = None,
    nestdirs: bool = True,
) -> None:
    """Print a JSON representation of the dependencies of the CWL document."""
    deps = find_deps(obj, document_loader, uri, basedir=basedir, nestdirs=nestdirs)
    # Choose the base against which File/Directory locations are relativized.
    if relative_deps == "primary":
        base = basedir or os.path.dirname(uri_file_path(str(uri)))
    elif relative_deps == "cwd":
        base = os.getcwd()
    visit_class(deps, ("File", "Directory"), functools.partial(make_relative, base))
    stdout.write(json_dumps(deps, indent=4))
549
550
def prov_deps(
    obj: CWLObjectType,
    document_loader: Loader,
    uri: str,
    basedir: Optional[str] = None,
) -> CWLObjectType:
    """Find the dependencies of the document, keeping only CWL files.

    :return: the dependency tree from find_deps(), with every
        secondaryFiles entry whose format is not CWL_IANA pruned.
    """
    deps = find_deps(obj, document_loader, uri, basedir=basedir)

    def remove_non_cwl(deps: CWLObjectType) -> None:
        """Recursively drop secondaryFiles entries that are not CWL documents."""
        if "secondaryFiles" in deps:
            sec_files = cast(List[CWLObjectType], deps["secondaryFiles"])
            # Fix: the original deleted entries from sec_files while
            # enumerating it, which skips the element following every
            # deletion (and can delete the wrong index). Build the
            # filtered list instead.
            kept = []
            for entry in sec_files:
                if "format" in entry and entry["format"] == CWL_IANA:
                    remove_non_cwl(entry)
                    kept.append(entry)
            deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], kept)

    remove_non_cwl(deps)
    return deps
570
571
def find_deps(
    obj: CWLObjectType,
    document_loader: Loader,
    uri: str,
    basedir: Optional[str] = None,
    nestdirs: bool = True,
) -> CWLObjectType:
    """Find the dependencies of the CWL document."""
    deps = cast(
        CWLObjectType,
        {"class": "File", "location": uri, "format": CWL_IANA},
    )

    def fetch_ref(base: str, ref: str) -> Union[CommentedMap, CommentedSeq, str, None]:
        """Resolve `ref` against `base` and fetch it via the document loader."""
        return document_loader.fetch(document_loader.fetcher.urljoin(base, ref))

    secondary = scandeps(
        basedir if basedir else uri,
        obj,
        {"$import", "run"},
        {"$include", "$schemas", "location"},
        fetch_ref,
        nestdirs=nestdirs,
    )
    if secondary is not None:
        deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], secondary)

    return deps
601
602
def print_pack(
    loadingContext: LoadingContext,
    uri: str,
) -> str:
    """Return a CWL serialization of the CWL document in JSON."""
    packed = pack(loadingContext, uri)
    graph = cast(MutableSequence[CWLObjectType], packed["$graph"])
    if len(graph) > 1:
        # Multiple processes: keep the whole $graph wrapper.
        return json_dumps(packed, indent=4)
    # Single process: unwrap it.
    return json_dumps(graph[0], indent=4)
614
615
def supported_cwl_versions(enable_dev: bool) -> List[str]:
    """Return the sorted list of supported CWL version strings."""
    # ALLUPDATES and UPDATES are dicts keyed by version; iterating a dict
    # yields its keys, so sorted() gives the version list directly.
    source = ALLUPDATES if enable_dev else UPDATES
    return sorted(source)
624
625
def configure_logging(
    args: argparse.Namespace,
    stderr_handler: logging.Handler,
    runtimeContext: RuntimeContext,
) -> None:
    """Set logging levels, colors, and timestamps from the parsed options."""
    rdflib_logger = logging.getLogger("rdflib.term")
    rdflib_logger.addHandler(stderr_handler)
    rdflib_logger.setLevel(logging.ERROR)
    if args.quiet:
        # Silence STDERR, not an eventual provenance log file
        stderr_handler.setLevel(logging.WARN)
    if runtimeContext.debug:
        # Increase to debug for both stderr and provenance log file
        for log_target in (_logger, stderr_handler, rdflib_logger):
            log_target.setLevel(logging.DEBUG)
    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
    if args.timestamps:
        formatter = fmtclass(
            "[%(asctime)s] %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S"
        )
    else:
        formatter = fmtclass("%(levelname)s %(message)s")
    stderr_handler.setFormatter(formatter)
649
650
def setup_schema(
    args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]]
) -> None:
    """Activate the custom, extended, or standard CWL schemas."""
    if custom_schema_callback is not None:
        custom_schema_callback()
        return
    versions = ("v1.0", "v1.1", "v1.2.0-dev1", "v1.2.0-dev2", "v1.2.0-dev3")
    if args.enable_ext:
        # Load the cwltool extension schemas bundled with the package.
        with pkg_resources.resource_stream(__name__, "extensions.yml") as res:
            ext10 = res.read().decode("utf-8")
        with pkg_resources.resource_stream(__name__, "extensions-v1.1.yml") as res:
            ext11 = res.read().decode("utf-8")
        for version in versions:
            # v1.0 has its own extension file; all later versions share one.
            ext = ext10 if version == "v1.0" else ext11
            use_custom_schema(version, "http://commonwl.org/cwltool", ext)
    else:
        for version in versions:
            use_standard_schema(version)
672
673
674 class ProvLogFormatter(logging.Formatter):
675 """Enforce ISO8601 with both T and Z."""
676
677 def __init__(self) -> None:
678 """Use the default formatter with our custom formatstring."""
679 super().__init__("[%(asctime)sZ] %(message)s")
680
681 def formatTime(
682 self, record: logging.LogRecord, datefmt: Optional[str] = None
683 ) -> str:
684 formatted_time = time.strftime(
685 "%Y-%m-%dT%H:%M:%S", time.gmtime(float(record.created))
686 )
687 with_msecs = f"{formatted_time},{record.msecs:03f}"
688 return with_msecs
689
690
def setup_provenance(
    args: argparse.Namespace,
    argsl: List[str],
    runtimeContext: RuntimeContext,
) -> Optional[int]:
    """Create the research object and attach a provenance log handler.

    :return: 1 on an incompatible-options error, None on success.
    """
    if not args.compute_checksum:
        _logger.error("--provenance incompatible with --no-compute-checksum")
        return 1
    research_obj = ResearchObject(
        getdefault(runtimeContext.make_fs_access, StdFsAccess)(""),
        temp_prefix_ro=args.tmpdir_prefix,
        orcid=args.orcid,
        full_name=args.cwl_full_name,
    )
    runtimeContext.research_obj = research_obj
    log_file_io = research_obj.open_log_file_for_activity(research_obj.engine_uuid)
    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
    prov_log_handler.setFormatter(ProvLogFormatter())
    _logger.addHandler(prov_log_handler)
    _logger.debug("[provenance] Logging to %s", log_file_io)
    if argsl is not None:
        # Log cwltool command line options to provenance file
        _logger.info("[cwltool] %s %s", sys.argv[0], " ".join(argsl))
    _logger.debug("[cwltool] Arguments: %s", args)
    return None
717
718
def setup_loadingContext(
    loadingContext: Optional[LoadingContext],
    runtimeContext: RuntimeContext,
    args: argparse.Namespace,
) -> LoadingContext:
    """Build a fresh LoadingContext from the arguments and runtime context."""
    # Never mutate the caller's context: build a new one or copy theirs.
    ctx = (
        LoadingContext(vars(args))
        if loadingContext is None
        else loadingContext.copy()
    )
    ctx.loader = default_loader(
        ctx.fetcher_constructor,
        enable_dev=args.enable_dev,
        doc_cache=args.doc_cache,
    )
    ctx.research_obj = runtimeContext.research_obj
    ctx.disable_js_validation = args.disable_js_validation or (not args.do_validate)
    ctx.construct_tool_object = getdefault(
        ctx.construct_tool_object, workflow.default_make_tool
    )
    ctx.resolver = getdefault(ctx.resolver, tool_resolver)
    if ctx.do_update is None:
        # --pack and --print-subgraph need the document left un-updated.
        ctx.do_update = not (args.pack or args.print_subgraph)

    return ctx
745
746
def make_template(
    tool: Process,
) -> None:
    """Print a template CWL input object for the given Process to stdout."""

    def represent_none(
        self: Any, data: Any
    ) -> Any:  # pylint: disable=unused-argument
        """Force clean representation of 'null'."""
        return self.represent_scalar("tag:yaml.org,2002:null", "null")

    # Register the null representer globally so the dump prints "null".
    yaml.representer.RoundTripRepresenter.add_representer(type(None), represent_none)
    yaml.main.round_trip_dump(
        generate_input_template(tool),
        sys.stdout,
        default_flow_style=False,
        indent=4,
        block_seq_indent=2,
    )
766
767
def choose_target(
    args: argparse.Namespace,
    tool: Process,
    loadingContext: LoadingContext,
) -> Optional[Process]:
    """Walk the Workflow, extract the subset matches all the args.targets."""
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if not isinstance(tool, Workflow):
        _logger.error("Can only use --target on Workflows")
        return None
    # Build fully-qualified step/output ids for each requested target.
    if urllib.parse.urlparse(tool.tool["id"]).fragment:
        target_ids = [tool.tool["id"] + "/" + r for r in args.target]
    else:
        target_ids = [
            loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
            for r in args.target
        ]
    extracted = get_subgraph(target_ids, tool)
    if not isinstance(loadingContext.loader.idx, MutableMapping):
        raise Exception("Missing loadingContext.loader.idx!")
    # Register the extracted subgraph so make_tool can resolve it.
    loadingContext.loader.idx[extracted["id"]] = extracted
    return make_tool(extracted["id"], loadingContext)
801
802
def choose_step(
    args: argparse.Namespace,
    tool: Process,
    loadingContext: LoadingContext,
) -> Optional[Process]:
    """Walk the given Workflow and extract just args.single_step.

    :return: the selected step as a standalone Process, or None when
        `tool` is not a Workflow.
    """
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            # Fix: this branch read `args.singe_step` (typo), raising
            # AttributeError whenever the tool id carried a fragment.
            extracted = get_step(tool, tool.tool["id"] + "/" + args.single_step)
        else:
            extracted = get_step(
                tool,
                loadingContext.loader.fetcher.urljoin(
                    tool.tool["id"], "#" + args.single_step
                ),
            )
    else:
        _logger.error("Can only use --single-step on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, MutableMapping):
        # Register the extracted step so make_tool can resolve it.
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"], loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool
833
834
def check_working_directories(
    runtimeContext: RuntimeContext,
) -> Optional[int]:
    """Make any needed working directories.

    :return: 1 if a directory could not be created, None on success.
    """
    for attr in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
        prefix = getattr(runtimeContext, attr)
        if prefix and prefix != DEFAULT_TMP_PREFIX:
            # Preserve a trailing slash; cachedir always gets one so the
            # dirname below is the directory itself.
            suffix = (
                "/" if prefix.endswith("/") or attr == "cachedir" else ""
            )
            abs_prefix = os.path.abspath(prefix) + suffix
            setattr(runtimeContext, attr, abs_prefix)
            parent = os.path.dirname(abs_prefix)
            if not os.path.exists(parent):
                try:
                    os.makedirs(parent)
                except Exception:
                    _logger.exception("Failed to create directory.")
                    return 1
    return None
862
863
864 def main(
865 argsl: Optional[List[str]] = None,
866 args: Optional[argparse.Namespace] = None,
867 job_order_object: Optional[CWLObjectType] = None,
868 stdin: IO[Any] = sys.stdin,
869 stdout: Optional[Union[TextIO, StreamWriter]] = None,
870 stderr: IO[Any] = sys.stderr,
871 versionfunc: Callable[[], str] = versionstring,
872 logger_handler: Optional[logging.Handler] = None,
873 custom_schema_callback: Optional[Callable[[], None]] = None,
874 executor: Optional[JobExecutor] = None,
875 loadingContext: Optional[LoadingContext] = None,
876 runtimeContext: Optional[RuntimeContext] = None,
877 input_required: bool = True,
878 ) -> int:
879 if not stdout: # force UTF-8 even if the console is configured differently
880 if hasattr(sys.stdout, "encoding") and sys.stdout.encoding.upper() not in (
881 "UTF-8",
882 "UTF8",
883 ):
884 if hasattr(sys.stdout, "detach"):
885 stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
886 else:
887 stdout = getwriter("utf-8")(sys.stdout) # type: ignore
888 else:
889 stdout = sys.stdout
890
891 _logger.removeHandler(defaultStreamHandler)
892 stderr_handler = logger_handler
893 if stderr_handler is not None:
894 _logger.addHandler(stderr_handler)
895 else:
896 coloredlogs.install(logger=_logger, stream=stderr)
897 stderr_handler = _logger.handlers[-1]
898 workflowobj = None
899 prov_log_handler = None # type: Optional[logging.StreamHandler]
900 try:
901 if args is None:
902 if argsl is None:
903 argsl = sys.argv[1:]
904 addl = [] # type: List[str]
905 if "CWLTOOL_OPTIONS" in os.environ:
906 addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
907 parser = arg_parser()
908 argcomplete.autocomplete(parser)
909 args = parser.parse_args(addl + argsl)
910 if args.record_container_id:
911 if not args.cidfile_dir:
912 args.cidfile_dir = os.getcwd()
913 del args.record_container_id
914
915 if runtimeContext is None:
916 runtimeContext = RuntimeContext(vars(args))
917 else:
918 runtimeContext = runtimeContext.copy()
919
920 # If on Windows platform, a default Docker Container is used if not
921 # explicitely provided by user
922 if onWindows() and not runtimeContext.default_container:
923 # This docker image is a minimal alpine image with bash installed
924 # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
925 runtimeContext.default_container = windows_default_container_id
926
927 # If caller parsed its own arguments, it may not include every
928 # cwltool option, so fill in defaults to avoid crashing when
929 # dereferencing them in args.
930 for key, val in get_default_args().items():
931 if not hasattr(args, key):
932 setattr(args, key, val)
933
934 configure_logging(args, stderr_handler, runtimeContext)
935
936 if args.version:
937 print(versionfunc())
938 return 0
939 _logger.info(versionfunc())
940
941 if args.print_supported_versions:
942 print("\n".join(supported_cwl_versions(args.enable_dev)))
943 return 0
944
945 if not args.workflow:
946 if os.path.isfile("CWLFile"):
947 args.workflow = "CWLFile"
948 else:
949 _logger.error("CWL document required, no input file was provided")
950 parser.print_help()
951 return 1
952
953 if args.ga4gh_tool_registries:
954 ga4gh_tool_registries[:] = args.ga4gh_tool_registries
955 if not args.enable_ga4gh_tool_registry:
956 del ga4gh_tool_registries[:]
957
958 if args.mpi_config_file is not None:
959 runtimeContext.mpi_config = MpiConfig.load(args.mpi_config_file)
960
961 setup_schema(args, custom_schema_callback)
962
963 if args.provenance:
964 if argsl is None:
965 raise Exception("argsl cannot be None")
966 if setup_provenance(args, argsl, runtimeContext) is not None:
967 return 1
968
969 loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)
970
971 uri, tool_file_uri = resolve_tool_uri(
972 args.workflow,
973 resolver=loadingContext.resolver,
974 fetcher_constructor=loadingContext.fetcher_constructor,
975 )
976
977 try_again_msg = (
978 "" if args.debug else ", try again with --debug for more information"
979 )
980
981 try:
982 job_order_object, input_basedir, jobloader = load_job_order(
983 args,
984 stdin,
985 loadingContext.fetcher_constructor,
986 loadingContext.overrides_list,
987 tool_file_uri,
988 )
989
990 if args.overrides:
991 loadingContext.overrides_list.extend(
992 load_overrides(
993 file_uri(os.path.abspath(args.overrides)), tool_file_uri
994 )
995 )
996
997 loadingContext, workflowobj, uri = fetch_document(uri, loadingContext)
998
999 if args.print_deps and loadingContext.loader:
1000 printdeps(
1001 workflowobj, loadingContext.loader, stdout, args.relative_deps, uri
1002 )
1003 return 0
1004
1005 loadingContext, uri = resolve_and_validate_document(
1006 loadingContext,
1007 workflowobj,
1008 uri,
1009 preprocess_only=(args.print_pre or args.pack),
1010 skip_schemas=args.skip_schemas,
1011 )
1012
1013 if loadingContext.loader is None:
1014 raise Exception("Impossible code path.")
1015 processobj, metadata = loadingContext.loader.resolve_ref(uri)
1016 processobj = cast(CommentedMap, processobj)
1017 if args.pack:
1018 stdout.write(print_pack(loadingContext, uri))
1019 return 0
1020
1021 if args.provenance and runtimeContext.research_obj:
1022 # Can't really be combined with args.pack at same time
1023 runtimeContext.research_obj.packed_workflow(
1024 print_pack(loadingContext, uri)
1025 )
1026
1027 if args.print_pre:
1028 stdout.write(
1029 json_dumps(
1030 processobj, indent=4, sort_keys=True, separators=(",", ": ")
1031 )
1032 )
1033 return 0
1034
1035 tool = make_tool(uri, loadingContext)
1036 if args.make_template:
1037 make_template(tool)
1038 return 0
1039
1040 if args.validate:
1041 print(f"{args.workflow} is valid CWL.")
1042 return 0
1043
1044 if args.print_rdf:
1045 stdout.write(
1046 printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer)
1047 )
1048 return 0
1049
1050 if args.print_dot:
1051 printdot(tool, loadingContext.loader.ctx, stdout)
1052 return 0
1053
1054 if args.print_targets:
1055 for f in ("outputs", "steps", "inputs"):
1056 if tool.tool[f]:
1057 _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
1058 stdout.write(
1059 " "
1060 + "\n ".join([shortname(t["id"]) for t in tool.tool[f]])
1061 + "\n"
1062 )
1063 return 0
1064
1065 if args.target:
1066 ctool = choose_target(args, tool, loadingContext)
1067 if ctool is None:
1068 return 1
1069 else:
1070 tool = ctool
1071
1072 elif args.single_step:
1073 ctool = choose_step(args, tool, loadingContext)
1074 if ctool is None:
1075 return 1
1076 else:
1077 tool = ctool
1078
1079 if args.print_subgraph:
1080 if "name" in tool.tool:
1081 del tool.tool["name"]
1082 stdout.write(
1083 json_dumps(
1084 tool.tool, indent=4, sort_keys=True, separators=(",", ": ")
1085 )
1086 )
1087 return 0
1088
1089 except (ValidationException) as exc:
1090 _logger.error(
1091 "Tool definition failed validation:\n%s", str(exc), exc_info=args.debug
1092 )
1093 return 1
1094 except (RuntimeError, WorkflowException) as exc:
1095 _logger.error(
1096 "Tool definition failed initialization:\n%s",
1097 str(exc),
1098 exc_info=args.debug,
1099 )
1100 return 1
1101 except Exception as exc:
1102 _logger.error(
1103 "I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
1104 try_again_msg,
1105 str(exc) if not args.debug else "",
1106 exc_info=args.debug,
1107 )
1108 return 1
1109
1110 if isinstance(tool, int):
1111 return tool
1112
1113 # If on MacOS platform, TMPDIR must be set to be under one of the
1114 # shared volumes in Docker for Mac
1115 # More info: https://dockstore.org/docs/faq
1116 if sys.platform == "darwin":
1117 default_mac_path = "/private/tmp/docker_tmp"
1118 if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
1119 runtimeContext.tmp_outdir_prefix = default_mac_path
1120 if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
1121 runtimeContext.tmpdir_prefix = default_mac_path
1122
1123 if check_working_directories(runtimeContext) is not None:
1124 return 1
1125
1126 if args.cachedir:
1127 if args.move_outputs == "move":
1128 runtimeContext.move_outputs = "copy"
1129 runtimeContext.tmp_outdir_prefix = args.cachedir
1130
1131 runtimeContext.secret_store = getdefault(
1132 runtimeContext.secret_store, SecretStore()
1133 )
1134 runtimeContext.make_fs_access = getdefault(
1135 runtimeContext.make_fs_access, StdFsAccess
1136 )
1137
1138 if not executor:
1139 if args.parallel:
1140 temp_executor = MultithreadedJobExecutor()
1141 runtimeContext.select_resources = temp_executor.select_resources
1142 real_executor = temp_executor # type: JobExecutor
1143 else:
1144 real_executor = SingleJobExecutor()
1145 else:
1146 real_executor = executor
1147
1148 try:
1149 runtimeContext.basedir = input_basedir
1150
1151 if isinstance(tool, ProcessGenerator):
1152 tfjob_order = {} # type: CWLObjectType
1153 if loadingContext.jobdefaults:
1154 tfjob_order.update(loadingContext.jobdefaults)
1155 if job_order_object:
1156 tfjob_order.update(job_order_object)
1157 tfout, tfstatus = real_executor(
1158 tool.embedded_tool, tfjob_order, runtimeContext
1159 )
1160 if not tfout or tfstatus != "success":
1161 raise WorkflowException(
1162 "ProcessGenerator failed to generate workflow"
1163 )
1164 tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
1165 if not job_order_object:
1166 job_order_object = None
1167
1168 try:
1169 initialized_job_order_object = init_job_order(
1170 job_order_object,
1171 args,
1172 tool,
1173 jobloader,
1174 stdout,
1175 print_input_deps=args.print_input_deps,
1176 relative_deps=args.relative_deps,
1177 make_fs_access=runtimeContext.make_fs_access,
1178 input_basedir=input_basedir,
1179 secret_store=runtimeContext.secret_store,
1180 input_required=input_required,
1181 )
1182 except SystemExit as err:
1183 return err.code
1184
1185 del args.workflow
1186 del args.job_order
1187
1188 conf_file = getattr(
1189 args, "beta_dependency_resolvers_configuration", None
1190 ) # str
1191 use_conda_dependencies = getattr(
1192 args, "beta_conda_dependencies", None
1193 ) # str
1194
1195 if conf_file or use_conda_dependencies:
1196 runtimeContext.job_script_provider = DependenciesConfiguration(args)
1197 else:
1198 runtimeContext.find_default_container = functools.partial(
1199 find_default_container,
1200 default_container=runtimeContext.default_container,
1201 use_biocontainers=args.beta_use_biocontainers,
1202 )
1203
1204 (out, status) = real_executor(
1205 tool, initialized_job_order_object, runtimeContext, logger=_logger
1206 )
1207
1208 if out is not None:
1209 if runtimeContext.research_obj is not None:
1210 runtimeContext.research_obj.create_job(out, True)
1211
1212 def remove_at_id(doc: CWLObjectType) -> None:
1213 for key in list(doc.keys()):
1214 if key == "@id":
1215 del doc[key]
1216 else:
1217 value = doc[key]
1218 if isinstance(value, MutableMapping):
1219 remove_at_id(value)
1220 elif isinstance(value, MutableSequence):
1221 for entry in value:
1222 if isinstance(entry, MutableMapping):
1223 remove_at_id(entry)
1224
1225 remove_at_id(out)
1226 visit_class(
1227 out,
1228 ("File",),
1229 functools.partial(add_sizes, runtimeContext.make_fs_access("")),
1230 )
1231
1232 def loc_to_path(obj: CWLObjectType) -> None:
1233 for field in ("path", "nameext", "nameroot", "dirname"):
1234 if field in obj:
1235 del obj[field]
1236 if cast(str, obj["location"]).startswith("file://"):
1237 obj["path"] = uri_file_path(cast(str, obj["location"]))
1238
1239 visit_class(out, ("File", "Directory"), loc_to_path)
1240
1241 # Unsetting the Generation from final output object
1242 visit_class(out, ("File",), MutationManager().unset_generation)
1243
1244 if isinstance(out, str):
1245 stdout.write(out)
1246 else:
1247 stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
1248 stdout.write("\n")
1249 if hasattr(stdout, "flush"):
1250 stdout.flush()
1251
1252 if status != "success":
1253 _logger.warning("Final process status is %s", status)
1254 return 1
1255 _logger.info("Final process status is %s", status)
1256 return 0
1257
1258 except (ValidationException) as exc:
1259 _logger.error(
1260 "Input object failed validation:\n%s", str(exc), exc_info=args.debug
1261 )
1262 return 1
1263 except UnsupportedRequirement as exc:
1264 _logger.error(
1265 "Workflow or tool uses unsupported feature:\n%s",
1266 str(exc),
1267 exc_info=args.debug,
1268 )
1269 return 33
1270 except WorkflowException as exc:
1271 _logger.error(
1272 "Workflow error%s:\n%s",
1273 try_again_msg,
1274 strip_dup_lineno(str(exc)),
1275 exc_info=args.debug,
1276 )
1277 return 1
1278 except Exception as exc: # pylint: disable=broad-except
1279 _logger.error(
1280 "Unhandled error%s:\n %s",
1281 try_again_msg,
1282 str(exc),
1283 exc_info=args.debug,
1284 )
1285 return 1
1286
1287 finally:
1288 if (
1289 args
1290 and runtimeContext
1291 and runtimeContext.research_obj
1292 and workflowobj
1293 and loadingContext
1294 ):
1295 research_obj = runtimeContext.research_obj
1296 if loadingContext.loader is not None:
1297 research_obj.generate_snapshot(
1298 prov_deps(workflowobj, loadingContext.loader, uri)
1299 )
1300 else:
1301 _logger.warning(
1302 "Unable to generate provenance snapshot "
1303 " due to missing loadingContext.loader."
1304 )
1305 if prov_log_handler is not None:
1306 # Stop logging so we won't half-log adding ourself to RO
1307 _logger.debug(
1308 "[provenance] Closing provenance log file %s", prov_log_handler
1309 )
1310 _logger.removeHandler(prov_log_handler)
1311 # Ensure last log lines are written out
1312 prov_log_handler.flush()
1313 # Underlying WritableBagFile will add the tagfile to the manifest
1314 prov_log_handler.stream.close()
1315 prov_log_handler.close()
1316 research_obj.close(args.provenance)
1317
1318 _logger.removeHandler(stderr_handler)
1319 _logger.addHandler(defaultStreamHandler)
1320
1321
def find_default_container(
    builder: HasReqsHints,
    default_container: Optional[str] = None,
    use_biocontainers: Optional[bool] = None,
) -> Optional[str]:
    """Resolve the container image to fall back on for *builder*.

    An explicitly supplied ``default_container`` always wins; otherwise,
    when ``use_biocontainers`` is enabled, ask the software-requirements
    machinery for a matching image. Returns ``None`` when neither applies.
    """
    if default_container or not use_biocontainers:
        return default_container
    # No explicit default given: derive one from the tool's requirements/hints.
    return get_container_from_software_requirements(use_biocontainers, builder)
1333
1334
def run(*args, **kwargs):
    # type: (*Any, **Any) -> None
    """Run cwltool as a command-line program and exit with main()'s status."""
    # Install the module's SIGTERM handler before any work starts.
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        exit_code = main(*args, **kwargs)
    finally:
        # Always run process cleanup, even when main() raises.
        _terminate_processes()
    sys.exit(exit_code)
1343
1344
# Script entry point: forward the CLI arguments, minus the program name.
if __name__ == "__main__":
    run(sys.argv[1:])