shellac / guppy_basecaller (Mercurial repository)

comparison: env/lib/python3.7/site-packages/cwltool/command_line_tool.py @ 5:9b1c78e6ba9c (draft, default branch, tip)

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
|  |  |
|---|---|
| author | shellac |
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |  |
Comparison of revision 4:79f47841a781 with revision 5:9b1c78e6ba9c. The file below was present at revision 4 and removed in revision 5.
| 1 """Implementation of CommandLineTool.""" | |
| 2 from __future__ import absolute_import | |
| 3 | |
| 4 import copy | |
| 5 import hashlib | |
| 6 import json | |
| 7 import locale | |
| 8 import logging | |
| 9 import os | |
| 10 import re | |
| 11 import shutil | |
| 12 import tempfile | |
| 13 import threading | |
| 14 from functools import cmp_to_key, partial | |
| 15 from typing import (Any, Callable, Dict, Generator, IO, List, Mapping, | |
| 16 MutableMapping, MutableSequence, Optional, Set, Union, cast) | |
| 17 | |
| 18 from typing_extensions import Text, Type, TYPE_CHECKING # pylint: disable=unused-import | |
| 19 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 20 | |
| 21 import shellescape | |
| 22 from schema_salad import validate | |
| 23 from schema_salad.avro.schema import Schema | |
| 24 from schema_salad.ref_resolver import file_uri, uri_file_path | |
| 25 from schema_salad.sourceline import SourceLine | |
| 26 from six import string_types | |
| 27 from future.utils import raise_from | |
| 28 | |
| 29 from six.moves import map, urllib | |
| 30 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import | |
| 31 Text, Type) | |
| 32 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 33 | |
| 34 from .builder import (Builder, content_limit_respected_read_bytes, # pylint: disable=unused-import | |
| 35 substitute) | |
| 36 from .context import LoadingContext # pylint: disable=unused-import | |
| 37 from .context import RuntimeContext, getdefault | |
| 38 from .docker import DockerCommandLineJob | |
| 39 from .errors import WorkflowException | |
| 40 from .flatten import flatten | |
| 41 from .job import CommandLineJob, JobBase # pylint: disable=unused-import | |
| 42 from .loghandler import _logger | |
| 43 from .mutation import MutationManager # pylint: disable=unused-import | |
| 44 from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs, | |
| 45 get_listing, trim_listing, visit_class) | |
| 46 from .process import (Process, UnsupportedRequirement, | |
| 47 _logger_validation_warnings, compute_checksums, | |
| 48 normalizeFilesDirs, shortname, uniquename) | |
| 49 from .singularity import SingularityCommandLineJob | |
| 50 from .software_requirements import ( # pylint: disable=unused-import | |
| 51 DependenciesConfiguration) | |
| 52 from .stdfsaccess import StdFsAccess # pylint: disable=unused-import | |
| 53 from .utils import (aslist, convert_pathsep_to_unix, | |
| 54 docker_windows_path_adjust, json_dumps, onWindows, | |
| 55 random_outdir, windows_default_container_id, | |
| 56 shared_file_lock, upgrade_lock) | |
| 57 if TYPE_CHECKING: | |
| 58 from .provenance import ProvenanceProfile # pylint: disable=unused-import | |
| 59 | |
| 60 ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") | |
| 61 ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") # Accept anything | |
| 62 ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE | |
| 63 DEFAULT_CONTAINER_MSG = """ | |
| 64 We are on Microsoft Windows and not all components of this CWL description have a | |
| 65 container specified. This means that these steps will be executed in the default container, | |
| 66 which is %s. | |
| 67 | |
| 68 Note, this could affect portability if this CWL description relies on non-POSIX features | |
| 69 or commands in this container. For best results add the following to your CWL | |
| 70 description's hints section: | |
| 71 | |
| 72 hints: | |
| 73 DockerRequirement: | |
| 74 dockerPull: %s | |
| 75 """ | |
| 76 | |
| 77 | |
| 78 class ExpressionTool(Process): | |
| 79 class ExpressionJob(object): | |
| 80 """Job for ExpressionTools.""" | |
| 81 | |
| 82 def __init__(self, | |
| 83 builder, # type: Builder | |
| 84 script, # type: Dict[Text, Text] | |
| 85 output_callback, # type: Callable[[Any, Any], Any] | |
| 86 requirements, # type: List[Dict[Text, Text]] | |
| 87 hints, # type: List[Dict[Text, Text]] | |
| 88 outdir=None, # type: Optional[Text] | |
| 89 tmpdir=None, # type: Optional[Text] | |
| 90 ): # type: (...) -> None | |
| 91 """Initializet this ExpressionJob.""" | |
| 92 self.builder = builder | |
| 93 self.requirements = requirements | |
| 94 self.hints = hints | |
| 95 self.collect_outputs = None # type: Optional[Callable[[Any], Any]] | |
| 96 self.output_callback = output_callback | |
| 97 self.outdir = outdir | |
| 98 self.tmpdir = tmpdir | |
| 99 self.script = script | |
| 100 self.prov_obj = None # type: Optional[ProvenanceProfile] | |
| 101 | |
| 102 def run(self, | |
| 103 runtimeContext, # type: RuntimeContext | |
| 104 tmpdir_lock=None # type: Optional[threading.Lock] | |
| 105 ): # type: (...) -> None | |
| 106 try: | |
| 107 normalizeFilesDirs(self.builder.job) | |
| 108 ev = self.builder.do_eval(self.script) | |
| 109 normalizeFilesDirs(ev) | |
| 110 self.output_callback(ev, "success") | |
| 111 except Exception as err: | |
| 112 _logger.warning(u"Failed to evaluate expression:\n%s", | |
| 113 Text(err), exc_info=runtimeContext.debug) | |
| 114 self.output_callback({}, "permanentFail") | |
| 115 | |
| 116 def job(self, | |
| 117 job_order, # type: Mapping[Text, Text] | |
| 118 output_callbacks, # type: Callable[[Any, Any], Any] | |
| 119 runtimeContext # type: RuntimeContext | |
| 120 ): | |
| 121 # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None] | |
| 122 builder = self._init_job(job_order, runtimeContext) | |
| 123 | |
| 124 job = ExpressionTool.ExpressionJob( | |
| 125 builder, self.tool["expression"], output_callbacks, | |
| 126 self.requirements, self.hints) | |
| 127 job.prov_obj = runtimeContext.prov_obj | |
| 128 yield job | |
| 129 | |
| 130 | |
| 131 def remove_path(f): # type: (Dict[Text, Any]) -> None | |
| 132 if "path" in f: | |
| 133 del f["path"] | |
| 134 | |
| 135 | |
| 136 def revmap_file(builder, outdir, f): | |
| 137 # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None] | |
| 138 """ | |
| 139 Remap a file from internal path to external path. | |
| 140 | |
| 141 For Docker, this maps from the path inside tho container to the path | |
| 142 outside the container. Recognizes files in the pathmapper or remaps | |
| 143 internal output directories to the external directory. | |
| 144 """ | |
| 145 split = urllib.parse.urlsplit(outdir) | |
| 146 if not split.scheme: | |
| 147 outdir = file_uri(str(outdir)) | |
| 148 | |
| 149 # builder.outdir is the inner (container/compute node) output directory | |
| 150 # outdir is the outer (host/storage system) output directory | |
| 151 | |
| 152 if "location" in f and "path" not in f: | |
| 153 if f["location"].startswith("file://"): | |
| 154 f["path"] = convert_pathsep_to_unix(uri_file_path(f["location"])) | |
| 155 else: | |
| 156 return f | |
| 157 | |
| 158 if "path" in f: | |
| 159 path = f["path"] | |
| 160 uripath = file_uri(path) | |
| 161 del f["path"] | |
| 162 | |
| 163 if "basename" not in f: | |
| 164 f["basename"] = os.path.basename(path) | |
| 165 | |
| 166 if not builder.pathmapper: | |
| 167 raise ValueError("Do not call revmap_file using a builder that doesn't have a pathmapper.") | |
| 168 revmap_f = builder.pathmapper.reversemap(path) | |
| 169 | |
| 170 if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"): | |
| 171 f["location"] = revmap_f[1] | |
| 172 elif uripath == outdir or uripath.startswith(outdir+os.sep) or uripath.startswith(outdir+'/'): | |
| 173 f["location"] = file_uri(path) | |
| 174 elif path == builder.outdir or path.startswith(builder.outdir+os.sep) or path.startswith(builder.outdir+'/'): | |
| 175 f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:]) | |
| 176 elif not os.path.isabs(path): | |
| 177 f["location"] = builder.fs_access.join(outdir, path) | |
| 178 else: | |
| 179 raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input " | |
| 180 u"file pass through." % (path, builder.outdir)) | |
| 181 return f | |
| 182 | |
| 183 raise WorkflowException(u"Output File object is missing both 'location' " | |
| 184 "and 'path' fields: %s" % f) | |
| 185 | |
| 186 | |
| 187 class CallbackJob(object): | |
| 188 def __init__(self, job, output_callback, cachebuilder, jobcache): | |
| 189 # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, Text) -> None | |
| 190 """Initialize this CallbackJob.""" | |
| 191 self.job = job | |
| 192 self.output_callback = output_callback | |
| 193 self.cachebuilder = cachebuilder | |
| 194 self.outdir = jobcache | |
| 195 self.prov_obj = None # type: Optional[ProvenanceProfile] | |
| 196 | |
| 197 def run(self, | |
| 198 runtimeContext, # type: RuntimeContext | |
| 199 tmpdir_lock=None # type: Optional[threading.Lock] | |
| 200 ): # type: (...) -> None | |
| 201 self.output_callback(self.job.collect_output_ports( | |
| 202 self.job.tool["outputs"], | |
| 203 self.cachebuilder, | |
| 204 self.outdir, | |
| 205 getdefault(runtimeContext.compute_checksum, True)), "success") | |
| 206 | |
| 207 | |
| 208 def check_adjust(builder, file_o): | |
| 209 # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any] | |
| 210 """ | |
| 211 Map files to assigned path inside a container. | |
| 212 | |
| 213 We need to also explicitly walk over input, as implicit reassignment | |
| 214 doesn't reach everything in builder.bindings | |
| 215 """ | |
| 216 if not builder.pathmapper: | |
| 217 raise ValueError("Do not call check_adjust using a builder that doesn't have a pathmapper.") | |
| 218 file_o["path"] = docker_windows_path_adjust( | |
| 219 builder.pathmapper.mapper(file_o["location"])[1]) | |
| 220 dn, bn = os.path.split(file_o["path"]) | |
| 221 if file_o.get("dirname") != dn: | |
| 222 file_o["dirname"] = Text(dn) | |
| 223 if file_o.get("basename") != bn: | |
| 224 file_o["basename"] = Text(bn) | |
| 225 if file_o["class"] == "File": | |
| 226 nr, ne = os.path.splitext(file_o["basename"]) | |
| 227 if file_o.get("nameroot") != nr: | |
| 228 file_o["nameroot"] = Text(nr) | |
| 229 if file_o.get("nameext") != ne: | |
| 230 file_o["nameext"] = Text(ne) | |
| 231 if not ACCEPTLIST_RE.match(file_o["basename"]): | |
| 232 raise WorkflowException( | |
| 233 "Invalid filename: '{}' contains illegal characters".format( | |
| 234 file_o["basename"])) | |
| 235 return file_o | |
| 236 | |
| 237 def check_valid_locations(fs_access, ob): # type: (StdFsAccess, Dict[Text, Any]) -> None | |
| 238 if ob["location"].startswith("_:"): | |
| 239 pass | |
| 240 if ob["class"] == "File" and not fs_access.isfile(ob["location"]): | |
| 241 raise validate.ValidationException("Does not exist or is not a File: '%s'" % ob["location"]) | |
| 242 if ob["class"] == "Directory" and not fs_access.isdir(ob["location"]): | |
| 243 raise validate.ValidationException("Does not exist or is not a Directory: '%s'" % ob["location"]) | |
| 244 | |
| 245 | |
| 246 OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]] | |
| 247 | |
| 248 class CommandLineTool(Process): | |
| 249 def __init__(self, toolpath_object, loadingContext): | |
| 250 # type: (MutableMapping[Text, Any], LoadingContext) -> None | |
| 251 """Initialize this CommandLineTool.""" | |
| 252 super(CommandLineTool, self).__init__(toolpath_object, loadingContext) | |
| 253 self.prov_obj = loadingContext.prov_obj | |
| 254 | |
| 255 def make_job_runner(self, | |
| 256 runtimeContext # type: RuntimeContext | |
| 257 ): # type: (...) -> Type[JobBase] | |
| 258 dockerReq, _ = self.get_requirement("DockerRequirement") | |
| 259 if not dockerReq and runtimeContext.use_container: | |
| 260 if runtimeContext.find_default_container is not None: | |
| 261 default_container = runtimeContext.find_default_container(self) | |
| 262 if default_container is not None: | |
| 263 self.requirements.insert(0, { | |
| 264 "class": "DockerRequirement", | |
| 265 "dockerPull": default_container | |
| 266 }) | |
| 267 dockerReq = self.requirements[0] | |
| 268 if default_container == windows_default_container_id \ | |
| 269 and runtimeContext.use_container and onWindows(): | |
| 270 _logger.warning( | |
| 271 DEFAULT_CONTAINER_MSG, windows_default_container_id, | |
| 272 windows_default_container_id) | |
| 273 | |
| 274 if dockerReq is not None and runtimeContext.use_container: | |
| 275 if runtimeContext.singularity: | |
| 276 return SingularityCommandLineJob | |
| 277 return DockerCommandLineJob | |
| 278 for t in reversed(self.requirements): | |
| 279 if t["class"] == "DockerRequirement": | |
| 280 raise UnsupportedRequirement( | |
| 281 "--no-container, but this CommandLineTool has " | |
| 282 "DockerRequirement under 'requirements'.") | |
| 283 return CommandLineJob | |
| 284 | |
| 285 def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs): | |
| 286 # type: (List[Any], Text, RuntimeContext, bool) -> PathMapper | |
| 287 return PathMapper(reffiles, runtimeContext.basedir, stagedir, separateDirs) | |
| 288 | |
| 289 def updatePathmap(self, outdir, pathmap, fn): | |
| 290 # type: (Text, PathMapper, Dict[Text, Any]) -> None | |
| 291 if "location" in fn and fn["location"] in pathmap: | |
| 292 pathmap.update(fn["location"], pathmap.mapper(fn["location"]).resolved, | |
| 293 os.path.join(outdir, fn["basename"]), | |
| 294 ("Writable" if fn.get("writable") else "") + fn["class"], False) | |
| 295 for sf in fn.get("secondaryFiles", []): | |
| 296 self.updatePathmap(outdir, pathmap, sf) | |
| 297 for ls in fn.get("listing", []): | |
| 298 self.updatePathmap(os.path.join(outdir, fn["basename"]), pathmap, ls) | |
| 299 | |
| 300 def job(self, | |
| 301 job_order, # type: Mapping[Text, Text] | |
| 302 output_callbacks, # type: Callable[[Any, Any], Any] | |
| 303 runtimeContext # type: RuntimeContext | |
| 304 ): | |
| 305 # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None] | |
| 306 | |
| 307 workReuse, _ = self.get_requirement("WorkReuse") | |
| 308 enableReuse = workReuse.get("enableReuse", True) if workReuse else True | |
| 309 | |
| 310 jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job"))) | |
| 311 if runtimeContext.cachedir and enableReuse: | |
| 312 cachecontext = runtimeContext.copy() | |
| 313 cachecontext.outdir = "/out" | |
| 314 cachecontext.tmpdir = "/tmp" # nosec | |
| 315 cachecontext.stagedir = "/stage" | |
| 316 cachebuilder = self._init_job(job_order, cachecontext) | |
| 317 cachebuilder.pathmapper = PathMapper(cachebuilder.files, | |
| 318 runtimeContext.basedir, | |
| 319 cachebuilder.stagedir, | |
| 320 separateDirs=False) | |
| 321 _check_adjust = partial(check_adjust, cachebuilder) | |
| 322 visit_class([cachebuilder.files, cachebuilder.bindings], | |
| 323 ("File", "Directory"), _check_adjust) | |
| 324 | |
| 325 cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings))) | |
| 326 docker_req, _ = self.get_requirement("DockerRequirement") | |
| 327 if docker_req is not None and runtimeContext.use_container: | |
| 328 dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull") | |
| 329 elif runtimeContext.default_container is not None and runtimeContext.use_container: | |
| 330 dockerimg = runtimeContext.default_container | |
| 331 else: | |
| 332 dockerimg = None | |
| 333 | |
| 334 if dockerimg is not None: | |
| 335 cmdline = ["docker", "run", dockerimg] + cmdline | |
| 336 # not really run using docker, just for hashing purposes | |
| 337 keydict = {u"cmdline": cmdline} # type: Dict[Text, Union[Dict[Text, Any], List[Any]]] | |
| 338 | |
| 339 for shortcut in ["stdin", "stdout", "stderr"]: | |
| 340 if shortcut in self.tool: | |
| 341 keydict[shortcut] = self.tool[shortcut] | |
| 342 | |
| 343 for location, fobj in cachebuilder.pathmapper.items(): | |
| 344 if fobj.type == "File": | |
| 345 checksum = next( | |
| 346 (e['checksum'] for e in cachebuilder.files | |
| 347 if 'location' in e and e['location'] == location | |
| 348 and 'checksum' in e | |
| 349 and e['checksum'] != 'sha1$hash'), None) | |
| 350 fobj_stat = os.stat(fobj.resolved) | |
| 351 if checksum is not None: | |
| 352 keydict[fobj.resolved] = [fobj_stat.st_size, checksum] | |
| 353 else: | |
| 354 keydict[fobj.resolved] = [fobj_stat.st_size, | |
| 355 int(fobj_stat.st_mtime * 1000)] | |
| 356 | |
| 357 interesting = {"DockerRequirement", | |
| 358 "EnvVarRequirement", | |
| 359 "InitialWorkDirRequirement", | |
| 360 "ShellCommandRequirement", | |
| 361 "NetworkAccess"} | |
| 362 for rh in (self.original_requirements, self.original_hints): | |
| 363 for r in reversed(rh): | |
| 364 if r["class"] in interesting and r["class"] not in keydict: | |
| 365 keydict[r["class"]] = r | |
| 366 | |
| 367 keydictstr = json_dumps(keydict, separators=(',', ':'), | |
| 368 sort_keys=True) | |
| 369 cachekey = hashlib.md5( # nosec | |
| 370 keydictstr.encode('utf-8')).hexdigest() | |
| 371 | |
| 372 _logger.debug("[job %s] keydictstr is %s -> %s", jobname, | |
| 373 keydictstr, cachekey) | |
| 374 | |
| 375 jobcache = os.path.join(runtimeContext.cachedir, cachekey) | |
| 376 | |
| 377 # Create a lockfile to manage cache status. | |
| 378 jobcachepending = "{}.status".format(jobcache) | |
| 379 jobcachelock = None | |
| 380 jobstatus = None | |
| 381 | |
| 382 # Opens the file for read/write, or creates an empty file. | |
| 383 jobcachelock = open(jobcachepending, "a+") | |
| 384 | |
| 385 # get the shared lock to ensure no other process is trying | |
| 386 # to write to this cache | |
| 387 shared_file_lock(jobcachelock) | |
| 388 jobcachelock.seek(0) | |
| 389 jobstatus = jobcachelock.read() | |
| 390 | |
| 391 if os.path.isdir(jobcache) and jobstatus == "success": | |
| 392 if docker_req and runtimeContext.use_container: | |
| 393 cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir() | |
| 394 else: | |
| 395 cachebuilder.outdir = jobcache | |
| 396 | |
| 397 _logger.info("[job %s] Using cached output in %s", jobname, jobcache) | |
| 398 yield CallbackJob(self, output_callbacks, cachebuilder, jobcache) | |
| 399 # we're done with the cache so release lock | |
| 400 jobcachelock.close() | |
| 401 return | |
| 402 else: | |
| 403 _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache) | |
| 404 | |
| 405 # turn shared lock into an exclusive lock since we'll | |
| 406 # be writing the cache directory | |
| 407 upgrade_lock(jobcachelock) | |
| 408 | |
| 409 shutil.rmtree(jobcache, True) | |
| 410 os.makedirs(jobcache) | |
| 411 runtimeContext = runtimeContext.copy() | |
| 412 runtimeContext.outdir = jobcache | |
| 413 | |
| 414 def update_status_output_callback( | |
| 415 output_callbacks, # type: Callable[[List[Dict[Text, Any]], Text], None] | |
| 416 jobcachelock, # type: IO[Any] | |
| 417 outputs, # type: List[Dict[Text, Any]] | |
| 418 processStatus # type: Text | |
| 419 ): # type: (...) -> None | |
| 420 # save status to the lockfile then release the lock | |
| 421 jobcachelock.seek(0) | |
| 422 jobcachelock.truncate() | |
| 423 jobcachelock.write(processStatus) | |
| 424 jobcachelock.close() | |
| 425 output_callbacks(outputs, processStatus) | |
| 426 | |
| 427 output_callbacks = partial( | |
| 428 update_status_output_callback, output_callbacks, jobcachelock) | |
| 429 | |
| 430 builder = self._init_job(job_order, runtimeContext) | |
| 431 | |
| 432 reffiles = copy.deepcopy(builder.files) | |
| 433 | |
| 434 j = self.make_job_runner(runtimeContext)( | |
| 435 builder, builder.job, self.make_path_mapper, self.requirements, | |
| 436 self.hints, jobname) | |
| 437 j.prov_obj = self.prov_obj | |
| 438 | |
| 439 j.successCodes = self.tool.get("successCodes", []) | |
| 440 j.temporaryFailCodes = self.tool.get("temporaryFailCodes", []) | |
| 441 j.permanentFailCodes = self.tool.get("permanentFailCodes", []) | |
| 442 | |
| 443 debug = _logger.isEnabledFor(logging.DEBUG) | |
| 444 | |
| 445 if debug: | |
| 446 _logger.debug(u"[job %s] initializing from %s%s", | |
| 447 j.name, | |
| 448 self.tool.get("id", ""), | |
| 449 u" as part of %s" % runtimeContext.part_of | |
| 450 if runtimeContext.part_of else "") | |
| 451 _logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job, | |
| 452 indent=4)) | |
| 453 | |
| 454 builder.pathmapper = self.make_path_mapper( | |
| 455 reffiles, builder.stagedir, runtimeContext, True) | |
| 456 builder.requirements = j.requirements | |
| 457 | |
| 458 _check_adjust = partial(check_adjust, builder) | |
| 459 | |
| 460 visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) | |
| 461 | |
| 462 initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement") | |
| 463 if initialWorkdir is not None: | |
| 464 ls = [] # type: List[Dict[Text, Any]] | |
| 465 if isinstance(initialWorkdir["listing"], string_types): | |
| 466 ls = builder.do_eval(initialWorkdir["listing"]) | |
| 467 else: | |
| 468 for t in initialWorkdir["listing"]: | |
| 469 if isinstance(t, Mapping) and "entry" in t: | |
| 470 entry_exp = builder.do_eval(t["entry"], strip_whitespace=False) | |
| 471 for entry in aslist(entry_exp): | |
| 472 et = {u"entry": entry} | |
| 473 if "entryname" in t: | |
| 474 et["entryname"] = builder.do_eval(t["entryname"]) | |
| 475 else: | |
| 476 et["entryname"] = None | |
| 477 et["writable"] = t.get("writable", False) | |
| 478 if et[u"entry"] is not None: | |
| 479 ls.append(et) | |
| 480 else: | |
| 481 initwd_item = builder.do_eval(t) | |
| 482 if not initwd_item: | |
| 483 continue | |
| 484 if isinstance(initwd_item, MutableSequence): | |
| 485 ls.extend(initwd_item) | |
| 486 else: | |
| 487 ls.append(initwd_item) | |
| 488 for i, t in enumerate(ls): | |
| 489 if "entry" in t: | |
| 490 if isinstance(t["entry"], string_types): | |
| 491 ls[i] = { | |
| 492 "class": "File", | |
| 493 "basename": t["entryname"], | |
| 494 "contents": t["entry"], | |
| 495 "writable": t.get("writable") | |
| 496 } | |
| 497 else: | |
| 498 if t.get("entryname") or t.get("writable"): | |
| 499 t = copy.deepcopy(t) | |
| 500 if t.get("entryname"): | |
| 501 t["entry"]["basename"] = t["entryname"] | |
| 502 t["entry"]["writable"] = t.get("writable") | |
| 503 ls[i] = t["entry"] | |
| 504 j.generatefiles["listing"] = ls | |
| 505 for l in ls: | |
| 506 self.updatePathmap(builder.outdir, builder.pathmapper, l) | |
| 507 visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust) | |
| 508 | |
| 509 if debug: | |
| 510 _logger.debug(u"[job %s] path mappings is %s", j.name, | |
| 511 json_dumps({p: builder.pathmapper.mapper(p) | |
| 512 for p in builder.pathmapper.files()}, | |
| 513 indent=4)) | |
| 514 | |
| 515 if self.tool.get("stdin"): | |
| 516 with SourceLine(self.tool, "stdin", validate.ValidationException, debug): | |
| 517 j.stdin = builder.do_eval(self.tool["stdin"]) | |
| 518 if j.stdin: | |
| 519 reffiles.append({"class": "File", "path": j.stdin}) | |
| 520 | |
| 521 if self.tool.get("stderr"): | |
| 522 with SourceLine(self.tool, "stderr", validate.ValidationException, debug): | |
| 523 j.stderr = builder.do_eval(self.tool["stderr"]) | |
| 524 if j.stderr: | |
| 525 if os.path.isabs(j.stderr) or ".." in j.stderr: | |
| 526 raise validate.ValidationException( | |
| 527 "stderr must be a relative path, got '%s'" % j.stderr) | |
| 528 | |
| 529 if self.tool.get("stdout"): | |
| 530 with SourceLine(self.tool, "stdout", validate.ValidationException, debug): | |
| 531 j.stdout = builder.do_eval(self.tool["stdout"]) | |
| 532 if j.stdout: | |
| 533 if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout: | |
| 534 raise validate.ValidationException( | |
| 535 "stdout must be a relative path, got '%s'" % j.stdout) | |
| 536 | |
| 537 if debug: | |
| 538 _logger.debug(u"[job %s] command line bindings is %s", j.name, | |
| 539 json_dumps(builder.bindings, indent=4)) | |
| 540 dockerReq, _ = self.get_requirement("DockerRequirement") | |
| 541 if dockerReq is not None and runtimeContext.use_container: | |
| 542 out_dir, out_prefix = os.path.split( | |
| 543 runtimeContext.tmp_outdir_prefix) | |
| 544 j.outdir = runtimeContext.outdir or \ | |
| 545 tempfile.mkdtemp(prefix=out_prefix, dir=out_dir) | |
| 546 tmpdir_dir, tmpdir_prefix = os.path.split( | |
| 547 runtimeContext.tmpdir_prefix) | |
| 548 j.tmpdir = runtimeContext.tmpdir or \ | |
| 549 tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir) | |
| 550 j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir) | |
| 551 else: | |
| 552 j.outdir = builder.outdir | |
| 553 j.tmpdir = builder.tmpdir | |
| 554 j.stagedir = builder.stagedir | |
| 555 | |
| 556 inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement") | |
| 557 if inplaceUpdateReq is not None: | |
| 558 j.inplace_update = inplaceUpdateReq["inplaceUpdate"] | |
| 559 normalizeFilesDirs(j.generatefiles) | |
| 560 | |
| 561 readers = {} # type: Dict[Text, Any] | |
| 562 muts = set() # type: Set[Text] | |
| 563 | |
| 564 if builder.mutation_manager is not None: | |
| 565 def register_mut(f): # type: (Dict[Text, Any]) -> None | |
| 566 mm = cast(MutationManager, builder.mutation_manager) | |
| 567 muts.add(f["location"]) | |
| 568 mm.register_mutation(j.name, f) | |
| 569 | |
| 570 def register_reader(f): # type: (Dict[Text, Any]) -> None | |
| 571 mm = cast(MutationManager, builder.mutation_manager) | |
| 572 if f["location"] not in muts: | |
| 573 mm.register_reader(j.name, f) | |
| 574 readers[f["location"]] = copy.deepcopy(f) | |
| 575 | |
| 576 for li in j.generatefiles["listing"]: | |
| 577 li = cast(Dict[Text, Any], li) | |
| 578 if li.get("writable") and j.inplace_update: | |
| 579 adjustFileObjs(li, register_mut) | |
| 580 adjustDirObjs(li, register_mut) | |
| 581 else: | |
| 582 adjustFileObjs(li, register_reader) | |
| 583 adjustDirObjs(li, register_reader) | |
| 584 | |
| 585 adjustFileObjs(builder.files, register_reader) | |
| 586 adjustFileObjs(builder.bindings, register_reader) | |
| 587 adjustDirObjs(builder.files, register_reader) | |
| 588 adjustDirObjs(builder.bindings, register_reader) | |
| 589 | |
| 590 timelimit, _ = self.get_requirement("ToolTimeLimit") | |
| 591 if timelimit is not None: | |
| 592 with SourceLine(timelimit, "timelimit", validate.ValidationException, debug): | |
| 593 j.timelimit = builder.do_eval(timelimit["timelimit"]) | |
| 594 if not isinstance(j.timelimit, int) or j.timelimit < 0: | |
| 595 raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit) | |
| 596 | |
| 597 networkaccess, _ = self.get_requirement("NetworkAccess") | |
| 598 if networkaccess is not None: | |
| 599 with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug): | |
| 600 j.networkaccess = builder.do_eval(networkaccess["networkAccess"]) | |
| 601 if not isinstance(j.networkaccess, bool): | |
| 602 raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess) | |
| 603 | |
| 604 j.environment = {} | |
| 605 evr, _ = self.get_requirement("EnvVarRequirement") | |
| 606 if evr is not None: | |
| 607 for t in evr["envDef"]: | |
| 608 j.environment[t["envName"]] = builder.do_eval(t["envValue"]) | |
| 609 | |
| 610 shellcmd, _ = self.get_requirement("ShellCommandRequirement") | |
| 611 if shellcmd is not None: | |
| 612 cmd = [] # type: List[Text] | |
| 613 for b in builder.bindings: | |
| 614 arg = builder.generate_arg(b) | |
| 615 if b.get("shellQuote", True): | |
| 616 arg = [shellescape.quote(a) for a in aslist(arg)] | |
| 617 cmd.extend(aslist(arg)) | |
| 618 j.command_line = ["/bin/sh", "-c", " ".join(cmd)] | |
| 619 else: | |
| 620 j.command_line = flatten(list(map(builder.generate_arg, builder.bindings))) | |
| 621 | |
| 622 j.pathmapper = builder.pathmapper | |
| 623 j.collect_outputs = partial( | |
| 624 self.collect_output_ports, self.tool["outputs"], builder, | |
| 625 compute_checksum=getdefault(runtimeContext.compute_checksum, True), | |
| 626 jobname=jobname, | |
| 627 readers=readers) | |
| 628 j.output_callback = output_callbacks | |
| 629 | |
| 630 yield j | |
| 631 | |
| 632 def collect_output_ports(self, | |
| 633 ports, # type: Set[Dict[Text, Any]] | |
| 634 builder, # type: Builder | |
| 635 outdir, # type: Text | |
| 636 rcode, # type: int | |
| 637 compute_checksum=True, # type: bool | |
| 638 jobname="", # type: Text | |
| 639 readers=None # type: Optional[Dict[Text, Any]] | |
| 640 ): # type: (...) -> OutputPorts | |
| 641 ret = {} # type: OutputPorts | |
| 642 debug = _logger.isEnabledFor(logging.DEBUG) | |
| 643 cwl_version = self.metadata.get( | |
| 644 "http://commonwl.org/cwltool#original_cwlVersion", None) | |
| 645 if cwl_version != "v1.0": | |
| 646 builder.resources["exitCode"] = rcode | |
| 647 try: | |
| 648 fs_access = builder.make_fs_access(outdir) | |
| 649 custom_output = fs_access.join(outdir, "cwl.output.json") | |
| 650 if fs_access.exists(custom_output): | |
| 651 with fs_access.open(custom_output, "r") as f: | |
| 652 ret = json.load(f) | |
| 653 if debug: | |
| 654 _logger.debug(u"Raw output from %s: %s", custom_output, | |
| 655 json_dumps(ret, indent=4)) | |
| 656 else: | |
| 657 for i, port in enumerate(ports): | |
| 658 class ParameterOutputWorkflowException(WorkflowException): | |
| 659 def __init__(self, msg, **kwargs): # type: (Text, **Any) -> None | |
| 660 super(ParameterOutputWorkflowException, self).__init__( | |
| 661 u"Error collecting output for parameter '%s':\n%s" | |
| 662 % (shortname(port["id"]), msg), kwargs) | |
| 663 with SourceLine(ports, i, ParameterOutputWorkflowException, debug): | |
| 664 fragment = shortname(port["id"]) | |
| 665 ret[fragment] = self.collect_output(port, builder, outdir, fs_access, | |
| 666 compute_checksum=compute_checksum) | |
| 667 if ret: | |
| 668 revmap = partial(revmap_file, builder, outdir) | |
| 669 adjustDirObjs(ret, trim_listing) | |
| 670 visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap)) | |
| 671 visit_class(ret, ("File", "Directory"), remove_path) | |
| 672 normalizeFilesDirs(ret) | |
| 673 visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access)) | |
| 674 | |
| 675 if compute_checksum: | |
| 676 adjustFileObjs(ret, partial(compute_checksums, fs_access)) | |
| 677 expected_schema = cast(Schema, self.names.get_name( | |
| 678 "outputs_record_schema", "")) | |
| 679 validate.validate_ex(expected_schema, ret, | |
| 680 strict=False, logger=_logger_validation_warnings) | |
| 681 if ret is not None and builder.mutation_manager is not None: | |
| 682 adjustFileObjs(ret, builder.mutation_manager.set_generation) | |
| 683 return ret if ret is not None else {} | |
| 684 except validate.ValidationException as e: | |
| 685 raise_from(WorkflowException( | |
| 686 "Error validating output record. " + Text(e) + "\n in " | |
| 687 + json_dumps(ret, indent=4)), e) | |
| 688 finally: | |
| 689 if builder.mutation_manager and readers: | |
| 690 for r in readers.values(): | |
| 691 builder.mutation_manager.release_reader(jobname, r) | |
| 692 | |
| 693 def collect_output(self, | |
| 694 schema, # type: Dict[Text, Any] | |
| 695 builder, # type: Builder | |
| 696 outdir, # type: Text | |
| 697 fs_access, # type: StdFsAccess | |
| 698 compute_checksum=True # type: bool | |
| 699 ): | |
| 700 # type: (...) -> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]] | |
| 701 r = [] # type: List[Any] | |
| 702 empty_and_optional = False | |
| 703 debug = _logger.isEnabledFor(logging.DEBUG) | |
| 704 if "outputBinding" in schema: | |
| 705 binding = schema["outputBinding"] | |
| 706 globpatterns = [] # type: List[Text] | |
| 707 | |
| 708 revmap = partial(revmap_file, builder, outdir) | |
| 709 | |
| 710 if "glob" in binding: | |
| 711 with SourceLine(binding, "glob", WorkflowException, debug): | |
| 712 for gb in aslist(binding["glob"]): | |
| 713 gb = builder.do_eval(gb) | |
| 714 if gb: | |
| 715 globpatterns.extend(aslist(gb)) | |
| 716 | |
| 717 for gb in globpatterns: | |
| 718 if gb.startswith(builder.outdir): | |
| 719 gb = gb[len(builder.outdir) + 1:] | |
| 720 elif gb == ".": | |
| 721 gb = outdir | |
| 722 elif gb.startswith("/"): | |
| 723 raise WorkflowException( | |
| 724 "glob patterns must not start with '/'") | |
| 725 try: | |
| 726 prefix = fs_access.glob(outdir) | |
| 727 r.extend([{"location": g, | |
| 728 "path": fs_access.join(builder.outdir, | |
| 729 g[len(prefix[0])+1:]), | |
| 730 "basename": os.path.basename(g), | |
| 731 "nameroot": os.path.splitext( | |
| 732 os.path.basename(g))[0], | |
| 733 "nameext": os.path.splitext( | |
| 734 os.path.basename(g))[1], | |
| 735 "class": "File" if fs_access.isfile(g) | |
| 736 else "Directory"} | |
| 737 for g in sorted(fs_access.glob( | |
| 738 fs_access.join(outdir, gb)), | |
| 739 key=cmp_to_key(cast( | |
| 740 Callable[[Text, Text], | |
| 741 int], locale.strcoll)))]) | |
| 742 except (OSError, IOError) as e: | |
| 743 _logger.warning(Text(e)) | |
| 744 except Exception: | |
| 745 _logger.error("Unexpected error from fs_access", exc_info=True) | |
| 746 raise | |
| 747 | |
| 748 for files in r: | |
| 749 rfile = files.copy() | |
| 750 revmap(rfile) | |
| 751 if files["class"] == "Directory": | |
| 752 ll = schema.get("loadListing") or builder.loadListing | |
| 753 if ll and ll != "no_listing": | |
| 754 get_listing(fs_access, files, (ll == "deep_listing")) | |
| 755 else: | |
| 756 if binding.get("loadContents"): | |
| 757 with fs_access.open(rfile["location"], "rb") as f: | |
| 758 files["contents"] = content_limit_respected_read_bytes(f).decode("utf-8") | |
| 759 if compute_checksum: | |
| 760 with fs_access.open(rfile["location"], "rb") as f: | |
| 761 checksum = hashlib.sha1() # nosec | |
| 762 contents = f.read(1024 * 1024) | |
| 763 while contents != b"": | |
| 764 checksum.update(contents) | |
| 765 contents = f.read(1024 * 1024) | |
| 766 files["checksum"] = "sha1$%s" % checksum.hexdigest() | |
| 767 files["size"] = fs_access.size(rfile["location"]) | |
| 768 | |
| 769 optional = False | |
| 770 single = False | |
| 771 if isinstance(schema["type"], MutableSequence): | |
| 772 if "null" in schema["type"]: | |
| 773 optional = True | |
| 774 if "File" in schema["type"] or "Directory" in schema["type"]: | |
| 775 single = True | |
| 776 elif schema["type"] == "File" or schema["type"] == "Directory": | |
| 777 single = True | |
| 778 | |
| 779 if "outputEval" in binding: | |
| 780 with SourceLine(binding, "outputEval", WorkflowException, debug): | |
| 781 r = builder.do_eval(binding["outputEval"], context=r) | |
| 782 | |
| 783 if single: | |
| 784 if not r and not optional: | |
| 785 with SourceLine(binding, "glob", WorkflowException, debug): | |
| 786 raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns)) | |
| 787 elif not r and optional: | |
| 788 pass | |
| 789 elif isinstance(r, MutableSequence): | |
| 790 if len(r) > 1: | |
| 791 raise WorkflowException("Multiple matches for output item that is a single file.") | |
| 792 else: | |
| 793 r = r[0] | |
| 794 | |
| 795 if "secondaryFiles" in schema: | |
| 796 with SourceLine(schema, "secondaryFiles", WorkflowException, debug): | |
| 797 for primary in aslist(r): | |
| 798 if isinstance(primary, MutableMapping): | |
| 799 primary.setdefault("secondaryFiles", []) | |
| 800 pathprefix = primary["path"][0:primary["path"].rindex("/")+1] | |
| 801 for sf in aslist(schema["secondaryFiles"]): | |
| 802 if 'required' in sf: | |
| 803 sf_required = builder.do_eval(sf['required'], context=primary) | |
| 804 else: | |
| 805 sf_required = False | |
| 806 | |
| 807 if "$(" in sf["pattern"] or "${" in sf["pattern"]: | |
| 808 sfpath = builder.do_eval(sf["pattern"], context=primary) | |
| 809 else: | |
| 810 sfpath = substitute(primary["basename"], sf["pattern"]) | |
| 811 | |
| 812 for sfitem in aslist(sfpath): | |
| 813 if not sfitem: | |
| 814 continue | |
| 815 if isinstance(sfitem, string_types): | |
| 816 sfitem = {"path": pathprefix+sfitem} | |
| 817 if not fs_access.exists(sfitem['path']) and sf_required: | |
| 818 raise WorkflowException( | |
| 819 "Missing required secondary file '%s'" % ( | |
| 820 sfitem["path"])) | |
| 821 if "path" in sfitem and "location" not in sfitem: | |
| 822 revmap(sfitem) | |
| 823 if fs_access.isfile(sfitem["location"]): | |
| 824 sfitem["class"] = "File" | |
| 825 primary["secondaryFiles"].append(sfitem) | |
| 826 elif fs_access.isdir(sfitem["location"]): | |
| 827 sfitem["class"] = "Directory" | |
| 828 primary["secondaryFiles"].append(sfitem) | |
| 829 | |
| 830 if "format" in schema: | |
| 831 for primary in aslist(r): | |
| 832 primary["format"] = builder.do_eval(schema["format"], context=primary) | |
| 833 | |
| 834 # Ensure files point to local references outside of the run environment | |
| 835 adjustFileObjs(r, revmap) | |
| 836 | |
| 837 if not r and optional: | |
| 838 # Don't convert zero or empty string to None | |
| 839 if r in [0, '']: | |
| 840 return r | |
| 841 # For [] or None, return None | |
| 842 else: | |
| 843 return None | |
| 844 | |
| 845 if (not empty_and_optional and isinstance(schema["type"], MutableMapping) | |
| 846 and schema["type"]["type"] == "record"): | |
| 847 out = {} | |
| 848 for f in schema["type"]["fields"]: | |
| 849 out[shortname(f["name"])] = self.collect_output( # type: ignore | |
| 850 f, builder, outdir, fs_access, | |
| 851 compute_checksum=compute_checksum) | |
| 852 return out | |
| 853 return r |
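
The caching branch of `CommandLineTool.job()` derives its cache key by collecting everything that can affect the tool's output (the generated command line, any stdio shortcuts, the size plus checksum or mtime of each input file, and the behaviour-changing requirements) into `keydict`, then hashing a canonical JSON rendering of it. A minimal sketch of that recipe, with a hypothetical `cache_key` helper standing in for the inline code:

```python
import hashlib
import json

def cache_key(cmdline, file_stats, requirements):
    """Hypothetical distillation of the keydict logic in job(): build a
    dict of everything that affects the tool's output, render it as
    canonical JSON (sorted keys, compact separators), and hash it.
    md5 is acceptable here because the key only names a cache entry;
    it is not a security boundary (hence the # nosec in the original)."""
    keydict = {"cmdline": cmdline}
    for path, stats in file_stats.items():
        keydict[path] = list(stats)  # e.g. [size, checksum] or [size, mtime_ms]
    keydict.update(requirements)
    keydictstr = json.dumps(keydict, separators=(",", ":"), sort_keys=True)
    return hashlib.md5(keydictstr.encode("utf-8")).hexdigest()  # nosec

# Identical inputs always map to the same cache directory name:
assert (cache_key(["echo", "hi"], {"/data/in.txt": (42, "sha1$abc")}, {})
        == cache_key(["echo", "hi"], {"/data/in.txt": (42, "sha1$abc")}, {}))
```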
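Around the cache directory the same method performs a small locking dance: open the `.status` lockfile, take a shared lock to read the previous status, and upgrade to an exclusive lock only when the cache entry has to be (re)written. A POSIX-only sketch of the sequence using `fcntl` directly (cwltool's `shared_file_lock`/`upgrade_lock` wrap the platform differences, so this is an illustration, not the library's implementation):

```python
import fcntl

def open_job_cache_status(path):
    """Sketch of the lock sequence in job(), POSIX only. Note that
    upgrading an flock from shared to exclusive is not atomic: the
    shared lock is dropped and an exclusive one acquired, so the
    status read here is advisory, as in the original code."""
    lockfile = open(path, "a+")           # create if missing, keep existing contents
    fcntl.flock(lockfile, fcntl.LOCK_SH)  # shared lock: concurrent readers allowed
    lockfile.seek(0)
    status = lockfile.read()
    if status != "success":
        # we will rebuild the cache entry, so claim exclusive access
        fcntl.flock(lockfile, fcntl.LOCK_EX)
    return lockfile, status
```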
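`revmap_file()` is mostly prefix arithmetic: anything under the inner (container) output directory is rebased onto the outer (host) directory, a relative path is joined onto the outer directory, and any other absolute path is an error. The hypothetical helper below shows just that core rewrite, without the pathmapper lookup and URI handling:

```python
import os

def rebase_output_path(inner_outdir, outer_outdir, path):
    """Core of the revmap_file() rewrite (hypothetical helper): rebase
    a path produced inside the container onto the host output dir."""
    if path == inner_outdir or path.startswith(inner_outdir + os.sep):
        return os.path.join(outer_outdir, path[len(inner_outdir) + 1:])
    if not os.path.isabs(path):
        return os.path.join(outer_outdir, path)
    raise ValueError("output path %s is outside the output directory" % path)

print(rebase_output_path("/var/spool/cwl", "/home/user/run1",
                         "/var/spool/cwl/out/result.txt"))
# -> /home/user/run1/out/result.txt
```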
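`check_adjust()` keeps a File object's `dirname`/`basename`/`nameroot`/`nameext` fields consistent with its mapped path and screens the basename against `ACCEPTLIST_RE`. The field derivation is plain `os.path`; a standalone sketch (the real code mutates the File object in place):

```python
import os
import re

ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")

def name_fields(path):
    """Derive the name fields as check_adjust() assigns them."""
    dirname, basename = os.path.split(path)
    if not ACCEPTLIST_EN_STRICT_RE.match(basename):
        raise ValueError("Invalid filename: %r contains illegal characters" % basename)
    nameroot, nameext = os.path.splitext(basename)  # splits only the last extension
    return {"dirname": dirname, "basename": basename,
            "nameroot": nameroot, "nameext": nameext}

print(name_fields("/out/reads.fastq.gz"))
# {'dirname': '/out', 'basename': 'reads.fastq.gz',
#  'nameroot': 'reads.fastq', 'nameext': '.gz'}
```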
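`collect_output()` checksums each output file in 1 MiB chunks so that large outputs never have to be read into memory at once, recording the result in CWL's `sha1$<hex>` form. The same loop, extracted into a standalone function:

```python
import hashlib

def sha1_checksum(path):
    """Streamed checksum as computed in collect_output(); sha1 is used
    here for file identity, not as a cryptographic guarantee."""
    checksum = hashlib.sha1()  # nosec
    with open(path, "rb") as f:
        contents = f.read(1024 * 1024)
        while contents != b"":
            checksum.update(contents)
            contents = f.read(1024 * 1024)
    return "sha1$%s" % checksum.hexdigest()
```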
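For `secondaryFiles`, a pattern that contains no `$(...)`/`${...}` expression is applied to the primary file's basename via `substitute()` (imported from `.builder`): each leading `^` strips one extension before the rest of the pattern is appended. Its behaviour is essentially the following (a re-statement for illustration, not the imported function itself):

```python
def substitute(value, replace):
    """Secondary-file pattern rule: each leading '^' strips one
    extension from the basename; the remainder is appended."""
    if replace.startswith("^"):
        try:
            return substitute(value[0:value.rindex(".")], replace[1:])
        except ValueError:
            # no extension left to strip
            return value + replace.lstrip("^")
    return value + replace

print(substitute("reads.bam", ".bai"))   # reads.bam.bai  (appended)
print(substitute("reads.bam", "^.bai"))  # reads.bai      (.bam replaced)
```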
