Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # -*- coding: utf-8 -*- | |
| 3 | |
| 4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 5 # Use of this source code is governed by a BSD-style license that can be | |
| 6 # found in the LICENSE file. | |
| 7 | |
| 8 """ | |
| 9 Miscellaneous tests. | |
| 10 """ | |
| 11 | |
| 12 import ast | |
| 13 import collections | |
| 14 import errno | |
| 15 import json | |
| 16 import os | |
| 17 import pickle | |
| 18 import socket | |
| 19 import stat | |
| 20 | |
| 21 from psutil import LINUX | |
| 22 from psutil import POSIX | |
| 23 from psutil import WINDOWS | |
| 24 from psutil._common import memoize | |
| 25 from psutil._common import memoize_when_activated | |
| 26 from psutil._common import supports_ipv6 | |
| 27 from psutil._common import wrap_numbers | |
| 28 from psutil._compat import PY3 | |
| 29 from psutil.tests import APPVEYOR | |
| 30 from psutil.tests import CI_TESTING | |
| 31 from psutil.tests import DEVNULL | |
| 32 from psutil.tests import HAS_BATTERY | |
| 33 from psutil.tests import HAS_MEMORY_MAPS | |
| 34 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 35 from psutil.tests import HAS_SENSORS_BATTERY | |
| 36 from psutil.tests import HAS_SENSORS_FANS | |
| 37 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
| 38 from psutil.tests import import_module_by_path | |
| 39 from psutil.tests import mock | |
| 40 from psutil.tests import PsutilTestCase | |
| 41 from psutil.tests import PYTHON_EXE | |
| 42 from psutil.tests import reload_module | |
| 43 from psutil.tests import ROOT_DIR | |
| 44 from psutil.tests import SCRIPTS_DIR | |
| 45 from psutil.tests import sh | |
| 46 from psutil.tests import TRAVIS | |
| 47 from psutil.tests import unittest | |
| 48 import psutil | |
| 49 import psutil.tests | |
| 50 | |
| 51 | |
| 52 # =================================================================== | |
| 53 # --- Misc / generic tests. | |
| 54 # =================================================================== | |
| 55 | |
| 56 | |
| 57 class TestMisc(PsutilTestCase): | |
| 58 | |
| 59 def test_process__repr__(self, func=repr): | |
| 60 p = psutil.Process(self.spawn_testproc().pid) | |
| 61 r = func(p) | |
| 62 self.assertIn("psutil.Process", r) | |
| 63 self.assertIn("pid=%s" % p.pid, r) | |
| 64 self.assertIn("name='%s'" % p.name(), r) | |
| 65 self.assertIn("status=", r) | |
| 66 self.assertNotIn("exitcode=", r) | |
| 67 p.terminate() | |
| 68 p.wait() | |
| 69 r = func(p) | |
| 70 self.assertIn("status='terminated'", r) | |
| 71 self.assertIn("exitcode=", r) | |
| 72 | |
| 73 with mock.patch.object(psutil.Process, "name", | |
| 74 side_effect=psutil.ZombieProcess(os.getpid())): | |
| 75 p = psutil.Process() | |
| 76 r = func(p) | |
| 77 self.assertIn("pid=%s" % p.pid, r) | |
| 78 self.assertIn("status='zombie'", r) | |
| 79 self.assertNotIn("name=", r) | |
| 80 with mock.patch.object(psutil.Process, "name", | |
| 81 side_effect=psutil.NoSuchProcess(os.getpid())): | |
| 82 p = psutil.Process() | |
| 83 r = func(p) | |
| 84 self.assertIn("pid=%s" % p.pid, r) | |
| 85 self.assertIn("terminated", r) | |
| 86 self.assertNotIn("name=", r) | |
| 87 with mock.patch.object(psutil.Process, "name", | |
| 88 side_effect=psutil.AccessDenied(os.getpid())): | |
| 89 p = psutil.Process() | |
| 90 r = func(p) | |
| 91 self.assertIn("pid=%s" % p.pid, r) | |
| 92 self.assertNotIn("name=", r) | |
| 93 | |
| 94 def test_process__str__(self): | |
| 95 self.test_process__repr__(func=str) | |
| 96 | |
| 97 def test_no_such_process__repr__(self, func=repr): | |
| 98 self.assertEqual( | |
| 99 repr(psutil.NoSuchProcess(321)), | |
| 100 "psutil.NoSuchProcess process no longer exists (pid=321)") | |
| 101 self.assertEqual( | |
| 102 repr(psutil.NoSuchProcess(321, name='foo')), | |
| 103 "psutil.NoSuchProcess process no longer exists (pid=321, " | |
| 104 "name='foo')") | |
| 105 self.assertEqual( | |
| 106 repr(psutil.NoSuchProcess(321, msg='foo')), | |
| 107 "psutil.NoSuchProcess foo") | |
| 108 | |
| 109 def test_zombie_process__repr__(self, func=repr): | |
| 110 self.assertEqual( | |
| 111 repr(psutil.ZombieProcess(321)), | |
| 112 "psutil.ZombieProcess process still exists but it's a zombie " | |
| 113 "(pid=321)") | |
| 114 self.assertEqual( | |
| 115 repr(psutil.ZombieProcess(321, name='foo')), | |
| 116 "psutil.ZombieProcess process still exists but it's a zombie " | |
| 117 "(pid=321, name='foo')") | |
| 118 self.assertEqual( | |
| 119 repr(psutil.ZombieProcess(321, name='foo', ppid=1)), | |
| 120 "psutil.ZombieProcess process still exists but it's a zombie " | |
| 121 "(pid=321, name='foo', ppid=1)") | |
| 122 self.assertEqual( | |
| 123 repr(psutil.ZombieProcess(321, msg='foo')), | |
| 124 "psutil.ZombieProcess foo") | |
| 125 | |
| 126 def test_access_denied__repr__(self, func=repr): | |
| 127 self.assertEqual( | |
| 128 repr(psutil.AccessDenied(321)), | |
| 129 "psutil.AccessDenied (pid=321)") | |
| 130 self.assertEqual( | |
| 131 repr(psutil.AccessDenied(321, name='foo')), | |
| 132 "psutil.AccessDenied (pid=321, name='foo')") | |
| 133 self.assertEqual( | |
| 134 repr(psutil.AccessDenied(321, msg='foo')), | |
| 135 "psutil.AccessDenied foo") | |
| 136 | |
| 137 def test_timeout_expired__repr__(self, func=repr): | |
| 138 self.assertEqual( | |
| 139 repr(psutil.TimeoutExpired(321)), | |
| 140 "psutil.TimeoutExpired timeout after 321 seconds") | |
| 141 self.assertEqual( | |
| 142 repr(psutil.TimeoutExpired(321, pid=111)), | |
| 143 "psutil.TimeoutExpired timeout after 321 seconds (pid=111)") | |
| 144 self.assertEqual( | |
| 145 repr(psutil.TimeoutExpired(321, pid=111, name='foo')), | |
| 146 "psutil.TimeoutExpired timeout after 321 seconds " | |
| 147 "(pid=111, name='foo')") | |
| 148 | |
| 149 def test_process__eq__(self): | |
| 150 p1 = psutil.Process() | |
| 151 p2 = psutil.Process() | |
| 152 self.assertEqual(p1, p2) | |
| 153 p2._ident = (0, 0) | |
| 154 self.assertNotEqual(p1, p2) | |
| 155 self.assertNotEqual(p1, 'foo') | |
| 156 | |
| 157 def test_process__hash__(self): | |
| 158 s = set([psutil.Process(), psutil.Process()]) | |
| 159 self.assertEqual(len(s), 1) | |
| 160 | |
| 161 def test__all__(self): | |
| 162 dir_psutil = dir(psutil) | |
| 163 for name in dir_psutil: | |
| 164 if name in ('long', 'tests', 'test', 'PermissionError', | |
| 165 'ProcessLookupError'): | |
| 166 continue | |
| 167 if not name.startswith('_'): | |
| 168 try: | |
| 169 __import__(name) | |
| 170 except ImportError: | |
| 171 if name not in psutil.__all__: | |
| 172 fun = getattr(psutil, name) | |
| 173 if fun is None: | |
| 174 continue | |
| 175 if (fun.__doc__ is not None and | |
| 176 'deprecated' not in fun.__doc__.lower()): | |
| 177 self.fail('%r not in psutil.__all__' % name) | |
| 178 | |
| 179 # Import 'star' will break if __all__ is inconsistent, see: | |
| 180 # https://github.com/giampaolo/psutil/issues/656 | |
| 181 # Can't do `from psutil import *` as it won't work on python 3 | |
| 182 # so we simply iterate over __all__. | |
| 183 for name in psutil.__all__: | |
| 184 self.assertIn(name, dir_psutil) | |
| 185 | |
| 186 def test_version(self): | |
| 187 self.assertEqual('.'.join([str(x) for x in psutil.version_info]), | |
| 188 psutil.__version__) | |
| 189 | |
| 190 def test_process_as_dict_no_new_names(self): | |
| 191 # See https://github.com/giampaolo/psutil/issues/813 | |
| 192 p = psutil.Process() | |
| 193 p.foo = '1' | |
| 194 self.assertNotIn('foo', p.as_dict()) | |
| 195 | |
| 196 def test_memoize(self): | |
| 197 @memoize | |
| 198 def foo(*args, **kwargs): | |
| 199 "foo docstring" | |
| 200 calls.append(None) | |
| 201 return (args, kwargs) | |
| 202 | |
| 203 calls = [] | |
| 204 # no args | |
| 205 for x in range(2): | |
| 206 ret = foo() | |
| 207 expected = ((), {}) | |
| 208 self.assertEqual(ret, expected) | |
| 209 self.assertEqual(len(calls), 1) | |
| 210 # with args | |
| 211 for x in range(2): | |
| 212 ret = foo(1) | |
| 213 expected = ((1, ), {}) | |
| 214 self.assertEqual(ret, expected) | |
| 215 self.assertEqual(len(calls), 2) | |
| 216 # with args + kwargs | |
| 217 for x in range(2): | |
| 218 ret = foo(1, bar=2) | |
| 219 expected = ((1, ), {'bar': 2}) | |
| 220 self.assertEqual(ret, expected) | |
| 221 self.assertEqual(len(calls), 3) | |
| 222 # clear cache | |
| 223 foo.cache_clear() | |
| 224 ret = foo() | |
| 225 expected = ((), {}) | |
| 226 self.assertEqual(ret, expected) | |
| 227 self.assertEqual(len(calls), 4) | |
| 228 # docstring | |
| 229 self.assertEqual(foo.__doc__, "foo docstring") | |
| 230 | |
| 231 def test_memoize_when_activated(self): | |
| 232 class Foo: | |
| 233 | |
| 234 @memoize_when_activated | |
| 235 def foo(self): | |
| 236 calls.append(None) | |
| 237 | |
| 238 f = Foo() | |
| 239 calls = [] | |
| 240 f.foo() | |
| 241 f.foo() | |
| 242 self.assertEqual(len(calls), 2) | |
| 243 | |
| 244 # activate | |
| 245 calls = [] | |
| 246 f.foo.cache_activate(f) | |
| 247 f.foo() | |
| 248 f.foo() | |
| 249 self.assertEqual(len(calls), 1) | |
| 250 | |
| 251 # deactivate | |
| 252 calls = [] | |
| 253 f.foo.cache_deactivate(f) | |
| 254 f.foo() | |
| 255 f.foo() | |
| 256 self.assertEqual(len(calls), 2) | |
| 257 | |
| 258 def test_parse_environ_block(self): | |
| 259 from psutil._common import parse_environ_block | |
| 260 | |
| 261 def k(s): | |
| 262 return s.upper() if WINDOWS else s | |
| 263 | |
| 264 self.assertEqual(parse_environ_block("a=1\0"), | |
| 265 {k("a"): "1"}) | |
| 266 self.assertEqual(parse_environ_block("a=1\0b=2\0\0"), | |
| 267 {k("a"): "1", k("b"): "2"}) | |
| 268 self.assertEqual(parse_environ_block("a=1\0b=\0\0"), | |
| 269 {k("a"): "1", k("b"): ""}) | |
| 270 # ignore everything after \0\0 | |
| 271 self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"), | |
| 272 {k("a"): "1", k("b"): "2"}) | |
| 273 # ignore everything that is not an assignment | |
| 274 self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"}) | |
| 275 self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"}) | |
| 276 # do not fail if the block is incomplete | |
| 277 self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"}) | |
| 278 | |
| 279 def test_supports_ipv6(self): | |
| 280 self.addCleanup(supports_ipv6.cache_clear) | |
| 281 if supports_ipv6(): | |
| 282 with mock.patch('psutil._common.socket') as s: | |
| 283 s.has_ipv6 = False | |
| 284 supports_ipv6.cache_clear() | |
| 285 assert not supports_ipv6() | |
| 286 | |
| 287 supports_ipv6.cache_clear() | |
| 288 with mock.patch('psutil._common.socket.socket', | |
| 289 side_effect=socket.error) as s: | |
| 290 assert not supports_ipv6() | |
| 291 assert s.called | |
| 292 | |
| 293 supports_ipv6.cache_clear() | |
| 294 with mock.patch('psutil._common.socket.socket', | |
| 295 side_effect=socket.gaierror) as s: | |
| 296 assert not supports_ipv6() | |
| 297 supports_ipv6.cache_clear() | |
| 298 assert s.called | |
| 299 | |
| 300 supports_ipv6.cache_clear() | |
| 301 with mock.patch('psutil._common.socket.socket.bind', | |
| 302 side_effect=socket.gaierror) as s: | |
| 303 assert not supports_ipv6() | |
| 304 supports_ipv6.cache_clear() | |
| 305 assert s.called | |
| 306 else: | |
| 307 with self.assertRaises(Exception): | |
| 308 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
| 309 try: | |
| 310 sock.bind(("::1", 0)) | |
| 311 finally: | |
| 312 sock.close() | |
| 313 | |
| 314 def test_isfile_strict(self): | |
| 315 from psutil._common import isfile_strict | |
| 316 this_file = os.path.abspath(__file__) | |
| 317 assert isfile_strict(this_file) | |
| 318 assert not isfile_strict(os.path.dirname(this_file)) | |
| 319 with mock.patch('psutil._common.os.stat', | |
| 320 side_effect=OSError(errno.EPERM, "foo")): | |
| 321 self.assertRaises(OSError, isfile_strict, this_file) | |
| 322 with mock.patch('psutil._common.os.stat', | |
| 323 side_effect=OSError(errno.EACCES, "foo")): | |
| 324 self.assertRaises(OSError, isfile_strict, this_file) | |
| 325 with mock.patch('psutil._common.os.stat', | |
| 326 side_effect=OSError(errno.ENOENT, "foo")): | |
| 327 assert not isfile_strict(this_file) | |
| 328 with mock.patch('psutil._common.stat.S_ISREG', return_value=False): | |
| 329 assert not isfile_strict(this_file) | |
| 330 | |
| 331 def test_serialization(self): | |
| 332 def check(ret): | |
| 333 if json is not None: | |
| 334 json.loads(json.dumps(ret)) | |
| 335 a = pickle.dumps(ret) | |
| 336 b = pickle.loads(a) | |
| 337 self.assertEqual(ret, b) | |
| 338 | |
| 339 check(psutil.Process().as_dict()) | |
| 340 check(psutil.virtual_memory()) | |
| 341 check(psutil.swap_memory()) | |
| 342 check(psutil.cpu_times()) | |
| 343 check(psutil.cpu_times_percent(interval=0)) | |
| 344 check(psutil.net_io_counters()) | |
| 345 if LINUX and not os.path.exists('/proc/diskstats'): | |
| 346 pass | |
| 347 else: | |
| 348 if not APPVEYOR: | |
| 349 check(psutil.disk_io_counters()) | |
| 350 check(psutil.disk_partitions()) | |
| 351 check(psutil.disk_usage(os.getcwd())) | |
| 352 check(psutil.users()) | |
| 353 | |
| 354 def test_setup_script(self): | |
| 355 setup_py = os.path.join(ROOT_DIR, 'setup.py') | |
| 356 if CI_TESTING and not os.path.exists(setup_py): | |
| 357 return self.skipTest("can't find setup.py") | |
| 358 module = import_module_by_path(setup_py) | |
| 359 self.assertRaises(SystemExit, module.setup) | |
| 360 self.assertEqual(module.get_version(), psutil.__version__) | |
| 361 | |
| 362 def test_ad_on_process_creation(self): | |
| 363 # We are supposed to be able to instantiate Process also in case | |
| 364 # of zombie processes or access denied. | |
| 365 with mock.patch.object(psutil.Process, 'create_time', | |
| 366 side_effect=psutil.AccessDenied) as meth: | |
| 367 psutil.Process() | |
| 368 assert meth.called | |
| 369 with mock.patch.object(psutil.Process, 'create_time', | |
| 370 side_effect=psutil.ZombieProcess(1)) as meth: | |
| 371 psutil.Process() | |
| 372 assert meth.called | |
| 373 with mock.patch.object(psutil.Process, 'create_time', | |
| 374 side_effect=ValueError) as meth: | |
| 375 with self.assertRaises(ValueError): | |
| 376 psutil.Process() | |
| 377 assert meth.called | |
| 378 | |
| 379 def test_sanity_version_check(self): | |
| 380 # see: https://github.com/giampaolo/psutil/issues/564 | |
| 381 with mock.patch( | |
| 382 "psutil._psplatform.cext.version", return_value="0.0.0"): | |
| 383 with self.assertRaises(ImportError) as cm: | |
| 384 reload_module(psutil) | |
| 385 self.assertIn("version conflict", str(cm.exception).lower()) | |
| 386 | |
| 387 | |
| 388 # =================================================================== | |
| 389 # --- Tests for wrap_numbers() function. | |
| 390 # =================================================================== | |
| 391 | |
| 392 | |
| 393 nt = collections.namedtuple('foo', 'a b c') | |
| 394 | |
| 395 | |
| 396 class TestWrapNumbers(PsutilTestCase): | |
| 397 | |
| 398 def setUp(self): | |
| 399 wrap_numbers.cache_clear() | |
| 400 | |
| 401 tearDown = setUp | |
| 402 | |
| 403 def test_first_call(self): | |
| 404 input = {'disk1': nt(5, 5, 5)} | |
| 405 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 406 | |
| 407 def test_input_hasnt_changed(self): | |
| 408 input = {'disk1': nt(5, 5, 5)} | |
| 409 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 410 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 411 | |
| 412 def test_increase_but_no_wrap(self): | |
| 413 input = {'disk1': nt(5, 5, 5)} | |
| 414 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 415 input = {'disk1': nt(10, 15, 20)} | |
| 416 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 417 input = {'disk1': nt(20, 25, 30)} | |
| 418 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 419 input = {'disk1': nt(20, 25, 30)} | |
| 420 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 421 | |
| 422 def test_wrap(self): | |
| 423 # let's say 100 is the threshold | |
| 424 input = {'disk1': nt(100, 100, 100)} | |
| 425 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 426 # first wrap restarts from 10 | |
| 427 input = {'disk1': nt(100, 100, 10)} | |
| 428 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 429 {'disk1': nt(100, 100, 110)}) | |
| 430 # then it remains the same | |
| 431 input = {'disk1': nt(100, 100, 10)} | |
| 432 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 433 {'disk1': nt(100, 100, 110)}) | |
| 434 # then it goes up | |
| 435 input = {'disk1': nt(100, 100, 90)} | |
| 436 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 437 {'disk1': nt(100, 100, 190)}) | |
| 438 # then it wraps again | |
| 439 input = {'disk1': nt(100, 100, 20)} | |
| 440 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 441 {'disk1': nt(100, 100, 210)}) | |
| 442 # and remains the same | |
| 443 input = {'disk1': nt(100, 100, 20)} | |
| 444 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 445 {'disk1': nt(100, 100, 210)}) | |
| 446 # now wrap another num | |
| 447 input = {'disk1': nt(50, 100, 20)} | |
| 448 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 449 {'disk1': nt(150, 100, 210)}) | |
| 450 # and again | |
| 451 input = {'disk1': nt(40, 100, 20)} | |
| 452 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 453 {'disk1': nt(190, 100, 210)}) | |
| 454 # keep it the same | |
| 455 input = {'disk1': nt(40, 100, 20)} | |
| 456 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 457 {'disk1': nt(190, 100, 210)}) | |
| 458 | |
| 459 def test_changing_keys(self): | |
| 460 # Emulate a case where the second call to disk_io() | |
| 461 # (or whatever) provides a new disk, then the new disk | |
| 462 # disappears on the third call. | |
| 463 input = {'disk1': nt(5, 5, 5)} | |
| 464 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 465 input = {'disk1': nt(5, 5, 5), | |
| 466 'disk2': nt(7, 7, 7)} | |
| 467 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 468 input = {'disk1': nt(8, 8, 8)} | |
| 469 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 470 | |
| 471 def test_changing_keys_w_wrap(self): | |
| 472 input = {'disk1': nt(50, 50, 50), | |
| 473 'disk2': nt(100, 100, 100)} | |
| 474 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 475 # disk 2 wraps | |
| 476 input = {'disk1': nt(50, 50, 50), | |
| 477 'disk2': nt(100, 100, 10)} | |
| 478 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 479 {'disk1': nt(50, 50, 50), | |
| 480 'disk2': nt(100, 100, 110)}) | |
| 481 # disk 2 disappears | |
| 482 input = {'disk1': nt(50, 50, 50)} | |
| 483 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 484 | |
| 485 # then it appears again; the old wrap is supposed to be | |
| 486 # gone. | |
| 487 input = {'disk1': nt(50, 50, 50), | |
| 488 'disk2': nt(100, 100, 100)} | |
| 489 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 490 # remains the same | |
| 491 input = {'disk1': nt(50, 50, 50), | |
| 492 'disk2': nt(100, 100, 100)} | |
| 493 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 494 # and then wraps again | |
| 495 input = {'disk1': nt(50, 50, 50), | |
| 496 'disk2': nt(100, 100, 10)} | |
| 497 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 498 {'disk1': nt(50, 50, 50), | |
| 499 'disk2': nt(100, 100, 110)}) | |
| 500 | |
| 501 def test_real_data(self): | |
| 502 d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), | |
| 503 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), | |
| 504 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), | |
| 505 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)} | |
| 506 self.assertEqual(wrap_numbers(d, 'disk_io'), d) | |
| 507 self.assertEqual(wrap_numbers(d, 'disk_io'), d) | |
| 508 # decrease this ↓ | |
| 509 d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), | |
| 510 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), | |
| 511 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), | |
| 512 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)} | |
| 513 out = wrap_numbers(d, 'disk_io') | |
| 514 self.assertEqual(out['nvme0n1'][0], 400) | |
| 515 | |
| 516 # --- cache tests | |
| 517 | |
| 518 def test_cache_first_call(self): | |
| 519 input = {'disk1': nt(5, 5, 5)} | |
| 520 wrap_numbers(input, 'disk_io') | |
| 521 cache = wrap_numbers.cache_info() | |
| 522 self.assertEqual(cache[0], {'disk_io': input}) | |
| 523 self.assertEqual(cache[1], {'disk_io': {}}) | |
| 524 self.assertEqual(cache[2], {'disk_io': {}}) | |
| 525 | |
| 526 def test_cache_call_twice(self): | |
| 527 input = {'disk1': nt(5, 5, 5)} | |
| 528 wrap_numbers(input, 'disk_io') | |
| 529 input = {'disk1': nt(10, 10, 10)} | |
| 530 wrap_numbers(input, 'disk_io') | |
| 531 cache = wrap_numbers.cache_info() | |
| 532 self.assertEqual(cache[0], {'disk_io': input}) | |
| 533 self.assertEqual( | |
| 534 cache[1], | |
| 535 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}}) | |
| 536 self.assertEqual(cache[2], {'disk_io': {}}) | |
| 537 | |
| 538 def test_cache_wrap(self): | |
| 539 # let's say 100 is the threshold | |
| 540 input = {'disk1': nt(100, 100, 100)} | |
| 541 wrap_numbers(input, 'disk_io') | |
| 542 | |
| 543 # first wrap restarts from 10 | |
| 544 input = {'disk1': nt(100, 100, 10)} | |
| 545 wrap_numbers(input, 'disk_io') | |
| 546 cache = wrap_numbers.cache_info() | |
| 547 self.assertEqual(cache[0], {'disk_io': input}) | |
| 548 self.assertEqual( | |
| 549 cache[1], | |
| 550 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}}) | |
| 551 self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}}) | |
| 552 | |
| 553 def assert_(): | |
| 554 cache = wrap_numbers.cache_info() | |
| 555 self.assertEqual( | |
| 556 cache[1], | |
| 557 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, | |
| 558 ('disk1', 2): 100}}) | |
| 559 self.assertEqual(cache[2], | |
| 560 {'disk_io': {'disk1': set([('disk1', 2)])}}) | |
| 561 | |
| 562 # then it remains the same | |
| 563 input = {'disk1': nt(100, 100, 10)} | |
| 564 wrap_numbers(input, 'disk_io') | |
| 565 cache = wrap_numbers.cache_info() | |
| 566 self.assertEqual(cache[0], {'disk_io': input}) | |
| 567 assert_() | |
| 568 | |
| 569 # then it goes up | |
| 570 input = {'disk1': nt(100, 100, 90)} | |
| 571 wrap_numbers(input, 'disk_io') | |
| 572 cache = wrap_numbers.cache_info() | |
| 573 self.assertEqual(cache[0], {'disk_io': input}) | |
| 574 assert_() | |
| 575 | |
| 576 # then it wraps again | |
| 577 input = {'disk1': nt(100, 100, 20)} | |
| 578 wrap_numbers(input, 'disk_io') | |
| 579 cache = wrap_numbers.cache_info() | |
| 580 self.assertEqual(cache[0], {'disk_io': input}) | |
| 581 self.assertEqual( | |
| 582 cache[1], | |
| 583 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}}) | |
| 584 self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}}) | |
| 585 | |
| 586 def test_cache_changing_keys(self): | |
| 587 input = {'disk1': nt(5, 5, 5)} | |
| 588 wrap_numbers(input, 'disk_io') | |
| 589 input = {'disk1': nt(5, 5, 5), | |
| 590 'disk2': nt(7, 7, 7)} | |
| 591 wrap_numbers(input, 'disk_io') | |
| 592 cache = wrap_numbers.cache_info() | |
| 593 self.assertEqual(cache[0], {'disk_io': input}) | |
| 594 self.assertEqual( | |
| 595 cache[1], | |
| 596 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}}) | |
| 597 self.assertEqual(cache[2], {'disk_io': {}}) | |
| 598 | |
| 599 def test_cache_clear(self): | |
| 600 input = {'disk1': nt(5, 5, 5)} | |
| 601 wrap_numbers(input, 'disk_io') | |
| 602 wrap_numbers(input, 'disk_io') | |
| 603 wrap_numbers.cache_clear('disk_io') | |
| 604 self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {})) | |
| 605 wrap_numbers.cache_clear('disk_io') | |
| 606 wrap_numbers.cache_clear('?!?') | |
| 607 | |
| 608 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 609 def test_cache_clear_public_apis(self): | |
| 610 if not psutil.disk_io_counters() or not psutil.net_io_counters(): | |
| 611 return self.skipTest("no disks or NICs available") | |
| 612 psutil.disk_io_counters() | |
| 613 psutil.net_io_counters() | |
| 614 caches = wrap_numbers.cache_info() | |
| 615 for cache in caches: | |
| 616 self.assertIn('psutil.disk_io_counters', cache) | |
| 617 self.assertIn('psutil.net_io_counters', cache) | |
| 618 | |
| 619 psutil.disk_io_counters.cache_clear() | |
| 620 caches = wrap_numbers.cache_info() | |
| 621 for cache in caches: | |
| 622 self.assertIn('psutil.net_io_counters', cache) | |
| 623 self.assertNotIn('psutil.disk_io_counters', cache) | |
| 624 | |
| 625 psutil.net_io_counters.cache_clear() | |
| 626 caches = wrap_numbers.cache_info() | |
| 627 self.assertEqual(caches, ({}, {}, {})) | |
| 628 | |
| 629 | |
| 630 # =================================================================== | |
| 631 # --- Example script tests | |
| 632 # =================================================================== | |
| 633 | |
| 634 | |
| 635 @unittest.skipIf(not os.path.exists(SCRIPTS_DIR), | |
| 636 "can't locate scripts directory") | |
| 637 class TestScripts(PsutilTestCase): | |
| 638 """Tests for scripts in the "scripts" directory.""" | |
| 639 | |
| 640 @staticmethod | |
| 641 def assert_stdout(exe, *args, **kwargs): | |
| 642 exe = '%s' % os.path.join(SCRIPTS_DIR, exe) | |
| 643 cmd = [PYTHON_EXE, exe] | |
| 644 for arg in args: | |
| 645 cmd.append(arg) | |
| 646 try: | |
| 647 out = sh(cmd, **kwargs).strip() | |
| 648 except RuntimeError as err: | |
| 649 if 'AccessDenied' in str(err): | |
| 650 return str(err) | |
| 651 else: | |
| 652 raise | |
| 653 assert out, out | |
| 654 return out | |
| 655 | |
| 656 @staticmethod | |
| 657 def assert_syntax(exe, args=None): | |
| 658 exe = os.path.join(SCRIPTS_DIR, exe) | |
| 659 if PY3: | |
| 660 f = open(exe, 'rt', encoding='utf8') | |
| 661 else: | |
| 662 f = open(exe, 'rt') | |
| 663 with f: | |
| 664 src = f.read() | |
| 665 ast.parse(src) | |
| 666 | |
| 667 def test_coverage(self): | |
| 668 # make sure all example scripts have a test method defined | |
| 669 meths = dir(self) | |
| 670 for name in os.listdir(SCRIPTS_DIR): | |
| 671 if name.endswith('.py'): | |
| 672 if 'test_' + os.path.splitext(name)[0] not in meths: | |
| 673 # self.assert_stdout(name) | |
| 674 self.fail('no test defined for %r script' | |
| 675 % os.path.join(SCRIPTS_DIR, name)) | |
| 676 | |
| 677 @unittest.skipIf(not POSIX, "POSIX only") | |
| 678 def test_executable(self): | |
| 679 for name in os.listdir(SCRIPTS_DIR): | |
| 680 if name.endswith('.py'): | |
| 681 path = os.path.join(SCRIPTS_DIR, name) | |
| 682 if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: | |
| 683 self.fail('%r is not executable' % path) | |
| 684 | |
| 685 def test_disk_usage(self): | |
| 686 self.assert_stdout('disk_usage.py') | |
| 687 | |
| 688 def test_free(self): | |
| 689 self.assert_stdout('free.py') | |
| 690 | |
| 691 def test_meminfo(self): | |
| 692 self.assert_stdout('meminfo.py') | |
| 693 | |
| 694 def test_procinfo(self): | |
| 695 self.assert_stdout('procinfo.py', str(os.getpid())) | |
| 696 | |
| 697 @unittest.skipIf(CI_TESTING and not psutil.users(), "no users") | |
| 698 def test_who(self): | |
| 699 self.assert_stdout('who.py') | |
| 700 | |
| 701 def test_ps(self): | |
| 702 self.assert_stdout('ps.py') | |
| 703 | |
| 704 def test_pstree(self): | |
| 705 self.assert_stdout('pstree.py') | |
| 706 | |
| 707 def test_netstat(self): | |
| 708 self.assert_stdout('netstat.py') | |
| 709 | |
| 710 # permission denied on travis | |
| 711 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 712 def test_ifconfig(self): | |
| 713 self.assert_stdout('ifconfig.py') | |
| 714 | |
| 715 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") | |
| 716 def test_pmap(self): | |
| 717 self.assert_stdout('pmap.py', str(os.getpid())) | |
| 718 | |
| 719 def test_procsmem(self): | |
| 720 if 'uss' not in psutil.Process().memory_full_info()._fields: | |
| 721 raise self.skipTest("not supported") | |
| 722 self.assert_stdout('procsmem.py', stderr=DEVNULL) | |
| 723 | |
| 724 def test_killall(self): | |
| 725 self.assert_syntax('killall.py') | |
| 726 | |
| 727 def test_nettop(self): | |
| 728 self.assert_syntax('nettop.py') | |
| 729 | |
| 730 def test_top(self): | |
| 731 self.assert_syntax('top.py') | |
| 732 | |
| 733 def test_iotop(self): | |
| 734 self.assert_syntax('iotop.py') | |
| 735 | |
| 736 def test_pidof(self): | |
| 737 output = self.assert_stdout('pidof.py', psutil.Process().name()) | |
| 738 self.assertIn(str(os.getpid()), output) | |
| 739 | |
| 740 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 741 def test_winservices(self): | |
| 742 self.assert_stdout('winservices.py') | |
| 743 | |
| 744 def test_cpu_distribution(self): | |
| 745 self.assert_syntax('cpu_distribution.py') | |
| 746 | |
| 747 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
| 748 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 749 def test_temperatures(self): | |
| 750 if not psutil.sensors_temperatures(): | |
| 751 self.skipTest("no temperatures") | |
| 752 self.assert_stdout('temperatures.py') | |
| 753 | |
| 754 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") | |
| 755 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 756 def test_fans(self): | |
| 757 if not psutil.sensors_fans(): | |
| 758 self.skipTest("no fans") | |
| 759 self.assert_stdout('fans.py') | |
| 760 | |
| 761 @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported") | |
| 762 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 763 def test_battery(self): | |
| 764 self.assert_stdout('battery.py') | |
| 765 | |
| 766 def test_sensors(self): | |
| 767 self.assert_stdout('sensors.py') | |
| 768 | |
| 769 | |
| 770 if __name__ == '__main__': | |
| 771 from psutil.tests.runner import run_from_name | |
| 772 run_from_name(__file__) |
