Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_misc.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # -*- coding: utf-8 -*- | |
| 3 | |
| 4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 5 # Use of this source code is governed by a BSD-style license that can be | |
| 6 # found in the LICENSE file. | |
| 7 | |
| 8 """ | |
| 9 Miscellaneous tests. | |
| 10 """ | |
| 11 | |
| 12 import ast | |
| 13 import collections | |
| 14 import contextlib | |
| 15 import errno | |
| 16 import json | |
| 17 import os | |
| 18 import pickle | |
| 19 import socket | |
| 20 import stat | |
| 21 | |
| 22 from psutil import FREEBSD | |
| 23 from psutil import LINUX | |
| 24 from psutil import NETBSD | |
| 25 from psutil import POSIX | |
| 26 from psutil import WINDOWS | |
| 27 from psutil._common import memoize | |
| 28 from psutil._common import memoize_when_activated | |
| 29 from psutil._common import supports_ipv6 | |
| 30 from psutil._common import wrap_numbers | |
| 31 from psutil._common import open_text | |
| 32 from psutil._common import open_binary | |
| 33 from psutil._compat import PY3 | |
| 34 from psutil.tests import APPVEYOR | |
| 35 from psutil.tests import bind_socket | |
| 36 from psutil.tests import bind_unix_socket | |
| 37 from psutil.tests import call_until | |
| 38 from psutil.tests import chdir | |
| 39 from psutil.tests import CI_TESTING | |
| 40 from psutil.tests import create_proc_children_pair | |
| 41 from psutil.tests import create_sockets | |
| 42 from psutil.tests import create_zombie_proc | |
| 43 from psutil.tests import DEVNULL | |
| 44 from psutil.tests import get_free_port | |
| 45 from psutil.tests import get_test_subprocess | |
| 46 from psutil.tests import HAS_BATTERY | |
| 47 from psutil.tests import HAS_CONNECTIONS_UNIX | |
| 48 from psutil.tests import HAS_MEMORY_MAPS | |
| 49 from psutil.tests import HAS_NET_IO_COUNTERS | |
| 50 from psutil.tests import HAS_SENSORS_BATTERY | |
| 51 from psutil.tests import HAS_SENSORS_FANS | |
| 52 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
| 53 from psutil.tests import import_module_by_path | |
| 54 from psutil.tests import is_namedtuple | |
| 55 from psutil.tests import mock | |
| 56 from psutil.tests import PYTHON_EXE | |
| 57 from psutil.tests import reap_children | |
| 58 from psutil.tests import reload_module | |
| 59 from psutil.tests import retry | |
| 60 from psutil.tests import ROOT_DIR | |
| 61 from psutil.tests import safe_mkdir | |
| 62 from psutil.tests import safe_rmpath | |
| 63 from psutil.tests import SCRIPTS_DIR | |
| 64 from psutil.tests import sh | |
| 65 from psutil.tests import tcp_socketpair | |
| 66 from psutil.tests import TESTFN | |
| 67 from psutil.tests import TOX | |
| 68 from psutil.tests import TRAVIS | |
| 69 from psutil.tests import unittest | |
| 70 from psutil.tests import unix_socket_path | |
| 71 from psutil.tests import unix_socketpair | |
| 72 from psutil.tests import wait_for_file | |
| 73 from psutil.tests import wait_for_pid | |
| 74 import psutil | |
| 75 import psutil.tests | |
| 76 | |
| 77 | |
| 78 # =================================================================== | |
| 79 # --- Misc / generic tests. | |
| 80 # =================================================================== | |
| 81 | |
| 82 | |
| 83 class TestMisc(unittest.TestCase): | |
| 84 | |
| 85 def test_process__repr__(self, func=repr): | |
| 86 p = psutil.Process() | |
| 87 r = func(p) | |
| 88 self.assertIn("psutil.Process", r) | |
| 89 self.assertIn("pid=%s" % p.pid, r) | |
| 90 self.assertIn("name=", r) | |
| 91 self.assertIn(p.name(), r) | |
| 92 with mock.patch.object(psutil.Process, "name", | |
| 93 side_effect=psutil.ZombieProcess(os.getpid())): | |
| 94 p = psutil.Process() | |
| 95 r = func(p) | |
| 96 self.assertIn("pid=%s" % p.pid, r) | |
| 97 self.assertIn("zombie", r) | |
| 98 self.assertNotIn("name=", r) | |
| 99 with mock.patch.object(psutil.Process, "name", | |
| 100 side_effect=psutil.NoSuchProcess(os.getpid())): | |
| 101 p = psutil.Process() | |
| 102 r = func(p) | |
| 103 self.assertIn("pid=%s" % p.pid, r) | |
| 104 self.assertIn("terminated", r) | |
| 105 self.assertNotIn("name=", r) | |
| 106 with mock.patch.object(psutil.Process, "name", | |
| 107 side_effect=psutil.AccessDenied(os.getpid())): | |
| 108 p = psutil.Process() | |
| 109 r = func(p) | |
| 110 self.assertIn("pid=%s" % p.pid, r) | |
| 111 self.assertNotIn("name=", r) | |
| 112 | |
| 113 def test_process__str__(self): | |
| 114 self.test_process__repr__(func=str) | |
| 115 | |
| 116 def test_no_such_process__repr__(self, func=repr): | |
| 117 self.assertEqual( | |
| 118 repr(psutil.NoSuchProcess(321)), | |
| 119 "psutil.NoSuchProcess process no longer exists (pid=321)") | |
| 120 self.assertEqual( | |
| 121 repr(psutil.NoSuchProcess(321, name='foo')), | |
| 122 "psutil.NoSuchProcess process no longer exists (pid=321, " | |
| 123 "name='foo')") | |
| 124 self.assertEqual( | |
| 125 repr(psutil.NoSuchProcess(321, msg='foo')), | |
| 126 "psutil.NoSuchProcess foo") | |
| 127 | |
| 128 def test_zombie_process__repr__(self, func=repr): | |
| 129 self.assertEqual( | |
| 130 repr(psutil.ZombieProcess(321)), | |
| 131 "psutil.ZombieProcess process still exists but it's a zombie " | |
| 132 "(pid=321)") | |
| 133 self.assertEqual( | |
| 134 repr(psutil.ZombieProcess(321, name='foo')), | |
| 135 "psutil.ZombieProcess process still exists but it's a zombie " | |
| 136 "(pid=321, name='foo')") | |
| 137 self.assertEqual( | |
| 138 repr(psutil.ZombieProcess(321, name='foo', ppid=1)), | |
| 139 "psutil.ZombieProcess process still exists but it's a zombie " | |
| 140 "(pid=321, name='foo', ppid=1)") | |
| 141 self.assertEqual( | |
| 142 repr(psutil.ZombieProcess(321, msg='foo')), | |
| 143 "psutil.ZombieProcess foo") | |
| 144 | |
| 145 def test_access_denied__repr__(self, func=repr): | |
| 146 self.assertEqual( | |
| 147 repr(psutil.AccessDenied(321)), | |
| 148 "psutil.AccessDenied (pid=321)") | |
| 149 self.assertEqual( | |
| 150 repr(psutil.AccessDenied(321, name='foo')), | |
| 151 "psutil.AccessDenied (pid=321, name='foo')") | |
| 152 self.assertEqual( | |
| 153 repr(psutil.AccessDenied(321, msg='foo')), | |
| 154 "psutil.AccessDenied foo") | |
| 155 | |
| 156 def test_timeout_expired__repr__(self, func=repr): | |
| 157 self.assertEqual( | |
| 158 repr(psutil.TimeoutExpired(321)), | |
| 159 "psutil.TimeoutExpired timeout after 321 seconds") | |
| 160 self.assertEqual( | |
| 161 repr(psutil.TimeoutExpired(321, pid=111)), | |
| 162 "psutil.TimeoutExpired timeout after 321 seconds (pid=111)") | |
| 163 self.assertEqual( | |
| 164 repr(psutil.TimeoutExpired(321, pid=111, name='foo')), | |
| 165 "psutil.TimeoutExpired timeout after 321 seconds " | |
| 166 "(pid=111, name='foo')") | |
| 167 | |
| 168 def test_process__eq__(self): | |
| 169 p1 = psutil.Process() | |
| 170 p2 = psutil.Process() | |
| 171 self.assertEqual(p1, p2) | |
| 172 p2._ident = (0, 0) | |
| 173 self.assertNotEqual(p1, p2) | |
| 174 self.assertNotEqual(p1, 'foo') | |
| 175 | |
| 176 def test_process__hash__(self): | |
| 177 s = set([psutil.Process(), psutil.Process()]) | |
| 178 self.assertEqual(len(s), 1) | |
| 179 | |
| 180 def test__all__(self): | |
| 181 dir_psutil = dir(psutil) | |
| 182 for name in dir_psutil: | |
| 183 if name in ('callable', 'error', 'namedtuple', 'tests', | |
| 184 'long', 'test', 'NUM_CPUS', 'BOOT_TIME', | |
| 185 'TOTAL_PHYMEM', 'PermissionError', | |
| 186 'ProcessLookupError'): | |
| 187 continue | |
| 188 if not name.startswith('_'): | |
| 189 try: | |
| 190 __import__(name) | |
| 191 except ImportError: | |
| 192 if name not in psutil.__all__: | |
| 193 fun = getattr(psutil, name) | |
| 194 if fun is None: | |
| 195 continue | |
| 196 if (fun.__doc__ is not None and | |
| 197 'deprecated' not in fun.__doc__.lower()): | |
| 198 self.fail('%r not in psutil.__all__' % name) | |
| 199 | |
| 200 # Import 'star' will break if __all__ is inconsistent, see: | |
| 201 # https://github.com/giampaolo/psutil/issues/656 | |
| 202 # Can't do `from psutil import *` as it won't work on python 3 | |
| 203 # so we simply iterate over __all__. | |
| 204 for name in psutil.__all__: | |
| 205 self.assertIn(name, dir_psutil) | |
| 206 | |
| 207 def test_version(self): | |
| 208 self.assertEqual('.'.join([str(x) for x in psutil.version_info]), | |
| 209 psutil.__version__) | |
| 210 | |
| 211 def test_process_as_dict_no_new_names(self): | |
| 212 # See https://github.com/giampaolo/psutil/issues/813 | |
| 213 p = psutil.Process() | |
| 214 p.foo = '1' | |
| 215 self.assertNotIn('foo', p.as_dict()) | |
| 216 | |
| 217 def test_memoize(self): | |
| 218 @memoize | |
| 219 def foo(*args, **kwargs): | |
| 220 "foo docstring" | |
| 221 calls.append(None) | |
| 222 return (args, kwargs) | |
| 223 | |
| 224 calls = [] | |
| 225 # no args | |
| 226 for x in range(2): | |
| 227 ret = foo() | |
| 228 expected = ((), {}) | |
| 229 self.assertEqual(ret, expected) | |
| 230 self.assertEqual(len(calls), 1) | |
| 231 # with args | |
| 232 for x in range(2): | |
| 233 ret = foo(1) | |
| 234 expected = ((1, ), {}) | |
| 235 self.assertEqual(ret, expected) | |
| 236 self.assertEqual(len(calls), 2) | |
| 237 # with args + kwargs | |
| 238 for x in range(2): | |
| 239 ret = foo(1, bar=2) | |
| 240 expected = ((1, ), {'bar': 2}) | |
| 241 self.assertEqual(ret, expected) | |
| 242 self.assertEqual(len(calls), 3) | |
| 243 # clear cache | |
| 244 foo.cache_clear() | |
| 245 ret = foo() | |
| 246 expected = ((), {}) | |
| 247 self.assertEqual(ret, expected) | |
| 248 self.assertEqual(len(calls), 4) | |
| 249 # docstring | |
| 250 self.assertEqual(foo.__doc__, "foo docstring") | |
| 251 | |
| 252 def test_memoize_when_activated(self): | |
| 253 class Foo: | |
| 254 | |
| 255 @memoize_when_activated | |
| 256 def foo(self): | |
| 257 calls.append(None) | |
| 258 | |
| 259 f = Foo() | |
| 260 calls = [] | |
| 261 f.foo() | |
| 262 f.foo() | |
| 263 self.assertEqual(len(calls), 2) | |
| 264 | |
| 265 # activate | |
| 266 calls = [] | |
| 267 f.foo.cache_activate(f) | |
| 268 f.foo() | |
| 269 f.foo() | |
| 270 self.assertEqual(len(calls), 1) | |
| 271 | |
| 272 # deactivate | |
| 273 calls = [] | |
| 274 f.foo.cache_deactivate(f) | |
| 275 f.foo() | |
| 276 f.foo() | |
| 277 self.assertEqual(len(calls), 2) | |
| 278 | |
| 279 def test_parse_environ_block(self): | |
| 280 from psutil._common import parse_environ_block | |
| 281 | |
| 282 def k(s): | |
| 283 return s.upper() if WINDOWS else s | |
| 284 | |
| 285 self.assertEqual(parse_environ_block("a=1\0"), | |
| 286 {k("a"): "1"}) | |
| 287 self.assertEqual(parse_environ_block("a=1\0b=2\0\0"), | |
| 288 {k("a"): "1", k("b"): "2"}) | |
| 289 self.assertEqual(parse_environ_block("a=1\0b=\0\0"), | |
| 290 {k("a"): "1", k("b"): ""}) | |
| 291 # ignore everything after \0\0 | |
| 292 self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"), | |
| 293 {k("a"): "1", k("b"): "2"}) | |
| 294 # ignore everything that is not an assignment | |
| 295 self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"}) | |
| 296 self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"}) | |
| 297 # do not fail if the block is incomplete | |
| 298 self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"}) | |
| 299 | |
| 300 def test_supports_ipv6(self): | |
| 301 self.addCleanup(supports_ipv6.cache_clear) | |
| 302 if supports_ipv6(): | |
| 303 with mock.patch('psutil._common.socket') as s: | |
| 304 s.has_ipv6 = False | |
| 305 supports_ipv6.cache_clear() | |
| 306 assert not supports_ipv6() | |
| 307 | |
| 308 supports_ipv6.cache_clear() | |
| 309 with mock.patch('psutil._common.socket.socket', | |
| 310 side_effect=socket.error) as s: | |
| 311 assert not supports_ipv6() | |
| 312 assert s.called | |
| 313 | |
| 314 supports_ipv6.cache_clear() | |
| 315 with mock.patch('psutil._common.socket.socket', | |
| 316 side_effect=socket.gaierror) as s: | |
| 317 assert not supports_ipv6() | |
| 318 supports_ipv6.cache_clear() | |
| 319 assert s.called | |
| 320 | |
| 321 supports_ipv6.cache_clear() | |
| 322 with mock.patch('psutil._common.socket.socket.bind', | |
| 323 side_effect=socket.gaierror) as s: | |
| 324 assert not supports_ipv6() | |
| 325 supports_ipv6.cache_clear() | |
| 326 assert s.called | |
| 327 else: | |
| 328 with self.assertRaises(Exception): | |
| 329 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
| 330 sock.bind(("::1", 0)) | |
| 331 | |
| 332 def test_isfile_strict(self): | |
| 333 from psutil._common import isfile_strict | |
| 334 this_file = os.path.abspath(__file__) | |
| 335 assert isfile_strict(this_file) | |
| 336 assert not isfile_strict(os.path.dirname(this_file)) | |
| 337 with mock.patch('psutil._common.os.stat', | |
| 338 side_effect=OSError(errno.EPERM, "foo")): | |
| 339 self.assertRaises(OSError, isfile_strict, this_file) | |
| 340 with mock.patch('psutil._common.os.stat', | |
| 341 side_effect=OSError(errno.EACCES, "foo")): | |
| 342 self.assertRaises(OSError, isfile_strict, this_file) | |
| 343 with mock.patch('psutil._common.os.stat', | |
| 344 side_effect=OSError(errno.EINVAL, "foo")): | |
| 345 assert not isfile_strict(this_file) | |
| 346 with mock.patch('psutil._common.stat.S_ISREG', return_value=False): | |
| 347 assert not isfile_strict(this_file) | |
| 348 | |
| 349 def test_serialization(self): | |
| 350 def check(ret): | |
| 351 if json is not None: | |
| 352 json.loads(json.dumps(ret)) | |
| 353 a = pickle.dumps(ret) | |
| 354 b = pickle.loads(a) | |
| 355 self.assertEqual(ret, b) | |
| 356 | |
| 357 check(psutil.Process().as_dict()) | |
| 358 check(psutil.virtual_memory()) | |
| 359 check(psutil.swap_memory()) | |
| 360 check(psutil.cpu_times()) | |
| 361 check(psutil.cpu_times_percent(interval=0)) | |
| 362 check(psutil.net_io_counters()) | |
| 363 if LINUX and not os.path.exists('/proc/diskstats'): | |
| 364 pass | |
| 365 else: | |
| 366 if not APPVEYOR: | |
| 367 check(psutil.disk_io_counters()) | |
| 368 check(psutil.disk_partitions()) | |
| 369 check(psutil.disk_usage(os.getcwd())) | |
| 370 check(psutil.users()) | |
| 371 | |
| 372 def test_setup_script(self): | |
| 373 setup_py = os.path.join(ROOT_DIR, 'setup.py') | |
| 374 if TRAVIS and not os.path.exists(setup_py): | |
| 375 return self.skipTest("can't find setup.py") | |
| 376 module = import_module_by_path(setup_py) | |
| 377 self.assertRaises(SystemExit, module.setup) | |
| 378 self.assertEqual(module.get_version(), psutil.__version__) | |
| 379 | |
| 380 def test_ad_on_process_creation(self): | |
| 381 # We are supposed to be able to instantiate Process also in case | |
| 382 # of zombie processes or access denied. | |
| 383 with mock.patch.object(psutil.Process, 'create_time', | |
| 384 side_effect=psutil.AccessDenied) as meth: | |
| 385 psutil.Process() | |
| 386 assert meth.called | |
| 387 with mock.patch.object(psutil.Process, 'create_time', | |
| 388 side_effect=psutil.ZombieProcess(1)) as meth: | |
| 389 psutil.Process() | |
| 390 assert meth.called | |
| 391 with mock.patch.object(psutil.Process, 'create_time', | |
| 392 side_effect=ValueError) as meth: | |
| 393 with self.assertRaises(ValueError): | |
| 394 psutil.Process() | |
| 395 assert meth.called | |
| 396 | |
| 397 def test_sanity_version_check(self): | |
| 398 # see: https://github.com/giampaolo/psutil/issues/564 | |
| 399 with mock.patch( | |
| 400 "psutil._psplatform.cext.version", return_value="0.0.0"): | |
| 401 with self.assertRaises(ImportError) as cm: | |
| 402 reload_module(psutil) | |
| 403 self.assertIn("version conflict", str(cm.exception).lower()) | |
| 404 | |
| 405 | |
| 406 # =================================================================== | |
| 407 # --- Tests for wrap_numbers() function. | |
| 408 # =================================================================== | |
| 409 | |
| 410 | |
| 411 nt = collections.namedtuple('foo', 'a b c') | |
| 412 | |
| 413 | |
| 414 class TestWrapNumbers(unittest.TestCase): | |
| 415 | |
| 416 def setUp(self): | |
| 417 wrap_numbers.cache_clear() | |
| 418 | |
| 419 tearDown = setUp | |
| 420 | |
| 421 def test_first_call(self): | |
| 422 input = {'disk1': nt(5, 5, 5)} | |
| 423 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 424 | |
| 425 def test_input_hasnt_changed(self): | |
| 426 input = {'disk1': nt(5, 5, 5)} | |
| 427 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 428 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 429 | |
| 430 def test_increase_but_no_wrap(self): | |
| 431 input = {'disk1': nt(5, 5, 5)} | |
| 432 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 433 input = {'disk1': nt(10, 15, 20)} | |
| 434 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 435 input = {'disk1': nt(20, 25, 30)} | |
| 436 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 437 input = {'disk1': nt(20, 25, 30)} | |
| 438 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 439 | |
| 440 def test_wrap(self): | |
| 441 # let's say 100 is the threshold | |
| 442 input = {'disk1': nt(100, 100, 100)} | |
| 443 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 444 # first wrap restarts from 10 | |
| 445 input = {'disk1': nt(100, 100, 10)} | |
| 446 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 447 {'disk1': nt(100, 100, 110)}) | |
| 448 # then it remains the same | |
| 449 input = {'disk1': nt(100, 100, 10)} | |
| 450 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 451 {'disk1': nt(100, 100, 110)}) | |
| 452 # then it goes up | |
| 453 input = {'disk1': nt(100, 100, 90)} | |
| 454 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 455 {'disk1': nt(100, 100, 190)}) | |
| 456 # then it wraps again | |
| 457 input = {'disk1': nt(100, 100, 20)} | |
| 458 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 459 {'disk1': nt(100, 100, 210)}) | |
| 460 # and remains the same | |
| 461 input = {'disk1': nt(100, 100, 20)} | |
| 462 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 463 {'disk1': nt(100, 100, 210)}) | |
| 464 # now wrap another num | |
| 465 input = {'disk1': nt(50, 100, 20)} | |
| 466 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 467 {'disk1': nt(150, 100, 210)}) | |
| 468 # and again | |
| 469 input = {'disk1': nt(40, 100, 20)} | |
| 470 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 471 {'disk1': nt(190, 100, 210)}) | |
| 472 # keep it the same | |
| 473 input = {'disk1': nt(40, 100, 20)} | |
| 474 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 475 {'disk1': nt(190, 100, 210)}) | |
| 476 | |
| 477 def test_changing_keys(self): | |
| 478 # Emulate a case where the second call to disk_io() | |
| 479 # (or whatever) provides a new disk, then the new disk | |
| 480 # disappears on the third call. | |
| 481 input = {'disk1': nt(5, 5, 5)} | |
| 482 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 483 input = {'disk1': nt(5, 5, 5), | |
| 484 'disk2': nt(7, 7, 7)} | |
| 485 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 486 input = {'disk1': nt(8, 8, 8)} | |
| 487 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 488 | |
| 489 def test_changing_keys_w_wrap(self): | |
| 490 input = {'disk1': nt(50, 50, 50), | |
| 491 'disk2': nt(100, 100, 100)} | |
| 492 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 493 # disk 2 wraps | |
| 494 input = {'disk1': nt(50, 50, 50), | |
| 495 'disk2': nt(100, 100, 10)} | |
| 496 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 497 {'disk1': nt(50, 50, 50), | |
| 498 'disk2': nt(100, 100, 110)}) | |
| 499 # disk 2 disappears | |
| 500 input = {'disk1': nt(50, 50, 50)} | |
| 501 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 502 | |
| 503 # then it appears again; the old wrap is supposed to be | |
| 504 # gone. | |
| 505 input = {'disk1': nt(50, 50, 50), | |
| 506 'disk2': nt(100, 100, 100)} | |
| 507 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 508 # remains the same | |
| 509 input = {'disk1': nt(50, 50, 50), | |
| 510 'disk2': nt(100, 100, 100)} | |
| 511 self.assertEqual(wrap_numbers(input, 'disk_io'), input) | |
| 512 # and then wraps again | |
| 513 input = {'disk1': nt(50, 50, 50), | |
| 514 'disk2': nt(100, 100, 10)} | |
| 515 self.assertEqual(wrap_numbers(input, 'disk_io'), | |
| 516 {'disk1': nt(50, 50, 50), | |
| 517 'disk2': nt(100, 100, 110)}) | |
| 518 | |
| 519 def test_real_data(self): | |
| 520 d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), | |
| 521 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), | |
| 522 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), | |
| 523 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)} | |
| 524 self.assertEqual(wrap_numbers(d, 'disk_io'), d) | |
| 525 self.assertEqual(wrap_numbers(d, 'disk_io'), d) | |
| 526 # decrease this ↓ | |
| 527 d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), | |
| 528 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), | |
| 529 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), | |
| 530 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)} | |
| 531 out = wrap_numbers(d, 'disk_io') | |
| 532 self.assertEqual(out['nvme0n1'][0], 400) | |
| 533 | |
| 534 # --- cache tests | |
| 535 | |
| 536 def test_cache_first_call(self): | |
| 537 input = {'disk1': nt(5, 5, 5)} | |
| 538 wrap_numbers(input, 'disk_io') | |
| 539 cache = wrap_numbers.cache_info() | |
| 540 self.assertEqual(cache[0], {'disk_io': input}) | |
| 541 self.assertEqual(cache[1], {'disk_io': {}}) | |
| 542 self.assertEqual(cache[2], {'disk_io': {}}) | |
| 543 | |
| 544 def test_cache_call_twice(self): | |
| 545 input = {'disk1': nt(5, 5, 5)} | |
| 546 wrap_numbers(input, 'disk_io') | |
| 547 input = {'disk1': nt(10, 10, 10)} | |
| 548 wrap_numbers(input, 'disk_io') | |
| 549 cache = wrap_numbers.cache_info() | |
| 550 self.assertEqual(cache[0], {'disk_io': input}) | |
| 551 self.assertEqual( | |
| 552 cache[1], | |
| 553 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}}) | |
| 554 self.assertEqual(cache[2], {'disk_io': {}}) | |
| 555 | |
| 556 def test_cache_wrap(self): | |
| 557 # let's say 100 is the threshold | |
| 558 input = {'disk1': nt(100, 100, 100)} | |
| 559 wrap_numbers(input, 'disk_io') | |
| 560 | |
| 561 # first wrap restarts from 10 | |
| 562 input = {'disk1': nt(100, 100, 10)} | |
| 563 wrap_numbers(input, 'disk_io') | |
| 564 cache = wrap_numbers.cache_info() | |
| 565 self.assertEqual(cache[0], {'disk_io': input}) | |
| 566 self.assertEqual( | |
| 567 cache[1], | |
| 568 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}}) | |
| 569 self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}}) | |
| 570 | |
| 571 def assert_(): | |
| 572 cache = wrap_numbers.cache_info() | |
| 573 self.assertEqual( | |
| 574 cache[1], | |
| 575 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, | |
| 576 ('disk1', 2): 100}}) | |
| 577 self.assertEqual(cache[2], | |
| 578 {'disk_io': {'disk1': set([('disk1', 2)])}}) | |
| 579 | |
| 580 # then it remains the same | |
| 581 input = {'disk1': nt(100, 100, 10)} | |
| 582 wrap_numbers(input, 'disk_io') | |
| 583 cache = wrap_numbers.cache_info() | |
| 584 self.assertEqual(cache[0], {'disk_io': input}) | |
| 585 assert_() | |
| 586 | |
| 587 # then it goes up | |
| 588 input = {'disk1': nt(100, 100, 90)} | |
| 589 wrap_numbers(input, 'disk_io') | |
| 590 cache = wrap_numbers.cache_info() | |
| 591 self.assertEqual(cache[0], {'disk_io': input}) | |
| 592 assert_() | |
| 593 | |
| 594 # then it wraps again | |
| 595 input = {'disk1': nt(100, 100, 20)} | |
| 596 wrap_numbers(input, 'disk_io') | |
| 597 cache = wrap_numbers.cache_info() | |
| 598 self.assertEqual(cache[0], {'disk_io': input}) | |
| 599 self.assertEqual( | |
| 600 cache[1], | |
| 601 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}}) | |
| 602 self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}}) | |
| 603 | |
| 604 def test_cache_changing_keys(self): | |
| 605 input = {'disk1': nt(5, 5, 5)} | |
| 606 wrap_numbers(input, 'disk_io') | |
| 607 input = {'disk1': nt(5, 5, 5), | |
| 608 'disk2': nt(7, 7, 7)} | |
| 609 wrap_numbers(input, 'disk_io') | |
| 610 cache = wrap_numbers.cache_info() | |
| 611 self.assertEqual(cache[0], {'disk_io': input}) | |
| 612 self.assertEqual( | |
| 613 cache[1], | |
| 614 {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}}) | |
| 615 self.assertEqual(cache[2], {'disk_io': {}}) | |
| 616 | |
| 617 def test_cache_clear(self): | |
| 618 input = {'disk1': nt(5, 5, 5)} | |
| 619 wrap_numbers(input, 'disk_io') | |
| 620 wrap_numbers(input, 'disk_io') | |
| 621 wrap_numbers.cache_clear('disk_io') | |
| 622 self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {})) | |
| 623 wrap_numbers.cache_clear('disk_io') | |
| 624 wrap_numbers.cache_clear('?!?') | |
| 625 | |
| 626 @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') | |
| 627 def test_cache_clear_public_apis(self): | |
| 628 if not psutil.disk_io_counters() or not psutil.net_io_counters(): | |
| 629 return self.skipTest("no disks or NICs available") | |
| 630 psutil.disk_io_counters() | |
| 631 psutil.net_io_counters() | |
| 632 caches = wrap_numbers.cache_info() | |
| 633 for cache in caches: | |
| 634 self.assertIn('psutil.disk_io_counters', cache) | |
| 635 self.assertIn('psutil.net_io_counters', cache) | |
| 636 | |
| 637 psutil.disk_io_counters.cache_clear() | |
| 638 caches = wrap_numbers.cache_info() | |
| 639 for cache in caches: | |
| 640 self.assertIn('psutil.net_io_counters', cache) | |
| 641 self.assertNotIn('psutil.disk_io_counters', cache) | |
| 642 | |
| 643 psutil.net_io_counters.cache_clear() | |
| 644 caches = wrap_numbers.cache_info() | |
| 645 self.assertEqual(caches, ({}, {}, {})) | |
| 646 | |
| 647 | |
| 648 # =================================================================== | |
| 649 # --- Example script tests | |
| 650 # =================================================================== | |
| 651 | |
| 652 | |
| 653 @unittest.skipIf(TOX, "can't test on TOX") | |
| 654 # See: https://travis-ci.org/giampaolo/psutil/jobs/295224806 | |
| 655 @unittest.skipIf(TRAVIS and not os.path.exists(SCRIPTS_DIR), | |
| 656 "can't locate scripts directory") | |
| 657 class TestScripts(unittest.TestCase): | |
| 658 """Tests for scripts in the "scripts" directory.""" | |
| 659 | |
| 660 @staticmethod | |
| 661 def assert_stdout(exe, *args, **kwargs): | |
| 662 exe = '%s' % os.path.join(SCRIPTS_DIR, exe) | |
| 663 cmd = [PYTHON_EXE, exe] | |
| 664 for arg in args: | |
| 665 cmd.append(arg) | |
| 666 try: | |
| 667 out = sh(cmd, **kwargs).strip() | |
| 668 except RuntimeError as err: | |
| 669 if 'AccessDenied' in str(err): | |
| 670 return str(err) | |
| 671 else: | |
| 672 raise | |
| 673 assert out, out | |
| 674 return out | |
| 675 | |
| 676 @staticmethod | |
| 677 def assert_syntax(exe, args=None): | |
| 678 exe = os.path.join(SCRIPTS_DIR, exe) | |
| 679 if PY3: | |
| 680 f = open(exe, 'rt', encoding='utf8') | |
| 681 else: | |
| 682 f = open(exe, 'rt') | |
| 683 with f: | |
| 684 src = f.read() | |
| 685 ast.parse(src) | |
| 686 | |
| 687 def test_coverage(self): | |
| 688 # make sure all example scripts have a test method defined | |
| 689 meths = dir(self) | |
| 690 for name in os.listdir(SCRIPTS_DIR): | |
| 691 if name.endswith('.py'): | |
| 692 if 'test_' + os.path.splitext(name)[0] not in meths: | |
| 693 # self.assert_stdout(name) | |
| 694 self.fail('no test defined for %r script' | |
| 695 % os.path.join(SCRIPTS_DIR, name)) | |
| 696 | |
| 697 @unittest.skipIf(not POSIX, "POSIX only") | |
| 698 def test_executable(self): | |
| 699 for name in os.listdir(SCRIPTS_DIR): | |
| 700 if name.endswith('.py'): | |
| 701 path = os.path.join(SCRIPTS_DIR, name) | |
| 702 if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: | |
| 703 self.fail('%r is not executable' % path) | |
| 704 | |
| 705 def test_disk_usage(self): | |
| 706 self.assert_stdout('disk_usage.py') | |
| 707 | |
| 708 def test_free(self): | |
| 709 self.assert_stdout('free.py') | |
| 710 | |
| 711 def test_meminfo(self): | |
| 712 self.assert_stdout('meminfo.py') | |
| 713 | |
| 714 def test_procinfo(self): | |
| 715 self.assert_stdout('procinfo.py', str(os.getpid())) | |
| 716 | |
| 717 @unittest.skipIf(CI_TESTING and not psutil.users(), "no users") | |
| 718 def test_who(self): | |
| 719 self.assert_stdout('who.py') | |
| 720 | |
| 721 def test_ps(self): | |
| 722 self.assert_stdout('ps.py') | |
| 723 | |
| 724 def test_pstree(self): | |
| 725 self.assert_stdout('pstree.py') | |
| 726 | |
| 727 def test_netstat(self): | |
| 728 self.assert_stdout('netstat.py') | |
| 729 | |
| 730 # permission denied on travis | |
| 731 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 732 def test_ifconfig(self): | |
| 733 self.assert_stdout('ifconfig.py') | |
| 734 | |
| 735 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") | |
| 736 def test_pmap(self): | |
| 737 self.assert_stdout('pmap.py', str(os.getpid())) | |
| 738 | |
| 739 def test_procsmem(self): | |
| 740 if 'uss' not in psutil.Process().memory_full_info()._fields: | |
| 741 raise self.skipTest("not supported") | |
| 742 self.assert_stdout('procsmem.py', stderr=DEVNULL) | |
| 743 | |
| 744 def test_killall(self): | |
| 745 self.assert_syntax('killall.py') | |
| 746 | |
| 747 def test_nettop(self): | |
| 748 self.assert_syntax('nettop.py') | |
| 749 | |
| 750 def test_top(self): | |
| 751 self.assert_syntax('top.py') | |
| 752 | |
| 753 def test_iotop(self): | |
| 754 self.assert_syntax('iotop.py') | |
| 755 | |
| 756 def test_pidof(self): | |
| 757 output = self.assert_stdout('pidof.py', psutil.Process().name()) | |
| 758 self.assertIn(str(os.getpid()), output) | |
| 759 | |
| 760 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 761 def test_winservices(self): | |
| 762 self.assert_stdout('winservices.py') | |
| 763 | |
| 764 def test_cpu_distribution(self): | |
| 765 self.assert_syntax('cpu_distribution.py') | |
| 766 | |
| 767 @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported") | |
| 768 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 769 def test_temperatures(self): | |
| 770 if not psutil.sensors_temperatures(): | |
| 771 self.skipTest("no temperatures") | |
| 772 self.assert_stdout('temperatures.py') | |
| 773 | |
| 774 @unittest.skipIf(not HAS_SENSORS_FANS, "not supported") | |
| 775 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 776 def test_fans(self): | |
| 777 if not psutil.sensors_fans(): | |
| 778 self.skipTest("no fans") | |
| 779 self.assert_stdout('fans.py') | |
| 780 | |
| 781 @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported") | |
| 782 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 783 def test_battery(self): | |
| 784 self.assert_stdout('battery.py') | |
| 785 | |
| 786 def test_sensors(self): | |
| 787 self.assert_stdout('sensors.py') | |
| 788 | |
| 789 | |
| 790 # =================================================================== | |
| 791 # --- Unit tests for test utilities. | |
| 792 # =================================================================== | |
| 793 | |
| 794 | |
| 795 class TestRetryDecorator(unittest.TestCase): | |
| 796 | |
| 797 @mock.patch('time.sleep') | |
| 798 def test_retry_success(self, sleep): | |
| 799 # Fail 3 times out of 5; make sure the decorated fun returns. | |
| 800 | |
| 801 @retry(retries=5, interval=1, logfun=None) | |
| 802 def foo(): | |
| 803 while queue: | |
| 804 queue.pop() | |
| 805 1 / 0 | |
| 806 return 1 | |
| 807 | |
| 808 queue = list(range(3)) | |
| 809 self.assertEqual(foo(), 1) | |
| 810 self.assertEqual(sleep.call_count, 3) | |
| 811 | |
| 812 @mock.patch('time.sleep') | |
| 813 def test_retry_failure(self, sleep): | |
| 814 # Fail 6 times out of 5; th function is supposed to raise exc. | |
| 815 | |
| 816 @retry(retries=5, interval=1, logfun=None) | |
| 817 def foo(): | |
| 818 while queue: | |
| 819 queue.pop() | |
| 820 1 / 0 | |
| 821 return 1 | |
| 822 | |
| 823 queue = list(range(6)) | |
| 824 self.assertRaises(ZeroDivisionError, foo) | |
| 825 self.assertEqual(sleep.call_count, 5) | |
| 826 | |
| 827 @mock.patch('time.sleep') | |
| 828 def test_exception_arg(self, sleep): | |
| 829 @retry(exception=ValueError, interval=1) | |
| 830 def foo(): | |
| 831 raise TypeError | |
| 832 | |
| 833 self.assertRaises(TypeError, foo) | |
| 834 self.assertEqual(sleep.call_count, 0) | |
| 835 | |
| 836 @mock.patch('time.sleep') | |
| 837 def test_no_interval_arg(self, sleep): | |
| 838 # if interval is not specified sleep is not supposed to be called | |
| 839 | |
| 840 @retry(retries=5, interval=None, logfun=None) | |
| 841 def foo(): | |
| 842 1 / 0 | |
| 843 | |
| 844 self.assertRaises(ZeroDivisionError, foo) | |
| 845 self.assertEqual(sleep.call_count, 0) | |
| 846 | |
| 847 @mock.patch('time.sleep') | |
| 848 def test_retries_arg(self, sleep): | |
| 849 | |
| 850 @retry(retries=5, interval=1, logfun=None) | |
| 851 def foo(): | |
| 852 1 / 0 | |
| 853 | |
| 854 self.assertRaises(ZeroDivisionError, foo) | |
| 855 self.assertEqual(sleep.call_count, 5) | |
| 856 | |
| 857 @mock.patch('time.sleep') | |
| 858 def test_retries_and_timeout_args(self, sleep): | |
| 859 self.assertRaises(ValueError, retry, retries=5, timeout=1) | |
| 860 | |
| 861 | |
| 862 class TestSyncTestUtils(unittest.TestCase): | |
| 863 | |
| 864 def tearDown(self): | |
| 865 safe_rmpath(TESTFN) | |
| 866 | |
| 867 def test_wait_for_pid(self): | |
| 868 wait_for_pid(os.getpid()) | |
| 869 nopid = max(psutil.pids()) + 99999 | |
| 870 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): | |
| 871 self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid) | |
| 872 | |
| 873 def test_wait_for_file(self): | |
| 874 with open(TESTFN, 'w') as f: | |
| 875 f.write('foo') | |
| 876 wait_for_file(TESTFN) | |
| 877 assert not os.path.exists(TESTFN) | |
| 878 | |
| 879 def test_wait_for_file_empty(self): | |
| 880 with open(TESTFN, 'w'): | |
| 881 pass | |
| 882 wait_for_file(TESTFN, empty=True) | |
| 883 assert not os.path.exists(TESTFN) | |
| 884 | |
| 885 def test_wait_for_file_no_file(self): | |
| 886 with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): | |
| 887 self.assertRaises(IOError, wait_for_file, TESTFN) | |
| 888 | |
| 889 def test_wait_for_file_no_delete(self): | |
| 890 with open(TESTFN, 'w') as f: | |
| 891 f.write('foo') | |
| 892 wait_for_file(TESTFN, delete=False) | |
| 893 assert os.path.exists(TESTFN) | |
| 894 | |
| 895 def test_call_until(self): | |
| 896 ret = call_until(lambda: 1, "ret == 1") | |
| 897 self.assertEqual(ret, 1) | |
| 898 | |
| 899 | |
| 900 class TestFSTestUtils(unittest.TestCase): | |
| 901 | |
| 902 def setUp(self): | |
| 903 safe_rmpath(TESTFN) | |
| 904 | |
| 905 tearDown = setUp | |
| 906 | |
| 907 def test_open_text(self): | |
| 908 with open_text(__file__) as f: | |
| 909 self.assertEqual(f.mode, 'rt') | |
| 910 | |
| 911 def test_open_binary(self): | |
| 912 with open_binary(__file__) as f: | |
| 913 self.assertEqual(f.mode, 'rb') | |
| 914 | |
| 915 def test_safe_mkdir(self): | |
| 916 safe_mkdir(TESTFN) | |
| 917 assert os.path.isdir(TESTFN) | |
| 918 safe_mkdir(TESTFN) | |
| 919 assert os.path.isdir(TESTFN) | |
| 920 | |
| 921 def test_safe_rmpath(self): | |
| 922 # test file is removed | |
| 923 open(TESTFN, 'w').close() | |
| 924 safe_rmpath(TESTFN) | |
| 925 assert not os.path.exists(TESTFN) | |
| 926 # test no exception if path does not exist | |
| 927 safe_rmpath(TESTFN) | |
| 928 # test dir is removed | |
| 929 os.mkdir(TESTFN) | |
| 930 safe_rmpath(TESTFN) | |
| 931 assert not os.path.exists(TESTFN) | |
| 932 # test other exceptions are raised | |
| 933 with mock.patch('psutil.tests.os.stat', | |
| 934 side_effect=OSError(errno.EINVAL, "")) as m: | |
| 935 with self.assertRaises(OSError): | |
| 936 safe_rmpath(TESTFN) | |
| 937 assert m.called | |
| 938 | |
| 939 def test_chdir(self): | |
| 940 base = os.getcwd() | |
| 941 os.mkdir(TESTFN) | |
| 942 with chdir(TESTFN): | |
| 943 self.assertEqual(os.getcwd(), os.path.join(base, TESTFN)) | |
| 944 self.assertEqual(os.getcwd(), base) | |
| 945 | |
| 946 | |
| 947 class TestProcessUtils(unittest.TestCase): | |
| 948 | |
| 949 def test_reap_children(self): | |
| 950 subp = get_test_subprocess() | |
| 951 p = psutil.Process(subp.pid) | |
| 952 assert p.is_running() | |
| 953 reap_children() | |
| 954 assert not p.is_running() | |
| 955 assert not psutil.tests._pids_started | |
| 956 assert not psutil.tests._subprocesses_started | |
| 957 | |
| 958 def test_create_proc_children_pair(self): | |
| 959 p1, p2 = create_proc_children_pair() | |
| 960 self.assertNotEqual(p1.pid, p2.pid) | |
| 961 assert p1.is_running() | |
| 962 assert p2.is_running() | |
| 963 children = psutil.Process().children(recursive=True) | |
| 964 self.assertEqual(len(children), 2) | |
| 965 self.assertIn(p1, children) | |
| 966 self.assertIn(p2, children) | |
| 967 self.assertEqual(p1.ppid(), os.getpid()) | |
| 968 self.assertEqual(p2.ppid(), p1.pid) | |
| 969 | |
| 970 # make sure both of them are cleaned up | |
| 971 reap_children() | |
| 972 assert not p1.is_running() | |
| 973 assert not p2.is_running() | |
| 974 assert not psutil.tests._pids_started | |
| 975 assert not psutil.tests._subprocesses_started | |
| 976 | |
| 977 @unittest.skipIf(not POSIX, "POSIX only") | |
| 978 def test_create_zombie_proc(self): | |
| 979 zpid = create_zombie_proc() | |
| 980 self.addCleanup(reap_children, recursive=True) | |
| 981 p = psutil.Process(zpid) | |
| 982 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) | |
| 983 | |
| 984 | |
| 985 class TestNetUtils(unittest.TestCase): | |
| 986 | |
| 987 def bind_socket(self): | |
| 988 port = get_free_port() | |
| 989 with contextlib.closing(bind_socket(addr=('', port))) as s: | |
| 990 self.assertEqual(s.getsockname()[1], port) | |
| 991 | |
| 992 @unittest.skipIf(not POSIX, "POSIX only") | |
| 993 def test_bind_unix_socket(self): | |
| 994 with unix_socket_path() as name: | |
| 995 sock = bind_unix_socket(name) | |
| 996 with contextlib.closing(sock): | |
| 997 self.assertEqual(sock.family, socket.AF_UNIX) | |
| 998 self.assertEqual(sock.type, socket.SOCK_STREAM) | |
| 999 self.assertEqual(sock.getsockname(), name) | |
| 1000 assert os.path.exists(name) | |
| 1001 assert stat.S_ISSOCK(os.stat(name).st_mode) | |
| 1002 # UDP | |
| 1003 with unix_socket_path() as name: | |
| 1004 sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) | |
| 1005 with contextlib.closing(sock): | |
| 1006 self.assertEqual(sock.type, socket.SOCK_DGRAM) | |
| 1007 | |
| 1008 def tcp_tcp_socketpair(self): | |
| 1009 addr = ("127.0.0.1", get_free_port()) | |
| 1010 server, client = tcp_socketpair(socket.AF_INET, addr=addr) | |
| 1011 with contextlib.closing(server): | |
| 1012 with contextlib.closing(client): | |
| 1013 # Ensure they are connected and the positions are | |
| 1014 # correct. | |
| 1015 self.assertEqual(server.getsockname(), addr) | |
| 1016 self.assertEqual(client.getpeername(), addr) | |
| 1017 self.assertNotEqual(client.getsockname(), addr) | |
| 1018 | |
| 1019 @unittest.skipIf(not POSIX, "POSIX only") | |
| 1020 @unittest.skipIf(NETBSD or FREEBSD, | |
| 1021 "/var/run/log UNIX socket opened by default") | |
| 1022 def test_unix_socketpair(self): | |
| 1023 p = psutil.Process() | |
| 1024 num_fds = p.num_fds() | |
| 1025 assert not p.connections(kind='unix') | |
| 1026 with unix_socket_path() as name: | |
| 1027 server, client = unix_socketpair(name) | |
| 1028 try: | |
| 1029 assert os.path.exists(name) | |
| 1030 assert stat.S_ISSOCK(os.stat(name).st_mode) | |
| 1031 self.assertEqual(p.num_fds() - num_fds, 2) | |
| 1032 self.assertEqual(len(p.connections(kind='unix')), 2) | |
| 1033 self.assertEqual(server.getsockname(), name) | |
| 1034 self.assertEqual(client.getpeername(), name) | |
| 1035 finally: | |
| 1036 client.close() | |
| 1037 server.close() | |
| 1038 | |
| 1039 def test_create_sockets(self): | |
| 1040 with create_sockets() as socks: | |
| 1041 fams = collections.defaultdict(int) | |
| 1042 types = collections.defaultdict(int) | |
| 1043 for s in socks: | |
| 1044 fams[s.family] += 1 | |
| 1045 # work around http://bugs.python.org/issue30204 | |
| 1046 types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1 | |
| 1047 self.assertGreaterEqual(fams[socket.AF_INET], 2) | |
| 1048 if supports_ipv6(): | |
| 1049 self.assertGreaterEqual(fams[socket.AF_INET6], 2) | |
| 1050 if POSIX and HAS_CONNECTIONS_UNIX: | |
| 1051 self.assertGreaterEqual(fams[socket.AF_UNIX], 2) | |
| 1052 self.assertGreaterEqual(types[socket.SOCK_STREAM], 2) | |
| 1053 self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2) | |
| 1054 | |
| 1055 | |
| 1056 class TestOtherUtils(unittest.TestCase): | |
| 1057 | |
| 1058 def test_is_namedtuple(self): | |
| 1059 assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3)) | |
| 1060 assert not is_namedtuple(tuple()) | |
| 1061 | |
| 1062 | |
| 1063 if __name__ == '__main__': | |
| 1064 from psutil.tests.runner import run | |
| 1065 run(__file__) |
