Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_linux.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Linux specific tests.""" | |
| 8 | |
| 9 from __future__ import division | |
| 10 import collections | |
| 11 import contextlib | |
| 12 import errno | |
| 13 import glob | |
| 14 import io | |
| 15 import os | |
| 16 import re | |
| 17 import shutil | |
| 18 import socket | |
| 19 import struct | |
| 20 import textwrap | |
| 21 import time | |
| 22 import warnings | |
| 23 | |
| 24 import psutil | |
| 25 from psutil import LINUX | |
| 26 from psutil._compat import basestring | |
| 27 from psutil._compat import FileNotFoundError | |
| 28 from psutil._compat import PY3 | |
| 29 from psutil._compat import u | |
| 30 from psutil.tests import call_until | |
| 31 from psutil.tests import GLOBAL_TIMEOUT | |
| 32 from psutil.tests import HAS_BATTERY | |
| 33 from psutil.tests import HAS_CPU_FREQ | |
| 34 from psutil.tests import HAS_GETLOADAVG | |
| 35 from psutil.tests import HAS_RLIMIT | |
| 36 from psutil.tests import mock | |
| 37 from psutil.tests import PsutilTestCase | |
| 38 from psutil.tests import PYPY | |
| 39 from psutil.tests import reload_module | |
| 40 from psutil.tests import retry_on_failure | |
| 41 from psutil.tests import safe_rmpath | |
| 42 from psutil.tests import sh | |
| 43 from psutil.tests import skip_on_not_implemented | |
| 44 from psutil.tests import ThreadTask | |
| 45 from psutil.tests import TOLERANCE_DISK_USAGE | |
| 46 from psutil.tests import TOLERANCE_SYS_MEM | |
| 47 from psutil.tests import TRAVIS | |
| 48 from psutil.tests import unittest | |
| 49 from psutil.tests import which | |
| 50 | |
| 51 | |
# Absolute path of the directory containing this test module.
HERE = os.path.abspath(os.path.dirname(__file__))
# ioctl() request codes. SIOCGIFADDR and SIOCGIFHWADDR are passed to
# fcntl.ioctl() by get_ipv4_address() and get_mac_address().
SIOCGIFADDR = 0x8915
SIOCGIFCONF = 0x8912
SIOCGIFHWADDR = 0x8927
if LINUX:
    SECTOR_SIZE = 512
    # True when no hwmon device directory is found under /sys/class/hwmon.
    EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
| 59 | |
| 60 # ===================================================================== | |
| 61 # --- utils | |
| 62 # ===================================================================== | |
| 63 | |
| 64 | |
def get_ipv4_address(ifname):
    """Return the IPv4 address of network interface `ifname` as a
    dotted-quad string.

    The address is read by issuing a SIOCGIFADDR ioctl() on a
    temporary UDP socket, which is closed before returning.
    """
    import fcntl

    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    request = struct.pack('256s', name)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFADDR, request)
        # Bytes 20-23 of the reply are handed to inet_ntoa().
        return socket.inet_ntoa(reply[20:24])
| 76 | |
| 77 | |
def get_mac_address(ifname):
    """Return the hardware (MAC) address of network interface `ifname`
    as a lowercase, colon-separated hex string (e.g. '00:1a:2b:3c:4d:5e').

    The address is read by issuing a SIOCGIFHWADDR ioctl() on a
    temporary UDP socket, which is closed before returning.
    """
    import fcntl

    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        info = fcntl.ioctl(
            sock.fileno(), SIOCGIFHWADDR, struct.pack('256s', name))
    # bytearray() iterates as integers on both Python 2 and 3, so there
    # is no need to shadow the ord() builtin with a per-version shim.
    return ':'.join('%02x' % byte for byte in bytearray(info[18:24]))
| 94 | |
| 95 | |
def free_swap():
    """Run 'free -b' and return swap memory's total, used and free
    values as a (total, used, free) namedtuple of integers.

    Raise ValueError if the output has no line starting with 'Swap'.
    """
    output = sh('free -b', env={"LANG": "C.UTF-8"})
    rows = output.split('\n')
    swap_row = next((row for row in rows if row.startswith('Swap')), None)
    if swap_row is None:
        raise ValueError(
            "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(rows))
    _, total, used, free = swap_row.split()
    nt = collections.namedtuple('free', 'total used free')
    return nt(int(total), int(used), int(free))
| 109 | |
| 110 | |
def free_physmem():
    """Run 'free -b' and return physical memory's total, used, free and
    shared values plus the raw command output, as a namedtuple.

    Raise ValueError if the output has no line starting with 'Mem'.
    """
    # Note: free can have 2 different formats, invalidating 'shared'
    # and 'cached' memory which may have different positions so we
    # do not return them.
    # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
    output = sh('free -b', env={"LANG": "C.UTF-8"})
    rows = output.split('\n')
    mem_row = next((row for row in rows if row.startswith('Mem')), None)
    if mem_row is None:
        raise ValueError(
            "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(rows))
    columns = mem_row.split()[1:5]
    total, used, free, shared = [int(column) for column in columns]
    nt = collections.namedtuple('free', 'total used free shared output')
    return nt(total, used, free, shared, output)
| 130 | |
| 131 | |
def vmstat(stat):
    """Run 'vmstat -s' and return, as an integer, the leading number of
    the first output line containing the string `stat`.

    Raise ValueError if no line contains `stat`.
    """
    output = sh("vmstat -s", env={"LANG": "C.UTF-8"})
    for raw_line in output.split("\n"):
        entry = raw_line.strip()
        if stat not in entry:
            continue
        number = entry.split(' ')[0]
        return int(number)
    raise ValueError("can't find %r in 'vmstat' output" % stat)
| 139 | |
| 140 | |
def get_free_version_info():
    """Return the version printed by 'free -V' as a tuple of integers.

    The version is taken from the last whitespace-separated token of
    the output. The calling test is skipped if the output contains
    'UNKNOWN'.
    """
    output = sh("free -V").strip()
    if 'UNKNOWN' in output:
        raise unittest.SkipTest("can't determine free version")
    version = output.split()[-1]
    return tuple(int(part) for part in version.split('.'))
| 146 | |
| 147 | |
@contextlib.contextmanager
def mock_open_content(for_path, content):
    """Context manager patching the open() builtin so that opening
    `for_path` yields an in-memory file holding `content`.

    Any other path is forwarded to the real open(). The mock object
    is yielded so that callers can inspect it (e.g. `m.called`).
    """
    real_open = open

    def fake_open(name, *args, **kwargs):
        if name != for_path:
            return real_open(name, *args, **kwargs)
        # Text content gets a text stream on Python 3; everything else
        # (and anything on Python 2) gets a bytes stream.
        if PY3 and isinstance(content, basestring):
            return io.StringIO(content)
        return io.BytesIO(content)

    target = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(target, create=True, side_effect=fake_open) as patched:
        yield patched
| 169 | |
| 170 | |
@contextlib.contextmanager
def mock_open_exception(for_path, exc):
    """Context manager patching the open() builtin so that opening
    `for_path` raises `exc`.

    Any other path is forwarded to the real open(). The mock object
    is yielded so that callers can inspect it (e.g. `m.called`).
    """
    real_open = open

    def fake_open(name, *args, **kwargs):
        if name != for_path:
            return real_open(name, *args, **kwargs)
        raise exc

    target = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(target, create=True, side_effect=fake_open) as patched:
        yield patched
| 186 | |
| 187 | |
| 188 # ===================================================================== | |
| 189 # --- system virtual memory | |
| 190 # ===================================================================== | |
| 191 | |
| 192 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemVirtualMemory(PsutilTestCase):
    """Check psutil.virtual_memory() against the 'free' and 'vmstat'
    commands and against mocked /proc/meminfo contents.
    """

    def test_total(self):
        # The vmstat figure is multiplied by 1024 before being compared
        # with the value returned by psutil.
        vmstat_value = vmstat('total memory') * 1024
        psutil_value = psutil.virtual_memory().total
        self.assertAlmostEqual(vmstat_value, psutil_value)

    @retry_on_failure()
    def test_used(self):
        # Older versions of procps used slab memory to calculate used memory.
        # This got changed in:
        # https://gitlab.com/procps-ng/procps/commit/
        #     05d751c4f076a2f0118b914c5e51cfbb4762ad8e
        if get_free_version_info() < (3, 3, 12):
            # SkipTest is raised explicitly, like in the other tests of
            # this class ('raise self.skipTest(...)' never reached the
            # 'raise' because skipTest() itself raises).
            raise unittest.SkipTest("old free version")
        free = free_physmem()
        free_value = free.used
        psutil_value = psutil.virtual_memory().used
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, free.output))

    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @retry_on_failure()
    def test_free(self):
        vmstat_value = vmstat('free memory') * 1024
        psutil_value = psutil.virtual_memory().free
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_buffers(self):
        vmstat_value = vmstat('buffer memory') * 1024
        psutil_value = psutil.virtual_memory().buffers
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    # https://travis-ci.org/giampaolo/psutil/jobs/226719664
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @retry_on_failure()
    def test_active(self):
        vmstat_value = vmstat('active memory') * 1024
        psutil_value = psutil.virtual_memory().active
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    # https://travis-ci.org/giampaolo/psutil/jobs/227242952
    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
    @retry_on_failure()
    def test_inactive(self):
        vmstat_value = vmstat('inactive memory') * 1024
        psutil_value = psutil.virtual_memory().inactive
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_shared(self):
        free = free_physmem()
        free_value = free.shared
        if free_value == 0:
            raise unittest.SkipTest("free does not support 'shared' column")
        psutil_value = psutil.virtual_memory().shared
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, free.output))

    @retry_on_failure()
    def test_available(self):
        # "free" output format has changed at some point:
        # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
        # The locale is forced, as free_swap() and free_physmem() do,
        # so that the 'available' header looked up below is not
        # translated.
        out = sh("free -b", env={"LANG": "C.UTF-8"})
        lines = out.split('\n')
        if 'available' not in lines[0]:
            raise unittest.SkipTest(
                "free does not support 'available' column")
        free_value = int(lines[1].split()[-1])
        psutil_value = psutil.virtual_memory().available
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, out))

    def test_warnings_on_misses(self):
        # Emulate a case where /proc/meminfo provides few info.
        # psutil is supposed to set the missing fields to 0 and
        # raise a warning.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemAvailable: -1 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                SReclaimable: 346648 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "memory stats couldn't be determined", str(w.message))
                self.assertIn("cached", str(w.message))
                self.assertIn("shared", str(w.message))
                self.assertIn("active", str(w.message))
                self.assertIn("inactive", str(w.message))
                self.assertIn("buffers", str(w.message))
                self.assertIn("available", str(w.message))
                self.assertEqual(ret.cached, 0)
                self.assertEqual(ret.active, 0)
                self.assertEqual(ret.inactive, 0)
                self.assertEqual(ret.shared, 0)
                self.assertEqual(ret.buffers, 0)
                self.assertEqual(ret.available, 0)
                self.assertEqual(ret.slab, 0)

    @retry_on_failure()
    def test_avail_old_percent(self):
        # Make sure that our calculation of avail mem for old kernels
        # is off by max 15%.
        from psutil._pslinux import calculate_avail_vmem
        from psutil._pslinux import open_binary

        mems = {}
        with open_binary('/proc/meminfo') as f:
            for line in f:
                fields = line.split()
                mems[fields[0]] = int(fields[1]) * 1024

        a = calculate_avail_vmem(mems)
        if b'MemAvailable:' in mems:
            b = mems[b'MemAvailable:']
            diff_percent = abs(a - b) / a * 100
            self.assertLess(diff_percent, 15)

    def test_avail_old_comes_from_kernel(self):
        # Make sure "MemAvailable:" column is used instead of relying
        # on our internal algorithm to calculate avail mem.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemAvailable: 6574984 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                SReclaimable: 346648 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                # Record every warning so that ws[0] below does not
                # depend on the warning filters currently in place.
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
            assert m.called
            self.assertEqual(ret.available, 6574984 * 1024)
            w = ws[0]
            self.assertIn(
                "inactive memory stats couldn't be determined", str(w.message))

    def test_avail_old_missing_fields(self):
        # Remove Active(file), Inactive(file) and SReclaimable
        # from /proc/meminfo and make sure the fallback is used
        # (free + cached).
        with mock_open_content(
            "/proc/meminfo",
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                # Record every warning so that ws[0] below does not
                # depend on the warning filters currently in place.
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
            assert m.called
            self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
            w = ws[0]
            self.assertIn(
                "inactive memory stats couldn't be determined", str(w.message))

    def test_avail_old_missing_zoneinfo(self):
        # Remove /proc/zoneinfo file. Make sure fallback is used
        # (free + cached).
        with mock_open_content(
            "/proc/meminfo",
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                SReclaimable: 346648 kB
                """).encode()):
            with mock_open_exception(
                    "/proc/zoneinfo",
                    IOError(errno.ENOENT, 'no such file or directory')):
                with warnings.catch_warnings(record=True) as ws:
                    # Record every warning so that ws[0] below does not
                    # depend on the warning filters currently in place.
                    warnings.simplefilter("always")
                    ret = psutil.virtual_memory()
                    self.assertEqual(
                        ret.available, 2057400 * 1024 + 4818144 * 1024)
                    w = ws[0]
                    self.assertIn(
                        "inactive memory stats couldn't be determined",
                        str(w.message))

    def test_virtual_memory_mocked(self):
        # Emulate /proc/meminfo because neither vmstat nor free return slab.
        # The mock_open_content() helper does the open() patching.
        meminfo = textwrap.dedent("""\
            MemTotal: 100 kB
            MemFree: 2 kB
            MemAvailable: 3 kB
            Buffers: 4 kB
            Cached: 5 kB
            SwapCached: 6 kB
            Active: 7 kB
            Inactive: 8 kB
            Active(anon): 9 kB
            Inactive(anon): 10 kB
            Active(file): 11 kB
            Inactive(file): 12 kB
            Unevictable: 13 kB
            Mlocked: 14 kB
            SwapTotal: 15 kB
            SwapFree: 16 kB
            Dirty: 17 kB
            Writeback: 18 kB
            AnonPages: 19 kB
            Mapped: 20 kB
            Shmem: 21 kB
            Slab: 22 kB
            SReclaimable: 23 kB
            SUnreclaim: 24 kB
            KernelStack: 25 kB
            PageTables: 26 kB
            NFS_Unstable: 27 kB
            Bounce: 28 kB
            WritebackTmp: 29 kB
            CommitLimit: 30 kB
            Committed_AS: 31 kB
            VmallocTotal: 32 kB
            VmallocUsed: 33 kB
            VmallocChunk: 34 kB
            HardwareCorrupted: 35 kB
            AnonHugePages: 36 kB
            ShmemHugePages: 37 kB
            ShmemPmdMapped: 38 kB
            CmaTotal: 39 kB
            CmaFree: 40 kB
            HugePages_Total: 41 kB
            HugePages_Free: 42 kB
            HugePages_Rsvd: 43 kB
            HugePages_Surp: 44 kB
            Hugepagesize: 45 kB
            DirectMap46k: 46 kB
            DirectMap47M: 47 kB
            DirectMap48G: 48 kB
            """).encode()

        with mock_open_content('/proc/meminfo', meminfo) as m:
            mem = psutil.virtual_memory()
            assert m.called
            self.assertEqual(mem.total, 100 * 1024)
            self.assertEqual(mem.free, 2 * 1024)
            self.assertEqual(mem.buffers, 4 * 1024)
            # cached mem also includes reclaimable memory
            self.assertEqual(mem.cached, (5 + 23) * 1024)
            self.assertEqual(mem.shared, 21 * 1024)
            self.assertEqual(mem.active, 7 * 1024)
            self.assertEqual(mem.inactive, 8 * 1024)
            self.assertEqual(mem.slab, 22 * 1024)
            self.assertEqual(mem.available, 3 * 1024)
