Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/cwltool/builder.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 from __future__ import absolute_import | |
| 2 | |
| 3 import copy | |
| 4 import os | |
| 5 import logging | |
| 6 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, | |
| 7 Optional, Set, Tuple, Union) | |
| 8 | |
| 9 from typing_extensions import Text, Type, TYPE_CHECKING # pylint: disable=unused-import | |
| 10 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 11 | |
| 12 from rdflib import Graph, URIRef # pylint: disable=unused-import | |
| 13 from rdflib.namespace import OWL, RDFS | |
| 14 from ruamel.yaml.comments import CommentedMap | |
| 15 from schema_salad import validate | |
| 16 from schema_salad.schema import Names, convert_to_dict | |
| 17 from schema_salad.avro.schema import make_avsc_object, Schema | |
| 18 from schema_salad.sourceline import SourceLine | |
| 19 from schema_salad.ref_resolver import uri_file_path | |
| 20 from six import iteritems, string_types | |
| 21 from future.utils import raise_from | |
| 22 from typing import IO | |
| 23 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import | |
| 24 Text, Type) | |
| 25 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 26 | |
| 27 from . import expression | |
| 28 from .errors import WorkflowException | |
| 29 from .loghandler import _logger | |
| 30 from .mutation import MutationManager # pylint: disable=unused-import | |
| 31 from .pathmapper import PathMapper # pylint: disable=unused-import | |
| 32 from .pathmapper import CONTENT_LIMIT, get_listing, normalizeFilesDirs, visit_class | |
| 33 from .stdfsaccess import StdFsAccess # pylint: disable=unused-import | |
| 34 from .utils import aslist, docker_windows_path_adjust, json_dumps, onWindows | |
| 35 | |
| 36 | |
| 37 | |
| 38 if TYPE_CHECKING: | |
| 39 from .provenance import ProvenanceProfile # pylint: disable=unused-import | |
| 40 | |
| 41 | |
def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
    """Read up to CONTENT_LIMIT bytes from *f* and return them.

    Raises:
        WorkflowException: if the stream contains more than CONTENT_LIMIT bytes.
    """
    # Read one byte past the limit so an oversized stream can be detected
    # without having to consume it entirely.
    contents = f.read(CONTENT_LIMIT + 1)
    if len(contents) > CONTENT_LIMIT:
        raise WorkflowException("loadContents handling encountered buffer that exceeds maximum length of %d bytes" % CONTENT_LIMIT)
    return contents
| 47 | |
| 48 | |
def content_limit_respected_read(f):  # type: (IO[bytes]) -> Text
    """Read *f* under the content-size limit and decode the result as UTF-8."""
    raw = content_limit_respected_read_bytes(f)
    return raw.decode("utf-8")
| 51 | |
| 52 | |
def substitute(value, replace):  # type: (Text, Text) -> Text
    """Apply a secondaryFiles suffix pattern to *value*.

    Each leading ``^`` in *replace* strips one ``.ext`` suffix from *value*;
    the remainder of *replace* is then appended.  If there is no extension
    left to strip, any remaining carets are simply discarded.
    """
    stem = value
    suffix = replace
    while suffix.startswith("^"):
        suffix = suffix[1:]
        dot = stem.rfind(".")
        if dot == -1:
            # No extension to remove; drop any leftover carets.
            return stem + suffix.lstrip("^")
        stem = stem[:dot]
    return stem + suffix
| 61 | |
def formatSubclassOf(fmt, cls, ontology, visited):
    # type: (Text, Text, Optional[Graph], Set[Text]) -> bool
    """Determine if `fmt` is a subclass of `cls`."""
    if URIRef(fmt) == URIRef(cls):
        return True

    if ontology is None or fmt in visited:
        return False

    # Remember this node so cycles in the ontology cannot recurse forever.
    visited.add(fmt)

    node = URIRef(fmt)

    # Walk upward through the parent classes of `fmt`.
    if any(formatSubclassOf(parent, cls, ontology, visited)
           for _subj, _pred, parent in ontology.triples((node, RDFS.subClassOf, None))):
        return True

    # Search equivalent classes horizontally, in both directions of the relation.
    if any(formatSubclassOf(equiv, cls, ontology, visited)
           for _subj, _pred, equiv in ontology.triples((node, OWL.equivalentClass, None))):
        return True

    if any(formatSubclassOf(equiv, cls, ontology, visited)
           for equiv, _pred, _obj in ontology.triples((None, OWL.equivalentClass, node))):
        return True

    return False
