Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_system.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Tests for system APIs.""" | |
| 8 | |
| 9 import contextlib | |
| 10 import datetime | |
| 11 import errno | |
| 12 import os | |
| 13 import pprint | |
| 14 import shutil | |
| 15 import signal | |
| 16 import socket | |
| 17 import sys | |
| 18 import time | |
| 19 | |
| 20 import psutil | |
| 21 from psutil import AIX | |
| 22 from psutil import BSD | |
| 23 from psutil import FREEBSD | |
| 24 from psutil import LINUX | |
| 25 from psutil import MACOS | |
| 26 from psutil import NETBSD | |
| 27 from psutil import OPENBSD | |
| 28 from psutil import POSIX | |
| 29 from psutil import SUNOS | |
| 30 from psutil import WINDOWS | |
| 31 from psutil._compat import FileNotFoundError | |
| 32 from psutil._compat import long | |
| 33 from psutil.tests import ASCII_FS | |
| 34 from psutil.tests import check_net_address | |
| 35 from psutil.tests import CI_TESTING | |
| 36 from psutil.tests import DEVNULL | |
| 37 from psutil.tests import enum | |
| 38 from psutil.tests import GLOBAL_TIMEOUT | |
| 39 from psutil.tests import HAS_BATTERY | |
| 40 from psutil.tests import HAS_CPU_FREQ | |
| 41 from psutil.tests import HAS_GETLOADAVG | |
| 42 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 43 from psutil.tests import HAS_SENSORS_BATTERY | |
| 44 from psutil.tests import HAS_SENSORS_FANS | |
| 45 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
| 46 from psutil.tests import IS_64BIT | |
| 47 from psutil.tests import mock | |
| 48 from psutil.tests import PsutilTestCase | |
| 49 from psutil.tests import PYPY | |
| 50 from psutil.tests import retry_on_failure | |
| 51 from psutil.tests import TRAVIS | |
| 52 from psutil.tests import GITHUB_WHEELS | |
| 53 from psutil.tests import UNICODE_SUFFIX | |
| 54 from psutil.tests import unittest | |
| 55 | |
| 56 | |
| 57 # =================================================================== | |
| 58 # --- System-related API tests | |
| 59 # =================================================================== | |
| 60 | |
| 61 | |
class TestProcessAPIs(PsutilTestCase):
    """Tests for the module-level process helpers: process_iter(),
    wait_procs() and pid_exists().
    """

    def test_process_iter(self):
        # The current process and a freshly spawned child must both be
        # listed; after the child is killed and reaped it must not be.
        self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
        sproc = self.spawn_testproc()
        self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])

        # With Process() patched to raise NoSuchProcess the iterator is
        # expected to yield nothing; AccessDenied is expected to
        # propagate to the caller.
        with mock.patch('psutil.Process',
                        side_effect=psutil.NoSuchProcess(os.getpid())):
            self.assertEqual(list(psutil.process_iter()), [])
        with mock.patch('psutil.Process',
                        side_effect=psutil.AccessDenied(os.getpid())):
            with self.assertRaises(psutil.AccessDenied):
                list(psutil.process_iter())

    def test_prcess_iter_w_attrs(self):
        # NOTE: the method name has a typo ("prcess"); it is kept as is
        # so that name-based test selection keeps working.
        for p in psutil.process_iter(attrs=['pid']):
            self.assertEqual(list(p.info.keys()), ['pid'])
        # An unknown attribute name is rejected.
        with self.assertRaises(ValueError):
            list(psutil.process_iter(attrs=['foo']))
        # When an attribute cannot be retrieved because of AccessDenied
        # its value in the info dict is None...
        with mock.patch("psutil._psplatform.Process.cpu_times",
                        side_effect=psutil.AccessDenied(0, "")) as m:
            for p in psutil.process_iter(attrs=["pid", "cpu_times"]):
                self.assertIsNone(p.info['cpu_times'])
                self.assertGreaterEqual(p.info['pid'], 0)
            self.assertTrue(m.called)
        # ...or whatever object was passed as "ad_value".
        with mock.patch("psutil._psplatform.Process.cpu_times",
                        side_effect=psutil.AccessDenied(0, "")) as m:
            flag = object()
            for p in psutil.process_iter(
                    attrs=["pid", "cpu_times"], ad_value=flag):
                self.assertIs(p.info['cpu_times'], flag)
                self.assertGreaterEqual(p.info['pid'], 0)
            self.assertTrue(m.called)

    @unittest.skipIf(PYPY and WINDOWS,
                     "spawn_testproc() unreliable on PYPY + WINDOWS")
    def test_wait_procs(self):
        # Records the pid of every process reported as terminated.
        def callback(p):
            pids.append(p.pid)

        pids = []
        sproc1 = self.spawn_testproc()
        sproc2 = self.spawn_testproc()
        sproc3 = self.spawn_testproc()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        # Invalid arguments.
        self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
        self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)

        # Nothing was terminated yet: the call must return quickly and
        # report all 3 processes as alive.
        t = time.time()
        gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)

        self.assertLess(time.time() - t, 0.5)
        self.assertEqual(gone, [])
        self.assertEqual(len(alive), 3)
        self.assertEqual(pids, [])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        # Terminate one process and poll until it is reported as gone.
        @retry_on_failure(30)
        def wait_one_gone(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 1)
            self.assertEqual(len(alive), 2)
            return gone, alive

        sproc3.terminate()
        gone, alive = wait_one_gone(procs, callback)
        self.assertIn(sproc3.pid, [x.pid for x in gone])
        if POSIX:
            self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
        else:
            self.assertEqual(gone.pop().returncode, 1)
        self.assertEqual(pids, [sproc3.pid])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        # Terminate the remaining two and poll until all 3 are gone.
        @retry_on_failure(30)
        def wait_all_gone(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 3)
            self.assertEqual(len(alive), 0)
            return gone, alive

        sproc1.terminate()
        sproc2.terminate()
        gone, alive = wait_all_gone(procs, callback)
        self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid]))
        for p in gone:
            self.assertTrue(hasattr(p, 'returncode'))

    @unittest.skipIf(PYPY and WINDOWS,
                     "spawn_testproc() unreliable on PYPY + WINDOWS")
    def test_wait_procs_no_timeout(self):
        # Without a timeout wait_procs() is expected to block until
        # every process has terminated.
        sproc1 = self.spawn_testproc()
        sproc2 = self.spawn_testproc()
        sproc3 = self.spawn_testproc()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        for p in procs:
            p.terminate()
        gone, alive = psutil.wait_procs(procs)
        self.assertEqual(len(gone), 3)
        self.assertEqual(alive, [])

    def test_pid_exists(self):
        sproc = self.spawn_testproc()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        # PID 0 exists only on some platforms; be consistent with pids().
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())

    def test_pid_exists_2(self):
        pids = psutil.pids()
        for pid in pids:
            if not psutil.pid_exists(pid):
                # The process may have disappeared in the meantime:
                # fail only if it is still listed by psutil.pids().
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        # PIDs well above the highest one in use are expected not to
        # exist.
        highest = max(pids)
        for pid in range(highest + 5000, highest + 6000):
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
| 193 | |
| 194 | |
class TestMiscAPIs(PsutilTestCase):
    """Tests for miscellaneous system APIs: boot_time(), users(),
    psutil.test() and the platform constants.
    """

    def test_boot_time(self):
        # Boot time is a positive float timestamp located in the past.
        bt = psutil.boot_time()
        self.assertIsInstance(bt, float)
        self.assertGreater(bt, 0)
        self.assertLess(bt, time.time())

    @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
    def test_users(self):
        users = psutil.users()
        self.assertNotEqual(users, [])
        for user in users:
            self.assertTrue(user.name, msg=user)
            self.assertIsInstance(user.name, str)
            # terminal and host are optional (may be None).
            self.assertIsInstance(user.terminal, (str, type(None)))
            self.assertIsInstance(user.host, (str, type(None)))
            self.assertGreater(user.started, 0.0, msg=user)
            # Must be a timestamp which can be converted to a datetime.
            datetime.datetime.fromtimestamp(user.started)
            if WINDOWS or OPENBSD:
                self.assertIsNone(user.pid)
            else:
                # Must refer to an existing process.
                psutil.Process(user.pid)

    @unittest.skipIf(not POSIX, 'POSIX only')
    def test_PAGESIZE(self):
        # pagesize is used internally to perform different calculations
        # and it's determined by using SC_PAGE_SIZE; make sure
        # getpagesize() returns the same value.
        import resource
        self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())

    def test_test(self):
        # test for psutil.test() function; its output is discarded by
        # temporarily redirecting stdout.
        stdout = sys.stdout
        sys.stdout = DEVNULL
        try:
            psutil.test()
        finally:
            sys.stdout = stdout

    def test_os_constants(self):
        names = ["POSIX", "WINDOWS", "LINUX", "MACOS", "FREEBSD", "OPENBSD",
                 "NETBSD", "BSD", "SUNOS"]
        for name in names:
            self.assertIsInstance(getattr(psutil, name), bool, msg=name)

        # Remove from "names" the constants which are expected to be
        # True on this platform.
        if os.name == 'posix':
            self.assertTrue(psutil.POSIX)
            self.assertFalse(psutil.WINDOWS)
            names.remove("POSIX")
            platform = sys.platform.lower()
            if "linux" in platform:
                self.assertTrue(psutil.LINUX)
                names.remove("LINUX")
            elif "bsd" in platform:
                self.assertTrue(psutil.BSD)
                # Exactly one BSD variant must be set.
                self.assertEqual([psutil.FREEBSD, psutil.OPENBSD,
                                  psutil.NETBSD].count(True), 1)
                names.remove("BSD")
                names.remove("FREEBSD")
                names.remove("OPENBSD")
                names.remove("NETBSD")
            elif "sunos" in platform or "solaris" in platform:
                self.assertTrue(psutil.SUNOS)
                names.remove("SUNOS")
            elif "darwin" in platform:
                self.assertTrue(psutil.MACOS)
                names.remove("MACOS")
        else:
            self.assertTrue(psutil.WINDOWS)
            self.assertFalse(psutil.POSIX)
            names.remove("WINDOWS")

        # assert all other constants are set to False
        for name in names:
            self.assertIs(getattr(psutil, name), False, msg=name)
| 275 | |
| 276 | |
class TestMemoryAPIs(PsutilTestCase):
    """Sanity checks for virtual_memory() and swap_memory()."""

    def test_virtual_memory(self):
        mem = psutil.virtual_memory()
        self.assertGreater(mem.total, 0, msg=mem)
        self.assertGreater(mem.available, 0, msg=mem)
        self.assertTrue(0 <= mem.percent <= 100, msg=mem)
        self.assertGreater(mem.used, 0, msg=mem)
        self.assertGreaterEqual(mem.free, 0, msg=mem)
        for name in mem._fields:
            value = getattr(mem, name)
            if name != 'percent':
                self.assertIsInstance(value, (int, long))
            if name != 'total':
                # Every other field must be in the 0..total range
                # (this includes "percent").
                if not value >= 0:
                    self.fail("%r < 0 (%s)" % (name, value))
                if value > mem.total:
                    self.fail("%r > total (total=%s, %s=%s)"
                              % (name, mem.total, name, value))

    def test_swap_memory(self):
        mem = psutil.swap_memory()
        self.assertEqual(
            mem._fields, ('total', 'used', 'free', 'percent', 'sin', 'sout'))

        self.assertGreaterEqual(mem.total, 0, msg=mem)
        self.assertGreaterEqual(mem.used, 0, msg=mem)
        if mem.total > 0:
            self.assertGreater(mem.free, 0, msg=mem)
        else:
            # likely a system with no swap partition
            self.assertEqual(mem.free, 0, msg=mem)
        self.assertTrue(0 <= mem.percent <= 100, msg=mem)
        self.assertGreaterEqual(mem.sin, 0, msg=mem)
        self.assertGreaterEqual(mem.sout, 0, msg=mem)
| 312 | |
| 313 | |
class TestCpuAPIs(PsutilTestCase):
    """Tests for the CPU related APIs: cpu_count(), cpu_times(),
    cpu_percent(), cpu_times_percent(), cpu_stats(), cpu_freq() and
    getloadavg().
    """

    def test_cpu_count_logical(self):
        logical = psutil.cpu_count()
        self.assertIsNotNone(logical)
        self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
        self.assertGreaterEqual(logical, 1)
        # NOTE(review): this skip is evaluated after all the assertions
        # above have passed, so it only changes how the test is
        # reported; it looks like a leftover - confirm before removing.
        if os.path.exists("/proc/cpuinfo"):
            with open("/proc/cpuinfo") as fd:
                cpuinfo_data = fd.read()
            if "physical id" not in cpuinfo_data:
                raise unittest.SkipTest("cpuinfo doesn't include physical id")

    def test_cpu_count_physical(self):
        logical = psutil.cpu_count()
        physical = psutil.cpu_count(logical=False)
        if physical is None:
            # skipTest() raises by itself, no "raise" needed.
            self.skipTest("physical cpu_count() is None")
        # "physical" is known not to be None here, so the former
        # Windows-only "assertIsNone(physical)" branch could only fail;
        # the same checks now apply on every platform.
        self.assertGreaterEqual(physical, 1)
        self.assertGreaterEqual(logical, physical)

    def test_cpu_count_none(self):
        # https://github.com/giampaolo/psutil/issues/1085
        # Invalid values returned by the platform implementation are
        # expected to be turned into None.
        for val in (-1, 0, None):
            with mock.patch('psutil._psplatform.cpu_count_logical',
                            return_value=val) as m:
                self.assertIsNone(psutil.cpu_count())
                self.assertTrue(m.called)
            with mock.patch('psutil._psplatform.cpu_count_physical',
                            return_value=val) as m:
                self.assertIsNone(psutil.cpu_count(logical=False))
                self.assertTrue(m.called)

    def test_cpu_times(self):
        # Check type, value >= 0, str().
        total = 0
        times = psutil.cpu_times()
        for cp_time in times:
            self.assertIsInstance(cp_time, float)
            self.assertGreaterEqual(cp_time, 0.0)
            total += cp_time
        self.assertEqual(total, sum(times))
        str(times)
        # CPU times are always supposed to increase over time or at
        # least remain the same, because time cannot go backwards.
        # Surprisingly sometimes this might not be the case (at least
        # on Windows and Linux), so a field-by-field monotonicity check
        # is deliberately not performed here. See:
        # https://github.com/giampaolo/psutil/issues/392
        # https://github.com/giampaolo/psutil/issues/645

    def test_cpu_times_time_increases(self):
        # Make sure time increases between calls.
        t1 = sum(psutil.cpu_times())
        stop_at = time.time() + GLOBAL_TIMEOUT
        while time.time() < stop_at:
            t2 = sum(psutil.cpu_times())
            if t2 > t1:
                return
        self.fail("time remained the same")

    def test_per_cpu_times(self):
        # Check type, value >= 0, str().
        for times in psutil.cpu_times(percpu=True):
            total = 0
            for cp_time in times:
                self.assertIsInstance(cp_time, float)
                self.assertGreaterEqual(cp_time, 0.0)
                total += cp_time
            self.assertEqual(total, sum(times))
            str(times)
        # Per-CPU and system-wide tuples must have the same fields.
        self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
                         len(psutil.cpu_times(percpu=False)))

        # Note: in theory CPU times are always supposed to increase
        # over time or remain the same but never go backwards. In
        # practice sometimes this is not the case. This issue seemed to
        # afflict Windows only:
        # https://github.com/giampaolo/psutil/issues/392
        # ...but it turns out Linux (rarely) behaves the same, so a
        # per-CPU monotonicity check is deliberately not performed.

    def test_per_cpu_times_2(self):
        # Poll until the busy time of at least one CPU has increased
        # between calls, or give up after GLOBAL_TIMEOUT.
        tot1 = psutil.cpu_times(percpu=True)
        giveup_at = time.time() + GLOBAL_TIMEOUT
        while True:
            if time.time() >= giveup_at:
                return self.fail("timeout")
            tot2 = psutil.cpu_times(percpu=True)
            for before, after in zip(tot1, tot2):
                difference = (psutil._cpu_busy_time(after) -
                              psutil._cpu_busy_time(before))
                if difference >= 0.05:
                    return

    def test_cpu_times_comparison(self):
        # Make sure the sum of all per cpu times is almost equal to
        # base "one cpu" times.
        base = psutil.cpu_times()
        per_cpu = psutil.cpu_times(percpu=True)
        summed_values = base._make([sum(num) for num in zip(*per_cpu)])
        for field in base._fields:
            self.assertAlmostEqual(
                getattr(base, field), getattr(summed_values, field), delta=1)

    def _test_cpu_percent(self, percent, last_ret, new_ret):
        # Validate a single percentage value. "last_ret" and "new_ret"
        # are only used to enrich the failure message with the previous
        # and the current return values.
        try:
            self.assertIsInstance(percent, float)
            self.assertGreaterEqual(percent, 0.0)
            # NOTE(review): this is an identity comparison against a
            # float literal, hence it does not detect a negative zero
            # value; confirm the intent before strengthening it.
            self.assertIsNot(percent, -0.0)
            self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
        except AssertionError as err:
            raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
                err, pprint.pformat(last_ret), pprint.pformat(new_ret)))

    def test_cpu_percent(self):
        last = psutil.cpu_percent(interval=0.001)
        for _ in range(100):
            new = psutil.cpu_percent(interval=None)
            self._test_cpu_percent(new, last, new)
            last = new
        # A negative interval is rejected.
        with self.assertRaises(ValueError):
            psutil.cpu_percent(interval=-1)

    def test_per_cpu_percent(self):
        last = psutil.cpu_percent(interval=0.001, percpu=True)
        self.assertEqual(len(last), psutil.cpu_count())
        for _ in range(100):
            new = psutil.cpu_percent(interval=None, percpu=True)
            for percent in new:
                self._test_cpu_percent(percent, last, new)
            last = new
        # A negative interval is rejected.
        with self.assertRaises(ValueError):
            psutil.cpu_percent(interval=-1, percpu=True)

    def test_cpu_times_percent(self):
        last = psutil.cpu_times_percent(interval=0.001)
        for _ in range(100):
            new = psutil.cpu_times_percent(interval=None)
            for percent in new:
                self._test_cpu_percent(percent, last, new)
            # Also the sum of all fields must be a valid percentage.
            self._test_cpu_percent(sum(new), last, new)
            last = new

    def test_per_cpu_times_percent(self):
        last = psutil.cpu_times_percent(interval=0.001, percpu=True)
        self.assertEqual(len(last), psutil.cpu_count())
        for _ in range(100):
            new = psutil.cpu_times_percent(interval=None, percpu=True)
            for cpu in new:
                for percent in cpu:
                    self._test_cpu_percent(percent, last, new)
                # Also the sum of all fields must be a valid percentage.
                self._test_cpu_percent(sum(cpu), last, new)
            last = new

    def test_per_cpu_times_percent_negative(self):
        # see: https://github.com/giampaolo/psutil/issues/645
        # Emulate CPU times going backwards (all zeros after a real
        # reading) and make sure no invalid percentage is returned.
        psutil.cpu_times_percent(percpu=True)
        zero_times = [nt._make([0] * len(nt._fields))
                      for nt in psutil.cpu_times(percpu=True)]
        with mock.patch('psutil.cpu_times', return_value=zero_times):
            for cpu in psutil.cpu_times_percent(percpu=True):
                for percent in cpu:
                    self._test_cpu_percent(percent, None, None)

    def test_cpu_stats(self):
        # Tested more extensively in per-platform test modules.
        infos = psutil.cpu_stats()
        self.assertEqual(
            infos._fields,
            ('ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls'))
        for name in infos._fields:
            value = getattr(infos, name)
            self.assertGreaterEqual(value, 0)
            # on AIX, ctx_switches is always 0
            if not AIX and name in ('ctx_switches', 'interrupts'):
                self.assertGreater(value, 0)

    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        # Validate fields, types and values of a list of frequency
        # namedtuples.
        def check_ls(ls):
            for nt in ls:
                self.assertEqual(nt._fields, ('current', 'min', 'max'))
                # A max of 0.0 is not compared against "current".
                if nt.max != 0.0:
                    self.assertLessEqual(nt.current, nt.max)
                for name in nt._fields:
                    value = getattr(nt, name)
                    self.assertIsInstance(value, (int, long, float))
                    self.assertGreaterEqual(value, 0)

        ls = psutil.cpu_freq(percpu=True)
        if TRAVIS and not ls:
            self.skipTest("skipped on Travis")
        if FREEBSD and not ls:
            self.skipTest("returns empty list on FreeBSD")

        self.assertTrue(ls, msg=ls)
        check_ls([psutil.cpu_freq(percpu=False)])

        if LINUX:
            self.assertEqual(len(ls), psutil.cpu_count())

    @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
    def test_getloadavg(self):
        # 3 non-negative floats are expected.
        loadavg = psutil.getloadavg()
        self.assertEqual(len(loadavg), 3)
        for load in loadavg:
            self.assertIsInstance(load, float)
            self.assertGreaterEqual(load, 0.0)
| 552 | |
| 553 | |
class TestDiskAPIs(PsutilTestCase):
    """Tests for disk_usage(), disk_partitions() and
    disk_io_counters().
    """

    @unittest.skipIf(PYPY and not IS_64BIT, "unreliable on PYPY32 + 32BIT")
    def test_disk_usage(self):
        usage = psutil.disk_usage(os.getcwd())
        self.assertEqual(usage._fields, ('total', 'used', 'free', 'percent'))

        self.assertGreater(usage.total, 0, msg=usage)
        self.assertGreater(usage.used, 0, msg=usage)
        self.assertGreater(usage.free, 0, msg=usage)
        self.assertGreater(usage.total, usage.used, msg=usage)
        self.assertGreater(usage.total, usage.free, msg=usage)
        self.assertTrue(0 <= usage.percent <= 100, msg=usage.percent)
        if hasattr(shutil, 'disk_usage'):
            # py >= 3.3, see: http://bugs.python.org/issue12442
            # "free" and "used" may change between the two calls, hence
            # they are compared with a tolerance.
            shutil_usage = shutil.disk_usage(os.getcwd())
            tolerance = 5 * 1024 * 1024  # 5MB
            self.assertEqual(usage.total, shutil_usage.total)
            self.assertAlmostEqual(usage.free, shutil_usage.free,
                                   delta=tolerance)
            self.assertAlmostEqual(usage.used, shutil_usage.used,
                                   delta=tolerance)

        # if path does not exist OSError ENOENT is expected across
        # all platforms
        fname = self.get_testfn()
        with self.assertRaises(FileNotFoundError):
            psutil.disk_usage(fname)

    @unittest.skipIf(not ASCII_FS, "not an ASCII fs")
    def test_disk_usage_unicode(self):
        # See: https://github.com/giampaolo/psutil/issues/416
        with self.assertRaises(UnicodeEncodeError):
            psutil.disk_usage(UNICODE_SUFFIX)

    def test_disk_usage_bytes(self):
        # A bytes path is accepted.
        psutil.disk_usage(b'.')

    def test_disk_partitions(self):
        # all = False
        ls = psutil.disk_partitions(all=False)
        self.assertTrue(ls, msg=ls)
        for disk in ls:
            self.assertIsInstance(disk.device, str)
            self.assertIsInstance(disk.mountpoint, str)
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)
            if WINDOWS and 'cdrom' in disk.opts:
                continue
            if not POSIX:
                self.assertTrue(os.path.exists(disk.device), msg=disk)
            # On POSIX we cannot make any assumption about the device,
            # see: http://goo.gl/p9c43
            # on modern systems mount points can also be files
            self.assertTrue(os.path.exists(disk.mountpoint), msg=disk)
            self.assertTrue(disk.fstype, msg=disk)

        # all = True
        ls = psutil.disk_partitions(all=True)
        self.assertTrue(ls, msg=ls)
        for disk in ls:
            if not WINDOWS and disk.mountpoint:
                try:
                    os.stat(disk.mountpoint)
                except OSError as err:
                    if (GITHUB_WHEELS or TRAVIS) and \
                            MACOS and err.errno == errno.EIO:
                        continue
                    # Mount points we have no permission to access are
                    # tolerated, see:
                    # http://mail.python.org/pipermail/python-dev/
                    #     2012-June/120787.html
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise
                else:
                    self.assertTrue(
                        os.path.exists(disk.mountpoint), msg=disk)
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

        # Walk up from "path" until a mount point is found and return
        # it lower-cased.
        def find_mount_point(path):
            path = os.path.abspath(path)
            while not os.path.ismount(path):
                path = os.path.dirname(path)
            return path.lower()

        # The mount point this file lives on must be listed, and
        # disk_usage() must work against it.
        mount = find_mount_point(__file__)
        mounts = [x.mountpoint.lower() for x in
                  psutil.disk_partitions(all=True) if x.mountpoint]
        self.assertIn(mount, mounts)
        psutil.disk_usage(mount)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this linux version')
    @unittest.skipIf(CI_TESTING and not psutil.disk_io_counters(),
                     "unreliable on CI")  # no visible disks
    def test_disk_io_counters(self):
        # Make sure namedtuple fields are in the expected position
        # (the available fields depend on the platform) and that all
        # values are >= 0.
        def check_ntuple(nt):
            self.assertEqual(nt[0], nt.read_count)
            self.assertEqual(nt[1], nt.write_count)
            self.assertEqual(nt[2], nt.read_bytes)
            self.assertEqual(nt[3], nt.write_bytes)
            if not (OPENBSD or NETBSD):
                self.assertEqual(nt[4], nt.read_time)
                self.assertEqual(nt[5], nt.write_time)
                if LINUX:
                    self.assertEqual(nt[6], nt.read_merged_count)
                    self.assertEqual(nt[7], nt.write_merged_count)
                    self.assertEqual(nt[8], nt.busy_time)
                elif FREEBSD:
                    self.assertEqual(nt[6], nt.busy_time)
            for name in nt._fields:
                self.assertGreaterEqual(getattr(nt, name), 0, msg=nt)

        ret = psutil.disk_io_counters(perdisk=False)
        self.assertIsNotNone(ret, msg="no disks on this system?")
        check_ntuple(ret)
        ret = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        self.assertEqual(len(ret), len(set(ret)))
        for key in ret:
            self.assertTrue(key, msg=key)
            check_ntuple(ret[key])

    def test_disk_io_counters_no_disks(self):
        # Emulate a case where no disks are installed, see:
        # https://github.com/giampaolo/psutil/issues/1062
        with mock.patch('psutil._psplatform.disk_io_counters',
                        return_value={}) as m:
            self.assertIsNone(psutil.disk_io_counters(perdisk=False))
            self.assertEqual(psutil.disk_io_counters(perdisk=True), {})
            self.assertTrue(m.called)
| 688 | |
| 689 | |
| 690 class TestNetAPIs(PsutilTestCase): | |
| 691 | |
| 692 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 693 def test_net_io_counters(self): | |
| 694 def check_ntuple(nt): | |
| 695 self.assertEqual(nt[0], nt.bytes_sent) | |
| 696 self.assertEqual(nt[1], nt.bytes_recv) | |
| 697 self.assertEqual(nt[2], nt.packets_sent) | |
| 698 self.assertEqual(nt[3], nt.packets_recv) | |
| 699 self.assertEqual(nt[4], nt.errin) | |
| 700 self.assertEqual(nt[5], nt.errout) | |
| 701 self.assertEqual(nt[6], nt.dropin) | |
| 702 self.assertEqual(nt[7], nt.dropout) | |
| 703 assert nt.bytes_sent >= 0, nt | |
| 704 assert nt.bytes_recv >= 0, nt | |
| 705 assert nt.packets_sent >= 0, nt | |
| 706 assert nt.packets_recv >= 0, nt | |
| 707 assert nt.errin >= 0, nt | |
| 708 assert nt.errout >= 0, nt | |
| 709 assert nt.dropin >= 0, nt | |
| 710 assert nt.dropout >= 0, nt | |
| 711 | |
| 712 ret = psutil.net_io_counters(pernic=False) | |
| 713 check_ntuple(ret) | |
| 714 ret = psutil.net_io_counters(pernic=True) | |
| 715 self.assertNotEqual(ret, []) | |
| 716 for key in ret: | |
| 717 self.assertTrue(key) | |
| 718 self.assertIsInstance(key, str) | |
| 719 check_ntuple(ret[key]) | |
| 720 | |
| 721 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 722 def test_net_io_counters_no_nics(self): | |
| 723 # Emulate a case where no NICs are installed, see: | |
| 724 # https://github.com/giampaolo/psutil/issues/1062 | |
| 725 with mock.patch('psutil._psplatform.net_io_counters', | |
| 726 return_value={}) as m: | |
| 727 self.assertIsNone(psutil.net_io_counters(pernic=False)) | |
| 728 self.assertEqual(psutil.net_io_counters(pernic=True), {}) | |
| 729 assert m.called | |
| 730 | |
| 731 def test_net_if_addrs(self): | |
| 732 nics = psutil.net_if_addrs() | |
| 733 assert nics, nics | |
| 734 | |
| 735 nic_stats = psutil.net_if_stats() | |
| 736 | |
| 737 # Not reliable on all platforms (net_if_addrs() reports more | |
| 738 # interfaces). | |
| 739 # self.assertEqual(sorted(nics.keys()), | |
| 740 # sorted(psutil.net_io_counters(pernic=True).keys())) | |
| 741 | |
| 742 families = set([socket.AF_INET, socket.AF_INET6, psutil.AF_LINK]) | |
| 743 for nic, addrs in nics.items(): | |
| 744 self.assertIsInstance(nic, str) | |
| 745 self.assertEqual(len(set(addrs)), len(addrs)) | |
| 746 for addr in addrs: | |
| 747 self.assertIsInstance(addr.family, int) | |
| 748 self.assertIsInstance(addr.address, str) | |
| 749 self.assertIsInstance(addr.netmask, (str, type(None))) | |
| 750 self.assertIsInstance(addr.broadcast, (str, type(None))) | |
| 751 self.assertIn(addr.family, families) | |
| 752 if sys.version_info >= (3, 4) and not PYPY: | |
| 753 self.assertIsInstance(addr.family, enum.IntEnum) | |
| 754 if nic_stats[nic].isup: | |
| 755 # Do not test binding to addresses of interfaces | |
| 756 # that are down | |
| 757 if addr.family == socket.AF_INET: | |
| 758 s = socket.socket(addr.family) | |
| 759 with contextlib.closing(s): | |
| 760 s.bind((addr.address, 0)) | |
| 761 elif addr.family == socket.AF_INET6: | |
| 762 info = socket.getaddrinfo( | |
| 763 addr.address, 0, socket.AF_INET6, | |
| 764 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0] | |
| 765 af, socktype, proto, canonname, sa = info | |
| 766 s = socket.socket(af, socktype, proto) | |
| 767 with contextlib.closing(s): | |
| 768 s.bind(sa) | |
| 769 for ip in (addr.address, addr.netmask, addr.broadcast, | |
| 770 addr.ptp): | |
| 771 if ip is not None: | |
| 772 # TODO: skip AF_INET6 for now because I get: | |
| 773 # AddressValueError: Only hex digits permitted in | |
| 774 # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' | |
| 775 if addr.family != socket.AF_INET6: | |
| 776 check_net_address(ip, addr.family) | |
| 777 # broadcast and ptp addresses are mutually exclusive | |
| 778 if addr.broadcast: | |
| 779 self.assertIsNone(addr.ptp) | |
| 780 elif addr.ptp: | |
| 781 self.assertIsNone(addr.broadcast) | |
| 782 | |
| 783 if BSD or MACOS or SUNOS: | |
| 784 if hasattr(socket, "AF_LINK"): | |
| 785 self.assertEqual(psutil.AF_LINK, socket.AF_LINK) | |
| 786 elif LINUX: | |
| 787 self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) | |
| 788 elif WINDOWS: | |
| 789 self.assertEqual(psutil.AF_LINK, -1) | |
| 790 | |
def test_net_if_addrs_mac_null_bytes(self):
    # Simulate the underlying C function returning an incomplete MAC
    # address; psutil is supposed to pad it with null bytes.
    # https://github.com/giampaolo/psutil/issues/786
    if POSIX:
        family = psutil.AF_LINK
        short_mac = '06:3d:29'
        padded_mac = '06:3d:29:00:00:00'
    else:
        family = -1
        short_mac = '06-3d-29'
        padded_mac = '06-3d-29-00-00-00'
    fake_addrs = [('em1', family, short_mac, None, None, None)]
    with mock.patch('psutil._psplatform.net_if_addrs',
                    return_value=fake_addrs) as patched:
        first_addr = psutil.net_if_addrs()['em1'][0]
        # Make sure the patched platform function was really used.
        assert patched.called
        self.assertEqual(first_addr.address, padded_mac)
| 807 | |
@unittest.skipIf(TRAVIS, "unreliable on TRAVIS")  # raises EPERM
def test_net_if_stats(self):
    # Sanity-check every entry returned by psutil.net_if_stats(): the
    # NIC name is a string, "isup" is a bool, "duplex" is one of the
    # known NIC_DUPLEX_* constants, and speed / MTU are non-negative.
    nics = psutil.net_if_stats()
    # At least one interface is expected; show the value on failure.
    assert nics, nics
    all_duplexes = (psutil.NIC_DUPLEX_FULL,
                    psutil.NIC_DUPLEX_HALF,
                    psutil.NIC_DUPLEX_UNKNOWN)
    for name, stats in nics.items():
        self.assertIsInstance(name, str)
        isup, duplex, speed, mtu = stats
        self.assertIsInstance(isup, bool)
        self.assertIn(duplex, all_duplexes)
        self.assertGreaterEqual(speed, 0)
        self.assertGreaterEqual(mtu, 0)
| 823 | |
@unittest.skipIf(not LINUX and not BSD and not MACOS,
                 "LINUX or BSD or MACOS specific")
def test_net_if_stats_enodev(self):
    # See: https://github.com/giampaolo/psutil/issues/1279
    # With the MTU lookup raising ENODEV for every interface,
    # net_if_stats() is expected to return an empty dict rather than
    # propagate the error.
    enodev = OSError(errno.ENODEV, "")
    with mock.patch('psutil._psutil_posix.net_if_mtu',
                    side_effect=enodev) as patched:
        stats = psutil.net_if_stats()
        self.assertEqual(stats, {})
        assert patched.called
| 833 | |
| 834 | |
class TestSensorsAPIs(PsutilTestCase):
    # Sanity checks for the psutil.sensors_*() functions. Each test is
    # skipped when its HAS_* capability flag is false.

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        # Names and labels must be strings; any reading that is
        # present (not None) must be non-negative.
        for hw_name, readings in psutil.sensors_temperatures().items():
            self.assertIsInstance(hw_name, str)
            for reading in readings:
                self.assertIsInstance(reading.label, str)
                values = (reading.current, reading.high, reading.critical)
                for value in values:
                    if value is not None:
                        self.assertGreaterEqual(value, 0)

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures_fahreneit(self):
        # Mock the platform call with fixed readings (50, 60, 70) and
        # check they come back as (122, 140, 158) with fahrenheit=True.
        fake_temps = {'coretemp': [('label', 50.0, 60.0, 70.0)]}
        with mock.patch("psutil._psplatform.sensors_temperatures",
                        return_value=fake_temps) as patched:
            converted = psutil.sensors_temperatures(fahrenheit=True)
            reading = converted['coretemp'][0]
            assert patched.called
            self.assertEqual(reading.current, 122.0)
            self.assertEqual(reading.high, 140.0)
            self.assertEqual(reading.critical, 158.0)

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        # Percentage must be within 0-100. "secsleft" is either one of
        # the two POWER_TIME_* sentinels or a non-negative number; the
        # UNLIMITED sentinel additionally requires power_plugged.
        battery = psutil.sensors_battery()
        self.assertGreaterEqual(battery.percent, 0)
        self.assertLessEqual(battery.percent, 100)
        sentinels = (psutil.POWER_TIME_UNKNOWN,
                     psutil.POWER_TIME_UNLIMITED)
        if battery.secsleft not in sentinels:
            self.assertGreaterEqual(battery.secsleft, 0)
        elif battery.secsleft == psutil.POWER_TIME_UNLIMITED:
            self.assertTrue(battery.power_plugged)
        self.assertIsInstance(battery.power_plugged, bool)

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        # Names and labels must be strings; the current reading must
        # be a non-negative integer.
        for hw_name, readings in psutil.sensors_fans().items():
            self.assertIsInstance(hw_name, str)
            for reading in readings:
                self.assertIsInstance(reading.label, str)
                self.assertIsInstance(reading.current, (int, long))
                self.assertGreaterEqual(reading.current, 0)
| 886 | |
| 887 | |
if __name__ == '__main__':
    # When executed as a script, hand this file's path to the psutil
    # test runner.
    from psutil.tests import runner
    runner.run_from_name(__file__)