| 492 | |
| 493 | |
| 494 # ===================================================================== | |
| 495 # --- system swap memory | |
| 496 # ===================================================================== | |
| 497 | |
| 498 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemSwapMemory(PsutilTestCase):
    """Check psutil.swap_memory() against the 'free' command, the
    sysinfo() syscall and mocked /proc files.
    """

    @staticmethod
    def meminfo_has_swap_info():
        """Return True if /proc/meminfo provides swap metrics."""
        with open("/proc/meminfo") as f:
            data = f.read()
        return 'SwapTotal:' in data and 'SwapFree:' in data

    def test_total(self):
        free_value = free_swap().total
        psutil_value = psutil.swap_memory().total
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_used(self):
        free_value = free_swap().used
        psutil_value = psutil.swap_memory().used
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_free(self):
        free_value = free_swap().free
        psutil_value = psutil.swap_memory().free
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    def test_missing_sin_sout(self):
        # Replace open() as seen by psutil._common with a mock and make
        # sure a warning is emitted and sin / sout are set to 0.
        with mock.patch('psutil._common.open', create=True) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.swap_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined", str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_no_vmstat_mocked(self):
        # see https://github.com/giampaolo/psutil/issues/722
        with mock_open_exception(
                "/proc/vmstat",
                IOError(errno.ENOENT, 'no such file or directory')) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.swap_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined and were set to 0",
                    str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_meminfo_against_sysinfo(self):
        # Make sure the content of /proc/meminfo about swap memory
        # matches sysinfo() syscall, see:
        # https://github.com/giampaolo/psutil/issues/1015
        if not self.meminfo_has_swap_info():
            # unittest.skip() only builds a decorator: returning it made
            # the test pass silently. Raise SkipTest so that the test is
            # actually reported as skipped.
            raise unittest.SkipTest("/proc/meminfo has no swap metrics")
        with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
            swap = psutil.swap_memory()
        assert not m.called
        import psutil._psutil_linux as cext
        _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
        total *= unit_multiplier
        free *= unit_multiplier
        self.assertEqual(swap.total, total)
        self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)

    def test_emulate_meminfo_has_no_metrics(self):
        # Emulate a case where /proc/meminfo provides no swap metrics
        # in which case sysinfo() syscall is supposed to be used
        # as a fallback.
        with mock_open_content("/proc/meminfo", b"") as m:
            psutil.swap_memory()
            assert m.called
| 586 | |
| 587 | |
| 588 # ===================================================================== | |
| 589 # --- system CPU | |
| 590 # ===================================================================== | |
| 591 | |
| 592 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUTimes(PsutilTestCase):
    """Check the fields exposed by psutil.cpu_times()."""

    @unittest.skipIf(TRAVIS, "unknown failure on travis")
    def test_fields(self):
        # Each field must be present if and only if the running kernel
        # version is at least the one paired with it below.
        fields = psutil.cpu_times()._fields
        release = os.uname()[2]
        version = re.findall(r'\d+\.\d+\.\d+', release)[0]
        kernel_ver_info = tuple(int(part) for part in version.split('.'))
        expectations = (
            ((2, 6, 11), 'steal'),
            ((2, 6, 24), 'guest'),
            ((3, 2, 0), 'guest_nice'),
        )
        for min_version, field in expectations:
            if kernel_ver_info >= min_version:
                self.assertIn(field, fields)
            else:
                self.assertNotIn(field, fields)
