Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/psutil/_common.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/psutil/_common.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,846 +0,0 @@ -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Common objects shared by __init__.py and _ps*.py modules.""" - -# Note: this module is imported by setup.py so it should not import -# psutil or third-party modules. - -from __future__ import division, print_function - -import contextlib -import errno -import functools -import os -import socket -import stat -import sys -import threading -import warnings -from collections import defaultdict -from collections import namedtuple -from socket import AF_INET -from socket import SOCK_DGRAM -from socket import SOCK_STREAM - -try: - from socket import AF_INET6 -except ImportError: - AF_INET6 = None -try: - from socket import AF_UNIX -except ImportError: - AF_UNIX = None - -if sys.version_info >= (3, 4): - import enum -else: - enum = None - - -# can't take it from _common.py as this script is imported by setup.py -PY3 = sys.version_info[0] == 3 - -__all__ = [ - # OS constants - 'FREEBSD', 'BSD', 'LINUX', 'NETBSD', 'OPENBSD', 'MACOS', 'OSX', 'POSIX', - 'SUNOS', 'WINDOWS', - # connection constants - 'CONN_CLOSE', 'CONN_CLOSE_WAIT', 'CONN_CLOSING', 'CONN_ESTABLISHED', - 'CONN_FIN_WAIT1', 'CONN_FIN_WAIT2', 'CONN_LAST_ACK', 'CONN_LISTEN', - 'CONN_NONE', 'CONN_SYN_RECV', 'CONN_SYN_SENT', 'CONN_TIME_WAIT', - # net constants - 'NIC_DUPLEX_FULL', 'NIC_DUPLEX_HALF', 'NIC_DUPLEX_UNKNOWN', - # process status constants - 'STATUS_DEAD', 'STATUS_DISK_SLEEP', 'STATUS_IDLE', 'STATUS_LOCKED', - 'STATUS_RUNNING', 'STATUS_SLEEPING', 'STATUS_STOPPED', 'STATUS_SUSPENDED', - 'STATUS_TRACING_STOP', 'STATUS_WAITING', 'STATUS_WAKE_KILL', - 'STATUS_WAKING', 'STATUS_ZOMBIE', 'STATUS_PARKED', - # other constants - 'ENCODING', 'ENCODING_ERRS', 'AF_INET6', - # named tuples - 'pconn', 'pcputimes', 'pctxsw', 'pgids', 'pio', 'pionice', 'popenfile', - 'pthread', 'puids', 'sconn', 'scpustats', 'sdiskio', 'sdiskpart', - 'sdiskusage', 'snetio', 'snicaddr', 'snicstats', 'sswap', 'suser', - # utility functions - 'conn_tmap', 'deprecated_method', 'isfile_strict', 'memoize', - 'parse_environ_block', 'path_exists_strict', 'usage_percent', - 'supports_ipv6', 'sockfam_to_enum', 'socktype_to_enum', "wrap_numbers", - 'bytes2human', 'conn_to_ntuple', 'debug', - # shell utils - 'hilite', 'term_supports_colors', 'print_color', -] - - -# =================================================================== -# --- OS constants -# =================================================================== - - -POSIX = os.name == "posix" -WINDOWS = os.name == "nt" -LINUX = sys.platform.startswith("linux") -MACOS = sys.platform.startswith("darwin") -OSX = MACOS # deprecated alias -FREEBSD = sys.platform.startswith("freebsd") -OPENBSD = sys.platform.startswith("openbsd") -NETBSD = sys.platform.startswith("netbsd") -BSD = FREEBSD or OPENBSD or NETBSD -SUNOS = sys.platform.startswith(("sunos", "solaris")) -AIX = sys.platform.startswith("aix") - - -# =================================================================== -# --- API constants -# =================================================================== - - -# Process.status() -STATUS_RUNNING = "running" -STATUS_SLEEPING = "sleeping" -STATUS_DISK_SLEEP = "disk-sleep" -STATUS_STOPPED = "stopped" -STATUS_TRACING_STOP = "tracing-stop" -STATUS_ZOMBIE = "zombie" -STATUS_DEAD = "dead" -STATUS_WAKE_KILL = "wake-kill" -STATUS_WAKING = "waking" -STATUS_IDLE = "idle" # Linux, macOS, FreeBSD -STATUS_LOCKED = "locked" # FreeBSD -STATUS_WAITING = "waiting" # FreeBSD -STATUS_SUSPENDED = "suspended" # NetBSD -STATUS_PARKED = "parked" # Linux - -# Process.connections() and psutil.net_connections() -CONN_ESTABLISHED = "ESTABLISHED" -CONN_SYN_SENT = "SYN_SENT" -CONN_SYN_RECV = "SYN_RECV" -CONN_FIN_WAIT1 = "FIN_WAIT1" -CONN_FIN_WAIT2 = "FIN_WAIT2" -CONN_TIME_WAIT = "TIME_WAIT" -CONN_CLOSE = "CLOSE" -CONN_CLOSE_WAIT = "CLOSE_WAIT" -CONN_LAST_ACK = "LAST_ACK" -CONN_LISTEN = "LISTEN" -CONN_CLOSING = "CLOSING" -CONN_NONE = "NONE" - -# net_if_stats() -if enum is None: - NIC_DUPLEX_FULL = 2 - NIC_DUPLEX_HALF = 1 - NIC_DUPLEX_UNKNOWN = 0 -else: - class NicDuplex(enum.IntEnum): - NIC_DUPLEX_FULL = 2 - NIC_DUPLEX_HALF = 1 - NIC_DUPLEX_UNKNOWN = 0 - - globals().update(NicDuplex.__members__) - -# sensors_battery() -if enum is None: - POWER_TIME_UNKNOWN = -1 - POWER_TIME_UNLIMITED = -2 -else: - class BatteryTime(enum.IntEnum): - POWER_TIME_UNKNOWN = -1 - POWER_TIME_UNLIMITED = -2 - - globals().update(BatteryTime.__members__) - -# --- others - -ENCODING = sys.getfilesystemencoding() -if not PY3: - ENCODING_ERRS = "replace" -else: - try: - ENCODING_ERRS = sys.getfilesystemencodeerrors() # py 3.6 - except AttributeError: - ENCODING_ERRS = "surrogateescape" if POSIX else "replace" - - -# =================================================================== -# --- namedtuples -# =================================================================== - -# --- for system functions - -# psutil.swap_memory() -sswap = namedtuple('sswap', ['total', 'used', 'free', 'percent', 'sin', - 'sout']) -# psutil.disk_usage() -sdiskusage = namedtuple('sdiskusage', ['total', 'used', 'free', 'percent']) -# psutil.disk_io_counters() -sdiskio = namedtuple('sdiskio', ['read_count', 'write_count', - 'read_bytes', 'write_bytes', - 'read_time', 'write_time']) -# psutil.disk_partitions() -sdiskpart = namedtuple('sdiskpart', ['device', 'mountpoint', 'fstype', 'opts']) -# psutil.net_io_counters() -snetio = namedtuple('snetio', ['bytes_sent', 'bytes_recv', - 'packets_sent', 'packets_recv', - 'errin', 'errout', - 'dropin', 'dropout']) -# psutil.users() -suser = namedtuple('suser', ['name', 'terminal', 'host', 'started', 'pid']) -# psutil.net_connections() -sconn = namedtuple('sconn', ['fd', 'family', 'type', 'laddr', 'raddr', - 'status', 'pid']) -# psutil.net_if_addrs() -snicaddr = namedtuple('snicaddr', - ['family', 'address', 'netmask', 'broadcast', 'ptp']) -# psutil.net_if_stats() -snicstats = namedtuple('snicstats', ['isup', 'duplex', 'speed', 'mtu']) -# psutil.cpu_stats() -scpustats = namedtuple( - 'scpustats', ['ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls']) -# psutil.cpu_freq() -scpufreq = namedtuple('scpufreq', ['current', 'min', 'max']) -# psutil.sensors_temperatures() -shwtemp = namedtuple( - 'shwtemp', ['label', 'current', 'high', 'critical']) -# psutil.sensors_battery() -sbattery = namedtuple('sbattery', ['percent', 'secsleft', 'power_plugged']) -# psutil.sensors_fans() -sfan = namedtuple('sfan', ['label', 'current']) - -# --- for Process methods - -# psutil.Process.cpu_times() -pcputimes = namedtuple('pcputimes', - ['user', 'system', 'children_user', 'children_system']) -# psutil.Process.open_files() -popenfile = namedtuple('popenfile', ['path', 'fd']) -# psutil.Process.threads() -pthread = namedtuple('pthread', ['id', 'user_time', 'system_time']) -# psutil.Process.uids() -puids = namedtuple('puids', ['real', 'effective', 'saved']) -# psutil.Process.gids() -pgids = namedtuple('pgids', ['real', 'effective', 'saved']) -# psutil.Process.io_counters() -pio = namedtuple('pio', ['read_count', 'write_count', - 'read_bytes', 'write_bytes']) -# psutil.Process.ionice() -pionice = namedtuple('pionice', ['ioclass', 'value']) -# psutil.Process.ctx_switches() -pctxsw = namedtuple('pctxsw', ['voluntary', 'involuntary']) -# psutil.Process.connections() -pconn = namedtuple('pconn', ['fd', 'family', 'type', 'laddr', 'raddr', - 'status']) - -# psutil.connections() and psutil.Process.connections() -addr = namedtuple('addr', ['ip', 'port']) - - -# =================================================================== -# --- Process.connections() 'kind' parameter mapping -# =================================================================== - - -conn_tmap = { - "all": ([AF_INET, AF_INET6, AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]), - "tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]), - "tcp4": ([AF_INET], [SOCK_STREAM]), - "udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]), - "udp4": ([AF_INET], [SOCK_DGRAM]), - "inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]), - "inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]), - "inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]), -} - -if AF_INET6 is not None: - conn_tmap.update({ - "tcp6": ([AF_INET6], [SOCK_STREAM]), - "udp6": ([AF_INET6], [SOCK_DGRAM]), - }) - -if AF_UNIX is not None: - conn_tmap.update({ - "unix": ([AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]), - }) - - -# ===================================================================== -# --- Exceptions -# ===================================================================== - - -class Error(Exception): - """Base exception class. All other psutil exceptions inherit - from this one. - """ - __module__ = 'psutil' - - def __init__(self, msg=""): - Exception.__init__(self, msg) - self.msg = msg - - def __repr__(self): - ret = "psutil.%s %s" % (self.__class__.__name__, self.msg) - return ret.strip() - - __str__ = __repr__ - - -class NoSuchProcess(Error): - """Exception raised when a process with a certain PID doesn't - or no longer exists. - """ - __module__ = 'psutil' - - def __init__(self, pid, name=None, msg=None): - Error.__init__(self, msg) - self.pid = pid - self.name = name - self.msg = msg - if msg is None: - if name: - details = "(pid=%s, name=%s)" % (self.pid, repr(self.name)) - else: - details = "(pid=%s)" % self.pid - self.msg = "process no longer exists " + details - - def __path__(self): - return 'xxx' - - -class ZombieProcess(NoSuchProcess): - """Exception raised when querying a zombie process. This is - raised on macOS, BSD and Solaris only, and not always: depending - on the query the OS may be able to succeed anyway. - On Linux all zombie processes are querable (hence this is never - raised). Windows doesn't have zombie processes. - """ - __module__ = 'psutil' - - def __init__(self, pid, name=None, ppid=None, msg=None): - NoSuchProcess.__init__(self, msg) - self.pid = pid - self.ppid = ppid - self.name = name - self.msg = msg - if msg is None: - args = ["pid=%s" % pid] - if name: - args.append("name=%s" % repr(self.name)) - if ppid: - args.append("ppid=%s" % self.ppid) - details = "(%s)" % ", ".join(args) - self.msg = "process still exists but it's a zombie " + details - - -class AccessDenied(Error): - """Exception raised when permission to perform an action is denied.""" - __module__ = 'psutil' - - def __init__(self, pid=None, name=None, msg=None): - Error.__init__(self, msg) - self.pid = pid - self.name = name - self.msg = msg - if msg is None: - if (pid is not None) and (name is not None): - self.msg = "(pid=%s, name=%s)" % (pid, repr(name)) - elif (pid is not None): - self.msg = "(pid=%s)" % self.pid - else: - self.msg = "" - - -class TimeoutExpired(Error): - """Raised on Process.wait(timeout) if timeout expires and process - is still alive. - """ - __module__ = 'psutil' - - def __init__(self, seconds, pid=None, name=None): - Error.__init__(self, "timeout after %s seconds" % seconds) - self.seconds = seconds - self.pid = pid - self.name = name - if (pid is not None) and (name is not None): - self.msg += " (pid=%s, name=%s)" % (pid, repr(name)) - elif (pid is not None): - self.msg += " (pid=%s)" % self.pid - - -# =================================================================== -# --- utils -# =================================================================== - - -def usage_percent(used, total, round_=None): - """Calculate percentage usage of 'used' against 'total'.""" - try: - ret = (float(used) / total) * 100 - except ZeroDivisionError: - return 0.0 - else: - if round_ is not None: - ret = round(ret, round_) - return ret - - -def memoize(fun): - """A simple memoize decorator for functions supporting (hashable) - positional arguments. - It also provides a cache_clear() function for clearing the cache: - - >>> @memoize - ... def foo() - ... return 1 - ... - >>> foo() - 1 - >>> foo.cache_clear() - >>> - """ - @functools.wraps(fun) - def wrapper(*args, **kwargs): - key = (args, frozenset(sorted(kwargs.items()))) - try: - return cache[key] - except KeyError: - ret = cache[key] = fun(*args, **kwargs) - return ret - - def cache_clear(): - """Clear cache.""" - cache.clear() - - cache = {} - wrapper.cache_clear = cache_clear - return wrapper - - -def memoize_when_activated(fun): - """A memoize decorator which is disabled by default. It can be - activated and deactivated on request. - For efficiency reasons it can be used only against class methods - accepting no arguments. - - >>> class Foo: - ... @memoize - ... def foo() - ... print(1) - ... - >>> f = Foo() - >>> # deactivated (default) - >>> foo() - 1 - >>> foo() - 1 - >>> - >>> # activated - >>> foo.cache_activate(self) - >>> foo() - 1 - >>> foo() - >>> foo() - >>> - """ - @functools.wraps(fun) - def wrapper(self): - try: - # case 1: we previously entered oneshot() ctx - ret = self._cache[fun] - except AttributeError: - # case 2: we never entered oneshot() ctx - return fun(self) - except KeyError: - # case 3: we entered oneshot() ctx but there's no cache - # for this entry yet - ret = self._cache[fun] = fun(self) - return ret - - def cache_activate(proc): - """Activate cache. Expects a Process instance. Cache will be - stored as a "_cache" instance attribute.""" - proc._cache = {} - - def cache_deactivate(proc): - """Deactivate and clear cache.""" - try: - del proc._cache - except AttributeError: - pass - - wrapper.cache_activate = cache_activate - wrapper.cache_deactivate = cache_deactivate - return wrapper - - -def isfile_strict(path): - """Same as os.path.isfile() but does not swallow EACCES / EPERM - exceptions, see: - http://mail.python.org/pipermail/python-dev/2012-June/120787.html - """ - try: - st = os.stat(path) - except OSError as err: - if err.errno in (errno.EPERM, errno.EACCES): - raise - return False - else: - return stat.S_ISREG(st.st_mode) - - -def path_exists_strict(path): - """Same as os.path.exists() but does not swallow EACCES / EPERM - exceptions, see: - http://mail.python.org/pipermail/python-dev/2012-June/120787.html - """ - try: - os.stat(path) - except OSError as err: - if err.errno in (errno.EPERM, errno.EACCES): - raise - return False - else: - return True - - -@memoize -def supports_ipv6(): - """Return True if IPv6 is supported on this platform.""" - if not socket.has_ipv6 or AF_INET6 is None: - return False - try: - sock = socket.socket(AF_INET6, socket.SOCK_STREAM) - with contextlib.closing(sock): - sock.bind(("::1", 0)) - return True - except socket.error: - return False - - -def parse_environ_block(data): - """Parse a C environ block of environment variables into a dictionary.""" - # The block is usually raw data from the target process. It might contain - # trailing garbage and lines that do not look like assignments. - ret = {} - pos = 0 - - # localize global variable to speed up access. - WINDOWS_ = WINDOWS - while True: - next_pos = data.find("\0", pos) - # nul byte at the beginning or double nul byte means finish - if next_pos <= pos: - break - # there might not be an equals sign - equal_pos = data.find("=", pos, next_pos) - if equal_pos > pos: - key = data[pos:equal_pos] - value = data[equal_pos + 1:next_pos] - # Windows expects environment variables to be uppercase only - if WINDOWS_: - key = key.upper() - ret[key] = value - pos = next_pos + 1 - - return ret - - -def sockfam_to_enum(num): - """Convert a numeric socket family value to an IntEnum member. - If it's not a known member, return the numeric value itself. - """ - if enum is None: - return num - else: # pragma: no cover - try: - return socket.AddressFamily(num) - except ValueError: - return num - - -def socktype_to_enum(num): - """Convert a numeric socket type value to an IntEnum member. - If it's not a known member, return the numeric value itself. - """ - if enum is None: - return num - else: # pragma: no cover - try: - return socket.SocketKind(num) - except ValueError: - return num - - -def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None): - """Convert a raw connection tuple to a proper ntuple.""" - if fam in (socket.AF_INET, AF_INET6): - if laddr: - laddr = addr(*laddr) - if raddr: - raddr = addr(*raddr) - if type_ == socket.SOCK_STREAM and fam in (AF_INET, AF_INET6): - status = status_map.get(status, CONN_NONE) - else: - status = CONN_NONE # ignore whatever C returned to us - fam = sockfam_to_enum(fam) - type_ = socktype_to_enum(type_) - if pid is None: - return pconn(fd, fam, type_, laddr, raddr, status) - else: - return sconn(fd, fam, type_, laddr, raddr, status, pid) - - -def deprecated_method(replacement): - """A decorator which can be used to mark a method as deprecated - 'replcement' is the method name which will be called instead. - """ - def outer(fun): - msg = "%s() is deprecated and will be removed; use %s() instead" % ( - fun.__name__, replacement) - if fun.__doc__ is None: - fun.__doc__ = msg - - @functools.wraps(fun) - def inner(self, *args, **kwargs): - warnings.warn(msg, category=DeprecationWarning, stacklevel=2) - return getattr(self, replacement)(*args, **kwargs) - return inner - return outer - - -class _WrapNumbers: - """Watches numbers so that they don't overflow and wrap - (reset to zero). - """ - - def __init__(self): - self.lock = threading.Lock() - self.cache = {} - self.reminders = {} - self.reminder_keys = {} - - def _add_dict(self, input_dict, name): - assert name not in self.cache - assert name not in self.reminders - assert name not in self.reminder_keys - self.cache[name] = input_dict - self.reminders[name] = defaultdict(int) - self.reminder_keys[name] = defaultdict(set) - - def _remove_dead_reminders(self, input_dict, name): - """In case the number of keys changed between calls (e.g. a - disk disappears) this removes the entry from self.reminders. - """ - old_dict = self.cache[name] - gone_keys = set(old_dict.keys()) - set(input_dict.keys()) - for gone_key in gone_keys: - for remkey in self.reminder_keys[name][gone_key]: - del self.reminders[name][remkey] - del self.reminder_keys[name][gone_key] - - def run(self, input_dict, name): - """Cache dict and sum numbers which overflow and wrap. - Return an updated copy of `input_dict` - """ - if name not in self.cache: - # This was the first call. - self._add_dict(input_dict, name) - return input_dict - - self._remove_dead_reminders(input_dict, name) - - old_dict = self.cache[name] - new_dict = {} - for key in input_dict.keys(): - input_tuple = input_dict[key] - try: - old_tuple = old_dict[key] - except KeyError: - # The input dict has a new key (e.g. a new disk or NIC) - # which didn't exist in the previous call. - new_dict[key] = input_tuple - continue - - bits = [] - for i in range(len(input_tuple)): - input_value = input_tuple[i] - old_value = old_tuple[i] - remkey = (key, i) - if input_value < old_value: - # it wrapped! - self.reminders[name][remkey] += old_value - self.reminder_keys[name][key].add(remkey) - bits.append(input_value + self.reminders[name][remkey]) - - new_dict[key] = tuple(bits) - - self.cache[name] = input_dict - return new_dict - - def cache_clear(self, name=None): - """Clear the internal cache, optionally only for function 'name'.""" - with self.lock: - if name is None: - self.cache.clear() - self.reminders.clear() - self.reminder_keys.clear() - else: - self.cache.pop(name, None) - self.reminders.pop(name, None) - self.reminder_keys.pop(name, None) - - def cache_info(self): - """Return internal cache dicts as a tuple of 3 elements.""" - with self.lock: - return (self.cache, self.reminders, self.reminder_keys) - - -def wrap_numbers(input_dict, name): - """Given an `input_dict` and a function `name`, adjust the numbers - which "wrap" (restart from zero) across different calls by adding - "old value" to "new value" and return an updated dict. - """ - with _wn.lock: - return _wn.run(input_dict, name) - - -_wn = _WrapNumbers() -wrap_numbers.cache_clear = _wn.cache_clear -wrap_numbers.cache_info = _wn.cache_info - - -def open_binary(fname, **kwargs): - return open(fname, "rb", **kwargs) - - -def open_text(fname, **kwargs): - """On Python 3 opens a file in text mode by using fs encoding and - a proper en/decoding errors handler. - On Python 2 this is just an alias for open(name, 'rt'). - """ - if PY3: - # See: - # https://github.com/giampaolo/psutil/issues/675 - # https://github.com/giampaolo/psutil/pull/733 - kwargs.setdefault('encoding', ENCODING) - kwargs.setdefault('errors', ENCODING_ERRS) - return open(fname, "rt", **kwargs) - - -def bytes2human(n, format="%(value).1f%(symbol)s"): - """Used by various scripts. See: - http://goo.gl/zeJZl - - >>> bytes2human(10000) - '9.8K' - >>> bytes2human(100001221) - '95.4M' - """ - symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') - prefix = {} - for i, s in enumerate(symbols[1:]): - prefix[s] = 1 << (i + 1) * 10 - for symbol in reversed(symbols[1:]): - if n >= prefix[symbol]: - value = float(n) / prefix[symbol] - return format % locals() - return format % dict(symbol=symbols[0], value=n) - - -def get_procfs_path(): - """Return updated psutil.PROCFS_PATH constant.""" - return sys.modules['psutil'].PROCFS_PATH - - -if PY3: - def decode(s): - return s.decode(encoding=ENCODING, errors=ENCODING_ERRS) -else: - def decode(s): - return s - - -# ===================================================================== -# --- shell utils -# ===================================================================== - - -@memoize -def term_supports_colors(file=sys.stdout): - if os.name == 'nt': - return True - try: - import curses - assert file.isatty() - curses.setupterm() - assert curses.tigetnum("colors") > 0 - except Exception: - return False - else: - return True - - -def hilite(s, color="green", bold=False): - """Return an highlighted version of 'string'.""" - if not term_supports_colors(): - return s - attr = [] - colors = dict(green='32', red='91', brown='33') - colors[None] = '29' - try: - color = colors[color] - except KeyError: - raise ValueError("invalid color %r; choose between %s" % ( - list(colors.keys()))) - attr.append(color) - if bold: - attr.append('1') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), s) - - -def print_color(s, color="green", bold=False, file=sys.stdout): - """Print a colorized version of string.""" - if not term_supports_colors(): - print(s, file=file) - elif POSIX: - print(hilite(s, color, bold), file=file) - else: - import ctypes - - DEFAULT_COLOR = 7 - GetStdHandle = ctypes.windll.Kernel32.GetStdHandle - SetConsoleTextAttribute = \ - ctypes.windll.Kernel32.SetConsoleTextAttribute - - colors = dict(green=2, red=4, brown=6) - colors[None] = DEFAULT_COLOR - try: - color = colors[color] - except KeyError: - raise ValueError("invalid color %r; choose between %r" % ( - color, list(colors.keys()))) - if bold and color <= 7: - color += 8 - - handle_id = -12 if file is sys.stderr else -11 - GetStdHandle.restype = ctypes.c_ulong - handle = GetStdHandle(handle_id) - SetConsoleTextAttribute(handle, color) - try: - print(s, file=file) - finally: - SetConsoleTextAttribute(handle, DEFAULT_COLOR) - - -if bool(os.getenv('PSUTIL_DEBUG', 0)): - import inspect - - def debug(msg): - """If PSUTIL_DEBUG env var is set, print a debug message to stderr.""" - fname, lineno, func_name, lines, index = inspect.getframeinfo( - inspect.currentframe().f_back) - print("psutil-debug [%s:%s]> %s" % (fname, lineno, msg), - file=sys.stderr) -else: - def debug(msg): - pass
