Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_contracts.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Contracts tests. These tests mainly check API sanity in terms of | |
| 8 returned types and APIs availability. | |
| 9 Some of these are duplicates of tests in test_system.py and test_process.py | |
| 10 """ | |
| 11 | |
| 12 import errno | |
| 13 import os | |
| 14 import stat | |
| 15 import time | |
| 16 import traceback | |
| 17 | |
| 18 from psutil import AIX | |
| 19 from psutil import BSD | |
| 20 from psutil import FREEBSD | |
| 21 from psutil import LINUX | |
| 22 from psutil import MACOS | |
| 23 from psutil import NETBSD | |
| 24 from psutil import OPENBSD | |
| 25 from psutil import OSX | |
| 26 from psutil import POSIX | |
| 27 from psutil import SUNOS | |
| 28 from psutil import WINDOWS | |
| 29 from psutil._compat import long | |
| 30 from psutil.tests import create_sockets | |
| 31 from psutil.tests import enum | |
| 32 from psutil.tests import get_kernel_version | |
| 33 from psutil.tests import HAS_CPU_FREQ | |
| 34 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 35 from psutil.tests import HAS_RLIMIT | |
| 36 from psutil.tests import HAS_SENSORS_FANS | |
| 37 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
| 38 from psutil.tests import is_namedtuple | |
| 39 from psutil.tests import safe_rmpath | |
| 40 from psutil.tests import SKIP_SYSCONS | |
| 41 from psutil.tests import TESTFN | |
| 42 from psutil.tests import unittest | |
| 43 from psutil.tests import VALID_PROC_STATUSES | |
| 44 from psutil.tests import warn | |
| 45 import psutil | |
| 46 | |
| 47 | |
| 48 # =================================================================== | |
| 49 # --- APIs availability | |
| 50 # =================================================================== | |
| 51 | |
| 52 # Make sure code reflects what doc promises in terms of APIs | |
| 53 # availability. | |
| 54 | |
class TestAvailConstantsAPIs(unittest.TestCase):
    """Check that platform-specific constants are exposed by the psutil
    module exactly on the platforms where they are expected.
    """

    def test_PROCFS_PATH(self):
        expected = LINUX or SUNOS or AIX
        self.assertEqual(hasattr(psutil, "PROCFS_PATH"), expected)

    def test_win_priority(self):
        # Process priority classes are expected on Windows only.
        for const in ("ABOVE_NORMAL_PRIORITY_CLASS",
                      "BELOW_NORMAL_PRIORITY_CLASS",
                      "HIGH_PRIORITY_CLASS",
                      "IDLE_PRIORITY_CLASS",
                      "NORMAL_PRIORITY_CLASS",
                      "REALTIME_PRIORITY_CLASS"):
            self.assertEqual(hasattr(psutil, const), WINDOWS)

    def test_linux_ioprio_linux(self):
        # I/O priority classes are expected on Linux only.
        for const in ("IOPRIO_CLASS_NONE",
                      "IOPRIO_CLASS_RT",
                      "IOPRIO_CLASS_BE",
                      "IOPRIO_CLASS_IDLE"):
            self.assertEqual(hasattr(psutil, const), LINUX)

    def test_linux_ioprio_windows(self):
        # I/O priority levels are expected on Windows only.
        for const in ("IOPRIO_HIGH",
                      "IOPRIO_NORMAL",
                      "IOPRIO_LOW",
                      "IOPRIO_VERYLOW"):
            self.assertEqual(hasattr(psutil, const), WINDOWS)

    def test_linux_rlimit(self):
        # Process.rlimit() and this first group of constants are
        # expected only on Linux with a kernel >= 2.6.36.
        expected = LINUX and get_kernel_version() >= (2, 6, 36)
        self.assertEqual(hasattr(psutil.Process, "rlimit"), expected)
        for const in ("RLIM_INFINITY",
                      "RLIMIT_AS",
                      "RLIMIT_CORE",
                      "RLIMIT_CPU",
                      "RLIMIT_DATA",
                      "RLIMIT_FSIZE",
                      "RLIMIT_LOCKS",
                      "RLIMIT_MEMLOCK",
                      "RLIMIT_NOFILE",
                      "RLIMIT_NPROC",
                      "RLIMIT_RSS",
                      "RLIMIT_STACK"):
            self.assertEqual(hasattr(psutil, const), expected)

        # This second group is expected only on Linux with a kernel >= 3.0.
        expected = LINUX and get_kernel_version() >= (3, 0)
        for const in ("RLIMIT_MSGQUEUE",
                      "RLIMIT_NICE",
                      "RLIMIT_RTPRIO",
                      "RLIMIT_RTTIME",
                      "RLIMIT_SIGPENDING"):
            self.assertEqual(hasattr(psutil, const), expected)
| 107 | |
| 108 | |
class TestAvailSystemAPIs(unittest.TestCase):
    """Check that system-level functions are exposed by the psutil
    module exactly on the platforms where they are expected.
    """

    def test_win_service_iter(self):
        available = hasattr(psutil, "win_service_iter")
        self.assertEqual(available, WINDOWS)

    def test_win_service_get(self):
        available = hasattr(psutil, "win_service_get")
        self.assertEqual(available, WINDOWS)

    def test_cpu_freq(self):
        # On Linux the function is expected only when one of these
        # sysfs directories exists.
        sysfs_dirs = ("/sys/devices/system/cpu/cpufreq",
                      "/sys/devices/system/cpu/cpu0/cpufreq")
        on_linux = LINUX and any(os.path.exists(d) for d in sysfs_dirs)
        expected = on_linux or MACOS or WINDOWS or FREEBSD
        self.assertEqual(hasattr(psutil, "cpu_freq"), expected)

    def test_sensors_temperatures(self):
        expected = LINUX or FREEBSD
        self.assertEqual(hasattr(psutil, "sensors_temperatures"), expected)

    def test_sensors_fans(self):
        available = hasattr(psutil, "sensors_fans")
        self.assertEqual(available, LINUX)

    def test_battery(self):
        expected = LINUX or WINDOWS or FREEBSD or MACOS
        self.assertEqual(hasattr(psutil, "sensors_battery"), expected)
| 134 | |
| 135 | |
class TestAvailProcessAPIs(unittest.TestCase):
    """Check that psutil.Process methods are defined exactly on the
    platforms where they are expected.
    """

    def test_environ(self):
        self.assertEqual(hasattr(psutil.Process, "environ"),
                         LINUX or MACOS or WINDOWS or AIX or SUNOS)

    def test_uids(self):
        self.assertEqual(hasattr(psutil.Process, "uids"), POSIX)

    def test_gids(self):
        # This used to look up "uids", so the availability of gids()
        # was never actually verified.
        self.assertEqual(hasattr(psutil.Process, "gids"), POSIX)

    def test_terminal(self):
        self.assertEqual(hasattr(psutil.Process, "terminal"), POSIX)

    def test_ionice(self):
        self.assertEqual(hasattr(psutil.Process, "ionice"), LINUX or WINDOWS)

    def test_rlimit(self):
        self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX)

    def test_io_counters(self):
        # Expected everywhere except macOS and SunOS.
        hasit = hasattr(psutil.Process, "io_counters")
        self.assertEqual(hasit, not (MACOS or SUNOS))

    def test_num_fds(self):
        self.assertEqual(hasattr(psutil.Process, "num_fds"), POSIX)

    def test_num_handles(self):
        self.assertEqual(hasattr(psutil.Process, "num_handles"), WINDOWS)

    def test_cpu_affinity(self):
        self.assertEqual(hasattr(psutil.Process, "cpu_affinity"),
                         LINUX or WINDOWS or FREEBSD)

    def test_cpu_num(self):
        self.assertEqual(hasattr(psutil.Process, "cpu_num"),
                         LINUX or FREEBSD or SUNOS)

    def test_memory_maps(self):
        # Expected everywhere except OpenBSD, NetBSD, AIX and macOS.
        hasit = hasattr(psutil.Process, "memory_maps")
        self.assertEqual(hasit, not (OPENBSD or NETBSD or AIX or MACOS))
| 179 | |
| 180 | |
| 181 # =================================================================== | |
| 182 # --- System API types | |
| 183 # =================================================================== | |
| 184 | |
| 185 | |
class TestSystemAPITypes(unittest.TestCase):
    """Verify the types returned by system-wide psutil functions.

    The main goal is to make sure unicode is never returned on
    Python 2, see:
    https://github.com/giampaolo/psutil/issues/1039
    """

    @classmethod
    def setUpClass(cls):
        cls.proc = psutil.Process()

    def tearDown(self):
        safe_rmpath(TESTFN)

    def assert_ntuple_of_nums(self, nt, type_=float, gezero=True):
        """Assert that `nt` is a namedtuple whose items are all instances
        of `type_` and, when `gezero` is true, all >= 0.
        """
        assert is_namedtuple(nt)
        for item in nt:
            self.assertIsInstance(item, type_)
            if gezero:
                self.assertGreaterEqual(item, 0)

    def test_cpu_times(self):
        self.assert_ntuple_of_nums(psutil.cpu_times())
        for per_cpu in psutil.cpu_times(percpu=True):
            self.assert_ntuple_of_nums(per_cpu)

    def test_cpu_percent(self):
        for interval in (None, 0.00001):
            self.assertIsInstance(
                psutil.cpu_percent(interval=interval), float)

    def test_cpu_times_percent(self):
        for interval in (None, 0.0001):
            self.assert_ntuple_of_nums(
                psutil.cpu_times_percent(interval=interval))

    def test_cpu_count(self):
        self.assertIsInstance(psutil.cpu_count(), int)

    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        if psutil.cpu_freq() is None:
            # skipTest() raises by itself; no explicit `raise` is needed.
            self.skipTest("cpu_freq() returns None")
        numeric = (float, int, long)
        self.assert_ntuple_of_nums(psutil.cpu_freq(), type_=numeric)

    def test_disk_io_counters(self):
        # Duplicate of test_system.py. Keep it anyway.
        counters = psutil.disk_io_counters(perdisk=True)
        for disk_name, nt in counters.items():
            self.assertIsInstance(disk_name, str)
            self.assert_ntuple_of_nums(nt, type_=(int, long))

    def test_disk_partitions(self):
        # Duplicate of test_system.py. Keep it anyway.
        for part in psutil.disk_partitions():
            self.assertIsInstance(part.device, str)
            self.assertIsInstance(part.mountpoint, str)
            self.assertIsInstance(part.fstype, str)
            self.assertIsInstance(part.opts, str)

    @unittest.skipIf(SKIP_SYSCONS, "requires root")
    def test_net_connections(self):
        with create_sockets():
            conns = psutil.net_connections('all')
            # No duplicated entries are expected.
            self.assertEqual(len(conns), len(set(conns)))
            for conn in conns:
                assert is_namedtuple(conn)

    def test_net_if_addrs(self):
        # Duplicate of test_system.py. Keep it anyway.
        optional_str = (str, type(None))
        for ifname, addrs in psutil.net_if_addrs().items():
            self.assertIsInstance(ifname, str)
            for addr in addrs:
                # Without the enum module the family is a plain int.
                family_type = enum.IntEnum if enum is not None else int
                assert isinstance(addr.family, family_type), addr
                self.assertIsInstance(addr.address, str)
                self.assertIsInstance(addr.netmask, optional_str)
                self.assertIsInstance(addr.broadcast, optional_str)

    def test_net_if_stats(self):
        # Duplicate of test_system.py. Keep it anyway.
        for ifname, info in psutil.net_if_stats().items():
            self.assertIsInstance(ifname, str)
            self.assertIsInstance(info.isup, bool)
            # Without the enum module the duplex is a plain int.
            duplex_type = enum.IntEnum if enum is not None else int
            self.assertIsInstance(info.duplex, duplex_type)
            self.assertIsInstance(info.speed, int)
            self.assertIsInstance(info.mtu, int)

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_net_io_counters(self):
        # Duplicate of test_system.py. Keep it anyway.
        for ifname, _ in psutil.net_io_counters(pernic=True).items():
            self.assertIsInstance(ifname, str)

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        # Duplicate of test_system.py. Keep it anyway.
        optional_num = (float, int, type(None))
        for name, units in psutil.sensors_fans().items():
            self.assertIsInstance(name, str)
            for unit in units:
                self.assertIsInstance(unit.label, str)
                self.assertIsInstance(unit.current, optional_num)

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        # Duplicate of test_system.py. Keep it anyway.
        optional_num = (float, int, type(None))
        for name, units in psutil.sensors_temperatures().items():
            self.assertIsInstance(name, str)
            for unit in units:
                self.assertIsInstance(unit.label, str)
                self.assertIsInstance(unit.current, optional_num)
                self.assertIsInstance(unit.high, optional_num)
                self.assertIsInstance(unit.critical, optional_num)

    def test_boot_time(self):
        # Duplicate of test_system.py. Keep it anyway.
        self.assertIsInstance(psutil.boot_time(), float)

    def test_users(self):
        # Duplicate of test_system.py. Keep it anyway.
        optional_str = (str, type(None))
        for user in psutil.users():
            self.assertIsInstance(user.name, str)
            self.assertIsInstance(user.terminal, optional_str)
            self.assertIsInstance(user.host, optional_str)
            self.assertIsInstance(user.pid, (int, type(None)))
| 312 | |
| 313 | |
| 314 # =================================================================== | |
| 315 # --- Fetch all processes test | |
| 316 # =================================================================== | |
| 317 | |
| 318 | |
class TestFetchAllProcesses(unittest.TestCase):
    """Test which iterates over all running processes and performs
    some sanity checks against Process API's returned values.

    test_fetch_all() is the only real test. Every other public method
    is a checker named after a psutil.Process attribute; it is looked
    up by name and called as ``checker(ret, proc)`` where ``ret`` is the
    value returned by that attribute and ``proc`` the Process instance.
    """

    def get_attr_names(self):
        """Return the public psutil.Process attribute names to check."""
        excluded_names = set([
            'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait',
            'as_dict', 'parent', 'parents', 'children', 'memory_info_ex',
            'oneshot',
        ])
        if LINUX and not HAS_RLIMIT:
            excluded_names.add('rlimit')
        return [name for name in dir(psutil.Process)
                if not name.startswith("_") and name not in excluded_names]

    def iter_procs(self):
        """Yield (process, attribute name) pairs for every process
        returned by psutil.process_iter(), from within the process's
        oneshot() context.
        """
        attrs = self.get_attr_names()
        for p in psutil.process_iter():
            with p.oneshot():
                for name in attrs:
                    yield (p, name)

    def call_meth(self, p, name):
        """Return the value of attribute `name` of `p`, calling it first
        if it is callable. Returns None if the attribute is missing.
        """
        attr = getattr(p, name, None)
        if attr is None or not callable(attr):
            return attr
        args = ()
        kwargs = {}
        # These two need explicit arguments.
        if name == 'rlimit':
            args = (psutil.RLIMIT_NOFILE,)
        elif name == 'memory_maps':
            kwargs = {'grouped': False}
        return attr(*args, **kwargs)

    def test_fetch_all(self):
        """Fetch every attribute of every process and validate it with
        the checker method of the same name.
        """
        valid_procs = 0
        default = object()  # sentinel meaning "no value was returned"
        failures = []
        for p, name in self.iter_procs():
            ret = default
            try:
                ret = self.call_meth(p, name)
            except NotImplementedError:
                msg = "%r was skipped because not implemented" % (
                    self.__class__.__name__ + '.test_' + name)
                warn(msg)
            except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
                self.assertEqual(err.pid, p.pid)
                if err.name:
                    # make sure exception's name attr is set
                    # with the actual process name
                    self.assertEqual(err.name, p.name())
                assert str(err)
                assert err.msg
            except Exception:
                header = "FAIL: test_%s (proc=%s" % (name, p)
                # Identity check: `ret` may be any object, including one
                # with a custom __ne__.
                if ret is not default:
                    header += ", ret=%r" % (ret,)
                # Single closing parenthesis; it used to be doubled
                # whenever `ret` was included.
                header += ')'
                report = "\n%s\n%s\n%s\n%s" % (
                    '=' * 70, header, '-' * 70, traceback.format_exc())
                report = "\n".join(
                    (" " * 4) + line for line in report.splitlines())
                failures.append(report + '\n')
                break
            else:
                valid_procs += 1
                if ret not in (0, 0.0, [], None, '', {}):
                    assert ret, ret
                meth = getattr(self, name)
                meth(ret, p)

        if failures:
            self.fail(''.join(failures))

        # we should always have a non-empty list, not including PID 0 etc.
        # special cases.
        assert valid_procs

    def cmdline(self, ret, proc):
        """Expect a list of str."""
        self.assertIsInstance(ret, list)
        for part in ret:
            self.assertIsInstance(part, str)

    def exe(self, ret, proc):
        """Expect '' or an absolute path (executable on POSIX when it
        is a regular file).
        """
        self.assertIsInstance(ret, (str, type(None)))
        if not ret:
            self.assertEqual(ret, '')
            return
        if WINDOWS and not ret.endswith('.exe'):
            return  # May be "Registry", "MemCompression", ...
        assert os.path.isabs(ret), ret
        # Note: existence of the path is deliberately not asserted, see:
        # http://stackoverflow.com/questions/3112546/os-path-exists-lies
        if POSIX and os.path.isfile(ret):
            if hasattr(os, 'access') and hasattr(os, "X_OK"):
                # XXX may fail on MACOS
                assert os.access(ret, os.X_OK)

    def pid(self, ret, proc):
        """Expect an int >= 0."""
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 0)

    def ppid(self, ret, proc):
        """Expect an integer >= 0."""
        self.assertIsInstance(ret, (int, long))
        self.assertGreaterEqual(ret, 0)

    def name(self, ret, proc):
        """Expect a str, non-empty except on AIX."""
        self.assertIsInstance(ret, str)
        # on AIX, "<exiting>" processes don't have names
        if not AIX:
            assert ret

    def create_time(self, ret, proc):
        """Expect a float >= 0 that can be formatted with strftime."""
        self.assertIsInstance(ret, float)
        try:
            self.assertGreaterEqual(ret, 0)
        except AssertionError:
            # XXX: tolerated only for zombie processes on OpenBSD.
            if not (OPENBSD and proc.status() == psutil.STATUS_ZOMBIE):
                raise
        # ret >= psutil.boot_time() can't be taken for granted on all
        # platforms, hence it is not asserted.
        # make sure returned value can be pretty printed
        # with strftime
        time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))

    def uids(self, ret, proc):
        """Expect a namedtuple of ints >= 0."""
        assert is_namedtuple(ret)
        for uid in ret:
            self.assertIsInstance(uid, int)
            self.assertGreaterEqual(uid, 0)

    def gids(self, ret, proc):
        """Expect a namedtuple of ints (>= 0 except on macOS/NetBSD)."""
        assert is_namedtuple(ret)
        # note: testing all gids as above seems not to be reliable for
        # gid == 30 (nobody); not sure why.
        for gid in ret:
            self.assertIsInstance(gid, int)
            if not MACOS and not NETBSD:
                self.assertGreaterEqual(gid, 0)

    def username(self, ret, proc):
        """Expect a non-empty str."""
        self.assertIsInstance(ret, str)
        assert ret

    def status(self, ret, proc):
        """Expect one of VALID_PROC_STATUSES."""
        self.assertIsInstance(ret, str)
        assert ret
        self.assertNotEqual(ret, '?')  # XXX
        self.assertIn(ret, VALID_PROC_STATUSES)

    def io_counters(self, ret, proc):
        """Expect a namedtuple of integers, each either -1 or >= 0."""
        assert is_namedtuple(ret)
        for field in ret:
            self.assertIsInstance(field, (int, long))
            if field != -1:
                self.assertGreaterEqual(field, 0)

    def ionice(self, ret, proc):
        """Expect (ioclass, value) ints on Linux, an IOPRIO_* level
        otherwise.
        """
        if LINUX:
            self.assertIsInstance(ret.ioclass, int)
            self.assertIsInstance(ret.value, int)
            self.assertGreaterEqual(ret.ioclass, 0)
            self.assertGreaterEqual(ret.value, 0)
        else:  # Windows, Cygwin
            choices = [
                psutil.IOPRIO_VERYLOW,
                psutil.IOPRIO_LOW,
                psutil.IOPRIO_NORMAL,
                psutil.IOPRIO_HIGH]
            self.assertIsInstance(ret, int)
            self.assertGreaterEqual(ret, 0)
            self.assertIn(ret, choices)

    def num_threads(self, ret, proc):
        """Expect an int >= 1."""
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 1)

    def threads(self, ret, proc):
        """Expect a list of namedtuples with non-negative numeric fields
        id, user_time and system_time.
        """
        self.assertIsInstance(ret, list)
        for t in ret:
            assert is_namedtuple(t)
            self.assertGreaterEqual(t.id, 0)
            self.assertGreaterEqual(t.user_time, 0)
            self.assertGreaterEqual(t.system_time, 0)
            for field in t:
                self.assertIsInstance(field, (int, float))

    def cpu_times(self, ret, proc):
        """Expect a namedtuple of floats >= 0."""
        assert is_namedtuple(ret)
        for n in ret:
            self.assertIsInstance(n, float)
            self.assertGreaterEqual(n, 0)
        # TODO: check ntuple fields

    def cpu_percent(self, ret, proc):
        """Expect a float between 0.0 and 100.0."""
        self.assertIsInstance(ret, float)
        assert 0.0 <= ret <= 100.0, ret

    def cpu_num(self, ret, proc):
        """Expect a valid CPU index (-1 is tolerated on FreeBSD)."""
        self.assertIsInstance(ret, int)
        if FREEBSD and ret == -1:
            return
        self.assertGreaterEqual(ret, 0)
        if psutil.cpu_count() == 1:
            self.assertEqual(ret, 0)
        self.assertIn(ret, list(range(psutil.cpu_count())))

    def memory_info(self, ret, proc):
        """Expect a namedtuple of integers >= 0; on Windows every peak
        value must be >= its current counterpart.
        """
        assert is_namedtuple(ret)
        for value in ret:
            self.assertIsInstance(value, (int, long))
            self.assertGreaterEqual(value, 0)
        if WINDOWS:
            self.assertGreaterEqual(ret.peak_wset, ret.wset)
            self.assertGreaterEqual(ret.peak_paged_pool, ret.paged_pool)
            self.assertGreaterEqual(ret.peak_nonpaged_pool, ret.nonpaged_pool)
            self.assertGreaterEqual(ret.peak_pagefile, ret.pagefile)

    def memory_full_info(self, ret, proc):
        """Expect a namedtuple of integers >= 0, bounded by total system
        memory where that bound is checked.
        """
        assert is_namedtuple(ret)
        total = psutil.virtual_memory().total
        for name in ret._fields:
            value = getattr(ret, name)
            self.assertIsInstance(value, (int, long))
            self.assertGreaterEqual(value, 0, msg=(name, value))
            # NOTE: `and` binds tighter than `or`, so on Linux the upper
            # bound is skipped for *every* field, while on OSX only
            # 'vms' and 'data' are exempt. The parentheses make the
            # pre-existing behavior explicit without changing it.
            if LINUX or (OSX and name in ('vms', 'data')):
                # On Linux there are processes (e.g. 'goa-daemon') whose
                # VMS is incredibly high for some reason.
                continue
            self.assertLessEqual(value, total, msg=(name, value, total))

        if LINUX:
            self.assertGreaterEqual(ret.pss, ret.uss)

    def open_files(self, ret, proc):
        """Expect a list of entries with an int fd and an absolute path
        pointing to a regular file.
        """
        self.assertIsInstance(ret, list)
        for f in ret:
            self.assertIsInstance(f.fd, int)
            self.assertIsInstance(f.path, str)
            if WINDOWS:
                self.assertEqual(f.fd, -1)
            elif LINUX:
                self.assertIsInstance(f.position, int)
                self.assertIsInstance(f.mode, str)
                self.assertIsInstance(f.flags, int)
                self.assertGreaterEqual(f.position, 0)
                self.assertIn(f.mode, ('r', 'w', 'a', 'r+', 'a+'))
                self.assertGreater(f.flags, 0)
            elif BSD and not f.path:
                # XXX see: https://github.com/giampaolo/psutil/issues/595
                continue
            assert os.path.isabs(f.path), f
            assert os.path.isfile(f.path), f

    def num_fds(self, ret, proc):
        """Expect an int >= 0."""
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 0)

    def connections(self, ret, proc):
        """Expect a sequence of unique namedtuples."""
        with create_sockets():
            self.assertEqual(len(ret), len(set(ret)))
            for conn in ret:
                assert is_namedtuple(conn)

    def cwd(self, ret, proc):
        """Expect None/empty or an absolute path to a directory."""
        if not ret:  # 'ret' can be None or empty
            return
        self.assertIsInstance(ret, str)
        assert os.path.isabs(ret), ret
        try:
            st = os.stat(ret)
        except OSError as err:
            denied = (WINDOWS and
                      err.errno in psutil._psplatform.ACCESS_DENIED_SET)
            # ENOENT: directory has been removed in mean time
            if not denied and err.errno != errno.ENOENT:
                raise
        else:
            assert stat.S_ISDIR(st.st_mode)

    def memory_percent(self, ret, proc):
        """Expect a float between 0 and 100."""
        self.assertIsInstance(ret, float)
        assert 0 <= ret <= 100, ret

    def is_running(self, ret, proc):
        """Expect a bool."""
        self.assertIsInstance(ret, bool)

    def cpu_affinity(self, ret, proc):
        """Expect a non-empty list of valid CPU indexes."""
        self.assertIsInstance(ret, list)
        assert ret != [], ret
        cpus = range(psutil.cpu_count())
        for n in ret:
            self.assertIsInstance(n, int)
            self.assertIn(n, cpus)

    def terminal(self, ret, proc):
        """Expect None or an absolute, existing path."""
        self.assertIsInstance(ret, (str, type(None)))
        if ret is not None:
            assert os.path.isabs(ret), ret
            assert os.path.exists(ret), ret

    def memory_maps(self, ret, proc):
        """Expect entries with str addr/perms/path and non-negative
        integers in every other field.
        """
        for nt in ret:
            self.assertIsInstance(nt.addr, str)
            self.assertIsInstance(nt.perms, str)
            self.assertIsInstance(nt.path, str)
            for fname in nt._fields:
                value = getattr(nt, fname)
                if fname == 'path':
                    if not value.startswith('['):
                        assert os.path.isabs(nt.path), nt.path
                        # Existence is not asserted as on Linux we
                        # might get '/foo/bar (deleted)'.
                elif fname == 'addr':
                    assert value, repr(value)
                elif fname == 'perms':
                    if not WINDOWS:
                        assert value, repr(value)
                else:
                    self.assertIsInstance(value, (int, long))
                    self.assertGreaterEqual(value, 0)

    def num_handles(self, ret, proc):
        """Expect an int >= 0."""
        self.assertIsInstance(ret, int)
        self.assertGreaterEqual(ret, 0)

    def nice(self, ret, proc):
        """Expect an int in [-20, 20] on POSIX, otherwise one of the
        psutil *_PRIORITY_CLASS constants.
        """
        self.assertIsInstance(ret, int)
        if POSIX:
            assert -20 <= ret <= 20, ret
        else:
            priorities = [getattr(psutil, x) for x in dir(psutil)
                          if x.endswith('_PRIORITY_CLASS')]
            self.assertIn(ret, priorities)

    def num_ctx_switches(self, ret, proc):
        """Expect a namedtuple of integers >= 0."""
        assert is_namedtuple(ret)
        for value in ret:
            self.assertIsInstance(value, (int, long))
            self.assertGreaterEqual(value, 0)

    def rlimit(self, ret, proc):
        """Expect a 2-item tuple whose items are >= -1."""
        self.assertIsInstance(ret, tuple)
        self.assertEqual(len(ret), 2)
        self.assertGreaterEqual(ret[0], -1)
        self.assertGreaterEqual(ret[1], -1)

    def environ(self, ret, proc):
        """Expect a dict mapping str to str."""
        self.assertIsInstance(ret, dict)
        for k, v in ret.items():
            self.assertIsInstance(k, str)
            self.assertIsInstance(v, str)
| 686 | |
| 687 | |
if __name__ == '__main__':
    # Executed as a script: hand this file over to run() from
    # psutil.tests.runner.
    import psutil.tests.runner
    psutil.tests.runner.run(__file__)