| 613 | |
| 614 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountLogical(PsutilTestCase):
    """Check psutil.cpu_count() (logical) against sysfs, 'nproc' and
    'lscpu', and exercise the fallbacks of cpu_count_logical().
    """

    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
                     "/sys/devices/system/cpu/online does not exist")
    def test_against_sysdev_cpu_online(self):
        with open("/sys/devices/system/cpu/online") as f:
            cpulist = f.read().strip()
        # The file is parsed as a comma separated list of single CPU ids
        # and inclusive "first-last" ranges (e.g. "0", "0-3", "0-3,8"),
        # so a single CPU or several ranges are handled as well.
        online = 0
        for chunk in cpulist.split(','):
            first, _, last = chunk.partition('-')
            online += int(last or first) - int(first) + 1
        self.assertEqual(psutil.cpu_count(), online)

    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
                     "/sys/devices/system/cpu does not exist")
    def test_against_sysdev_cpu_num(self):
        ls = os.listdir("/sys/devices/system/cpu")
        count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
        self.assertEqual(psutil.cpu_count(), count)

    @unittest.skipIf(not which("nproc"), "nproc utility not available")
    def test_against_nproc(self):
        num = int(sh("nproc --all"))
        self.assertEqual(psutil.cpu_count(logical=True), num)

    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
    def test_against_lscpu(self):
        out = sh("lscpu -p")
        # Lines starting with '#' are not counted.
        num = len([x for x in out.split('\n') if not x.startswith('#')])
        self.assertEqual(psutil.cpu_count(logical=True), num)

    def test_emulate_fallbacks(self):
        import psutil._pslinux
        original = psutil._pslinux.cpu_count_logical()
        # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
        # order to cause the parsing of /proc/cpuinfo and /proc/stat.
        with mock.patch(
                'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
            self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
            assert m.called

            # Let's have open() return empty data and make sure None is
            # returned ('cause we mimic os.cpu_count()).
            with mock.patch('psutil._common.open', create=True) as m:
                self.assertIsNone(psutil._pslinux.cpu_count_logical())
                self.assertEqual(m.call_count, 2)
                # /proc/stat should be the last one
                self.assertEqual(m.call_args[0][0], '/proc/stat')

            # Let's push this a bit further and make sure /proc/cpuinfo
            # parsing works as expected.
            with open('/proc/cpuinfo', 'rb') as f:
                cpuinfo_data = f.read()
            fake_file = io.BytesIO(cpuinfo_data)
            with mock.patch('psutil._common.open',
                            return_value=fake_file, create=True) as m:
                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)

            # Finally, let's make /proc/cpuinfo return meaningless data;
            # this way we'll fall back on relying on /proc/stat
            with mock_open_content('/proc/cpuinfo', b"") as m:
                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
                # A bare 'm.called' expression checks nothing: assert it.
                assert m.called
| 677 | |
| 678 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountPhysical(PsutilTestCase):
    """Check psutil.cpu_count(logical=False) against 'lscpu' and
    against mocked sources.
    """

    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
    def test_against_lscpu(self):
        output = sh("lscpu -p")
        # Collect the distinct values of the second comma-separated
        # column, skipping the lines starting with '#'.
        rows = [row for row in output.split('\n') if not row.startswith('#')]
        core_ids = set(row.split(',')[1] for row in rows)
        self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))

    def test_emulate_none(self):
        # With glob.glob() returning nothing and open() mocked,
        # None is expected.
        glob_patch = mock.patch('glob.glob', return_value=[])
        open_patch = mock.patch('psutil._common.open', create=True)
        with glob_patch as m1:
            with open_patch as m2:
                self.assertIsNone(psutil._pslinux.cpu_count_physical())
        assert m1.called
        assert m2.called
| 698 | |
| 699 | |
| 700 @unittest.skipIf(not LINUX, "LINUX only") | |
| 701 class TestSystemCPUFrequency(PsutilTestCase): | |
| 702 | |
@unittest.skipIf(TRAVIS, "fails on Travis")
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_use_second_file(self):
    # https://github.com/giampaolo/psutil/issues/981
    # Pretend that no /sys/devices/system/cpu/cpufreq/policy* path
    # exists and make sure cpu_freq() still returns something.
    real_exists = os.path.exists

    def fake_exists(path):
        hidden = path.startswith("/sys/devices/system/cpu/cpufreq/policy")
        return False if hidden else real_exists(path)

    with mock.patch("os.path.exists", side_effect=fake_exists,
                    create=True):
        assert psutil.cpu_freq()
| 717 | |
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_use_cpuinfo(self):
    # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
    # exist and /proc/cpuinfo is used instead.
    def path_exists_mock(path):
        if path.startswith('/sys/devices/system/cpu/'):
            return False
        if path == "/proc/cpuinfo":
            # Remember that the existence of /proc/cpuinfo was checked.
            flags.append(None)
        return os_path_exists(path)

    flags = []
    os_path_exists = os.path.exists
    try:
        with mock.patch("os.path.exists", side_effect=path_exists_mock):
            # The module is reloaded while os.path.exists is patched.
            reload_module(psutil._pslinux)
            ret = psutil.cpu_freq()
            assert ret
            assert flags
            self.assertEqual(ret.max, 0.0)
            self.assertEqual(ret.min, 0.0)
            for freq in psutil.cpu_freq(percpu=True):
                # Check each per-CPU entry (the loop used to re-check
                # the aggregated 'ret' on every iteration).
                self.assertEqual(freq.max, 0.0)
                self.assertEqual(freq.min, 0.0)
    finally:
        # Reload again once the patch is removed, so that the modules
        # are re-evaluated against the real os.path.exists.
        reload_module(psutil._pslinux)
        reload_module(psutil)
| 746 | |
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_data(self):
    # Feed cpu_freq() with fake frequency files and check the values
    # it returns.
    policy_prefix = "/sys/devices/system/cpu/cpufreq/policy"
    fake_files = (
        ('/scaling_cur_freq', b"500000"),
        ('/scaling_min_freq', b"600000"),
        ('/scaling_max_freq', b"700000"),
    )
    real_open = open

    def fake_open(name, *args, **kwargs):
        for suffix, data in fake_files:
            if name.endswith(suffix) and name.startswith(policy_prefix):
                return io.BytesIO(data)
        if name == '/proc/cpuinfo':
            return io.BytesIO(b"cpu MHz : 500")
        return real_open(name, *args, **kwargs)

    target = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(target, side_effect=fake_open):
        with mock.patch('os.path.exists', return_value=True):
            freq = psutil.cpu_freq()
            self.assertEqual(freq.current, 500.0)
            # when /proc/cpuinfo is used min and max frequencies are not
            # available and are set to 0.
            if freq.min != 0.0:
                self.assertEqual(freq.min, 600.0)
            if freq.max != 0.0:
                self.assertEqual(freq.max, 700.0)
| 777 | |
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_multi_cpu(self):
    # Feed cpu_freq(percpu=True) with fake frequency files for two
    # policies (policy0 and policy1) and check the values it returns.
    policy_prefix = "/sys/devices/system/cpu/cpufreq/policy"
    fake_files = (
        ("0", '/scaling_cur_freq', b"100000"),
        ("0", '/scaling_min_freq', b"200000"),
        ("0", '/scaling_max_freq', b"300000"),
        ("1", '/scaling_cur_freq', b"400000"),
        ("1", '/scaling_min_freq', b"500000"),
        ("1", '/scaling_max_freq', b"600000"),
    )
    real_open = open

    def fake_open(name, *args, **kwargs):
        for index, suffix, data in fake_files:
            if (name.endswith(suffix) and
                    name.startswith(policy_prefix + index)):
                return io.BytesIO(data)
        if name == '/proc/cpuinfo':
            return io.BytesIO(b"cpu MHz : 100\n"
                              b"cpu MHz : 400")
        return real_open(name, *args, **kwargs)

    target = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(target, side_effect=fake_open):
        with mock.patch('os.path.exists', return_value=True):
            with mock.patch('psutil._pslinux.cpu_count_logical',
                            return_value=2):
                freq = psutil.cpu_freq(percpu=True)
                first, second = freq[0], freq[1]
                self.assertEqual(first.current, 100.0)
                # min and max are only checked when they are not 0.
                if first.min != 0.0:
                    self.assertEqual(first.min, 200.0)
                if first.max != 0.0:
                    self.assertEqual(first.max, 300.0)
                self.assertEqual(second.current, 400.0)
                if second.min != 0.0:
                    self.assertEqual(second.min, 500.0)
                if second.max != 0.0:
                    self.assertEqual(second.max, 600.0)
| 823 | |
@unittest.skipIf(TRAVIS, "fails on Travis")
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_no_scaling_cur_freq_file(self):
    """Check cpu_freq() when opening scaling_cur_freq fails with ENOENT.

    The patched open() raises for any ".../scaling_cur_freq" path and
    serves canned content for ".../cpuinfo_cur_freq" and /proc/cpuinfo.
    See: https://github.com/giampaolo/psutil/issues/1071
    """
    def open_mock(name, *args, **kwargs):
        if name.endswith('/scaling_cur_freq'):
            raise IOError(errno.ENOENT, "")
        if name.endswith('/cpuinfo_cur_freq'):
            return io.BytesIO(b"200000")
        if name == '/proc/cpuinfo':
            return io.BytesIO(b"cpu MHz : 200")
        # Anything else is served by the real open().
        return orig_open(name, *args, **kwargs)

    orig_open = open
    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock), \
            mock.patch('os.path.exists', return_value=True), \
            mock.patch('psutil._pslinux.cpu_count_logical',
                       return_value=1):
        freq = psutil.cpu_freq()
        self.assertEqual(freq.current, 200)
| 846 | |
| 847 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUStats(PsutilTestCase):
    """Compare psutil.cpu_stats() fields with values from vmstat()."""

    @unittest.skipIf(TRAVIS, "fails on Travis")
    def test_ctx_switches(self):
        """ctx_switches is within 500 of vmstat("context switches")."""
        self.assertAlmostEqual(
            vmstat("context switches"),
            psutil.cpu_stats().ctx_switches,
            delta=500)

    @unittest.skipIf(TRAVIS, "fails on Travis")
    def test_interrupts(self):
        """interrupts is within 500 of vmstat("interrupts")."""
        self.assertAlmostEqual(
            vmstat("interrupts"),
            psutil.cpu_stats().interrupts,
            delta=500)
| 862 | |
| 863 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestLoadAvg(PsutilTestCase):
    """Compare psutil.getloadavg() with the content of /proc/loadavg."""

    @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
    def test_getloadavg(self):
        """The first three /proc/loadavg fields match within delta=1."""
        psutil_value = psutil.getloadavg()
        with open("/proc/loadavg", "r") as f:
            proc_value = f.read().split()

        for idx in range(3):
            self.assertAlmostEqual(
                float(proc_value[idx]), psutil_value[idx], delta=1)
