Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_linux.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Linux specific tests.""" | |
| 8 | |
| 9 from __future__ import division | |
| 10 import collections | |
| 11 import contextlib | |
| 12 import errno | |
| 13 import glob | |
| 14 import io | |
| 15 import os | |
| 16 import re | |
| 17 import shutil | |
| 18 import socket | |
| 19 import struct | |
| 20 import tempfile | |
| 21 import textwrap | |
| 22 import time | |
| 23 import warnings | |
| 24 | |
| 25 import psutil | |
| 26 from psutil import LINUX | |
| 27 from psutil._compat import basestring | |
| 28 from psutil._compat import FileNotFoundError | |
| 29 from psutil._compat import PY3 | |
| 30 from psutil._compat import u | |
| 31 from psutil.tests import call_until | |
| 32 from psutil.tests import HAS_BATTERY | |
| 33 from psutil.tests import HAS_CPU_FREQ | |
| 34 from psutil.tests import HAS_GETLOADAVG | |
| 35 from psutil.tests import HAS_RLIMIT | |
| 36 from psutil.tests import MEMORY_TOLERANCE | |
| 37 from psutil.tests import mock | |
| 38 from psutil.tests import PYPY | |
| 39 from psutil.tests import pyrun | |
| 40 from psutil.tests import reap_children | |
| 41 from psutil.tests import reload_module | |
| 42 from psutil.tests import retry_on_failure | |
| 43 from psutil.tests import safe_rmpath | |
| 44 from psutil.tests import sh | |
| 45 from psutil.tests import skip_on_not_implemented | |
| 46 from psutil.tests import TESTFN | |
| 47 from psutil.tests import ThreadTask | |
| 48 from psutil.tests import TRAVIS | |
| 49 from psutil.tests import unittest | |
| 50 from psutil.tests import which | |
| 51 | |
| 52 | |
| 53 HERE = os.path.abspath(os.path.dirname(__file__)) | |
| 54 SIOCGIFADDR = 0x8915 | |
| 55 SIOCGIFCONF = 0x8912 | |
| 56 SIOCGIFHWADDR = 0x8927 | |
| 57 if LINUX: | |
| 58 SECTOR_SIZE = 512 | |
| 59 EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*') | |
| 60 | |
| 61 # ===================================================================== | |
| 62 # --- utils | |
| 63 # ===================================================================== | |
| 64 | |
| 65 | |
| 66 def get_ipv4_address(ifname): | |
| 67 import fcntl | |
| 68 ifname = ifname[:15] | |
| 69 if PY3: | |
| 70 ifname = bytes(ifname, 'ascii') | |
| 71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| 72 with contextlib.closing(s): | |
| 73 return socket.inet_ntoa( | |
| 74 fcntl.ioctl(s.fileno(), | |
| 75 SIOCGIFADDR, | |
| 76 struct.pack('256s', ifname))[20:24]) | |
| 77 | |
| 78 | |
| 79 def get_mac_address(ifname): | |
| 80 import fcntl | |
| 81 ifname = ifname[:15] | |
| 82 if PY3: | |
| 83 ifname = bytes(ifname, 'ascii') | |
| 84 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| 85 with contextlib.closing(s): | |
| 86 info = fcntl.ioctl( | |
| 87 s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname)) | |
| 88 if PY3: | |
| 89 def ord(x): | |
| 90 return x | |
| 91 else: | |
| 92 import __builtin__ | |
| 93 ord = __builtin__.ord | |
| 94 return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] | |
| 95 | |
| 96 | |
| 97 def free_swap(): | |
| 98 """Parse 'free' cmd and return swap memory's s total, used and free | |
| 99 values. | |
| 100 """ | |
| 101 out = sh('free -b', env={"LANG": "C.UTF-8"}) | |
| 102 lines = out.split('\n') | |
| 103 for line in lines: | |
| 104 if line.startswith('Swap'): | |
| 105 _, total, used, free = line.split() | |
| 106 nt = collections.namedtuple('free', 'total used free') | |
| 107 return nt(int(total), int(used), int(free)) | |
| 108 raise ValueError( | |
| 109 "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines)) | |
| 110 | |
| 111 | |
| 112 def free_physmem(): | |
| 113 """Parse 'free' cmd and return physical memory's total, used | |
| 114 and free values. | |
| 115 """ | |
| 116 # Note: free can have 2 different formats, invalidating 'shared' | |
| 117 # and 'cached' memory which may have different positions so we | |
| 118 # do not return them. | |
| 119 # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946 | |
| 120 out = sh('free -b', env={"LANG": "C.UTF-8"}) | |
| 121 lines = out.split('\n') | |
| 122 for line in lines: | |
| 123 if line.startswith('Mem'): | |
| 124 total, used, free, shared = \ | |
| 125 [int(x) for x in line.split()[1:5]] | |
| 126 nt = collections.namedtuple( | |
| 127 'free', 'total used free shared output') | |
| 128 return nt(total, used, free, shared, out) | |
| 129 raise ValueError( | |
| 130 "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines)) | |
| 131 | |
| 132 | |
| 133 def vmstat(stat): | |
| 134 out = sh("vmstat -s", env={"LANG": "C.UTF-8"}) | |
| 135 for line in out.split("\n"): | |
| 136 line = line.strip() | |
| 137 if stat in line: | |
| 138 return int(line.split(' ')[0]) | |
| 139 raise ValueError("can't find %r in 'vmstat' output" % stat) | |
| 140 | |
| 141 | |
| 142 def get_free_version_info(): | |
| 143 out = sh("free -V").strip() | |
| 144 return tuple(map(int, out.split()[-1].split('.'))) | |
| 145 | |
| 146 | |
| 147 @contextlib.contextmanager | |
| 148 def mock_open_content(for_path, content): | |
| 149 """Mock open() builtin and forces it to return a certain `content` | |
| 150 on read() if the path being opened matches `for_path`. | |
| 151 """ | |
| 152 def open_mock(name, *args, **kwargs): | |
| 153 if name == for_path: | |
| 154 if PY3: | |
| 155 if isinstance(content, basestring): | |
| 156 return io.StringIO(content) | |
| 157 else: | |
| 158 return io.BytesIO(content) | |
| 159 else: | |
| 160 return io.BytesIO(content) | |
| 161 else: | |
| 162 return orig_open(name, *args, **kwargs) | |
| 163 | |
| 164 orig_open = open | |
| 165 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 166 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: | |
| 167 yield m | |
| 168 | |
| 169 | |
| 170 @contextlib.contextmanager | |
| 171 def mock_open_exception(for_path, exc): | |
| 172 """Mock open() builtin and raises `exc` if the path being opened | |
| 173 matches `for_path`. | |
| 174 """ | |
| 175 def open_mock(name, *args, **kwargs): | |
| 176 if name == for_path: | |
| 177 raise exc | |
| 178 else: | |
| 179 return orig_open(name, *args, **kwargs) | |
| 180 | |
| 181 orig_open = open | |
| 182 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 183 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: | |
| 184 yield m | |
| 185 | |
| 186 | |
| 187 # ===================================================================== | |
| 188 # --- system virtual memory | |
| 189 # ===================================================================== | |
| 190 | |
| 191 | |
| 192 @unittest.skipIf(not LINUX, "LINUX only") | |
| 193 class TestSystemVirtualMemory(unittest.TestCase): | |
| 194 | |
| 195 def test_total(self): | |
| 196 # free_value = free_physmem().total | |
| 197 # psutil_value = psutil.virtual_memory().total | |
| 198 # self.assertEqual(free_value, psutil_value) | |
| 199 vmstat_value = vmstat('total memory') * 1024 | |
| 200 psutil_value = psutil.virtual_memory().total | |
| 201 self.assertAlmostEqual(vmstat_value, psutil_value) | |
| 202 | |
| 203 # Older versions of procps used slab memory to calculate used memory. | |
| 204 # This got changed in: | |
| 205 # https://gitlab.com/procps-ng/procps/commit/ | |
| 206 # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e | |
| 207 @unittest.skipIf(LINUX and get_free_version_info() < (3, 3, 12), | |
| 208 "old free version") | |
| 209 @retry_on_failure() | |
| 210 def test_used(self): | |
| 211 free = free_physmem() | |
| 212 free_value = free.used | |
| 213 psutil_value = psutil.virtual_memory().used | |
| 214 self.assertAlmostEqual( | |
| 215 free_value, psutil_value, delta=MEMORY_TOLERANCE, | |
| 216 msg='%s %s \n%s' % (free_value, psutil_value, free.output)) | |
| 217 | |
| 218 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 219 @retry_on_failure() | |
| 220 def test_free(self): | |
| 221 vmstat_value = vmstat('free memory') * 1024 | |
| 222 psutil_value = psutil.virtual_memory().free | |
| 223 self.assertAlmostEqual( | |
| 224 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 225 | |
| 226 @retry_on_failure() | |
| 227 def test_buffers(self): | |
| 228 vmstat_value = vmstat('buffer memory') * 1024 | |
| 229 psutil_value = psutil.virtual_memory().buffers | |
| 230 self.assertAlmostEqual( | |
| 231 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 232 | |
| 233 # https://travis-ci.org/giampaolo/psutil/jobs/226719664 | |
| 234 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 235 @retry_on_failure() | |
| 236 def test_active(self): | |
| 237 vmstat_value = vmstat('active memory') * 1024 | |
| 238 psutil_value = psutil.virtual_memory().active | |
| 239 self.assertAlmostEqual( | |
| 240 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 241 | |
| 242 # https://travis-ci.org/giampaolo/psutil/jobs/227242952 | |
| 243 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") | |
| 244 @retry_on_failure() | |
| 245 def test_inactive(self): | |
| 246 vmstat_value = vmstat('inactive memory') * 1024 | |
| 247 psutil_value = psutil.virtual_memory().inactive | |
| 248 self.assertAlmostEqual( | |
| 249 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 250 | |
| 251 @retry_on_failure() | |
| 252 def test_shared(self): | |
| 253 free = free_physmem() | |
| 254 free_value = free.shared | |
| 255 if free_value == 0: | |
| 256 raise unittest.SkipTest("free does not support 'shared' column") | |
| 257 psutil_value = psutil.virtual_memory().shared | |
| 258 self.assertAlmostEqual( | |
| 259 free_value, psutil_value, delta=MEMORY_TOLERANCE, | |
| 260 msg='%s %s \n%s' % (free_value, psutil_value, free.output)) | |
| 261 | |
| 262 @retry_on_failure() | |
| 263 def test_available(self): | |
| 264 # "free" output format has changed at some point: | |
| 265 # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098 | |
| 266 out = sh("free -b") | |
| 267 lines = out.split('\n') | |
| 268 if 'available' not in lines[0]: | |
| 269 raise unittest.SkipTest("free does not support 'available' column") | |
| 270 else: | |
| 271 free_value = int(lines[1].split()[-1]) | |
| 272 psutil_value = psutil.virtual_memory().available | |
| 273 self.assertAlmostEqual( | |
| 274 free_value, psutil_value, delta=MEMORY_TOLERANCE, | |
| 275 msg='%s %s \n%s' % (free_value, psutil_value, out)) | |
| 276 | |
| 277 def test_warnings_on_misses(self): | |
| 278 # Emulate a case where /proc/meminfo provides few info. | |
| 279 # psutil is supposed to set the missing fields to 0 and | |
| 280 # raise a warning. | |
| 281 with mock_open_content( | |
| 282 '/proc/meminfo', | |
| 283 textwrap.dedent("""\ | |
| 284 Active(anon): 6145416 kB | |
| 285 Active(file): 2950064 kB | |
| 286 Inactive(anon): 574764 kB | |
| 287 Inactive(file): 1567648 kB | |
| 288 MemAvailable: -1 kB | |
| 289 MemFree: 2057400 kB | |
| 290 MemTotal: 16325648 kB | |
| 291 SReclaimable: 346648 kB | |
| 292 """).encode()) as m: | |
| 293 with warnings.catch_warnings(record=True) as ws: | |
| 294 warnings.simplefilter("always") | |
| 295 ret = psutil.virtual_memory() | |
| 296 assert m.called | |
| 297 self.assertEqual(len(ws), 1) | |
| 298 w = ws[0] | |
| 299 assert w.filename.endswith('psutil/_pslinux.py') | |
| 300 self.assertIn( | |
| 301 "memory stats couldn't be determined", str(w.message)) | |
| 302 self.assertIn("cached", str(w.message)) | |
| 303 self.assertIn("shared", str(w.message)) | |
| 304 self.assertIn("active", str(w.message)) | |
| 305 self.assertIn("inactive", str(w.message)) | |
| 306 self.assertIn("buffers", str(w.message)) | |
| 307 self.assertIn("available", str(w.message)) | |
| 308 self.assertEqual(ret.cached, 0) | |
| 309 self.assertEqual(ret.active, 0) | |
| 310 self.assertEqual(ret.inactive, 0) | |
| 311 self.assertEqual(ret.shared, 0) | |
| 312 self.assertEqual(ret.buffers, 0) | |
| 313 self.assertEqual(ret.available, 0) | |
| 314 self.assertEqual(ret.slab, 0) | |
| 315 | |
| 316 @retry_on_failure() | |
| 317 def test_avail_old_percent(self): | |
| 318 # Make sure that our calculation of avail mem for old kernels | |
| 319 # is off by max 15%. | |
| 320 from psutil._pslinux import calculate_avail_vmem | |
| 321 from psutil._pslinux import open_binary | |
| 322 | |
| 323 mems = {} | |
| 324 with open_binary('/proc/meminfo') as f: | |
| 325 for line in f: | |
| 326 fields = line.split() | |
| 327 mems[fields[0]] = int(fields[1]) * 1024 | |
| 328 | |
| 329 a = calculate_avail_vmem(mems) | |
| 330 if b'MemAvailable:' in mems: | |
| 331 b = mems[b'MemAvailable:'] | |
| 332 diff_percent = abs(a - b) / a * 100 | |
| 333 self.assertLess(diff_percent, 15) | |
| 334 | |
| 335 def test_avail_old_comes_from_kernel(self): | |
| 336 # Make sure "MemAvailable:" coluimn is used instead of relying | |
| 337 # on our internal algorithm to calculate avail mem. | |
| 338 with mock_open_content( | |
| 339 '/proc/meminfo', | |
| 340 textwrap.dedent("""\ | |
| 341 Active: 9444728 kB | |
| 342 Active(anon): 6145416 kB | |
| 343 Active(file): 2950064 kB | |
| 344 Buffers: 287952 kB | |
| 345 Cached: 4818144 kB | |
| 346 Inactive(file): 1578132 kB | |
| 347 Inactive(anon): 574764 kB | |
| 348 Inactive(file): 1567648 kB | |
| 349 MemAvailable: 6574984 kB | |
| 350 MemFree: 2057400 kB | |
| 351 MemTotal: 16325648 kB | |
| 352 Shmem: 577588 kB | |
| 353 SReclaimable: 346648 kB | |
| 354 """).encode()) as m: | |
| 355 with warnings.catch_warnings(record=True) as ws: | |
| 356 ret = psutil.virtual_memory() | |
| 357 assert m.called | |
| 358 self.assertEqual(ret.available, 6574984 * 1024) | |
| 359 w = ws[0] | |
| 360 self.assertIn( | |
| 361 "inactive memory stats couldn't be determined", str(w.message)) | |
| 362 | |
| 363 def test_avail_old_missing_fields(self): | |
| 364 # Remove Active(file), Inactive(file) and SReclaimable | |
| 365 # from /proc/meminfo and make sure the fallback is used | |
| 366 # (free + cached), | |
| 367 with mock_open_content( | |
| 368 "/proc/meminfo", | |
| 369 textwrap.dedent("""\ | |
| 370 Active: 9444728 kB | |
| 371 Active(anon): 6145416 kB | |
| 372 Buffers: 287952 kB | |
| 373 Cached: 4818144 kB | |
| 374 Inactive(file): 1578132 kB | |
| 375 Inactive(anon): 574764 kB | |
| 376 MemFree: 2057400 kB | |
| 377 MemTotal: 16325648 kB | |
| 378 Shmem: 577588 kB | |
| 379 """).encode()) as m: | |
| 380 with warnings.catch_warnings(record=True) as ws: | |
| 381 ret = psutil.virtual_memory() | |
| 382 assert m.called | |
| 383 self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024) | |
| 384 w = ws[0] | |
| 385 self.assertIn( | |
| 386 "inactive memory stats couldn't be determined", str(w.message)) | |
| 387 | |
| 388 def test_avail_old_missing_zoneinfo(self): | |
| 389 # Remove /proc/zoneinfo file. Make sure fallback is used | |
| 390 # (free + cached). | |
| 391 with mock_open_content( | |
| 392 "/proc/meminfo", | |
| 393 textwrap.dedent("""\ | |
| 394 Active: 9444728 kB | |
| 395 Active(anon): 6145416 kB | |
| 396 Active(file): 2950064 kB | |
| 397 Buffers: 287952 kB | |
| 398 Cached: 4818144 kB | |
| 399 Inactive(file): 1578132 kB | |
| 400 Inactive(anon): 574764 kB | |
| 401 Inactive(file): 1567648 kB | |
| 402 MemFree: 2057400 kB | |
| 403 MemTotal: 16325648 kB | |
| 404 Shmem: 577588 kB | |
| 405 SReclaimable: 346648 kB | |
| 406 """).encode()): | |
| 407 with mock_open_exception( | |
| 408 "/proc/zoneinfo", | |
| 409 IOError(errno.ENOENT, 'no such file or directory')): | |
| 410 with warnings.catch_warnings(record=True) as ws: | |
| 411 ret = psutil.virtual_memory() | |
| 412 self.assertEqual( | |
| 413 ret.available, 2057400 * 1024 + 4818144 * 1024) | |
| 414 w = ws[0] | |
| 415 self.assertIn( | |
| 416 "inactive memory stats couldn't be determined", | |
| 417 str(w.message)) | |
| 418 | |
| 419 def test_virtual_memory_mocked(self): | |
| 420 # Emulate /proc/meminfo because neither vmstat nor free return slab. | |
| 421 def open_mock(name, *args, **kwargs): | |
| 422 if name == '/proc/meminfo': | |
| 423 return io.BytesIO(textwrap.dedent("""\ | |
| 424 MemTotal: 100 kB | |
| 425 MemFree: 2 kB | |
| 426 MemAvailable: 3 kB | |
| 427 Buffers: 4 kB | |
| 428 Cached: 5 kB | |
| 429 SwapCached: 6 kB | |
| 430 Active: 7 kB | |
| 431 Inactive: 8 kB | |
| 432 Active(anon): 9 kB | |
| 433 Inactive(anon): 10 kB | |
| 434 Active(file): 11 kB | |
| 435 Inactive(file): 12 kB | |
| 436 Unevictable: 13 kB | |
| 437 Mlocked: 14 kB | |
| 438 SwapTotal: 15 kB | |
| 439 SwapFree: 16 kB | |
| 440 Dirty: 17 kB | |
| 441 Writeback: 18 kB | |
| 442 AnonPages: 19 kB | |
| 443 Mapped: 20 kB | |
| 444 Shmem: 21 kB | |
| 445 Slab: 22 kB | |
| 446 SReclaimable: 23 kB | |
| 447 SUnreclaim: 24 kB | |
| 448 KernelStack: 25 kB | |
| 449 PageTables: 26 kB | |
| 450 NFS_Unstable: 27 kB | |
| 451 Bounce: 28 kB | |
| 452 WritebackTmp: 29 kB | |
| 453 CommitLimit: 30 kB | |
| 454 Committed_AS: 31 kB | |
| 455 VmallocTotal: 32 kB | |
| 456 VmallocUsed: 33 kB | |
| 457 VmallocChunk: 34 kB | |
| 458 HardwareCorrupted: 35 kB | |
| 459 AnonHugePages: 36 kB | |
| 460 ShmemHugePages: 37 kB | |
| 461 ShmemPmdMapped: 38 kB | |
| 462 CmaTotal: 39 kB | |
| 463 CmaFree: 40 kB | |
| 464 HugePages_Total: 41 kB | |
| 465 HugePages_Free: 42 kB | |
| 466 HugePages_Rsvd: 43 kB | |
| 467 HugePages_Surp: 44 kB | |
| 468 Hugepagesize: 45 kB | |
| 469 DirectMap46k: 46 kB | |
| 470 DirectMap47M: 47 kB | |
| 471 DirectMap48G: 48 kB | |
| 472 """).encode()) | |
| 473 else: | |
| 474 return orig_open(name, *args, **kwargs) | |
| 475 | |
| 476 orig_open = open | |
| 477 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 478 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: | |
| 479 mem = psutil.virtual_memory() | |
| 480 assert m.called | |
| 481 self.assertEqual(mem.total, 100 * 1024) | |
| 482 self.assertEqual(mem.free, 2 * 1024) | |
| 483 self.assertEqual(mem.buffers, 4 * 1024) | |
| 484 # cached mem also includes reclaimable memory | |
| 485 self.assertEqual(mem.cached, (5 + 23) * 1024) | |
| 486 self.assertEqual(mem.shared, 21 * 1024) | |
| 487 self.assertEqual(mem.active, 7 * 1024) | |
| 488 self.assertEqual(mem.inactive, 8 * 1024) | |
| 489 self.assertEqual(mem.slab, 22 * 1024) | |
| 490 self.assertEqual(mem.available, 3 * 1024) | |
| 491 | |
| 492 | |
| 493 # ===================================================================== | |
| 494 # --- system swap memory | |
| 495 # ===================================================================== | |
| 496 | |
| 497 | |
| 498 @unittest.skipIf(not LINUX, "LINUX only") | |
| 499 class TestSystemSwapMemory(unittest.TestCase): | |
| 500 | |
| 501 @staticmethod | |
| 502 def meminfo_has_swap_info(): | |
| 503 """Return True if /proc/meminfo provides swap metrics.""" | |
| 504 with open("/proc/meminfo") as f: | |
| 505 data = f.read() | |
| 506 return 'SwapTotal:' in data and 'SwapFree:' in data | |
| 507 | |
| 508 def test_total(self): | |
| 509 free_value = free_swap().total | |
| 510 psutil_value = psutil.swap_memory().total | |
| 511 return self.assertAlmostEqual( | |
| 512 free_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 513 | |
| 514 @retry_on_failure() | |
| 515 def test_used(self): | |
| 516 free_value = free_swap().used | |
| 517 psutil_value = psutil.swap_memory().used | |
| 518 return self.assertAlmostEqual( | |
| 519 free_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 520 | |
| 521 @retry_on_failure() | |
| 522 def test_free(self): | |
| 523 free_value = free_swap().free | |
| 524 psutil_value = psutil.swap_memory().free | |
| 525 return self.assertAlmostEqual( | |
| 526 free_value, psutil_value, delta=MEMORY_TOLERANCE) | |
| 527 | |
| 528 def test_missing_sin_sout(self): | |
| 529 with mock.patch('psutil._common.open', create=True) as m: | |
| 530 with warnings.catch_warnings(record=True) as ws: | |
| 531 warnings.simplefilter("always") | |
| 532 ret = psutil.swap_memory() | |
| 533 assert m.called | |
| 534 self.assertEqual(len(ws), 1) | |
| 535 w = ws[0] | |
| 536 assert w.filename.endswith('psutil/_pslinux.py') | |
| 537 self.assertIn( | |
| 538 "'sin' and 'sout' swap memory stats couldn't " | |
| 539 "be determined", str(w.message)) | |
| 540 self.assertEqual(ret.sin, 0) | |
| 541 self.assertEqual(ret.sout, 0) | |
| 542 | |
| 543 def test_no_vmstat_mocked(self): | |
| 544 # see https://github.com/giampaolo/psutil/issues/722 | |
| 545 with mock_open_exception( | |
| 546 "/proc/vmstat", | |
| 547 IOError(errno.ENOENT, 'no such file or directory')) as m: | |
| 548 with warnings.catch_warnings(record=True) as ws: | |
| 549 warnings.simplefilter("always") | |
| 550 ret = psutil.swap_memory() | |
| 551 assert m.called | |
| 552 self.assertEqual(len(ws), 1) | |
| 553 w = ws[0] | |
| 554 assert w.filename.endswith('psutil/_pslinux.py') | |
| 555 self.assertIn( | |
| 556 "'sin' and 'sout' swap memory stats couldn't " | |
| 557 "be determined and were set to 0", | |
| 558 str(w.message)) | |
| 559 self.assertEqual(ret.sin, 0) | |
| 560 self.assertEqual(ret.sout, 0) | |
| 561 | |
| 562 def test_meminfo_against_sysinfo(self): | |
| 563 # Make sure the content of /proc/meminfo about swap memory | |
| 564 # matches sysinfo() syscall, see: | |
| 565 # https://github.com/giampaolo/psutil/issues/1015 | |
| 566 if not self.meminfo_has_swap_info(): | |
| 567 return unittest.skip("/proc/meminfo has no swap metrics") | |
| 568 with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m: | |
| 569 swap = psutil.swap_memory() | |
| 570 assert not m.called | |
| 571 import psutil._psutil_linux as cext | |
| 572 _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo() | |
| 573 total *= unit_multiplier | |
| 574 free *= unit_multiplier | |
| 575 self.assertEqual(swap.total, total) | |
| 576 self.assertAlmostEqual(swap.free, free, delta=MEMORY_TOLERANCE) | |
| 577 | |
| 578 def test_emulate_meminfo_has_no_metrics(self): | |
| 579 # Emulate a case where /proc/meminfo provides no swap metrics | |
| 580 # in which case sysinfo() syscall is supposed to be used | |
| 581 # as a fallback. | |
| 582 with mock_open_content("/proc/meminfo", b"") as m: | |
| 583 psutil.swap_memory() | |
| 584 assert m.called | |
| 585 | |
| 586 | |
| 587 # ===================================================================== | |
| 588 # --- system CPU | |
| 589 # ===================================================================== | |
| 590 | |
| 591 | |
| 592 @unittest.skipIf(not LINUX, "LINUX only") | |
| 593 class TestSystemCPUTimes(unittest.TestCase): | |
| 594 | |
| 595 @unittest.skipIf(TRAVIS, "unknown failure on travis") | |
| 596 def test_fields(self): | |
| 597 fields = psutil.cpu_times()._fields | |
| 598 kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0] | |
| 599 kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) | |
| 600 if kernel_ver_info >= (2, 6, 11): | |
| 601 self.assertIn('steal', fields) | |
| 602 else: | |
| 603 self.assertNotIn('steal', fields) | |
| 604 if kernel_ver_info >= (2, 6, 24): | |
| 605 self.assertIn('guest', fields) | |
| 606 else: | |
| 607 self.assertNotIn('guest', fields) | |
| 608 if kernel_ver_info >= (3, 2, 0): | |
| 609 self.assertIn('guest_nice', fields) | |
| 610 else: | |
| 611 self.assertNotIn('guest_nice', fields) | |
| 612 | |
| 613 | |
| 614 @unittest.skipIf(not LINUX, "LINUX only") | |
| 615 class TestSystemCPUCountLogical(unittest.TestCase): | |
| 616 | |
| 617 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"), | |
| 618 "/sys/devices/system/cpu/online does not exist") | |
| 619 def test_against_sysdev_cpu_online(self): | |
| 620 with open("/sys/devices/system/cpu/online") as f: | |
| 621 value = f.read().strip() | |
| 622 if "-" in str(value): | |
| 623 value = int(value.split('-')[1]) + 1 | |
| 624 self.assertEqual(psutil.cpu_count(), value) | |
| 625 | |
| 626 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"), | |
| 627 "/sys/devices/system/cpu does not exist") | |
| 628 def test_against_sysdev_cpu_num(self): | |
| 629 ls = os.listdir("/sys/devices/system/cpu") | |
| 630 count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None]) | |
| 631 self.assertEqual(psutil.cpu_count(), count) | |
| 632 | |
| 633 @unittest.skipIf(not which("nproc"), "nproc utility not available") | |
| 634 def test_against_nproc(self): | |
| 635 num = int(sh("nproc --all")) | |
| 636 self.assertEqual(psutil.cpu_count(logical=True), num) | |
| 637 | |
| 638 @unittest.skipIf(not which("lscpu"), "lscpu utility not available") | |
| 639 def test_against_lscpu(self): | |
| 640 out = sh("lscpu -p") | |
| 641 num = len([x for x in out.split('\n') if not x.startswith('#')]) | |
| 642 self.assertEqual(psutil.cpu_count(logical=True), num) | |
| 643 | |
| 644 def test_emulate_fallbacks(self): | |
| 645 import psutil._pslinux | |
| 646 original = psutil._pslinux.cpu_count_logical() | |
| 647 # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in | |
| 648 # order to cause the parsing of /proc/cpuinfo and /proc/stat. | |
| 649 with mock.patch( | |
| 650 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: | |
| 651 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) | |
| 652 assert m.called | |
| 653 | |
| 654 # Let's have open() return emtpy data and make sure None is | |
| 655 # returned ('cause we mimick os.cpu_count()). | |
| 656 with mock.patch('psutil._common.open', create=True) as m: | |
| 657 self.assertIsNone(psutil._pslinux.cpu_count_logical()) | |
| 658 self.assertEqual(m.call_count, 2) | |
| 659 # /proc/stat should be the last one | |
| 660 self.assertEqual(m.call_args[0][0], '/proc/stat') | |
| 661 | |
| 662 # Let's push this a bit further and make sure /proc/cpuinfo | |
| 663 # parsing works as expected. | |
| 664 with open('/proc/cpuinfo', 'rb') as f: | |
| 665 cpuinfo_data = f.read() | |
| 666 fake_file = io.BytesIO(cpuinfo_data) | |
| 667 with mock.patch('psutil._common.open', | |
| 668 return_value=fake_file, create=True) as m: | |
| 669 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) | |
| 670 | |
| 671 # Finally, let's make /proc/cpuinfo return meaningless data; | |
| 672 # this way we'll fall back on relying on /proc/stat | |
| 673 with mock_open_content('/proc/cpuinfo', b"") as m: | |
| 674 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) | |
| 675 m.called | |
| 676 | |
| 677 | |
| 678 @unittest.skipIf(not LINUX, "LINUX only") | |
| 679 class TestSystemCPUCountPhysical(unittest.TestCase): | |
| 680 | |
| 681 @unittest.skipIf(not which("lscpu"), "lscpu utility not available") | |
| 682 def test_against_lscpu(self): | |
| 683 out = sh("lscpu -p") | |
| 684 core_ids = set() | |
| 685 for line in out.split('\n'): | |
| 686 if not line.startswith('#'): | |
| 687 fields = line.split(',') | |
| 688 core_ids.add(fields[1]) | |
| 689 self.assertEqual(psutil.cpu_count(logical=False), len(core_ids)) | |
| 690 | |
| 691 def test_emulate_none(self): | |
| 692 with mock.patch('glob.glob', return_value=[]) as m1: | |
| 693 with mock.patch('psutil._common.open', create=True) as m2: | |
| 694 self.assertIsNone(psutil._pslinux.cpu_count_physical()) | |
| 695 assert m1.called | |
| 696 assert m2.called | |
| 697 | |
| 698 | |
| 699 @unittest.skipIf(not LINUX, "LINUX only") | |
| 700 class TestSystemCPUFrequency(unittest.TestCase): | |
| 701 | |
| 702 @unittest.skipIf(TRAVIS, "fails on Travis") | |
| 703 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
| 704 def test_emulate_use_second_file(self): | |
| 705 # https://github.com/giampaolo/psutil/issues/981 | |
| 706 def path_exists_mock(path): | |
| 707 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"): | |
| 708 return False | |
| 709 else: | |
| 710 return orig_exists(path) | |
| 711 | |
| 712 orig_exists = os.path.exists | |
| 713 with mock.patch("os.path.exists", side_effect=path_exists_mock, | |
| 714 create=True): | |
| 715 assert psutil.cpu_freq() | |
| 716 | |
| 717 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
| 718 def test_emulate_use_cpuinfo(self): | |
| 719 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not | |
| 720 # exist and /proc/cpuinfo is used instead. | |
| 721 def path_exists_mock(path): | |
| 722 if path.startswith('/sys/devices/system/cpu/'): | |
| 723 return False | |
| 724 else: | |
| 725 if path == "/proc/cpuinfo": | |
| 726 flags.append(None) | |
| 727 return os_path_exists(path) | |
| 728 | |
| 729 flags = [] | |
| 730 os_path_exists = os.path.exists | |
| 731 try: | |
| 732 with mock.patch("os.path.exists", side_effect=path_exists_mock): | |
| 733 reload_module(psutil._pslinux) | |
| 734 ret = psutil.cpu_freq() | |
| 735 assert ret | |
| 736 assert flags | |
| 737 self.assertEqual(ret.max, 0.0) | |
| 738 self.assertEqual(ret.min, 0.0) | |
| 739 for freq in psutil.cpu_freq(percpu=True): | |
| 740 self.assertEqual(ret.max, 0.0) | |
| 741 self.assertEqual(ret.min, 0.0) | |
| 742 finally: | |
| 743 reload_module(psutil._pslinux) | |
| 744 reload_module(psutil) | |
| 745 | |
| 746 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
| 747 def test_emulate_data(self): | |
| 748 def open_mock(name, *args, **kwargs): | |
| 749 if (name.endswith('/scaling_cur_freq') and | |
| 750 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
| 751 return io.BytesIO(b"500000") | |
| 752 elif (name.endswith('/scaling_min_freq') and | |
| 753 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
| 754 return io.BytesIO(b"600000") | |
| 755 elif (name.endswith('/scaling_max_freq') and | |
| 756 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
| 757 return io.BytesIO(b"700000") | |
| 758 elif name == '/proc/cpuinfo': | |
| 759 return io.BytesIO(b"cpu MHz : 500") | |
| 760 else: | |
| 761 return orig_open(name, *args, **kwargs) | |
| 762 | |
| 763 orig_open = open | |
| 764 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 765 with mock.patch(patch_point, side_effect=open_mock): | |
| 766 with mock.patch( | |
| 767 'os.path.exists', return_value=True): | |
| 768 freq = psutil.cpu_freq() | |
| 769 self.assertEqual(freq.current, 500.0) | |
| 770 # when /proc/cpuinfo is used min and max frequencies are not | |
| 771 # available and are set to 0. | |
| 772 if freq.min != 0.0: | |
| 773 self.assertEqual(freq.min, 600.0) | |
| 774 if freq.max != 0.0: | |
| 775 self.assertEqual(freq.max, 700.0) | |
| 776 | |
| 777 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
| 778 def test_emulate_multi_cpu(self): | |
| 779 def open_mock(name, *args, **kwargs): | |
| 780 n = name | |
| 781 if (n.endswith('/scaling_cur_freq') and | |
| 782 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): | |
| 783 return io.BytesIO(b"100000") | |
| 784 elif (n.endswith('/scaling_min_freq') and | |
| 785 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): | |
| 786 return io.BytesIO(b"200000") | |
| 787 elif (n.endswith('/scaling_max_freq') and | |
| 788 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): | |
| 789 return io.BytesIO(b"300000") | |
| 790 elif (n.endswith('/scaling_cur_freq') and | |
| 791 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): | |
| 792 return io.BytesIO(b"400000") | |
| 793 elif (n.endswith('/scaling_min_freq') and | |
| 794 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): | |
| 795 return io.BytesIO(b"500000") | |
| 796 elif (n.endswith('/scaling_max_freq') and | |
| 797 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): | |
| 798 return io.BytesIO(b"600000") | |
| 799 elif name == '/proc/cpuinfo': | |
| 800 return io.BytesIO(b"cpu MHz : 100\n" | |
| 801 b"cpu MHz : 400") | |
| 802 else: | |
| 803 return orig_open(name, *args, **kwargs) | |
| 804 | |
| 805 orig_open = open | |
| 806 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 807 with mock.patch(patch_point, side_effect=open_mock): | |
| 808 with mock.patch('os.path.exists', return_value=True): | |
| 809 with mock.patch('psutil._pslinux.cpu_count_logical', | |
| 810 return_value=2): | |
| 811 freq = psutil.cpu_freq(percpu=True) | |
| 812 self.assertEqual(freq[0].current, 100.0) | |
| 813 if freq[0].min != 0.0: | |
| 814 self.assertEqual(freq[0].min, 200.0) | |
| 815 if freq[0].max != 0.0: | |
| 816 self.assertEqual(freq[0].max, 300.0) | |
| 817 self.assertEqual(freq[1].current, 400.0) | |
| 818 if freq[1].min != 0.0: | |
| 819 self.assertEqual(freq[1].min, 500.0) | |
| 820 if freq[1].max != 0.0: | |
| 821 self.assertEqual(freq[1].max, 600.0) | |
| 822 | |
| 823 @unittest.skipIf(TRAVIS, "fails on Travis") | |
| 824 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
| 825 def test_emulate_no_scaling_cur_freq_file(self): | |
| 826 # See: https://github.com/giampaolo/psutil/issues/1071 | |
| 827 def open_mock(name, *args, **kwargs): | |
| 828 if name.endswith('/scaling_cur_freq'): | |
| 829 raise IOError(errno.ENOENT, "") | |
| 830 elif name.endswith('/cpuinfo_cur_freq'): | |
| 831 return io.BytesIO(b"200000") | |
| 832 elif name == '/proc/cpuinfo': | |
| 833 return io.BytesIO(b"cpu MHz : 200") | |
| 834 else: | |
| 835 return orig_open(name, *args, **kwargs) | |
| 836 | |
| 837 orig_open = open | |
| 838 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 839 with mock.patch(patch_point, side_effect=open_mock): | |
| 840 with mock.patch('os.path.exists', return_value=True): | |
| 841 with mock.patch('psutil._pslinux.cpu_count_logical', | |
| 842 return_value=1): | |
| 843 freq = psutil.cpu_freq() | |
| 844 self.assertEqual(freq.current, 200) | |
| 845 | |
| 846 | |
| 847 @unittest.skipIf(not LINUX, "LINUX only") | |
| 848 class TestSystemCPUStats(unittest.TestCase): | |
| 849 | |
| 850 @unittest.skipIf(TRAVIS, "fails on Travis") | |
| 851 def test_ctx_switches(self): | |
| 852 vmstat_value = vmstat("context switches") | |
| 853 psutil_value = psutil.cpu_stats().ctx_switches | |
| 854 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500) | |
| 855 | |
| 856 @unittest.skipIf(TRAVIS, "fails on Travis") | |
| 857 def test_interrupts(self): | |
| 858 vmstat_value = vmstat("interrupts") | |
| 859 psutil_value = psutil.cpu_stats().interrupts | |
| 860 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500) | |
| 861 | |
| 862 | |
| 863 @unittest.skipIf(not LINUX, "LINUX only") | |
| 864 class TestLoadAvg(unittest.TestCase): | |
| 865 | |
| 866 @unittest.skipIf(not HAS_GETLOADAVG, "not supported") | |
| 867 def test_getloadavg(self): | |
| 868 psutil_value = psutil.getloadavg() | |
| 869 with open("/proc/loadavg", "r") as f: | |
| 870 proc_value = f.read().split() | |
| 871 | |
| 872 self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1) | |
| 873 self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1) | |
| 874 self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1) | |
| 875 | |
| 876 | |
| 877 # ===================================================================== | |
| 878 # --- system network | |
| 879 # ===================================================================== | |
| 880 | |
| 881 | |
| 882 @unittest.skipIf(not LINUX, "LINUX only") | |
| 883 class TestSystemNetIfAddrs(unittest.TestCase): | |
| 884 | |
| 885 def test_ips(self): | |
| 886 for name, addrs in psutil.net_if_addrs().items(): | |
| 887 for addr in addrs: | |
| 888 if addr.family == psutil.AF_LINK: | |
| 889 self.assertEqual(addr.address, get_mac_address(name)) | |
| 890 elif addr.family == socket.AF_INET: | |
| 891 self.assertEqual(addr.address, get_ipv4_address(name)) | |
| 892 # TODO: test for AF_INET6 family | |
| 893 | |
| 894 # XXX - not reliable when having virtual NICs installed by Docker. | |
| 895 # @unittest.skipIf(not which('ip'), "'ip' utility not available") | |
| 896 # @unittest.skipIf(TRAVIS, "skipped on Travis") | |
| 897 # def test_net_if_names(self): | |
| 898 # out = sh("ip addr").strip() | |
| 899 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x] | |
| 900 # found = 0 | |
| 901 # for line in out.split('\n'): | |
| 902 # line = line.strip() | |
| 903 # if re.search(r"^\d+:", line): | |
| 904 # found += 1 | |
| 905 # name = line.split(':')[1].strip() | |
| 906 # self.assertIn(name, nics) | |
| 907 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( | |
| 908 # pprint.pformat(nics), out)) | |
| 909 | |
| 910 | |
| 911 @unittest.skipIf(not LINUX, "LINUX only") | |
| 912 class TestSystemNetIfStats(unittest.TestCase): | |
| 913 | |
| 914 def test_against_ifconfig(self): | |
| 915 for name, stats in psutil.net_if_stats().items(): | |
| 916 try: | |
| 917 out = sh("ifconfig %s" % name) | |
| 918 except RuntimeError: | |
| 919 pass | |
| 920 else: | |
| 921 # Not always reliable. | |
| 922 # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out) | |
| 923 self.assertEqual(stats.mtu, | |
| 924 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0])) | |
| 925 | |
| 926 | |
| 927 @unittest.skipIf(not LINUX, "LINUX only") | |
| 928 class TestSystemNetIOCounters(unittest.TestCase): | |
| 929 | |
| 930 @retry_on_failure() | |
| 931 def test_against_ifconfig(self): | |
| 932 def ifconfig(nic): | |
| 933 ret = {} | |
| 934 out = sh("ifconfig %s" % name) | |
| 935 ret['packets_recv'] = int( | |
| 936 re.findall(r'RX packets[: ](\d+)', out)[0]) | |
| 937 ret['packets_sent'] = int( | |
| 938 re.findall(r'TX packets[: ](\d+)', out)[0]) | |
| 939 ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0]) | |
| 940 ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1]) | |
| 941 ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0]) | |
| 942 ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1]) | |
| 943 ret['bytes_recv'] = int( | |
| 944 re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) | |
| 945 ret['bytes_sent'] = int( | |
| 946 re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) | |
| 947 return ret | |
| 948 | |
| 949 nio = psutil.net_io_counters(pernic=True, nowrap=False) | |
| 950 for name, stats in nio.items(): | |
| 951 try: | |
| 952 ifconfig_ret = ifconfig(name) | |
| 953 except RuntimeError: | |
| 954 continue | |
| 955 self.assertAlmostEqual( | |
| 956 stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5) | |
| 957 self.assertAlmostEqual( | |
| 958 stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5) | |
| 959 self.assertAlmostEqual( | |
| 960 stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024) | |
| 961 self.assertAlmostEqual( | |
| 962 stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024) | |
| 963 self.assertAlmostEqual( | |
| 964 stats.errin, ifconfig_ret['errin'], delta=10) | |
| 965 self.assertAlmostEqual( | |
| 966 stats.errout, ifconfig_ret['errout'], delta=10) | |
| 967 self.assertAlmostEqual( | |
| 968 stats.dropin, ifconfig_ret['dropin'], delta=10) | |
| 969 self.assertAlmostEqual( | |
| 970 stats.dropout, ifconfig_ret['dropout'], delta=10) | |
| 971 | |
| 972 | |
| 973 @unittest.skipIf(not LINUX, "LINUX only") | |
| 974 class TestSystemNetConnections(unittest.TestCase): | |
| 975 | |
| 976 @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError) | |
| 977 @mock.patch('psutil._pslinux.supports_ipv6', return_value=False) | |
| 978 def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop): | |
| 979 # see: https://github.com/giampaolo/psutil/issues/623 | |
| 980 try: | |
| 981 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
| 982 self.addCleanup(s.close) | |
| 983 s.bind(("::1", 0)) | |
| 984 except socket.error: | |
| 985 pass | |
| 986 psutil.net_connections(kind='inet6') | |
| 987 | |
| 988 def test_emulate_unix(self): | |
| 989 with mock_open_content( | |
| 990 '/proc/net/unix', | |
| 991 textwrap.dedent("""\ | |
| 992 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n | |
| 993 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ | |
| 994 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O | |
| 995 000000000000000000000000000000000000000000000000000000 | |
| 996 """)) as m: | |
| 997 psutil.net_connections(kind='unix') | |
| 998 assert m.called | |
| 999 | |
| 1000 | |
| 1001 # ===================================================================== | |
| 1002 # --- system disks | |
| 1003 # ===================================================================== | |
| 1004 | |
| 1005 | |
| 1006 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1007 class TestSystemDiskPartitions(unittest.TestCase): | |
| 1008 | |
| 1009 @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available") | |
| 1010 @skip_on_not_implemented() | |
| 1011 def test_against_df(self): | |
| 1012 # test psutil.disk_usage() and psutil.disk_partitions() | |
| 1013 # against "df -a" | |
| 1014 def df(path): | |
| 1015 out = sh('df -P -B 1 "%s"' % path).strip() | |
| 1016 lines = out.split('\n') | |
| 1017 lines.pop(0) | |
| 1018 line = lines.pop(0) | |
| 1019 dev, total, used, free = line.split()[:4] | |
| 1020 if dev == 'none': | |
| 1021 dev = '' | |
| 1022 total, used, free = int(total), int(used), int(free) | |
| 1023 return dev, total, used, free | |
| 1024 | |
| 1025 for part in psutil.disk_partitions(all=False): | |
| 1026 usage = psutil.disk_usage(part.mountpoint) | |
| 1027 dev, total, used, free = df(part.mountpoint) | |
| 1028 self.assertEqual(usage.total, total) | |
| 1029 # 10 MB tollerance | |
| 1030 if abs(usage.free - free) > 10 * 1024 * 1024: | |
| 1031 self.fail("psutil=%s, df=%s" % (usage.free, free)) | |
| 1032 if abs(usage.used - used) > 10 * 1024 * 1024: | |
| 1033 self.fail("psutil=%s, df=%s" % (usage.used, used)) | |
| 1034 | |
| 1035 def test_zfs_fs(self): | |
| 1036 # Test that ZFS partitions are returned. | |
| 1037 with open("/proc/filesystems", "r") as f: | |
| 1038 data = f.read() | |
| 1039 if 'zfs' in data: | |
| 1040 for part in psutil.disk_partitions(): | |
| 1041 if part.fstype == 'zfs': | |
| 1042 break | |
| 1043 else: | |
| 1044 self.fail("couldn't find any ZFS partition") | |
| 1045 else: | |
| 1046 # No ZFS partitions on this system. Let's fake one. | |
| 1047 fake_file = io.StringIO(u("nodev\tzfs\n")) | |
| 1048 with mock.patch('psutil._common.open', | |
| 1049 return_value=fake_file, create=True) as m1: | |
| 1050 with mock.patch( | |
| 1051 'psutil._pslinux.cext.disk_partitions', | |
| 1052 return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2: | |
| 1053 ret = psutil.disk_partitions() | |
| 1054 assert m1.called | |
| 1055 assert m2.called | |
| 1056 assert ret | |
| 1057 self.assertEqual(ret[0].fstype, 'zfs') | |
| 1058 | |
| 1059 def test_emulate_realpath_fail(self): | |
| 1060 # See: https://github.com/giampaolo/psutil/issues/1307 | |
| 1061 try: | |
| 1062 with mock.patch('os.path.realpath', | |
| 1063 return_value='/non/existent') as m: | |
| 1064 with self.assertRaises(FileNotFoundError): | |
| 1065 psutil.disk_partitions() | |
| 1066 assert m.called | |
| 1067 finally: | |
| 1068 psutil.PROCFS_PATH = "/proc" | |
| 1069 | |
| 1070 | |
| 1071 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1072 class TestSystemDiskIoCounters(unittest.TestCase): | |
| 1073 | |
| 1074 def test_emulate_kernel_2_4(self): | |
| 1075 # Tests /proc/diskstats parsing format for 2.4 kernels, see: | |
| 1076 # https://github.com/giampaolo/psutil/issues/767 | |
| 1077 with mock_open_content( | |
| 1078 '/proc/diskstats', | |
| 1079 " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"): | |
| 1080 with mock.patch('psutil._pslinux.is_storage_device', | |
| 1081 return_value=True): | |
| 1082 ret = psutil.disk_io_counters(nowrap=False) | |
| 1083 self.assertEqual(ret.read_count, 1) | |
| 1084 self.assertEqual(ret.read_merged_count, 2) | |
| 1085 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE) | |
| 1086 self.assertEqual(ret.read_time, 4) | |
| 1087 self.assertEqual(ret.write_count, 5) | |
| 1088 self.assertEqual(ret.write_merged_count, 6) | |
| 1089 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE) | |
| 1090 self.assertEqual(ret.write_time, 8) | |
| 1091 self.assertEqual(ret.busy_time, 10) | |
| 1092 | |
| 1093 def test_emulate_kernel_2_6_full(self): | |
| 1094 # Tests /proc/diskstats parsing format for 2.6 kernels, | |
| 1095 # lines reporting all metrics: | |
| 1096 # https://github.com/giampaolo/psutil/issues/767 | |
| 1097 with mock_open_content( | |
| 1098 '/proc/diskstats', | |
| 1099 " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"): | |
| 1100 with mock.patch('psutil._pslinux.is_storage_device', | |
| 1101 return_value=True): | |
| 1102 ret = psutil.disk_io_counters(nowrap=False) | |
| 1103 self.assertEqual(ret.read_count, 1) | |
| 1104 self.assertEqual(ret.read_merged_count, 2) | |
| 1105 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE) | |
| 1106 self.assertEqual(ret.read_time, 4) | |
| 1107 self.assertEqual(ret.write_count, 5) | |
| 1108 self.assertEqual(ret.write_merged_count, 6) | |
| 1109 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE) | |
| 1110 self.assertEqual(ret.write_time, 8) | |
| 1111 self.assertEqual(ret.busy_time, 10) | |
| 1112 | |
| 1113 def test_emulate_kernel_2_6_limited(self): | |
| 1114 # Tests /proc/diskstats parsing format for 2.6 kernels, | |
| 1115 # where one line of /proc/partitions return a limited | |
| 1116 # amount of metrics when it bumps into a partition | |
| 1117 # (instead of a disk). See: | |
| 1118 # https://github.com/giampaolo/psutil/issues/767 | |
| 1119 with mock_open_content( | |
| 1120 '/proc/diskstats', | |
| 1121 " 3 1 hda 1 2 3 4"): | |
| 1122 with mock.patch('psutil._pslinux.is_storage_device', | |
| 1123 return_value=True): | |
| 1124 ret = psutil.disk_io_counters(nowrap=False) | |
| 1125 self.assertEqual(ret.read_count, 1) | |
| 1126 self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE) | |
| 1127 self.assertEqual(ret.write_count, 3) | |
| 1128 self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE) | |
| 1129 | |
| 1130 self.assertEqual(ret.read_merged_count, 0) | |
| 1131 self.assertEqual(ret.read_time, 0) | |
| 1132 self.assertEqual(ret.write_merged_count, 0) | |
| 1133 self.assertEqual(ret.write_time, 0) | |
| 1134 self.assertEqual(ret.busy_time, 0) | |
| 1135 | |
| 1136 def test_emulate_include_partitions(self): | |
| 1137 # Make sure that when perdisk=True disk partitions are returned, | |
| 1138 # see: | |
| 1139 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842 | |
| 1140 with mock_open_content( | |
| 1141 '/proc/diskstats', | |
| 1142 textwrap.dedent("""\ | |
| 1143 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 | |
| 1144 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 | |
| 1145 """)): | |
| 1146 with mock.patch('psutil._pslinux.is_storage_device', | |
| 1147 return_value=False): | |
| 1148 ret = psutil.disk_io_counters(perdisk=True, nowrap=False) | |
| 1149 self.assertEqual(len(ret), 2) | |
| 1150 self.assertEqual(ret['nvme0n1'].read_count, 1) | |
| 1151 self.assertEqual(ret['nvme0n1p1'].read_count, 1) | |
| 1152 self.assertEqual(ret['nvme0n1'].write_count, 5) | |
| 1153 self.assertEqual(ret['nvme0n1p1'].write_count, 5) | |
| 1154 | |
| 1155 def test_emulate_exclude_partitions(self): | |
| 1156 # Make sure that when perdisk=False partitions (e.g. 'sda1', | |
| 1157 # 'nvme0n1p1') are skipped and not included in the total count. | |
| 1158 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842 | |
| 1159 with mock_open_content( | |
| 1160 '/proc/diskstats', | |
| 1161 textwrap.dedent("""\ | |
| 1162 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 | |
| 1163 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 | |
| 1164 """)): | |
| 1165 with mock.patch('psutil._pslinux.is_storage_device', | |
| 1166 return_value=False): | |
| 1167 ret = psutil.disk_io_counters(perdisk=False, nowrap=False) | |
| 1168 self.assertIsNone(ret) | |
| 1169 | |
| 1170 # | |
| 1171 def is_storage_device(name): | |
| 1172 return name == 'nvme0n1' | |
| 1173 | |
| 1174 with mock_open_content( | |
| 1175 '/proc/diskstats', | |
| 1176 textwrap.dedent("""\ | |
| 1177 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 | |
| 1178 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 | |
| 1179 """)): | |
| 1180 with mock.patch('psutil._pslinux.is_storage_device', | |
| 1181 create=True, side_effect=is_storage_device): | |
| 1182 ret = psutil.disk_io_counters(perdisk=False, nowrap=False) | |
| 1183 self.assertEqual(ret.read_count, 1) | |
| 1184 self.assertEqual(ret.write_count, 5) | |
| 1185 | |
| 1186 def test_emulate_use_sysfs(self): | |
| 1187 def exists(path): | |
| 1188 if path == '/proc/diskstats': | |
| 1189 return False | |
| 1190 return True | |
| 1191 | |
| 1192 wprocfs = psutil.disk_io_counters(perdisk=True) | |
| 1193 with mock.patch('psutil._pslinux.os.path.exists', | |
| 1194 create=True, side_effect=exists): | |
| 1195 wsysfs = psutil.disk_io_counters(perdisk=True) | |
| 1196 self.assertEqual(len(wprocfs), len(wsysfs)) | |
| 1197 | |
| 1198 def test_emulate_not_impl(self): | |
| 1199 def exists(path): | |
| 1200 return False | |
| 1201 | |
| 1202 with mock.patch('psutil._pslinux.os.path.exists', | |
| 1203 create=True, side_effect=exists): | |
| 1204 self.assertRaises(NotImplementedError, psutil.disk_io_counters) | |
| 1205 | |
| 1206 | |
| 1207 # ===================================================================== | |
| 1208 # --- misc | |
| 1209 # ===================================================================== | |
| 1210 | |
| 1211 | |
| 1212 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1213 class TestMisc(unittest.TestCase): | |
| 1214 | |
| 1215 def test_boot_time(self): | |
| 1216 vmstat_value = vmstat('boot time') | |
| 1217 psutil_value = psutil.boot_time() | |
| 1218 self.assertEqual(int(vmstat_value), int(psutil_value)) | |
| 1219 | |
| 1220 def test_no_procfs_on_import(self): | |
| 1221 my_procfs = tempfile.mkdtemp() | |
| 1222 | |
| 1223 with open(os.path.join(my_procfs, 'stat'), 'w') as f: | |
| 1224 f.write('cpu 0 0 0 0 0 0 0 0 0 0\n') | |
| 1225 f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n') | |
| 1226 f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n') | |
| 1227 | |
| 1228 try: | |
| 1229 orig_open = open | |
| 1230 | |
| 1231 def open_mock(name, *args, **kwargs): | |
| 1232 if name.startswith('/proc'): | |
| 1233 raise IOError(errno.ENOENT, 'rejecting access for test') | |
| 1234 return orig_open(name, *args, **kwargs) | |
| 1235 | |
| 1236 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1237 with mock.patch(patch_point, side_effect=open_mock): | |
| 1238 reload_module(psutil) | |
| 1239 | |
| 1240 self.assertRaises(IOError, psutil.cpu_times) | |
| 1241 self.assertRaises(IOError, psutil.cpu_times, percpu=True) | |
| 1242 self.assertRaises(IOError, psutil.cpu_percent) | |
| 1243 self.assertRaises(IOError, psutil.cpu_percent, percpu=True) | |
| 1244 self.assertRaises(IOError, psutil.cpu_times_percent) | |
| 1245 self.assertRaises( | |
| 1246 IOError, psutil.cpu_times_percent, percpu=True) | |
| 1247 | |
| 1248 psutil.PROCFS_PATH = my_procfs | |
| 1249 | |
| 1250 self.assertEqual(psutil.cpu_percent(), 0) | |
| 1251 self.assertEqual(sum(psutil.cpu_times_percent()), 0) | |
| 1252 | |
| 1253 # since we don't know the number of CPUs at import time, | |
| 1254 # we awkwardly say there are none until the second call | |
| 1255 per_cpu_percent = psutil.cpu_percent(percpu=True) | |
| 1256 self.assertEqual(sum(per_cpu_percent), 0) | |
| 1257 | |
| 1258 # ditto awkward length | |
| 1259 per_cpu_times_percent = psutil.cpu_times_percent(percpu=True) | |
| 1260 self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0) | |
| 1261 | |
| 1262 # much user, very busy | |
| 1263 with open(os.path.join(my_procfs, 'stat'), 'w') as f: | |
| 1264 f.write('cpu 1 0 0 0 0 0 0 0 0 0\n') | |
| 1265 f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n') | |
| 1266 f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n') | |
| 1267 | |
| 1268 self.assertNotEqual(psutil.cpu_percent(), 0) | |
| 1269 self.assertNotEqual( | |
| 1270 sum(psutil.cpu_percent(percpu=True)), 0) | |
| 1271 self.assertNotEqual(sum(psutil.cpu_times_percent()), 0) | |
| 1272 self.assertNotEqual( | |
| 1273 sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0) | |
| 1274 finally: | |
| 1275 shutil.rmtree(my_procfs) | |
| 1276 reload_module(psutil) | |
| 1277 | |
| 1278 self.assertEqual(psutil.PROCFS_PATH, '/proc') | |
| 1279 | |
| 1280 def test_cpu_steal_decrease(self): | |
| 1281 # Test cumulative cpu stats decrease. We should ignore this. | |
| 1282 # See issue #1210. | |
| 1283 with mock_open_content( | |
| 1284 "/proc/stat", | |
| 1285 textwrap.dedent("""\ | |
| 1286 cpu 0 0 0 0 0 0 0 1 0 0 | |
| 1287 cpu0 0 0 0 0 0 0 0 1 0 0 | |
| 1288 cpu1 0 0 0 0 0 0 0 1 0 0 | |
| 1289 """).encode()) as m: | |
| 1290 # first call to "percent" functions should read the new stat file | |
| 1291 # and compare to the "real" file read at import time - so the | |
| 1292 # values are meaningless | |
| 1293 psutil.cpu_percent() | |
| 1294 assert m.called | |
| 1295 psutil.cpu_percent(percpu=True) | |
| 1296 psutil.cpu_times_percent() | |
| 1297 psutil.cpu_times_percent(percpu=True) | |
| 1298 | |
| 1299 with mock_open_content( | |
| 1300 "/proc/stat", | |
| 1301 textwrap.dedent("""\ | |
| 1302 cpu 1 0 0 0 0 0 0 0 0 0 | |
| 1303 cpu0 1 0 0 0 0 0 0 0 0 0 | |
| 1304 cpu1 1 0 0 0 0 0 0 0 0 0 | |
| 1305 """).encode()) as m: | |
| 1306 # Increase "user" while steal goes "backwards" to zero. | |
| 1307 cpu_percent = psutil.cpu_percent() | |
| 1308 assert m.called | |
| 1309 cpu_percent_percpu = psutil.cpu_percent(percpu=True) | |
| 1310 cpu_times_percent = psutil.cpu_times_percent() | |
| 1311 cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True) | |
| 1312 self.assertNotEqual(cpu_percent, 0) | |
| 1313 self.assertNotEqual(sum(cpu_percent_percpu), 0) | |
| 1314 self.assertNotEqual(sum(cpu_times_percent), 0) | |
| 1315 self.assertNotEqual(sum(cpu_times_percent), 100.0) | |
| 1316 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0) | |
| 1317 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0) | |
| 1318 self.assertEqual(cpu_times_percent.steal, 0) | |
| 1319 self.assertNotEqual(cpu_times_percent.user, 0) | |
| 1320 | |
| 1321 def test_boot_time_mocked(self): | |
| 1322 with mock.patch('psutil._common.open', create=True) as m: | |
| 1323 self.assertRaises( | |
| 1324 RuntimeError, | |
| 1325 psutil._pslinux.boot_time) | |
| 1326 assert m.called | |
| 1327 | |
| 1328 def test_users_mocked(self): | |
| 1329 # Make sure ':0' and ':0.0' (returned by C ext) are converted | |
| 1330 # to 'localhost'. | |
| 1331 with mock.patch('psutil._pslinux.cext.users', | |
| 1332 return_value=[('giampaolo', 'pts/2', ':0', | |
| 1333 1436573184.0, True, 2)]) as m: | |
| 1334 self.assertEqual(psutil.users()[0].host, 'localhost') | |
| 1335 assert m.called | |
| 1336 with mock.patch('psutil._pslinux.cext.users', | |
| 1337 return_value=[('giampaolo', 'pts/2', ':0.0', | |
| 1338 1436573184.0, True, 2)]) as m: | |
| 1339 self.assertEqual(psutil.users()[0].host, 'localhost') | |
| 1340 assert m.called | |
| 1341 # ...otherwise it should be returned as-is | |
| 1342 with mock.patch('psutil._pslinux.cext.users', | |
| 1343 return_value=[('giampaolo', 'pts/2', 'foo', | |
| 1344 1436573184.0, True, 2)]) as m: | |
| 1345 self.assertEqual(psutil.users()[0].host, 'foo') | |
| 1346 assert m.called | |
| 1347 | |
| 1348 def test_procfs_path(self): | |
| 1349 tdir = tempfile.mkdtemp() | |
| 1350 try: | |
| 1351 psutil.PROCFS_PATH = tdir | |
| 1352 self.assertRaises(IOError, psutil.virtual_memory) | |
| 1353 self.assertRaises(IOError, psutil.cpu_times) | |
| 1354 self.assertRaises(IOError, psutil.cpu_times, percpu=True) | |
| 1355 self.assertRaises(IOError, psutil.boot_time) | |
| 1356 # self.assertRaises(IOError, psutil.pids) | |
| 1357 self.assertRaises(IOError, psutil.net_connections) | |
| 1358 self.assertRaises(IOError, psutil.net_io_counters) | |
| 1359 self.assertRaises(IOError, psutil.net_if_stats) | |
| 1360 # self.assertRaises(IOError, psutil.disk_io_counters) | |
| 1361 self.assertRaises(IOError, psutil.disk_partitions) | |
| 1362 self.assertRaises(psutil.NoSuchProcess, psutil.Process) | |
| 1363 finally: | |
| 1364 psutil.PROCFS_PATH = "/proc" | |
| 1365 os.rmdir(tdir) | |
| 1366 | |
| 1367 def test_issue_687(self): | |
| 1368 # In case of thread ID: | |
| 1369 # - pid_exists() is supposed to return False | |
| 1370 # - Process(tid) is supposed to work | |
| 1371 # - pids() should not return the TID | |
| 1372 # See: https://github.com/giampaolo/psutil/issues/687 | |
| 1373 t = ThreadTask() | |
| 1374 t.start() | |
| 1375 try: | |
| 1376 p = psutil.Process() | |
| 1377 tid = p.threads()[1].id | |
| 1378 assert not psutil.pid_exists(tid), tid | |
| 1379 pt = psutil.Process(tid) | |
| 1380 pt.as_dict() | |
| 1381 self.assertNotIn(tid, psutil.pids()) | |
| 1382 finally: | |
| 1383 t.stop() | |
| 1384 | |
| 1385 def test_pid_exists_no_proc_status(self): | |
| 1386 # Internally pid_exists relies on /proc/{pid}/status. | |
| 1387 # Emulate a case where this file is empty in which case | |
| 1388 # psutil is supposed to fall back on using pids(). | |
| 1389 with mock_open_content("/proc/%s/status", "") as m: | |
| 1390 assert psutil.pid_exists(os.getpid()) | |
| 1391 assert m.called | |
| 1392 | |
| 1393 | |
| 1394 # ===================================================================== | |
| 1395 # --- sensors | |
| 1396 # ===================================================================== | |
| 1397 | |
| 1398 | |
| 1399 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1400 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 1401 class TestSensorsBattery(unittest.TestCase): | |
| 1402 | |
| 1403 @unittest.skipIf(not which("acpi"), "acpi utility not available") | |
| 1404 def test_percent(self): | |
| 1405 out = sh("acpi -b") | |
| 1406 acpi_value = int(out.split(",")[1].strip().replace('%', '')) | |
| 1407 psutil_value = psutil.sensors_battery().percent | |
| 1408 self.assertAlmostEqual(acpi_value, psutil_value, delta=1) | |
| 1409 | |
| 1410 @unittest.skipIf(not which("acpi"), "acpi utility not available") | |
| 1411 def test_power_plugged(self): | |
| 1412 out = sh("acpi -b") | |
| 1413 if 'unknown' in out.lower(): | |
| 1414 return unittest.skip("acpi output not reliable") | |
| 1415 if 'discharging at zero rate' in out: | |
| 1416 plugged = True | |
| 1417 else: | |
| 1418 plugged = "Charging" in out.split('\n')[0] | |
| 1419 self.assertEqual(psutil.sensors_battery().power_plugged, plugged) | |
| 1420 | |
| 1421 def test_emulate_power_plugged(self): | |
| 1422 # Pretend the AC power cable is connected. | |
| 1423 def open_mock(name, *args, **kwargs): | |
| 1424 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
| 1425 return io.BytesIO(b"1") | |
| 1426 else: | |
| 1427 return orig_open(name, *args, **kwargs) | |
| 1428 | |
| 1429 orig_open = open | |
| 1430 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1431 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1432 self.assertEqual(psutil.sensors_battery().power_plugged, True) | |
| 1433 self.assertEqual( | |
| 1434 psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED) | |
| 1435 assert m.called | |
| 1436 | |
| 1437 def test_emulate_power_plugged_2(self): | |
| 1438 # Same as above but pretend /AC0/online does not exist in which | |
| 1439 # case code relies on /status file. | |
| 1440 def open_mock(name, *args, **kwargs): | |
| 1441 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
| 1442 raise IOError(errno.ENOENT, "") | |
| 1443 elif name.endswith("/status"): | |
| 1444 return io.StringIO(u("charging")) | |
| 1445 else: | |
| 1446 return orig_open(name, *args, **kwargs) | |
| 1447 | |
| 1448 orig_open = open | |
| 1449 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1450 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1451 self.assertEqual(psutil.sensors_battery().power_plugged, True) | |
| 1452 assert m.called | |
| 1453 | |
| 1454 def test_emulate_power_not_plugged(self): | |
| 1455 # Pretend the AC power cable is not connected. | |
| 1456 def open_mock(name, *args, **kwargs): | |
| 1457 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
| 1458 return io.BytesIO(b"0") | |
| 1459 else: | |
| 1460 return orig_open(name, *args, **kwargs) | |
| 1461 | |
| 1462 orig_open = open | |
| 1463 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1464 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1465 self.assertEqual(psutil.sensors_battery().power_plugged, False) | |
| 1466 assert m.called | |
| 1467 | |
| 1468 def test_emulate_power_not_plugged_2(self): | |
| 1469 # Same as above but pretend /AC0/online does not exist in which | |
| 1470 # case code relies on /status file. | |
| 1471 def open_mock(name, *args, **kwargs): | |
| 1472 if name.endswith("AC0/online") or name.endswith("AC/online"): | |
| 1473 raise IOError(errno.ENOENT, "") | |
| 1474 elif name.endswith("/status"): | |
| 1475 return io.StringIO(u("discharging")) | |
| 1476 else: | |
| 1477 return orig_open(name, *args, **kwargs) | |
| 1478 | |
| 1479 orig_open = open | |
| 1480 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1481 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1482 self.assertEqual(psutil.sensors_battery().power_plugged, False) | |
| 1483 assert m.called | |
| 1484 | |
| 1485 def test_emulate_power_undetermined(self): | |
| 1486 # Pretend we can't know whether the AC power cable not | |
| 1487 # connected (assert fallback to False). | |
| 1488 def open_mock(name, *args, **kwargs): | |
| 1489 if name.startswith("/sys/class/power_supply/AC0/online") or \ | |
| 1490 name.startswith("/sys/class/power_supply/AC/online"): | |
| 1491 raise IOError(errno.ENOENT, "") | |
| 1492 elif name.startswith("/sys/class/power_supply/BAT0/status"): | |
| 1493 return io.BytesIO(b"???") | |
| 1494 else: | |
| 1495 return orig_open(name, *args, **kwargs) | |
| 1496 | |
| 1497 orig_open = open | |
| 1498 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1499 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1500 self.assertIsNone(psutil.sensors_battery().power_plugged) | |
| 1501 assert m.called | |
| 1502 | |
| 1503 def test_emulate_no_base_files(self): | |
| 1504 # Emulate a case where base metrics files are not present, | |
| 1505 # in which case we're supposed to get None. | |
| 1506 with mock_open_exception( | |
| 1507 "/sys/class/power_supply/BAT0/energy_now", | |
| 1508 IOError(errno.ENOENT, "")): | |
| 1509 with mock_open_exception( | |
| 1510 "/sys/class/power_supply/BAT0/charge_now", | |
| 1511 IOError(errno.ENOENT, "")): | |
| 1512 self.assertIsNone(psutil.sensors_battery()) | |
| 1513 | |
| 1514 def test_emulate_energy_full_0(self): | |
| 1515 # Emulate a case where energy_full files returns 0. | |
| 1516 with mock_open_content( | |
| 1517 "/sys/class/power_supply/BAT0/energy_full", b"0") as m: | |
| 1518 self.assertEqual(psutil.sensors_battery().percent, 0) | |
| 1519 assert m.called | |
| 1520 | |
| 1521 def test_emulate_energy_full_not_avail(self): | |
| 1522 # Emulate a case where energy_full file does not exist. | |
| 1523 # Expected fallback on /capacity. | |
| 1524 with mock_open_exception( | |
| 1525 "/sys/class/power_supply/BAT0/energy_full", | |
| 1526 IOError(errno.ENOENT, "")): | |
| 1527 with mock_open_exception( | |
| 1528 "/sys/class/power_supply/BAT0/charge_full", | |
| 1529 IOError(errno.ENOENT, "")): | |
| 1530 with mock_open_content( | |
| 1531 "/sys/class/power_supply/BAT0/capacity", b"88"): | |
| 1532 self.assertEqual(psutil.sensors_battery().percent, 88) | |
| 1533 | |
| 1534 def test_emulate_no_power(self): | |
| 1535 # Emulate a case where /AC0/online file nor /BAT0/status exist. | |
| 1536 with mock_open_exception( | |
| 1537 "/sys/class/power_supply/AC/online", | |
| 1538 IOError(errno.ENOENT, "")): | |
| 1539 with mock_open_exception( | |
| 1540 "/sys/class/power_supply/AC0/online", | |
| 1541 IOError(errno.ENOENT, "")): | |
| 1542 with mock_open_exception( | |
| 1543 "/sys/class/power_supply/BAT0/status", | |
| 1544 IOError(errno.ENOENT, "")): | |
| 1545 self.assertIsNone(psutil.sensors_battery().power_plugged) | |
| 1546 | |
| 1547 | |
| 1548 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1549 class TestSensorsTemperatures(unittest.TestCase): | |
| 1550 | |
| 1551 def test_emulate_class_hwmon(self): | |
| 1552 def open_mock(name, *args, **kwargs): | |
| 1553 if name.endswith('/name'): | |
| 1554 return io.StringIO(u("name")) | |
| 1555 elif name.endswith('/temp1_label'): | |
| 1556 return io.StringIO(u("label")) | |
| 1557 elif name.endswith('/temp1_input'): | |
| 1558 return io.BytesIO(b"30000") | |
| 1559 elif name.endswith('/temp1_max'): | |
| 1560 return io.BytesIO(b"40000") | |
| 1561 elif name.endswith('/temp1_crit'): | |
| 1562 return io.BytesIO(b"50000") | |
| 1563 else: | |
| 1564 return orig_open(name, *args, **kwargs) | |
| 1565 | |
| 1566 orig_open = open | |
| 1567 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1568 with mock.patch(patch_point, side_effect=open_mock): | |
| 1569 # Test case with /sys/class/hwmon | |
| 1570 with mock.patch('glob.glob', | |
| 1571 return_value=['/sys/class/hwmon/hwmon0/temp1']): | |
| 1572 temp = psutil.sensors_temperatures()['name'][0] | |
| 1573 self.assertEqual(temp.label, 'label') | |
| 1574 self.assertEqual(temp.current, 30.0) | |
| 1575 self.assertEqual(temp.high, 40.0) | |
| 1576 self.assertEqual(temp.critical, 50.0) | |
| 1577 | |
| 1578 def test_emulate_class_thermal(self): | |
| 1579 def open_mock(name, *args, **kwargs): | |
| 1580 if name.endswith('0_temp'): | |
| 1581 return io.BytesIO(b"50000") | |
| 1582 elif name.endswith('temp'): | |
| 1583 return io.BytesIO(b"30000") | |
| 1584 elif name.endswith('0_type'): | |
| 1585 return io.StringIO(u("critical")) | |
| 1586 elif name.endswith('type'): | |
| 1587 return io.StringIO(u("name")) | |
| 1588 else: | |
| 1589 return orig_open(name, *args, **kwargs) | |
| 1590 | |
| 1591 def glob_mock(path): | |
| 1592 if path == '/sys/class/hwmon/hwmon*/temp*_*': | |
| 1593 return [] | |
| 1594 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*': | |
| 1595 return [] | |
| 1596 elif path == '/sys/class/thermal/thermal_zone*': | |
| 1597 return ['/sys/class/thermal/thermal_zone0'] | |
| 1598 elif path == '/sys/class/thermal/thermal_zone0/trip_point*': | |
| 1599 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type', | |
| 1600 '/sys/class/thermal/thermal_zone1/trip_point_0_temp'] | |
| 1601 return [] | |
| 1602 | |
| 1603 orig_open = open | |
| 1604 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1605 with mock.patch(patch_point, side_effect=open_mock): | |
| 1606 with mock.patch('glob.glob', create=True, side_effect=glob_mock): | |
| 1607 temp = psutil.sensors_temperatures()['name'][0] | |
| 1608 self.assertEqual(temp.label, '') | |
| 1609 self.assertEqual(temp.current, 30.0) | |
| 1610 self.assertEqual(temp.high, 50.0) | |
| 1611 self.assertEqual(temp.critical, 50.0) | |
| 1612 | |
| 1613 | |
| 1614 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1615 class TestSensorsFans(unittest.TestCase): | |
| 1616 | |
| 1617 def test_emulate_data(self): | |
| 1618 def open_mock(name, *args, **kwargs): | |
| 1619 if name.endswith('/name'): | |
| 1620 return io.StringIO(u("name")) | |
| 1621 elif name.endswith('/fan1_label'): | |
| 1622 return io.StringIO(u("label")) | |
| 1623 elif name.endswith('/fan1_input'): | |
| 1624 return io.StringIO(u("2000")) | |
| 1625 else: | |
| 1626 return orig_open(name, *args, **kwargs) | |
| 1627 | |
| 1628 orig_open = open | |
| 1629 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1630 with mock.patch(patch_point, side_effect=open_mock): | |
| 1631 with mock.patch('glob.glob', | |
| 1632 return_value=['/sys/class/hwmon/hwmon2/fan1']): | |
| 1633 fan = psutil.sensors_fans()['name'][0] | |
| 1634 self.assertEqual(fan.label, 'label') | |
| 1635 self.assertEqual(fan.current, 2000) | |
| 1636 | |
| 1637 | |
| 1638 # ===================================================================== | |
| 1639 # --- test process | |
| 1640 # ===================================================================== | |
| 1641 | |
| 1642 | |
| 1643 @unittest.skipIf(not LINUX, "LINUX only") | |
| 1644 class TestProcess(unittest.TestCase): | |
| 1645 | |
| 1646 def setUp(self): | |
| 1647 safe_rmpath(TESTFN) | |
| 1648 | |
| 1649 tearDown = setUp | |
| 1650 | |
| 1651 def test_memory_full_info(self): | |
| 1652 src = textwrap.dedent(""" | |
| 1653 import time | |
| 1654 with open("%s", "w") as f: | |
| 1655 time.sleep(10) | |
| 1656 """ % TESTFN) | |
| 1657 sproc = pyrun(src) | |
| 1658 self.addCleanup(reap_children) | |
| 1659 call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN) | |
| 1660 p = psutil.Process(sproc.pid) | |
| 1661 time.sleep(.1) | |
| 1662 mem = p.memory_full_info() | |
| 1663 maps = p.memory_maps(grouped=False) | |
| 1664 self.assertAlmostEqual( | |
| 1665 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]), | |
| 1666 delta=4096) | |
| 1667 self.assertAlmostEqual( | |
| 1668 mem.pss, sum([x.pss for x in maps]), delta=4096) | |
| 1669 self.assertAlmostEqual( | |
| 1670 mem.swap, sum([x.swap for x in maps]), delta=4096) | |
| 1671 | |
| 1672 def test_memory_full_info_mocked(self): | |
| 1673 # See: https://github.com/giampaolo/psutil/issues/1222 | |
| 1674 with mock_open_content( | |
| 1675 "/proc/%s/smaps" % os.getpid(), | |
| 1676 textwrap.dedent("""\ | |
| 1677 fffff0 r-xp 00000000 00:00 0 [vsyscall] | |
| 1678 Size: 1 kB | |
| 1679 Rss: 2 kB | |
| 1680 Pss: 3 kB | |
| 1681 Shared_Clean: 4 kB | |
| 1682 Shared_Dirty: 5 kB | |
| 1683 Private_Clean: 6 kB | |
| 1684 Private_Dirty: 7 kB | |
| 1685 Referenced: 8 kB | |
| 1686 Anonymous: 9 kB | |
| 1687 LazyFree: 10 kB | |
| 1688 AnonHugePages: 11 kB | |
| 1689 ShmemPmdMapped: 12 kB | |
| 1690 Shared_Hugetlb: 13 kB | |
| 1691 Private_Hugetlb: 14 kB | |
| 1692 Swap: 15 kB | |
| 1693 SwapPss: 16 kB | |
| 1694 KernelPageSize: 17 kB | |
| 1695 MMUPageSize: 18 kB | |
| 1696 Locked: 19 kB | |
| 1697 VmFlags: rd ex | |
| 1698 """).encode()) as m: | |
| 1699 p = psutil.Process() | |
| 1700 mem = p.memory_full_info() | |
| 1701 assert m.called | |
| 1702 self.assertEqual(mem.uss, (6 + 7 + 14) * 1024) | |
| 1703 self.assertEqual(mem.pss, 3 * 1024) | |
| 1704 self.assertEqual(mem.swap, 15 * 1024) | |
| 1705 | |
| 1706 # On PYPY file descriptors are not closed fast enough. | |
| 1707 @unittest.skipIf(PYPY, "unreliable on PYPY") | |
| 1708 def test_open_files_mode(self): | |
| 1709 def get_test_file(): | |
| 1710 p = psutil.Process() | |
| 1711 giveup_at = time.time() + 2 | |
| 1712 while True: | |
| 1713 for file in p.open_files(): | |
| 1714 if file.path == os.path.abspath(TESTFN): | |
| 1715 return file | |
| 1716 elif time.time() > giveup_at: | |
| 1717 break | |
| 1718 raise RuntimeError("timeout looking for test file") | |
| 1719 | |
| 1720 # | |
| 1721 with open(TESTFN, "w"): | |
| 1722 self.assertEqual(get_test_file().mode, "w") | |
| 1723 with open(TESTFN, "r"): | |
| 1724 self.assertEqual(get_test_file().mode, "r") | |
| 1725 with open(TESTFN, "a"): | |
| 1726 self.assertEqual(get_test_file().mode, "a") | |
| 1727 # | |
| 1728 with open(TESTFN, "r+"): | |
| 1729 self.assertEqual(get_test_file().mode, "r+") | |
| 1730 with open(TESTFN, "w+"): | |
| 1731 self.assertEqual(get_test_file().mode, "r+") | |
| 1732 with open(TESTFN, "a+"): | |
| 1733 self.assertEqual(get_test_file().mode, "a+") | |
| 1734 # note: "x" bit is not supported | |
| 1735 if PY3: | |
| 1736 safe_rmpath(TESTFN) | |
| 1737 with open(TESTFN, "x"): | |
| 1738 self.assertEqual(get_test_file().mode, "w") | |
| 1739 safe_rmpath(TESTFN) | |
| 1740 with open(TESTFN, "x+"): | |
| 1741 self.assertEqual(get_test_file().mode, "r+") | |
| 1742 | |
| 1743 def test_open_files_file_gone(self): | |
| 1744 # simulates a file which gets deleted during open_files() | |
| 1745 # execution | |
| 1746 p = psutil.Process() | |
| 1747 files = p.open_files() | |
| 1748 with tempfile.NamedTemporaryFile(): | |
| 1749 # give the kernel some time to see the new file | |
| 1750 call_until(p.open_files, "len(ret) != %i" % len(files)) | |
| 1751 with mock.patch('psutil._pslinux.os.readlink', | |
| 1752 side_effect=OSError(errno.ENOENT, "")) as m: | |
| 1753 files = p.open_files() | |
| 1754 assert not files | |
| 1755 assert m.called | |
| 1756 # also simulate the case where os.readlink() returns EINVAL | |
| 1757 # in which case psutil is supposed to 'continue' | |
| 1758 with mock.patch('psutil._pslinux.os.readlink', | |
| 1759 side_effect=OSError(errno.EINVAL, "")) as m: | |
| 1760 self.assertEqual(p.open_files(), []) | |
| 1761 assert m.called | |
| 1762 | |
| 1763 def test_open_files_fd_gone(self): | |
| 1764 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears | |
| 1765 # while iterating through fds. | |
| 1766 # https://travis-ci.org/giampaolo/psutil/jobs/225694530 | |
| 1767 p = psutil.Process() | |
| 1768 files = p.open_files() | |
| 1769 with tempfile.NamedTemporaryFile(): | |
| 1770 # give the kernel some time to see the new file | |
| 1771 call_until(p.open_files, "len(ret) != %i" % len(files)) | |
| 1772 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1773 with mock.patch(patch_point, | |
| 1774 side_effect=IOError(errno.ENOENT, "")) as m: | |
| 1775 files = p.open_files() | |
| 1776 assert not files | |
| 1777 assert m.called | |
| 1778 | |
| 1779 # --- mocked tests | |
| 1780 | |
| 1781 def test_terminal_mocked(self): | |
| 1782 with mock.patch('psutil._pslinux._psposix.get_terminal_map', | |
| 1783 return_value={}) as m: | |
| 1784 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) | |
| 1785 assert m.called | |
| 1786 | |
| 1787 # TODO: re-enable this test. | |
| 1788 # def test_num_ctx_switches_mocked(self): | |
| 1789 # with mock.patch('psutil._common.open', create=True) as m: | |
| 1790 # self.assertRaises( | |
| 1791 # NotImplementedError, | |
| 1792 # psutil._pslinux.Process(os.getpid()).num_ctx_switches) | |
| 1793 # assert m.called | |
| 1794 | |
| 1795 def test_cmdline_mocked(self): | |
| 1796 # see: https://github.com/giampaolo/psutil/issues/639 | |
| 1797 p = psutil.Process() | |
| 1798 fake_file = io.StringIO(u('foo\x00bar\x00')) | |
| 1799 with mock.patch('psutil._common.open', | |
| 1800 return_value=fake_file, create=True) as m: | |
| 1801 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
| 1802 assert m.called | |
| 1803 fake_file = io.StringIO(u('foo\x00bar\x00\x00')) | |
| 1804 with mock.patch('psutil._common.open', | |
| 1805 return_value=fake_file, create=True) as m: | |
| 1806 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) | |
| 1807 assert m.called | |
| 1808 | |
| 1809 def test_cmdline_spaces_mocked(self): | |
| 1810 # see: https://github.com/giampaolo/psutil/issues/1179 | |
| 1811 p = psutil.Process() | |
| 1812 fake_file = io.StringIO(u('foo bar ')) | |
| 1813 with mock.patch('psutil._common.open', | |
| 1814 return_value=fake_file, create=True) as m: | |
| 1815 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
| 1816 assert m.called | |
| 1817 fake_file = io.StringIO(u('foo bar ')) | |
| 1818 with mock.patch('psutil._common.open', | |
| 1819 return_value=fake_file, create=True) as m: | |
| 1820 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) | |
| 1821 assert m.called | |
| 1822 | |
| 1823 def test_cmdline_mixed_separators(self): | |
| 1824 # https://github.com/giampaolo/psutil/issues/ | |
| 1825 # 1179#issuecomment-552984549 | |
| 1826 p = psutil.Process() | |
| 1827 fake_file = io.StringIO(u('foo\x20bar\x00')) | |
| 1828 with mock.patch('psutil._common.open', | |
| 1829 return_value=fake_file, create=True) as m: | |
| 1830 self.assertEqual(p.cmdline(), ['foo', 'bar']) | |
| 1831 assert m.called | |
| 1832 | |
| 1833 def test_readlink_path_deleted_mocked(self): | |
| 1834 with mock.patch('psutil._pslinux.os.readlink', | |
| 1835 return_value='/home/foo (deleted)'): | |
| 1836 self.assertEqual(psutil.Process().exe(), "/home/foo") | |
| 1837 self.assertEqual(psutil.Process().cwd(), "/home/foo") | |
| 1838 | |
| 1839 def test_threads_mocked(self): | |
| 1840 # Test the case where os.listdir() returns a file (thread) | |
| 1841 # which no longer exists by the time we open() it (race | |
| 1842 # condition). threads() is supposed to ignore that instead | |
| 1843 # of raising NSP. | |
| 1844 def open_mock(name, *args, **kwargs): | |
| 1845 if name.startswith('/proc/%s/task' % os.getpid()): | |
| 1846 raise IOError(errno.ENOENT, "") | |
| 1847 else: | |
| 1848 return orig_open(name, *args, **kwargs) | |
| 1849 | |
| 1850 orig_open = open | |
| 1851 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
| 1852 with mock.patch(patch_point, side_effect=open_mock) as m: | |
| 1853 ret = psutil.Process().threads() | |
| 1854 assert m.called | |
| 1855 self.assertEqual(ret, []) | |
| 1856 | |
| 1857 # ...but if it bumps into something != ENOENT we want an | |
| 1858 # exception. | |
| 1859 def open_mock(name, *args, **kwargs): | |
| 1860 if name.startswith('/proc/%s/task' % os.getpid()): | |
| 1861 raise IOError(errno.EPERM, "") | |
| 1862 else: | |
| 1863 return orig_open(name, *args, **kwargs) | |
| 1864 | |
| 1865 with mock.patch(patch_point, side_effect=open_mock): | |
| 1866 self.assertRaises(psutil.AccessDenied, psutil.Process().threads) | |
| 1867 | |
| 1868 def test_exe_mocked(self): | |
| 1869 with mock.patch('psutil._pslinux.readlink', | |
| 1870 side_effect=OSError(errno.ENOENT, "")) as m1: | |
| 1871 with mock.patch('psutil.Process.cmdline', | |
| 1872 side_effect=psutil.AccessDenied(0, "")) as m2: | |
| 1873 # No such file error; might be raised also if /proc/pid/exe | |
| 1874 # path actually exists for system processes with low pids | |
| 1875 # (about 0-20). In this case psutil is supposed to return | |
| 1876 # an empty string. | |
| 1877 ret = psutil.Process().exe() | |
| 1878 assert m1.called | |
| 1879 assert m2.called | |
| 1880 self.assertEqual(ret, "") | |
| 1881 | |
| 1882 # ...but if /proc/pid no longer exist we're supposed to treat | |
| 1883 # it as an alias for zombie process | |
| 1884 with mock.patch('psutil._pslinux.os.path.lexists', | |
| 1885 return_value=False): | |
| 1886 self.assertRaises( | |
| 1887 psutil.ZombieProcess, psutil.Process().exe) | |
| 1888 | |
| 1889 def test_issue_1014(self): | |
| 1890 # Emulates a case where smaps file does not exist. In this case | |
| 1891 # wrap_exception decorator should not raise NoSuchProcess. | |
| 1892 with mock_open_exception( | |
| 1893 '/proc/%s/smaps' % os.getpid(), | |
| 1894 IOError(errno.ENOENT, "")) as m: | |
| 1895 p = psutil.Process() | |
| 1896 with self.assertRaises(FileNotFoundError): | |
| 1897 p.memory_maps() | |
| 1898 assert m.called | |
| 1899 | |
| 1900 @unittest.skipIf(not HAS_RLIMIT, "not supported") | |
| 1901 def test_rlimit_zombie(self): | |
| 1902 # Emulate a case where rlimit() raises ENOSYS, which may | |
| 1903 # happen in case of zombie process: | |
| 1904 # https://travis-ci.org/giampaolo/psutil/jobs/51368273 | |
| 1905 with mock.patch("psutil._pslinux.cext.linux_prlimit", | |
| 1906 side_effect=OSError(errno.ENOSYS, "")) as m: | |
| 1907 p = psutil.Process() | |
| 1908 p.name() | |
| 1909 with self.assertRaises(psutil.ZombieProcess) as exc: | |
| 1910 p.rlimit(psutil.RLIMIT_NOFILE) | |
| 1911 assert m.called | |
| 1912 self.assertEqual(exc.exception.pid, p.pid) | |
| 1913 self.assertEqual(exc.exception.name, p.name()) | |
| 1914 | |
| 1915 def test_cwd_zombie(self): | |
| 1916 with mock.patch("psutil._pslinux.os.readlink", | |
| 1917 side_effect=OSError(errno.ENOENT, "")) as m: | |
| 1918 p = psutil.Process() | |
| 1919 p.name() | |
| 1920 with self.assertRaises(psutil.ZombieProcess) as exc: | |
| 1921 p.cwd() | |
| 1922 assert m.called | |
| 1923 self.assertEqual(exc.exception.pid, p.pid) | |
| 1924 self.assertEqual(exc.exception.name, p.name()) | |
| 1925 | |
| 1926 def test_stat_file_parsing(self): | |
| 1927 from psutil._pslinux import CLOCK_TICKS | |
| 1928 | |
| 1929 args = [ | |
| 1930 "0", # pid | |
| 1931 "(cat)", # name | |
| 1932 "Z", # status | |
| 1933 "1", # ppid | |
| 1934 "0", # pgrp | |
| 1935 "0", # session | |
| 1936 "0", # tty | |
| 1937 "0", # tpgid | |
| 1938 "0", # flags | |
| 1939 "0", # minflt | |
| 1940 "0", # cminflt | |
| 1941 "0", # majflt | |
| 1942 "0", # cmajflt | |
| 1943 "2", # utime | |
| 1944 "3", # stime | |
| 1945 "4", # cutime | |
| 1946 "5", # cstime | |
| 1947 "0", # priority | |
| 1948 "0", # nice | |
| 1949 "0", # num_threads | |
| 1950 "0", # itrealvalue | |
| 1951 "6", # starttime | |
| 1952 "0", # vsize | |
| 1953 "0", # rss | |
| 1954 "0", # rsslim | |
| 1955 "0", # startcode | |
| 1956 "0", # endcode | |
| 1957 "0", # startstack | |
| 1958 "0", # kstkesp | |
| 1959 "0", # kstkeip | |
| 1960 "0", # signal | |
| 1961 "0", # blocked | |
| 1962 "0", # sigignore | |
| 1963 "0", # sigcatch | |
| 1964 "0", # wchan | |
| 1965 "0", # nswap | |
| 1966 "0", # cnswap | |
| 1967 "0", # exit_signal | |
| 1968 "6", # processor | |
| 1969 "0", # rt priority | |
| 1970 "0", # policy | |
| 1971 "7", # delayacct_blkio_ticks | |
| 1972 ] | |
| 1973 content = " ".join(args).encode() | |
| 1974 with mock_open_content('/proc/%s/stat' % os.getpid(), content): | |
| 1975 p = psutil.Process() | |
| 1976 self.assertEqual(p.name(), 'cat') | |
| 1977 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) | |
| 1978 self.assertEqual(p.ppid(), 1) | |
| 1979 self.assertEqual( | |
| 1980 p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time()) | |
| 1981 cpu = p.cpu_times() | |
| 1982 self.assertEqual(cpu.user, 2 / CLOCK_TICKS) | |
| 1983 self.assertEqual(cpu.system, 3 / CLOCK_TICKS) | |
| 1984 self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS) | |
| 1985 self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS) | |
| 1986 self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS) | |
| 1987 self.assertEqual(p.cpu_num(), 6) | |
| 1988 | |
| 1989 def test_status_file_parsing(self): | |
| 1990 with mock_open_content( | |
| 1991 '/proc/%s/status' % os.getpid(), | |
| 1992 textwrap.dedent("""\ | |
| 1993 Uid:\t1000\t1001\t1002\t1003 | |
| 1994 Gid:\t1004\t1005\t1006\t1007 | |
| 1995 Threads:\t66 | |
| 1996 Cpus_allowed:\tf | |
| 1997 Cpus_allowed_list:\t0-7 | |
| 1998 voluntary_ctxt_switches:\t12 | |
| 1999 nonvoluntary_ctxt_switches:\t13""").encode()): | |
| 2000 p = psutil.Process() | |
| 2001 self.assertEqual(p.num_ctx_switches().voluntary, 12) | |
| 2002 self.assertEqual(p.num_ctx_switches().involuntary, 13) | |
| 2003 self.assertEqual(p.num_threads(), 66) | |
| 2004 uids = p.uids() | |
| 2005 self.assertEqual(uids.real, 1000) | |
| 2006 self.assertEqual(uids.effective, 1001) | |
| 2007 self.assertEqual(uids.saved, 1002) | |
| 2008 gids = p.gids() | |
| 2009 self.assertEqual(gids.real, 1004) | |
| 2010 self.assertEqual(gids.effective, 1005) | |
| 2011 self.assertEqual(gids.saved, 1006) | |
| 2012 self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8))) | |
| 2013 | |
| 2014 | |
| 2015 @unittest.skipIf(not LINUX, "LINUX only") | |
| 2016 class TestProcessAgainstStatus(unittest.TestCase): | |
| 2017 """/proc/pid/stat and /proc/pid/status have many values in common. | |
| 2018 Whenever possible, psutil uses /proc/pid/stat (it's faster). | |
| 2019 For all those cases we check that the value found in | |
| 2020 /proc/pid/stat (by psutil) matches the one found in | |
| 2021 /proc/pid/status. | |
| 2022 """ | |
| 2023 | |
| 2024 @classmethod | |
| 2025 def setUpClass(cls): | |
| 2026 cls.proc = psutil.Process() | |
| 2027 | |
| 2028 def read_status_file(self, linestart): | |
| 2029 with psutil._psplatform.open_text( | |
| 2030 '/proc/%s/status' % self.proc.pid) as f: | |
| 2031 for line in f: | |
| 2032 line = line.strip() | |
| 2033 if line.startswith(linestart): | |
| 2034 value = line.partition('\t')[2] | |
| 2035 try: | |
| 2036 return int(value) | |
| 2037 except ValueError: | |
| 2038 return value | |
| 2039 raise ValueError("can't find %r" % linestart) | |
| 2040 | |
| 2041 def test_name(self): | |
| 2042 value = self.read_status_file("Name:") | |
| 2043 self.assertEqual(self.proc.name(), value) | |
| 2044 | |
| 2045 def test_status(self): | |
| 2046 value = self.read_status_file("State:") | |
| 2047 value = value[value.find('(') + 1:value.rfind(')')] | |
| 2048 value = value.replace(' ', '-') | |
| 2049 self.assertEqual(self.proc.status(), value) | |
| 2050 | |
| 2051 def test_ppid(self): | |
| 2052 value = self.read_status_file("PPid:") | |
| 2053 self.assertEqual(self.proc.ppid(), value) | |
| 2054 | |
| 2055 def test_num_threads(self): | |
| 2056 value = self.read_status_file("Threads:") | |
| 2057 self.assertEqual(self.proc.num_threads(), value) | |
| 2058 | |
| 2059 def test_uids(self): | |
| 2060 value = self.read_status_file("Uid:") | |
| 2061 value = tuple(map(int, value.split()[1:4])) | |
| 2062 self.assertEqual(self.proc.uids(), value) | |
| 2063 | |
| 2064 def test_gids(self): | |
| 2065 value = self.read_status_file("Gid:") | |
| 2066 value = tuple(map(int, value.split()[1:4])) | |
| 2067 self.assertEqual(self.proc.gids(), value) | |
| 2068 | |
| 2069 @retry_on_failure() | |
| 2070 def test_num_ctx_switches(self): | |
| 2071 value = self.read_status_file("voluntary_ctxt_switches:") | |
| 2072 self.assertEqual(self.proc.num_ctx_switches().voluntary, value) | |
| 2073 value = self.read_status_file("nonvoluntary_ctxt_switches:") | |
| 2074 self.assertEqual(self.proc.num_ctx_switches().involuntary, value) | |
| 2075 | |
| 2076 def test_cpu_affinity(self): | |
| 2077 value = self.read_status_file("Cpus_allowed_list:") | |
| 2078 if '-' in str(value): | |
| 2079 min_, max_ = map(int, value.split('-')) | |
| 2080 self.assertEqual( | |
| 2081 self.proc.cpu_affinity(), list(range(min_, max_ + 1))) | |
| 2082 | |
| 2083 def test_cpu_affinity_eligible_cpus(self): | |
| 2084 value = self.read_status_file("Cpus_allowed_list:") | |
| 2085 with mock.patch("psutil._pslinux.per_cpu_times") as m: | |
| 2086 self.proc._proc._get_eligible_cpus() | |
| 2087 if '-' in str(value): | |
| 2088 assert not m.called | |
| 2089 else: | |
| 2090 assert m.called | |
| 2091 | |
| 2092 | |
| 2093 # ===================================================================== | |
| 2094 # --- test utils | |
| 2095 # ===================================================================== | |
| 2096 | |
| 2097 | |
| 2098 @unittest.skipIf(not LINUX, "LINUX only") | |
| 2099 class TestUtils(unittest.TestCase): | |
| 2100 | |
| 2101 def test_readlink(self): | |
| 2102 with mock.patch("os.readlink", return_value="foo (deleted)") as m: | |
| 2103 self.assertEqual(psutil._psplatform.readlink("bar"), "foo") | |
| 2104 assert m.called | |
| 2105 | |
| 2106 def test_cat(self): | |
| 2107 fname = os.path.abspath(TESTFN) | |
| 2108 with open(fname, "wt") as f: | |
| 2109 f.write("foo ") | |
| 2110 self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo") | |
| 2111 self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo") | |
| 2112 self.assertEqual( | |
| 2113 psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar") | |
| 2114 | |
| 2115 | |
| 2116 if __name__ == '__main__': | |
| 2117 from psutil.tests.runner import run | |
| 2118 run(__file__) |
