Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/builder.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
import copy
import logging
import math
from typing import (
    IO,
    Any,
    Callable,
    Dict,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

from rdflib import Graph, URIRef
from rdflib.namespace import OWL, RDFS
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names, Schema, make_avsc_object
from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import SourceLine
from schema_salad.utils import convert_to_dict, json_dumps
from schema_salad.validate import validate
from typing_extensions import TYPE_CHECKING, Type  # pylint: disable=unused-import

from . import expression
from .errors import WorkflowException
from .loghandler import _logger
from .mutation import MutationManager
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import (
    CONTENT_LIMIT,
    CWLObjectType,
    CWLOutputType,
    aslist,
    docker_windows_path_adjust,
    get_listing,
    normalizeFilesDirs,
    onWindows,
    visit_class,
)

if TYPE_CHECKING:
    from .pathmapper import PathMapper
    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import


def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
    """Read at most CONTENT_LIMIT bytes from binary stream `f`.

    One extra byte is requested so that an over-limit file can be detected.

    :raises WorkflowException: if the stream holds more than CONTENT_LIMIT bytes.
    """
    contents = f.read(CONTENT_LIMIT + 1)
    if len(contents) > CONTENT_LIMIT:
        raise WorkflowException(
            "file is too large, loadContents limited to %d bytes" % CONTENT_LIMIT
        )
    return contents


def content_limit_respected_read(f):  # type: (IO[bytes]) -> str
    """Read `f` under the CONTENT_LIMIT cap and decode the bytes as UTF-8."""
    return content_limit_respected_read_bytes(f).decode("utf-8")


def substitute(value, replace):  # type: (str, str) -> str
    """Apply a secondaryFiles-style pattern `replace` to the name `value`.

    Each leading ``^`` strips one extension (everything from the last ``.``)
    from `value` before the rest of the pattern is appended. If no ``.`` is
    left to strip, the remaining ``^`` characters are dropped and the rest of
    the pattern is appended as-is.
    """
    if replace.startswith("^"):
        try:
            return substitute(value[0 : value.rindex(".")], replace[1:])
        except ValueError:
            # No extension to remove
            return value + replace.lstrip("^")
    return value + replace


def formatSubclassOf(
    fmt: str, cls: str, ontology: Optional[Graph], visited: Set[str]
) -> bool:
    """Determine if `fmt` is a subclass of `cls`.

    Walks ``rdfs:subClassOf`` upward and ``owl:equivalentClass`` in both
    directions within `ontology`. `visited` records formats already explored
    so that cycles in the graph terminate. Without an ontology, only an exact
    URI match counts.
    """
    if URIRef(fmt) == URIRef(cls):
        return True

    if ontology is None:
        return False

    if fmt in visited:
        return False

    visited.add(fmt)

    uriRefFmt = URIRef(fmt)

    for _s, _p, o in ontology.triples((uriRefFmt, RDFS.subClassOf, None)):
        # Find parent classes of `fmt` and search upward
        if formatSubclassOf(o, cls, ontology, visited):
            return True

    for _s, _p, o in ontology.triples((uriRefFmt, OWL.equivalentClass, None)):
        # Find equivalent classes of `fmt` and search horizontally
        if formatSubclassOf(o, cls, ontology, visited):
            return True

    for s, _p, _o in ontology.triples((None, OWL.equivalentClass, uriRefFmt)):
        # Find equivalent classes of `fmt` and search horizontally
        if formatSubclassOf(s, cls, ontology, visited):
            return True

    return False


def check_format(
    actual_file: Union[CWLObjectType, List[CWLObjectType]],
    input_formats: Union[List[str], str],
    ontology: Optional[Graph],
) -> None:
    """Confirm that the format present is valid for the allowed formats.

    Every non-empty file object in `actual_file` (a single object or a list)
    must carry a ``format`` that equals, or is a subclass of, one of
    `input_formats`. Empty entries are skipped.

    :raises ValidationException: if a file has no ``format``, or its format
        matches none of the allowed formats.
    """
    for afile in aslist(actual_file):
        if not afile:
            continue
        if "format" not in afile:
            raise ValidationException(
                "File has no 'format' defined: {}".format(json_dumps(afile, indent=4))
            )
        # Each file must match on its own; a match for one file must not
        # end validation of the rest of the list.
        if not any(
            afile["format"] == inpf
            or formatSubclassOf(afile["format"], inpf, ontology, set())
            for inpf in aslist(input_formats)
        ):
            raise ValidationException(
                "File has an incompatible format: {}".format(
                    json_dumps(afile, indent=4)
                )
            )


class HasReqsHints:
    """Base class for get_requirement()."""

    def __init__(self) -> None:
        """Initialize this reqs decorator."""
        self.requirements = []  # type: List[CWLObjectType]
        self.hints = []  # type: List[CWLObjectType]

    def get_requirement(
        self, feature: str
    ) -> Tuple[Optional[CWLObjectType], Optional[bool]]:
        """Look up the requirement or hint whose ``class`` is `feature`.

        Requirements are searched before hints, and within each list the last
        declared entry wins.

        :returns: ``(item, True)`` for a requirement, ``(item, False)`` for a
            hint, or ``(None, None)`` when neither declares `feature`.
        """
        for item in reversed(self.requirements):
            if item["class"] == feature:
                return (item, True)
        for item in reversed(self.hints):
            if item["class"] == feature:
                return (item, False)
        return (None, None)


class Builder(HasReqsHints):
    """State used to bind inputs, render arguments and evaluate expressions."""

    def __init__(
        self,
        job: CWLObjectType,
        files: List[CWLObjectType],
        bindings: List[CWLObjectType],
        schemaDefs: MutableMapping[str, CWLObjectType],
        names: Names,
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        resources: Dict[str, Union[int, float, str]],
        mutation_manager: Optional[MutationManager],
        formatgraph: Optional[Graph],
        make_fs_access: Type[StdFsAccess],
        fs_access: StdFsAccess,
        job_script_provider: Optional[DependenciesConfiguration],
        timeout: float,
        debug: bool,
        js_console: bool,
        force_docker_pull: bool,
        loadListing: str,
        outdir: str,
        tmpdir: str,
        stagedir: str,
        cwlVersion: str,
    ) -> None:
        """Initialize this Builder."""
        self.job = job
        # bind_input() appends every File/Directory object it visits here.
        self.files = files
        self.bindings = bindings
        self.schemaDefs = schemaDefs
        self.names = names
        self.requirements = requirements
        self.hints = hints
        self.resources = resources
        self.mutation_manager = mutation_manager
        self.formatgraph = formatgraph

        self.make_fs_access = make_fs_access
        self.fs_access = fs_access

        self.job_script_provider = job_script_provider

        self.timeout = timeout

        self.debug = debug
        self.js_console = js_console
        self.force_docker_pull = force_docker_pull

        # One of "no_listing", "shallow_listing", "deep_listing"
        self.loadListing = loadListing

        self.outdir = outdir
        self.tmpdir = tmpdir
        self.stagedir = stagedir

        self.cwlVersion = cwlVersion

        self.pathmapper = None  # type: Optional[PathMapper]
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.find_default_container = None  # type: Optional[Callable[[], str]]

    def build_job_script(self, commands: List[str]) -> Optional[str]:
        """Delegate to the job script provider, or return None if none is set."""
        if self.job_script_provider is not None:
            return self.job_script_provider.build_job_script(self, commands)
        return None

    def bind_input(
        self,
        schema: CWLObjectType,
        datum: Union[CWLObjectType, List[CWLObjectType]],
        discover_secondaryFiles: bool,
        lead_pos: Optional[Union[int, List[int]]] = None,
        tail_pos: Optional[Union[str, List[int]]] = None,
    ) -> List[MutableMapping[str, Union[str, List[int]]]]:
        """Bind `datum` against `schema` and collect its command-line bindings.

        The method recurses through union types, inline type mappings, named
        types from ``self.schemaDefs``, records and arrays.

        Side effects:

        * File and Directory objects found are appended to ``self.files``.
        * Files may get ``contents`` loaded (loadContents) and
          ``secondaryFiles`` resolved, and their ``format`` is checked.
        * Directories may get a listing loaded.
        * Record fields missing from `datum` are set to the field's
          ``default``.

        :param discover_secondaryFiles: when true, a secondary file named by
            a pattern is added if it exists on the filesystem.
        :param lead_pos: position component(s) placed before this binding's
            own ``position``.
        :param tail_pos: position component(s) placed after it.
        :returns: the binding mappings. Each has a ``position`` list, which
            the original code calls the "sort key", and a ``datum``.
        :raises ValidationException: if `datum` matches no member of a union.
        :raises WorkflowException: on a missing required secondary file, a
            format mismatch, or an oversized/unreadable loadContents file.
        """
        if tail_pos is None:
            tail_pos = []
        if lead_pos is None:
            lead_pos = []

        bindings = []  # type: List[MutableMapping[str, Union[str, List[int]]]]
        binding = (
            {}
        )  # type: Union[MutableMapping[str, Union[str, List[int]]], CommentedMap]
        value_from_expression = False
        if "inputBinding" in schema and isinstance(
            schema["inputBinding"], MutableMapping
        ):
            binding = CommentedMap(schema["inputBinding"].items())

            bp = list(aslist(lead_pos))
            if "position" in binding:
                position = binding["position"]
                if isinstance(position, str):  # no need to test the CWL Version
                    # the schema for v1.0 only allow ints
                    binding["position"] = self.do_eval(position, context=datum)
                    bp.append(binding["position"])
                else:
                    bp.extend(aslist(binding["position"]))
            else:
                bp.append(0)
            bp.extend(aslist(tail_pos))
            binding["position"] = bp

            binding["datum"] = datum
            if "valueFrom" in binding:
                value_from_expression = True

        # Handle union types
        if isinstance(schema["type"], MutableSequence):
            bound_input = False
            for t in schema["type"]:
                avsc = None  # type: Optional[Schema]
                if isinstance(t, str) and self.names.has_name(t, None):
                    avsc = self.names.get_name(t, None)
                elif (
                    isinstance(t, MutableMapping)
                    and "name" in t
                    and self.names.has_name(cast(str, t["name"]), None)
                ):
                    avsc = self.names.get_name(cast(str, t["name"]), None)
                if not avsc:
                    avsc = make_avsc_object(convert_to_dict(t), self.names)
                if validate(avsc, datum):
                    schema = copy.deepcopy(schema)
                    schema["type"] = t
                    if not value_from_expression:
                        return self.bind_input(
                            schema,
                            datum,
                            lead_pos=lead_pos,
                            tail_pos=tail_pos,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                    else:
                        # With valueFrom, the nested call is made only for
                        # its side effects; this level's binding is kept.
                        self.bind_input(
                            schema,
                            datum,
                            lead_pos=lead_pos,
                            tail_pos=tail_pos,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                        bound_input = True
            if not bound_input:
                raise ValidationException(
                    "'{}' is not a valid union {}".format(datum, schema["type"])
                )
        elif isinstance(schema["type"], MutableMapping):
            st = copy.deepcopy(schema["type"])
            if (
                binding
                and "inputBinding" not in st
                and "type" in st
                and st["type"] == "array"
                and "itemSeparator" not in binding
            ):
                st["inputBinding"] = {}
            for k in ("secondaryFiles", "format", "streamable"):
                if k in schema:
                    st[k] = schema[k]
            if value_from_expression:
                self.bind_input(
                    st,
                    datum,
                    lead_pos=lead_pos,
                    tail_pos=tail_pos,
                    discover_secondaryFiles=discover_secondaryFiles,
                )
            else:
                bindings.extend(
                    self.bind_input(
                        st,
                        datum,
                        lead_pos=lead_pos,
                        tail_pos=tail_pos,
                        discover_secondaryFiles=discover_secondaryFiles,
                    )
                )
        else:
            if schema["type"] in self.schemaDefs:
                schema = self.schemaDefs[cast(str, schema["type"])]

            if schema["type"] == "record":
                datum = cast(CWLObjectType, datum)
                for f in cast(List[CWLObjectType], schema["fields"]):
                    name = cast(str, f["name"])
                    if name in datum and datum[name] is not None:
                        bindings.extend(
                            self.bind_input(
                                f,
                                cast(CWLObjectType, datum[name]),
                                lead_pos=lead_pos,
                                tail_pos=name,
                                discover_secondaryFiles=discover_secondaryFiles,
                            )
                        )
                    else:
                        datum[name] = f.get("default")

            if schema["type"] == "array":
                for n, item in enumerate(cast(MutableSequence[CWLObjectType], datum)):
                    b2 = None
                    if binding:
                        b2 = cast(CWLObjectType, copy.deepcopy(binding))
                        b2["datum"] = item
                    itemschema = {
                        "type": schema["items"],
                        "inputBinding": b2,
                    }  # type: CWLObjectType
                    for k in ("secondaryFiles", "format", "streamable"):
                        if k in schema:
                            itemschema[k] = schema[k]
                    bindings.extend(
                        self.bind_input(
                            itemschema,
                            item,
                            lead_pos=n,
                            tail_pos=tail_pos,
                            discover_secondaryFiles=discover_secondaryFiles,
                        )
                    )
                # Each item got its own copy of the binding above, so the
                # array-level binding itself is not emitted.
                binding = {}

            def _capture_files(f: CWLObjectType) -> CWLObjectType:
                self.files.append(f)
                return f

            def _addsf(
                files: MutableSequence[CWLObjectType],
                newsf: CWLObjectType,
            ) -> None:
                # Update the basename of an entry with the same location, or
                # append `newsf` when no such entry exists.
                for entry in files:
                    if entry["location"] == newsf["location"]:
                        entry["basename"] = newsf["basename"]
                        return
                files.append(newsf)

            if schema["type"] == "File":
                datum = cast(CWLObjectType, datum)
                self.files.append(datum)

                loadContents_sourceline = (
                    None
                )  # type: Union[None, MutableMapping[str, Union[str, List[int]]], CWLObjectType]
                if binding and binding.get("loadContents"):
                    loadContents_sourceline = binding
                elif schema.get("loadContents"):
                    loadContents_sourceline = schema

                if loadContents_sourceline and loadContents_sourceline["loadContents"]:
                    with SourceLine(
                        loadContents_sourceline, "loadContents", WorkflowException
                    ):
                        try:
                            with self.fs_access.open(
                                cast(str, datum["location"]), "rb"
                            ) as f2:
                                datum["contents"] = content_limit_respected_read(f2)
                        except Exception as e:
                            raise Exception(
                                "Reading {}\n{}".format(datum["location"], e)
                            ) from e

                if "secondaryFiles" in schema:
                    if "secondaryFiles" not in datum:
                        datum["secondaryFiles"] = []
                    for sf in aslist(schema["secondaryFiles"]):
                        if "required" in sf:
                            sf_required = self.do_eval(sf["required"], context=datum)
                        else:
                            sf_required = True

                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                            sfpath = self.do_eval(sf["pattern"], context=datum)
                        else:
                            sfpath = substitute(
                                cast(str, datum["basename"]), sf["pattern"]
                            )

                        for sfname in aslist(sfpath):
                            if not sfname:
                                continue
                            found = False

                            if isinstance(sfname, str):
                                # A plain name is resolved next to the primary file.
                                d_location = cast(str, datum["location"])
                                if "/" in d_location:
                                    sf_location = (
                                        d_location[0 : d_location.rindex("/") + 1]
                                        + sfname
                                    )
                                else:
                                    sf_location = d_location + sfname
                                sfbasename = sfname
                            elif isinstance(sfname, MutableMapping):
                                sf_location = sfname["location"]
                                sfbasename = sfname["basename"]
                            else:
                                raise WorkflowException(
                                    "Expected secondaryFile expression to return type 'str' or 'MutableMapping', received '%s'"
                                    % (type(sfname))
                                )

                            # Fill in missing basenames while checking whether
                            # this secondary file is already attached.
                            for d in cast(
                                MutableSequence[MutableMapping[str, str]],
                                datum["secondaryFiles"],
                            ):
                                if not d.get("basename"):
                                    # rfind yields -1 when there is no "/",
                                    # so the whole location becomes the basename.
                                    d["basename"] = d["location"][
                                        d["location"].rfind("/") + 1 :
                                    ]
                                if d["basename"] == sfbasename:
                                    found = True

                            if not found:
                                if isinstance(sfname, MutableMapping):
                                    _addsf(
                                        cast(
                                            MutableSequence[CWLObjectType],
                                            datum["secondaryFiles"],
                                        ),
                                        sfname,
                                    )
                                elif discover_secondaryFiles and self.fs_access.exists(
                                    sf_location
                                ):
                                    _addsf(
                                        cast(
                                            MutableSequence[CWLObjectType],
                                            datum["secondaryFiles"],
                                        ),
                                        {
                                            "location": sf_location,
                                            "basename": sfname,
                                            "class": "File",
                                        },
                                    )
                                elif sf_required:
                                    raise WorkflowException(
                                        "Missing required secondary file '%s' from file object: %s"
                                        % (sfname, json_dumps(datum, indent=4))
                                    )

                    normalizeFilesDirs(
                        cast(MutableSequence[CWLObjectType], datum["secondaryFiles"])
                    )

                if "format" in schema:
                    try:
                        check_format(
                            datum,
                            cast(Union[List[str], str], self.do_eval(schema["format"])),
                            self.formatgraph,
                        )
                    except ValidationException as ve:
                        raise WorkflowException(
                            "Expected value of '%s' to have format %s but\n "
                            " %s" % (schema["name"], schema["format"], ve)
                        ) from ve

                visit_class(
                    datum.get("secondaryFiles", []),
                    ("File", "Directory"),
                    _capture_files,
                )

            if schema["type"] == "Directory":
                datum = cast(CWLObjectType, datum)
                ll = schema.get("loadListing") or self.loadListing
                if ll and ll != "no_listing":
                    get_listing(
                        self.fs_access,
                        datum,
                        (ll == "deep_listing"),
                    )
                self.files.append(datum)

            if schema["type"] == "Any":
                visit_class(datum, ("File", "Directory"), _capture_files)

        # Position to front of the sort key
        if binding:
            for bi in bindings:
                bi["position"] = cast(List[int], binding["position"]) + cast(
                    List[int], bi["position"]
                )
            bindings.append(binding)

        return bindings

    def tostr(self, value: Union[MutableMapping[str, str], Any]) -> str:
        """Render `value` as a single command-line string.

        File and Directory objects produce their ``path``. On Windows, when a
        DockerRequirement is present, the path goes through
        docker_windows_path_adjust first. Any other value is passed through
        str().

        :raises WorkflowException: if a File/Directory object lacks ``path``.
        """
        if isinstance(value, MutableMapping) and value.get("class") in (
            "File",
            "Directory",
        ):
            if "path" not in value:
                raise WorkflowException(
                    '{} object missing "path": {}'.format(value["class"], value)
                )

            # Path adjust for windows file path when passing to docker, docker accepts unix like path only
            docker_req, _docker_is_req = self.get_requirement("DockerRequirement")
            if onWindows() and docker_req is not None:
                # docker_req is none only when there is no dockerRequirement
                # mentioned in hints and Requirement
                path = docker_windows_path_adjust(value["path"])
                return path
            return value["path"]
        else:
            return str(value)

    def generate_arg(self, binding: CWLObjectType) -> List[str]:
        """Convert one binding (as produced by bind_input) into argv strings.

        Handling by value:

        * ``valueFrom`` is evaluated first, with the bound datum as context.
        * Lists are joined with ``itemSeparator`` when one is set.
        * ``True`` emits just the prefix.
        * ``False`` and ``None`` emit nothing.
        * ``separate`` controls whether the prefix is its own argument or is
          concatenated with the value.

        :raises WorkflowException: if ``separate`` is false but no ``prefix``
            is given.
        """
        value = binding.get("datum")
        if "valueFrom" in binding:
            with SourceLine(
                binding,
                "valueFrom",
                WorkflowException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                value = self.do_eval(cast(str, binding["valueFrom"]), context=value)

        prefix = cast(Optional[str], binding.get("prefix"))
        sep = binding.get("separate", True)
        if prefix is None and not sep:
            with SourceLine(
                binding,
                "separate",
                WorkflowException,
                _logger.isEnabledFor(logging.DEBUG),
            ):
                raise WorkflowException(
                    "'separate' option can not be specified without prefix"
                )

        argl = []  # type: MutableSequence[CWLOutputType]
        if isinstance(value, MutableSequence):
            if binding.get("itemSeparator") and value:
                itemSeparator = cast(str, binding["itemSeparator"])
                argl = [itemSeparator.join([self.tostr(v) for v in value])]
            elif binding.get("valueFrom"):
                value = [self.tostr(v) for v in value]
                return cast(List[str], ([prefix] if prefix else [])) + cast(
                    List[str], value
                )
            elif prefix and value:
                return [prefix]
            else:
                return []
        elif isinstance(value, MutableMapping) and value.get("class") in (
            "File",
            "Directory",
        ):
            argl = cast(MutableSequence[CWLOutputType], [value])
        elif isinstance(value, MutableMapping):
            return [prefix] if prefix else []
        elif value is True and prefix:
            return [prefix]
        elif value is False or value is None or (value is True and not prefix):
            return []
        else:
            argl = [value]

        args = []
        for j in argl:
            if sep:
                args.extend([prefix, self.tostr(j)])
            else:
                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))

        # A None prefix was added as a placeholder when sep is true; drop it.
        return [a for a in args if a is not None]

    def do_eval(
        self,
        ex: Optional[CWLOutputType],
        context: Optional[Any] = None,
        recursive: bool = False,
        strip_whitespace: bool = True,
    ) -> Optional[CWLOutputType]:
        """Evaluate the CWL expression `ex` with this builder's job state.

        With `recursive`, mappings and sequences are evaluated element-wise,
        and `strip_whitespace` is applied at every level. A non-string
        ``cores`` resource is rounded up to an int on a shallow copy of the
        resources before evaluation.
        """
        if recursive:
            if isinstance(ex, MutableMapping):
                return {
                    k: self.do_eval(
                        v, context, recursive, strip_whitespace=strip_whitespace
                    )
                    for k, v in ex.items()
                }
            if isinstance(ex, MutableSequence):
                return [
                    self.do_eval(
                        v, context, recursive, strip_whitespace=strip_whitespace
                    )
                    for v in ex
                ]

        resources = self.resources
        if self.resources and "cores" in self.resources:
            cores = resources["cores"]
            if not isinstance(cores, str):
                # Copy so that self.resources keeps the original value.
                resources = copy.copy(resources)
                resources["cores"] = int(math.ceil(cores))

        return expression.do_eval(
            ex,
            self.job,
            self.requirements,
            self.outdir,
            self.tmpdir,
            resources,
            context=context,
            timeout=self.timeout,
            debug=self.debug,
            js_console=self.js_console,
            force_docker_pull=self.force_docker_pull,
            strip_whitespace=strip_whitespace,
            cwlVersion=self.cwlVersion,
        )