| 876 | |
| 877 | |
| 878 # ===================================================================== | |
| 879 # --- system network | |
| 880 # ===================================================================== | |
| 881 | |
| 882 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIfAddrs(PsutilTestCase):
    """Cross-check psutil.net_if_addrs() against per-NIC helper lookups."""

    def test_ips(self):
        """AF_LINK and AF_INET addresses equal the helper-reported ones."""
        for name, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                family = addr.family
                if family == psutil.AF_LINK:
                    expected = get_mac_address(name)
                elif family == socket.AF_INET:
                    expected = get_ipv4_address(name)
                else:
                    # TODO: test for AF_INET6 family
                    continue
                self.assertEqual(addr.address, expected)

    # XXX - not reliable when having virtual NICs installed by Docker.
    # @unittest.skipIf(not which('ip'), "'ip' utility not available")
    # @unittest.skipIf(TRAVIS, "skipped on Travis")
    # def test_net_if_names(self):
    #     out = sh("ip addr").strip()
    #     nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
    #     found = 0
    #     for line in out.split('\n'):
    #         line = line.strip()
    #         if re.search(r"^\d+:", line):
    #             found += 1
    #             name = line.split(':')[1].strip()
    #             self.assertIn(name, nics)
    #     self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
    #         pprint.pformat(nics), out))
| 910 | |
| 911 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIfStats(PsutilTestCase):
    """Cross-check psutil.net_if_stats() against `ifconfig` output."""

    def test_against_ifconfig(self):
        """The MTU of each NIC matches the one printed by ifconfig."""
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # The command failed for this NIC; nothing to compare.
                continue
            # Not always reliable.
            # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            ifconfig_mtu = int(re.findall(r'(?i)MTU[: ](\d+)', out)[0])
            self.assertEqual(stats.mtu, ifconfig_mtu)
| 926 | |
| 927 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIOCounters(PsutilTestCase):
    """Cross-check psutil.net_io_counters() against `ifconfig` output."""

    @retry_on_failure()
    def test_against_ifconfig(self):
        """Per-NIC counters are close to the ones parsed from ifconfig.

        NICs for which the ifconfig command raises RuntimeError are
        skipped.
        """
        def ifconfig(nic):
            """Parse the counters of `nic` out of `ifconfig <nic>`.

            Raises IndexError if an expected counter is not found in
            the output.
            """
            # Query the NIC passed in explicitly instead of relying on
            # the loop variable of the enclosing scope.
            out = sh("ifconfig %s" % nic)
            rx_packets = re.findall(r'RX packets[: ](\d+)', out)
            tx_packets = re.findall(r'TX packets[: ](\d+)', out)
            # "errors" / "dropped" are matched twice in the output: the
            # first hit is taken as the incoming counter, the second as
            # the outgoing one.
            errors = re.findall(r'errors[: ](\d+)', out)
            dropped = re.findall(r'dropped[: ](\d+)', out)
            rx_bytes = re.findall(
                r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)
            tx_bytes = re.findall(
                r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)
            return {
                'packets_recv': int(rx_packets[0]),
                'packets_sent': int(tx_packets[0]),
                'errin': int(errors[0]),
                'errout': int(errors[1]),
                'dropin': int(dropped[0]),
                'dropout': int(dropped[1]),
                'bytes_recv': int(rx_bytes[0]),
                'bytes_sent': int(tx_bytes[0]),
            }

        # (counter name, tolerated absolute difference)
        tolerances = (
            ('bytes_recv', 1024 * 5),
            ('bytes_sent', 1024 * 5),
            ('packets_recv', 1024),
            ('packets_sent', 1024),
            ('errin', 10),
            ('errout', 10),
            ('dropin', 10),
            ('dropout', 10),
        )
        nio = psutil.net_io_counters(pernic=True, nowrap=False)
        for name, stats in nio.items():
            try:
                ifconfig_ret = ifconfig(name)
            except RuntimeError:
                continue
            for field, delta in tolerances:
                self.assertAlmostEqual(
                    getattr(stats, field), ifconfig_ret[field],
                    delta=delta)
| 972 | |
| 973 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetConnections(PsutilTestCase):
    """psutil.net_connections() tests relying on mocked internals."""

    @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
    @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
    def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        """net_connections(kind='inet6') must not raise.

        Runs with supports_ipv6() patched to return False and inet_ntop
        patched to raise ValueError.
        See: https://github.com/giampaolo/psutil/issues/623
        """
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            self.addCleanup(sock.close)
            sock.bind(("::1", 0))
        except socket.error:
            # Best effort: the call below is exercised either way.
            pass
        psutil.net_connections(kind='inet6')

    def test_emulate_unix(self):
        """net_connections(kind='unix') copes with faked /proc/net/unix.

        The last line of the faked content is a bare run of zeros rather
        than a regular entry.
        """
        content = textwrap.dedent("""\
            0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
            0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
            0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
            000000000000000000000000000000000000000000000000000000
            """)
        with mock_open_content('/proc/net/unix', content) as m:
            psutil.net_connections(kind='unix')
            assert m.called
| 1000 | |
| 1001 | |
| 1002 # ===================================================================== | |
| 1003 # --- system disks | |
| 1004 # ===================================================================== | |
| 1005 | |
| 1006 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemDiskPartitions(PsutilTestCase):
    """Tests for psutil.disk_partitions() and psutil.disk_usage()."""

    @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
    @skip_on_not_implemented()
    def test_against_df(self):
        """disk_usage() of every partition agrees with `df` output."""
        def df(path):
            """Return (dev, total, used, free) parsed from df for `path`."""
            output = sh('df -P -B 1 "%s"' % path).strip()
            # The first output line is skipped; the second one is parsed.
            fields = output.split('\n')[1].split()[:4]
            dev, total, used, free = fields
            return ('' if dev == 'none' else dev,
                    int(total), int(used), int(free))

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            _, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            self.assertAlmostEqual(usage.free, free,
                                   delta=TOLERANCE_DISK_USAGE)
            self.assertAlmostEqual(usage.used, used,
                                   delta=TOLERANCE_DISK_USAGE)

    def test_zfs_fs(self):
        """ZFS partitions are returned by disk_partitions()."""
        with open("/proc/filesystems", "r") as f:
            zfs_listed = 'zfs' in f.read()

        if zfs_listed:
            found = any(part.fstype == 'zfs'
                        for part in psutil.disk_partitions())
            if not found:
                self.fail("couldn't find any ZFS partition")
            return

        # No ZFS partitions on this system. Let's fake one.
        fake_file = io.StringIO(u("nodev\tzfs\n"))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m1, \
                mock.patch(
                    'psutil._pslinux.cext.disk_partitions',
                    return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
            ret = psutil.disk_partitions()
            assert m1.called
            assert m2.called
            assert ret
            self.assertEqual(ret[0].fstype, 'zfs')

    def test_emulate_realpath_fail(self):
        """disk_partitions() raises FileNotFoundError on a bad realpath.

        os.path.realpath is patched to return '/non/existent'.
        See: https://github.com/giampaolo/psutil/issues/1307
        """
        try:
            with mock.patch('os.path.realpath',
                            return_value='/non/existent') as m:
                with self.assertRaises(FileNotFoundError):
                    psutil.disk_partitions()
                assert m.called
        finally:
            psutil.PROCFS_PATH = "/proc"
| 1069 | |
| 1070 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemDiskIoCounters(PsutilTestCase):
    """psutil.disk_io_counters() tests, mostly with faked /proc/diskstats."""

    # Faked /proc/diskstats content shared by the partition tests below.
    _NVME_DISKSTATS = textwrap.dedent("""\
        3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
        3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
        """)

    def _check_full_counters(self, ret):
        """Assert the counters expected when a line reports all metrics."""
        self.assertEqual(ret.read_count, 1)
        self.assertEqual(ret.read_merged_count, 2)
        self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
        self.assertEqual(ret.read_time, 4)
        self.assertEqual(ret.write_count, 5)
        self.assertEqual(ret.write_merged_count, 6)
        self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
        self.assertEqual(ret.write_time, 8)
        self.assertEqual(ret.busy_time, 10)

    def test_emulate_kernel_2_4(self):
        """Parse the /proc/diskstats format of 2.4 kernels.

        See: https://github.com/giampaolo/psutil/issues/767
        """
        diskstats = " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"
        with mock_open_content('/proc/diskstats', diskstats), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            ret = psutil.disk_io_counters(nowrap=False)
            self._check_full_counters(ret)

    def test_emulate_kernel_2_6_full(self):
        """Parse the 2.6 kernel format, line reporting all metrics.

        See: https://github.com/giampaolo/psutil/issues/767
        """
        diskstats = " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"
        with mock_open_content('/proc/diskstats', diskstats), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            ret = psutil.disk_io_counters(nowrap=False)
            self._check_full_counters(ret)

    def test_emulate_kernel_2_6_limited(self):
        """Parse the 2.6 kernel format, line with a limited set of metrics.

        Such a line is reported for a partition (instead of a disk); the
        metrics it does not carry are expected to be 0. See:
        https://github.com/giampaolo/psutil/issues/767
        """
        diskstats = " 3 1 hda 1 2 3 4"
        with mock_open_content('/proc/diskstats', diskstats), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            ret = psutil.disk_io_counters(nowrap=False)
            self.assertEqual(ret.read_count, 1)
            self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
            self.assertEqual(ret.write_count, 3)
            self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)

            # Metrics missing from the line.
            self.assertEqual(ret.read_merged_count, 0)
            self.assertEqual(ret.read_time, 0)
            self.assertEqual(ret.write_merged_count, 0)
            self.assertEqual(ret.write_time, 0)
            self.assertEqual(ret.busy_time, 0)

    def test_emulate_include_partitions(self):
        """With perdisk=True partitions are returned as well.

        See:
        https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
        """
        with mock_open_content('/proc/diskstats', self._NVME_DISKSTATS), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=False):
            ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
            self.assertEqual(len(ret), 2)
            self.assertEqual(ret['nvme0n1'].read_count, 1)
            self.assertEqual(ret['nvme0n1p1'].read_count, 1)
            self.assertEqual(ret['nvme0n1'].write_count, 5)
            self.assertEqual(ret['nvme0n1p1'].write_count, 5)

    def test_emulate_exclude_partitions(self):
        """With perdisk=False partitions are left out of the total.

        Partitions (e.g. 'sda1', 'nvme0n1p1') must be skipped and not
        included in the total count. See:
        https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
        """
        # Nothing is considered a storage device: no totals at all.
        with mock_open_content('/proc/diskstats', self._NVME_DISKSTATS), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=False):
            ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
            self.assertIsNone(ret)

        # Only the whole disk is considered a storage device.
        with mock_open_content('/proc/diskstats', self._NVME_DISKSTATS), \
                mock.patch('psutil._pslinux.is_storage_device',
                           create=True,
                           side_effect=lambda name: name == 'nvme0n1'):
            ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
            self.assertEqual(ret.read_count, 1)
            self.assertEqual(ret.write_count, 5)

    def test_emulate_use_sysfs(self):
        """Same number of disks with and without /proc/diskstats.

        The second call runs with os.path.exists() reporting False for
        '/proc/diskstats' only.
        """
        wprocfs = psutil.disk_io_counters(perdisk=True)
        with mock.patch('psutil._pslinux.os.path.exists',
                        create=True,
                        side_effect=lambda path: path != '/proc/diskstats'):
            wsysfs = psutil.disk_io_counters(perdisk=True)
        self.assertEqual(len(wprocfs), len(wsysfs))

    def test_emulate_not_impl(self):
        """NotImplementedError is raised when no path is reported to exist."""
        with mock.patch('psutil._pslinux.os.path.exists',
                        create=True, side_effect=lambda path: False):
            self.assertRaises(NotImplementedError, psutil.disk_io_counters)
