diff env/lib/python3.9/site-packages/cwltool/builder.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/builder.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,661 @@
+import copy
+import logging
+import math
+from typing import (
+    IO,
+    Any,
+    Callable,
+    Dict,
+    List,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+    cast,
+)
+
+from rdflib import Graph, URIRef
+from rdflib.namespace import OWL, RDFS
+from ruamel.yaml.comments import CommentedMap
+from schema_salad.avro.schema import Names, Schema, make_avsc_object
+from schema_salad.exceptions import ValidationException
+from schema_salad.sourceline import SourceLine
+from schema_salad.utils import convert_to_dict, json_dumps
+from schema_salad.validate import validate
+from typing_extensions import TYPE_CHECKING, Type  # pylint: disable=unused-import
+
+from . import expression
+from .errors import WorkflowException
+from .loghandler import _logger
+from .mutation import MutationManager
+from .software_requirements import DependenciesConfiguration
+from .stdfsaccess import StdFsAccess
+from .utils import (
+    CONTENT_LIMIT,
+    CWLObjectType,
+    CWLOutputType,
+    aslist,
+    docker_windows_path_adjust,
+    get_listing,
+    normalizeFilesDirs,
+    onWindows,
+    visit_class,
+)
+
+if TYPE_CHECKING:
+    from .pathmapper import PathMapper
+    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import
+
+
+def content_limit_respected_read_bytes(f):  # type: (IO[bytes]) -> bytes
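+    """Read up to CONTENT_LIMIT bytes from f, raising WorkflowException if the file is larger."""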
+    contents = f.read(CONTENT_LIMIT + 1)
+    if len(contents) > CONTENT_LIMIT:
+        raise WorkflowException(
+            "file is too large, loadContents limited to %d bytes" % CONTENT_LIMIT
+        )
+    return contents
+
+
+def content_limit_respected_read(f):  # type: (IO[bytes]) -> str
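+    """Read up to CONTENT_LIMIT bytes from f and decode them as UTF-8."""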
+    return content_limit_respected_read_bytes(f).decode("utf-8")
+
+
+def substitute(value, replace):  # type: (str, str) -> str
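+    """Apply a suffix pattern to value; each leading '^' in replace strips one file extension first."""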
+    if replace.startswith("^"):
+        try:
+            return substitute(value[0 : value.rindex(".")], replace[1:])
+        except ValueError:
+            # No extension to remove
+            return value + replace.lstrip("^")
+    return value + replace
+
+
+def formatSubclassOf(
+    fmt: str, cls: str, ontology: Optional[Graph], visited: Set[str]
+) -> bool:
+    """Determine if `fmt` is a subclass of `cls`."""
+    if URIRef(fmt) == URIRef(cls):
+        return True
+
+    if ontology is None:
+        return False
+
+    if fmt in visited:
+        return False
+
+    visited.add(fmt)
+
+    uriRefFmt = URIRef(fmt)
+
+    for _s, _p, o in ontology.triples((uriRefFmt, RDFS.subClassOf, None)):
+        # Find parent classes of `fmt` and search upward
+        if formatSubclassOf(o, cls, ontology, visited):
+            return True
+
+    for _s, _p, o in ontology.triples((uriRefFmt, OWL.equivalentClass, None)):
+        # Find equivalent classes of `fmt` and search horizontally
+        if formatSubclassOf(o, cls, ontology, visited):
+            return True
+
+    for s, _p, _o in ontology.triples((None, OWL.equivalentClass, uriRefFmt)):
+        # Find equivalent classes of `fmt` and search horizontally
+        if formatSubclassOf(s, cls, ontology, visited):
+            return True
+
+    return False
+
+
+def check_format(
+    actual_file: Union[CWLObjectType, List[CWLObjectType]],
+    input_formats: Union[List[str], str],
+    ontology: Optional[Graph],
+) -> None:
+    """Confirm that the format present is valid for the allowed formats."""
+    for afile in aslist(actual_file):
+        if not afile:
+            continue
+        if "format" not in afile:
+            raise ValidationException(
+                "File has no 'format' defined: {}".format(json_dumps(afile, indent=4))
+            )
+        for inpf in aslist(input_formats):
+            if afile["format"] == inpf or formatSubclassOf(
+                afile["format"], inpf, ontology, set()
+            ):
+                return
+        raise ValidationException(
+            "File has an incompatible format: {}".format(json_dumps(afile, indent=4))
+        )
+
+
+class HasReqsHints:
+    """Base class for get_requirement()."""
+
+    def __init__(self) -> None:
+        """Initialize this reqs decorator."""
+        self.requirements = []  # type: List[CWLObjectType]
+        self.hints = []  # type: List[CWLObjectType]
+
+    def get_requirement(
+        self, feature: str
+    ) -> Tuple[Optional[CWLObjectType], Optional[bool]]:
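+        """Look up feature in requirements, then hints; the flag reports whether it was a requirement."""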
+        for item in reversed(self.requirements):
+            if item["class"] == feature:
+                return (item, True)
+        for item in reversed(self.hints):
+            if item["class"] == feature:
+                return (item, False)
+        return (None, None)
+
+
+class Builder(HasReqsHints):
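+    """Helper class to bind inputs and generate command line arguments for a CWL job."""
+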
+    def __init__(
+        self,
+        job: CWLObjectType,
+        files: List[CWLObjectType],
+        bindings: List[CWLObjectType],
+        schemaDefs: MutableMapping[str, CWLObjectType],
+        names: Names,
+        requirements: List[CWLObjectType],
+        hints: List[CWLObjectType],
+        resources: Dict[str, Union[int, float, str]],
+        mutation_manager: Optional[MutationManager],
+        formatgraph: Optional[Graph],
+        make_fs_access: Type[StdFsAccess],
+        fs_access: StdFsAccess,
+        job_script_provider: Optional[DependenciesConfiguration],
+        timeout: float,
+        debug: bool,
+        js_console: bool,
+        force_docker_pull: bool,
+        loadListing: str,
+        outdir: str,
+        tmpdir: str,
+        stagedir: str,
+        cwlVersion: str,
+    ) -> None:
+        """Initialize this Builder."""
+        self.job = job
+        self.files = files
+        self.bindings = bindings
+        self.schemaDefs = schemaDefs
+        self.names = names
+        self.requirements = requirements
+        self.hints = hints
+        self.resources = resources
+        self.mutation_manager = mutation_manager
+        self.formatgraph = formatgraph
+
+        self.make_fs_access = make_fs_access
+        self.fs_access = fs_access
+
+        self.job_script_provider = job_script_provider
+
+        self.timeout = timeout
+
+        self.debug = debug
+        self.js_console = js_console
+        self.force_docker_pull = force_docker_pull
+
+        # One of "no_listing", "shallow_listing", "deep_listing"
+        self.loadListing = loadListing
+
+        self.outdir = outdir
+        self.tmpdir = tmpdir
+        self.stagedir = stagedir
+
+        self.cwlVersion = cwlVersion
+
+        self.pathmapper = None  # type: Optional[PathMapper]
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.find_default_container = None  # type: Optional[Callable[[], str]]
+
+    def build_job_script(self, commands: List[str]) -> Optional[str]:
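+        """Delegate job script creation to the job script provider, if one is configured."""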
+        if self.job_script_provider is not None:
+            return self.job_script_provider.build_job_script(self, commands)
+        return None
+
+    def bind_input(
+        self,
+        schema: CWLObjectType,
+        datum: Union[CWLObjectType, List[CWLObjectType]],
+        discover_secondaryFiles: bool,
+        lead_pos: Optional[Union[int, List[int]]] = None,
+        tail_pos: Optional[Union[str, List[int]]] = None,
+    ) -> List[MutableMapping[str, Union[str, List[int]]]]:
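+        """Bind an input value to its schema, collecting command line bindings and referenced files."""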
+
+        if tail_pos is None:
+            tail_pos = []
+        if lead_pos is None:
+            lead_pos = []
+
+        bindings = []  # type: List[MutableMapping[str, Union[str, List[int]]]]
+        binding = (
+            {}
+        )  # type: Union[MutableMapping[str, Union[str, List[int]]], CommentedMap]
+        value_from_expression = False
+        if "inputBinding" in schema and isinstance(
+            schema["inputBinding"], MutableMapping
+        ):
+            binding = CommentedMap(schema["inputBinding"].items())
+
+            bp = list(aslist(lead_pos))
+            if "position" in binding:
+                position = binding["position"]
+                if isinstance(position, str):  # no need to test the CWL version:
+                    # the v1.0 schema only allows ints here, so a string implies v1.1+
+                    binding["position"] = self.do_eval(position, context=datum)
+                    bp.append(binding["position"])
+                else:
+                    bp.extend(aslist(binding["position"]))
+            else:
+                bp.append(0)
+            bp.extend(aslist(tail_pos))
+            binding["position"] = bp
+
+            binding["datum"] = datum
+            if "valueFrom" in binding:
+                value_from_expression = True
+
+        # Handle union types
+        if isinstance(schema["type"], MutableSequence):
+            bound_input = False
+            for t in schema["type"]:
+                avsc = None  # type: Optional[Schema]
+                if isinstance(t, str) and self.names.has_name(t, None):
+                    avsc = self.names.get_name(t, None)
+                elif (
+                    isinstance(t, MutableMapping)
+                    and "name" in t
+                    and self.names.has_name(cast(str, t["name"]), None)
+                ):
+                    avsc = self.names.get_name(cast(str, t["name"]), None)
+                if not avsc:
+                    avsc = make_avsc_object(convert_to_dict(t), self.names)
+                if validate(avsc, datum):
+                    schema = copy.deepcopy(schema)
+                    schema["type"] = t
+                    if not value_from_expression:
+                        return self.bind_input(
+                            schema,
+                            datum,
+                            lead_pos=lead_pos,
+                            tail_pos=tail_pos,
+                            discover_secondaryFiles=discover_secondaryFiles,
+                        )
+                    else:
+                        self.bind_input(
+                            schema,
+                            datum,
+                            lead_pos=lead_pos,
+                            tail_pos=tail_pos,
+                            discover_secondaryFiles=discover_secondaryFiles,
+                        )
+                        bound_input = True
+            if not bound_input:
+                raise ValidationException(
+                    "'{}' is not a valid union {}".format(datum, schema["type"])
+                )
+        elif isinstance(schema["type"], MutableMapping):
+            st = copy.deepcopy(schema["type"])
+            if (
+                binding
+                and "inputBinding" not in st
+                and "type" in st
+                and st["type"] == "array"
+                and "itemSeparator" not in binding
+            ):
+                st["inputBinding"] = {}
+            for k in ("secondaryFiles", "format", "streamable"):
+                if k in schema:
+                    st[k] = schema[k]
+            if value_from_expression:
+                self.bind_input(
+                    st,
+                    datum,
+                    lead_pos=lead_pos,
+                    tail_pos=tail_pos,
+                    discover_secondaryFiles=discover_secondaryFiles,
+                )
+            else:
+                bindings.extend(
+                    self.bind_input(
+                        st,
+                        datum,
+                        lead_pos=lead_pos,
+                        tail_pos=tail_pos,
+                        discover_secondaryFiles=discover_secondaryFiles,
+                    )
+                )
+        else:
+            if schema["type"] in self.schemaDefs:
+                schema = self.schemaDefs[cast(str, schema["type"])]
+
+            if schema["type"] == "record":
+                datum = cast(CWLObjectType, datum)
+                for f in cast(List[CWLObjectType], schema["fields"]):
+                    name = cast(str, f["name"])
+                    if name in datum and datum[name] is not None:
+                        bindings.extend(
+                            self.bind_input(
+                                f,
+                                cast(CWLObjectType, datum[name]),
+                                lead_pos=lead_pos,
+                                tail_pos=name,
+                                discover_secondaryFiles=discover_secondaryFiles,
+                            )
+                        )
+                    else:
+                        datum[name] = f.get("default")
+
+            if schema["type"] == "array":
+                for n, item in enumerate(cast(MutableSequence[CWLObjectType], datum)):
+                    b2 = None
+                    if binding:
+                        b2 = cast(CWLObjectType, copy.deepcopy(binding))
+                        b2["datum"] = item
+                    itemschema = {
+                        "type": schema["items"],
+                        "inputBinding": b2,
+                    }  # type: CWLObjectType
+                    for k in ("secondaryFiles", "format", "streamable"):
+                        if k in schema:
+                            itemschema[k] = schema[k]
+                    bindings.extend(
+                        self.bind_input(
+                            itemschema,
+                            item,
+                            lead_pos=n,
+                            tail_pos=tail_pos,
+                            discover_secondaryFiles=discover_secondaryFiles,
+                        )
+                    )
+                binding = {}
+
+            def _capture_files(f: CWLObjectType) -> CWLObjectType:
+                self.files.append(f)
+                return f
+
+            if schema["type"] == "File":
+                datum = cast(CWLObjectType, datum)
+                self.files.append(datum)
+
+                loadContents_sourceline = (
+                    None
+                )  # type: Union[None, MutableMapping[str, Union[str, List[int]]], CWLObjectType]
+                if binding and binding.get("loadContents"):
+                    loadContents_sourceline = binding
+                elif schema.get("loadContents"):
+                    loadContents_sourceline = schema
+
+                if loadContents_sourceline and loadContents_sourceline["loadContents"]:
+                    with SourceLine(
+                        loadContents_sourceline, "loadContents", WorkflowException
+                    ):
+                        try:
+                            with self.fs_access.open(
+                                cast(str, datum["location"]), "rb"
+                            ) as f2:
+                                datum["contents"] = content_limit_respected_read(f2)
+                        except Exception as e:
+                            raise Exception(
+                                "Reading {}\n{}".format(datum["location"], e)
+                            )
+
+                if "secondaryFiles" in schema:
+                    if "secondaryFiles" not in datum:
+                        datum["secondaryFiles"] = []
+                    for sf in aslist(schema["secondaryFiles"]):
+                        if "required" in sf:
+                            sf_required = self.do_eval(sf["required"], context=datum)
+                        else:
+                            sf_required = True
+
+                        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
+                            sfpath = self.do_eval(sf["pattern"], context=datum)
+                        else:
+                            sfpath = substitute(
+                                cast(str, datum["basename"]), sf["pattern"]
+                            )
+
+                        for sfname in aslist(sfpath):
+                            if not sfname:
+                                continue
+                            found = False
+
+                            if isinstance(sfname, str):
+                                d_location = cast(str, datum["location"])
+                                if "/" in d_location:
+                                    sf_location = (
+                                        d_location[0 : d_location.rindex("/") + 1]
+                                        + sfname
+                                    )
+                                else:
+                                    sf_location = d_location + sfname
+                                sfbasename = sfname
+                            elif isinstance(sfname, MutableMapping):
+                                sf_location = sfname["location"]
+                                sfbasename = sfname["basename"]
+                            else:
+                                raise WorkflowException(
+                                    "Expected secondaryFile expression to return type 'str' or 'MutableMapping', received '%s'"
+                                    % (type(sfname))
+                                )
+
+                            for d in cast(
+                                MutableSequence[MutableMapping[str, str]],
+                                datum["secondaryFiles"],
+                            ):
+                                if not d.get("basename"):
+                                    d["basename"] = d["location"][
+                                        d["location"].rindex("/") + 1 :
+                                    ]
+                                if d["basename"] == sfbasename:
+                                    found = True
+
+                            if not found:
+
+                                def addsf(
+                                    files: MutableSequence[CWLObjectType],
+                                    newsf: CWLObjectType,
+                                ) -> None:
+                                    for f in files:
+                                        if f["location"] == newsf["location"]:
+                                            f["basename"] = newsf["basename"]
+                                            return
+                                    files.append(newsf)
+
+                                if isinstance(sfname, MutableMapping):
+                                    addsf(
+                                        cast(
+                                            MutableSequence[CWLObjectType],
+                                            datum["secondaryFiles"],
+                                        ),
+                                        sfname,
+                                    )
+                                elif discover_secondaryFiles and self.fs_access.exists(
+                                    sf_location
+                                ):
+                                    addsf(
+                                        cast(
+                                            MutableSequence[CWLObjectType],
+                                            datum["secondaryFiles"],
+                                        ),
+                                        {
+                                            "location": sf_location,
+                                            "basename": sfname,
+                                            "class": "File",
+                                        },
+                                    )
+                                elif sf_required:
+                                    raise WorkflowException(
+                                        "Missing required secondary file '%s' from file object: %s"
+                                        % (sfname, json_dumps(datum, indent=4))
+                                    )
+
+                    normalizeFilesDirs(
+                        cast(MutableSequence[CWLObjectType], datum["secondaryFiles"])
+                    )
+
+                if "format" in schema:
+                    try:
+                        check_format(
+                            datum,
+                            cast(Union[List[str], str], self.do_eval(schema["format"])),
+                            self.formatgraph,
+                        )
+                    except ValidationException as ve:
+                        raise WorkflowException(
+                            "Expected value of '%s' to have format %s but\n "
+                            " %s" % (schema["name"], schema["format"], ve)
+                        ) from ve
+
+                visit_class(
+                    datum.get("secondaryFiles", []),
+                    ("File", "Directory"),
+                    _capture_files,
+                )
+
+            if schema["type"] == "Directory":
+                datum = cast(CWLObjectType, datum)
+                ll = schema.get("loadListing") or self.loadListing
+                if ll and ll != "no_listing":
+                    get_listing(
+                        self.fs_access,
+                        datum,
+                        (ll == "deep_listing"),
+                    )
+                self.files.append(datum)
+
+            if schema["type"] == "Any":
+                visit_class(datum, ("File", "Directory"), _capture_files)
+
+        # Prepend this binding's position to the sort key of each nested binding
+        if binding:
+            for bi in bindings:
+                bi["position"] = cast(List[int], binding["position"]) + cast(
+                    List[int], bi["position"]
+                )
+            bindings.append(binding)
+
+        return bindings
+
+    def tostr(self, value: Union[MutableMapping[str, str], Any]) -> str:
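+        """Represent value as a string; File and Directory objects are rendered as their 'path'."""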
+        if isinstance(value, MutableMapping) and value.get("class") in (
+            "File",
+            "Directory",
+        ):
+            if "path" not in value:
+                raise WorkflowException(
+                    '{} object missing "path": {}'.format(value["class"], value)
+                )
+
+            # Adjust Windows file paths when passing to Docker, which only accepts Unix-style paths
+            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+            if onWindows() and docker_req is not None:
+                # docker_req is None only when no DockerRequirement is
+                # given in either hints or requirements
+                path = docker_windows_path_adjust(value["path"])
+                return path
+            return value["path"]
+        else:
+            return str(value)
+
+    def generate_arg(self, binding: CWLObjectType) -> List[str]:
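+        """Convert a single input binding into its command line argument strings."""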
+        value = binding.get("datum")
+        if "valueFrom" in binding:
+            with SourceLine(
+                binding,
+                "valueFrom",
+                WorkflowException,
+                _logger.isEnabledFor(logging.DEBUG),
+            ):
+                value = self.do_eval(cast(str, binding["valueFrom"]), context=value)
+
+        prefix = cast(Optional[str], binding.get("prefix"))
+        sep = binding.get("separate", True)
+        if prefix is None and not sep:
+            with SourceLine(
+                binding,
+                "separate",
+                WorkflowException,
+                _logger.isEnabledFor(logging.DEBUG),
+            ):
+                raise WorkflowException(
+                    "'separate' option can not be specified without prefix"
+                )
+
+        argl = []  # type: MutableSequence[CWLOutputType]
+        if isinstance(value, MutableSequence):
+            if binding.get("itemSeparator") and value:
+                itemSeparator = cast(str, binding["itemSeparator"])
+                argl = [itemSeparator.join([self.tostr(v) for v in value])]
+            elif binding.get("valueFrom"):
+                value = [self.tostr(v) for v in value]
+                return cast(List[str], ([prefix] if prefix else [])) + cast(
+                    List[str], value
+                )
+            elif prefix and value:
+                return [prefix]
+            else:
+                return []
+        elif isinstance(value, MutableMapping) and value.get("class") in (
+            "File",
+            "Directory",
+        ):
+            argl = cast(MutableSequence[CWLOutputType], [value])
+        elif isinstance(value, MutableMapping):
+            return [prefix] if prefix else []
+        elif value is True and prefix:
+            return [prefix]
+        elif value is False or value is None or (value is True and not prefix):
+            return []
+        else:
+            argl = [value]
+
+        args = []
+        for j in argl:
+            if sep:
+                args.extend([prefix, self.tostr(j)])
+            else:
+                args.append(self.tostr(j) if prefix is None else prefix + self.tostr(j))
+
+        return [a for a in args if a is not None]
+
+    def do_eval(
+        self,
+        ex: Optional[CWLOutputType],
+        context: Optional[Any] = None,
+        recursive: bool = False,
+        strip_whitespace: bool = True,
+    ) -> Optional[CWLOutputType]:
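+        """Evaluate a CWL parameter reference or expression against this builder's job context."""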
+        if recursive:
+            if isinstance(ex, MutableMapping):
+                return {k: self.do_eval(v, context, recursive) for k, v in ex.items()}
+            if isinstance(ex, MutableSequence):
+                return [self.do_eval(v, context, recursive) for v in ex]
+
+        resources = self.resources
+        if self.resources and "cores" in self.resources:
+            cores = resources["cores"]
+            if not isinstance(cores, str):
+                resources = copy.copy(resources)
+                resources["cores"] = int(math.ceil(cores))
+
+        return expression.do_eval(
+            ex,
+            self.job,
+            self.requirements,
+            self.outdir,
+            self.tmpdir,
+            resources,
+            context=context,
+            timeout=self.timeout,
+            debug=self.debug,
+            js_console=self.js_console,
+            force_docker_pull=self.force_docker_pull,
+            strip_whitespace=strip_whitespace,
+            cwlVersion=self.cwlVersion,
+        )