| 94 | |
def check_format(actual_file,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text]
                 input_formats,  # type: Union[List[Text], Text]
                 ontology        # type: Optional[Graph]
                ):  # type: (...) -> None
    """Confirm that the format present is valid for the allowed formats."""
    for entry in aslist(actual_file):
        if not entry:
            continue
        if "format" not in entry:
            raise validate.ValidationException(
                u"File has no 'format' defined: {}".format(
                    json_dumps(entry, indent=4)))
        # NOTE: mirrors the original behavior — the function returns as soon
        # as one file matches an allowed format.
        if any(entry["format"] == allowed
               or formatSubclassOf(entry["format"], allowed, ontology, set())
               for allowed in aslist(input_formats)):
            return
        raise validate.ValidationException(
            u"File has an incompatible format: {}".format(
                json_dumps(entry, indent=4)))
| 114 | |
class HasReqsHints(object):
    """Mixin tracking CWL process requirements and hints."""

    def __init__(self):  # type: () -> None
        """Initialize this reqs decorator."""
        self.requirements = []  # type: List[Dict[Text, Any]]
        self.hints = []  # type: List[Dict[Text, Any]]

    def get_requirement(self,
                        feature  # type: Text
                       ):  # type: (...) -> Tuple[Optional[Any], Optional[bool]]
        """Look up *feature* by its "class" key.

        Returns (entry, True) for a requirement, (entry, False) for a hint,
        or (None, None) when the feature is absent.  Later entries take
        precedence, and requirements are consulted before hints.
        """
        for entries, mandatory in ((self.requirements, True), (self.hints, False)):
            for entry in reversed(entries):
                if entry["class"] == feature:
                    return (entry, mandatory)
        return (None, None)
