Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_system.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Tests for system APIS.""" | |
| 8 | |
| 9 import contextlib | |
| 10 import datetime | |
| 11 import errno | |
| 12 import os | |
| 13 import pprint | |
| 14 import shutil | |
| 15 import signal | |
| 16 import socket | |
| 17 import sys | |
| 18 import tempfile | |
| 19 import time | |
| 20 | |
| 21 import psutil | |
| 22 from psutil import AIX | |
| 23 from psutil import BSD | |
| 24 from psutil import FREEBSD | |
| 25 from psutil import LINUX | |
| 26 from psutil import MACOS | |
| 27 from psutil import NETBSD | |
| 28 from psutil import OPENBSD | |
| 29 from psutil import POSIX | |
| 30 from psutil import SUNOS | |
| 31 from psutil import WINDOWS | |
| 32 from psutil._compat import FileNotFoundError | |
| 33 from psutil._compat import long | |
| 34 from psutil.tests import ASCII_FS | |
| 35 from psutil.tests import check_net_address | |
| 36 from psutil.tests import CI_TESTING | |
| 37 from psutil.tests import DEVNULL | |
| 38 from psutil.tests import enum | |
| 39 from psutil.tests import get_test_subprocess | |
| 40 from psutil.tests import HAS_BATTERY | |
| 41 from psutil.tests import HAS_CPU_FREQ | |
| 42 from psutil.tests import HAS_GETLOADAVG | |
| 43 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 44 from psutil.tests import HAS_SENSORS_BATTERY | |
| 45 from psutil.tests import HAS_SENSORS_FANS | |
| 46 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
| 47 from psutil.tests import mock | |
| 48 from psutil.tests import PYPY | |
| 49 from psutil.tests import reap_children | |
| 50 from psutil.tests import retry_on_failure | |
| 51 from psutil.tests import safe_rmpath | |
| 52 from psutil.tests import TESTFN | |
| 53 from psutil.tests import TESTFN_UNICODE | |
| 54 from psutil.tests import TRAVIS | |
| 55 from psutil.tests import unittest | |
| 56 | |
| 57 | |
| 58 # =================================================================== | |
| 59 # --- System-related API tests | |
| 60 # =================================================================== | |
| 61 | |
| 62 | |
| 63 class TestProcessAPIs(unittest.TestCase): | |
| 64 | |
| 65 def setUp(self): | |
| 66 safe_rmpath(TESTFN) | |
| 67 | |
| 68 def tearDown(self): | |
| 69 reap_children() | |
| 70 | |
| 71 def test_process_iter(self): | |
| 72 self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) | |
| 73 sproc = get_test_subprocess() | |
| 74 self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) | |
| 75 p = psutil.Process(sproc.pid) | |
| 76 p.kill() | |
| 77 p.wait() | |
| 78 self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) | |
| 79 | |
| 80 with mock.patch('psutil.Process', | |
| 81 side_effect=psutil.NoSuchProcess(os.getpid())): | |
| 82 self.assertEqual(list(psutil.process_iter()), []) | |
| 83 with mock.patch('psutil.Process', | |
| 84 side_effect=psutil.AccessDenied(os.getpid())): | |
| 85 with self.assertRaises(psutil.AccessDenied): | |
| 86 list(psutil.process_iter()) | |
| 87 | |
| 88 def test_prcess_iter_w_attrs(self): | |
| 89 for p in psutil.process_iter(attrs=['pid']): | |
| 90 self.assertEqual(list(p.info.keys()), ['pid']) | |
| 91 with self.assertRaises(ValueError): | |
| 92 list(psutil.process_iter(attrs=['foo'])) | |
| 93 with mock.patch("psutil._psplatform.Process.cpu_times", | |
| 94 side_effect=psutil.AccessDenied(0, "")) as m: | |
| 95 for p in psutil.process_iter(attrs=["pid", "cpu_times"]): | |
| 96 self.assertIsNone(p.info['cpu_times']) | |
| 97 self.assertGreaterEqual(p.info['pid'], 0) | |
| 98 assert m.called | |
| 99 with mock.patch("psutil._psplatform.Process.cpu_times", | |
| 100 side_effect=psutil.AccessDenied(0, "")) as m: | |
| 101 flag = object() | |
| 102 for p in psutil.process_iter( | |
| 103 attrs=["pid", "cpu_times"], ad_value=flag): | |
| 104 self.assertIs(p.info['cpu_times'], flag) | |
| 105 self.assertGreaterEqual(p.info['pid'], 0) | |
| 106 assert m.called | |
| 107 | |
| 108 @unittest.skipIf(PYPY and WINDOWS, | |
| 109 "get_test_subprocess() unreliable on PYPY + WINDOWS") | |
| 110 def test_wait_procs(self): | |
| 111 def callback(p): | |
| 112 pids.append(p.pid) | |
| 113 | |
| 114 pids = [] | |
| 115 sproc1 = get_test_subprocess() | |
| 116 sproc2 = get_test_subprocess() | |
| 117 sproc3 = get_test_subprocess() | |
| 118 procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] | |
| 119 self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) | |
| 120 self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) | |
| 121 t = time.time() | |
| 122 gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) | |
| 123 | |
| 124 self.assertLess(time.time() - t, 0.5) | |
| 125 self.assertEqual(gone, []) | |
| 126 self.assertEqual(len(alive), 3) | |
| 127 self.assertEqual(pids, []) | |
| 128 for p in alive: | |
| 129 self.assertFalse(hasattr(p, 'returncode')) | |
| 130 | |
| 131 @retry_on_failure(30) | |
| 132 def test(procs, callback): | |
| 133 gone, alive = psutil.wait_procs(procs, timeout=0.03, | |
| 134 callback=callback) | |
| 135 self.assertEqual(len(gone), 1) | |
| 136 self.assertEqual(len(alive), 2) | |
| 137 return gone, alive | |
| 138 | |
| 139 sproc3.terminate() | |
| 140 gone, alive = test(procs, callback) | |
| 141 self.assertIn(sproc3.pid, [x.pid for x in gone]) | |
| 142 if POSIX: | |
| 143 self.assertEqual(gone.pop().returncode, -signal.SIGTERM) | |
| 144 else: | |
| 145 self.assertEqual(gone.pop().returncode, 1) | |
| 146 self.assertEqual(pids, [sproc3.pid]) | |
| 147 for p in alive: | |
| 148 self.assertFalse(hasattr(p, 'returncode')) | |
| 149 | |
| 150 @retry_on_failure(30) | |
| 151 def test(procs, callback): | |
| 152 gone, alive = psutil.wait_procs(procs, timeout=0.03, | |
| 153 callback=callback) | |
| 154 self.assertEqual(len(gone), 3) | |
| 155 self.assertEqual(len(alive), 0) | |
| 156 return gone, alive | |
| 157 | |
| 158 sproc1.terminate() | |
| 159 sproc2.terminate() | |
| 160 gone, alive = test(procs, callback) | |
| 161 self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid])) | |
| 162 for p in gone: | |
| 163 self.assertTrue(hasattr(p, 'returncode')) | |
| 164 | |
| 165 @unittest.skipIf(PYPY and WINDOWS, | |
| 166 "get_test_subprocess() unreliable on PYPY + WINDOWS") | |
| 167 def test_wait_procs_no_timeout(self): | |
| 168 sproc1 = get_test_subprocess() | |
| 169 sproc2 = get_test_subprocess() | |
| 170 sproc3 = get_test_subprocess() | |
| 171 procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] | |
| 172 for p in procs: | |
| 173 p.terminate() | |
| 174 gone, alive = psutil.wait_procs(procs) | |
| 175 | |
| 176 def test_pid_exists(self): | |
| 177 sproc = get_test_subprocess() | |
| 178 self.assertTrue(psutil.pid_exists(sproc.pid)) | |
| 179 p = psutil.Process(sproc.pid) | |
| 180 p.kill() | |
| 181 p.wait() | |
| 182 self.assertFalse(psutil.pid_exists(sproc.pid)) | |
| 183 self.assertFalse(psutil.pid_exists(-1)) | |
| 184 self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) | |
| 185 | |
| 186 def test_pid_exists_2(self): | |
| 187 reap_children() | |
| 188 pids = psutil.pids() | |
| 189 for pid in pids: | |
| 190 try: | |
| 191 assert psutil.pid_exists(pid) | |
| 192 except AssertionError: | |
| 193 # in case the process disappeared in meantime fail only | |
| 194 # if it is no longer in psutil.pids() | |
| 195 time.sleep(.1) | |
| 196 if pid in psutil.pids(): | |
| 197 self.fail(pid) | |
| 198 pids = range(max(pids) + 5000, max(pids) + 6000) | |
| 199 for pid in pids: | |
| 200 self.assertFalse(psutil.pid_exists(pid), msg=pid) | |
| 201 | |
| 202 def test_pids(self): | |
| 203 pidslist = psutil.pids() | |
| 204 procslist = [x.pid for x in psutil.process_iter()] | |
| 205 # make sure every pid is unique | |
| 206 self.assertEqual(sorted(set(pidslist)), pidslist) | |
| 207 self.assertEqual(pidslist, procslist) | |
| 208 | |
| 209 | |
| 210 class TestMiscAPIs(unittest.TestCase): | |
| 211 | |
| 212 def test_boot_time(self): | |
| 213 bt = psutil.boot_time() | |
| 214 self.assertIsInstance(bt, float) | |
| 215 self.assertGreater(bt, 0) | |
| 216 self.assertLess(bt, time.time()) | |
| 217 | |
| 218 @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI") | |
| 219 def test_users(self): | |
| 220 users = psutil.users() | |
| 221 self.assertNotEqual(users, []) | |
| 222 for user in users: | |
| 223 assert user.name, user | |
| 224 self.assertIsInstance(user.name, str) | |
| 225 self.assertIsInstance(user.terminal, (str, type(None))) | |
| 226 if user.host is not None: | |
| 227 self.assertIsInstance(user.host, (str, type(None))) | |
| 228 user.terminal | |
| 229 user.host | |
| 230 assert user.started > 0.0, user | |
| 231 datetime.datetime.fromtimestamp(user.started) | |
| 232 if WINDOWS or OPENBSD: | |
| 233 self.assertIsNone(user.pid) | |
| 234 else: | |
| 235 psutil.Process(user.pid) | |
| 236 | |
| 237 @unittest.skipIf(not POSIX, 'POSIX only') | |
| 238 def test_PAGESIZE(self): | |
| 239 # pagesize is used internally to perform different calculations | |
| 240 # and it's determined by using SC_PAGE_SIZE; make sure | |
| 241 # getpagesize() returns the same value. | |
| 242 import resource | |
| 243 self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) | |
| 244 | |
| 245 def test_test(self): | |
| 246 # test for psutil.test() function | |
| 247 stdout = sys.stdout | |
| 248 sys.stdout = DEVNULL | |
| 249 try: | |
| 250 psutil.test() | |
| 251 finally: | |
| 252 sys.stdout = stdout | |
| 253 | |
| 254 def test_os_constants(self): | |
| 255 names = ["POSIX", "WINDOWS", "LINUX", "MACOS", "FREEBSD", "OPENBSD", | |
| 256 "NETBSD", "BSD", "SUNOS"] | |
| 257 for name in names: | |
| 258 self.assertIsInstance(getattr(psutil, name), bool, msg=name) | |
| 259 | |
| 260 if os.name == 'posix': | |
| 261 assert psutil.POSIX | |
| 262 assert not psutil.WINDOWS | |
| 263 names.remove("POSIX") | |
| 264 if "linux" in sys.platform.lower(): | |
| 265 assert psutil.LINUX | |
| 266 names.remove("LINUX") | |
| 267 elif "bsd" in sys.platform.lower(): | |
| 268 assert psutil.BSD | |
| 269 self.assertEqual([psutil.FREEBSD, psutil.OPENBSD, | |
| 270 psutil.NETBSD].count(True), 1) | |
| 271 names.remove("BSD") | |
| 272 names.remove("FREEBSD") | |
| 273 names.remove("OPENBSD") | |
| 274 names.remove("NETBSD") | |
| 275 elif "sunos" in sys.platform.lower() or \ | |
| 276 "solaris" in sys.platform.lower(): | |
| 277 assert psutil.SUNOS | |
| 278 names.remove("SUNOS") | |
| 279 elif "darwin" in sys.platform.lower(): | |
| 280 assert psutil.MACOS | |
| 281 names.remove("MACOS") | |
| 282 else: | |
| 283 assert psutil.WINDOWS | |
| 284 assert not psutil.POSIX | |
| 285 names.remove("WINDOWS") | |
| 286 | |
| 287 # assert all other constants are set to False | |
| 288 for name in names: | |
| 289 self.assertIs(getattr(psutil, name), False, msg=name) | |
| 290 | |
| 291 | |
| 292 class TestMemoryAPIs(unittest.TestCase): | |
| 293 | |
| 294 def test_virtual_memory(self): | |
| 295 mem = psutil.virtual_memory() | |
| 296 assert mem.total > 0, mem | |
| 297 assert mem.available > 0, mem | |
| 298 assert 0 <= mem.percent <= 100, mem | |
| 299 assert mem.used > 0, mem | |
| 300 assert mem.free >= 0, mem | |
| 301 for name in mem._fields: | |
| 302 value = getattr(mem, name) | |
| 303 if name != 'percent': | |
| 304 self.assertIsInstance(value, (int, long)) | |
| 305 if name != 'total': | |
| 306 if not value >= 0: | |
| 307 self.fail("%r < 0 (%s)" % (name, value)) | |
| 308 if value > mem.total: | |
| 309 self.fail("%r > total (total=%s, %s=%s)" | |
| 310 % (name, mem.total, name, value)) | |
| 311 | |
| 312 def test_swap_memory(self): | |
| 313 mem = psutil.swap_memory() | |
| 314 self.assertEqual( | |
| 315 mem._fields, ('total', 'used', 'free', 'percent', 'sin', 'sout')) | |
| 316 | |
| 317 assert mem.total >= 0, mem | |
| 318 assert mem.used >= 0, mem | |
| 319 if mem.total > 0: | |
| 320 # likely a system with no swap partition | |
| 321 assert mem.free > 0, mem | |
| 322 else: | |
| 323 assert mem.free == 0, mem | |
| 324 assert 0 <= mem.percent <= 100, mem | |
| 325 assert mem.sin >= 0, mem | |
| 326 assert mem.sout >= 0, mem | |
| 327 | |
| 328 | |
| 329 class TestCpuAPIs(unittest.TestCase): | |
| 330 | |
| 331 def test_cpu_count_logical(self): | |
| 332 logical = psutil.cpu_count() | |
| 333 self.assertIsNotNone(logical) | |
| 334 self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) | |
| 335 self.assertGreaterEqual(logical, 1) | |
| 336 # | |
| 337 if os.path.exists("/proc/cpuinfo"): | |
| 338 with open("/proc/cpuinfo") as fd: | |
| 339 cpuinfo_data = fd.read() | |
| 340 if "physical id" not in cpuinfo_data: | |
| 341 raise unittest.SkipTest("cpuinfo doesn't include physical id") | |
| 342 | |
| 343 def test_cpu_count_physical(self): | |
| 344 logical = psutil.cpu_count() | |
| 345 physical = psutil.cpu_count(logical=False) | |
| 346 if physical is None: | |
| 347 raise self.skipTest("physical cpu_count() is None") | |
| 348 if WINDOWS and sys.getwindowsversion()[:2] <= (6, 1): # <= Vista | |
| 349 self.assertIsNone(physical) | |
| 350 else: | |
| 351 self.assertGreaterEqual(physical, 1) | |
| 352 self.assertGreaterEqual(logical, physical) | |
| 353 | |
| 354 def test_cpu_count_none(self): | |
| 355 # https://github.com/giampaolo/psutil/issues/1085 | |
| 356 for val in (-1, 0, None): | |
| 357 with mock.patch('psutil._psplatform.cpu_count_logical', | |
| 358 return_value=val) as m: | |
| 359 self.assertIsNone(psutil.cpu_count()) | |
| 360 assert m.called | |
| 361 with mock.patch('psutil._psplatform.cpu_count_physical', | |
| 362 return_value=val) as m: | |
| 363 self.assertIsNone(psutil.cpu_count(logical=False)) | |
| 364 assert m.called | |
| 365 | |
| 366 def test_cpu_times(self): | |
| 367 # Check type, value >= 0, str(). | |
| 368 total = 0 | |
| 369 times = psutil.cpu_times() | |
| 370 sum(times) | |
| 371 for cp_time in times: | |
| 372 self.assertIsInstance(cp_time, float) | |
| 373 self.assertGreaterEqual(cp_time, 0.0) | |
| 374 total += cp_time | |
| 375 self.assertEqual(total, sum(times)) | |
| 376 str(times) | |
| 377 # CPU times are always supposed to increase over time | |
| 378 # or at least remain the same and that's because time | |
| 379 # cannot go backwards. | |
| 380 # Surprisingly sometimes this might not be the case (at | |
| 381 # least on Windows and Linux), see: | |
| 382 # https://github.com/giampaolo/psutil/issues/392 | |
| 383 # https://github.com/giampaolo/psutil/issues/645 | |
| 384 # if not WINDOWS: | |
| 385 # last = psutil.cpu_times() | |
| 386 # for x in range(100): | |
| 387 # new = psutil.cpu_times() | |
| 388 # for field in new._fields: | |
| 389 # new_t = getattr(new, field) | |
| 390 # last_t = getattr(last, field) | |
| 391 # self.assertGreaterEqual(new_t, last_t, | |
| 392 # msg="%s %s" % (new_t, last_t)) | |
| 393 # last = new | |
| 394 | |
| 395 def test_cpu_times_time_increases(self): | |
| 396 # Make sure time increases between calls. | |
| 397 t1 = sum(psutil.cpu_times()) | |
| 398 stop_at = time.time() + 1 | |
| 399 while time.time() < stop_at: | |
| 400 t2 = sum(psutil.cpu_times()) | |
| 401 if t2 > t1: | |
| 402 return | |
| 403 self.fail("time remained the same") | |
| 404 | |
| 405 def test_per_cpu_times(self): | |
| 406 # Check type, value >= 0, str(). | |
| 407 for times in psutil.cpu_times(percpu=True): | |
| 408 total = 0 | |
| 409 sum(times) | |
| 410 for cp_time in times: | |
| 411 self.assertIsInstance(cp_time, float) | |
| 412 self.assertGreaterEqual(cp_time, 0.0) | |
| 413 total += cp_time | |
| 414 self.assertEqual(total, sum(times)) | |
| 415 str(times) | |
| 416 self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), | |
| 417 len(psutil.cpu_times(percpu=False))) | |
| 418 | |
| 419 # Note: in theory CPU times are always supposed to increase over | |
| 420 # time or remain the same but never go backwards. In practice | |
| 421 # sometimes this is not the case. | |
| 422 # This issue seemd to be afflict Windows: | |
| 423 # https://github.com/giampaolo/psutil/issues/392 | |
| 424 # ...but it turns out also Linux (rarely) behaves the same. | |
| 425 # last = psutil.cpu_times(percpu=True) | |
| 426 # for x in range(100): | |
| 427 # new = psutil.cpu_times(percpu=True) | |
| 428 # for index in range(len(new)): | |
| 429 # newcpu = new[index] | |
| 430 # lastcpu = last[index] | |
| 431 # for field in newcpu._fields: | |
| 432 # new_t = getattr(newcpu, field) | |
| 433 # last_t = getattr(lastcpu, field) | |
| 434 # self.assertGreaterEqual( | |
| 435 # new_t, last_t, msg="%s %s" % (lastcpu, newcpu)) | |
| 436 # last = new | |
| 437 | |
| 438 def test_per_cpu_times_2(self): | |
| 439 # Simulate some work load then make sure time have increased | |
| 440 # between calls. | |
| 441 tot1 = psutil.cpu_times(percpu=True) | |
| 442 giveup_at = time.time() + 1 | |
| 443 while True: | |
| 444 if time.time() >= giveup_at: | |
| 445 return self.fail("timeout") | |
| 446 tot2 = psutil.cpu_times(percpu=True) | |
| 447 for t1, t2 in zip(tot1, tot2): | |
| 448 t1, t2 = psutil._cpu_busy_time(t1), psutil._cpu_busy_time(t2) | |
| 449 difference = t2 - t1 | |
| 450 if difference >= 0.05: | |
| 451 return | |
| 452 | |
| 453 def test_cpu_times_comparison(self): | |
| 454 # Make sure the sum of all per cpu times is almost equal to | |
| 455 # base "one cpu" times. | |
| 456 base = psutil.cpu_times() | |
| 457 per_cpu = psutil.cpu_times(percpu=True) | |
| 458 summed_values = base._make([sum(num) for num in zip(*per_cpu)]) | |
| 459 for field in base._fields: | |
| 460 self.assertAlmostEqual( | |
| 461 getattr(base, field), getattr(summed_values, field), delta=1) | |
| 462 | |
| 463 def _test_cpu_percent(self, percent, last_ret, new_ret): | |
| 464 try: | |
| 465 self.assertIsInstance(percent, float) | |
| 466 self.assertGreaterEqual(percent, 0.0) | |
| 467 self.assertIsNot(percent, -0.0) | |
| 468 self.assertLessEqual(percent, 100.0 * psutil.cpu_count()) | |
| 469 except AssertionError as err: | |
| 470 raise AssertionError("\n%s\nlast=%s\nnew=%s" % ( | |
| 471 err, pprint.pformat(last_ret), pprint.pformat(new_ret))) | |
| 472 | |
| 473 def test_cpu_percent(self): | |
| 474 last = psutil.cpu_percent(interval=0.001) | |
| 475 for x in range(100): | |
| 476 new = psutil.cpu_percent(interval=None) | |
| 477 self._test_cpu_percent(new, last, new) | |
| 478 last = new | |
| 479 with self.assertRaises(ValueError): | |
| 480 psutil.cpu_percent(interval=-1) | |
| 481 | |
| 482 def test_per_cpu_percent(self): | |
| 483 last = psutil.cpu_percent(interval=0.001, percpu=True) | |
| 484 self.assertEqual(len(last), psutil.cpu_count()) | |
| 485 for x in range(100): | |
| 486 new = psutil.cpu_percent(interval=None, percpu=True) | |
| 487 for percent in new: | |
| 488 self._test_cpu_percent(percent, last, new) | |
| 489 last = new | |
| 490 with self.assertRaises(ValueError): | |
| 491 psutil.cpu_percent(interval=-1, percpu=True) | |
| 492 | |
| 493 def test_cpu_times_percent(self): | |
| 494 last = psutil.cpu_times_percent(interval=0.001) | |
| 495 for x in range(100): | |
| 496 new = psutil.cpu_times_percent(interval=None) | |
| 497 for percent in new: | |
| 498 self._test_cpu_percent(percent, last, new) | |
| 499 self._test_cpu_percent(sum(new), last, new) | |
| 500 last = new | |
| 501 | |
| 502 def test_per_cpu_times_percent(self): | |
| 503 last = psutil.cpu_times_percent(interval=0.001, percpu=True) | |
| 504 self.assertEqual(len(last), psutil.cpu_count()) | |
| 505 for x in range(100): | |
| 506 new = psutil.cpu_times_percent(interval=None, percpu=True) | |
| 507 for cpu in new: | |
| 508 for percent in cpu: | |
| 509 self._test_cpu_percent(percent, last, new) | |
| 510 self._test_cpu_percent(sum(cpu), last, new) | |
| 511 last = new | |
| 512 | |
| 513 def test_per_cpu_times_percent_negative(self): | |
| 514 # see: https://github.com/giampaolo/psutil/issues/645 | |
| 515 psutil.cpu_times_percent(percpu=True) | |
| 516 zero_times = [x._make([0 for x in range(len(x._fields))]) | |
| 517 for x in psutil.cpu_times(percpu=True)] | |
| 518 with mock.patch('psutil.cpu_times', return_value=zero_times): | |
| 519 for cpu in psutil.cpu_times_percent(percpu=True): | |
| 520 for percent in cpu: | |
| 521 self._test_cpu_percent(percent, None, None) | |
| 522 | |
| 523 def test_cpu_stats(self): | |
| 524 # Tested more extensively in per-platform test modules. | |
| 525 infos = psutil.cpu_stats() | |
| 526 self.assertEqual( | |
| 527 infos._fields, | |
| 528 ('ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls')) | |
| 529 for name in infos._fields: | |
| 530 value = getattr(infos, name) | |
| 531 self.assertGreaterEqual(value, 0) | |
| 532 # on AIX, ctx_switches is always 0 | |
| 533 if not AIX and name in ('ctx_switches', 'interrupts'): | |
| 534 self.assertGreater(value, 0) | |
| 535 | |
| 536 @unittest.skipIf(not HAS_CPU_FREQ, "not suported") | |
| 537 def test_cpu_freq(self): | |
| 538 def check_ls(ls): | |
| 539 for nt in ls: | |
| 540 self.assertEqual(nt._fields, ('current', 'min', 'max')) | |
| 541 if nt.max != 0.0: | |
| 542 self.assertLessEqual(nt.current, nt.max) | |
| 543 for name in nt._fields: | |
| 544 value = getattr(nt, name) | |
| 545 self.assertIsInstance(value, (int, long, float)) | |
| 546 self.assertGreaterEqual(value, 0) | |
| 547 | |
| 548 ls = psutil.cpu_freq(percpu=True) | |
| 549 if TRAVIS and not ls: | |
| 550 raise self.skipTest("skipped on Travis") | |
| 551 if FREEBSD and not ls: | |
| 552 raise self.skipTest("returns empty list on FreeBSD") | |
| 553 | |
| 554 assert ls, ls | |
| 555 check_ls([psutil.cpu_freq(percpu=False)]) | |
| 556 | |
| 557 if LINUX: | |
| 558 self.assertEqual(len(ls), psutil.cpu_count()) | |
| 559 | |
| 560 @unittest.skipIf(not HAS_GETLOADAVG, "not supported") | |
| 561 def test_getloadavg(self): | |
| 562 loadavg = psutil.getloadavg() | |
| 563 assert len(loadavg) == 3 | |
| 564 | |
| 565 for load in loadavg: | |
| 566 self.assertIsInstance(load, float) | |
| 567 self.assertGreaterEqual(load, 0.0) | |
| 568 | |
| 569 | |
| 570 class TestDiskAPIs(unittest.TestCase): | |
| 571 | |
| 572 def test_disk_usage(self): | |
| 573 usage = psutil.disk_usage(os.getcwd()) | |
| 574 self.assertEqual(usage._fields, ('total', 'used', 'free', 'percent')) | |
| 575 | |
| 576 assert usage.total > 0, usage | |
| 577 assert usage.used > 0, usage | |
| 578 assert usage.free > 0, usage | |
| 579 assert usage.total > usage.used, usage | |
| 580 assert usage.total > usage.free, usage | |
| 581 assert 0 <= usage.percent <= 100, usage.percent | |
| 582 if hasattr(shutil, 'disk_usage'): | |
| 583 # py >= 3.3, see: http://bugs.python.org/issue12442 | |
| 584 shutil_usage = shutil.disk_usage(os.getcwd()) | |
| 585 tolerance = 5 * 1024 * 1024 # 5MB | |
| 586 self.assertEqual(usage.total, shutil_usage.total) | |
| 587 self.assertAlmostEqual(usage.free, shutil_usage.free, | |
| 588 delta=tolerance) | |
| 589 self.assertAlmostEqual(usage.used, shutil_usage.used, | |
| 590 delta=tolerance) | |
| 591 | |
| 592 # if path does not exist OSError ENOENT is expected across | |
| 593 # all platforms | |
| 594 fname = tempfile.mktemp() | |
| 595 with self.assertRaises(FileNotFoundError): | |
| 596 psutil.disk_usage(fname) | |
| 597 | |
| 598 def test_disk_usage_unicode(self): | |
| 599 # See: https://github.com/giampaolo/psutil/issues/416 | |
| 600 if ASCII_FS: | |
| 601 with self.assertRaises(UnicodeEncodeError): | |
| 602 psutil.disk_usage(TESTFN_UNICODE) | |
| 603 | |
| 604 def test_disk_usage_bytes(self): | |
| 605 psutil.disk_usage(b'.') | |
| 606 | |
| 607 def test_disk_partitions(self): | |
| 608 # all = False | |
| 609 ls = psutil.disk_partitions(all=False) | |
| 610 # on travis we get: | |
| 611 # self.assertEqual(p.cpu_affinity(), [n]) | |
| 612 # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0] | |
| 613 self.assertTrue(ls, msg=ls) | |
| 614 for disk in ls: | |
| 615 self.assertIsInstance(disk.device, str) | |
| 616 self.assertIsInstance(disk.mountpoint, str) | |
| 617 self.assertIsInstance(disk.fstype, str) | |
| 618 self.assertIsInstance(disk.opts, str) | |
| 619 if WINDOWS and 'cdrom' in disk.opts: | |
| 620 continue | |
| 621 if not POSIX: | |
| 622 assert os.path.exists(disk.device), disk | |
| 623 else: | |
| 624 # we cannot make any assumption about this, see: | |
| 625 # http://goo.gl/p9c43 | |
| 626 disk.device | |
| 627 # on modern systems mount points can also be files | |
| 628 assert os.path.exists(disk.mountpoint), disk | |
| 629 assert disk.fstype, disk | |
| 630 | |
| 631 # all = True | |
| 632 ls = psutil.disk_partitions(all=True) | |
| 633 self.assertTrue(ls, msg=ls) | |
| 634 for disk in psutil.disk_partitions(all=True): | |
| 635 if not WINDOWS and disk.mountpoint: | |
| 636 try: | |
| 637 os.stat(disk.mountpoint) | |
| 638 except OSError as err: | |
| 639 if TRAVIS and MACOS and err.errno == errno.EIO: | |
| 640 continue | |
| 641 # http://mail.python.org/pipermail/python-dev/ | |
| 642 # 2012-June/120787.html | |
| 643 if err.errno not in (errno.EPERM, errno.EACCES): | |
| 644 raise | |
| 645 else: | |
| 646 assert os.path.exists(disk.mountpoint), disk | |
| 647 self.assertIsInstance(disk.fstype, str) | |
| 648 self.assertIsInstance(disk.opts, str) | |
| 649 | |
| 650 def find_mount_point(path): | |
| 651 path = os.path.abspath(path) | |
| 652 while not os.path.ismount(path): | |
| 653 path = os.path.dirname(path) | |
| 654 return path.lower() | |
| 655 | |
| 656 mount = find_mount_point(__file__) | |
| 657 mounts = [x.mountpoint.lower() for x in | |
| 658 psutil.disk_partitions(all=True) if x.mountpoint] | |
| 659 self.assertIn(mount, mounts) | |
| 660 psutil.disk_usage(mount) | |
| 661 | |
| 662 @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), | |
| 663 '/proc/diskstats not available on this linux version') | |
| 664 @unittest.skipIf(CI_TESTING and not psutil.disk_io_counters(), | |
| 665 "unreliable on CI") # no visible disks | |
| 666 def test_disk_io_counters(self): | |
| 667 def check_ntuple(nt): | |
| 668 self.assertEqual(nt[0], nt.read_count) | |
| 669 self.assertEqual(nt[1], nt.write_count) | |
| 670 self.assertEqual(nt[2], nt.read_bytes) | |
| 671 self.assertEqual(nt[3], nt.write_bytes) | |
| 672 if not (OPENBSD or NETBSD): | |
| 673 self.assertEqual(nt[4], nt.read_time) | |
| 674 self.assertEqual(nt[5], nt.write_time) | |
| 675 if LINUX: | |
| 676 self.assertEqual(nt[6], nt.read_merged_count) | |
| 677 self.assertEqual(nt[7], nt.write_merged_count) | |
| 678 self.assertEqual(nt[8], nt.busy_time) | |
| 679 elif FREEBSD: | |
| 680 self.assertEqual(nt[6], nt.busy_time) | |
| 681 for name in nt._fields: | |
| 682 assert getattr(nt, name) >= 0, nt | |
| 683 | |
| 684 ret = psutil.disk_io_counters(perdisk=False) | |
| 685 assert ret is not None, "no disks on this system?" | |
| 686 check_ntuple(ret) | |
| 687 ret = psutil.disk_io_counters(perdisk=True) | |
| 688 # make sure there are no duplicates | |
| 689 self.assertEqual(len(ret), len(set(ret))) | |
| 690 for key in ret: | |
| 691 assert key, key | |
| 692 check_ntuple(ret[key]) | |
| 693 | |
| 694 def test_disk_io_counters_no_disks(self): | |
| 695 # Emulate a case where no disks are installed, see: | |
| 696 # https://github.com/giampaolo/psutil/issues/1062 | |
| 697 with mock.patch('psutil._psplatform.disk_io_counters', | |
| 698 return_value={}) as m: | |
| 699 self.assertIsNone(psutil.disk_io_counters(perdisk=False)) | |
| 700 self.assertEqual(psutil.disk_io_counters(perdisk=True), {}) | |
| 701 assert m.called | |
| 702 | |
| 703 | |
| 704 class TestNetAPIs(unittest.TestCase): | |
| 705 | |
| 706 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 707 def test_net_io_counters(self): | |
| 708 def check_ntuple(nt): | |
| 709 self.assertEqual(nt[0], nt.bytes_sent) | |
| 710 self.assertEqual(nt[1], nt.bytes_recv) | |
| 711 self.assertEqual(nt[2], nt.packets_sent) | |
| 712 self.assertEqual(nt[3], nt.packets_recv) | |
| 713 self.assertEqual(nt[4], nt.errin) | |
| 714 self.assertEqual(nt[5], nt.errout) | |
| 715 self.assertEqual(nt[6], nt.dropin) | |
| 716 self.assertEqual(nt[7], nt.dropout) | |
| 717 assert nt.bytes_sent >= 0, nt | |
| 718 assert nt.bytes_recv >= 0, nt | |
| 719 assert nt.packets_sent >= 0, nt | |
| 720 assert nt.packets_recv >= 0, nt | |
| 721 assert nt.errin >= 0, nt | |
| 722 assert nt.errout >= 0, nt | |
| 723 assert nt.dropin >= 0, nt | |
| 724 assert nt.dropout >= 0, nt | |
| 725 | |
| 726 ret = psutil.net_io_counters(pernic=False) | |
| 727 check_ntuple(ret) | |
| 728 ret = psutil.net_io_counters(pernic=True) | |
| 729 self.assertNotEqual(ret, []) | |
| 730 for key in ret: | |
| 731 self.assertTrue(key) | |
| 732 self.assertIsInstance(key, str) | |
| 733 check_ntuple(ret[key]) | |
| 734 | |
| 735 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 736 def test_net_io_counters_no_nics(self): | |
| 737 # Emulate a case where no NICs are installed, see: | |
| 738 # https://github.com/giampaolo/psutil/issues/1062 | |
| 739 with mock.patch('psutil._psplatform.net_io_counters', | |
| 740 return_value={}) as m: | |
| 741 self.assertIsNone(psutil.net_io_counters(pernic=False)) | |
| 742 self.assertEqual(psutil.net_io_counters(pernic=True), {}) | |
| 743 assert m.called | |
| 744 | |
| 745 def test_net_if_addrs(self): | |
| 746 nics = psutil.net_if_addrs() | |
| 747 assert nics, nics | |
| 748 | |
| 749 nic_stats = psutil.net_if_stats() | |
| 750 | |
| 751 # Not reliable on all platforms (net_if_addrs() reports more | |
| 752 # interfaces). | |
| 753 # self.assertEqual(sorted(nics.keys()), | |
| 754 # sorted(psutil.net_io_counters(pernic=True).keys())) | |
| 755 | |
| 756 families = set([socket.AF_INET, socket.AF_INET6, psutil.AF_LINK]) | |
| 757 for nic, addrs in nics.items(): | |
| 758 self.assertIsInstance(nic, str) | |
| 759 self.assertEqual(len(set(addrs)), len(addrs)) | |
| 760 for addr in addrs: | |
| 761 self.assertIsInstance(addr.family, int) | |
| 762 self.assertIsInstance(addr.address, str) | |
| 763 self.assertIsInstance(addr.netmask, (str, type(None))) | |
| 764 self.assertIsInstance(addr.broadcast, (str, type(None))) | |
| 765 self.assertIn(addr.family, families) | |
| 766 if sys.version_info >= (3, 4): | |
| 767 self.assertIsInstance(addr.family, enum.IntEnum) | |
| 768 if nic_stats[nic].isup: | |
| 769 # Do not test binding to addresses of interfaces | |
| 770 # that are down | |
| 771 if addr.family == socket.AF_INET: | |
| 772 s = socket.socket(addr.family) | |
| 773 with contextlib.closing(s): | |
| 774 s.bind((addr.address, 0)) | |
| 775 elif addr.family == socket.AF_INET6: | |
| 776 info = socket.getaddrinfo( | |
| 777 addr.address, 0, socket.AF_INET6, | |
| 778 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0] | |
| 779 af, socktype, proto, canonname, sa = info | |
| 780 s = socket.socket(af, socktype, proto) | |
| 781 with contextlib.closing(s): | |
| 782 s.bind(sa) | |
| 783 for ip in (addr.address, addr.netmask, addr.broadcast, | |
| 784 addr.ptp): | |
| 785 if ip is not None: | |
| 786 # TODO: skip AF_INET6 for now because I get: | |
| 787 # AddressValueError: Only hex digits permitted in | |
| 788 # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' | |
| 789 if addr.family != socket.AF_INET6: | |
| 790 check_net_address(ip, addr.family) | |
| 791 # broadcast and ptp addresses are mutually exclusive | |
| 792 if addr.broadcast: | |
| 793 self.assertIsNone(addr.ptp) | |
| 794 elif addr.ptp: | |
| 795 self.assertIsNone(addr.broadcast) | |
| 796 | |
| 797 if BSD or MACOS or SUNOS: | |
| 798 if hasattr(socket, "AF_LINK"): | |
| 799 self.assertEqual(psutil.AF_LINK, socket.AF_LINK) | |
| 800 elif LINUX: | |
| 801 self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) | |
| 802 elif WINDOWS: | |
| 803 self.assertEqual(psutil.AF_LINK, -1) | |
| 804 | |
| 805 def test_net_if_addrs_mac_null_bytes(self): | |
| 806 # Simulate that the underlying C function returns an incomplete | |
| 807 # MAC address. psutil is supposed to fill it with null bytes. | |
| 808 # https://github.com/giampaolo/psutil/issues/786 | |
| 809 if POSIX: | |
| 810 ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)] | |
| 811 else: | |
| 812 ret = [('em1', -1, '06-3d-29', None, None, None)] | |
| 813 with mock.patch('psutil._psplatform.net_if_addrs', | |
| 814 return_value=ret) as m: | |
| 815 addr = psutil.net_if_addrs()['em1'][0] | |
| 816 assert m.called | |
| 817 if POSIX: | |
| 818 self.assertEqual(addr.address, '06:3d:29:00:00:00') | |
| 819 else: | |
| 820 self.assertEqual(addr.address, '06-3d-29-00-00-00') | |
| 821 | |
| 822 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") # raises EPERM | |
| 823 def test_net_if_stats(self): | |
| 824 nics = psutil.net_if_stats() | |
| 825 assert nics, nics | |
| 826 all_duplexes = (psutil.NIC_DUPLEX_FULL, | |
| 827 psutil.NIC_DUPLEX_HALF, | |
| 828 psutil.NIC_DUPLEX_UNKNOWN) | |
| 829 for name, stats in nics.items(): | |
| 830 self.assertIsInstance(name, str) | |
| 831 isup, duplex, speed, mtu = stats | |
| 832 self.assertIsInstance(isup, bool) | |
| 833 self.assertIn(duplex, all_duplexes) | |
| 834 self.assertIn(duplex, all_duplexes) | |
| 835 self.assertGreaterEqual(speed, 0) | |
| 836 self.assertGreaterEqual(mtu, 0) | |
| 837 | |
| 838 @unittest.skipIf(not (LINUX or BSD or MACOS), | |
| 839 "LINUX or BSD or MACOS specific") | |
| 840 def test_net_if_stats_enodev(self): | |
| 841 # See: https://github.com/giampaolo/psutil/issues/1279 | |
| 842 with mock.patch('psutil._psutil_posix.net_if_mtu', | |
| 843 side_effect=OSError(errno.ENODEV, "")) as m: | |
| 844 ret = psutil.net_if_stats() | |
| 845 self.assertEqual(ret, {}) | |
| 846 assert m.called | |
| 847 | |
| 848 | |
| 849 class TestSensorsAPIs(unittest.TestCase): | |
| 850 | |
| 851 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
| 852 def test_sensors_temperatures(self): | |
| 853 temps = psutil.sensors_temperatures() | |
| 854 for name, entries in temps.items(): | |
| 855 self.assertIsInstance(name, str) | |
| 856 for entry in entries: | |
| 857 self.assertIsInstance(entry.label, str) | |
| 858 if entry.current is not None: | |
| 859 self.assertGreaterEqual(entry.current, 0) | |
| 860 if entry.high is not None: | |
| 861 self.assertGreaterEqual(entry.high, 0) | |
| 862 if entry.critical is not None: | |
| 863 self.assertGreaterEqual(entry.critical, 0) | |
| 864 | |
| 865 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
| 866 def test_sensors_temperatures_fahreneit(self): | |
| 867 d = {'coretemp': [('label', 50.0, 60.0, 70.0)]} | |
| 868 with mock.patch("psutil._psplatform.sensors_temperatures", | |
| 869 return_value=d) as m: | |
| 870 temps = psutil.sensors_temperatures( | |
| 871 fahrenheit=True)['coretemp'][0] | |
| 872 assert m.called | |
| 873 self.assertEqual(temps.current, 122.0) | |
| 874 self.assertEqual(temps.high, 140.0) | |
| 875 self.assertEqual(temps.critical, 158.0) | |
| 876 | |
| 877 @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported") | |
| 878 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 879 def test_sensors_battery(self): | |
| 880 ret = psutil.sensors_battery() | |
| 881 self.assertGreaterEqual(ret.percent, 0) | |
| 882 self.assertLessEqual(ret.percent, 100) | |
| 883 if ret.secsleft not in (psutil.POWER_TIME_UNKNOWN, | |
| 884 psutil.POWER_TIME_UNLIMITED): | |
| 885 self.assertGreaterEqual(ret.secsleft, 0) | |
| 886 else: | |
| 887 if ret.secsleft == psutil.POWER_TIME_UNLIMITED: | |
| 888 self.assertTrue(ret.power_plugged) | |
| 889 self.assertIsInstance(ret.power_plugged, bool) | |
| 890 | |
| 891 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") | |
| 892 def test_sensors_fans(self): | |
| 893 fans = psutil.sensors_fans() | |
| 894 for name, entries in fans.items(): | |
| 895 self.assertIsInstance(name, str) | |
| 896 for entry in entries: | |
| 897 self.assertIsInstance(entry.label, str) | |
| 898 self.assertIsInstance(entry.current, (int, long)) | |
| 899 self.assertGreaterEqual(entry.current, 0) | |
| 900 | |
| 901 | |
| 902 if __name__ == '__main__': | |
| 903 from psutil.tests.runner import run | |
| 904 run(__file__) |
