diff env/lib/python3.7/site-packages/cwltool/command_line_tool.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| field | value |
|---|---|
| author | shellac |
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | |
--- a/env/lib/python3.7/site-packages/cwltool/command_line_tool.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,853 +0,0 @@
-"""Implementation of CommandLineTool."""
-from __future__ import absolute_import
-
-import copy
-import hashlib
-import json
-import locale
-import logging
-import os
-import re
-import shutil
-import tempfile
-import threading
-from functools import cmp_to_key, partial
-from typing import (Any, Callable, Dict, Generator, IO, List, Mapping,
-                    MutableMapping, MutableSequence, Optional, Set, Union, cast)
-
-from typing_extensions import Text, Type, TYPE_CHECKING  # pylint: disable=unused-import
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-import shellescape
-from schema_salad import validate
-from schema_salad.avro.schema import Schema
-from schema_salad.ref_resolver import file_uri, uri_file_path
-from schema_salad.sourceline import SourceLine
-from six import string_types
-from future.utils import raise_from
-
-from six.moves import map, urllib
-from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
-                               Text, Type)
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from .builder import (Builder, content_limit_respected_read_bytes,  # pylint: disable=unused-import
-                      substitute)
-from .context import LoadingContext  # pylint: disable=unused-import
-from .context import RuntimeContext, getdefault
-from .docker import DockerCommandLineJob
-from .errors import WorkflowException
-from .flatten import flatten
-from .job import CommandLineJob, JobBase  # pylint: disable=unused-import
-from .loghandler import _logger
-from .mutation import MutationManager  # pylint: disable=unused-import
-from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs,
-                         get_listing, trim_listing, visit_class)
-from .process import (Process, UnsupportedRequirement,
-                      _logger_validation_warnings, compute_checksums,
-                      normalizeFilesDirs, shortname, uniquename)
-from .singularity import SingularityCommandLineJob
-from .software_requirements import (  # pylint: disable=unused-import
-    DependenciesConfiguration)
-from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
-from .utils import (aslist, convert_pathsep_to_unix,
-                    docker_windows_path_adjust, json_dumps, onWindows,
-                    random_outdir, windows_default_container_id,
-                    shared_file_lock, upgrade_lock)
-if TYPE_CHECKING:
-    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
-
-ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
-ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")  # Accept anything
-ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
-DEFAULT_CONTAINER_MSG = """
-We are on Microsoft Windows and not all components of this CWL description have a
-container specified. This means that these steps will be executed in the default container,
-which is %s.
-
-Note, this could affect portability if this CWL description relies on non-POSIX features
-or commands in this container. For best results add the following to your CWL
-description's hints section:
-
-hints:
-  DockerRequirement:
-    dockerPull: %s
-"""
-
-
-class ExpressionTool(Process):
-    class ExpressionJob(object):
-        """Job for ExpressionTools."""
-
-        def __init__(self,
-                     builder,          # type: Builder
-                     script,           # type: Dict[Text, Text]
-                     output_callback,  # type: Callable[[Any, Any], Any]
-                     requirements,     # type: List[Dict[Text, Text]]
-                     hints,            # type: List[Dict[Text, Text]]
-                     outdir=None,      # type: Optional[Text]
-                     tmpdir=None,      # type: Optional[Text]
-                    ):  # type: (...) -> None
-            """Initialize this ExpressionJob."""
-            self.builder = builder
-            self.requirements = requirements
-            self.hints = hints
-            self.collect_outputs = None  # type: Optional[Callable[[Any], Any]]
-            self.output_callback = output_callback
-            self.outdir = outdir
-            self.tmpdir = tmpdir
-            self.script = script
-            self.prov_obj = None  # type: Optional[ProvenanceProfile]
-
-        def run(self,
-                runtimeContext,   # type: RuntimeContext
-                tmpdir_lock=None  # type: Optional[threading.Lock]
-               ):  # type: (...) -> None
-            try:
-                normalizeFilesDirs(self.builder.job)
-                ev = self.builder.do_eval(self.script)
-                normalizeFilesDirs(ev)
-                self.output_callback(ev, "success")
-            except Exception as err:
-                _logger.warning(u"Failed to evaluate expression:\n%s",
-                                Text(err), exc_info=runtimeContext.debug)
-                self.output_callback({}, "permanentFail")
-
-    def job(self,
-            job_order,         # type: Mapping[Text, Text]
-            output_callbacks,  # type: Callable[[Any, Any], Any]
-            runtimeContext     # type: RuntimeContext
-           ):
-        # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
-        builder = self._init_job(job_order, runtimeContext)
-
-        job = ExpressionTool.ExpressionJob(
-            builder, self.tool["expression"], output_callbacks,
-            self.requirements, self.hints)
-        job.prov_obj = runtimeContext.prov_obj
-        yield job
-
-
-def remove_path(f):  # type: (Dict[Text, Any]) -> None
-    if "path" in f:
-        del f["path"]
-
-
-def revmap_file(builder, outdir, f):
-    # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None]
-    """
-    Remap a file from internal path to external path.
-
-    For Docker, this maps from the path inside the container to the path
-    outside the container. Recognizes files in the pathmapper or remaps
-    internal output directories to the external directory.
- """ - split = urllib.parse.urlsplit(outdir) - if not split.scheme: - outdir = file_uri(str(outdir)) - - # builder.outdir is the inner (container/compute node) output directory - # outdir is the outer (host/storage system) output directory - - if "location" in f and "path" not in f: - if f["location"].startswith("file://"): - f["path"] = convert_pathsep_to_unix(uri_file_path(f["location"])) - else: - return f - - if "path" in f: - path = f["path"] - uripath = file_uri(path) - del f["path"] - - if "basename" not in f: - f["basename"] = os.path.basename(path) - - if not builder.pathmapper: - raise ValueError("Do not call revmap_file using a builder that doesn't have a pathmapper.") - revmap_f = builder.pathmapper.reversemap(path) - - if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"): - f["location"] = revmap_f[1] - elif uripath == outdir or uripath.startswith(outdir+os.sep) or uripath.startswith(outdir+'/'): - f["location"] = file_uri(path) - elif path == builder.outdir or path.startswith(builder.outdir+os.sep) or path.startswith(builder.outdir+'/'): - f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:]) - elif not os.path.isabs(path): - f["location"] = builder.fs_access.join(outdir, path) - else: - raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input " - u"file pass through." % (path, builder.outdir)) - return f - - raise WorkflowException(u"Output File object is missing both 'location' " - "and 'path' fields: %s" % f) - - -class CallbackJob(object): - def __init__(self, job, output_callback, cachebuilder, jobcache): - # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, Text) -> None - """Initialize this CallbackJob.""" - self.job = job - self.output_callback = output_callback - self.cachebuilder = cachebuilder - self.outdir = jobcache - self.prov_obj = None # type: Optional[ProvenanceProfile] - - def run(self, - runtimeContext, # type: RuntimeContext - tmpdir_lock=None # type: Optional[threading.Lock] - ): # type: (...) -> None - self.output_callback(self.job.collect_output_ports( - self.job.tool["outputs"], - self.cachebuilder, - self.outdir, - getdefault(runtimeContext.compute_checksum, True)), "success") - - -def check_adjust(builder, file_o): - # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] - """ - Map files to assigned path inside a container. 
- - We need to also explicitly walk over input, as implicit reassignment - doesn't reach everything in builder.bindings - """ - if not builder.pathmapper: - raise ValueError("Do not call check_adjust using a builder that doesn't have a pathmapper.") - file_o["path"] = docker_windows_path_adjust( - builder.pathmapper.mapper(file_o["location"])[1]) - dn, bn = os.path.split(file_o["path"]) - if file_o.get("dirname") != dn: - file_o["dirname"] = Text(dn) - if file_o.get("basename") != bn: - file_o["basename"] = Text(bn) - if file_o["class"] == "File": - nr, ne = os.path.splitext(file_o["basename"]) - if file_o.get("nameroot") != nr: - file_o["nameroot"] = Text(nr) - if file_o.get("nameext") != ne: - file_o["nameext"] = Text(ne) - if not ACCEPTLIST_RE.match(file_o["basename"]): - raise WorkflowException( - "Invalid filename: '{}' contains illegal characters".format( - file_o["basename"])) - return file_o - -def check_valid_locations(fs_access, ob): # type: (StdFsAccess, Dict[Text, Any]) -> None - if ob["location"].startswith("_:"): - pass - if ob["class"] == "File" and not fs_access.isfile(ob["location"]): - raise validate.ValidationException("Does not exist or is not a File: '%s'" % ob["location"]) - if ob["class"] == "Directory" and not fs_access.isdir(ob["location"]): - raise validate.ValidationException("Does not exist or is not a Directory: '%s'" % ob["location"]) - - -OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]] - -class CommandLineTool(Process): - def __init__(self, toolpath_object, loadingContext): - # type: (MutableMapping[Text, Any], LoadingContext) -> None - """Initialize this CommandLineTool.""" - super(CommandLineTool, self).__init__(toolpath_object, loadingContext) - self.prov_obj = loadingContext.prov_obj - - def make_job_runner(self, - runtimeContext # type: RuntimeContext - ): # type: (...) 
-> Type[JobBase] - dockerReq, _ = self.get_requirement("DockerRequirement") - if not dockerReq and runtimeContext.use_container: - if runtimeContext.find_default_container is not None: - default_container = runtimeContext.find_default_container(self) - if default_container is not None: - self.requirements.insert(0, { - "class": "DockerRequirement", - "dockerPull": default_container - }) - dockerReq = self.requirements[0] - if default_container == windows_default_container_id \ - and runtimeContext.use_container and onWindows(): - _logger.warning( - DEFAULT_CONTAINER_MSG, windows_default_container_id, - windows_default_container_id) - - if dockerReq is not None and runtimeContext.use_container: - if runtimeContext.singularity: - return SingularityCommandLineJob - return DockerCommandLineJob - for t in reversed(self.requirements): - if t["class"] == "DockerRequirement": - raise UnsupportedRequirement( - "--no-container, but this CommandLineTool has " - "DockerRequirement under 'requirements'.") - return CommandLineJob - - def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs): - # type: (List[Any], Text, RuntimeContext, bool) -> PathMapper - return PathMapper(reffiles, runtimeContext.basedir, stagedir, separateDirs) - - def updatePathmap(self, outdir, pathmap, fn): - # type: (Text, PathMapper, Dict[Text, Any]) -> None - if "location" in fn and fn["location"] in pathmap: - pathmap.update(fn["location"], pathmap.mapper(fn["location"]).resolved, - os.path.join(outdir, fn["basename"]), - ("Writable" if fn.get("writable") else "") + fn["class"], False) - for sf in fn.get("secondaryFiles", []): - self.updatePathmap(outdir, pathmap, sf) - for ls in fn.get("listing", []): - self.updatePathmap(os.path.join(outdir, fn["basename"]), pathmap, ls) - - def job(self, - job_order, # type: Mapping[Text, Text] - output_callbacks, # type: Callable[[Any, Any], Any] - runtimeContext # type: RuntimeContext - ): - # type: (...) 
-> Generator[Union[JobBase, CallbackJob], None, None] - - workReuse, _ = self.get_requirement("WorkReuse") - enableReuse = workReuse.get("enableReuse", True) if workReuse else True - - jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job"))) - if runtimeContext.cachedir and enableReuse: - cachecontext = runtimeContext.copy() - cachecontext.outdir = "/out" - cachecontext.tmpdir = "/tmp" # nosec - cachecontext.stagedir = "/stage" - cachebuilder = self._init_job(job_order, cachecontext) - cachebuilder.pathmapper = PathMapper(cachebuilder.files, - runtimeContext.basedir, - cachebuilder.stagedir, - separateDirs=False) - _check_adjust = partial(check_adjust, cachebuilder) - visit_class([cachebuilder.files, cachebuilder.bindings], - ("File", "Directory"), _check_adjust) - - cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings))) - docker_req, _ = self.get_requirement("DockerRequirement") - if docker_req is not None and runtimeContext.use_container: - dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull") - elif runtimeContext.default_container is not None and runtimeContext.use_container: - dockerimg = runtimeContext.default_container - else: - dockerimg = None - - if dockerimg is not None: - cmdline = ["docker", "run", dockerimg] + cmdline - # not really run using docker, just for hashing purposes - keydict = {u"cmdline": cmdline} # type: Dict[Text, Union[Dict[Text, Any], List[Any]]] - - for shortcut in ["stdin", "stdout", "stderr"]: - if shortcut in self.tool: - keydict[shortcut] = self.tool[shortcut] - - for location, fobj in cachebuilder.pathmapper.items(): - if fobj.type == "File": - checksum = next( - (e['checksum'] for e in cachebuilder.files - if 'location' in e and e['location'] == location - and 'checksum' in e - and e['checksum'] != 'sha1$hash'), None) - fobj_stat = os.stat(fobj.resolved) - if checksum is not None: - keydict[fobj.resolved] = [fobj_stat.st_size, checksum] - else: - keydict[fobj.resolved] = [fobj_stat.st_size, - int(fobj_stat.st_mtime * 1000)] - - interesting = {"DockerRequirement", - "EnvVarRequirement", - "InitialWorkDirRequirement", - "ShellCommandRequirement", - "NetworkAccess"} - for rh in (self.original_requirements, self.original_hints): - for r in reversed(rh): - if r["class"] in interesting and r["class"] not in keydict: - keydict[r["class"]] = r - - keydictstr = json_dumps(keydict, separators=(',', ':'), - sort_keys=True) - cachekey = hashlib.md5( # nosec - keydictstr.encode('utf-8')).hexdigest() - - _logger.debug("[job %s] keydictstr is %s -> %s", jobname, - keydictstr, cachekey) - - jobcache = os.path.join(runtimeContext.cachedir, cachekey) - - # Create a lockfile to manage cache status. - jobcachepending = "{}.status".format(jobcache) - jobcachelock = None - jobstatus = None - - # Opens the file for read/write, or creates an empty file. 
- jobcachelock = open(jobcachepending, "a+") - - # get the shared lock to ensure no other process is trying - # to write to this cache - shared_file_lock(jobcachelock) - jobcachelock.seek(0) - jobstatus = jobcachelock.read() - - if os.path.isdir(jobcache) and jobstatus == "success": - if docker_req and runtimeContext.use_container: - cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir() - else: - cachebuilder.outdir = jobcache - - _logger.info("[job %s] Using cached output in %s", jobname, jobcache) - yield CallbackJob(self, output_callbacks, cachebuilder, jobcache) - # we're done with the cache so release lock - jobcachelock.close() - return - else: - _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache) - - # turn shared lock into an exclusive lock since we'll - # be writing the cache directory - upgrade_lock(jobcachelock) - - shutil.rmtree(jobcache, True) - os.makedirs(jobcache) - runtimeContext = runtimeContext.copy() - runtimeContext.outdir = jobcache - - def update_status_output_callback( - output_callbacks, # type: Callable[[List[Dict[Text, Any]], Text], None] - jobcachelock, # type: IO[Any] - outputs, # type: List[Dict[Text, Any]] - processStatus # type: Text - ): # type: (...) -> None - # save status to the lockfile then release the lock - jobcachelock.seek(0) - jobcachelock.truncate() - jobcachelock.write(processStatus) - jobcachelock.close() - output_callbacks(outputs, processStatus) - - output_callbacks = partial( - update_status_output_callback, output_callbacks, jobcachelock) - - builder = self._init_job(job_order, runtimeContext) - - reffiles = copy.deepcopy(builder.files) - - j = self.make_job_runner(runtimeContext)( - builder, builder.job, self.make_path_mapper, self.requirements, - self.hints, jobname) - j.prov_obj = self.prov_obj - - j.successCodes = self.tool.get("successCodes", []) - j.temporaryFailCodes = self.tool.get("temporaryFailCodes", []) - j.permanentFailCodes = self.tool.get("permanentFailCodes", []) - - debug = _logger.isEnabledFor(logging.DEBUG) - - if debug: - _logger.debug(u"[job %s] initializing from %s%s", - j.name, - self.tool.get("id", ""), - u" as part of %s" % runtimeContext.part_of - if runtimeContext.part_of else "") - _logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job, - indent=4)) - - builder.pathmapper = self.make_path_mapper( - reffiles, builder.stagedir, runtimeContext, True) - builder.requirements = j.requirements - - _check_adjust = partial(check_adjust, builder) - - visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) - - initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement") - if initialWorkdir is not None: - ls = [] # type: List[Dict[Text, Any]] - if isinstance(initialWorkdir["listing"], string_types): - ls = builder.do_eval(initialWorkdir["listing"]) - else: - for t in initialWorkdir["listing"]: - if isinstance(t, Mapping) and "entry" in t: - entry_exp = builder.do_eval(t["entry"], strip_whitespace=False) - for entry in aslist(entry_exp): - et = {u"entry": entry} - if "entryname" in t: - et["entryname"] = builder.do_eval(t["entryname"]) - else: - et["entryname"] = None - et["writable"] = t.get("writable", False) - if et[u"entry"] is not None: - ls.append(et) - else: - initwd_item = builder.do_eval(t) - if not initwd_item: - continue - if isinstance(initwd_item, MutableSequence): - ls.extend(initwd_item) - else: - ls.append(initwd_item) - for i, t in enumerate(ls): - if "entry" in t: - if isinstance(t["entry"], string_types): - ls[i] = { - 
"class": "File", - "basename": t["entryname"], - "contents": t["entry"], - "writable": t.get("writable") - } - else: - if t.get("entryname") or t.get("writable"): - t = copy.deepcopy(t) - if t.get("entryname"): - t["entry"]["basename"] = t["entryname"] - t["entry"]["writable"] = t.get("writable") - ls[i] = t["entry"] - j.generatefiles["listing"] = ls - for l in ls: - self.updatePathmap(builder.outdir, builder.pathmapper, l) - visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) - - if debug: - _logger.debug(u"[job %s] path mappings is %s", j.name, - json_dumps({p: builder.pathmapper.mapper(p) - for p in builder.pathmapper.files()}, - indent=4)) - - if self.tool.get("stdin"): - with SourceLine(self.tool, "stdin", validate.ValidationException, debug): - j.stdin = builder.do_eval(self.tool["stdin"]) - if j.stdin: - reffiles.append({"class": "File", "path": j.stdin}) - - if self.tool.get("stderr"): - with SourceLine(self.tool, "stderr", validate.ValidationException, debug): - j.stderr = builder.do_eval(self.tool["stderr"]) - if j.stderr: - if os.path.isabs(j.stderr) or ".." in j.stderr: - raise validate.ValidationException( - "stderr must be a relative path, got '%s'" % j.stderr) - - if self.tool.get("stdout"): - with SourceLine(self.tool, "stdout", validate.ValidationException, debug): - j.stdout = builder.do_eval(self.tool["stdout"]) - if j.stdout: - if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout: - raise validate.ValidationException( - "stdout must be a relative path, got '%s'" % j.stdout) - - if debug: - _logger.debug(u"[job %s] command line bindings is %s", j.name, - json_dumps(builder.bindings, indent=4)) - dockerReq, _ = self.get_requirement("DockerRequirement") - if dockerReq is not None and runtimeContext.use_container: - out_dir, out_prefix = os.path.split( - runtimeContext.tmp_outdir_prefix) - j.outdir = runtimeContext.outdir or \ - tempfile.mkdtemp(prefix=out_prefix, dir=out_dir) - tmpdir_dir, tmpdir_prefix = os.path.split( - runtimeContext.tmpdir_prefix) - j.tmpdir = runtimeContext.tmpdir or \ - tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir) - j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir) - else: - j.outdir = builder.outdir - j.tmpdir = builder.tmpdir - j.stagedir = builder.stagedir - - inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement") - if inplaceUpdateReq is not None: - j.inplace_update = inplaceUpdateReq["inplaceUpdate"] - normalizeFilesDirs(j.generatefiles) - - readers = {} # type: Dict[Text, Any] - muts = set() # type: Set[Text] - - if builder.mutation_manager is not None: - def register_mut(f): # type: (Dict[Text, Any]) -> None - mm = cast(MutationManager, builder.mutation_manager) - muts.add(f["location"]) - mm.register_mutation(j.name, f) - - def register_reader(f): # type: (Dict[Text, Any]) -> None - mm = cast(MutationManager, builder.mutation_manager) - if f["location"] not in muts: - mm.register_reader(j.name, f) - readers[f["location"]] = copy.deepcopy(f) - - for li in j.generatefiles["listing"]: - li = cast(Dict[Text, Any], li) - if li.get("writable") and j.inplace_update: - adjustFileObjs(li, register_mut) - adjustDirObjs(li, register_mut) - else: - adjustFileObjs(li, register_reader) - adjustDirObjs(li, register_reader) - - adjustFileObjs(builder.files, register_reader) - adjustFileObjs(builder.bindings, register_reader) - adjustDirObjs(builder.files, register_reader) - adjustDirObjs(builder.bindings, register_reader) - - timelimit, _ = 
self.get_requirement("ToolTimeLimit") - if timelimit is not None: - with SourceLine(timelimit, "timelimit", validate.ValidationException, debug): - j.timelimit = builder.do_eval(timelimit["timelimit"]) - if not isinstance(j.timelimit, int) or j.timelimit < 0: - raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit) - - networkaccess, _ = self.get_requirement("NetworkAccess") - if networkaccess is not None: - with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug): - j.networkaccess = builder.do_eval(networkaccess["networkAccess"]) - if not isinstance(j.networkaccess, bool): - raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess) - - j.environment = {} - evr, _ = self.get_requirement("EnvVarRequirement") - if evr is not None: - for t in evr["envDef"]: - j.environment[t["envName"]] = builder.do_eval(t["envValue"]) - - shellcmd, _ = self.get_requirement("ShellCommandRequirement") - if shellcmd is not None: - cmd = [] # type: List[Text] - for b in builder.bindings: - arg = builder.generate_arg(b) - if b.get("shellQuote", True): - arg = [shellescape.quote(a) for a in aslist(arg)] - cmd.extend(aslist(arg)) - j.command_line = ["/bin/sh", "-c", " ".join(cmd)] - else: - j.command_line = flatten(list(map(builder.generate_arg, builder.bindings))) - - j.pathmapper = builder.pathmapper - j.collect_outputs = partial( - self.collect_output_ports, self.tool["outputs"], builder, - compute_checksum=getdefault(runtimeContext.compute_checksum, True), - jobname=jobname, - readers=readers) - j.output_callback = output_callbacks - - yield j - - def collect_output_ports(self, - ports, # type: Set[Dict[Text, Any]] - builder, # type: Builder - outdir, # type: Text - rcode, # type: int - compute_checksum=True, # type: bool - jobname="", # type: Text - readers=None # type: Optional[Dict[Text, Any]] - ): # type: (...) 
-> OutputPorts - ret = {} # type: OutputPorts - debug = _logger.isEnabledFor(logging.DEBUG) - cwl_version = self.metadata.get( - "http://commonwl.org/cwltool#original_cwlVersion", None) - if cwl_version != "v1.0": - builder.resources["exitCode"] = rcode - try: - fs_access = builder.make_fs_access(outdir) - custom_output = fs_access.join(outdir, "cwl.output.json") - if fs_access.exists(custom_output): - with fs_access.open(custom_output, "r") as f: - ret = json.load(f) - if debug: - _logger.debug(u"Raw output from %s: %s", custom_output, - json_dumps(ret, indent=4)) - else: - for i, port in enumerate(ports): - class ParameterOutputWorkflowException(WorkflowException): - def __init__(self, msg, **kwargs): # type: (Text, **Any) -> None - super(ParameterOutputWorkflowException, self).__init__( - u"Error collecting output for parameter '%s':\n%s" - % (shortname(port["id"]), msg), kwargs) - with SourceLine(ports, i, ParameterOutputWorkflowException, debug): - fragment = shortname(port["id"]) - ret[fragment] = self.collect_output(port, builder, outdir, fs_access, - compute_checksum=compute_checksum) - if ret: - revmap = partial(revmap_file, builder, outdir) - adjustDirObjs(ret, trim_listing) - visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap)) - visit_class(ret, ("File", "Directory"), remove_path) - normalizeFilesDirs(ret) - visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access)) - - if compute_checksum: - adjustFileObjs(ret, partial(compute_checksums, fs_access)) - expected_schema = cast(Schema, self.names.get_name( - "outputs_record_schema", "")) - validate.validate_ex(expected_schema, ret, - strict=False, logger=_logger_validation_warnings) - if ret is not None and builder.mutation_manager is not None: - adjustFileObjs(ret, builder.mutation_manager.set_generation) - return ret if ret is not None else {} - except validate.ValidationException as e: - raise_from(WorkflowException( - "Error validating output record. " + Text(e) + "\n in " - + json_dumps(ret, indent=4)), e) - finally: - if builder.mutation_manager and readers: - for r in readers.values(): - builder.mutation_manager.release_reader(jobname, r) - - def collect_output(self, - schema, # type: Dict[Text, Any] - builder, # type: Builder - outdir, # type: Text - fs_access, # type: StdFsAccess - compute_checksum=True # type: bool - ): - # type: (...) 
-> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]] - r = [] # type: List[Any] - empty_and_optional = False - debug = _logger.isEnabledFor(logging.DEBUG) - if "outputBinding" in schema: - binding = schema["outputBinding"] - globpatterns = [] # type: List[Text] - - revmap = partial(revmap_file, builder, outdir) - - if "glob" in binding: - with SourceLine(binding, "glob", WorkflowException, debug): - for gb in aslist(binding["glob"]): - gb = builder.do_eval(gb) - if gb: - globpatterns.extend(aslist(gb)) - - for gb in globpatterns: - if gb.startswith(builder.outdir): - gb = gb[len(builder.outdir) + 1:] - elif gb == ".": - gb = outdir - elif gb.startswith("/"): - raise WorkflowException( - "glob patterns must not start with '/'") - try: - prefix = fs_access.glob(outdir) - r.extend([{"location": g, - "path": fs_access.join(builder.outdir, - g[len(prefix[0])+1:]), - "basename": os.path.basename(g), - "nameroot": os.path.splitext( - os.path.basename(g))[0], - "nameext": os.path.splitext( - os.path.basename(g))[1], - "class": "File" if fs_access.isfile(g) - else "Directory"} - for g in sorted(fs_access.glob( - fs_access.join(outdir, gb)), - key=cmp_to_key(cast( - Callable[[Text, Text], - int], locale.strcoll)))]) - except (OSError, IOError) as e: - _logger.warning(Text(e)) - except Exception: - _logger.error("Unexpected error from fs_access", exc_info=True) - raise - - for files in r: - rfile = files.copy() - revmap(rfile) - if files["class"] == "Directory": - ll = schema.get("loadListing") or builder.loadListing - if ll and ll != "no_listing": - get_listing(fs_access, files, (ll == "deep_listing")) - else: - if binding.get("loadContents"): - with fs_access.open(rfile["location"], "rb") as f: - files["contents"] = content_limit_respected_read_bytes(f).decode("utf-8") - if compute_checksum: - with fs_access.open(rfile["location"], "rb") as f: - checksum = hashlib.sha1() # nosec - contents = f.read(1024 * 1024) - while contents != b"": - checksum.update(contents) - contents = f.read(1024 * 1024) - files["checksum"] = "sha1$%s" % checksum.hexdigest() - files["size"] = fs_access.size(rfile["location"]) - - optional = False - single = False - if isinstance(schema["type"], MutableSequence): - if "null" in schema["type"]: - optional = True - if "File" in schema["type"] or "Directory" in schema["type"]: - single = True - elif schema["type"] == "File" or schema["type"] == "Directory": - single = True - - if "outputEval" in binding: - with SourceLine(binding, "outputEval", WorkflowException, debug): - r = builder.do_eval(binding["outputEval"], context=r) - - if single: - if not r and not optional: - with SourceLine(binding, "glob", WorkflowException, debug): - raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns)) - elif not r and optional: - pass - elif isinstance(r, MutableSequence): - if len(r) > 1: - raise WorkflowException("Multiple matches for output item that is a single file.") - else: - r = r[0] - - if "secondaryFiles" in schema: - with SourceLine(schema, "secondaryFiles", WorkflowException, debug): - for primary in aslist(r): - if isinstance(primary, MutableMapping): - primary.setdefault("secondaryFiles", []) - pathprefix = primary["path"][0:primary["path"].rindex("/")+1] - for sf in aslist(schema["secondaryFiles"]): - if 'required' in sf: - sf_required = builder.do_eval(sf['required'], context=primary) - else: - sf_required = False - - if "$(" in sf["pattern"] or "${" in sf["pattern"]: - sfpath = builder.do_eval(sf["pattern"], 
context=primary) - else: - sfpath = substitute(primary["basename"], sf["pattern"]) - - for sfitem in aslist(sfpath): - if not sfitem: - continue - if isinstance(sfitem, string_types): - sfitem = {"path": pathprefix+sfitem} - if not fs_access.exists(sfitem['path']) and sf_required: - raise WorkflowException( - "Missing required secondary file '%s'" % ( - sfitem["path"])) - if "path" in sfitem and "location" not in sfitem: - revmap(sfitem) - if fs_access.isfile(sfitem["location"]): - sfitem["class"] = "File" - primary["secondaryFiles"].append(sfitem) - elif fs_access.isdir(sfitem["location"]): - sfitem["class"] = "Directory" - primary["secondaryFiles"].append(sfitem) - - if "format" in schema: - for primary in aslist(r): - primary["format"] = builder.do_eval(schema["format"], context=primary) - - # Ensure files point to local references outside of the run environment - adjustFileObjs(r, revmap) - - if not r and optional: - # Don't convert zero or empty string to None - if r in [0, '']: - return r - # For [] or None, return None - else: - return None - - if (not empty_and_optional and isinstance(schema["type"], MutableMapping) - and schema["type"]["type"] == "record"): - out = {} - for f in schema["type"]["fields"]: - out[shortname(f["name"])] = self.collect_output( # type: ignore - f, builder, outdir, fs_access, - compute_checksum=compute_checksum) - return out - return r
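The DEFAULT_CONTAINER_MSG in the removed module advises pinning a container through a `DockerRequirement` hint. A minimal sketch of a CWL CommandLineTool that does this is shown below; the `dockerPull` image name is a hypothetical placeholder, not anything taken from this changeset.

```yaml
# Minimal CWL CommandLineTool that pins its container via a hint,
# following the advice in the removed DEFAULT_CONTAINER_MSG.
cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
inputs:
  message:
    type: string
    inputBinding:
      position: 1
outputs: []
hints:
  DockerRequirement:
    dockerPull: debian:stable-slim  # hypothetical image; replace with your tool's image
```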
