Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/utils.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Shared functions and other definitions.""" import collections import os import platform import random import shutil import stat import string import subprocess # nosec import sys import tempfile import urllib import uuid from functools import partial from itertools import zip_longest from pathlib import Path, PurePosixPath from tempfile import NamedTemporaryFile from types import ModuleType from typing import ( IO, Any, Callable, Dict, Generator, Iterable, List, MutableMapping, MutableSequence, NamedTuple, Optional, Set, Union, cast, ) import pkg_resources import requests from cachecontrol import CacheControl from cachecontrol.caches import FileCache from mypy_extensions import TypedDict from schema_salad.exceptions import ValidationException from schema_salad.ref_resolver import Loader from typing_extensions import TYPE_CHECKING, Deque if TYPE_CHECKING: from .command_line_tool import CallbackJob, ExpressionJob from .job import CommandLineJob, JobBase from .stdfsaccess import StdFsAccess from .workflow_job import WorkflowJob __random_outdir = None # type: Optional[str] CONTENT_LIMIT = 64 * 1024 windows_default_container_id = "frolvlad/alpine-bash" DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep processes_to_kill = collections.deque() # type: Deque[subprocess.Popen[str]] CWLOutputAtomType = Union[ None, bool, str, int, float, MutableSequence[ Union[ None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any] ] ], MutableMapping[ str, Union[ None, bool, str, int, float, MutableSequence[Any], MutableMapping[str, Any] ], ], ] CWLOutputType = Union[ bool, str, int, float, MutableSequence[CWLOutputAtomType], MutableMapping[str, CWLOutputAtomType], ] CWLObjectType = MutableMapping[str, Optional[CWLOutputType]] JobsType = Union[ "CommandLineJob", "JobBase", "WorkflowJob", "ExpressionJob", "CallbackJob" ] JobsGeneratorType = Generator[Optional[JobsType], None, None] OutputCallbackType = Callable[[Optional[CWLObjectType], str], None] ResolverType = Callable[["Loader", str], Optional[str]] DestinationsType = MutableMapping[str, Optional[CWLOutputType]] ScatterDestinationsType = MutableMapping[str, List[Optional[CWLOutputType]]] ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None] SinkType = Union[CWLOutputType, CWLObjectType] DirectoryType = TypedDict( "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str} ) JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None] JSONType = Union[ Dict[str, JSONAtomType], List[JSONAtomType], str, int, float, bool, None ] WorkflowStateItem = NamedTuple( "WorkflowStateItem", [ ("parameter", CWLObjectType), ("value", Optional[CWLOutputType]), ("success", str), ], ) ParametersType = List[CWLObjectType] StepType = CWLObjectType # WorkflowStep def versionstring() -> str: """Version of CWLtool used to execute the workflow.""" pkg = pkg_resources.require("cwltool") if pkg: return "{} {}".format(sys.argv[0], pkg[0].version) return "{} {}".format(sys.argv[0], "unknown version") def aslist(thing: Any) -> MutableSequence[Any]: """Wrap any non-MutableSequence/list in a list.""" if isinstance(thing, MutableSequence): return thing return [thing] def copytree_with_merge(src: str, dst: str) -> None: if not os.path.exists(dst): os.makedirs(dst) shutil.copystat(src, dst) lst = os.listdir(src) for item in lst: spath = os.path.join(src, item) dpath = os.path.join(dst, item) if os.path.isdir(spath): copytree_with_merge(spath, dpath) else: shutil.copy2(spath, dpath) def docker_windows_path_adjust(path: str) -> str: r""" Adjust only windows paths for Docker. The docker run command treats them as unix paths. Example: 'C:\Users\foo to /C/Users/foo (Docker for Windows) or /c/Users/foo (Docker toolbox). """ if onWindows(): split = path.split(":") if len(split) == 2: if platform.win32_ver()[0] in ("7", "8"): # Docker toolbox uses lowecase windows Drive letters split[0] = split[0].lower() else: split[0] = split[0].capitalize() # Docker for Windows uses uppercase windows Drive letters path = ":".join(split) path = path.replace(":", "").replace("\\", "/") return path if path[0] == "/" else "/" + path return path def docker_windows_reverse_path_adjust(path: str) -> str: r""" Change docker path (only on windows os) appropriately back to Windows path. Example: /C/Users/foo to C:\Users\foo """ if path is not None and onWindows(): if path[0] == "/": path = path[1:] else: raise ValueError("not a docker path") splitpath = path.split("/") splitpath[0] = splitpath[0] + ":" return "\\".join(splitpath) return path def docker_windows_reverse_fileuri_adjust(fileuri: str) -> str: r""" Convert fileuri to be MS Windows comptabile, if needed. On docker in windows fileuri do not contain : in path To convert this file uri to windows compatible add : after drive letter, so file:///E/var becomes file:///E:/var """ if fileuri is not None and onWindows(): if urllib.parse.urlsplit(fileuri).scheme == "file": filesplit = fileuri.split("/") if filesplit[3][-1] != ":": filesplit[3] = filesplit[3] + ":" return "/".join(filesplit) return fileuri raise ValueError("not a file URI") return fileuri def onWindows() -> bool: """Check if we are on Windows OS.""" return os.name == "nt" def convert_pathsep_to_unix(path: str) -> str: """ Convert path seperators to unix style. On windows os.path.join would use backslash to join path, since we would use these paths in Docker we would convert it to use forward slashes: / """ if path is not None and onWindows(): return path.replace("\\", "/") return path def cmp_like_py2(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> int: """ Compare in the same manner as Python2. Comparision function to be used in sorting as python3 doesn't allow sorting of different types like str() and int(). This function re-creates sorting nature in py2 of heterogeneous list of `int` and `str` """ # extract lists from both dicts first, second = dict1["position"], dict2["position"] # iterate through both list till max of their size for i, j in zip_longest(first, second): if i == j: continue # in case 1st list is smaller # should come first in sorting if i is None: return -1 # if 1st list is longer, # it should come later in sort elif j is None: return 1 # if either of the list contains str element # at any index, both should be str before comparing if isinstance(i, str) or isinstance(j, str): return 1 if str(i) > str(j) else -1 # int comparison otherwise return 1 if i > j else -1 # if both lists are equal return 0 def bytes2str_in_dicts( inp: Union[MutableMapping[str, Any], MutableSequence[Any], Any], ): # type: (...) -> Union[str, MutableSequence[Any], MutableMapping[str, Any]] """ Convert any present byte string to unicode string, inplace. input is a dict of nested dicts and lists """ # if input is dict, recursively call for each value if isinstance(inp, MutableMapping): for k in inp: inp[k] = bytes2str_in_dicts(inp[k]) return inp # if list, iterate through list and fn call # for all its elements if isinstance(inp, MutableSequence): for idx, value in enumerate(inp): inp[idx] = bytes2str_in_dicts(value) return inp # if value is bytes, return decoded string, elif isinstance(inp, bytes): return inp.decode("utf-8") # simply return elements itself return inp def visit_class(rec: Any, cls: Iterable[Any], op: Callable[..., Any]) -> None: """Apply a function to with "class" in cls.""" if isinstance(rec, MutableMapping): if "class" in rec and rec.get("class") in cls: op(rec) for d in rec: visit_class(rec[d], cls, op) if isinstance(rec, MutableSequence): for d in rec: visit_class(d, cls, op) def visit_field(rec: Any, field: str, op: Callable[..., Any]) -> None: """Apply a function to mapping with 'field'.""" if isinstance(rec, MutableMapping): if field in rec: rec[field] = op(rec[field]) for d in rec: visit_field(rec[d], field, op) if isinstance(rec, MutableSequence): for d in rec: visit_field(d, field, op) def random_outdir() -> str: """Return the random directory name chosen to use for tool / workflow output.""" global __random_outdir if not __random_outdir: __random_outdir = "/" + "".join( [random.choice(string.ascii_letters) for _ in range(6)] # nosec ) return __random_outdir return __random_outdir # # Simple multi-platform (fcntl/msvrt) file locking wrapper # fcntl = None # type: Optional[ModuleType] msvcrt = None # type: Optional[ModuleType] try: import fcntl # type: ignore except ImportError: import msvcrt # type: ignore def shared_file_lock(fd: IO[Any]) -> None: if fcntl: fcntl.flock(fd.fileno(), fcntl.LOCK_SH) # type: ignore elif msvcrt: msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024) # type: ignore def upgrade_lock(fd: IO[Any]) -> None: if fcntl: fcntl.flock(fd.fileno(), fcntl.LOCK_EX) # type: ignore elif msvcrt: pass def adjustFileObjs( rec, op ): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply an update function to each File object in the object `rec`.""" visit_class(rec, ("File",), op) def adjustDirObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply an update function to each Directory object in the object `rec`.""" visit_class(rec, ("Directory",), op) def dedup(listing: List[CWLObjectType]) -> List[CWLObjectType]: marksub = set() def mark(d: Dict[str, str]) -> None: marksub.add(d["location"]) for entry in listing: if entry["class"] == "Directory": for e in cast(List[CWLObjectType], entry.get("listing", [])): adjustFileObjs(e, mark) adjustDirObjs(e, mark) dd = [] markdup = set() # type: Set[str] for r in listing: if r["location"] not in marksub and r["location"] not in markdup: dd.append(r) markdup.add(cast(str, r["location"])) return dd def get_listing( fs_access: "StdFsAccess", rec: CWLObjectType, recursive: bool = True ) -> None: if rec.get("class") != "Directory": finddirs = [] # type: List[CWLObjectType] visit_class(rec, ("Directory",), finddirs.append) for f in finddirs: get_listing(fs_access, f, recursive=recursive) return if "listing" in rec: return listing = [] # type: List[CWLOutputAtomType] loc = cast(str, rec["location"]) for ld in fs_access.listdir(loc): parse = urllib.parse.urlparse(ld) bn = os.path.basename(urllib.request.url2pathname(parse.path)) if fs_access.isdir(ld): ent = { "class": "Directory", "location": ld, "basename": bn, } # type: MutableMapping[str, Any] if recursive: get_listing(fs_access, ent, recursive) listing.append(ent) else: listing.append({"class": "File", "location": ld, "basename": bn}) rec["listing"] = listing def trim_listing(obj): # type: (Dict[str, Any]) -> None """ Remove 'listing' field from Directory objects that are file references. It redundant and potentially expensive to pass fully enumerated Directory objects around if not explicitly needed, so delete the 'listing' field when it is safe to do so. """ if obj.get("location", "").startswith("file://") and "listing" in obj: del obj["listing"] def downloadHttpFile(httpurl): # type: (str) -> str cache_session = None if "XDG_CACHE_HOME" in os.environ: directory = os.environ["XDG_CACHE_HOME"] elif "HOME" in os.environ: directory = os.environ["HOME"] else: directory = os.path.expanduser("~") cache_session = CacheControl( requests.Session(), cache=FileCache(os.path.join(directory, ".cache", "cwltool")), ) r = cache_session.get(httpurl, stream=True) with NamedTemporaryFile(mode="wb", delete=False) as f: for chunk in r.iter_content(chunk_size=16384): if chunk: # filter out keep-alive new chunks f.write(chunk) r.close() return str(f.name) def ensure_writable(path): # type: (str) -> None if os.path.isdir(path): for root, dirs, files in os.walk(path): for name in files: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode | stat.S_IWUSR) for name in dirs: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode | stat.S_IWUSR) else: st = os.stat(path) mode = stat.S_IMODE(st.st_mode) os.chmod(path, mode | stat.S_IWUSR) def ensure_non_writable(path): # type: (str) -> None if os.path.isdir(path): for root, dirs, files in os.walk(path): for name in files: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) for name in dirs: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) else: st = os.stat(path) mode = stat.S_IMODE(st.st_mode) os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) def normalizeFilesDirs( job: Optional[ Union[ MutableSequence[MutableMapping[str, Any]], MutableMapping[str, Any], DirectoryType, ] ] ) -> None: def addLocation(d): # type: (Dict[str, Any]) -> None if "location" not in d: if d["class"] == "File" and ("contents" not in d): raise ValidationException( "Anonymous file object must have 'contents' and 'basename' fields." ) if d["class"] == "Directory" and ( "listing" not in d or "basename" not in d ): raise ValidationException( "Anonymous directory object must have 'listing' and 'basename' fields." ) d["location"] = "_:" + str(uuid.uuid4()) if "basename" not in d: d["basename"] = d["location"][2:] parse = urllib.parse.urlparse(d["location"]) path = parse.path # strip trailing slash if path.endswith("/"): if d["class"] != "Directory": raise ValidationException( "location '%s' ends with '/' but is not a Directory" % d["location"] ) path = path.rstrip("/") d["location"] = urllib.parse.urlunparse( ( parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment, ) ) if not d.get("basename"): if path.startswith("_:"): d["basename"] = str(path[2:]) else: d["basename"] = str(os.path.basename(urllib.request.url2pathname(path))) if d["class"] == "File": nr, ne = os.path.splitext(d["basename"]) if d.get("nameroot") != nr: d["nameroot"] = str(nr) if d.get("nameext") != ne: d["nameext"] = str(ne) visit_class(job, ("File", "Directory"), addLocation) def posix_path(local_path: str) -> str: return str(PurePosixPath(Path(local_path))) def local_path(posix_path: str) -> str: return str(Path(posix_path)) def create_tmp_dir(tmpdir_prefix: str) -> str: """Create a temporary directory that respects the given tmpdir_prefix.""" tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix) return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)