| 1205 | |
| 1206 | |
| 1207 # ===================================================================== | |
| 1208 # --- misc | |
| 1209 # ===================================================================== | |
| 1210 | |
| 1211 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestMisc(PsutilTestCase):
    """Miscellaneous Linux tests (boot time, procfs path, users, PIDs)."""

    def test_boot_time(self):
        """boot_time() equals vmstat('boot time') once both are int()ed."""
        vmstat_value = vmstat('boot time')
        psutil_value = psutil.boot_time()
        self.assertEqual(int(vmstat_value), int(psutil_value))

    def test_no_procfs_on_import(self):
        """Reload psutil while every open() under /proc fails.

        CPU functions are expected to raise IOError until PROCFS_PATH is
        pointed at a directory containing a hand-written 'stat' file.
        """
        my_procfs = self.get_testfn()
        os.mkdir(my_procfs)

        with open(os.path.join(my_procfs, 'stat'), 'w') as f:
            f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
            f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
            f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')

        try:
            orig_open = open

            def open_mock(name, *args, **kwargs):
                if name.startswith('/proc'):
                    raise IOError(errno.ENOENT, 'rejecting access for test')
                return orig_open(name, *args, **kwargs)

            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
            with mock.patch(patch_point, side_effect=open_mock):
                reload_module(psutil)

                self.assertRaises(IOError, psutil.cpu_times)
                self.assertRaises(IOError, psutil.cpu_times, percpu=True)
                self.assertRaises(IOError, psutil.cpu_percent)
                self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
                self.assertRaises(IOError, psutil.cpu_times_percent)
                self.assertRaises(
                    IOError, psutil.cpu_times_percent, percpu=True)

                psutil.PROCFS_PATH = my_procfs

                self.assertEqual(psutil.cpu_percent(), 0)
                self.assertEqual(sum(psutil.cpu_times_percent()), 0)

                # since we don't know the number of CPUs at import time,
                # we awkwardly say there are none until the second call
                per_cpu_percent = psutil.cpu_percent(percpu=True)
                self.assertEqual(sum(per_cpu_percent), 0)

                # ditto awkward length
                per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
                self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)

                # much user, very busy
                with open(os.path.join(my_procfs, 'stat'), 'w') as f:
                    f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
                    f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
                    f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')

                self.assertNotEqual(psutil.cpu_percent(), 0)
                self.assertNotEqual(
                    sum(psutil.cpu_percent(percpu=True)), 0)
                self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
                self.assertNotEqual(
                    sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
        finally:
            # Always drop the fake procfs and restore a pristine module.
            shutil.rmtree(my_procfs)
            reload_module(psutil)

        self.assertEqual(psutil.PROCFS_PATH, '/proc')

    def test_cpu_steal_decrease(self):
        """A decreasing cumulative "steal" value must be ignored.

        See issue #1210.
        """
        with mock_open_content(
                "/proc/stat",
                textwrap.dedent("""\
                    cpu 0 0 0 0 0 0 0 1 0 0
                    cpu0 0 0 0 0 0 0 0 1 0 0
                    cpu1 0 0 0 0 0 0 0 1 0 0
                    """).encode()) as m:
            # first call to "percent" functions should read the new stat file
            # and compare to the "real" file read at import time - so the
            # values are meaningless
            psutil.cpu_percent()
            assert m.called
            psutil.cpu_percent(percpu=True)
            psutil.cpu_times_percent()
            psutil.cpu_times_percent(percpu=True)

        with mock_open_content(
                "/proc/stat",
                textwrap.dedent("""\
                    cpu 1 0 0 0 0 0 0 0 0 0
                    cpu0 1 0 0 0 0 0 0 0 0 0
                    cpu1 1 0 0 0 0 0 0 0 0 0
                    """).encode()) as m:
            # Increase "user" while steal goes "backwards" to zero.
            cpu_percent = psutil.cpu_percent()
            assert m.called
            cpu_percent_percpu = psutil.cpu_percent(percpu=True)
            cpu_times_percent = psutil.cpu_times_percent()
            cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
            self.assertNotEqual(cpu_percent, 0)
            self.assertNotEqual(sum(cpu_percent_percpu), 0)
            self.assertNotEqual(sum(cpu_times_percent), 0)
            self.assertNotEqual(sum(cpu_times_percent), 100.0)
            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
            self.assertEqual(cpu_times_percent.steal, 0)
            self.assertNotEqual(cpu_times_percent.user, 0)

    def test_boot_time_mocked(self):
        """boot_time() raises RuntimeError when open() is a bare mock."""
        with mock.patch('psutil._common.open', create=True) as m:
            self.assertRaises(
                RuntimeError,
                psutil._pslinux.boot_time)
            assert m.called

    def test_users_mocked(self):
        """Host values ':0' and ':0.0' are converted to 'localhost'.

        Any other host value returned by the C extension is expected to
        be passed through as-is.
        """
        cases = (
            (':0', 'localhost'),
            (':0.0', 'localhost'),
            # ...otherwise it should be returned as-is
            ('foo', 'foo'),
        )
        for raw_host, expected_host in cases:
            with mock.patch('psutil._pslinux.cext.users',
                            return_value=[('giampaolo', 'pts/2', raw_host,
                                           1436573184.0, True, 2)]) as m:
                self.assertEqual(psutil.users()[0].host, expected_host)
                assert m.called

    def test_procfs_path(self):
        """With PROCFS_PATH set to an empty directory the APIs raise."""
        tdir = self.get_testfn()
        os.mkdir(tdir)
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            # self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"

    @retry_on_failure()
    def test_issue_687(self):
        """Thread IDs must not be treated as PIDs.

        In case of thread ID:
        - pid_exists() is supposed to return False
        - Process(tid) is supposed to work
        - pids() should not return the TID
        See: https://github.com/giampaolo/psutil/issues/687
        """
        t = ThreadTask()
        t.start()
        try:
            p = psutil.Process()
            threads = p.threads()
            self.assertEqual(len(threads), 2)
            # Of the two threads, pick the one with the highest ID.
            tid = sorted(threads, key=lambda x: x.id)[1].id
            self.assertNotEqual(p.pid, tid)
            pt = psutil.Process(tid)
            pt.as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            t.stop()

    def test_pid_exists_no_proc_status(self):
        """pid_exists() still works when /proc/{pid}/status is empty.

        Internally pid_exists relies on /proc/{pid}/status. Emulate a
        case where this file is empty, in which case psutil is supposed
        to fall back on using pids().
        """
        # Format the template with our own PID: the literal
        # "/proc/%s/status" string does not name any real file, so the
        # empty content would never be served.
        # NOTE(review): assumes mock_open_content() compares the opened
        # path against this string - confirm against the helper.
        status_path = "/proc/%s/status" % os.getpid()
        with mock_open_content(status_path, "") as m:
            assert psutil.pid_exists(os.getpid())
            assert m.called
| 1396 | |
| 1397 | |
| 1398 # ===================================================================== | |
| 1399 # --- sensors | |
| 1400 # ===================================================================== | |
| 1401 | |
| 1402 | |
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_BATTERY, "no battery")
class TestSensorsBattery(PsutilTestCase):
    """psutil.sensors_battery() tests (against `acpi` and with mocks)."""

    @unittest.skipIf(not which("acpi"), "acpi utility not available")
    def test_percent(self):
        """Battery percent is within 1 of the value printed by acpi."""
        out = sh("acpi -b")
        acpi_value = int(out.split(",")[1].strip().replace('%', ''))
        psutil_value = psutil.sensors_battery().percent
        self.assertAlmostEqual(acpi_value, psutil_value, delta=1)

    @unittest.skipIf(not which("acpi"), "acpi utility not available")
    def test_power_plugged(self):
        """power_plugged agrees with the state printed by acpi."""
        out = sh("acpi -b")
        if 'unknown' in out.lower():
            # skipTest() raises, so the test is reported as skipped.
            # Returning unittest.skip(...) would merely build a decorator
            # and let the test pass silently.
            self.skipTest("acpi output not reliable")
        if 'discharging at zero rate' in out:
            plugged = True
        else:
            plugged = "Charging" in out.split('\n')[0]
        self.assertEqual(psutil.sensors_battery().power_plugged, plugged)

    def test_emulate_power_plugged(self):
        """Pretend the AC power cable is connected."""
        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                return io.BytesIO(b"1")
            return orig_open(name, *args, **kwargs)

        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, True)
            self.assertEqual(
                psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
            assert m.called

    def test_emulate_power_plugged_2(self):
        """Pretend AC is connected, with the "online" files missing.

        Same as test_emulate_power_plugged but opening /AC0/online or
        /AC/online fails, in which case the code relies on the /status
        file.
        """
        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                raise IOError(errno.ENOENT, "")
            if name.endswith("/status"):
                return io.StringIO(u("charging"))
            return orig_open(name, *args, **kwargs)

        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, True)
            assert m.called

    def test_emulate_power_not_plugged(self):
        """Pretend the AC power cable is not connected."""
        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                return io.BytesIO(b"0")
            return orig_open(name, *args, **kwargs)

        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, False)
            assert m.called

    def test_emulate_power_not_plugged_2(self):
        """Pretend AC is not connected, with the "online" files missing.

        Same as test_emulate_power_not_plugged but opening /AC0/online or
        /AC/online fails, in which case the code relies on the /status
        file.
        """
        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                raise IOError(errno.ENOENT, "")
            if name.endswith("/status"):
                return io.StringIO(u("discharging"))
            return orig_open(name, *args, **kwargs)

        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, False)
            assert m.called

    def test_emulate_power_undetermined(self):
        """Pretend we can't know whether the AC power cable is connected.

        The "online" files are missing and the status file holds an
        unrecognized value: power_plugged is expected to be None.
        """
        def open_mock(name, *args, **kwargs):
            if name.startswith(("/sys/class/power_supply/AC0/online",
                                "/sys/class/power_supply/AC/online")):
                raise IOError(errno.ENOENT, "")
            if name.startswith("/sys/class/power_supply/BAT0/status"):
                return io.BytesIO(b"???")
            return orig_open(name, *args, **kwargs)

        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertIsNone(psutil.sensors_battery().power_plugged)
            assert m.called

    def test_emulate_no_base_files(self):
        """sensors_battery() is None when base metrics files are missing.

        Both energy_now and charge_now fail to open with ENOENT.
        """
        with mock_open_exception(
                "/sys/class/power_supply/BAT0/energy_now",
                IOError(errno.ENOENT, "")):
            with mock_open_exception(
                    "/sys/class/power_supply/BAT0/charge_now",
                    IOError(errno.ENOENT, "")):
                self.assertIsNone(psutil.sensors_battery())

    def test_emulate_energy_full_0(self):
        """Emulate a case where the energy_full file returns 0."""
        with mock_open_content(
                "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
            self.assertEqual(psutil.sensors_battery().percent, 0)
            assert m.called

    def test_emulate_energy_full_not_avail(self):
        """Emulate a case where the energy_full file does not exist.

        Expected fallback on /capacity.
        """
        with mock_open_exception(
                "/sys/class/power_supply/BAT0/energy_full",
                IOError(errno.ENOENT, "")):
            with mock_open_exception(
                    "/sys/class/power_supply/BAT0/charge_full",
                    IOError(errno.ENOENT, "")):
                with mock_open_content(
                        "/sys/class/power_supply/BAT0/capacity", b"88"):
                    self.assertEqual(psutil.sensors_battery().percent, 88)

    def test_emulate_no_power(self):
        """Emulate neither the AC "online" files nor /BAT0/status existing.

        power_plugged is expected to be None.
        """
        with mock_open_exception(
                "/sys/class/power_supply/AC/online",
                IOError(errno.ENOENT, "")):
            with mock_open_exception(
                    "/sys/class/power_supply/AC0/online",
                    IOError(errno.ENOENT, "")):
                with mock_open_exception(
                        "/sys/class/power_supply/BAT0/status",
                        IOError(errno.ENOENT, "")):
                    self.assertIsNone(psutil.sensors_battery().power_plugged)
| 1550 | |
| 1551 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsTemperatures(PsutilTestCase):
    # Run psutil.sensors_temperatures() against emulated sysfs files.

    def test_emulate_class_hwmon(self):
        # (path suffix, stream type, content) triples, looked up in
        # order; any other path is handed over to the real open().
        fake_files = (
            ('/name', io.StringIO, u("name")),
            ('/temp1_label', io.StringIO, u("label")),
            ('/temp1_input', io.BytesIO, b"30000"),
            ('/temp1_max', io.BytesIO, b"40000"),
            ('/temp1_crit', io.BytesIO, b"50000"),
        )
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, stream, content in fake_files:
                if name.endswith(suffix):
                    return stream(content)
            return real_open(name, *args, **kwargs)

        open_target = '__builtin__.open' if not PY3 else 'builtins.open'
        with mock.patch(open_target, side_effect=fake_open):
            # Test case with /sys/class/hwmon
            with mock.patch('glob.glob',
                            return_value=['/sys/class/hwmon/hwmon0/temp1']):
                reading = psutil.sensors_temperatures()['name'][0]
                self.assertEqual(reading.label, 'label')
                self.assertEqual(reading.current, 30.0)
                self.assertEqual(reading.high, 40.0)
                self.assertEqual(reading.critical, 50.0)

    def test_emulate_class_thermal(self):
        # Order matters: the more specific "0_temp" / "0_type" suffixes
        # have to be tried before the generic "temp" / "type" ones.
        fake_files = (
            ('0_temp', io.BytesIO, b"50000"),
            ('temp', io.BytesIO, b"30000"),
            ('0_type', io.StringIO, u("critical")),
            ('type', io.StringIO, u("name")),
        )
        # glob pattern -> matches; unknown patterns match nothing.
        glob_results = {
            '/sys/class/hwmon/hwmon*/temp*_*': (),
            '/sys/class/hwmon/hwmon*/device/temp*_*': (),
            '/sys/class/thermal/thermal_zone*': (
                '/sys/class/thermal/thermal_zone0',
            ),
            '/sys/class/thermal/thermal_zone0/trip_point*': (
                '/sys/class/thermal/thermal_zone1/trip_point_0_type',
                '/sys/class/thermal/thermal_zone1/trip_point_0_temp',
            ),
        }
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, stream, content in fake_files:
                if name.endswith(suffix):
                    return stream(content)
            return real_open(name, *args, **kwargs)

        def fake_glob(path):
            # A new list on every call, so that callers are free to
            # modify what they get back.
            return list(glob_results.get(path, ()))

        open_target = '__builtin__.open' if not PY3 else 'builtins.open'
        with mock.patch(open_target, side_effect=fake_open):
            with mock.patch('glob.glob', create=True, side_effect=fake_glob):
                reading = psutil.sensors_temperatures()['name'][0]
                self.assertEqual(reading.label, '')
                self.assertEqual(reading.current, 30.0)
                self.assertEqual(reading.high, 50.0)
                self.assertEqual(reading.critical, 50.0)
| 1616 | |
| 1617 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsFans(PsutilTestCase):
    # Run psutil.sensors_fans() against emulated sysfs files.

    def test_emulate_data(self):
        # (path suffix, text content) pairs served in place of the real
        # files; any other path is handed over to the real open().
        fake_files = (
            ('/name', u("name")),
            ('/fan1_label', u("label")),
            ('/fan1_input', u("2000")),
        )
        real_open = open

        def fake_open(name, *args, **kwargs):
            for suffix, content in fake_files:
                if name.endswith(suffix):
                    return io.StringIO(content)
            return real_open(name, *args, **kwargs)

        open_target = '__builtin__.open' if not PY3 else 'builtins.open'
        with mock.patch(open_target, side_effect=fake_open):
            with mock.patch('glob.glob',
                            return_value=['/sys/class/hwmon/hwmon2/fan1']):
                reading = psutil.sensors_fans()['name'][0]
                self.assertEqual(reading.label, 'label')
                self.assertEqual(reading.current, 2000)
| 1640 | |
| 1641 | |
| 1642 # ===================================================================== | |
| 1643 # --- test process | |
| 1644 # ===================================================================== | |
| 1645 | |
| 1646 | |
| 1647 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1648 class TestProcess(PsutilTestCase): | |
| 1649 | |
| 1650 @retry_on_failure() | |
| 1651 def test_memory_full_info(self): | |
| 1652 testfn = self.get_testfn() | |
| 1653 src = textwrap.dedent(""" | |
| 1654 import time | |
| 1655 with open("%s", "w") as f: | |
| 1656 time.sleep(10) | |
| 1657 """ % testfn) | |
| 1658 sproc = self.pyrun(src) | |
| 1659 call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn) | |
| 1660 p = psutil.Process(sproc.pid) | |
| 1661 time.sleep(.1) | |
| 1662 mem = p.memory_full_info() | |
| 1663 maps = p.memory_maps(grouped=False) | |
| 1664 self.assertAlmostEqual( | |
| 1665 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]), | |
| 1666 delta=4096) | |
| 1667 self.assertAlmostEqual( | |
| 1668 mem.pss, sum([x.pss for x in maps]), delta=4096) | |
| 1669 self.assertAlmostEqual( | |
| 1670 mem.swap, sum([x.swap for x in maps]), delta=4096) | |
| 1671 | |
    def test_memory_full_info_mocked(self):
        # See: https://github.com/giampaolo/psutil/issues/1222
        # Serve a hand-written /proc/{pid}/smaps with a single mapping
        # where every field has a distinct value, so that each
        # expectation below can be traced back to the fields it is
        # built from.
        with mock_open_content(
                "/proc/%s/smaps" % os.getpid(),
                textwrap.dedent("""\
                    fffff0 r-xp 00000000 00:00 0 [vsyscall]
                    Size: 1 kB
                    Rss: 2 kB
                    Pss: 3 kB
                    Shared_Clean: 4 kB
                    Shared_Dirty: 5 kB
                    Private_Clean: 6 kB
                    Private_Dirty: 7 kB
                    Referenced: 8 kB
                    Anonymous: 9 kB
                    LazyFree: 10 kB
                    AnonHugePages: 11 kB
                    ShmemPmdMapped: 12 kB
                    Shared_Hugetlb: 13 kB
                    Private_Hugetlb: 14 kB
                    Swap: 15 kB
                    SwapPss: 16 kB
                    KernelPageSize: 17 kB
                    MMUPageSize: 18 kB
                    Locked: 19 kB
                    VmFlags: rd ex
                    """).encode()) as m:
            p = psutil.Process()
            mem = p.memory_full_info()
            assert m.called
            # uss = Private_Clean + Private_Dirty + Private_Hugetlb;
            # the file is in kB, the expected values are in bytes.
            self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
            # pss = Pss
            self.assertEqual(mem.pss, 3 * 1024)
            # swap = Swap
            self.assertEqual(mem.swap, 15 * 1024)
