Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_contracts.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Contracts tests. These tests mainly check API sanity in terms of | |
| 8 returned types and APIs availability. | |
| 9 Some of these are duplicates of tests test_system.py and test_process.py | |
| 10 """ | |
| 11 | |
| 12 import errno | |
| 13 import multiprocessing | |
| 14 import os | |
| 15 import signal | |
| 16 import stat | |
| 17 import sys | |
| 18 import time | |
| 19 import traceback | |
| 20 | |
| 21 from psutil import AIX | |
| 22 from psutil import BSD | |
| 23 from psutil import FREEBSD | |
| 24 from psutil import LINUX | |
| 25 from psutil import MACOS | |
| 26 from psutil import NETBSD | |
| 27 from psutil import OPENBSD | |
| 28 from psutil import OSX | |
| 29 from psutil import POSIX | |
| 30 from psutil import SUNOS | |
| 31 from psutil import WINDOWS | |
| 32 from psutil._compat import FileNotFoundError | |
| 33 from psutil._compat import long | |
| 34 from psutil._compat import range | |
| 35 from psutil.tests import create_sockets | |
| 36 from psutil.tests import enum | |
| 37 from psutil.tests import GITHUB_WHEELS | |
| 38 from psutil.tests import HAS_CPU_FREQ | |
| 39 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 40 from psutil.tests import HAS_SENSORS_FANS | |
| 41 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
| 42 from psutil.tests import is_namedtuple | |
| 43 from psutil.tests import process_namespace | |
| 44 from psutil.tests import PsutilTestCase | |
| 45 from psutil.tests import PYPY | |
| 46 from psutil.tests import serialrun | |
| 47 from psutil.tests import SKIP_SYSCONS | |
| 48 from psutil.tests import unittest | |
| 49 from psutil.tests import VALID_PROC_STATUSES | |
| 50 import psutil | |
| 51 | |
| 52 | |
| 53 # =================================================================== | |
| 54 # --- APIs availability | |
| 55 # =================================================================== | |
| 56 | |
| 57 # Make sure code reflects what doc promises in terms of APIs | |
| 58 # availability. | |
| 59 | |
| 60 class TestAvailConstantsAPIs(PsutilTestCase): | |
| 61 | |
| 62 def test_PROCFS_PATH(self): | |
| 63 self.assertEqual(hasattr(psutil, "PROCFS_PATH"), | |
| 64 LINUX or SUNOS or AIX) | |
| 65 | |
| 66 def test_win_priority(self): | |
| 67 ae = self.assertEqual | |
| 68 ae(hasattr(psutil, "ABOVE_NORMAL_PRIORITY_CLASS"), WINDOWS) | |
| 69 ae(hasattr(psutil, "BELOW_NORMAL_PRIORITY_CLASS"), WINDOWS) | |
| 70 ae(hasattr(psutil, "HIGH_PRIORITY_CLASS"), WINDOWS) | |
| 71 ae(hasattr(psutil, "IDLE_PRIORITY_CLASS"), WINDOWS) | |
| 72 ae(hasattr(psutil, "NORMAL_PRIORITY_CLASS"), WINDOWS) | |
| 73 ae(hasattr(psutil, "REALTIME_PRIORITY_CLASS"), WINDOWS) | |
| 74 | |
| 75 def test_linux_ioprio_linux(self): | |
| 76 ae = self.assertEqual | |
| 77 ae(hasattr(psutil, "IOPRIO_CLASS_NONE"), LINUX) | |
| 78 ae(hasattr(psutil, "IOPRIO_CLASS_RT"), LINUX) | |
| 79 ae(hasattr(psutil, "IOPRIO_CLASS_BE"), LINUX) | |
| 80 ae(hasattr(psutil, "IOPRIO_CLASS_IDLE"), LINUX) | |
| 81 | |
| 82 def test_linux_ioprio_windows(self): | |
| 83 ae = self.assertEqual | |
| 84 ae(hasattr(psutil, "IOPRIO_HIGH"), WINDOWS) | |
| 85 ae(hasattr(psutil, "IOPRIO_NORMAL"), WINDOWS) | |
| 86 ae(hasattr(psutil, "IOPRIO_LOW"), WINDOWS) | |
| 87 ae(hasattr(psutil, "IOPRIO_VERYLOW"), WINDOWS) | |
| 88 | |
| 89 @unittest.skipIf(GITHUB_WHEELS, "not exposed via GITHUB_WHEELS") | |
| 90 def test_linux_rlimit(self): | |
| 91 ae = self.assertEqual | |
| 92 ae(hasattr(psutil, "RLIM_INFINITY"), LINUX) | |
| 93 ae(hasattr(psutil, "RLIMIT_AS"), LINUX) | |
| 94 ae(hasattr(psutil, "RLIMIT_CORE"), LINUX) | |
| 95 ae(hasattr(psutil, "RLIMIT_CPU"), LINUX) | |
| 96 ae(hasattr(psutil, "RLIMIT_DATA"), LINUX) | |
| 97 ae(hasattr(psutil, "RLIMIT_FSIZE"), LINUX) | |
| 98 ae(hasattr(psutil, "RLIMIT_LOCKS"), LINUX) | |
| 99 ae(hasattr(psutil, "RLIMIT_MEMLOCK"), LINUX) | |
| 100 ae(hasattr(psutil, "RLIMIT_NOFILE"), LINUX) | |
| 101 ae(hasattr(psutil, "RLIMIT_NPROC"), LINUX) | |
| 102 ae(hasattr(psutil, "RLIMIT_RSS"), LINUX) | |
| 103 ae(hasattr(psutil, "RLIMIT_STACK"), LINUX) | |
| 104 | |
| 105 ae(hasattr(psutil, "RLIMIT_MSGQUEUE"), LINUX) # requires Linux 2.6.8 | |
| 106 ae(hasattr(psutil, "RLIMIT_NICE"), LINUX) # requires Linux 2.6.12 | |
| 107 ae(hasattr(psutil, "RLIMIT_RTPRIO"), LINUX) # requires Linux 2.6.12 | |
| 108 ae(hasattr(psutil, "RLIMIT_RTTIME"), LINUX) # requires Linux 2.6.25 | |
| 109 ae(hasattr(psutil, "RLIMIT_SIGPENDING"), LINUX) # requires Linux 2.6.8 | |
| 110 | |
| 111 | |
| 112 class TestAvailSystemAPIs(PsutilTestCase): | |
| 113 | |
| 114 def test_win_service_iter(self): | |
| 115 self.assertEqual(hasattr(psutil, "win_service_iter"), WINDOWS) | |
| 116 | |
| 117 def test_win_service_get(self): | |
| 118 self.assertEqual(hasattr(psutil, "win_service_get"), WINDOWS) | |
| 119 | |
| 120 def test_cpu_freq(self): | |
| 121 self.assertEqual(hasattr(psutil, "cpu_freq"), | |
| 122 LINUX or MACOS or WINDOWS or FREEBSD) | |
| 123 | |
| 124 def test_sensors_temperatures(self): | |
| 125 self.assertEqual( | |
| 126 hasattr(psutil, "sensors_temperatures"), LINUX or FREEBSD) | |
| 127 | |
| 128 def test_sensors_fans(self): | |
| 129 self.assertEqual(hasattr(psutil, "sensors_fans"), LINUX) | |
| 130 | |
| 131 def test_battery(self): | |
| 132 self.assertEqual(hasattr(psutil, "sensors_battery"), | |
| 133 LINUX or WINDOWS or FREEBSD or MACOS) | |
| 134 | |
| 135 | |
| 136 class TestAvailProcessAPIs(PsutilTestCase): | |
| 137 | |
| 138 def test_environ(self): | |
| 139 self.assertEqual(hasattr(psutil.Process, "environ"), | |
| 140 LINUX or MACOS or WINDOWS or AIX or SUNOS) | |
| 141 | |
| 142 def test_uids(self): | |
| 143 self.assertEqual(hasattr(psutil.Process, "uids"), POSIX) | |
| 144 | |
| 145 def test_gids(self): | |
| 146 self.assertEqual(hasattr(psutil.Process, "uids"), POSIX) | |
| 147 | |
| 148 def test_terminal(self): | |
| 149 self.assertEqual(hasattr(psutil.Process, "terminal"), POSIX) | |
| 150 | |
| 151 def test_ionice(self): | |
| 152 self.assertEqual(hasattr(psutil.Process, "ionice"), LINUX or WINDOWS) | |
| 153 | |
| 154 @unittest.skipIf(GITHUB_WHEELS, "not exposed via GITHUB_WHEELS") | |
| 155 def test_rlimit(self): | |
| 156 # requires Linux 2.6.36 | |
| 157 self.assertEqual(hasattr(psutil.Process, "rlimit"), LINUX) | |
| 158 | |
| 159 def test_io_counters(self): | |
| 160 hasit = hasattr(psutil.Process, "io_counters") | |
| 161 self.assertEqual(hasit, False if MACOS or SUNOS else True) | |
| 162 | |
| 163 def test_num_fds(self): | |
| 164 self.assertEqual(hasattr(psutil.Process, "num_fds"), POSIX) | |
| 165 | |
| 166 def test_num_handles(self): | |
| 167 self.assertEqual(hasattr(psutil.Process, "num_handles"), WINDOWS) | |
| 168 | |
| 169 def test_cpu_affinity(self): | |
| 170 self.assertEqual(hasattr(psutil.Process, "cpu_affinity"), | |
| 171 LINUX or WINDOWS or FREEBSD) | |
| 172 | |
| 173 def test_cpu_num(self): | |
| 174 self.assertEqual(hasattr(psutil.Process, "cpu_num"), | |
| 175 LINUX or FREEBSD or SUNOS) | |
| 176 | |
| 177 def test_memory_maps(self): | |
| 178 hasit = hasattr(psutil.Process, "memory_maps") | |
| 179 self.assertEqual( | |
| 180 hasit, False if OPENBSD or NETBSD or AIX or MACOS else True) | |
| 181 | |
| 182 | |
| 183 # =================================================================== | |
| 184 # --- API types | |
| 185 # =================================================================== | |
| 186 | |
| 187 | |
| 188 class TestSystemAPITypes(PsutilTestCase): | |
| 189 """Check the return types of system related APIs. | |
| 190 Mainly we want to test we never return unicode on Python 2, see: | |
| 191 https://github.com/giampaolo/psutil/issues/1039 | |
| 192 """ | |
| 193 | |
| 194 @classmethod | |
| 195 def setUpClass(cls): | |
| 196 cls.proc = psutil.Process() | |
| 197 | |
| 198 def assert_ntuple_of_nums(self, nt, type_=float, gezero=True): | |
| 199 assert is_namedtuple(nt) | |
| 200 for n in nt: | |
| 201 self.assertIsInstance(n, type_) | |
| 202 if gezero: | |
| 203 self.assertGreaterEqual(n, 0) | |
| 204 | |
| 205 def test_cpu_times(self): | |
| 206 self.assert_ntuple_of_nums(psutil.cpu_times()) | |
| 207 for nt in psutil.cpu_times(percpu=True): | |
| 208 self.assert_ntuple_of_nums(nt) | |
| 209 | |
| 210 def test_cpu_percent(self): | |
| 211 self.assertIsInstance(psutil.cpu_percent(interval=None), float) | |
| 212 self.assertIsInstance(psutil.cpu_percent(interval=0.00001), float) | |
| 213 | |
| 214 def test_cpu_times_percent(self): | |
| 215 self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=None)) | |
| 216 self.assert_ntuple_of_nums(psutil.cpu_times_percent(interval=0.0001)) | |
| 217 | |
| 218 def test_cpu_count(self): | |
| 219 self.assertIsInstance(psutil.cpu_count(), int) | |
| 220 | |
| 221 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
| 222 def test_cpu_freq(self): | |
| 223 if psutil.cpu_freq() is None: | |
| 224 raise self.skipTest("cpu_freq() returns None") | |
| 225 self.assert_ntuple_of_nums(psutil.cpu_freq(), type_=(float, int, long)) | |
| 226 | |
| 227 def test_disk_io_counters(self): | |
| 228 # Duplicate of test_system.py. Keep it anyway. | |
| 229 for k, v in psutil.disk_io_counters(perdisk=True).items(): | |
| 230 self.assertIsInstance(k, str) | |
| 231 self.assert_ntuple_of_nums(v, type_=(int, long)) | |
| 232 | |
| 233 def test_disk_partitions(self): | |
| 234 # Duplicate of test_system.py. Keep it anyway. | |
| 235 for disk in psutil.disk_partitions(): | |
| 236 self.assertIsInstance(disk.device, str) | |
| 237 self.assertIsInstance(disk.mountpoint, str) | |
| 238 self.assertIsInstance(disk.fstype, str) | |
| 239 self.assertIsInstance(disk.opts, str) | |
| 240 | |
| 241 @unittest.skipIf(SKIP_SYSCONS, "requires root") | |
| 242 def test_net_connections(self): | |
| 243 with create_sockets(): | |
| 244 ret = psutil.net_connections('all') | |
| 245 self.assertEqual(len(ret), len(set(ret))) | |
| 246 for conn in ret: | |
| 247 assert is_namedtuple(conn) | |
| 248 | |
| 249 def test_net_if_addrs(self): | |
| 250 # Duplicate of test_system.py. Keep it anyway. | |
| 251 for ifname, addrs in psutil.net_if_addrs().items(): | |
| 252 self.assertIsInstance(ifname, str) | |
| 253 for addr in addrs: | |
| 254 if enum is not None and not PYPY: | |
| 255 self.assertIsInstance(addr.family, enum.IntEnum) | |
| 256 else: | |
| 257 self.assertIsInstance(addr.family, int) | |
| 258 self.assertIsInstance(addr.address, str) | |
| 259 self.assertIsInstance(addr.netmask, (str, type(None))) | |
| 260 self.assertIsInstance(addr.broadcast, (str, type(None))) | |
| 261 | |
| 262 def test_net_if_stats(self): | |
| 263 # Duplicate of test_system.py. Keep it anyway. | |
| 264 for ifname, info in psutil.net_if_stats().items(): | |
| 265 self.assertIsInstance(ifname, str) | |
| 266 self.assertIsInstance(info.isup, bool) | |
| 267 if enum is not None: | |
| 268 self.assertIsInstance(info.duplex, enum.IntEnum) | |
| 269 else: | |
| 270 self.assertIsInstance(info.duplex, int) | |
| 271 self.assertIsInstance(info.speed, int) | |
| 272 self.assertIsInstance(info.mtu, int) | |
| 273 | |
| 274 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 275 def test_net_io_counters(self): | |
| 276 # Duplicate of test_system.py. Keep it anyway. | |
| 277 for ifname, _ in psutil.net_io_counters(pernic=True).items(): | |
| 278 self.assertIsInstance(ifname, str) | |
| 279 | |
| 280 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") | |
| 281 def test_sensors_fans(self): | |
| 282 # Duplicate of test_system.py. Keep it anyway. | |
| 283 for name, units in psutil.sensors_fans().items(): | |
| 284 self.assertIsInstance(name, str) | |
| 285 for unit in units: | |
| 286 self.assertIsInstance(unit.label, str) | |
| 287 self.assertIsInstance(unit.current, (float, int, type(None))) | |
| 288 | |
| 289 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
| 290 def test_sensors_temperatures(self): | |
| 291 # Duplicate of test_system.py. Keep it anyway. | |
| 292 for name, units in psutil.sensors_temperatures().items(): | |
| 293 self.assertIsInstance(name, str) | |
| 294 for unit in units: | |
| 295 self.assertIsInstance(unit.label, str) | |
| 296 self.assertIsInstance(unit.current, (float, int, type(None))) | |
| 297 self.assertIsInstance(unit.high, (float, int, type(None))) | |
| 298 self.assertIsInstance(unit.critical, (float, int, type(None))) | |
| 299 | |
| 300 def test_boot_time(self): | |
| 301 # Duplicate of test_system.py. Keep it anyway. | |
| 302 self.assertIsInstance(psutil.boot_time(), float) | |
| 303 | |
| 304 def test_users(self): | |
| 305 # Duplicate of test_system.py. Keep it anyway. | |
| 306 for user in psutil.users(): | |
| 307 self.assertIsInstance(user.name, str) | |
| 308 self.assertIsInstance(user.terminal, (str, type(None))) | |
| 309 self.assertIsInstance(user.host, (str, type(None))) | |
| 310 self.assertIsInstance(user.pid, (int, type(None))) | |
| 311 | |
| 312 | |
| 313 class TestProcessWaitType(PsutilTestCase): | |
| 314 | |
| 315 @unittest.skipIf(not POSIX, "not POSIX") | |
| 316 def test_negative_signal(self): | |
| 317 p = psutil.Process(self.spawn_testproc().pid) | |
| 318 p.terminate() | |
| 319 code = p.wait() | |
| 320 self.assertEqual(code, -signal.SIGTERM) | |
| 321 if enum is not None: | |
| 322 self.assertIsInstance(code, enum.IntEnum) | |
| 323 else: | |
| 324 self.assertIsInstance(code, int) | |
| 325 | |
| 326 | |
| 327 # =================================================================== | |
| 328 # --- Featch all processes test | |
| 329 # =================================================================== | |
| 330 | |
| 331 | |
| 332 def proc_info(pid): | |
| 333 tcase = PsutilTestCase() | |
| 334 | |
| 335 def check_exception(exc, proc, name, ppid): | |
| 336 tcase.assertEqual(exc.pid, pid) | |
| 337 tcase.assertEqual(exc.name, name) | |
| 338 if isinstance(exc, psutil.ZombieProcess): | |
| 339 if exc.ppid is not None: | |
| 340 tcase.assertGreaterEqual(exc.ppid, 0) | |
| 341 tcase.assertEqual(exc.ppid, ppid) | |
| 342 elif isinstance(exc, psutil.NoSuchProcess): | |
| 343 tcase.assertProcessGone(proc) | |
| 344 str(exc) | |
| 345 assert exc.msg | |
| 346 | |
| 347 def do_wait(): | |
| 348 if pid != 0: | |
| 349 try: | |
| 350 proc.wait(0) | |
| 351 except psutil.Error as exc: | |
| 352 check_exception(exc, proc, name, ppid) | |
| 353 | |
| 354 try: | |
| 355 proc = psutil.Process(pid) | |
| 356 d = proc.as_dict(['ppid', 'name']) | |
| 357 except psutil.NoSuchProcess: | |
| 358 return {} | |
| 359 | |
| 360 name, ppid = d['name'], d['ppid'] | |
| 361 info = {'pid': proc.pid} | |
| 362 ns = process_namespace(proc) | |
| 363 with proc.oneshot(): | |
| 364 for fun, fun_name in ns.iter(ns.getters, clear_cache=False): | |
| 365 try: | |
| 366 info[fun_name] = fun() | |
| 367 except psutil.Error as exc: | |
| 368 check_exception(exc, proc, name, ppid) | |
| 369 continue | |
| 370 do_wait() | |
| 371 return info | |
| 372 | |
| 373 | |
| 374 @serialrun | |
| 375 class TestFetchAllProcesses(PsutilTestCase): | |
| 376 """Test which iterates over all running processes and performs | |
| 377 some sanity checks against Process API's returned values. | |
| 378 Uses a process pool to get info about all processes. | |
| 379 """ | |
| 380 | |
| 381 def setUp(self): | |
| 382 self.pool = multiprocessing.Pool() | |
| 383 | |
| 384 def tearDown(self): | |
| 385 self.pool.terminate() | |
| 386 self.pool.join() | |
| 387 | |
| 388 def iter_proc_info(self): | |
| 389 # Fixes "can't pickle <function proc_info>: it's not the | |
| 390 # same object as test_contracts.proc_info". | |
| 391 from psutil.tests.test_contracts import proc_info | |
| 392 return self.pool.imap_unordered(proc_info, psutil.pids()) | |
| 393 | |
| 394 def test_all(self): | |
| 395 failures = [] | |
| 396 for info in self.iter_proc_info(): | |
| 397 for name, value in info.items(): | |
| 398 meth = getattr(self, name) | |
| 399 try: | |
| 400 meth(value, info) | |
| 401 except AssertionError: | |
| 402 s = '\n' + '=' * 70 + '\n' | |
| 403 s += "FAIL: test_%s pid=%s, ret=%s\n" % ( | |
| 404 name, info['pid'], repr(value)) | |
| 405 s += '-' * 70 | |
| 406 s += "\n%s" % traceback.format_exc() | |
| 407 s = "\n".join((" " * 4) + i for i in s.splitlines()) | |
| 408 s += '\n' | |
| 409 failures.append(s) | |
| 410 else: | |
| 411 if value not in (0, 0.0, [], None, '', {}): | |
| 412 assert value, value | |
| 413 if failures: | |
| 414 raise self.fail(''.join(failures)) | |
| 415 | |
| 416 def cmdline(self, ret, info): | |
| 417 self.assertIsInstance(ret, list) | |
| 418 for part in ret: | |
| 419 self.assertIsInstance(part, str) | |
| 420 | |
| 421 def exe(self, ret, info): | |
| 422 self.assertIsInstance(ret, (str, type(None))) | |
| 423 if not ret: | |
| 424 self.assertEqual(ret, '') | |
| 425 else: | |
| 426 if WINDOWS and not ret.endswith('.exe'): | |
| 427 return # May be "Registry", "MemCompression", ... | |
| 428 assert os.path.isabs(ret), ret | |
| 429 # Note: os.stat() may return False even if the file is there | |
| 430 # hence we skip the test, see: | |
| 431 # http://stackoverflow.com/questions/3112546/os-path-exists-lies | |
| 432 if POSIX and os.path.isfile(ret): | |
| 433 if hasattr(os, 'access') and hasattr(os, "X_OK"): | |
| 434 # XXX may fail on MACOS | |
| 435 assert os.access(ret, os.X_OK) | |
| 436 | |
| 437 def pid(self, ret, info): | |
| 438 self.assertIsInstance(ret, int) | |
| 439 self.assertGreaterEqual(ret, 0) | |
| 440 | |
| 441 def ppid(self, ret, info): | |
| 442 self.assertIsInstance(ret, (int, long)) | |
| 443 self.assertGreaterEqual(ret, 0) | |
| 444 | |
| 445 def name(self, ret, info): | |
| 446 self.assertIsInstance(ret, str) | |
| 447 # on AIX, "<exiting>" processes don't have names | |
| 448 if not AIX: | |
| 449 assert ret | |
| 450 | |
| 451 def create_time(self, ret, info): | |
| 452 self.assertIsInstance(ret, float) | |
| 453 try: | |
| 454 self.assertGreaterEqual(ret, 0) | |
| 455 except AssertionError: | |
| 456 # XXX | |
| 457 if OPENBSD and info['status'] == psutil.STATUS_ZOMBIE: | |
| 458 pass | |
| 459 else: | |
| 460 raise | |
| 461 # this can't be taken for granted on all platforms | |
| 462 # self.assertGreaterEqual(ret, psutil.boot_time()) | |
| 463 # make sure returned value can be pretty printed | |
| 464 # with strftime | |
| 465 time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) | |
| 466 | |
| 467 def uids(self, ret, info): | |
| 468 assert is_namedtuple(ret) | |
| 469 for uid in ret: | |
| 470 self.assertIsInstance(uid, int) | |
| 471 self.assertGreaterEqual(uid, 0) | |
| 472 | |
| 473 def gids(self, ret, info): | |
| 474 assert is_namedtuple(ret) | |
| 475 # note: testing all gids as above seems not to be reliable for | |
| 476 # gid == 30 (nodoby); not sure why. | |
| 477 for gid in ret: | |
| 478 self.assertIsInstance(gid, int) | |
| 479 if not MACOS and not NETBSD: | |
| 480 self.assertGreaterEqual(gid, 0) | |
| 481 | |
| 482 def username(self, ret, info): | |
| 483 self.assertIsInstance(ret, str) | |
| 484 assert ret | |
| 485 | |
| 486 def status(self, ret, info): | |
| 487 self.assertIsInstance(ret, str) | |
| 488 assert ret | |
| 489 self.assertNotEqual(ret, '?') # XXX | |
| 490 self.assertIn(ret, VALID_PROC_STATUSES) | |
| 491 | |
| 492 def io_counters(self, ret, info): | |
| 493 assert is_namedtuple(ret) | |
| 494 for field in ret: | |
| 495 self.assertIsInstance(field, (int, long)) | |
| 496 if field != -1: | |
| 497 self.assertGreaterEqual(field, 0) | |
| 498 | |
| 499 def ionice(self, ret, info): | |
| 500 if LINUX: | |
| 501 self.assertIsInstance(ret.ioclass, int) | |
| 502 self.assertIsInstance(ret.value, int) | |
| 503 self.assertGreaterEqual(ret.ioclass, 0) | |
| 504 self.assertGreaterEqual(ret.value, 0) | |
| 505 else: # Windows, Cygwin | |
| 506 choices = [ | |
| 507 psutil.IOPRIO_VERYLOW, | |
| 508 psutil.IOPRIO_LOW, | |
| 509 psutil.IOPRIO_NORMAL, | |
| 510 psutil.IOPRIO_HIGH] | |
| 511 self.assertIsInstance(ret, int) | |
| 512 self.assertGreaterEqual(ret, 0) | |
| 513 self.assertIn(ret, choices) | |
| 514 | |
| 515 def num_threads(self, ret, info): | |
| 516 self.assertIsInstance(ret, int) | |
| 517 self.assertGreaterEqual(ret, 1) | |
| 518 | |
| 519 def threads(self, ret, info): | |
| 520 self.assertIsInstance(ret, list) | |
| 521 for t in ret: | |
| 522 assert is_namedtuple(t) | |
| 523 self.assertGreaterEqual(t.id, 0) | |
| 524 self.assertGreaterEqual(t.user_time, 0) | |
| 525 self.assertGreaterEqual(t.system_time, 0) | |
| 526 for field in t: | |
| 527 self.assertIsInstance(field, (int, float)) | |
| 528 | |
| 529 def cpu_times(self, ret, info): | |
| 530 assert is_namedtuple(ret) | |
| 531 for n in ret: | |
| 532 self.assertIsInstance(n, float) | |
| 533 self.assertGreaterEqual(n, 0) | |
| 534 # TODO: check ntuple fields | |
| 535 | |
| 536 def cpu_percent(self, ret, info): | |
| 537 self.assertIsInstance(ret, float) | |
| 538 assert 0.0 <= ret <= 100.0, ret | |
| 539 | |
| 540 def cpu_num(self, ret, info): | |
| 541 self.assertIsInstance(ret, int) | |
| 542 if FREEBSD and ret == -1: | |
| 543 return | |
| 544 self.assertGreaterEqual(ret, 0) | |
| 545 if psutil.cpu_count() == 1: | |
| 546 self.assertEqual(ret, 0) | |
| 547 self.assertIn(ret, list(range(psutil.cpu_count()))) | |
| 548 | |
| 549 def memory_info(self, ret, info): | |
| 550 assert is_namedtuple(ret) | |
| 551 for value in ret: | |
| 552 self.assertIsInstance(value, (int, long)) | |
| 553 self.assertGreaterEqual(value, 0) | |
| 554 if WINDOWS: | |
| 555 self.assertGreaterEqual(ret.peak_wset, ret.wset) | |
| 556 self.assertGreaterEqual(ret.peak_paged_pool, ret.paged_pool) | |
| 557 self.assertGreaterEqual(ret.peak_nonpaged_pool, ret.nonpaged_pool) | |
| 558 self.assertGreaterEqual(ret.peak_pagefile, ret.pagefile) | |
| 559 | |
| 560 def memory_full_info(self, ret, info): | |
| 561 assert is_namedtuple(ret) | |
| 562 total = psutil.virtual_memory().total | |
| 563 for name in ret._fields: | |
| 564 value = getattr(ret, name) | |
| 565 self.assertIsInstance(value, (int, long)) | |
| 566 self.assertGreaterEqual(value, 0, msg=(name, value)) | |
| 567 if LINUX or OSX and name in ('vms', 'data'): | |
| 568 # On Linux there are processes (e.g. 'goa-daemon') whose | |
| 569 # VMS is incredibly high for some reason. | |
| 570 continue | |
| 571 self.assertLessEqual(value, total, msg=(name, value, total)) | |
| 572 | |
| 573 if LINUX: | |
| 574 self.assertGreaterEqual(ret.pss, ret.uss) | |
| 575 | |
| 576 def open_files(self, ret, info): | |
| 577 self.assertIsInstance(ret, list) | |
| 578 for f in ret: | |
| 579 self.assertIsInstance(f.fd, int) | |
| 580 self.assertIsInstance(f.path, str) | |
| 581 if WINDOWS: | |
| 582 self.assertEqual(f.fd, -1) | |
| 583 elif LINUX: | |
| 584 self.assertIsInstance(f.position, int) | |
| 585 self.assertIsInstance(f.mode, str) | |
| 586 self.assertIsInstance(f.flags, int) | |
| 587 self.assertGreaterEqual(f.position, 0) | |
| 588 self.assertIn(f.mode, ('r', 'w', 'a', 'r+', 'a+')) | |
| 589 self.assertGreater(f.flags, 0) | |
| 590 elif BSD and not f.path: | |
| 591 # XXX see: https://github.com/giampaolo/psutil/issues/595 | |
| 592 continue | |
| 593 assert os.path.isabs(f.path), f | |
| 594 try: | |
| 595 st = os.stat(f.path) | |
| 596 except FileNotFoundError: | |
| 597 pass | |
| 598 else: | |
| 599 assert stat.S_ISREG(st.st_mode), f | |
| 600 | |
| 601 def num_fds(self, ret, info): | |
| 602 self.assertIsInstance(ret, int) | |
| 603 self.assertGreaterEqual(ret, 0) | |
| 604 | |
| 605 def connections(self, ret, info): | |
| 606 with create_sockets(): | |
| 607 self.assertEqual(len(ret), len(set(ret))) | |
| 608 for conn in ret: | |
| 609 assert is_namedtuple(conn) | |
| 610 | |
| 611 def cwd(self, ret, info): | |
| 612 if ret: # 'ret' can be None or empty | |
| 613 self.assertIsInstance(ret, str) | |
| 614 assert os.path.isabs(ret), ret | |
| 615 try: | |
| 616 st = os.stat(ret) | |
| 617 except OSError as err: | |
| 618 if WINDOWS and err.errno in \ | |
| 619 psutil._psplatform.ACCESS_DENIED_SET: | |
| 620 pass | |
| 621 # directory has been removed in mean time | |
| 622 elif err.errno != errno.ENOENT: | |
| 623 raise | |
| 624 else: | |
| 625 assert stat.S_ISDIR(st.st_mode) | |
| 626 | |
| 627 def memory_percent(self, ret, info): | |
| 628 self.assertIsInstance(ret, float) | |
| 629 assert 0 <= ret <= 100, ret | |
| 630 | |
| 631 def is_running(self, ret, info): | |
| 632 self.assertIsInstance(ret, bool) | |
| 633 | |
| 634 def cpu_affinity(self, ret, info): | |
| 635 self.assertIsInstance(ret, list) | |
| 636 assert ret != [], ret | |
| 637 cpus = list(range(psutil.cpu_count())) | |
| 638 for n in ret: | |
| 639 self.assertIsInstance(n, int) | |
| 640 self.assertIn(n, cpus) | |
| 641 | |
| 642 def terminal(self, ret, info): | |
| 643 self.assertIsInstance(ret, (str, type(None))) | |
| 644 if ret is not None: | |
| 645 assert os.path.isabs(ret), ret | |
| 646 assert os.path.exists(ret), ret | |
| 647 | |
| 648 def memory_maps(self, ret, info): | |
| 649 for nt in ret: | |
| 650 self.assertIsInstance(nt.addr, str) | |
| 651 self.assertIsInstance(nt.perms, str) | |
| 652 self.assertIsInstance(nt.path, str) | |
| 653 for fname in nt._fields: | |
| 654 value = getattr(nt, fname) | |
| 655 if fname == 'path': | |
| 656 if not value.startswith('['): | |
| 657 assert os.path.isabs(nt.path), nt.path | |
| 658 # commented as on Linux we might get | |
| 659 # '/foo/bar (deleted)' | |
| 660 # assert os.path.exists(nt.path), nt.path | |
| 661 elif fname == 'addr': | |
| 662 assert value, repr(value) | |
| 663 elif fname == 'perms': | |
| 664 if not WINDOWS: | |
| 665 assert value, repr(value) | |
| 666 else: | |
| 667 self.assertIsInstance(value, (int, long)) | |
| 668 self.assertGreaterEqual(value, 0) | |
| 669 | |
| 670 def num_handles(self, ret, info): | |
| 671 self.assertIsInstance(ret, int) | |
| 672 self.assertGreaterEqual(ret, 0) | |
| 673 | |
| 674 def nice(self, ret, info): | |
| 675 self.assertIsInstance(ret, int) | |
| 676 if POSIX: | |
| 677 assert -20 <= ret <= 20, ret | |
| 678 else: | |
| 679 priorities = [getattr(psutil, x) for x in dir(psutil) | |
| 680 if x.endswith('_PRIORITY_CLASS')] | |
| 681 self.assertIn(ret, priorities) | |
| 682 if sys.version_info > (3, 4): | |
| 683 self.assertIsInstance(ret, enum.IntEnum) | |
| 684 else: | |
| 685 self.assertIsInstance(ret, int) | |
| 686 | |
| 687 def num_ctx_switches(self, ret, info): | |
| 688 assert is_namedtuple(ret) | |
| 689 for value in ret: | |
| 690 self.assertIsInstance(value, (int, long)) | |
| 691 self.assertGreaterEqual(value, 0) | |
| 692 | |
| 693 def rlimit(self, ret, info): | |
| 694 self.assertIsInstance(ret, tuple) | |
| 695 self.assertEqual(len(ret), 2) | |
| 696 self.assertGreaterEqual(ret[0], -1) | |
| 697 self.assertGreaterEqual(ret[1], -1) | |
| 698 | |
| 699 def environ(self, ret, info): | |
| 700 self.assertIsInstance(ret, dict) | |
| 701 for k, v in ret.items(): | |
| 702 self.assertIsInstance(k, str) | |
| 703 self.assertIsInstance(v, str) | |
| 704 | |
| 705 | |
| 706 if __name__ == '__main__': | |
| 707 from psutil.tests.runner import run_from_name | |
| 708 run_from_name(__file__) |
