comparison planemo/lib/python3.7/site-packages/cwltool/process.py @ 0:d30785e31577 (draft)

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"

| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children | |
This revision is compared against the null revision `-1:000000000000`; the file is newly added in `0:d30785e31577`, so every line below is an insertion.
```python
from __future__ import absolute_import

import abc
import copy
import errno
import functools
import hashlib
import json
import logging
import os
import shutil
import stat
import tempfile
import textwrap
import uuid

from io import open
from typing import (Any, Callable, Dict, Generator, Iterator, List,
                    Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple,
                    Type, Union, cast)

from pkg_resources import resource_stream
from rdflib import Graph  # pylint: disable=unused-import
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from six import PY3, iteritems, itervalues, string_types, with_metaclass
from six.moves import urllib
from future.utils import raise_from
from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
                               Text)
from schema_salad import schema, validate
from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
from schema_salad.sourceline import SourceLine, strip_dup_lineno

from . import expression
from .builder import Builder, HasReqsHints
from .context import LoadingContext  # pylint: disable=unused-import
from .context import RuntimeContext, getdefault
from .errors import UnsupportedRequirement, WorkflowException
from .loghandler import _logger
from .mutation import MutationManager  # pylint: disable=unused-import
from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable,
                         get_listing, normalizeFilesDirs, visit_class,
                         MapperEnt)
from .secrets import SecretStore  # pylint: disable=unused-import
from .software_requirements import (  # pylint: disable=unused-import
    DependenciesConfiguration)
from .stdfsaccess import StdFsAccess
from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2,
                    copytree_with_merge, onWindows, random_outdir)
from .validate_js import validate_js_expressions
from .update import INTERNAL_VERSION

try:
    from os import scandir  # type: ignore
except ImportError:
    from scandir import scandir  # type: ignore
if TYPE_CHECKING:
    from .provenance import ProvenanceProfile  # pylint: disable=unused-import

if PY3:
    from collections.abc import Iterable  # only works on python 3.3+
else:
    from collections import Iterable  # pylint: disable=unused-import
```
```python
class LogAsDebugFilter(logging.Filter):
    def __init__(self, name, parent):  # type: (Text, logging.Logger) -> None
        """Initialize."""
        name = str(name)
        super(LogAsDebugFilter, self).__init__(name)
        self.parent = parent

    def filter(self, record):  # type: (logging.LogRecord) -> bool
        return self.parent.isEnabledFor(logging.DEBUG)


_logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
_logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
_logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger))

supportedProcessRequirements = ["DockerRequirement",
                                "SchemaDefRequirement",
                                "EnvVarRequirement",
                                "ScatterFeatureRequirement",
                                "SubworkflowFeatureRequirement",
                                "MultipleInputFeatureRequirement",
                                "InlineJavascriptRequirement",
                                "ShellCommandRequirement",
                                "StepInputExpressionRequirement",
                                "ResourceRequirement",
                                "InitialWorkDirRequirement",
                                "ToolTimeLimit",
                                "WorkReuse",
                                "NetworkAccess",
                                "InplaceUpdateRequirement",
                                "LoadListingRequirement",
                                "http://commonwl.org/cwltool#TimeLimit",
                                "http://commonwl.org/cwltool#WorkReuse",
                                "http://commonwl.org/cwltool#NetworkAccess",
                                "http://commonwl.org/cwltool#LoadListingRequirement",
                                "http://commonwl.org/cwltool#InplaceUpdateRequirement"]

cwl_files = (
    "Workflow.yml",
    "CommandLineTool.yml",
    "CommonWorkflowLanguage.yml",
    "Process.yml",
    "concepts.md",
    "contrib.md",
    "intro.md",
    "invocation.md")

salad_files = ('metaschema.yml',
               'metaschema_base.yml',
               'salad.md',
               'field_name.yml',
               'import_include.md',
               'link_res.yml',
               'ident_res.yml',
               'vocab_res.yml',
               'vocab_res.yml',
               'field_name_schema.yml',
               'field_name_src.yml',
               'field_name_proc.yml',
               'ident_res_schema.yml',
               'ident_res_src.yml',
               'ident_res_proc.yml',
               'link_res_schema.yml',
               'link_res_src.yml',
               'link_res_proc.yml',
               'vocab_res_schema.yml',
               'vocab_res_src.yml',
               'vocab_res_proc.yml')

SCHEMA_CACHE = {}  # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]]
SCHEMA_FILE = None  # type: Optional[Dict[Text, Any]]
SCHEMA_DIR = None  # type: Optional[Dict[Text, Any]]
SCHEMA_ANY = None  # type: Optional[Dict[Text, Any]]

custom_schemas = {}  # type: Dict[Text, Tuple[Text, Text]]
```
```python
def use_standard_schema(version):
    # type: (Text) -> None
    if version in custom_schemas:
        del custom_schemas[version]
    if version in SCHEMA_CACHE:
        del SCHEMA_CACHE[version]


def use_custom_schema(version, name, text):
    # type: (Text, Text, Union[Text, bytes]) -> None
    if isinstance(text, bytes):
        text2 = text.decode()
    else:
        text2 = text
    custom_schemas[version] = (name, text2)
    if version in SCHEMA_CACHE:
        del SCHEMA_CACHE[version]


def get_schema(version):
    # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]

    if version in SCHEMA_CACHE:
        return SCHEMA_CACHE[version]

    cache = {}  # type: Dict[Text, Any]
    version = version.split("#")[-1]
    if '.dev' in version:
        version = ".".join(version.split(".")[:-1])
    for f in cwl_files:
        try:
            res = resource_stream(__name__, 'schemas/%s/%s' % (version, f))
            cache["https://w3id.org/cwl/" + f] = res.read()
            res.close()
        except IOError:
            pass

    for f in salad_files:
        try:
            res = resource_stream(
                __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format(
                    version, f))
            cache["https://w3id.org/cwl/salad/schema_salad/metaschema/"
                  + f] = res.read()
            res.close()
        except IOError:
            pass

    if version in custom_schemas:
        cache[custom_schemas[version][0]] = custom_schemas[version][1]
        SCHEMA_CACHE[version] = schema.load_schema(
            custom_schemas[version][0], cache=cache)
    else:
        SCHEMA_CACHE[version] = schema.load_schema(
            "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache)

    return SCHEMA_CACHE[version]
```
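The three functions above manage a module-level schema cache: `use_custom_schema` registers replacement schema text and invalidates the cached entry, while `get_schema` lazily loads and memoizes per version. A hedged usage sketch (assumes cwltool and its bundled schema files are installed; not from the original page):

```python
from cwltool import process

# First call parses the bundled v1.0 schema documents and caches the result.
loader, names, metaschema, metaschema_loader = process.get_schema("v1.0")

# Repeat calls for the same version are served straight from SCHEMA_CACHE.
assert process.get_schema("v1.0") is process.SCHEMA_CACHE["v1.0"]

# Registering custom schema text evicts the cached entry, so the next
# get_schema("v1.0") would rebuild from the custom document instead.
# process.use_custom_schema("v1.0", "https://example.com/custom.yml", "<yaml>")
```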
```python
def shortname(inputid):
    # type: (Text) -> Text
    d = urllib.parse.urlparse(inputid)
    if d.fragment:
        return d.fragment.split(u"/")[-1]
    return d.path.split(u"/")[-1]


def checkRequirements(rec, supported_process_requirements):
    # type: (Any, Iterable[Any]) -> None
    if isinstance(rec, MutableMapping):
        if "requirements" in rec:
            for i, entry in enumerate(rec["requirements"]):
                with SourceLine(rec["requirements"], i, UnsupportedRequirement):
                    if entry["class"] not in supported_process_requirements:
                        raise UnsupportedRequirement(
                            u"Unsupported requirement {}".format(entry["class"]))
        for key in rec:
            checkRequirements(rec[key], supported_process_requirements)
    if isinstance(rec, MutableSequence):
        for entry in rec:
            checkRequirements(entry, supported_process_requirements)
```
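`shortname` is used throughout to reduce fully qualified parameter IDs to their final path segment, preferring the URI fragment when one is present. A quick illustration (example identifiers invented here):

```python
from cwltool.process import shortname

# Fragment present: keep only the last fragment segment.
assert shortname("file:///work/tool.cwl#main/input_file") == "input_file"
# No fragment: fall back to the last path segment.
assert shortname("http://example.com/inputs/threads") == "threads"
```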
```python
def stage_files(pathmapper,             # type: PathMapper
                stage_func=None,        # type: Optional[Callable[..., Any]]
                ignore_writable=False,  # type: bool
                symlink=True,           # type: bool
                secret_store=None,      # type: Optional[SecretStore]
                fix_conflicts=False     # type: bool
                ):  # type: (...) -> None
    """Link or copy files to their targets. Create them as needed."""

    targets = {}  # type: Dict[Text, MapperEnt]
    for key, entry in pathmapper.items():
        if not 'File' in entry.type:
            continue
        if entry.target not in targets:
            targets[entry.target] = entry
        elif targets[entry.target].resolved != entry.resolved:
            if fix_conflicts:
                tgt = entry.target
                i = 2
                tgt = "%s_%s" % (tgt, i)
                while tgt in targets:
                    i += 1
                    tgt = "%s_%s" % (tgt, i)
                targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged)
            else:
                raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % (
                    targets[entry.target].resolved, entry.resolved, entry.target))

    for key, entry in pathmapper.items():
        if not entry.staged:
            continue
        if not os.path.exists(os.path.dirname(entry.target)):
            os.makedirs(os.path.dirname(entry.target))
        if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
            if symlink:  # Use symlink func if allowed
                if onWindows():
                    if entry.type == "File":
                        shutil.copy(entry.resolved, entry.target)
                    elif entry.type == "Directory":
                        if os.path.exists(entry.target) \
                                and os.path.isdir(entry.target):
                            shutil.rmtree(entry.target)
                        copytree_with_merge(entry.resolved, entry.target)
                else:
                    os.symlink(entry.resolved, entry.target)
            elif stage_func is not None:
                stage_func(entry.resolved, entry.target)
        elif entry.type == "Directory" and not os.path.exists(entry.target) \
                and entry.resolved.startswith("_:"):
            os.makedirs(entry.target)
        elif entry.type == "WritableFile" and not ignore_writable:
            shutil.copy(entry.resolved, entry.target)
            ensure_writable(entry.target)
        elif entry.type == "WritableDirectory" and not ignore_writable:
            if entry.resolved.startswith("_:"):
                os.makedirs(entry.target)
            else:
                shutil.copytree(entry.resolved, entry.target)
                ensure_writable(entry.target)
        elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
            with open(entry.target, "wb") as new:
                if secret_store is not None:
                    new.write(
                        secret_store.retrieve(entry.resolved).encode("utf-8"))
                else:
                    new.write(entry.resolved.encode("utf-8"))
            if entry.type == "CreateFile":
                os.chmod(entry.target, stat.S_IRUSR)  # Read only
            else:  # it is a "CreateWritableFile"
                ensure_writable(entry.target)
            pathmapper.update(
                key, entry.target, entry.target, entry.type, entry.staged)
```
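When `fix_conflicts` is set, two entries resolving different sources to the same target are disambiguated with numeric suffixes. The naming rule in isolation (hypothetical `pick_target` helper, not part of cwltool):

```python
def pick_target(tgt, taken):
    # Mirrors the fix_conflicts branch of stage_files(): the first candidate
    # is "<target>_2" and the counter climbs until the name is unused.
    i = 2
    candidate = "%s_%s" % (tgt, i)
    while candidate in taken:
        i += 1
        candidate = "%s_%s" % (tgt, i)
    return candidate

assert pick_target("out.txt", {"out.txt"}) == "out.txt_2"
assert pick_target("out.txt", {"out.txt", "out.txt_2"}) == "out.txt_3"
```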
```python
def relocateOutputs(outputObj,              # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
                    destination_path,       # type: Text
                    source_directories,     # type: Set[Text]
                    action,                 # type: Text
                    fs_access,              # type: StdFsAccess
                    compute_checksum=True,  # type: bool
                    path_mapper=PathMapper  # type: Type[PathMapper]
                    ):
    # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
    adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))

    if action not in ("move", "copy"):
        return outputObj

    def _collectDirEntries(obj):
        # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]]
        if isinstance(obj, dict):
            if obj.get("class") in ("File", "Directory"):
                yield obj
            else:
                for sub_obj in obj.values():
                    for dir_entry in _collectDirEntries(sub_obj):
                        yield dir_entry
        elif isinstance(obj, MutableSequence):
            for sub_obj in obj:
                for dir_entry in _collectDirEntries(sub_obj):
                    yield dir_entry

    def _relocate(src, dst):  # type: (Text, Text) -> None
        if src == dst:
            return

        # If the source is not contained in source_directories we're not allowed to delete it
        src = fs_access.realpath(src)
        src_can_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories)

        _action = "move" if action == "move" and src_can_deleted else "copy"

        if _action == "move":
            _logger.debug("Moving %s to %s", src, dst)
            if fs_access.isdir(src) and fs_access.isdir(dst):
                # merge directories
                for dir_entry in scandir(src):
                    _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
            else:
                shutil.move(src, dst)

        elif _action == "copy":
            _logger.debug("Copying %s to %s", src, dst)
            if fs_access.isdir(src):
                if os.path.isdir(dst):
                    shutil.rmtree(dst)
                elif os.path.isfile(dst):
                    os.unlink(dst)
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)

    def _realpath(ob):  # type: (Dict[Text, Any]) -> None
        if ob["location"].startswith("file:"):
            ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"])))
        if ob["location"].startswith("/"):
            ob["location"] = os.path.realpath(ob["location"])

    outfiles = list(_collectDirEntries(outputObj))
    visit_class(outfiles, ("File", "Directory"), _realpath)
    pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
    stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)

    def _check_adjust(a_file):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
        a_file["location"] = file_uri(pm.mapper(a_file["location"])[1])
        if "contents" in a_file:
            del a_file["contents"]
        return a_file

    visit_class(outputObj, ("File", "Directory"), _check_adjust)

    if compute_checksum:
        visit_class(outputObj, ("File",), functools.partial(
            compute_checksums, fs_access))
    return outputObj
```
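A hedged sketch of calling `relocateOutputs` to move a finished output record out of an intermediate directory (paths are illustrative, and the source file must already exist for this to run):

```python
from cwltool.process import relocateOutputs
from cwltool.stdfsaccess import StdFsAccess

outputs = {"result": {"class": "File",
                      "basename": "result.txt",
                      "location": "file:///tmp/cwl-out-abc123/result.txt"}}
relocated = relocateOutputs(outputs,
                            destination_path="/data/final",
                            source_directories={"/tmp/cwl-out-abc123"},
                            action="move",  # sources outside source_directories are copied instead
                            fs_access=StdFsAccess("/tmp"),
                            compute_checksum=True)
# Each File's "location" now points under /data/final, with "checksum" and
# "size" filled in by compute_checksums().
```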
```python
def cleanIntermediate(output_dirs):  # type: (Iterable[Text]) -> None
    for a in output_dirs:
        if os.path.exists(a):
            _logger.debug(u"Removing intermediate output directory %s", a)
            shutil.rmtree(a, True)


def add_sizes(fsaccess, obj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
    if 'location' in obj:
        try:
            if "size" not in obj:
                obj["size"] = fsaccess.size(obj["location"])
        except OSError:
            pass
    elif 'contents' in obj:
        obj["size"] = len(obj['contents'])
    else:
        return  # best effort


def fill_in_defaults(inputs,   # type: List[Dict[Text, Text]]
                     job,      # type: Dict[Text, expression.JSON]
                     fsaccess  # type: StdFsAccess
                     ):  # type: (...) -> None
    for e, inp in enumerate(inputs):
        with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
            fieldname = shortname(inp[u"id"])
            if job.get(fieldname) is not None:
                pass
            elif job.get(fieldname) is None and u"default" in inp:
                job[fieldname] = copy.deepcopy(inp[u"default"])
            elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]):
                job[fieldname] = None
            else:
                raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
```
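`fill_in_defaults` completes a partial job order in place: an explicit value wins, then `default`, then `null` for optional inputs, and anything else is an error. A sketch with invented input IDs:

```python
from cwltool.process import fill_in_defaults
from cwltool.stdfsaccess import StdFsAccess

inputs = [{"id": "#main/threads", "type": "int", "default": 4},
          {"id": "#main/label", "type": ["null", "string"]}]
job = {}
fill_in_defaults(inputs, job, StdFsAccess(""))
assert job == {"threads": 4, "label": None}
# A required input with no default and no "null" alternative raises
# WorkflowException("Missing required input parameter ...").
```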
```python
def avroize_type(field_type, name_prefix=""):
    # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any
    """Add missing information to a type so that CWL types are valid."""
    if isinstance(field_type, MutableSequence):
        for field in field_type:
            avroize_type(field, name_prefix)
    elif isinstance(field_type, MutableMapping):
        if field_type["type"] in ("enum", "record"):
            if "name" not in field_type:
                field_type["name"] = name_prefix + Text(uuid.uuid4())
        if field_type["type"] == "record":
            avroize_type(field_type["fields"], name_prefix)
        if field_type["type"] == "array":
            avroize_type(field_type["items"], name_prefix)
        if isinstance(field_type["type"], MutableSequence):
            for ctype in field_type["type"]:
                avroize_type(ctype, name_prefix)
    return field_type


def get_overrides(overrides, toolid):  # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any]
    req = {}  # type: Dict[Text, Any]
    if not isinstance(overrides, MutableSequence):
        raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides))
    for ov in overrides:
        if ov["overrideTarget"] == toolid:
            req.update(ov)
    return req
```
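`avroize_type` exists because Avro requires every `enum` and `record` to carry a name, while CWL allows them to be anonymous. A quick sketch:

```python
from cwltool.process import avroize_type

anon_enum = {"type": "enum", "symbols": ["a", "b"]}
named = avroize_type(anon_enum, name_prefix="my_param")
# A missing name is generated as name_prefix plus a random uuid4 string.
assert named["name"].startswith("my_param")
```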
```python
_VAR_SPOOL_ERROR = textwrap.dedent(
    """
    Non-portable reference to /var/spool/cwl detected: '{}'.
    To fix, replace /var/spool/cwl with $(runtime.outdir) or add
    DockerRequirement to the 'requirements' section and declare
    'dockerOutputDirectory: /var/spool/cwl'.
    """)


def var_spool_cwl_detector(obj,           # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text]
                           item=None,     # type: Optional[Any]
                           obj_key=None,  # type: Optional[Any]
                           ):  # type: (...)->bool
    """Detect any textual reference to /var/spool/cwl."""
    r = False
    if isinstance(obj, string_types):
        if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
            _logger.warning(
                SourceLine(item=item, key=obj_key, raise_type=Text).makeError(
                    _VAR_SPOOL_ERROR.format(obj)))
            r = True
    elif isinstance(obj, MutableMapping):
        for mkey, mvalue in iteritems(obj):
            r = var_spool_cwl_detector(mvalue, obj, mkey) or r
    elif isinstance(obj, MutableSequence):
        for lkey, lvalue in enumerate(obj):
            r = var_spool_cwl_detector(lvalue, obj, lkey) or r
    return r


def eval_resource(builder, resource_req):  # type: (Builder, Text) -> Any
    if expression.needs_parsing(resource_req):
        return builder.do_eval(resource_req)
    return resource_req


# Threshold where the "too many files" warning kicks in
FILE_COUNT_WARNING = 5000
```
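`var_spool_cwl_detector` recurses through strings, mappings, and sequences, flagging any mention of `var/spool/cwl` except under a `dockerOutputDirectory` key, where that path is legitimate. Illustration with made-up documents:

```python
from cwltool.process import var_spool_cwl_detector

# Flagged (and a warning is logged): a shell argument using the non-portable path.
assert var_spool_cwl_detector({"arguments": ["ls /var/spool/cwl"]}) is True
# Not flagged: the one key where that path is allowed.
assert var_spool_cwl_detector({"dockerOutputDirectory": "/var/spool/cwl"}) is False
```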
```python
class Process(with_metaclass(abc.ABCMeta, HasReqsHints)):
    def __init__(self,
                 toolpath_object,  # type: MutableMapping[Text, Any]
                 loadingContext    # type: LoadingContext
                 ):  # type: (...) -> None
        """Build a Process object from the provided dictionary."""
        self.metadata = getdefault(loadingContext.metadata, {})  # type: Dict[Text,Any]
        self.provenance_object = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY  # pylint: disable=global-statement
        if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
            get_schema("v1.0")
            SCHEMA_ANY = cast(Dict[Text, Any],
                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"])
            SCHEMA_FILE = cast(Dict[Text, Any],
                               SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"])
            SCHEMA_DIR = cast(Dict[Text, Any],
                              SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"])

        self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY],
                                             Loader({}))
        self.tool = toolpath_object
        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
        self.requirements.extend(self.tool.get("requirements", []))
        if "id" not in self.tool:
            self.tool["id"] = "_:" + Text(uuid.uuid4())
        self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
                                               self.tool["id"]).get("requirements", []))
        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        self.hints.extend(self.tool.get("hints", []))
        # Versions of requirements and hints which aren't mutated.
        self.original_requirements = copy.deepcopy(self.requirements)
        self.original_hints = copy.deepcopy(self.hints)
        self.doc_loader = loadingContext.loader
        self.doc_schema = loadingContext.avsc_names

        self.formatgraph = None  # type: Optional[Graph]
        if self.doc_loader is not None:
            self.formatgraph = self.doc_loader.graph

        checkRequirements(self.tool, supportedProcessRequirements)
        self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []),
                            strict=getdefault(loadingContext.strict, False))

        self.schemaDefs = {}  # type: Dict[Text,Dict[Text, Any]]

        sd, _ = self.get_requirement("SchemaDefRequirement")

        if sd is not None:
            sdtypes = avroize_type(sd["types"])
            av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set())
            for i in av:
                self.schemaDefs[i["name"]] = i  # type: ignore
            schema.make_avsc_object(schema.convert_to_dict(av), self.names)

        # Build record schema from inputs
        self.inputs_record_schema = {
            "name": "input_record_schema", "type": "record",
            "fields": []}  # type: Dict[Text, Any]
        self.outputs_record_schema = {
            "name": "outputs_record_schema", "type": "record",
            "fields": []}  # type: Dict[Text, Any]

        for key in ("inputs", "outputs"):
            for i in self.tool[key]:
                c = copy.deepcopy(i)
                c["name"] = shortname(c["id"])
                del c["id"]

                if "type" not in c:
                    raise validate.ValidationException(
                        u"Missing 'type' in parameter '{}'".format(c["name"]))

                if "default" in c and "null" not in aslist(c["type"]):
                    nullable = ["null"]
                    nullable.extend(aslist(c["type"]))
                    c["type"] = nullable
                else:
                    c["type"] = c["type"]
                c["type"] = avroize_type(c["type"], c["name"])
                if key == "inputs":
                    self.inputs_record_schema["fields"].append(c)
                elif key == "outputs":
                    self.outputs_record_schema["fields"].append(c)

        with SourceLine(toolpath_object, "inputs", validate.ValidationException):
            self.inputs_record_schema = cast(
                Dict[Text, Any], schema.make_valid_avro(
                    self.inputs_record_schema, {}, set()))
            schema.make_avsc_object(
                schema.convert_to_dict(self.inputs_record_schema), self.names)
        with SourceLine(toolpath_object, "outputs", validate.ValidationException):
            self.outputs_record_schema = cast(
                Dict[Text, Any],
                schema.make_valid_avro(self.outputs_record_schema, {}, set()))
            schema.make_avsc_object(
                schema.convert_to_dict(self.outputs_record_schema), self.names)

        if toolpath_object.get("class") is not None \
                and not getdefault(loadingContext.disable_js_validation, False):
            if loadingContext.js_hint_options_file is not None:
                try:
                    with open(loadingContext.js_hint_options_file) as options_file:
                        validate_js_options = json.load(options_file)
                except (OSError, ValueError) as err:
                    _logger.error(
                        "Failed to read options file %s",
                        loadingContext.js_hint_options_file)
                    raise
            else:
                validate_js_options = None
            if self.doc_schema is not None:
                validate_js_expressions(
                    cast(CommentedMap, toolpath_object),
                    self.doc_schema.names[toolpath_object["class"]],
                    validate_js_options)

        dockerReq, is_req = self.get_requirement("DockerRequirement")

        if dockerReq is not None and "dockerOutputDirectory" in dockerReq\
                and is_req is not None and not is_req:
            _logger.warning(SourceLine(
                item=dockerReq, raise_type=Text).makeError(
                    "When 'dockerOutputDirectory' is declared, DockerRequirement "
                    "should go in the 'requirements' section, not 'hints'."""))

        if dockerReq is not None and is_req is not None\
                and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl":
            if is_req:
                # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
                pass
            else:
                # Must be a requirement
                var_spool_cwl_detector(self.tool)
        else:
            var_spool_cwl_detector(self.tool)

    def _init_job(self, joborder, runtime_context):
        # type: (Mapping[Text, Text], RuntimeContext) -> Builder

        if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
            raise WorkflowException("Process object loaded with version '%s', must update to '%s' in order to execute." % (
                self.metadata.get("cwlVersion"), INTERNAL_VERSION))

        job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder))

        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
        fs_access = make_fs_access(runtime_context.basedir)

        load_listing_req, _ = self.get_requirement(
            "LoadListingRequirement")

        if load_listing_req is not None:
            load_listing = load_listing_req.get("loadListing")
        else:
            load_listing = "no_listing"

        # Validate job order
        try:
            fill_in_defaults(self.tool[u"inputs"], job, fs_access)

            normalizeFilesDirs(job)
            schema = self.names.get_name("input_record_schema", "")
            if schema is None:
                raise WorkflowException("Missing input record schema: "
                                        "{}".format(self.names))
            validate.validate_ex(schema, job, strict=False,
                                 logger=_logger_validation_warnings)

            if load_listing and load_listing != "no_listing":
                get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))

            visit_class(job, ("File",), functools.partial(add_sizes, fs_access))

            if load_listing == "deep_listing":
                for i, inparm in enumerate(self.tool["inputs"]):
                    k = shortname(inparm["id"])
                    if k not in job:
                        continue
                    v = job[k]
                    dircount = [0]

                    def inc(d):  # type: (List[int]) -> None
                        d[0] += 1
                    visit_class(v, ("Directory",), lambda x: inc(dircount))
                    if dircount[0] == 0:
                        continue
                    filecount = [0]
                    visit_class(v, ("File",), lambda x: inc(filecount))
                    if filecount[0] > FILE_COUNT_WARNING:
                        # Long lines in this message are okay, will be reflowed based on terminal columns.
                        _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError(
                            """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use.

If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:

  $namespaces:
    cwltool: "http://commonwl.org/cwltool#"
  hints:
    cwltool:LoadListingRequirement:
      loadListing: shallow_listing

""" % (filecount[0], k))))

        except (validate.ValidationException, WorkflowException) as err:
            raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err)

        files = []  # type: List[Dict[Text, Text]]
        bindings = CommentedSeq()
        tmpdir = u""
        stagedir = u""

        docker_req, _ = self.get_requirement("DockerRequirement")
        default_docker = None

        if docker_req is None and runtime_context.default_container:
            default_docker = runtime_context.default_container

        if (docker_req or default_docker) and runtime_context.use_container:
            if docker_req is not None:
                # Check if docker output directory is absolute
                if docker_req.get("dockerOutputDirectory") and \
                        docker_req.get("dockerOutputDirectory").startswith('/'):
                    outdir = docker_req.get("dockerOutputDirectory")
                else:
                    outdir = docker_req.get("dockerOutputDirectory") or \
                        runtime_context.docker_outdir or random_outdir()
            elif default_docker is not None:
                outdir = runtime_context.docker_outdir or random_outdir()
            tmpdir = runtime_context.docker_tmpdir or "/tmp"  # nosec
            stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
        else:
            outdir = fs_access.realpath(
                runtime_context.outdir or tempfile.mkdtemp(
                    prefix=getdefault(runtime_context.tmp_outdir_prefix,
                                      DEFAULT_TMP_PREFIX)))
            if self.tool[u"class"] != 'Workflow':
                tmpdir = fs_access.realpath(runtime_context.tmpdir
                                            or tempfile.mkdtemp())
                stagedir = fs_access.realpath(runtime_context.stagedir
                                              or tempfile.mkdtemp())

        builder = Builder(job,
                          files,
                          bindings,
                          self.schemaDefs,
                          self.names,
                          self.requirements,
                          self.hints,
                          {},
                          runtime_context.mutation_manager,
                          self.formatgraph,
                          make_fs_access,
                          fs_access,
                          runtime_context.job_script_provider,
                          runtime_context.eval_timeout,
                          runtime_context.debug,
                          runtime_context.js_console,
                          runtime_context.force_docker_pull,
                          load_listing,
                          outdir,
                          tmpdir,
                          stagedir)

        bindings.extend(builder.bind_input(
            self.inputs_record_schema, job,
            discover_secondaryFiles=getdefault(runtime_context.toplevel, False)))

        if self.tool.get("baseCommand"):
            for index, command in enumerate(aslist(self.tool["baseCommand"])):
                bindings.append({
                    "position": [-1000000, index],
                    "datum": command
                })

        if self.tool.get("arguments"):
            for i, arg in enumerate(self.tool["arguments"]):
                lc = self.tool["arguments"].lc.data[i]
                filename = self.tool["arguments"].lc.filename
                bindings.lc.add_kv_line_col(len(bindings), lc)
                if isinstance(arg, MutableMapping):
                    arg = copy.deepcopy(arg)
                    if arg.get("position"):
                        position = arg.get("position")
                        if isinstance(position, str):  # no need to test the
                            # CWLVersion as the v1.0
                            # schema only allows ints
                            position = builder.do_eval(position)
                            if position is None:
                                position = 0
                        arg["position"] = [position, i]
                    else:
                        arg["position"] = [0, i]
                    bindings.append(arg)
                elif ("$(" in arg) or ("${" in arg):
                    cm = CommentedMap((
                        ("position", [0, i]),
                        ("valueFrom", arg)
                    ))
                    cm.lc.add_kv_line_col("valueFrom", lc)
                    cm.lc.filename = filename
                    bindings.append(cm)
                else:
                    cm = CommentedMap((
                        ("position", [0, i]),
                        ("datum", arg)
                    ))
                    cm.lc.add_kv_line_col("datum", lc)
                    cm.lc.filename = filename
                    bindings.append(cm)

        # use python2 like sorting of heterogeneous lists
        # (containing str and int types),
        if PY3:
            key = functools.cmp_to_key(cmp_like_py2)
        else:  # PY2
            key = lambda d: d["position"]

        # This awkward construction replaces the contents of
        # "bindings" in place (because Builder expects it to be
        # mutated in place, sigh, I'm sorry) with its contents sorted,
        # supporting different versions of Python and ruamel.yaml with
        # different behaviors/bugs in CommentedSeq.
        bindings_copy = copy.deepcopy(bindings)
        del bindings[:]
        bindings.extend(sorted(bindings_copy, key=key))

        if self.tool[u"class"] != 'Workflow':
            builder.resources = self.evalResources(builder, runtime_context)
        return builder

    def evalResources(self, builder, runtimeContext):
        # type: (Builder, RuntimeContext) -> Dict[str, int]
        resourceReq, _ = self.get_requirement("ResourceRequirement")
        if resourceReq is None:
            resourceReq = {}
        cwl_version = self.metadata.get(
            "http://commonwl.org/cwltool#original_cwlVersion", None)
        if cwl_version == "v1.0":
            ram = 1024
        else:
            ram = 256
        request = {
            "coresMin": 1,
            "coresMax": 1,
            "ramMin": ram,
            "ramMax": ram,
            "tmpdirMin": 1024,
            "tmpdirMax": 1024,
            "outdirMin": 1024,
            "outdirMax": 1024
        }  # type: Dict[str, int]
        for a in ("cores", "ram", "tmpdir", "outdir"):
            mn = None
            mx = None
            if resourceReq.get(a + "Min"):
                mn = eval_resource(builder, resourceReq[a + "Min"])
            if resourceReq.get(a + "Max"):
                mx = eval_resource(builder, resourceReq[a + "Max"])
            if mn is None:
                mn = mx
            elif mx is None:
                mx = mn

            if mn is not None:
                request[a + "Min"] = cast(int, mn)
                request[a + "Max"] = cast(int, mx)

        if runtimeContext.select_resources is not None:
            return runtimeContext.select_resources(request, runtimeContext)
        return {
            "cores": request["coresMin"],
            "ram": request["ramMin"],
            "tmpdirSize": request["tmpdirMin"],
            "outdirSize": request["outdirMin"],
        }

    def validate_hints(self, avsc_names, hints, strict):
        # type: (Any, List[Dict[Text, Any]], bool) -> None
        for i, r in enumerate(hints):
            sl = SourceLine(hints, i, validate.ValidationException)
            with sl:
                if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None:
                    plain_hint = dict((key, r[key]) for key in r if key not in
                                      self.doc_loader.identifiers)  # strip identifiers
                    validate.validate_ex(
                        avsc_names.get_name(plain_hint["class"], ""),
                        plain_hint, strict=strict)
                elif r["class"] in ("NetworkAccess", "LoadListingRequirement"):
                    pass
                else:
                    _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"]))))

    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], None]) -> None
        op(self.tool)

    @abc.abstractmethod
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        # FIXME: Declare base type for what Generator yields
        pass
```
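`Process` itself is abstract; `CommandLineTool`, `ExpressionTool`, and `Workflow` elsewhere in the package provide the real implementations. A purely hypothetical stub showing the one method a concrete subclass must supply:

```python
# Illustrative only: NoOpProcess is invented here to show the contract, and
# constructing it still requires a valid toolpath_object / LoadingContext.
from cwltool.process import Process


class NoOpProcess(Process):
    def job(self, job_order, output_callbacks, runtimeContext):
        # A real job() yields runnable job objects; this one just reports an
        # empty output record and yields nothing.
        output_callbacks({}, "success")
        return
        yield  # unreachable; keeps this a generator per the abstract signature
```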
```python
_names = set()  # type: Set[Text]


def uniquename(stem, names=None):  # type: (Text, Optional[Set[Text]]) -> Text
    global _names
    if names is None:
        names = _names
    c = 1
    u = stem
    while u in names:
        c += 1
        u = u"%s_%s" % (stem, c)
    names.add(u)
    return u


def nestdir(base, deps):
    # type: (Text, Dict[Text, Any]) -> Dict[Text, Any]
    dirname = os.path.dirname(base) + "/"
    subid = deps["location"]
    if subid.startswith(dirname):
        s2 = subid[len(dirname):]
        sp = s2.split('/')
        sp.pop()
        while sp:
            nx = sp.pop()
            deps = {
                "class": "Directory",
                "basename": nx,
                "listing": [deps]
            }
    return deps
```
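`uniquename` hands back the stem on first use, then numbered variants, which is how duplicate names get disambiguated. For example:

```python
from cwltool.process import uniquename

seen = set()
assert uniquename("step", seen) == "step"
assert uniquename("step", seen) == "step_2"
assert uniquename("step", seen) == "step_3"
```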
```python
def mergedirs(listing):
    # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]]
    r = []  # type: List[Dict[Text, Any]]
    ents = {}  # type: Dict[Text, Any]
    collided = set()  # type: Set[Text]
    for e in listing:
        if e["basename"] not in ents:
            ents[e["basename"]] = e
        elif e["class"] == "Directory":
            if e.get("listing"):
                ents[e["basename"]].setdefault("listing", []).extend(e["listing"])
            if ents[e["basename"]]["location"].startswith("_:"):
                ents[e["basename"]]["location"] = e["location"]
        elif e["location"] != ents[e["basename"]]["location"]:
            # same basename, different location, collision,
            # rename both.
            collided.add(e["basename"])
            e2 = ents[e["basename"]]

            e["basename"] = urllib.parse.quote(e["location"], safe="")
            e2["basename"] = urllib.parse.quote(e2["location"], safe="")

            e["nameroot"], e["nameext"] = os.path.splitext(e["basename"])
            e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"])

            ents[e["basename"]] = e
            ents[e2["basename"]] = e2
    for c in collided:
        del ents[c]
    for e in itervalues(ents):
        if e["class"] == "Directory" and "listing" in e:
            e["listing"] = mergedirs(e["listing"])
    r.extend(itervalues(ents))
    return r
```
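`mergedirs` folds listing entries that share a basename into a single `Directory`, preferring a real location over an anonymous `_:` one. A small sketch (locations invented):

```python
from cwltool.process import mergedirs

merged = mergedirs([
    {"class": "Directory", "basename": "d", "location": "_:anon",
     "listing": [{"class": "File", "basename": "a.txt",
                  "location": "file:///x/d/a.txt"}]},
    {"class": "Directory", "basename": "d", "location": "file:///x/d",
     "listing": [{"class": "File", "basename": "b.txt",
                  "location": "file:///x/d/b.txt"}]},
])
assert len(merged) == 1
assert merged[0]["location"] == "file:///x/d"
assert {f["basename"] for f in merged[0]["listing"]} == {"a.txt", "b.txt"}
```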
```python
CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"


def scandeps(base,                          # type: Text
             doc,                           # type: Any
             reffields,                     # type: Set[Text]
             urlfields,                     # type: Set[Text]
             loadref,                       # type: Callable[[Text, Text], Text]
             urljoin=urllib.parse.urljoin,  # type: Callable[[Text, Text], Text]
             nestdirs=True                  # type: bool
             ):  # type: (...) -> List[Dict[Text, Text]]
    r = []  # type: List[Dict[Text, Text]]
    if isinstance(doc, MutableMapping):
        if "id" in doc:
            if doc["id"].startswith("file://"):
                df, _ = urllib.parse.urldefrag(doc["id"])
                if base != df:
                    r.append({
                        "class": "File",
                        "location": df,
                        "format": CWL_IANA
                    })
                    base = df

        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
            u = doc.get("location", doc.get("path"))
            if u and not u.startswith("_:"):
                deps = {"class": doc["class"],
                        "location": urljoin(base, u)
                        }  # type: Dict[Text, Any]
                if "basename" in doc:
                    deps["basename"] = doc["basename"]
                if doc["class"] == "Directory" and "listing" in doc:
                    deps["listing"] = doc["listing"]
                if doc["class"] == "File" and "secondaryFiles" in doc:
                    deps["secondaryFiles"] = doc["secondaryFiles"]
                if nestdirs:
                    deps = nestdir(base, deps)
                r.append(deps)
            else:
                if doc["class"] == "Directory" and "listing" in doc:
                    r.extend(scandeps(
                        base, doc["listing"], reffields, urlfields, loadref,
                        urljoin=urljoin, nestdirs=nestdirs))
                elif doc["class"] == "File" and "secondaryFiles" in doc:
                    r.extend(scandeps(
                        base, doc["secondaryFiles"], reffields, urlfields,
                        loadref, urljoin=urljoin, nestdirs=nestdirs))

        for k, v in iteritems(doc):
            if k in reffields:
                for u in aslist(v):
                    if isinstance(u, MutableMapping):
                        r.extend(scandeps(
                            base, u, reffields, urlfields, loadref,
                            urljoin=urljoin, nestdirs=nestdirs))
                    else:
                        subid = urljoin(base, u)
                        basedf, _ = urllib.parse.urldefrag(base)
                        subiddf, _ = urllib.parse.urldefrag(subid)
                        if basedf == subiddf:
                            continue
                        sub = loadref(base, u)
                        deps = {
                            "class": "File",
                            "location": subid,
                            "format": CWL_IANA
                        }
                        sf = scandeps(
                            subid, sub, reffields, urlfields, loadref,
                            urljoin=urljoin, nestdirs=nestdirs)
                        if sf:
                            deps["secondaryFiles"] = sf
                        if nestdirs:
                            deps = nestdir(base, deps)
                        r.append(deps)
            elif k in urlfields and k != "location":
                for u in aslist(v):
                    deps = {
                        "class": "File",
                        "location": urljoin(base, u)
                    }
                    if nestdirs:
                        deps = nestdir(base, deps)
                    r.append(deps)
            elif k not in ("listing", "secondaryFiles"):
                r.extend(scandeps(
                    base, v, reffields, urlfields, loadref, urljoin=urljoin,
                    nestdirs=nestdirs))
    elif isinstance(doc, MutableSequence):
        for d in doc:
            r.extend(scandeps(
                base, d, reffields, urlfields, loadref, urljoin=urljoin,
                nestdirs=nestdirs))

    if r:
        normalizeFilesDirs(r)
        r = mergedirs(r)

    return r
```
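A hedged sketch of `scandeps` over an in-memory document; `loadref` would normally resolve and parse `run`-style references, which the stub below never exercises because `reffields` is empty:

```python
from cwltool.process import scandeps

doc = {"class": "CommandLineTool",
       "inputs": [{"id": "#t/src",
                   "default": {"class": "File",
                               "location": "file:///proj/data.txt"}}],
       "outputs": []}

deps = scandeps("file:///proj/tool.cwl", doc,
                reffields=set(),               # nothing to load via loadref
                urlfields={"location"},
                loadref=lambda base, ref: {},  # stub, unused here
                nestdirs=False)
assert deps[0]["class"] == "File"
assert deps[0]["location"] == "file:///proj/data.txt"
# normalizeFilesDirs() at the tail also fills in basename/nameroot/nameext.
```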
```python
def compute_checksums(fs_access, fileobj):  # type: (StdFsAccess, Dict[Text, Any]) -> None
    if "checksum" not in fileobj:
        checksum = hashlib.sha1()  # nosec
        with fs_access.open(fileobj["location"], "rb") as f:
            contents = f.read(1024 * 1024)
            while contents != b"":
                checksum.update(contents)
                contents = f.read(1024 * 1024)
        fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
        fileobj["size"] = fs_access.size(fileobj["location"])
```
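`compute_checksums` streams the file through SHA-1 in 1 MiB chunks and records the result in CWL's `sha1$...` form. A small self-contained sketch:

```python
import tempfile
from cwltool.process import compute_checksums
from cwltool.stdfsaccess import StdFsAccess

with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
    tmp.write(b"hello\n")

fileobj = {"location": tmp.name}
compute_checksums(StdFsAccess(""), fileobj)
assert fileobj["checksum"].startswith("sha1$")
assert fileobj["size"] == 6
```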
