comparison env/lib/python3.9/site-packages/psutil/_compat.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Module which provides compatibility with older Python versions.
6 This is more future-compatible rather than the opposite (prefer latest
7 Python 3 way of doing things).
8 """
9
10 import collections
11 import errno
12 import functools
13 import os
14 import sys
15 import types
16
17 __all__ = [
18 # constants
19 "PY3",
20 # builtins
21 "long", "range", "super", "unicode", "basestring",
22 # literals
23 "u", "b",
24 # collections module
25 "lru_cache",
26 # shutil module
27 "which", "get_terminal_size",
28 # python 3 exceptions
29 "FileNotFoundError", "PermissionError", "ProcessLookupError",
30 "InterruptedError", "ChildProcessError", "FileExistsError"]
31
32
33 PY3 = sys.version_info[0] == 3
34 _SENTINEL = object()
35
36 if PY3:
37 long = int
38 xrange = range
39 unicode = str
40 basestring = str
41 range = range
42
43 def u(s):
44 return s
45
46 def b(s):
47 return s.encode("latin-1")
48 else:
49 long = long
50 range = xrange
51 unicode = unicode
52 basestring = basestring
53
54 def u(s):
55 return unicode(s, "unicode_escape")
56
57 def b(s):
58 return s
59
60
61 # --- builtins
62
63
64 # Python 3 super().
65 # Taken from "future" package.
66 # Credit: Ryan Kelly
67 if PY3:
68 super = super
69 else:
70 _builtin_super = super
71
72 def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1):
73 """Like Python 3 builtin super(). If called without any arguments
74 it attempts to infer them at runtime.
75 """
76 if type_ is _SENTINEL:
77 f = sys._getframe(framedepth)
78 try:
79 # Get the function's first positional argument.
80 type_or_obj = f.f_locals[f.f_code.co_varnames[0]]
81 except (IndexError, KeyError):
82 raise RuntimeError('super() used in a function with no args')
83 try:
84 # Get the MRO so we can crawl it.
85 mro = type_or_obj.__mro__
86 except (AttributeError, RuntimeError):
87 try:
88 mro = type_or_obj.__class__.__mro__
89 except AttributeError:
90 raise RuntimeError('super() used in a non-newstyle class')
91 for type_ in mro:
92 # Find the class that owns the currently-executing method.
93 for meth in type_.__dict__.values():
94 # Drill down through any wrappers to the underlying func.
95 # This handles e.g. classmethod() and staticmethod().
96 try:
97 while not isinstance(meth, types.FunctionType):
98 if isinstance(meth, property):
99 # Calling __get__ on the property will invoke
100 # user code which might throw exceptions or
101 # have side effects
102 meth = meth.fget
103 else:
104 try:
105 meth = meth.__func__
106 except AttributeError:
107 meth = meth.__get__(type_or_obj, type_)
108 except (AttributeError, TypeError):
109 continue
110 if meth.func_code is f.f_code:
111 break # found
112 else:
113 # Not found. Move onto the next class in MRO.
114 continue
115 break # found
116 else:
117 raise RuntimeError('super() called outside a method')
118
119 # Dispatch to builtin super().
120 if type_or_obj is not _SENTINEL:
121 return _builtin_super(type_, type_or_obj)
122 return _builtin_super(type_)
123
124
125 # --- exceptions
126
127
128 if PY3:
129 FileNotFoundError = FileNotFoundError # NOQA
130 PermissionError = PermissionError # NOQA
131 ProcessLookupError = ProcessLookupError # NOQA
132 InterruptedError = InterruptedError # NOQA
133 ChildProcessError = ChildProcessError # NOQA
134 FileExistsError = FileExistsError # NOQA
135 else:
136 # https://github.com/PythonCharmers/python-future/blob/exceptions/
137 # src/future/types/exceptions/pep3151.py
138 import platform
139
140 def _instance_checking_exception(base_exception=Exception):
141 def wrapped(instance_checker):
142 class TemporaryClass(base_exception):
143
144 def __init__(self, *args, **kwargs):
145 if len(args) == 1 and isinstance(args[0], TemporaryClass):
146 unwrap_me = args[0]
147 for attr in dir(unwrap_me):
148 if not attr.startswith('__'):
149 setattr(self, attr, getattr(unwrap_me, attr))
150 else:
151 super(TemporaryClass, self).__init__(*args, **kwargs)
152
153 class __metaclass__(type):
154 def __instancecheck__(cls, inst):
155 return instance_checker(inst)
156
157 def __subclasscheck__(cls, classinfo):
158 value = sys.exc_info()[1]
159 return isinstance(value, cls)
160
161 TemporaryClass.__name__ = instance_checker.__name__
162 TemporaryClass.__doc__ = instance_checker.__doc__
163 return TemporaryClass
164
165 return wrapped
166
167 @_instance_checking_exception(EnvironmentError)
168 def FileNotFoundError(inst):
169 return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT
170
171 @_instance_checking_exception(EnvironmentError)
172 def ProcessLookupError(inst):
173 return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH
174
175 @_instance_checking_exception(EnvironmentError)
176 def PermissionError(inst):
177 return getattr(inst, 'errno', _SENTINEL) in (
178 errno.EACCES, errno.EPERM)
179
180 @_instance_checking_exception(EnvironmentError)
181 def InterruptedError(inst):
182 return getattr(inst, 'errno', _SENTINEL) == errno.EINTR
183
184 @_instance_checking_exception(EnvironmentError)
185 def ChildProcessError(inst):
186 return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD
187
188 @_instance_checking_exception(EnvironmentError)
189 def FileExistsError(inst):
190 return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST
191
192 if platform.python_implementation() != "CPython":
193 try:
194 raise OSError(errno.EEXIST, "perm")
195 except FileExistsError:
196 pass
197 except OSError:
198 raise RuntimeError(
199 "broken or incompatible Python implementation, see: "
200 "https://github.com/giampaolo/psutil/issues/1659")
201
202
203 # --- stdlib additions
204
205
206 # py 3.2 functools.lru_cache
207 # Taken from: http://code.activestate.com/recipes/578078
208 # Credit: Raymond Hettinger
209 try:
210 from functools import lru_cache
211 except ImportError:
212 try:
213 from threading import RLock
214 except ImportError:
215 from dummy_threading import RLock
216
217 _CacheInfo = collections.namedtuple(
218 "CacheInfo", ["hits", "misses", "maxsize", "currsize"])
219
220 class _HashedSeq(list):
221 __slots__ = 'hashvalue'
222
223 def __init__(self, tup, hash=hash):
224 self[:] = tup
225 self.hashvalue = hash(tup)
226
227 def __hash__(self):
228 return self.hashvalue
229
230 def _make_key(args, kwds, typed,
231 kwd_mark=(object(), ),
232 fasttypes=set((int, str, frozenset, type(None))),
233 sorted=sorted, tuple=tuple, type=type, len=len):
234 key = args
235 if kwds:
236 sorted_items = sorted(kwds.items())
237 key += kwd_mark
238 for item in sorted_items:
239 key += item
240 if typed:
241 key += tuple(type(v) for v in args)
242 if kwds:
243 key += tuple(type(v) for k, v in sorted_items)
244 elif len(key) == 1 and type(key[0]) in fasttypes:
245 return key[0]
246 return _HashedSeq(key)
247
248 def lru_cache(maxsize=100, typed=False):
249 """Least-recently-used cache decorator, see:
250 http://docs.python.org/3/library/functools.html#functools.lru_cache
251 """
252 def decorating_function(user_function):
253 cache = dict()
254 stats = [0, 0]
255 HITS, MISSES = 0, 1
256 make_key = _make_key
257 cache_get = cache.get
258 _len = len
259 lock = RLock()
260 root = []
261 root[:] = [root, root, None, None]
262 nonlocal_root = [root]
263 PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
264 if maxsize == 0:
265 def wrapper(*args, **kwds):
266 result = user_function(*args, **kwds)
267 stats[MISSES] += 1
268 return result
269 elif maxsize is None:
270 def wrapper(*args, **kwds):
271 key = make_key(args, kwds, typed)
272 result = cache_get(key, root)
273 if result is not root:
274 stats[HITS] += 1
275 return result
276 result = user_function(*args, **kwds)
277 cache[key] = result
278 stats[MISSES] += 1
279 return result
280 else:
281 def wrapper(*args, **kwds):
282 if kwds or typed:
283 key = make_key(args, kwds, typed)
284 else:
285 key = args
286 lock.acquire()
287 try:
288 link = cache_get(key)
289 if link is not None:
290 root, = nonlocal_root
291 link_prev, link_next, key, result = link
292 link_prev[NEXT] = link_next
293 link_next[PREV] = link_prev
294 last = root[PREV]
295 last[NEXT] = root[PREV] = link
296 link[PREV] = last
297 link[NEXT] = root
298 stats[HITS] += 1
299 return result
300 finally:
301 lock.release()
302 result = user_function(*args, **kwds)
303 lock.acquire()
304 try:
305 root, = nonlocal_root
306 if key in cache:
307 pass
308 elif _len(cache) >= maxsize:
309 oldroot = root
310 oldroot[KEY] = key
311 oldroot[RESULT] = result
312 root = nonlocal_root[0] = oldroot[NEXT]
313 oldkey = root[KEY]
314 root[KEY] = root[RESULT] = None
315 del cache[oldkey]
316 cache[key] = oldroot
317 else:
318 last = root[PREV]
319 link = [last, root, key, result]
320 last[NEXT] = root[PREV] = cache[key] = link
321 stats[MISSES] += 1
322 finally:
323 lock.release()
324 return result
325
326 def cache_info():
327 """Report cache statistics"""
328 lock.acquire()
329 try:
330 return _CacheInfo(stats[HITS], stats[MISSES], maxsize,
331 len(cache))
332 finally:
333 lock.release()
334
335 def cache_clear():
336 """Clear the cache and cache statistics"""
337 lock.acquire()
338 try:
339 cache.clear()
340 root = nonlocal_root[0]
341 root[:] = [root, root, None, None]
342 stats[:] = [0, 0]
343 finally:
344 lock.release()
345
346 wrapper.__wrapped__ = user_function
347 wrapper.cache_info = cache_info
348 wrapper.cache_clear = cache_clear
349 return functools.update_wrapper(wrapper, user_function)
350
351 return decorating_function
352
353
354 # python 3.3
355 try:
356 from shutil import which
357 except ImportError:
358 def which(cmd, mode=os.F_OK | os.X_OK, path=None):
359 """Given a command, mode, and a PATH string, return the path which
360 conforms to the given mode on the PATH, or None if there is no such
361 file.
362
363 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
364 of os.environ.get("PATH"), or can be overridden with a custom search
365 path.
366 """
367 def _access_check(fn, mode):
368 return (os.path.exists(fn) and os.access(fn, mode) and
369 not os.path.isdir(fn))
370
371 if os.path.dirname(cmd):
372 if _access_check(cmd, mode):
373 return cmd
374 return None
375
376 if path is None:
377 path = os.environ.get("PATH", os.defpath)
378 if not path:
379 return None
380 path = path.split(os.pathsep)
381
382 if sys.platform == "win32":
383 if os.curdir not in path:
384 path.insert(0, os.curdir)
385
386 pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
387 if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
388 files = [cmd]
389 else:
390 files = [cmd + ext for ext in pathext]
391 else:
392 files = [cmd]
393
394 seen = set()
395 for dir in path:
396 normdir = os.path.normcase(dir)
397 if normdir not in seen:
398 seen.add(normdir)
399 for thefile in files:
400 name = os.path.join(dir, thefile)
401 if _access_check(name, mode):
402 return name
403 return None
404
405
406 # python 3.3
407 try:
408 from shutil import get_terminal_size
409 except ImportError:
410 def get_terminal_size(fallback=(80, 24)):
411 try:
412 import fcntl
413 import termios
414 import struct
415 except ImportError:
416 return fallback
417 else:
418 try:
419 # This should work on Linux.
420 res = struct.unpack(
421 'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
422 return (res[1], res[0])
423 except Exception:
424 return fallback