| 131 | |
class Builder(HasReqsHints):
    """Build command-line bindings for one CWL job.

    Pairs a concrete job object with the tool's input schema via
    bind_input(), evaluates parameter references and expressions with
    do_eval(), and collects the File/Directory objects that must be
    staged in self.files.
    """

    def __init__(self,
                 job,  # type: Dict[Text, expression.JSON]
                 files,  # type: List[Dict[Text, Text]]
                 bindings,  # type: List[Dict[Text, Any]]
                 schemaDefs,  # type: Dict[Text, Dict[Text, Any]]
                 names,  # type: Names
                 requirements,  # type: List[Dict[Text, Any]]
                 hints,  # type: List[Dict[Text, Any]]
                 resources,  # type: Dict[str, int]
                 mutation_manager,  # type: Optional[MutationManager]
                 formatgraph,  # type: Optional[Graph]
                 make_fs_access,  # type: Type[StdFsAccess]
                 fs_access,  # type: StdFsAccess
                 job_script_provider,  # type: Optional[Any]
                 timeout,  # type: float
                 debug,  # type: bool
                 js_console,  # type: bool
                 force_docker_pull,  # type: bool
                 loadListing,  # type: Text
                 outdir,  # type: Text
                 tmpdir,  # type: Text
                 stagedir  # type: Text
                ):  # type: (...) -> None
        """Initialize this Builder."""
        self.job = job
        self.files = files
        self.bindings = bindings
        self.schemaDefs = schemaDefs
        self.names = names
        self.requirements = requirements
        self.hints = hints
        self.resources = resources
        self.mutation_manager = mutation_manager
        self.formatgraph = formatgraph

        self.make_fs_access = make_fs_access
        self.fs_access = fs_access

        self.job_script_provider = job_script_provider

        self.timeout = timeout

        self.debug = debug
        self.js_console = js_console
        self.force_docker_pull = force_docker_pull

        # One of "no_listing", "shallow_listing", "deep_listing"
        self.loadListing = loadListing

        self.outdir = outdir
        self.tmpdir = tmpdir
        self.stagedir = stagedir

        # Filled in by callers after construction, not here.
        self.pathmapper = None  # type: Optional[PathMapper]
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.find_default_container = None  # type: Optional[Callable[[], Text]]

    def build_job_script(self, commands):
        # type: (List[Text]) -> Text
        """Delegate job-script generation to job_script_provider, if any.

        Returns the provider's script, or None when no provider (or no
        build_job_script attribute) is available.  NOTE(review): the
        declared return type omits the None case.
        """
        build_job_script_method = getattr(self.job_script_provider, "build_job_script", None)  # type: Callable[[Builder, Union[List[str],List[Text]]], Text]
        if build_job_script_method is not None:
            return build_job_script_method(self, commands)
        return None

    def bind_input(self,
                   schema,  # type: MutableMapping[Text, Any]
                   datum,  # type: Any
                   discover_secondaryFiles,  # type: bool
                   lead_pos=None,  # type: Optional[Union[int, List[int]]]
                   tail_pos=None,  # type: Optional[List[int]]
                  ):  # type: (...) -> List[MutableMapping[Text, Any]]
        """Bind an input object to the command line.

        Recursively walks `schema` against `datum`, producing one binding
        dict per bound value.  Side effects: appends File/Directory
        objects to self.files, fills in record-field defaults and
        discovered secondaryFiles on `datum` in place, and validates any
        declared file format.  Each returned binding carries a "position"
        list used later as a sort key.
        """

        if tail_pos is None:
            tail_pos = []
        if lead_pos is None:
            lead_pos = []

        bindings = []  # type: List[MutableMapping[Text, Text]]
        binding = {}  # type: Union[MutableMapping[Text, Text], CommentedMap]
        value_from_expression = False
        if "inputBinding" in schema and isinstance(schema["inputBinding"], MutableMapping):
            binding = CommentedMap(schema["inputBinding"].items())

            # Build the position sort key: lead_pos, then the binding's own
            # position (evaluated if it is an expression), then tail_pos.
            bp = list(aslist(lead_pos))
            if "position" in binding:
                position = binding["position"]
                if isinstance(position, str):  # no need to test the CWL Version
                    # the schema for v1.0 only allow ints
                    binding['position'] = self.do_eval(position, context=datum)
                    bp.append(binding['position'])
                else:
                    bp.extend(aslist(binding['position']))
            else:
                bp.append(0)
            bp.extend(aslist(tail_pos))
            binding["position"] = bp

            binding["datum"] = datum
            if "valueFrom" in binding:
                value_from_expression = True

        # Handle union types
        if isinstance(schema["type"], MutableSequence):
            bound_input = False
            for t in schema["type"]:
                avsc = None  # type: Optional[Schema]
                if isinstance(t, string_types) and self.names.has_name(t, ""):
                    avsc = self.names.get_name(t, "")
                elif isinstance(t, MutableMapping) and "name" in t and self.names.has_name(t["name"], ""):
                    avsc = self.names.get_name(t["name"], "")
                if not avsc:
                    avsc = make_avsc_object(convert_to_dict(t), self.names)
                if validate.validate(avsc, datum):
                    # Found the union branch that matches the datum; rebind
                    # with that concrete type.
                    schema = copy.deepcopy(schema)
                    schema["type"] = t
                    if not value_from_expression:
                        return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
                    else:
                        # With valueFrom, recurse for side effects only and
                        # keep this level's binding.
                        self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
                        bound_input = True
            if not bound_input:
                raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
        elif isinstance(schema["type"], MutableMapping):
            # Nested (inline) type definition.
            st = copy.deepcopy(schema["type"])
            if binding and "inputBinding" not in st\
                    and "type" in st\
                    and st["type"] == "array"\
                    and "itemSeparator" not in binding:
                # Without an itemSeparator each array item gets its own
                # (empty) binding.
                st["inputBinding"] = {}
            for k in ("secondaryFiles", "format", "streamable"):
                if k in schema:
                    st[k] = schema[k]
            if value_from_expression:
                self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles)
            else:
                bindings.extend(self.bind_input(st, datum, lead_pos=lead_pos, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
        else:
            if schema["type"] in self.schemaDefs:
                # Named type: expand from the schema definitions.
                schema = self.schemaDefs[schema["type"]]

            if schema["type"] == "record":
                for f in schema["fields"]:
                    if f["name"] in datum and datum[f["name"]] is not None:
                        bindings.extend(self.bind_input(f, datum[f["name"]], lead_pos=lead_pos, tail_pos=f["name"], discover_secondaryFiles=discover_secondaryFiles))
                    else:
                        # Missing field: fill in the schema default (in place).
                        datum[f["name"]] = f.get("default")

            if schema["type"] == "array":
                for n, item in enumerate(datum):
                    b2 = None
                    if binding:
                        b2 = copy.deepcopy(binding)
                        b2["datum"] = item
                    itemschema = {
                        u"type": schema["items"],
                        u"inputBinding": b2
                    }
                    for k in ("secondaryFiles", "format", "streamable"):
                        if k in schema:
                            itemschema[k] = schema[k]
                    bindings.extend(
                        self.bind_input(itemschema, item, lead_pos=n, tail_pos=tail_pos, discover_secondaryFiles=discover_secondaryFiles))
                # Item bindings carry the positions; drop the array-level
                # binding so it is not appended again below.
                binding = {}

            def _capture_files(f):  # type: (Dict[Text, Text]) -> Dict[Text, Text]
                # Record every visited File/Directory for later staging.
                self.files.append(f)
                return f

            if schema["type"] == "File":
                self.files.append(datum)
                if (binding and binding.get("loadContents")) or schema.get("loadContents"):
                    # Inline the (size-limited) file contents into the object.
                    with self.fs_access.open(datum["location"], "rb") as f:
                        datum["contents"] = content_limit_respected_read(f)

                if "secondaryFiles" in schema:
                    if "secondaryFiles" not in datum:
                        datum["secondaryFiles"] = []
                    for sf in aslist(schema["secondaryFiles"]):
                        # "required" may itself be an expression; default True.
                        if 'required' in sf:
                            sf_required = self.do_eval(sf['required'], context=datum)
                        else:
                            sf_required = True

                        # Pattern is either an expression or a suffix
                        # substitution applied to the basename.
                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                            sfpath = self.do_eval(sf["pattern"], context=datum)
                        else:
                            sfpath = substitute(datum["basename"], sf["pattern"])

                        for sfname in aslist(sfpath):
                            if not sfname:
                                continue
                            # Already listed among the datum's secondaryFiles?
                            found = False
                            for d in datum["secondaryFiles"]:
                                if not d.get("basename"):
                                    d["basename"] = d["location"][d["location"].rindex("/")+1:]
                                if d["basename"] == sfname:
                                    found = True
                            if not found:
                                # Candidate location: sibling of the primary file.
                                sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname
                                if isinstance(sfname, MutableMapping):
                                    # Expression produced a full File object.
                                    datum["secondaryFiles"].append(sfname)
                                elif discover_secondaryFiles and self.fs_access.exists(sf_location):
                                    datum["secondaryFiles"].append({
                                        "location": sf_location,
                                        "basename": sfname,
                                        "class": "File"})
                                elif sf_required:
                                    raise WorkflowException("Missing required secondary file '%s' from file object: %s" % (
                                        sfname, json_dumps(datum, indent=4)))

                    normalizeFilesDirs(datum["secondaryFiles"])

                if "format" in schema:
                    try:
                        check_format(datum, self.do_eval(schema["format"]),
                                     self.formatgraph)
                    except validate.ValidationException as ve:
                        raise_from(WorkflowException(
                            "Expected value of '%s' to have format %s but\n "
                            " %s" % (schema["name"], schema["format"], ve)), ve)

                visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files)

            if schema["type"] == "Directory":
                # Directory-level loadListing overrides the builder default.
                ll = schema.get("loadListing") or self.loadListing
                if ll and ll != "no_listing":
                    get_listing(self.fs_access, datum, (ll == "deep_listing"))
                self.files.append(datum)

            if schema["type"] == "Any":
                visit_class(datum, ("File", "Directory"), _capture_files)

        # Position to front of the sort key
        if binding:
            for bi in bindings:
                bi["position"] = binding["position"] + bi["position"]
            bindings.append(binding)

        return bindings

    def tostr(self, value):  # type: (Union[MutableMapping[Text, Text], Any]) -> Text
        """Render *value* for the command line.

        File/Directory objects become their "path"; anything else is
        coerced with Text().
        """
        if isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
            if "path" not in value:
                raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value))

            # Path adjust for windows file path when passing to docker, docker accepts unix like path only
            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
            if onWindows() and docker_req is not None:
                # docker_req is none only when there is no dockerRequirement
                # mentioned in hints and Requirement
                path = docker_windows_path_adjust(value["path"])
                return path
            return value["path"]
        else:
            return Text(value)

    def generate_arg(self, binding):  # type: (Dict[Text, Any]) -> List[Text]
        """Turn one input binding into its argv fragment(s).

        Applies valueFrom, prefix/separate and itemSeparator semantics to
        the binding's datum and returns the resulting argument strings.
        """
        value = binding.get("datum")
        if "valueFrom" in binding:
            with SourceLine(binding, "valueFrom", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
                value = self.do_eval(binding["valueFrom"], context=value)

        prefix = binding.get("prefix")  # type: Optional[Text]
        sep = binding.get("separate", True)
        if prefix is None and not sep:
            with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
                raise WorkflowException("'separate' option can not be specified without prefix")

        argl = []  # type: MutableSequence[MutableMapping[Text, Text]]
        if isinstance(value, MutableSequence):
            if binding.get("itemSeparator") and value:
                # Join all items into a single argument.
                argl = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
            elif binding.get("valueFrom"):
                value = [self.tostr(v) for v in value]
                return ([prefix] if prefix else []) + value
            elif prefix and value:
                # Non-empty list without itemSeparator: emit the prefix only.
                return [prefix]
            else:
                return []
        elif isinstance(value, MutableMapping) and value.get("class") in ("File", "Directory"):
            argl = [value]
        elif isinstance(value, MutableMapping):
            return [prefix] if prefix else []
        elif value is True and prefix:
            # Boolean flag: presence of the prefix encodes True.
            return [prefix]
        elif value is False or value is None or (value is True and not prefix):
            return []
        else:
            argl = [value]

        args = []
        for j in argl:
            if sep:
                args.extend([prefix, self.tostr(j)])
            else:
                # separate=False: glue prefix and value into one argument.
                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))

        # A None prefix with sep=True produces None entries; drop them.
        return [a for a in args if a is not None]

    def do_eval(self, ex, context=None, recursive=False, strip_whitespace=True):
        # type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
        """Evaluate a CWL parameter reference or expression.

        With recursive=True, maps evaluation over dict values and list
        elements first; otherwise delegates directly to
        expression.do_eval with this builder's job context.
        """
        if recursive:
            if isinstance(ex, MutableMapping):
                return {k: self.do_eval(v, context, recursive)
                        for k, v in iteritems(ex)}
            if isinstance(ex, MutableSequence):
                return [self.do_eval(v, context, recursive)
                        for v in ex]

        return expression.do_eval(ex, self.job, self.requirements,
                                  self.outdir, self.tmpdir,
                                  self.resources,
                                  context=context,
                                  timeout=self.timeout,
                                  debug=self.debug,
                                  js_console=self.js_console,
                                  force_docker_pull=self.force_docker_pull,
                                  strip_whitespace=strip_whitespace)
