diff env/lib/python3.9/site-packages/planemo/io.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/planemo/io.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,429 @@
+"""Planemo I/O abstractions and utilities."""
+from __future__ import absolute_import
+from __future__ import print_function
+
+import contextlib
+import errno
+import fnmatch
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+from sys import platform as _platform
+from xml.sax.saxutils import escape
+
+import click
+from galaxy.util import commands
+from galaxy.util.commands import download_command
+from six import (
+    string_types,
+    StringIO
+)
+
+from .exit_codes import (
+    EXIT_CODE_NO_SUCH_TARGET,
+    EXIT_CODE_OK,
+)
+
+
+IS_OS_X = _platform == "darwin"
+
+
+def args_to_str(args):
+    """Collapse list of arguments in a commmand-line string."""
+    if args is None or isinstance(args, string_types):
+        return args
+    else:
+        return commands.argv_to_str(args)
+
+
+def communicate(cmds, **kwds):
+    """Execute shell command and wait for output.
+
+    With click-aware I/O handling, pretty display of the command being executed,
+    and formatted exception if the exit code is not 0.
+    """
+    cmd_string = args_to_str(cmds)
+    info(cmd_string)
+    p = commands.shell_process(cmds, **kwds)
+    if kwds.get("stdout", None) is None and commands.redirecting_io(sys=sys):
+        output = commands.redirect_aware_commmunicate(p)
+    else:
+        output = p.communicate()
+
+    if p.returncode != 0:
+        template = "Problem executing commands {0} - ({1}, {2})"
+        msg = template.format(cmd_string, output[0], output[1])
+        raise RuntimeError(msg)
+    return output
+
+
+def shell(cmds, **kwds):
+    """Print and execute shell command."""
+    cmd_string = args_to_str(cmds)
+    info(cmd_string)
+    return commands.shell(cmds, **kwds)
+
+
+def info(message, *args):
+    """Print stylized info message to the screen."""
+    if args:
+        message = message % args
+    click.echo(click.style(message, bold=True, fg='green'))
+
+
+def error(message, *args):
+    """Print stylized error message to the screen."""
+    if args:
+        message = message % args
+    click.echo(click.style(message, bold=True, fg='red'), err=True)
+
+
+def warn(message, *args):
+    """Print stylized warning message to the screen."""
+    if args:
+        message = message % args
+    click.echo(click.style(message, fg='red'), err=True)
+
+
+def can_write_to_path(path: str, **kwds):
+    """Implement -f/--force logic.
+
+    If supplied path exists, print an error message and return False
+    unless --force caused the 'force' keyword argument to be True.
+    """
+    if not kwds["force"] and os.path.exists(path):
+        error("%s already exists, exiting." % path)
+        return False
+    return True
+
+
+def shell_join(*args):
+    """Join potentially empty commands together with '&&'."""
+    return " && ".join(args_to_str(_) for _ in args if _)
+
+
+def write_file(path, content, force=True):
+    if os.path.exists(path) and not force:
+        return
+
+    with open(path, "w") as f:
+        f.write(content)
+
+
+def untar_to(url, tar_args=None, path=None, dest_dir=None):
+    if tar_args:
+        assert not (path and dest_dir)
+        if dest_dir:
+            if not os.path.exists(dest_dir):
+                os.makedirs(dest_dir)
+            tar_args[0:0] = ['-C', dest_dir]
+        if path:
+            tar_args.insert(0, '-O')
+
+        download_cmd = download_command(url)
+        download_p = commands.shell_process(download_cmd, stdout=subprocess.PIPE)
+        untar_cmd = ['tar'] + tar_args
+        if path:
+            with open(path, 'wb') as fh:
+                shell(untar_cmd, stdin=download_p.stdout, stdout=fh)
+        else:
+            shell(untar_cmd, stdin=download_p.stdout)
+        download_p.wait()
+    else:
+        cmd = download_command(url, to=path)
+        shell(cmd)
+
+
+def find_matching_directories(path, pattern, recursive):
+    """Find directories below supplied path with file matching pattern.
+
+    Returns an empty list if no matches are found, and if recursive is False
+    only the top directory specified by path will be considered.
+    """
+    dirs = []
+    if recursive:
+        if not os.path.isdir(path):
+            template = "--recursive specified with non-directory path [%s]"
+            message = template % (path)
+            raise Exception(message)
+
+        for base_path, dirnames, filenames in os.walk(path):
+            dirnames.sort()
+            for filename in fnmatch.filter(filenames, pattern):
+                dirs.append(base_path)
+    else:
+        if os.path.exists(os.path.join(path, pattern)):
+            dirs.append(path)
+        elif os.path.basename(path) == pattern:
+            dirs.append(os.path.dirname(path))
+    return dirs
+
+
+@contextlib.contextmanager
+def real_io():
+    """Ensure stdout and stderr have supported ``fileno()`` method.
+
+    nosetests replaces these streams with :class:`StringIO` objects
+    that may not work the same in every situtation - :func:`subprocess.Popen`
+    calls in particular.
+    """
+    original_stdout = sys.stdout
+    original_stderr = sys.stderr
+    try:
+        if commands.redirecting_io(sys=sys):
+            sys.stdout = sys.__stdout__
+            sys.stderr = sys.__stderr__
+        yield
+    finally:
+        sys.stdout = original_stdout
+        sys.stderr = original_stderr
+
+
+@contextlib.contextmanager
+def temp_directory(prefix="planemo_tmp_", dir=None, **kwds):
+    if dir is not None:
+        try:
+            os.makedirs(dir)
+        except OSError as e:
+            if e.errno != errno.EEXIST:
+                raise
+    temp_dir = tempfile.mkdtemp(prefix=prefix, dir=dir, **kwds)
+    try:
+        yield temp_dir
+    finally:
+        shutil.rmtree(temp_dir)
+
+
+def ps1_for_path(path, base="PS1"):
+    """ Used by environment commands to build a PS1 shell
+    variables for tool or directory of tools.
+    """
+    file_name = os.path.basename(path)
+    base_name = os.path.splitext(file_name)[0]
+    ps1 = "(%s)${%s}" % (base_name, base)
+    return ps1
+
+
+def kill_pid_file(pid_file: str):
+    """Kill process group corresponding to specified pid file."""
+    try:
+        os.stat(pid_file)
+    except OSError as e:
+        if e.errno == errno.ENOENT:
+            return False
+
+    with open(pid_file, "r") as fh:
+        pid = int(fh.read())
+    kill_posix(pid)
+    try:
+        os.unlink(pid_file)
+    except Exception:
+        pass
+
+
+def kill_posix(pid: int):
+    """Kill process group corresponding to specified pid."""
+    def _check_pid():
+        try:
+            os.kill(pid, 0)
+            return True
+        except OSError:
+            return False
+
+    if _check_pid():
+        for sig in [15, 9]:
+            try:
+                # gunicorn (unlike paste), seem to require killing process
+                # group
+                os.killpg(os.getpgid(pid), sig)
+            except OSError:
+                return
+            time.sleep(1)
+            if not _check_pid():
+                return
+
+
+@contextlib.contextmanager
+def conditionally_captured_io(capture, tee=False):
+    """If capture is True, capture stdout and stderr for logging."""
+    captured_std = []
+    if capture:
+        with _Capturing() as captured_std:
+            yield captured_std
+        if tee:
+            tee_captured_output(captured_std)
+    else:
+        yield
+
+
+@contextlib.contextmanager
+def captured_io_for_xunit(kwds, captured_io):
+    """Capture Planemo I/O and timing for outputting to an xUnit report."""
+    captured_std = []
+    with_xunit = kwds.get('report_xunit', False)
+    with conditionally_captured_io(with_xunit, tee=True):
+        time1 = time.time()
+        yield
+        time2 = time.time()
+
+    if with_xunit:
+        stdout = [escape(m['data']) for m in captured_std
+                  if m['logger'] == 'stdout']
+        stderr = [escape(m['data']) for m in captured_std
+                  if m['logger'] == 'stderr']
+        captured_io["stdout"] = stdout
+        captured_io["stderr"] = stderr
+        captured_io["time"] = (time2 - time1)
+    else:
+        captured_io["stdout"] = None
+        captured_io["stderr"] = None
+        captured_io["time"] = None
+
+
+class _Capturing(list):
+    """Function context which captures stdout/stderr
+
+    This keeps planemo's codebase clean without requiring planemo to hold onto
+    messages, or pass user-facing messages back at all. This could probably be
+    solved by swapping planemo entirely to a logger and reading from/writing
+    to that, but this is easier.
+
+    This swaps sys.std{out,err} with StringIOs and then makes that output
+    available.
+    """
+    # http://stackoverflow.com/a/16571630
+
+    def __enter__(self):
+        self._stdout = sys.stdout
+        self._stderr = sys.stderr
+        sys.stdout = self._stringio_stdout = StringIO()
+        sys.stderr = self._stringio_stderr = StringIO()
+        return self
+
+    def __exit__(self, *args):
+        self.extend([{'logger': 'stdout', 'data': x} for x in
+                     self._stringio_stdout.getvalue().splitlines()])
+        self.extend([{'logger': 'stderr', 'data': x} for x in
+                     self._stringio_stderr.getvalue().splitlines()])
+
+        sys.stdout = self._stdout
+        sys.stderr = self._stderr
+
+
+def tee_captured_output(output):
+    """tee captured standard output and standard error if needed.
+
+    For messages captured with Capturing, send them to their correct
+    locations so as to not interfere with normal user experience.
+    """
+    for message in output:
+        # Append '\n' due to `splitlines()` above
+        if message['logger'] == 'stdout':
+            sys.stdout.write(message['data'] + '\n')
+        if message['logger'] == 'stderr':
+            sys.stderr.write(message['data'] + '\n')
+
+
+def wait_on(function, desc, timeout=5, polling_backoff=0):
+    """Wait on given function's readiness.
+
+    Grow the polling interval incrementally by the polling_backoff.
+    """
+    delta = .25
+    timing = 0
+    while True:
+        if timing > timeout:
+            message = "Timed out waiting on %s." % desc
+            raise Exception(message)
+        timing += delta
+        delta += polling_backoff
+        value = function()
+        if value is not None:
+            return value
+        time.sleep(delta)
+
+
+@contextlib.contextmanager
+def open_file_or_standard_output(path, *args, **kwds):
+    """Open file but respect '-' as referring to stdout."""
+    if path == "-":
+        yield sys.stdout
+    else:
+        yield open(path, *args, **kwds)
+
+
+def filter_paths(paths, cwd=None, **kwds):
+    if cwd is None:
+        cwd = os.getcwd()
+
+    def norm(path):
+        if not os.path.isabs(path):
+            path = os.path.join(cwd, path)
+        return os.path.normpath(path)
+
+    def exclude_func(exclude_path):
+        def path_startswith(p):
+            """Check that p starts with exclude_path and that the first
+            character of p not included in exclude_path (if any) is the
+            directory separator.
+            """
+            norm_p = norm(p)
+            norm_exclude_path = norm(exclude_path)
+            if norm_p.startswith(norm_exclude_path):
+                return norm_p[len(norm_exclude_path):len(norm_exclude_path) + 1] in ['', os.sep]
+            return False
+        return path_startswith
+
+    filters_as_funcs = []
+    filters_as_funcs.extend(map(exclude_func, kwds.get("exclude", [])))
+
+    for exclude_paths_ins in kwds.get("exclude_from", []):
+        with open(exclude_paths_ins, "r") as f:
+            for line in f.readlines():
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                filters_as_funcs.append(exclude_func(line))
+
+    return [p for p in paths if not any(f(p) for f in filters_as_funcs)]
+
+
+def coalesce_return_codes(ret_codes, assert_at_least_one=False):
+    # Return 0 if everything is fine, otherwise pick the least
+    # specific non-0 return code - preferring to report errors
+    # to other non-0 exit codes.
+    if assert_at_least_one and len(ret_codes) == 0:
+        return EXIT_CODE_NO_SUCH_TARGET
+
+    coalesced_return_code = EXIT_CODE_OK
+    for ret_code in ret_codes:
+        # None is equivalent to 0 in these methods.
+        ret_code = 0 if ret_code is None else ret_code
+        if ret_code == 0:
+            # Everything is fine, keep moving...
+            pass
+        elif coalesced_return_code == 0:
+            coalesced_return_code = ret_code
+        # At this point in logic both ret_code and coalesced_return_code are
+        # are non-zero
+        elif ret_code < 0:
+            # Error state, this should override eveything else.
+            coalesced_return_code = ret_code
+        elif ret_code > 0 and coalesced_return_code < 0:
+            # Keep error state recorded.
+            pass
+        elif ret_code > 0:
+            # Lets somewhat arbitrarily call the smaller exit code
+            # the less specific.
+            coalesced_return_code = min(ret_code, coalesced_return_code)
+
+    if coalesced_return_code < 0:
+        # Map -1 => 254, -2 => 253, etc...
+        # Not sure it is helpful to have negative error codes
+        # this was a design and API mistake in planemo.
+        coalesced_return_code = 255 + coalesced_return_code
+
+    return coalesced_return_code