diff env/lib/python3.9/site-packages/cwltool/utils.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/utils.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,560 @@
+"""Shared functions and other definitions."""
+
+import collections
+import os
+import platform
+import random
+import shutil
+import stat
+import string
+import subprocess  # nosec
+import sys
+import tempfile
+import urllib
+import uuid
+from functools import partial
+from itertools import zip_longest
+from pathlib import Path, PurePosixPath
+from tempfile import NamedTemporaryFile
+from types import ModuleType
+from typing import (
+    IO,
+    Any,
+    Callable,
+    Dict,
+    Generator,
+    Iterable,
+    List,
+    MutableMapping,
+    MutableSequence,
+    NamedTuple,
+    Optional,
+    Set,
+    Union,
+    cast,
+)
+
+import pkg_resources
+import requests
+from cachecontrol import CacheControl
+from cachecontrol.caches import FileCache
+from mypy_extensions import TypedDict
+from schema_salad.exceptions import ValidationException
+from schema_salad.ref_resolver import Loader
+from typing_extensions import TYPE_CHECKING, Deque
+
+if TYPE_CHECKING:
+    from .command_line_tool import CallbackJob, ExpressionJob
+    from .job import CommandLineJob, JobBase
+    from .stdfsaccess import StdFsAccess
+    from .workflow_job import WorkflowJob
+
+__random_outdir = None  # type: Optional[str]
+
+CONTENT_LIMIT = 64 * 1024
+
+windows_default_container_id = "frolvlad/alpine-bash"
+
+DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep
+
+processes_to_kill = collections.deque()  # type: Deque[subprocess.Popen[str]]
+
+CWLOutputAtomType = Union[
+    None,
+    bool,
+    str,
+    int,
+    float,
+    MutableSequence[
+        Union[
+            None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any]
+        ]
+    ],
+    MutableMapping[
+        str,
+        Union[
+            None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any]
+        ],
+    ],
+]
+CWLOutputType = Union[
+    bool,
+    str,
+    int,
+    float,
+    MutableSequence[CWLOutputAtomType],
+    MutableMapping[str, CWLOutputAtomType],
+]
+CWLObjectType = MutableMapping[str, Optional[CWLOutputType]]
+JobsType = Union[
+    "CommandLineJob", "JobBase", "WorkflowJob", "ExpressionJob", "CallbackJob"
+]
+JobsGeneratorType = Generator[Optional[JobsType], None, None]
+OutputCallbackType = Callable[[Optional[CWLObjectType], str], None]
+ResolverType = Callable[["Loader", str], Optional[str]]
+DestinationsType = MutableMapping[str, Optional[CWLOutputType]]
+ScatterDestinationsType = MutableMapping[str, List[Optional[CWLOutputType]]]
+ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None]
+SinkType = Union[CWLOutputType, CWLObjectType]
+DirectoryType = TypedDict(
+    "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str}
+)
+JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
+JSONType = Union[
+    Dict[str, JSONAtomType], List[JSONAtomType], str, int, float, bool, None
+]
+WorkflowStateItem = NamedTuple(
+    "WorkflowStateItem",
+    [
+        ("parameter", CWLObjectType),
+        ("value", Optional[CWLOutputType]),
+        ("success", str),
+    ],
+)
+
+ParametersType = List[CWLObjectType]
+StepType = CWLObjectType  # WorkflowStep
+
+
+def versionstring() -> str:
+    """Version of CWLtool used to execute the workflow."""
+    pkg = pkg_resources.require("cwltool")
+    if pkg:
+        return "{} {}".format(sys.argv[0], pkg[0].version)
+    return "{} {}".format(sys.argv[0], "unknown version")
+
+
+def aslist(thing: Any) -> MutableSequence[Any]:
+    """Wrap any non-MutableSequence/list in a list."""
+    if isinstance(thing, MutableSequence):
+        return thing
+    return [thing]
+
+
+def copytree_with_merge(src: str, dst: str) -> None:
+    if not os.path.exists(dst):
+        os.makedirs(dst)
+        shutil.copystat(src, dst)
+    lst = os.listdir(src)
+    for item in lst:
+        spath = os.path.join(src, item)
+        dpath = os.path.join(dst, item)
+        if os.path.isdir(spath):
+            copytree_with_merge(spath, dpath)
+        else:
+            shutil.copy2(spath, dpath)
+
+
+def docker_windows_path_adjust(path: str) -> str:
+    r"""
+    Adjust only windows paths for Docker.
+
+    The docker run command treats them as unix paths.
+
+    Example: 'C:\Users\foo to /C/Users/foo (Docker for Windows) or /c/Users/foo
+    (Docker toolbox).
+    """
+    if onWindows():
+        split = path.split(":")
+        if len(split) == 2:
+            if platform.win32_ver()[0] in ("7", "8"):
+                # Docker toolbox uses lowecase windows Drive letters
+                split[0] = split[0].lower()
+            else:
+                split[0] = split[0].capitalize()
+                # Docker for Windows uses uppercase windows Drive letters
+            path = ":".join(split)
+        path = path.replace(":", "").replace("\\", "/")
+        return path if path[0] == "/" else "/" + path
+    return path
+
+
+def docker_windows_reverse_path_adjust(path: str) -> str:
+    r"""
+    Change docker path (only on windows os) appropriately back to Windows path.
+
+    Example:  /C/Users/foo to C:\Users\foo
+    """
+    if path is not None and onWindows():
+        if path[0] == "/":
+            path = path[1:]
+        else:
+            raise ValueError("not a docker path")
+        splitpath = path.split("/")
+        splitpath[0] = splitpath[0] + ":"
+        return "\\".join(splitpath)
+    return path
+
+
+def docker_windows_reverse_fileuri_adjust(fileuri: str) -> str:
+    r"""
+    Convert fileuri to be MS Windows comptabile, if needed.
+
+    On docker in windows fileuri do not contain : in path
+    To convert this file uri to windows compatible add : after drive letter,
+    so file:///E/var becomes file:///E:/var
+    """
+    if fileuri is not None and onWindows():
+        if urllib.parse.urlsplit(fileuri).scheme == "file":
+            filesplit = fileuri.split("/")
+            if filesplit[3][-1] != ":":
+                filesplit[3] = filesplit[3] + ":"
+                return "/".join(filesplit)
+            return fileuri
+        raise ValueError("not a file URI")
+    return fileuri
+
+
+def onWindows() -> bool:
+    """Check if we are on Windows OS."""
+    return os.name == "nt"
+
+
+def convert_pathsep_to_unix(path: str) -> str:
+    """
+    Convert path seperators to unix style.
+
+    On windows os.path.join would use backslash to join path, since we would
+    use these paths in Docker we would convert it to use forward slashes: /
+    """
+    if path is not None and onWindows():
+        return path.replace("\\", "/")
+    return path
+
+
+def cmp_like_py2(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> int:
+    """
+    Compare in the same manner as Python2.
+
+    Comparision function to be used in sorting as python3 doesn't allow sorting
+    of different types like str() and int().
+    This function re-creates sorting nature in py2 of heterogeneous list of
+    `int` and `str`
+    """
+    # extract lists from both dicts
+    first, second = dict1["position"], dict2["position"]
+    # iterate through both list till max of their size
+    for i, j in zip_longest(first, second):
+        if i == j:
+            continue
+        # in case 1st list is smaller
+        # should come first in sorting
+        if i is None:
+            return -1
+        # if 1st list is longer,
+        # it should come later in sort
+        elif j is None:
+            return 1
+
+        # if either of the list contains str element
+        # at any index, both should be str before comparing
+        if isinstance(i, str) or isinstance(j, str):
+            return 1 if str(i) > str(j) else -1
+        # int comparison otherwise
+        return 1 if i > j else -1
+    # if both lists are equal
+    return 0
+
+
+def bytes2str_in_dicts(
+    inp: Union[MutableMapping[str, Any], MutableSequence[Any], Any],
+):
+    # type: (...) -> Union[str, MutableSequence[Any], MutableMapping[str, Any]]
+    """
+    Convert any present byte string to unicode string, inplace.
+
+    input is a dict of nested dicts and lists
+    """
+    # if input is dict, recursively call for each value
+    if isinstance(inp, MutableMapping):
+        for k in inp:
+            inp[k] = bytes2str_in_dicts(inp[k])
+        return inp
+
+    # if list, iterate through list and fn call
+    # for all its elements
+    if isinstance(inp, MutableSequence):
+        for idx, value in enumerate(inp):
+            inp[idx] = bytes2str_in_dicts(value)
+            return inp
+
+    # if value is bytes, return decoded string,
+    elif isinstance(inp, bytes):
+        return inp.decode("utf-8")
+
+    # simply return elements itself
+    return inp
+
+
+def visit_class(rec: Any, cls: Iterable[Any], op: Callable[..., Any]) -> None:
+    """Apply a function to with "class" in cls."""
+    if isinstance(rec, MutableMapping):
+        if "class" in rec and rec.get("class") in cls:
+            op(rec)
+        for d in rec:
+            visit_class(rec[d], cls, op)
+    if isinstance(rec, MutableSequence):
+        for d in rec:
+            visit_class(d, cls, op)
+
+
+def visit_field(rec: Any, field: str, op: Callable[..., Any]) -> None:
+    """Apply a function to mapping with 'field'."""
+    if isinstance(rec, MutableMapping):
+        if field in rec:
+            rec[field] = op(rec[field])
+        for d in rec:
+            visit_field(rec[d], field, op)
+    if isinstance(rec, MutableSequence):
+        for d in rec:
+            visit_field(d, field, op)
+
+
+def random_outdir() -> str:
+    """Return the random directory name chosen to use for tool / workflow output."""
+    global __random_outdir
+    if not __random_outdir:
+        __random_outdir = "/" + "".join(
+            [random.choice(string.ascii_letters) for _ in range(6)]  # nosec
+        )
+        return __random_outdir
+    return __random_outdir
+
+
+#
+# Simple multi-platform (fcntl/msvrt) file locking wrapper
+#
+fcntl = None  # type: Optional[ModuleType]
+msvcrt = None  # type: Optional[ModuleType]
+try:
+    import fcntl  # type: ignore
+except ImportError:
+    import msvcrt  # type: ignore
+
+
+def shared_file_lock(fd: IO[Any]) -> None:
+    if fcntl:
+        fcntl.flock(fd.fileno(), fcntl.LOCK_SH)  # type: ignore
+    elif msvcrt:
+        msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024)  # type: ignore
+
+
+def upgrade_lock(fd: IO[Any]) -> None:
+    if fcntl:
+        fcntl.flock(fd.fileno(), fcntl.LOCK_EX)  # type: ignore
+    elif msvcrt:
+        pass
+
+
+def adjustFileObjs(
+    rec, op
+):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
+    """Apply an update function to each File object in the object `rec`."""
+    visit_class(rec, ("File",), op)
+
+
+def adjustDirObjs(rec, op):
+    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
+    """Apply an update function to each Directory object in the object `rec`."""
+    visit_class(rec, ("Directory",), op)
+
+
+def dedup(listing: List[CWLObjectType]) -> List[CWLObjectType]:
+    marksub = set()
+
+    def mark(d: Dict[str, str]) -> None:
+        marksub.add(d["location"])
+
+    for entry in listing:
+        if entry["class"] == "Directory":
+            for e in cast(List[CWLObjectType], entry.get("listing", [])):
+                adjustFileObjs(e, mark)
+                adjustDirObjs(e, mark)
+
+    dd = []
+    markdup = set()  # type: Set[str]
+    for r in listing:
+        if r["location"] not in marksub and r["location"] not in markdup:
+            dd.append(r)
+            markdup.add(cast(str, r["location"]))
+
+    return dd
+
+
+def get_listing(
+    fs_access: "StdFsAccess", rec: CWLObjectType, recursive: bool = True
+) -> None:
+    if rec.get("class") != "Directory":
+        finddirs = []  # type: List[CWLObjectType]
+        visit_class(rec, ("Directory",), finddirs.append)
+        for f in finddirs:
+            get_listing(fs_access, f, recursive=recursive)
+        return
+    if "listing" in rec:
+        return
+    listing = []  # type: List[CWLOutputAtomType]
+    loc = cast(str, rec["location"])
+    for ld in fs_access.listdir(loc):
+        parse = urllib.parse.urlparse(ld)
+        bn = os.path.basename(urllib.request.url2pathname(parse.path))
+        if fs_access.isdir(ld):
+            ent = {
+                "class": "Directory",
+                "location": ld,
+                "basename": bn,
+            }  # type: MutableMapping[str, Any]
+            if recursive:
+                get_listing(fs_access, ent, recursive)
+            listing.append(ent)
+        else:
+            listing.append({"class": "File", "location": ld, "basename": bn})
+    rec["listing"] = listing
+
+
+def trim_listing(obj):  # type: (Dict[str, Any]) -> None
+    """
+    Remove 'listing' field from Directory objects that are file references.
+
+    It redundant and potentially expensive to pass fully enumerated Directory
+    objects around if not explicitly needed, so delete the 'listing' field when
+    it is safe to do so.
+    """
+    if obj.get("location", "").startswith("file://") and "listing" in obj:
+        del obj["listing"]
+
+
+def downloadHttpFile(httpurl):
+    # type: (str) -> str
+    cache_session = None
+    if "XDG_CACHE_HOME" in os.environ:
+        directory = os.environ["XDG_CACHE_HOME"]
+    elif "HOME" in os.environ:
+        directory = os.environ["HOME"]
+    else:
+        directory = os.path.expanduser("~")
+
+    cache_session = CacheControl(
+        requests.Session(),
+        cache=FileCache(os.path.join(directory, ".cache", "cwltool")),
+    )
+
+    r = cache_session.get(httpurl, stream=True)
+    with NamedTemporaryFile(mode="wb", delete=False) as f:
+        for chunk in r.iter_content(chunk_size=16384):
+            if chunk:  # filter out keep-alive new chunks
+                f.write(chunk)
+    r.close()
+    return str(f.name)
+
+
+def ensure_writable(path):  # type: (str) -> None
+    if os.path.isdir(path):
+        for root, dirs, files in os.walk(path):
+            for name in files:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j, mode | stat.S_IWUSR)
+            for name in dirs:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j, mode | stat.S_IWUSR)
+    else:
+        st = os.stat(path)
+        mode = stat.S_IMODE(st.st_mode)
+        os.chmod(path, mode | stat.S_IWUSR)
+
+
+def ensure_non_writable(path):  # type: (str) -> None
+    if os.path.isdir(path):
+        for root, dirs, files in os.walk(path):
+            for name in files:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+            for name in dirs:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+    else:
+        st = os.stat(path)
+        mode = stat.S_IMODE(st.st_mode)
+        os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+
+
+def normalizeFilesDirs(
+    job: Optional[
+        Union[
+            MutableSequence[MutableMapping[str, Any]],
+            MutableMapping[str, Any],
+            DirectoryType,
+        ]
+    ]
+) -> None:
+    def addLocation(d):  # type: (Dict[str, Any]) -> None
+        if "location" not in d:
+            if d["class"] == "File" and ("contents" not in d):
+                raise ValidationException(
+                    "Anonymous file object must have 'contents' and 'basename' fields."
+                )
+            if d["class"] == "Directory" and (
+                "listing" not in d or "basename" not in d
+            ):
+                raise ValidationException(
+                    "Anonymous directory object must have 'listing' and 'basename' fields."
+                )
+            d["location"] = "_:" + str(uuid.uuid4())
+            if "basename" not in d:
+                d["basename"] = d["location"][2:]
+
+        parse = urllib.parse.urlparse(d["location"])
+        path = parse.path
+        # strip trailing slash
+        if path.endswith("/"):
+            if d["class"] != "Directory":
+                raise ValidationException(
+                    "location '%s' ends with '/' but is not a Directory" % d["location"]
+                )
+            path = path.rstrip("/")
+            d["location"] = urllib.parse.urlunparse(
+                (
+                    parse.scheme,
+                    parse.netloc,
+                    path,
+                    parse.params,
+                    parse.query,
+                    parse.fragment,
+                )
+            )
+
+        if not d.get("basename"):
+            if path.startswith("_:"):
+                d["basename"] = str(path[2:])
+            else:
+                d["basename"] = str(os.path.basename(urllib.request.url2pathname(path)))
+
+        if d["class"] == "File":
+            nr, ne = os.path.splitext(d["basename"])
+            if d.get("nameroot") != nr:
+                d["nameroot"] = str(nr)
+            if d.get("nameext") != ne:
+                d["nameext"] = str(ne)
+
+    visit_class(job, ("File", "Directory"), addLocation)
+
+
+def posix_path(local_path: str) -> str:
+    return str(PurePosixPath(Path(local_path)))
+
+
+def local_path(posix_path: str) -> str:
+    return str(Path(posix_path))
+
+
+def create_tmp_dir(tmpdir_prefix: str) -> str:
+    """Create a temporary directory that respects the given tmpdir_prefix."""
+    tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+    return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)