| 1705 | |
| 1706 # On PYPY file descriptors are not closed fast enough. | |
| 1707 @unittest.skipIf(PYPY, "unreliable on PYPY") | |
| 1708 def test_open_files_mode(self): | |
| 1709 def get_test_file(fname): | |
| 1710 p = psutil.Process() | |
| 1711 giveup_at = time.time() + GLOBAL_TIMEOUT | |
| 1712 while True: | |
| 1713 for file in p.open_files(): | |
| 1714 if file.path == os.path.abspath(fname): | |
| 1715 return file | |
| 1716 elif time.time() > giveup_at: | |
| 1717 break | |
| 1718 raise RuntimeError("timeout looking for test file") | |
| 1719 | |
| 1720 # | |
| 1721 testfn = self.get_testfn() | |
| 1722 with open(testfn, "w"): | |
| 1723 self.assertEqual(get_test_file(testfn).mode, "w") | |
| 1724 with open(testfn, "r"): | |
| 1725 self.assertEqual(get_test_file(testfn).mode, "r") | |
| 1726 with open(testfn, "a"): | |
| 1727 self.assertEqual(get_test_file(testfn).mode, "a") | |
| 1728 # | |
| 1729 with open(testfn, "r+"): | |
| 1730 self.assertEqual(get_test_file(testfn).mode, "r+") | |
| 1731 with open(testfn, "w+"): | |
| 1732 self.assertEqual(get_test_file(testfn).mode, "r+") | |
| 1733 with open(testfn, "a+"): | |
| 1734 self.assertEqual(get_test_file(testfn).mode, "a+") | |
| 1735 # note: "x" bit is not supported | |
| 1736 if PY3: | |
| 1737 safe_rmpath(testfn) | |
| 1738 with open(testfn, "x"): | |
| 1739 self.assertEqual(get_test_file(testfn).mode, "w") | |
| 1740 safe_rmpath(testfn) | |
| 1741 with open(testfn, "x+"): | |
| 1742 self.assertEqual(get_test_file(testfn).mode, "r+") | |
| 1743 | |
| 1744 def test_open_files_file_gone(self): | |
| 1745 # simulates a file which gets deleted during open_files() | |
| 1746 # execution | |
| 1747 p = psutil.Process() | |
| 1748 files = p.open_files() | |
| 1749 with open(self.get_testfn(), 'w'): | |
| 1750 # give the kernel some time to see the new file | |
| 1751 call_until(p.open_files, "len(ret) != %i" % len(files)) | |
| 1752 with mock.patch('psutil._pslinux.os.readlink', | |
| 1753 side_effect=OSError(errno.ENOENT, "")) as m: | |
| 1754 files = p.open_files() | |
| 1755 assert not files | |
| 1756 assert m.called | |
| 1757 # also simulate the case where os.readlink() returns EINVAL | |
| 1758 # in which case psutil is supposed to 'continue' | |
| 1759 with mock.patch('psutil._pslinux.os.readlink', | |
| 1760 side_effect=OSError(errno.EINVAL, "")) as m: | |
| 1761 self.assertEqual(p.open_files(), []) | |
| 1762 assert m.called | |
| 1763 | |
| 1764 def test_open_files_fd_gone(self): | |
| 1765 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears | |
| 1766 # while iterating through fds. | |
| 1767 # https://travis-ci.org/giampaolo/psutil/jobs/225694530 | |
| 1768 p = psutil.Process() | |
| 1769 files = p.open_files() | |
| 1770 with open(self.get_testfn(), 'w'): | |
| 1771 # give the kernel some time to see the new file | |
| 1772 call_until(p.open_files, "len(ret) != %i" % len(files)) | |
| 1773 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1774 with mock.patch(patch_point, | |
| 1775 side_effect=IOError(errno.ENOENT, "")) as m: | |
| 1776 files = p.open_files() | |
| 1777 assert not files | |
| 1778 assert m.called | |
| 1779 | |
| 1780 # --- mocked tests | |
| 1781 | |
| 1782 def test_terminal_mocked(self): | |
| 1783 with mock.patch('psutil._pslinux._psposix.get_terminal_map', | |
| 1784 return_value={}) as m: | |
| 1785 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) | |
| 1786 assert m.called | |
| 1787 | |
| 1788 # TODO: re-enable this test. | |
| 1789 # def test_num_ctx_switches_mocked(self): | |
| 1790 # with mock.patch('psutil._common.open', create=True) as m: | |
| 1791 # self.assertRaises( | |
| 1792 # NotImplementedError, | |
| 1793 # psutil._pslinux.Process(os.getpid()).num_ctx_switches) | |
| 1794 # assert m.called | |
| 1795 | |
| 1796 def test_cmdline_mocked(self): | |
| 1797 # see: https://github.com/giampaolo/psutil/issues/639 | |
| 1798 p = psutil.Process() | |
| 1799 fake_file = io.StringIO(u('foo\x00bar\x00')) | |
| 1800 with mock.patch('psutil._common.open', | |
| 1801 return_value=fake_file, create=True) as m: | |
| 1802 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
| 1803 assert m.called | |
| 1804 fake_file = io.StringIO(u('foo\x00bar\x00\x00')) | |
| 1805 with mock.patch('psutil._common.open', | |
| 1806 return_value=fake_file, create=True) as m: | |
| 1807 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) | |
| 1808 assert m.called | |
| 1809 | |
| 1810 def test_cmdline_spaces_mocked(self): | |
| 1811 # see: https://github.com/giampaolo/psutil/issues/1179 | |
| 1812 p = psutil.Process() | |
| 1813 fake_file = io.StringIO(u('foo bar ')) | |
| 1814 with mock.patch('psutil._common.open', | |
| 1815 return_value=fake_file, create=True) as m: | |
| 1816 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
| 1817 assert m.called | |
| 1818 fake_file = io.StringIO(u('foo bar ')) | |
| 1819 with mock.patch('psutil._common.open', | |
| 1820 return_value=fake_file, create=True) as m: | |
| 1821 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) | |
| 1822 assert m.called | |
| 1823 | |
| 1824 def test_cmdline_mixed_separators(self): | |
| 1825 # https://github.com/giampaolo/psutil/issues/ | |
| 1826 # 1179#issuecomment-552984549 | |
| 1827 p = psutil.Process() | |
| 1828 fake_file = io.StringIO(u('foo\x20bar\x00')) | |
| 1829 with mock.patch('psutil._common.open', | |
| 1830 return_value=fake_file, create=True) as m: | |
| 1831 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
| 1832 assert m.called | |
| 1833 | |
| 1834 def test_readlink_path_deleted_mocked(self): | |
| 1835 with mock.patch('psutil._pslinux.os.readlink', | |
| 1836 return_value='/home/foo (deleted)'): | |
| 1837 self.assertEqual(psutil.Process().exe(), "/home/foo") | |
| 1838 self.assertEqual(psutil.Process().cwd(), "/home/foo") | |
| 1839 | |
| 1840 def test_threads_mocked(self): | |
| 1841 # Test the case where os.listdir() returns a file (thread) | |
| 1842 # which no longer exists by the time we open() it (race | |
| 1843 # condition). threads() is supposed to ignore that instead | |
| 1844 # of raising NSP. | |
| 1845 def open_mock(name, *args, **kwargs): | |
| 1846 if name.startswith('/proc/%s/task' % os.getpid()): | |
| 1847 raise IOError(errno.ENOENT, "") | |
| 1848 else: | |
| 1849 return orig_open(name, *args, **kwargs) | |
| 1850 | |
| 1851 orig_open = open | |
| 1852 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1853 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1854 ret = psutil.Process().threads() | |
| 1855 assert m.called | |
| 1856 self.assertEqual(ret, []) | |
| 1857 | |
| 1858 # ...but if it bumps into something != ENOENT we want an | |
| 1859 # exception. | |
| 1860 def open_mock(name, *args, **kwargs): | |
| 1861 if name.startswith('/proc/%s/task' % os.getpid()): | |
| 1862 raise IOError(errno.EPERM, "") | |
| 1863 else: | |
| 1864 return orig_open(name, *args, **kwargs) | |
| 1865 | |
| 1866 with mock.patch(patch_point, side_effect=open_mock): | |
| 1867 self.assertRaises(psutil.AccessDenied, psutil.Process().threads) | |
| 1868 | |
| 1869 def test_exe_mocked(self): | |
| 1870 with mock.patch('psutil._pslinux.readlink', | |
| 1871 side_effect=OSError(errno.ENOENT, "")) as m1: | |
| 1872 with mock.patch('psutil.Process.cmdline', | |
| 1873 side_effect=psutil.AccessDenied(0, "")) as m2: | |
| 1874 # No such file error; might be raised also if /proc/pid/exe | |
| 1875 # path actually exists for system processes with low pids | |
| 1876 # (about 0-20). In this case psutil is supposed to return | |
| 1877 # an empty string. | |
| 1878 ret = psutil.Process().exe() | |
| 1879 assert m1.called | |
| 1880 assert m2.called | |
| 1881 self.assertEqual(ret, "") | |
| 1882 | |
| 1883 # ...but if /proc/pid no longer exist we're supposed to treat | |
| 1884 # it as an alias for zombie process | |
| 1885 with mock.patch('psutil._pslinux.os.path.lexists', | |
| 1886 return_value=False): | |
| 1887 self.assertRaises( | |
| 1888 psutil.ZombieProcess, psutil.Process().exe) | |
| 1889 | |
| 1890 def test_issue_1014(self): | |
| 1891 # Emulates a case where smaps file does not exist. In this case | |
| 1892 # wrap_exception decorator should not raise NoSuchProcess. | |
| 1893 with mock_open_exception( | |
| 1894 '/proc/%s/smaps' % os.getpid(), | |
| 1895 IOError(errno.ENOENT, "")) as m: | |
| 1896 p = psutil.Process() | |
| 1897 with self.assertRaises(FileNotFoundError): | |
| 1898 p.memory_maps() | |
| 1899 assert m.called | |
| 1900 | |
| 1901 @unittest.skipIf(not HAS_RLIMIT, "not supported") | |
| 1902 def test_rlimit_zombie(self): | |
| 1903 # Emulate a case where rlimit() raises ENOSYS, which may | |
| 1904 # happen in case of zombie process: | |
| 1905 # https://travis-ci.org/giampaolo/psutil/jobs/51368273 | |
| 1906 with mock.patch("psutil._pslinux.cext.linux_prlimit", | |
| 1907 side_effect=OSError(errno.ENOSYS, "")) as m: | |
| 1908 p = psutil.Process() | |
| 1909 p.name() | |
| 1910 with self.assertRaises(psutil.ZombieProcess) as exc: | |
| 1911 p.rlimit(psutil.RLIMIT_NOFILE) | |
| 1912 assert m.called | |
| 1913 self.assertEqual(exc.exception.pid, p.pid) | |
| 1914 self.assertEqual(exc.exception.name, p.name()) | |
| 1915 | |
| 1916 def test_cwd_zombie(self): | |
| 1917 with mock.patch("psutil._pslinux.os.readlink", | |
| 1918 side_effect=OSError(errno.ENOENT, "")) as m: | |
| 1919 p = psutil.Process() | |
| 1920 p.name() | |
| 1921 with self.assertRaises(psutil.ZombieProcess) as exc: | |
| 1922 p.cwd() | |
| 1923 assert m.called | |
| 1924 self.assertEqual(exc.exception.pid, p.pid) | |
| 1925 self.assertEqual(exc.exception.name, p.name()) | |
| 1926 | |
    def test_stat_file_parsing(self):
        # Serve a hand-written /proc/{pid}/stat and check that each
        # field ends up in the right Process method. Fields which are
        # asserted on below carry a non-zero value.
        from psutil._pslinux import CLOCK_TICKS

        args = [
            "0",      # pid
            "(cat)",  # name
            "Z",      # status
            "1",      # ppid
            "0",      # pgrp
            "0",      # session
            "0",      # tty
            "0",      # tpgid
            "0",      # flags
            "0",      # minflt
            "0",      # cminflt
            "0",      # majflt
            "0",      # cmajflt
            "2",      # utime
            "3",      # stime
            "4",      # cutime
            "5",      # cstime
            "0",      # priority
            "0",      # nice
            "0",      # num_threads
            "0",      # itrealvalue
            "6",      # starttime
            "0",      # vsize
            "0",      # rss
            "0",      # rsslim
            "0",      # startcode
            "0",      # endcode
            "0",      # startstack
            "0",      # kstkesp
            "0",      # kstkeip
            "0",      # signal
            "0",      # blocked
            "0",      # sigignore
            "0",      # sigcatch
            "0",      # wchan
            "0",      # nswap
            "0",      # cnswap
            "0",      # exit_signal
            "6",      # processor
            "0",      # rt priority
            "0",      # policy
            "7",      # delayacct_blkio_ticks
        ]
        # The file consists of a single line of space separated fields.
        content = " ".join(args).encode()
        with mock_open_content('/proc/%s/stat' % os.getpid(), content):
            p = psutil.Process()
            # The parentheses around the name are expected to be dropped.
            self.assertEqual(p.name(), 'cat')
            self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
            self.assertEqual(p.ppid(), 1)
            # starttime, converted from clock ticks, added to boot time.
            self.assertEqual(
                p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
            # Times are expressed in clock ticks in the file.
            cpu = p.cpu_times()
            self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
            self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
            self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
            self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
            self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
            self.assertEqual(p.cpu_num(), 6)
| 1989 | |
    def test_status_file_parsing(self):
        # Serve a hand-written /proc/{pid}/status (label and values are
        # tab separated) and check that each value ends up in the right
        # Process method.
        with mock_open_content(
                '/proc/%s/status' % os.getpid(),
                textwrap.dedent("""\
                    Uid:\t1000\t1001\t1002\t1003
                    Gid:\t1004\t1005\t1006\t1007
                    Threads:\t66
                    Cpus_allowed:\tf
                    Cpus_allowed_list:\t0-7
                    voluntary_ctxt_switches:\t12
                    nonvoluntary_ctxt_switches:\t13""").encode()):
            p = psutil.Process()
            self.assertEqual(p.num_ctx_switches().voluntary, 12)
            self.assertEqual(p.num_ctx_switches().involuntary, 13)
            self.assertEqual(p.num_threads(), 66)
            # The first three Uid / Gid values are the real, effective
            # and saved ids respectively.
            uids = p.uids()
            self.assertEqual(uids.real, 1000)
            self.assertEqual(uids.effective, 1001)
            self.assertEqual(uids.saved, 1002)
            gids = p.gids()
            self.assertEqual(gids.real, 1004)
            self.assertEqual(gids.effective, 1005)
            self.assertEqual(gids.saved, 1006)
            # "0-7" in Cpus_allowed_list stands for CPUs 0 to 7 included.
            self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
| 2014 | |
| 2015 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcessAgainstStatus(PsutilTestCase):
    """/proc/pid/stat and /proc/pid/status have many values in common.
    Whenever possible, psutil uses /proc/pid/stat (it's faster).
    For all those cases we check that the value found in
    /proc/pid/stat (by psutil) matches the one found in
    /proc/pid/status.
    """

    @classmethod
    def setUpClass(cls):
        # All tests inspect the current process.
        cls.proc = psutil.Process()

    def read_status_file(self, linestart):
        # Look for the first line of /proc/{pid}/status starting with
        # `linestart` and return what follows the first tab (hence
        # without the label): an int if it can be converted to one,
        # else the string itself. ValueError is raised if there is no
        # such line.
        with psutil._psplatform.open_text(
                '/proc/%s/status' % self.proc.pid) as f:
            for line in f:
                line = line.strip()
                if line.startswith(linestart):
                    value = line.partition('\t')[2]
                    try:
                        return int(value)
                    except ValueError:
                        return value
            raise ValueError("can't find %r" % linestart)

    def test_name(self):
        value = self.read_status_file("Name:")
        self.assertEqual(self.proc.name(), value)

    def test_status(self):
        # E.g. "S (sleeping)" -> "sleeping", "t (tracing stop)" ->
        # "tracing-stop".
        value = self.read_status_file("State:")
        value = value[value.find('(') + 1:value.rfind(')')]
        value = value.replace(' ', '-')
        self.assertEqual(self.proc.status(), value)

    def test_ppid(self):
        value = self.read_status_file("PPid:")
        self.assertEqual(self.proc.ppid(), value)

    def test_num_threads(self):
        value = self.read_status_file("Threads:")
        self.assertEqual(self.proc.num_threads(), value)

    def test_uids(self):
        # The label is already stripped by read_status_file(), so the
        # fields are: real, effective, saved, filesystem UID. uids()
        # reports the first three. A [1:4] slice would compare against
        # (effective, saved, filesystem) and only work as long as all
        # the ids are the same.
        value = self.read_status_file("Uid:")
        value = tuple(map(int, value.split()[:3]))
        self.assertEqual(self.proc.uids(), value)

    def test_gids(self):
        # Same layout as "Uid:": real, effective, saved, filesystem GID.
        value = self.read_status_file("Gid:")
        value = tuple(map(int, value.split()[:3]))
        self.assertEqual(self.proc.gids(), value)

    @retry_on_failure()
    def test_num_ctx_switches(self):
        value = self.read_status_file("voluntary_ctxt_switches:")
        self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
        value = self.read_status_file("nonvoluntary_ctxt_switches:")
        self.assertEqual(self.proc.num_ctx_switches().involuntary, value)

    def test_cpu_affinity(self):
        # Only the "min-max" form of the list is checked. str() is
        # needed because a single CPU number comes back as an int.
        value = self.read_status_file("Cpus_allowed_list:")
        if '-' in str(value):
            min_, max_ = map(int, value.split('-'))
            self.assertEqual(
                self.proc.cpu_affinity(), list(range(min_, max_ + 1)))

    def test_cpu_affinity_eligible_cpus(self):
        # per_cpu_times() is expected to be used only if the list of
        # allowed CPUs is not in the "min-max" form.
        value = self.read_status_file("Cpus_allowed_list:")
        with mock.patch("psutil._pslinux.per_cpu_times") as m:
            self.proc._proc._get_eligible_cpus()
        if '-' in str(value):
            assert not m.called
        else:
            assert m.called
| 2092 | |
| 2093 | |
| 2094 # ===================================================================== | |
| 2095 # --- test utils | |
| 2096 # ===================================================================== | |
| 2097 | |
| 2098 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestUtils(PsutilTestCase):
    # Tests for the helper functions of the Linux platform module.

    def test_readlink(self):
        # The " (deleted)" suffix is expected to be dropped.
        with mock.patch("os.readlink",
                        return_value="foo (deleted)") as readlink:
            resolved = psutil._psplatform.readlink("bar")
            self.assertEqual(resolved, "foo")
            assert readlink.called

    def test_cat(self):
        cat = psutil._psplatform.cat
        path = self.get_testfn()
        with open(path, "wt") as stream:
            stream.write("foo ")
        # Content is expected back without the trailing space, as str
        # or bytes depending on `binary`.
        self.assertEqual(cat(path, binary=False), "foo")
        self.assertEqual(cat(path, binary=True), b"foo")
        # A file which can't be read results in the fallback value.
        self.assertEqual(cat(path + '??', fallback="bar"), "bar")
| 2115 | |
| 2116 | |
if __name__ == '__main__':
    # Executed as a script: pass this file to the psutil test runner.
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)
