comparison env/lib/python3.9/site-packages/psutil/tests/test_linux.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Linux specific tests."""
8
9 from __future__ import division
10 import collections
11 import contextlib
12 import errno
13 import glob
14 import io
15 import os
16 import re
17 import shutil
18 import socket
19 import struct
20 import textwrap
21 import time
22 import warnings
23
24 import psutil
25 from psutil import LINUX
26 from psutil._compat import basestring
27 from psutil._compat import FileNotFoundError
28 from psutil._compat import PY3
29 from psutil._compat import u
30 from psutil.tests import call_until
31 from psutil.tests import GLOBAL_TIMEOUT
32 from psutil.tests import HAS_BATTERY
33 from psutil.tests import HAS_CPU_FREQ
34 from psutil.tests import HAS_GETLOADAVG
35 from psutil.tests import HAS_RLIMIT
36 from psutil.tests import mock
37 from psutil.tests import PsutilTestCase
38 from psutil.tests import PYPY
39 from psutil.tests import reload_module
40 from psutil.tests import retry_on_failure
41 from psutil.tests import safe_rmpath
42 from psutil.tests import sh
43 from psutil.tests import skip_on_not_implemented
44 from psutil.tests import ThreadTask
45 from psutil.tests import TOLERANCE_DISK_USAGE
46 from psutil.tests import TOLERANCE_SYS_MEM
47 from psutil.tests import unittest
48 from psutil.tests import which
49
50
51 HERE = os.path.abspath(os.path.dirname(__file__))
52 SIOCGIFADDR = 0x8915
53 SIOCGIFCONF = 0x8912
54 SIOCGIFHWADDR = 0x8927
55 SIOCGIFNETMASK = 0x891b
56 SIOCGIFBRDADDR = 0x8919
57 if LINUX:
58 SECTOR_SIZE = 512
59 EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
60
61 # =====================================================================
62 # --- utils
63 # =====================================================================
64
65
66 def get_ipv4_address(ifname):
67 import fcntl
68 ifname = ifname[:15]
69 if PY3:
70 ifname = bytes(ifname, 'ascii')
71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
72 with contextlib.closing(s):
73 return socket.inet_ntoa(
74 fcntl.ioctl(s.fileno(),
75 SIOCGIFADDR,
76 struct.pack('256s', ifname))[20:24])
77
78
79 def get_ipv4_netmask(ifname):
80 import fcntl
81 ifname = ifname[:15]
82 if PY3:
83 ifname = bytes(ifname, 'ascii')
84 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
85 with contextlib.closing(s):
86 return socket.inet_ntoa(
87 fcntl.ioctl(s.fileno(),
88 SIOCGIFNETMASK,
89 struct.pack('256s', ifname))[20:24])
90
91
92 def get_ipv4_broadcast(ifname):
93 import fcntl
94 ifname = ifname[:15]
95 if PY3:
96 ifname = bytes(ifname, 'ascii')
97 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
98 with contextlib.closing(s):
99 return socket.inet_ntoa(
100 fcntl.ioctl(s.fileno(),
101 SIOCGIFBRDADDR,
102 struct.pack('256s', ifname))[20:24])
103
104
105 def get_ipv6_address(ifname):
106 with open("/proc/net/if_inet6", 'rt') as f:
107 for line in f.readlines():
108 fields = line.split()
109 if fields[-1] == ifname:
110 break
111 else:
112 raise ValueError("could not find interface %r" % ifname)
113 unformatted = fields[0]
114 groups = []
115 for i in range(0, len(unformatted), 4):
116 groups.append(unformatted[i:i + 4])
117 formatted = ":".join(groups)
118 packed = socket.inet_pton(socket.AF_INET6, formatted)
119 return socket.inet_ntop(socket.AF_INET6, packed)
120
121
122 def get_mac_address(ifname):
123 import fcntl
124 ifname = ifname[:15]
125 if PY3:
126 ifname = bytes(ifname, 'ascii')
127 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
128 with contextlib.closing(s):
129 info = fcntl.ioctl(
130 s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
131 if PY3:
132 def ord(x):
133 return x
134 else:
135 import __builtin__
136 ord = __builtin__.ord
137 return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
138
139
140 def free_swap():
141 """Parse 'free' cmd and return swap memory's s total, used and free
142 values.
143 """
144 out = sh('free -b', env={"LANG": "C.UTF-8"})
145 lines = out.split('\n')
146 for line in lines:
147 if line.startswith('Swap'):
148 _, total, used, free = line.split()
149 nt = collections.namedtuple('free', 'total used free')
150 return nt(int(total), int(used), int(free))
151 raise ValueError(
152 "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
153
154
155 def free_physmem():
156 """Parse 'free' cmd and return physical memory's total, used
157 and free values.
158 """
159 # Note: free can have 2 different formats, invalidating 'shared'
160 # and 'cached' memory which may have different positions so we
161 # do not return them.
162 # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
163 out = sh('free -b', env={"LANG": "C.UTF-8"})
164 lines = out.split('\n')
165 for line in lines:
166 if line.startswith('Mem'):
167 total, used, free, shared = \
168 [int(x) for x in line.split()[1:5]]
169 nt = collections.namedtuple(
170 'free', 'total used free shared output')
171 return nt(total, used, free, shared, out)
172 raise ValueError(
173 "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
174
175
176 def vmstat(stat):
177 out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
178 for line in out.split("\n"):
179 line = line.strip()
180 if stat in line:
181 return int(line.split(' ')[0])
182 raise ValueError("can't find %r in 'vmstat' output" % stat)
183
184
185 def get_free_version_info():
186 out = sh("free -V").strip()
187 if 'UNKNOWN' in out:
188 raise unittest.SkipTest("can't determine free version")
189 return tuple(map(int, out.split()[-1].split('.')))
190
191
192 @contextlib.contextmanager
193 def mock_open_content(for_path, content):
194 """Mock open() builtin and forces it to return a certain `content`
195 on read() if the path being opened matches `for_path`.
196 """
197 def open_mock(name, *args, **kwargs):
198 if name == for_path:
199 if PY3:
200 if isinstance(content, basestring):
201 return io.StringIO(content)
202 else:
203 return io.BytesIO(content)
204 else:
205 return io.BytesIO(content)
206 else:
207 return orig_open(name, *args, **kwargs)
208
209 orig_open = open
210 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
211 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
212 yield m
213
214
215 @contextlib.contextmanager
216 def mock_open_exception(for_path, exc):
217 """Mock open() builtin and raises `exc` if the path being opened
218 matches `for_path`.
219 """
220 def open_mock(name, *args, **kwargs):
221 if name == for_path:
222 raise exc
223 else:
224 return orig_open(name, *args, **kwargs)
225
226 orig_open = open
227 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
228 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
229 yield m
230
231
232 # =====================================================================
233 # --- system virtual memory
234 # =====================================================================
235
236
237 @unittest.skipIf(not LINUX, "LINUX only")
238 class TestSystemVirtualMemory(PsutilTestCase):
239
240 def test_total(self):
241 # free_value = free_physmem().total
242 # psutil_value = psutil.virtual_memory().total
243 # self.assertEqual(free_value, psutil_value)
244 vmstat_value = vmstat('total memory') * 1024
245 psutil_value = psutil.virtual_memory().total
246 self.assertAlmostEqual(vmstat_value, psutil_value)
247
248 @retry_on_failure()
249 def test_used(self):
250 # Older versions of procps used slab memory to calculate used memory.
251 # This got changed in:
252 # https://gitlab.com/procps-ng/procps/commit/
253 # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e
254 if get_free_version_info() < (3, 3, 12):
255 raise self.skipTest("old free version")
256 free = free_physmem()
257 free_value = free.used
258 psutil_value = psutil.virtual_memory().used
259 self.assertAlmostEqual(
260 free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
261 msg='%s %s \n%s' % (free_value, psutil_value, free.output))
262
263 @retry_on_failure()
264 def test_free(self):
265 vmstat_value = vmstat('free memory') * 1024
266 psutil_value = psutil.virtual_memory().free
267 self.assertAlmostEqual(
268 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
269
270 @retry_on_failure()
271 def test_buffers(self):
272 vmstat_value = vmstat('buffer memory') * 1024
273 psutil_value = psutil.virtual_memory().buffers
274 self.assertAlmostEqual(
275 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
276
277 @retry_on_failure()
278 def test_active(self):
279 vmstat_value = vmstat('active memory') * 1024
280 psutil_value = psutil.virtual_memory().active
281 self.assertAlmostEqual(
282 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
283
284 @retry_on_failure()
285 def test_inactive(self):
286 vmstat_value = vmstat('inactive memory') * 1024
287 psutil_value = psutil.virtual_memory().inactive
288 self.assertAlmostEqual(
289 vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
290
291 @retry_on_failure()
292 def test_shared(self):
293 free = free_physmem()
294 free_value = free.shared
295 if free_value == 0:
296 raise unittest.SkipTest("free does not support 'shared' column")
297 psutil_value = psutil.virtual_memory().shared
298 self.assertAlmostEqual(
299 free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
300 msg='%s %s \n%s' % (free_value, psutil_value, free.output))
301
302 @retry_on_failure()
303 def test_available(self):
304 # "free" output format has changed at some point:
305 # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
306 out = sh("free -b")
307 lines = out.split('\n')
308 if 'available' not in lines[0]:
309 raise unittest.SkipTest("free does not support 'available' column")
310 else:
311 free_value = int(lines[1].split()[-1])
312 psutil_value = psutil.virtual_memory().available
313 self.assertAlmostEqual(
314 free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
315 msg='%s %s \n%s' % (free_value, psutil_value, out))
316
317 def test_warnings_on_misses(self):
318 # Emulate a case where /proc/meminfo provides few info.
319 # psutil is supposed to set the missing fields to 0 and
320 # raise a warning.
321 with mock_open_content(
322 '/proc/meminfo',
323 textwrap.dedent("""\
324 Active(anon): 6145416 kB
325 Active(file): 2950064 kB
326 Inactive(anon): 574764 kB
327 Inactive(file): 1567648 kB
328 MemAvailable: -1 kB
329 MemFree: 2057400 kB
330 MemTotal: 16325648 kB
331 SReclaimable: 346648 kB
332 """).encode()) as m:
333 with warnings.catch_warnings(record=True) as ws:
334 warnings.simplefilter("always")
335 ret = psutil.virtual_memory()
336 assert m.called
337 self.assertEqual(len(ws), 1)
338 w = ws[0]
339 assert w.filename.endswith('psutil/_pslinux.py')
340 self.assertIn(
341 "memory stats couldn't be determined", str(w.message))
342 self.assertIn("cached", str(w.message))
343 self.assertIn("shared", str(w.message))
344 self.assertIn("active", str(w.message))
345 self.assertIn("inactive", str(w.message))
346 self.assertIn("buffers", str(w.message))
347 self.assertIn("available", str(w.message))
348 self.assertEqual(ret.cached, 0)
349 self.assertEqual(ret.active, 0)
350 self.assertEqual(ret.inactive, 0)
351 self.assertEqual(ret.shared, 0)
352 self.assertEqual(ret.buffers, 0)
353 self.assertEqual(ret.available, 0)
354 self.assertEqual(ret.slab, 0)
355
356 @retry_on_failure()
357 def test_avail_old_percent(self):
358 # Make sure that our calculation of avail mem for old kernels
359 # is off by max 15%.
360 from psutil._pslinux import calculate_avail_vmem
361 from psutil._pslinux import open_binary
362
363 mems = {}
364 with open_binary('/proc/meminfo') as f:
365 for line in f:
366 fields = line.split()
367 mems[fields[0]] = int(fields[1]) * 1024
368
369 a = calculate_avail_vmem(mems)
370 if b'MemAvailable:' in mems:
371 b = mems[b'MemAvailable:']
372 diff_percent = abs(a - b) / a * 100
373 self.assertLess(diff_percent, 15)
374
375 def test_avail_old_comes_from_kernel(self):
376 # Make sure "MemAvailable:" coluimn is used instead of relying
377 # on our internal algorithm to calculate avail mem.
378 with mock_open_content(
379 '/proc/meminfo',
380 textwrap.dedent("""\
381 Active: 9444728 kB
382 Active(anon): 6145416 kB
383 Active(file): 2950064 kB
384 Buffers: 287952 kB
385 Cached: 4818144 kB
386 Inactive(file): 1578132 kB
387 Inactive(anon): 574764 kB
388 Inactive(file): 1567648 kB
389 MemAvailable: 6574984 kB
390 MemFree: 2057400 kB
391 MemTotal: 16325648 kB
392 Shmem: 577588 kB
393 SReclaimable: 346648 kB
394 """).encode()) as m:
395 with warnings.catch_warnings(record=True) as ws:
396 ret = psutil.virtual_memory()
397 assert m.called
398 self.assertEqual(ret.available, 6574984 * 1024)
399 w = ws[0]
400 self.assertIn(
401 "inactive memory stats couldn't be determined", str(w.message))
402
403 def test_avail_old_missing_fields(self):
404 # Remove Active(file), Inactive(file) and SReclaimable
405 # from /proc/meminfo and make sure the fallback is used
406 # (free + cached),
407 with mock_open_content(
408 "/proc/meminfo",
409 textwrap.dedent("""\
410 Active: 9444728 kB
411 Active(anon): 6145416 kB
412 Buffers: 287952 kB
413 Cached: 4818144 kB
414 Inactive(file): 1578132 kB
415 Inactive(anon): 574764 kB
416 MemFree: 2057400 kB
417 MemTotal: 16325648 kB
418 Shmem: 577588 kB
419 """).encode()) as m:
420 with warnings.catch_warnings(record=True) as ws:
421 ret = psutil.virtual_memory()
422 assert m.called
423 self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
424 w = ws[0]
425 self.assertIn(
426 "inactive memory stats couldn't be determined", str(w.message))
427
428 def test_avail_old_missing_zoneinfo(self):
429 # Remove /proc/zoneinfo file. Make sure fallback is used
430 # (free + cached).
431 with mock_open_content(
432 "/proc/meminfo",
433 textwrap.dedent("""\
434 Active: 9444728 kB
435 Active(anon): 6145416 kB
436 Active(file): 2950064 kB
437 Buffers: 287952 kB
438 Cached: 4818144 kB
439 Inactive(file): 1578132 kB
440 Inactive(anon): 574764 kB
441 Inactive(file): 1567648 kB
442 MemFree: 2057400 kB
443 MemTotal: 16325648 kB
444 Shmem: 577588 kB
445 SReclaimable: 346648 kB
446 """).encode()):
447 with mock_open_exception(
448 "/proc/zoneinfo",
449 IOError(errno.ENOENT, 'no such file or directory')):
450 with warnings.catch_warnings(record=True) as ws:
451 ret = psutil.virtual_memory()
452 self.assertEqual(
453 ret.available, 2057400 * 1024 + 4818144 * 1024)
454 w = ws[0]
455 self.assertIn(
456 "inactive memory stats couldn't be determined",
457 str(w.message))
458
459 def test_virtual_memory_mocked(self):
460 # Emulate /proc/meminfo because neither vmstat nor free return slab.
461 def open_mock(name, *args, **kwargs):
462 if name == '/proc/meminfo':
463 return io.BytesIO(textwrap.dedent("""\
464 MemTotal: 100 kB
465 MemFree: 2 kB
466 MemAvailable: 3 kB
467 Buffers: 4 kB
468 Cached: 5 kB
469 SwapCached: 6 kB
470 Active: 7 kB
471 Inactive: 8 kB
472 Active(anon): 9 kB
473 Inactive(anon): 10 kB
474 Active(file): 11 kB
475 Inactive(file): 12 kB
476 Unevictable: 13 kB
477 Mlocked: 14 kB
478 SwapTotal: 15 kB
479 SwapFree: 16 kB
480 Dirty: 17 kB
481 Writeback: 18 kB
482 AnonPages: 19 kB
483 Mapped: 20 kB
484 Shmem: 21 kB
485 Slab: 22 kB
486 SReclaimable: 23 kB
487 SUnreclaim: 24 kB
488 KernelStack: 25 kB
489 PageTables: 26 kB
490 NFS_Unstable: 27 kB
491 Bounce: 28 kB
492 WritebackTmp: 29 kB
493 CommitLimit: 30 kB
494 Committed_AS: 31 kB
495 VmallocTotal: 32 kB
496 VmallocUsed: 33 kB
497 VmallocChunk: 34 kB
498 HardwareCorrupted: 35 kB
499 AnonHugePages: 36 kB
500 ShmemHugePages: 37 kB
501 ShmemPmdMapped: 38 kB
502 CmaTotal: 39 kB
503 CmaFree: 40 kB
504 HugePages_Total: 41 kB
505 HugePages_Free: 42 kB
506 HugePages_Rsvd: 43 kB
507 HugePages_Surp: 44 kB
508 Hugepagesize: 45 kB
509 DirectMap46k: 46 kB
510 DirectMap47M: 47 kB
511 DirectMap48G: 48 kB
512 """).encode())
513 else:
514 return orig_open(name, *args, **kwargs)
515
516 orig_open = open
517 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
518 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
519 mem = psutil.virtual_memory()
520 assert m.called
521 self.assertEqual(mem.total, 100 * 1024)
522 self.assertEqual(mem.free, 2 * 1024)
523 self.assertEqual(mem.buffers, 4 * 1024)
524 # cached mem also includes reclaimable memory
525 self.assertEqual(mem.cached, (5 + 23) * 1024)
526 self.assertEqual(mem.shared, 21 * 1024)
527 self.assertEqual(mem.active, 7 * 1024)
528 self.assertEqual(mem.inactive, 8 * 1024)
529 self.assertEqual(mem.slab, 22 * 1024)
530 self.assertEqual(mem.available, 3 * 1024)
531
532
533 # =====================================================================
534 # --- system swap memory
535 # =====================================================================
536
537
538 @unittest.skipIf(not LINUX, "LINUX only")
539 class TestSystemSwapMemory(PsutilTestCase):
540
541 @staticmethod
542 def meminfo_has_swap_info():
543 """Return True if /proc/meminfo provides swap metrics."""
544 with open("/proc/meminfo") as f:
545 data = f.read()
546 return 'SwapTotal:' in data and 'SwapFree:' in data
547
548 def test_total(self):
549 free_value = free_swap().total
550 psutil_value = psutil.swap_memory().total
551 return self.assertAlmostEqual(
552 free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
553
554 @retry_on_failure()
555 def test_used(self):
556 free_value = free_swap().used
557 psutil_value = psutil.swap_memory().used
558 return self.assertAlmostEqual(
559 free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
560
561 @retry_on_failure()
562 def test_free(self):
563 free_value = free_swap().free
564 psutil_value = psutil.swap_memory().free
565 return self.assertAlmostEqual(
566 free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
567
568 def test_missing_sin_sout(self):
569 with mock.patch('psutil._common.open', create=True) as m:
570 with warnings.catch_warnings(record=True) as ws:
571 warnings.simplefilter("always")
572 ret = psutil.swap_memory()
573 assert m.called
574 self.assertEqual(len(ws), 1)
575 w = ws[0]
576 assert w.filename.endswith('psutil/_pslinux.py')
577 self.assertIn(
578 "'sin' and 'sout' swap memory stats couldn't "
579 "be determined", str(w.message))
580 self.assertEqual(ret.sin, 0)
581 self.assertEqual(ret.sout, 0)
582
583 def test_no_vmstat_mocked(self):
584 # see https://github.com/giampaolo/psutil/issues/722
585 with mock_open_exception(
586 "/proc/vmstat",
587 IOError(errno.ENOENT, 'no such file or directory')) as m:
588 with warnings.catch_warnings(record=True) as ws:
589 warnings.simplefilter("always")
590 ret = psutil.swap_memory()
591 assert m.called
592 self.assertEqual(len(ws), 1)
593 w = ws[0]
594 assert w.filename.endswith('psutil/_pslinux.py')
595 self.assertIn(
596 "'sin' and 'sout' swap memory stats couldn't "
597 "be determined and were set to 0",
598 str(w.message))
599 self.assertEqual(ret.sin, 0)
600 self.assertEqual(ret.sout, 0)
601
602 def test_meminfo_against_sysinfo(self):
603 # Make sure the content of /proc/meminfo about swap memory
604 # matches sysinfo() syscall, see:
605 # https://github.com/giampaolo/psutil/issues/1015
606 if not self.meminfo_has_swap_info():
607 return unittest.skip("/proc/meminfo has no swap metrics")
608 with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
609 swap = psutil.swap_memory()
610 assert not m.called
611 import psutil._psutil_linux as cext
612 _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
613 total *= unit_multiplier
614 free *= unit_multiplier
615 self.assertEqual(swap.total, total)
616 self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)
617
618 def test_emulate_meminfo_has_no_metrics(self):
619 # Emulate a case where /proc/meminfo provides no swap metrics
620 # in which case sysinfo() syscall is supposed to be used
621 # as a fallback.
622 with mock_open_content("/proc/meminfo", b"") as m:
623 psutil.swap_memory()
624 assert m.called
625
626
627 # =====================================================================
628 # --- system CPU
629 # =====================================================================
630
631
632 @unittest.skipIf(not LINUX, "LINUX only")
633 class TestSystemCPUTimes(PsutilTestCase):
634
635 def test_fields(self):
636 fields = psutil.cpu_times()._fields
637 kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
638 kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
639 if kernel_ver_info >= (2, 6, 11):
640 self.assertIn('steal', fields)
641 else:
642 self.assertNotIn('steal', fields)
643 if kernel_ver_info >= (2, 6, 24):
644 self.assertIn('guest', fields)
645 else:
646 self.assertNotIn('guest', fields)
647 if kernel_ver_info >= (3, 2, 0):
648 self.assertIn('guest_nice', fields)
649 else:
650 self.assertNotIn('guest_nice', fields)
651
652
653 @unittest.skipIf(not LINUX, "LINUX only")
654 class TestSystemCPUCountLogical(PsutilTestCase):
655
656 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
657 "/sys/devices/system/cpu/online does not exist")
658 def test_against_sysdev_cpu_online(self):
659 with open("/sys/devices/system/cpu/online") as f:
660 value = f.read().strip()
661 if "-" in str(value):
662 value = int(value.split('-')[1]) + 1
663 self.assertEqual(psutil.cpu_count(), value)
664
665 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
666 "/sys/devices/system/cpu does not exist")
667 def test_against_sysdev_cpu_num(self):
668 ls = os.listdir("/sys/devices/system/cpu")
669 count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
670 self.assertEqual(psutil.cpu_count(), count)
671
672 @unittest.skipIf(not which("nproc"), "nproc utility not available")
673 def test_against_nproc(self):
674 num = int(sh("nproc --all"))
675 self.assertEqual(psutil.cpu_count(logical=True), num)
676
677 @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
678 def test_against_lscpu(self):
679 out = sh("lscpu -p")
680 num = len([x for x in out.split('\n') if not x.startswith('#')])
681 self.assertEqual(psutil.cpu_count(logical=True), num)
682
683 def test_emulate_fallbacks(self):
684 import psutil._pslinux
685 original = psutil._pslinux.cpu_count_logical()
686 # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
687 # order to cause the parsing of /proc/cpuinfo and /proc/stat.
688 with mock.patch(
689 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
690 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
691 assert m.called
692
693 # Let's have open() return emtpy data and make sure None is
694 # returned ('cause we mimick os.cpu_count()).
695 with mock.patch('psutil._common.open', create=True) as m:
696 self.assertIsNone(psutil._pslinux.cpu_count_logical())
697 self.assertEqual(m.call_count, 2)
698 # /proc/stat should be the last one
699 self.assertEqual(m.call_args[0][0], '/proc/stat')
700
701 # Let's push this a bit further and make sure /proc/cpuinfo
702 # parsing works as expected.
703 with open('/proc/cpuinfo', 'rb') as f:
704 cpuinfo_data = f.read()
705 fake_file = io.BytesIO(cpuinfo_data)
706 with mock.patch('psutil._common.open',
707 return_value=fake_file, create=True) as m:
708 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
709
710 # Finally, let's make /proc/cpuinfo return meaningless data;
711 # this way we'll fall back on relying on /proc/stat
712 with mock_open_content('/proc/cpuinfo', b"") as m:
713 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
714 m.called
715
716
717 @unittest.skipIf(not LINUX, "LINUX only")
718 class TestSystemCPUCountPhysical(PsutilTestCase):
719
720 @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
721 def test_against_lscpu(self):
722 out = sh("lscpu -p")
723 core_ids = set()
724 for line in out.split('\n'):
725 if not line.startswith('#'):
726 fields = line.split(',')
727 core_ids.add(fields[1])
728 self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
729
730 def test_method_2(self):
731 meth_1 = psutil._pslinux.cpu_count_physical()
732 with mock.patch('glob.glob', return_value=[]) as m:
733 meth_2 = psutil._pslinux.cpu_count_physical()
734 assert m.called
735 if meth_1 is not None:
736 self.assertEqual(meth_1, meth_2)
737
738 def test_emulate_none(self):
739 with mock.patch('glob.glob', return_value=[]) as m1:
740 with mock.patch('psutil._common.open', create=True) as m2:
741 self.assertIsNone(psutil._pslinux.cpu_count_physical())
742 assert m1.called
743 assert m2.called
744
745
746 @unittest.skipIf(not LINUX, "LINUX only")
747 class TestSystemCPUFrequency(PsutilTestCase):
748
749 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
750 def test_emulate_use_second_file(self):
751 # https://github.com/giampaolo/psutil/issues/981
752 def path_exists_mock(path):
753 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
754 return False
755 else:
756 return orig_exists(path)
757
758 orig_exists = os.path.exists
759 with mock.patch("os.path.exists", side_effect=path_exists_mock,
760 create=True):
761 assert psutil.cpu_freq()
762
763 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
764 def test_emulate_use_cpuinfo(self):
765 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
766 # exist and /proc/cpuinfo is used instead.
767 def path_exists_mock(path):
768 if path.startswith('/sys/devices/system/cpu/'):
769 return False
770 else:
771 if path == "/proc/cpuinfo":
772 flags.append(None)
773 return os_path_exists(path)
774
775 flags = []
776 os_path_exists = os.path.exists
777 try:
778 with mock.patch("os.path.exists", side_effect=path_exists_mock):
779 reload_module(psutil._pslinux)
780 ret = psutil.cpu_freq()
781 assert ret
782 assert flags
783 self.assertEqual(ret.max, 0.0)
784 self.assertEqual(ret.min, 0.0)
785 for freq in psutil.cpu_freq(percpu=True):
786 self.assertEqual(ret.max, 0.0)
787 self.assertEqual(ret.min, 0.0)
788 finally:
789 reload_module(psutil._pslinux)
790 reload_module(psutil)
791
792 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
793 def test_emulate_data(self):
794 def open_mock(name, *args, **kwargs):
795 if (name.endswith('/scaling_cur_freq') and
796 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
797 return io.BytesIO(b"500000")
798 elif (name.endswith('/scaling_min_freq') and
799 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
800 return io.BytesIO(b"600000")
801 elif (name.endswith('/scaling_max_freq') and
802 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
803 return io.BytesIO(b"700000")
804 elif name == '/proc/cpuinfo':
805 return io.BytesIO(b"cpu MHz : 500")
806 else:
807 return orig_open(name, *args, **kwargs)
808
809 orig_open = open
810 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
811 with mock.patch(patch_point, side_effect=open_mock):
812 with mock.patch(
813 'os.path.exists', return_value=True):
814 freq = psutil.cpu_freq()
815 self.assertEqual(freq.current, 500.0)
816 # when /proc/cpuinfo is used min and max frequencies are not
817 # available and are set to 0.
818 if freq.min != 0.0:
819 self.assertEqual(freq.min, 600.0)
820 if freq.max != 0.0:
821 self.assertEqual(freq.max, 700.0)
822
823 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
824 def test_emulate_multi_cpu(self):
825 def open_mock(name, *args, **kwargs):
826 n = name
827 if (n.endswith('/scaling_cur_freq') and
828 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
829 return io.BytesIO(b"100000")
830 elif (n.endswith('/scaling_min_freq') and
831 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
832 return io.BytesIO(b"200000")
833 elif (n.endswith('/scaling_max_freq') and
834 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
835 return io.BytesIO(b"300000")
836 elif (n.endswith('/scaling_cur_freq') and
837 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
838 return io.BytesIO(b"400000")
839 elif (n.endswith('/scaling_min_freq') and
840 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
841 return io.BytesIO(b"500000")
842 elif (n.endswith('/scaling_max_freq') and
843 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
844 return io.BytesIO(b"600000")
845 elif name == '/proc/cpuinfo':
846 return io.BytesIO(b"cpu MHz : 100\n"
847 b"cpu MHz : 400")
848 else:
849 return orig_open(name, *args, **kwargs)
850
851 orig_open = open
852 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
853 with mock.patch(patch_point, side_effect=open_mock):
854 with mock.patch('os.path.exists', return_value=True):
855 with mock.patch('psutil._pslinux.cpu_count_logical',
856 return_value=2):
857 freq = psutil.cpu_freq(percpu=True)
858 self.assertEqual(freq[0].current, 100.0)
859 if freq[0].min != 0.0:
860 self.assertEqual(freq[0].min, 200.0)
861 if freq[0].max != 0.0:
862 self.assertEqual(freq[0].max, 300.0)
863 self.assertEqual(freq[1].current, 400.0)
864 if freq[1].min != 0.0:
865 self.assertEqual(freq[1].min, 500.0)
866 if freq[1].max != 0.0:
867 self.assertEqual(freq[1].max, 600.0)
868
869 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
870 def test_emulate_no_scaling_cur_freq_file(self):
871 # See: https://github.com/giampaolo/psutil/issues/1071
872 def open_mock(name, *args, **kwargs):
873 if name.endswith('/scaling_cur_freq'):
874 raise IOError(errno.ENOENT, "")
875 elif name.endswith('/cpuinfo_cur_freq'):
876 return io.BytesIO(b"200000")
877 elif name == '/proc/cpuinfo':
878 return io.BytesIO(b"cpu MHz : 200")
879 else:
880 return orig_open(name, *args, **kwargs)
881
882 orig_open = open
883 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
884 with mock.patch(patch_point, side_effect=open_mock):
885 with mock.patch('os.path.exists', return_value=True):
886 with mock.patch('psutil._pslinux.cpu_count_logical',
887 return_value=1):
888 freq = psutil.cpu_freq()
889 self.assertEqual(freq.current, 200)
890
891
892 @unittest.skipIf(not LINUX, "LINUX only")
893 class TestSystemCPUStats(PsutilTestCase):
894
895 def test_ctx_switches(self):
896 vmstat_value = vmstat("context switches")
897 psutil_value = psutil.cpu_stats().ctx_switches
898 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
899
900 def test_interrupts(self):
901 vmstat_value = vmstat("interrupts")
902 psutil_value = psutil.cpu_stats().interrupts
903 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
904
905
906 @unittest.skipIf(not LINUX, "LINUX only")
907 class TestLoadAvg(PsutilTestCase):
908
909 @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
910 def test_getloadavg(self):
911 psutil_value = psutil.getloadavg()
912 with open("/proc/loadavg", "r") as f:
913 proc_value = f.read().split()
914
915 self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
916 self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
917 self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
918
919
920 # =====================================================================
921 # --- system network
922 # =====================================================================
923
924
925 @unittest.skipIf(not LINUX, "LINUX only")
926 class TestSystemNetIfAddrs(PsutilTestCase):
927
928 def test_ips(self):
929 for name, addrs in psutil.net_if_addrs().items():
930 for addr in addrs:
931 if addr.family == psutil.AF_LINK:
932 self.assertEqual(addr.address, get_mac_address(name))
933 elif addr.family == socket.AF_INET:
934 self.assertEqual(addr.address, get_ipv4_address(name))
935 self.assertEqual(addr.netmask, get_ipv4_netmask(name))
936 if addr.broadcast is not None:
937 self.assertEqual(addr.broadcast,
938 get_ipv4_broadcast(name))
939 else:
940 self.assertEqual(get_ipv4_broadcast(name), '0.0.0.0')
941 elif addr.family == socket.AF_INET6:
942 # IPv6 addresses can have a percent symbol at the end.
943 # E.g. these 2 are equivalent:
944 # "fe80::1ff:fe23:4567:890a"
945 # "fe80::1ff:fe23:4567:890a%eth0"
946 # That is the "zone id" portion, which usually is the name
947 # of the network interface.
948 address = addr.address.split('%')[0]
949 self.assertEqual(address, get_ipv6_address(name))
950
951 # XXX - not reliable when having virtual NICs installed by Docker.
952 # @unittest.skipIf(not which('ip'), "'ip' utility not available")
953 # def test_net_if_names(self):
954 # out = sh("ip addr").strip()
955 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
956 # found = 0
957 # for line in out.split('\n'):
958 # line = line.strip()
959 # if re.search(r"^\d+:", line):
960 # found += 1
961 # name = line.split(':')[1].strip()
962 # self.assertIn(name, nics)
963 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
964 # pprint.pformat(nics), out))
965
966
967 @unittest.skipIf(not LINUX, "LINUX only")
968 class TestSystemNetIfStats(PsutilTestCase):
969
970 def test_against_ifconfig(self):
971 for name, stats in psutil.net_if_stats().items():
972 try:
973 out = sh("ifconfig %s" % name)
974 except RuntimeError:
975 pass
976 else:
977 self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
978 self.assertEqual(stats.mtu,
979 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
980
981 def test_mtu(self):
982 for name, stats in psutil.net_if_stats().items():
983 with open("/sys/class/net/%s/mtu" % name, "rt") as f:
984 self.assertEqual(stats.mtu, int(f.read().strip()))
985
986
987 @unittest.skipIf(not LINUX, "LINUX only")
988 class TestSystemNetIOCounters(PsutilTestCase):
989
990 @retry_on_failure()
991 def test_against_ifconfig(self):
992 def ifconfig(nic):
993 ret = {}
994 out = sh("ifconfig %s" % name)
995 ret['packets_recv'] = int(
996 re.findall(r'RX packets[: ](\d+)', out)[0])
997 ret['packets_sent'] = int(
998 re.findall(r'TX packets[: ](\d+)', out)[0])
999 ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
1000 ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
1001 ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
1002 ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
1003 ret['bytes_recv'] = int(
1004 re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
1005 ret['bytes_sent'] = int(
1006 re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
1007 return ret
1008
1009 nio = psutil.net_io_counters(pernic=True, nowrap=False)
1010 for name, stats in nio.items():
1011 try:
1012 ifconfig_ret = ifconfig(name)
1013 except RuntimeError:
1014 continue
1015 self.assertAlmostEqual(
1016 stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
1017 self.assertAlmostEqual(
1018 stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
1019 self.assertAlmostEqual(
1020 stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
1021 self.assertAlmostEqual(
1022 stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
1023 self.assertAlmostEqual(
1024 stats.errin, ifconfig_ret['errin'], delta=10)
1025 self.assertAlmostEqual(
1026 stats.errout, ifconfig_ret['errout'], delta=10)
1027 self.assertAlmostEqual(
1028 stats.dropin, ifconfig_ret['dropin'], delta=10)
1029 self.assertAlmostEqual(
1030 stats.dropout, ifconfig_ret['dropout'], delta=10)
1031
1032
1033 @unittest.skipIf(not LINUX, "LINUX only")
1034 class TestSystemNetConnections(PsutilTestCase):
1035
1036 @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
1037 @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
1038 def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
1039 # see: https://github.com/giampaolo/psutil/issues/623
1040 try:
1041 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1042 self.addCleanup(s.close)
1043 s.bind(("::1", 0))
1044 except socket.error:
1045 pass
1046 psutil.net_connections(kind='inet6')
1047
1048 def test_emulate_unix(self):
1049 with mock_open_content(
1050 '/proc/net/unix',
1051 textwrap.dedent("""\
1052 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
1053 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
1054 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
1055 000000000000000000000000000000000000000000000000000000
1056 """)) as m:
1057 psutil.net_connections(kind='unix')
1058 assert m.called
1059
1060
1061 # =====================================================================
1062 # --- system disks
1063 # =====================================================================
1064
1065
1066 @unittest.skipIf(not LINUX, "LINUX only")
1067 class TestSystemDiskPartitions(PsutilTestCase):
1068
1069 @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
1070 @skip_on_not_implemented()
1071 def test_against_df(self):
1072 # test psutil.disk_usage() and psutil.disk_partitions()
1073 # against "df -a"
1074 def df(path):
1075 out = sh('df -P -B 1 "%s"' % path).strip()
1076 lines = out.split('\n')
1077 lines.pop(0)
1078 line = lines.pop(0)
1079 dev, total, used, free = line.split()[:4]
1080 if dev == 'none':
1081 dev = ''
1082 total, used, free = int(total), int(used), int(free)
1083 return dev, total, used, free
1084
1085 for part in psutil.disk_partitions(all=False):
1086 usage = psutil.disk_usage(part.mountpoint)
1087 dev, total, used, free = df(part.mountpoint)
1088 self.assertEqual(usage.total, total)
1089 self.assertAlmostEqual(usage.free, free,
1090 delta=TOLERANCE_DISK_USAGE)
1091 self.assertAlmostEqual(usage.used, used,
1092 delta=TOLERANCE_DISK_USAGE)
1093
1094 def test_zfs_fs(self):
1095 # Test that ZFS partitions are returned.
1096 with open("/proc/filesystems", "r") as f:
1097 data = f.read()
1098 if 'zfs' in data:
1099 for part in psutil.disk_partitions():
1100 if part.fstype == 'zfs':
1101 break
1102 else:
1103 self.fail("couldn't find any ZFS partition")
1104 else:
1105 # No ZFS partitions on this system. Let's fake one.
1106 fake_file = io.StringIO(u("nodev\tzfs\n"))
1107 with mock.patch('psutil._common.open',
1108 return_value=fake_file, create=True) as m1:
1109 with mock.patch(
1110 'psutil._pslinux.cext.disk_partitions',
1111 return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
1112 ret = psutil.disk_partitions()
1113 assert m1.called
1114 assert m2.called
1115 assert ret
1116 self.assertEqual(ret[0].fstype, 'zfs')
1117
1118 def test_emulate_realpath_fail(self):
1119 # See: https://github.com/giampaolo/psutil/issues/1307
1120 try:
1121 with mock.patch('os.path.realpath',
1122 return_value='/non/existent') as m:
1123 with self.assertRaises(FileNotFoundError):
1124 psutil.disk_partitions()
1125 assert m.called
1126 finally:
1127 psutil.PROCFS_PATH = "/proc"
1128
1129
1130 @unittest.skipIf(not LINUX, "LINUX only")
1131 class TestSystemDiskIoCounters(PsutilTestCase):
1132
1133 def test_emulate_kernel_2_4(self):
1134 # Tests /proc/diskstats parsing format for 2.4 kernels, see:
1135 # https://github.com/giampaolo/psutil/issues/767
1136 with mock_open_content(
1137 '/proc/diskstats',
1138 " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"):
1139 with mock.patch('psutil._pslinux.is_storage_device',
1140 return_value=True):
1141 ret = psutil.disk_io_counters(nowrap=False)
1142 self.assertEqual(ret.read_count, 1)
1143 self.assertEqual(ret.read_merged_count, 2)
1144 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1145 self.assertEqual(ret.read_time, 4)
1146 self.assertEqual(ret.write_count, 5)
1147 self.assertEqual(ret.write_merged_count, 6)
1148 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1149 self.assertEqual(ret.write_time, 8)
1150 self.assertEqual(ret.busy_time, 10)
1151
1152 def test_emulate_kernel_2_6_full(self):
1153 # Tests /proc/diskstats parsing format for 2.6 kernels,
1154 # lines reporting all metrics:
1155 # https://github.com/giampaolo/psutil/issues/767
1156 with mock_open_content(
1157 '/proc/diskstats',
1158 " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"):
1159 with mock.patch('psutil._pslinux.is_storage_device',
1160 return_value=True):
1161 ret = psutil.disk_io_counters(nowrap=False)
1162 self.assertEqual(ret.read_count, 1)
1163 self.assertEqual(ret.read_merged_count, 2)
1164 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1165 self.assertEqual(ret.read_time, 4)
1166 self.assertEqual(ret.write_count, 5)
1167 self.assertEqual(ret.write_merged_count, 6)
1168 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1169 self.assertEqual(ret.write_time, 8)
1170 self.assertEqual(ret.busy_time, 10)
1171
1172 def test_emulate_kernel_2_6_limited(self):
1173 # Tests /proc/diskstats parsing format for 2.6 kernels,
1174 # where one line of /proc/partitions return a limited
1175 # amount of metrics when it bumps into a partition
1176 # (instead of a disk). See:
1177 # https://github.com/giampaolo/psutil/issues/767
1178 with mock_open_content(
1179 '/proc/diskstats',
1180 " 3 1 hda 1 2 3 4"):
1181 with mock.patch('psutil._pslinux.is_storage_device',
1182 return_value=True):
1183 ret = psutil.disk_io_counters(nowrap=False)
1184 self.assertEqual(ret.read_count, 1)
1185 self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
1186 self.assertEqual(ret.write_count, 3)
1187 self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
1188
1189 self.assertEqual(ret.read_merged_count, 0)
1190 self.assertEqual(ret.read_time, 0)
1191 self.assertEqual(ret.write_merged_count, 0)
1192 self.assertEqual(ret.write_time, 0)
1193 self.assertEqual(ret.busy_time, 0)
1194
1195 def test_emulate_include_partitions(self):
1196 # Make sure that when perdisk=True disk partitions are returned,
1197 # see:
1198 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1199 with mock_open_content(
1200 '/proc/diskstats',
1201 textwrap.dedent("""\
1202 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1203 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1204 """)):
1205 with mock.patch('psutil._pslinux.is_storage_device',
1206 return_value=False):
1207 ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
1208 self.assertEqual(len(ret), 2)
1209 self.assertEqual(ret['nvme0n1'].read_count, 1)
1210 self.assertEqual(ret['nvme0n1p1'].read_count, 1)
1211 self.assertEqual(ret['nvme0n1'].write_count, 5)
1212 self.assertEqual(ret['nvme0n1p1'].write_count, 5)
1213
1214 def test_emulate_exclude_partitions(self):
1215 # Make sure that when perdisk=False partitions (e.g. 'sda1',
1216 # 'nvme0n1p1') are skipped and not included in the total count.
1217 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1218 with mock_open_content(
1219 '/proc/diskstats',
1220 textwrap.dedent("""\
1221 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1222 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1223 """)):
1224 with mock.patch('psutil._pslinux.is_storage_device',
1225 return_value=False):
1226 ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1227 self.assertIsNone(ret)
1228
1229 #
1230 def is_storage_device(name):
1231 return name == 'nvme0n1'
1232
1233 with mock_open_content(
1234 '/proc/diskstats',
1235 textwrap.dedent("""\
1236 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1237 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1238 """)):
1239 with mock.patch('psutil._pslinux.is_storage_device',
1240 create=True, side_effect=is_storage_device):
1241 ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1242 self.assertEqual(ret.read_count, 1)
1243 self.assertEqual(ret.write_count, 5)
1244
1245 def test_emulate_use_sysfs(self):
1246 def exists(path):
1247 if path == '/proc/diskstats':
1248 return False
1249 return True
1250
1251 wprocfs = psutil.disk_io_counters(perdisk=True)
1252 with mock.patch('psutil._pslinux.os.path.exists',
1253 create=True, side_effect=exists):
1254 wsysfs = psutil.disk_io_counters(perdisk=True)
1255 self.assertEqual(len(wprocfs), len(wsysfs))
1256
1257 def test_emulate_not_impl(self):
1258 def exists(path):
1259 return False
1260
1261 with mock.patch('psutil._pslinux.os.path.exists',
1262 create=True, side_effect=exists):
1263 self.assertRaises(NotImplementedError, psutil.disk_io_counters)
1264
1265
1266 # =====================================================================
1267 # --- misc
1268 # =====================================================================
1269
1270
1271 @unittest.skipIf(not LINUX, "LINUX only")
1272 class TestMisc(PsutilTestCase):
1273
1274 def test_boot_time(self):
1275 vmstat_value = vmstat('boot time')
1276 psutil_value = psutil.boot_time()
1277 self.assertEqual(int(vmstat_value), int(psutil_value))
1278
1279 def test_no_procfs_on_import(self):
1280 my_procfs = self.get_testfn()
1281 os.mkdir(my_procfs)
1282
1283 with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1284 f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
1285 f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
1286 f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
1287
1288 try:
1289 orig_open = open
1290
1291 def open_mock(name, *args, **kwargs):
1292 if name.startswith('/proc'):
1293 raise IOError(errno.ENOENT, 'rejecting access for test')
1294 return orig_open(name, *args, **kwargs)
1295
1296 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1297 with mock.patch(patch_point, side_effect=open_mock):
1298 reload_module(psutil)
1299
1300 self.assertRaises(IOError, psutil.cpu_times)
1301 self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1302 self.assertRaises(IOError, psutil.cpu_percent)
1303 self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
1304 self.assertRaises(IOError, psutil.cpu_times_percent)
1305 self.assertRaises(
1306 IOError, psutil.cpu_times_percent, percpu=True)
1307
1308 psutil.PROCFS_PATH = my_procfs
1309
1310 self.assertEqual(psutil.cpu_percent(), 0)
1311 self.assertEqual(sum(psutil.cpu_times_percent()), 0)
1312
1313 # since we don't know the number of CPUs at import time,
1314 # we awkwardly say there are none until the second call
1315 per_cpu_percent = psutil.cpu_percent(percpu=True)
1316 self.assertEqual(sum(per_cpu_percent), 0)
1317
1318 # ditto awkward length
1319 per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
1320 self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
1321
1322 # much user, very busy
1323 with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1324 f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
1325 f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
1326 f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
1327
1328 self.assertNotEqual(psutil.cpu_percent(), 0)
1329 self.assertNotEqual(
1330 sum(psutil.cpu_percent(percpu=True)), 0)
1331 self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
1332 self.assertNotEqual(
1333 sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
1334 finally:
1335 shutil.rmtree(my_procfs)
1336 reload_module(psutil)
1337
1338 self.assertEqual(psutil.PROCFS_PATH, '/proc')
1339
1340 def test_cpu_steal_decrease(self):
1341 # Test cumulative cpu stats decrease. We should ignore this.
1342 # See issue #1210.
1343 with mock_open_content(
1344 "/proc/stat",
1345 textwrap.dedent("""\
1346 cpu 0 0 0 0 0 0 0 1 0 0
1347 cpu0 0 0 0 0 0 0 0 1 0 0
1348 cpu1 0 0 0 0 0 0 0 1 0 0
1349 """).encode()) as m:
1350 # first call to "percent" functions should read the new stat file
1351 # and compare to the "real" file read at import time - so the
1352 # values are meaningless
1353 psutil.cpu_percent()
1354 assert m.called
1355 psutil.cpu_percent(percpu=True)
1356 psutil.cpu_times_percent()
1357 psutil.cpu_times_percent(percpu=True)
1358
1359 with mock_open_content(
1360 "/proc/stat",
1361 textwrap.dedent("""\
1362 cpu 1 0 0 0 0 0 0 0 0 0
1363 cpu0 1 0 0 0 0 0 0 0 0 0
1364 cpu1 1 0 0 0 0 0 0 0 0 0
1365 """).encode()) as m:
1366 # Increase "user" while steal goes "backwards" to zero.
1367 cpu_percent = psutil.cpu_percent()
1368 assert m.called
1369 cpu_percent_percpu = psutil.cpu_percent(percpu=True)
1370 cpu_times_percent = psutil.cpu_times_percent()
1371 cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
1372 self.assertNotEqual(cpu_percent, 0)
1373 self.assertNotEqual(sum(cpu_percent_percpu), 0)
1374 self.assertNotEqual(sum(cpu_times_percent), 0)
1375 self.assertNotEqual(sum(cpu_times_percent), 100.0)
1376 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
1377 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
1378 self.assertEqual(cpu_times_percent.steal, 0)
1379 self.assertNotEqual(cpu_times_percent.user, 0)
1380
1381 def test_boot_time_mocked(self):
1382 with mock.patch('psutil._common.open', create=True) as m:
1383 self.assertRaises(
1384 RuntimeError,
1385 psutil._pslinux.boot_time)
1386 assert m.called
1387
1388 def test_users_mocked(self):
1389 # Make sure ':0' and ':0.0' (returned by C ext) are converted
1390 # to 'localhost'.
1391 with mock.patch('psutil._pslinux.cext.users',
1392 return_value=[('giampaolo', 'pts/2', ':0',
1393 1436573184.0, True, 2)]) as m:
1394 self.assertEqual(psutil.users()[0].host, 'localhost')
1395 assert m.called
1396 with mock.patch('psutil._pslinux.cext.users',
1397 return_value=[('giampaolo', 'pts/2', ':0.0',
1398 1436573184.0, True, 2)]) as m:
1399 self.assertEqual(psutil.users()[0].host, 'localhost')
1400 assert m.called
1401 # ...otherwise it should be returned as-is
1402 with mock.patch('psutil._pslinux.cext.users',
1403 return_value=[('giampaolo', 'pts/2', 'foo',
1404 1436573184.0, True, 2)]) as m:
1405 self.assertEqual(psutil.users()[0].host, 'foo')
1406 assert m.called
1407
1408 def test_procfs_path(self):
1409 tdir = self.get_testfn()
1410 os.mkdir(tdir)
1411 try:
1412 psutil.PROCFS_PATH = tdir
1413 self.assertRaises(IOError, psutil.virtual_memory)
1414 self.assertRaises(IOError, psutil.cpu_times)
1415 self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1416 self.assertRaises(IOError, psutil.boot_time)
1417 # self.assertRaises(IOError, psutil.pids)
1418 self.assertRaises(IOError, psutil.net_connections)
1419 self.assertRaises(IOError, psutil.net_io_counters)
1420 self.assertRaises(IOError, psutil.net_if_stats)
1421 # self.assertRaises(IOError, psutil.disk_io_counters)
1422 self.assertRaises(IOError, psutil.disk_partitions)
1423 self.assertRaises(psutil.NoSuchProcess, psutil.Process)
1424 finally:
1425 psutil.PROCFS_PATH = "/proc"
1426
1427 @retry_on_failure()
1428 def test_issue_687(self):
1429 # In case of thread ID:
1430 # - pid_exists() is supposed to return False
1431 # - Process(tid) is supposed to work
1432 # - pids() should not return the TID
1433 # See: https://github.com/giampaolo/psutil/issues/687
1434 t = ThreadTask()
1435 t.start()
1436 try:
1437 p = psutil.Process()
1438 threads = p.threads()
1439 self.assertEqual(len(threads), 2)
1440 tid = sorted(threads, key=lambda x: x.id)[1].id
1441 self.assertNotEqual(p.pid, tid)
1442 pt = psutil.Process(tid)
1443 pt.as_dict()
1444 self.assertNotIn(tid, psutil.pids())
1445 finally:
1446 t.stop()
1447
1448 def test_pid_exists_no_proc_status(self):
1449 # Internally pid_exists relies on /proc/{pid}/status.
1450 # Emulate a case where this file is empty in which case
1451 # psutil is supposed to fall back on using pids().
1452 with mock_open_content("/proc/%s/status", "") as m:
1453 assert psutil.pid_exists(os.getpid())
1454 assert m.called
1455
1456
1457 # =====================================================================
1458 # --- sensors
1459 # =====================================================================
1460
1461
1462 @unittest.skipIf(not LINUX, "LINUX only")
1463 @unittest.skipIf(not HAS_BATTERY, "no battery")
1464 class TestSensorsBattery(PsutilTestCase):
1465
1466 @unittest.skipIf(not which("acpi"), "acpi utility not available")
1467 def test_percent(self):
1468 out = sh("acpi -b")
1469 acpi_value = int(out.split(",")[1].strip().replace('%', ''))
1470 psutil_value = psutil.sensors_battery().percent
1471 self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
1472
1473 def test_emulate_power_plugged(self):
1474 # Pretend the AC power cable is connected.
1475 def open_mock(name, *args, **kwargs):
1476 if name.endswith("AC0/online") or name.endswith("AC/online"):
1477 return io.BytesIO(b"1")
1478 else:
1479 return orig_open(name, *args, **kwargs)
1480
1481 orig_open = open
1482 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1483 with mock.patch(patch_point, side_effect=open_mock) as m:
1484 self.assertEqual(psutil.sensors_battery().power_plugged, True)
1485 self.assertEqual(
1486 psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
1487 assert m.called
1488
1489 def test_emulate_power_plugged_2(self):
1490 # Same as above but pretend /AC0/online does not exist in which
1491 # case code relies on /status file.
1492 def open_mock(name, *args, **kwargs):
1493 if name.endswith("AC0/online") or name.endswith("AC/online"):
1494 raise IOError(errno.ENOENT, "")
1495 elif name.endswith("/status"):
1496 return io.StringIO(u("charging"))
1497 else:
1498 return orig_open(name, *args, **kwargs)
1499
1500 orig_open = open
1501 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1502 with mock.patch(patch_point, side_effect=open_mock) as m:
1503 self.assertEqual(psutil.sensors_battery().power_plugged, True)
1504 assert m.called
1505
1506 def test_emulate_power_not_plugged(self):
1507 # Pretend the AC power cable is not connected.
1508 def open_mock(name, *args, **kwargs):
1509 if name.endswith("AC0/online") or name.endswith("AC/online"):
1510 return io.BytesIO(b"0")
1511 else:
1512 return orig_open(name, *args, **kwargs)
1513
1514 orig_open = open
1515 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1516 with mock.patch(patch_point, side_effect=open_mock) as m:
1517 self.assertEqual(psutil.sensors_battery().power_plugged, False)
1518 assert m.called
1519
1520 def test_emulate_power_not_plugged_2(self):
1521 # Same as above but pretend /AC0/online does not exist in which
1522 # case code relies on /status file.
1523 def open_mock(name, *args, **kwargs):
1524 if name.endswith("AC0/online") or name.endswith("AC/online"):
1525 raise IOError(errno.ENOENT, "")
1526 elif name.endswith("/status"):
1527 return io.StringIO(u("discharging"))
1528 else:
1529 return orig_open(name, *args, **kwargs)
1530
1531 orig_open = open
1532 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1533 with mock.patch(patch_point, side_effect=open_mock) as m:
1534 self.assertEqual(psutil.sensors_battery().power_plugged, False)
1535 assert m.called
1536
1537 def test_emulate_power_undetermined(self):
1538 # Pretend we can't know whether the AC power cable not
1539 # connected (assert fallback to False).
1540 def open_mock(name, *args, **kwargs):
1541 if name.startswith("/sys/class/power_supply/AC0/online") or \
1542 name.startswith("/sys/class/power_supply/AC/online"):
1543 raise IOError(errno.ENOENT, "")
1544 elif name.startswith("/sys/class/power_supply/BAT0/status"):
1545 return io.BytesIO(b"???")
1546 else:
1547 return orig_open(name, *args, **kwargs)
1548
1549 orig_open = open
1550 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1551 with mock.patch(patch_point, side_effect=open_mock) as m:
1552 self.assertIsNone(psutil.sensors_battery().power_plugged)
1553 assert m.called
1554
1555 def test_emulate_energy_full_0(self):
1556 # Emulate a case where energy_full files returns 0.
1557 with mock_open_content(
1558 "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
1559 self.assertEqual(psutil.sensors_battery().percent, 0)
1560 assert m.called
1561
1562 def test_emulate_energy_full_not_avail(self):
1563 # Emulate a case where energy_full file does not exist.
1564 # Expected fallback on /capacity.
1565 with mock_open_exception(
1566 "/sys/class/power_supply/BAT0/energy_full",
1567 IOError(errno.ENOENT, "")):
1568 with mock_open_exception(
1569 "/sys/class/power_supply/BAT0/charge_full",
1570 IOError(errno.ENOENT, "")):
1571 with mock_open_content(
1572 "/sys/class/power_supply/BAT0/capacity", b"88"):
1573 self.assertEqual(psutil.sensors_battery().percent, 88)
1574
1575 def test_emulate_no_power(self):
1576 # Emulate a case where /AC0/online file nor /BAT0/status exist.
1577 with mock_open_exception(
1578 "/sys/class/power_supply/AC/online",
1579 IOError(errno.ENOENT, "")):
1580 with mock_open_exception(
1581 "/sys/class/power_supply/AC0/online",
1582 IOError(errno.ENOENT, "")):
1583 with mock_open_exception(
1584 "/sys/class/power_supply/BAT0/status",
1585 IOError(errno.ENOENT, "")):
1586 self.assertIsNone(psutil.sensors_battery().power_plugged)
1587
1588
1589 @unittest.skipIf(not LINUX, "LINUX only")
1590 class TestSensorsBatteryEmulated(PsutilTestCase):
1591
1592 def test_it(self):
1593 def open_mock(name, *args, **kwargs):
1594 if name.endswith("/energy_now"):
1595 return io.StringIO(u("60000000"))
1596 elif name.endswith("/power_now"):
1597 return io.StringIO(u("0"))
1598 elif name.endswith("/energy_full"):
1599 return io.StringIO(u("60000001"))
1600 else:
1601 return orig_open(name, *args, **kwargs)
1602
1603 orig_open = open
1604 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1605 with mock.patch('os.listdir', return_value=["BAT0"]) as mlistdir:
1606 with mock.patch(patch_point, side_effect=open_mock) as mopen:
1607 self.assertIsNotNone(psutil.sensors_battery())
1608 assert mlistdir.called
1609 assert mopen.called
1610
1611
1612 @unittest.skipIf(not LINUX, "LINUX only")
1613 class TestSensorsTemperatures(PsutilTestCase):
1614
1615 def test_emulate_class_hwmon(self):
1616 def open_mock(name, *args, **kwargs):
1617 if name.endswith('/name'):
1618 return io.StringIO(u("name"))
1619 elif name.endswith('/temp1_label'):
1620 return io.StringIO(u("label"))
1621 elif name.endswith('/temp1_input'):
1622 return io.BytesIO(b"30000")
1623 elif name.endswith('/temp1_max'):
1624 return io.BytesIO(b"40000")
1625 elif name.endswith('/temp1_crit'):
1626 return io.BytesIO(b"50000")
1627 else:
1628 return orig_open(name, *args, **kwargs)
1629
1630 orig_open = open
1631 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1632 with mock.patch(patch_point, side_effect=open_mock):
1633 # Test case with /sys/class/hwmon
1634 with mock.patch('glob.glob',
1635 return_value=['/sys/class/hwmon/hwmon0/temp1']):
1636 temp = psutil.sensors_temperatures()['name'][0]
1637 self.assertEqual(temp.label, 'label')
1638 self.assertEqual(temp.current, 30.0)
1639 self.assertEqual(temp.high, 40.0)
1640 self.assertEqual(temp.critical, 50.0)
1641
1642 def test_emulate_class_thermal(self):
1643 def open_mock(name, *args, **kwargs):
1644 if name.endswith('0_temp'):
1645 return io.BytesIO(b"50000")
1646 elif name.endswith('temp'):
1647 return io.BytesIO(b"30000")
1648 elif name.endswith('0_type'):
1649 return io.StringIO(u("critical"))
1650 elif name.endswith('type'):
1651 return io.StringIO(u("name"))
1652 else:
1653 return orig_open(name, *args, **kwargs)
1654
1655 def glob_mock(path):
1656 if path == '/sys/class/hwmon/hwmon*/temp*_*':
1657 return []
1658 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
1659 return []
1660 elif path == '/sys/class/thermal/thermal_zone*':
1661 return ['/sys/class/thermal/thermal_zone0']
1662 elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
1663 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
1664 '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
1665 return []
1666
1667 orig_open = open
1668 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1669 with mock.patch(patch_point, side_effect=open_mock):
1670 with mock.patch('glob.glob', create=True, side_effect=glob_mock):
1671 temp = psutil.sensors_temperatures()['name'][0]
1672 self.assertEqual(temp.label, '')
1673 self.assertEqual(temp.current, 30.0)
1674 self.assertEqual(temp.high, 50.0)
1675 self.assertEqual(temp.critical, 50.0)
1676
1677
1678 @unittest.skipIf(not LINUX, "LINUX only")
1679 class TestSensorsFans(PsutilTestCase):
1680
1681 def test_emulate_data(self):
1682 def open_mock(name, *args, **kwargs):
1683 if name.endswith('/name'):
1684 return io.StringIO(u("name"))
1685 elif name.endswith('/fan1_label'):
1686 return io.StringIO(u("label"))
1687 elif name.endswith('/fan1_input'):
1688 return io.StringIO(u("2000"))
1689 else:
1690 return orig_open(name, *args, **kwargs)
1691
1692 orig_open = open
1693 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1694 with mock.patch(patch_point, side_effect=open_mock):
1695 with mock.patch('glob.glob',
1696 return_value=['/sys/class/hwmon/hwmon2/fan1']):
1697 fan = psutil.sensors_fans()['name'][0]
1698 self.assertEqual(fan.label, 'label')
1699 self.assertEqual(fan.current, 2000)
1700
1701
1702 # =====================================================================
1703 # --- test process
1704 # =====================================================================
1705
1706
1707 @unittest.skipIf(not LINUX, "LINUX only")
1708 class TestProcess(PsutilTestCase):
1709
1710 @retry_on_failure()
1711 def test_memory_full_info(self):
1712 testfn = self.get_testfn()
1713 src = textwrap.dedent("""
1714 import time
1715 with open("%s", "w") as f:
1716 time.sleep(10)
1717 """ % testfn)
1718 sproc = self.pyrun(src)
1719 call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
1720 p = psutil.Process(sproc.pid)
1721 time.sleep(.1)
1722 mem = p.memory_full_info()
1723 maps = p.memory_maps(grouped=False)
1724 self.assertAlmostEqual(
1725 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
1726 delta=4096)
1727 self.assertAlmostEqual(
1728 mem.pss, sum([x.pss for x in maps]), delta=4096)
1729 self.assertAlmostEqual(
1730 mem.swap, sum([x.swap for x in maps]), delta=4096)
1731
1732 def test_memory_full_info_mocked(self):
1733 # See: https://github.com/giampaolo/psutil/issues/1222
1734 with mock_open_content(
1735 "/proc/%s/smaps" % os.getpid(),
1736 textwrap.dedent("""\
1737 fffff0 r-xp 00000000 00:00 0 [vsyscall]
1738 Size: 1 kB
1739 Rss: 2 kB
1740 Pss: 3 kB
1741 Shared_Clean: 4 kB
1742 Shared_Dirty: 5 kB
1743 Private_Clean: 6 kB
1744 Private_Dirty: 7 kB
1745 Referenced: 8 kB
1746 Anonymous: 9 kB
1747 LazyFree: 10 kB
1748 AnonHugePages: 11 kB
1749 ShmemPmdMapped: 12 kB
1750 Shared_Hugetlb: 13 kB
1751 Private_Hugetlb: 14 kB
1752 Swap: 15 kB
1753 SwapPss: 16 kB
1754 KernelPageSize: 17 kB
1755 MMUPageSize: 18 kB
1756 Locked: 19 kB
1757 VmFlags: rd ex
1758 """).encode()) as m:
1759 p = psutil.Process()
1760 mem = p.memory_full_info()
1761 assert m.called
1762 self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
1763 self.assertEqual(mem.pss, 3 * 1024)
1764 self.assertEqual(mem.swap, 15 * 1024)
1765
1766 # On PYPY file descriptors are not closed fast enough.
1767 @unittest.skipIf(PYPY, "unreliable on PYPY")
1768 def test_open_files_mode(self):
1769 def get_test_file(fname):
1770 p = psutil.Process()
1771 giveup_at = time.time() + GLOBAL_TIMEOUT
1772 while True:
1773 for file in p.open_files():
1774 if file.path == os.path.abspath(fname):
1775 return file
1776 elif time.time() > giveup_at:
1777 break
1778 raise RuntimeError("timeout looking for test file")
1779
1780 #
1781 testfn = self.get_testfn()
1782 with open(testfn, "w"):
1783 self.assertEqual(get_test_file(testfn).mode, "w")
1784 with open(testfn, "r"):
1785 self.assertEqual(get_test_file(testfn).mode, "r")
1786 with open(testfn, "a"):
1787 self.assertEqual(get_test_file(testfn).mode, "a")
1788 #
1789 with open(testfn, "r+"):
1790 self.assertEqual(get_test_file(testfn).mode, "r+")
1791 with open(testfn, "w+"):
1792 self.assertEqual(get_test_file(testfn).mode, "r+")
1793 with open(testfn, "a+"):
1794 self.assertEqual(get_test_file(testfn).mode, "a+")
1795 # note: "x" bit is not supported
1796 if PY3:
1797 safe_rmpath(testfn)
1798 with open(testfn, "x"):
1799 self.assertEqual(get_test_file(testfn).mode, "w")
1800 safe_rmpath(testfn)
1801 with open(testfn, "x+"):
1802 self.assertEqual(get_test_file(testfn).mode, "r+")
1803
1804 def test_open_files_file_gone(self):
1805 # simulates a file which gets deleted during open_files()
1806 # execution
1807 p = psutil.Process()
1808 files = p.open_files()
1809 with open(self.get_testfn(), 'w'):
1810 # give the kernel some time to see the new file
1811 call_until(p.open_files, "len(ret) != %i" % len(files))
1812 with mock.patch('psutil._pslinux.os.readlink',
1813 side_effect=OSError(errno.ENOENT, "")) as m:
1814 files = p.open_files()
1815 assert not files
1816 assert m.called
1817 # also simulate the case where os.readlink() returns EINVAL
1818 # in which case psutil is supposed to 'continue'
1819 with mock.patch('psutil._pslinux.os.readlink',
1820 side_effect=OSError(errno.EINVAL, "")) as m:
1821 self.assertEqual(p.open_files(), [])
1822 assert m.called
1823
1824 def test_open_files_fd_gone(self):
1825 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
1826 # while iterating through fds.
1827 # https://travis-ci.org/giampaolo/psutil/jobs/225694530
1828 p = psutil.Process()
1829 files = p.open_files()
1830 with open(self.get_testfn(), 'w'):
1831 # give the kernel some time to see the new file
1832 call_until(p.open_files, "len(ret) != %i" % len(files))
1833 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1834 with mock.patch(patch_point,
1835 side_effect=IOError(errno.ENOENT, "")) as m:
1836 files = p.open_files()
1837 assert not files
1838 assert m.called
1839
1840 # --- mocked tests
1841
1842 def test_terminal_mocked(self):
1843 with mock.patch('psutil._pslinux._psposix.get_terminal_map',
1844 return_value={}) as m:
1845 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
1846 assert m.called
1847
1848 # TODO: re-enable this test.
1849 # def test_num_ctx_switches_mocked(self):
1850 # with mock.patch('psutil._common.open', create=True) as m:
1851 # self.assertRaises(
1852 # NotImplementedError,
1853 # psutil._pslinux.Process(os.getpid()).num_ctx_switches)
1854 # assert m.called
1855
1856 def test_cmdline_mocked(self):
1857 # see: https://github.com/giampaolo/psutil/issues/639
1858 p = psutil.Process()
1859 fake_file = io.StringIO(u('foo\x00bar\x00'))
1860 with mock.patch('psutil._common.open',
1861 return_value=fake_file, create=True) as m:
1862 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1863 assert m.called
1864 fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
1865 with mock.patch('psutil._common.open',
1866 return_value=fake_file, create=True) as m:
1867 self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1868 assert m.called
1869
1870 def test_cmdline_spaces_mocked(self):
1871 # see: https://github.com/giampaolo/psutil/issues/1179
1872 p = psutil.Process()
1873 fake_file = io.StringIO(u('foo bar '))
1874 with mock.patch('psutil._common.open',
1875 return_value=fake_file, create=True) as m:
1876 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1877 assert m.called
1878 fake_file = io.StringIO(u('foo bar '))
1879 with mock.patch('psutil._common.open',
1880 return_value=fake_file, create=True) as m:
1881 self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1882 assert m.called
1883
1884 def test_cmdline_mixed_separators(self):
1885 # https://github.com/giampaolo/psutil/issues/
1886 # 1179#issuecomment-552984549
1887 p = psutil.Process()
1888 fake_file = io.StringIO(u('foo\x20bar\x00'))
1889 with mock.patch('psutil._common.open',
1890 return_value=fake_file, create=True) as m:
1891 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1892 assert m.called
1893
1894 def test_readlink_path_deleted_mocked(self):
1895 with mock.patch('psutil._pslinux.os.readlink',
1896 return_value='/home/foo (deleted)'):
1897 self.assertEqual(psutil.Process().exe(), "/home/foo")
1898 self.assertEqual(psutil.Process().cwd(), "/home/foo")
1899
1900 def test_threads_mocked(self):
1901 # Test the case where os.listdir() returns a file (thread)
1902 # which no longer exists by the time we open() it (race
1903 # condition). threads() is supposed to ignore that instead
1904 # of raising NSP.
1905 def open_mock(name, *args, **kwargs):
1906 if name.startswith('/proc/%s/task' % os.getpid()):
1907 raise IOError(errno.ENOENT, "")
1908 else:
1909 return orig_open(name, *args, **kwargs)
1910
1911 orig_open = open
1912 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1913 with mock.patch(patch_point, side_effect=open_mock) as m:
1914 ret = psutil.Process().threads()
1915 assert m.called
1916 self.assertEqual(ret, [])
1917
1918 # ...but if it bumps into something != ENOENT we want an
1919 # exception.
1920 def open_mock(name, *args, **kwargs):
1921 if name.startswith('/proc/%s/task' % os.getpid()):
1922 raise IOError(errno.EPERM, "")
1923 else:
1924 return orig_open(name, *args, **kwargs)
1925
1926 with mock.patch(patch_point, side_effect=open_mock):
1927 self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
1928
1929 def test_exe_mocked(self):
1930 with mock.patch('psutil._pslinux.readlink',
1931 side_effect=OSError(errno.ENOENT, "")) as m1:
1932 with mock.patch('psutil.Process.cmdline',
1933 side_effect=psutil.AccessDenied(0, "")) as m2:
1934 # No such file error; might be raised also if /proc/pid/exe
1935 # path actually exists for system processes with low pids
1936 # (about 0-20). In this case psutil is supposed to return
1937 # an empty string.
1938 ret = psutil.Process().exe()
1939 assert m1.called
1940 assert m2.called
1941 self.assertEqual(ret, "")
1942
1943 # ...but if /proc/pid no longer exist we're supposed to treat
1944 # it as an alias for zombie process
1945 with mock.patch('psutil._pslinux.os.path.lexists',
1946 return_value=False):
1947 self.assertRaises(
1948 psutil.ZombieProcess, psutil.Process().exe)
1949
1950 def test_issue_1014(self):
1951 # Emulates a case where smaps file does not exist. In this case
1952 # wrap_exception decorator should not raise NoSuchProcess.
1953 with mock_open_exception(
1954 '/proc/%s/smaps' % os.getpid(),
1955 IOError(errno.ENOENT, "")) as m:
1956 p = psutil.Process()
1957 with self.assertRaises(FileNotFoundError):
1958 p.memory_maps()
1959 assert m.called
1960
1961 @unittest.skipIf(not HAS_RLIMIT, "not supported")
1962 def test_rlimit_zombie(self):
1963 # Emulate a case where rlimit() raises ENOSYS, which may
1964 # happen in case of zombie process:
1965 # https://travis-ci.org/giampaolo/psutil/jobs/51368273
1966 with mock.patch("psutil._pslinux.prlimit",
1967 side_effect=OSError(errno.ENOSYS, "")) as m:
1968 p = psutil.Process()
1969 p.name()
1970 with self.assertRaises(psutil.ZombieProcess) as exc:
1971 p.rlimit(psutil.RLIMIT_NOFILE)
1972 assert m.called
1973 self.assertEqual(exc.exception.pid, p.pid)
1974 self.assertEqual(exc.exception.name, p.name())
1975
1976 def test_cwd_zombie(self):
1977 with mock.patch("psutil._pslinux.os.readlink",
1978 side_effect=OSError(errno.ENOENT, "")) as m:
1979 p = psutil.Process()
1980 p.name()
1981 with self.assertRaises(psutil.ZombieProcess) as exc:
1982 p.cwd()
1983 assert m.called
1984 self.assertEqual(exc.exception.pid, p.pid)
1985 self.assertEqual(exc.exception.name, p.name())
1986
1987 def test_stat_file_parsing(self):
1988 from psutil._pslinux import CLOCK_TICKS
1989
1990 args = [
1991 "0", # pid
1992 "(cat)", # name
1993 "Z", # status
1994 "1", # ppid
1995 "0", # pgrp
1996 "0", # session
1997 "0", # tty
1998 "0", # tpgid
1999 "0", # flags
2000 "0", # minflt
2001 "0", # cminflt
2002 "0", # majflt
2003 "0", # cmajflt
2004 "2", # utime
2005 "3", # stime
2006 "4", # cutime
2007 "5", # cstime
2008 "0", # priority
2009 "0", # nice
2010 "0", # num_threads
2011 "0", # itrealvalue
2012 "6", # starttime
2013 "0", # vsize
2014 "0", # rss
2015 "0", # rsslim
2016 "0", # startcode
2017 "0", # endcode
2018 "0", # startstack
2019 "0", # kstkesp
2020 "0", # kstkeip
2021 "0", # signal
2022 "0", # blocked
2023 "0", # sigignore
2024 "0", # sigcatch
2025 "0", # wchan
2026 "0", # nswap
2027 "0", # cnswap
2028 "0", # exit_signal
2029 "6", # processor
2030 "0", # rt priority
2031 "0", # policy
2032 "7", # delayacct_blkio_ticks
2033 ]
2034 content = " ".join(args).encode()
2035 with mock_open_content('/proc/%s/stat' % os.getpid(), content):
2036 p = psutil.Process()
2037 self.assertEqual(p.name(), 'cat')
2038 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
2039 self.assertEqual(p.ppid(), 1)
2040 self.assertEqual(
2041 p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
2042 cpu = p.cpu_times()
2043 self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
2044 self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
2045 self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
2046 self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
2047 self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
2048 self.assertEqual(p.cpu_num(), 6)
2049
2050 def test_status_file_parsing(self):
2051 with mock_open_content(
2052 '/proc/%s/status' % os.getpid(),
2053 textwrap.dedent("""\
2054 Uid:\t1000\t1001\t1002\t1003
2055 Gid:\t1004\t1005\t1006\t1007
2056 Threads:\t66
2057 Cpus_allowed:\tf
2058 Cpus_allowed_list:\t0-7
2059 voluntary_ctxt_switches:\t12
2060 nonvoluntary_ctxt_switches:\t13""").encode()):
2061 p = psutil.Process()
2062 self.assertEqual(p.num_ctx_switches().voluntary, 12)
2063 self.assertEqual(p.num_ctx_switches().involuntary, 13)
2064 self.assertEqual(p.num_threads(), 66)
2065 uids = p.uids()
2066 self.assertEqual(uids.real, 1000)
2067 self.assertEqual(uids.effective, 1001)
2068 self.assertEqual(uids.saved, 1002)
2069 gids = p.gids()
2070 self.assertEqual(gids.real, 1004)
2071 self.assertEqual(gids.effective, 1005)
2072 self.assertEqual(gids.saved, 1006)
2073 self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
2074
2075
2076 @unittest.skipIf(not LINUX, "LINUX only")
2077 class TestProcessAgainstStatus(PsutilTestCase):
2078 """/proc/pid/stat and /proc/pid/status have many values in common.
2079 Whenever possible, psutil uses /proc/pid/stat (it's faster).
2080 For all those cases we check that the value found in
2081 /proc/pid/stat (by psutil) matches the one found in
2082 /proc/pid/status.
2083 """
2084
2085 @classmethod
2086 def setUpClass(cls):
2087 cls.proc = psutil.Process()
2088
2089 def read_status_file(self, linestart):
2090 with psutil._psplatform.open_text(
2091 '/proc/%s/status' % self.proc.pid) as f:
2092 for line in f:
2093 line = line.strip()
2094 if line.startswith(linestart):
2095 value = line.partition('\t')[2]
2096 try:
2097 return int(value)
2098 except ValueError:
2099 return value
2100 raise ValueError("can't find %r" % linestart)
2101
2102 def test_name(self):
2103 value = self.read_status_file("Name:")
2104 self.assertEqual(self.proc.name(), value)
2105
2106 def test_status(self):
2107 value = self.read_status_file("State:")
2108 value = value[value.find('(') + 1:value.rfind(')')]
2109 value = value.replace(' ', '-')
2110 self.assertEqual(self.proc.status(), value)
2111
2112 def test_ppid(self):
2113 value = self.read_status_file("PPid:")
2114 self.assertEqual(self.proc.ppid(), value)
2115
2116 def test_num_threads(self):
2117 value = self.read_status_file("Threads:")
2118 self.assertEqual(self.proc.num_threads(), value)
2119
2120 def test_uids(self):
2121 value = self.read_status_file("Uid:")
2122 value = tuple(map(int, value.split()[1:4]))
2123 self.assertEqual(self.proc.uids(), value)
2124
2125 def test_gids(self):
2126 value = self.read_status_file("Gid:")
2127 value = tuple(map(int, value.split()[1:4]))
2128 self.assertEqual(self.proc.gids(), value)
2129
2130 @retry_on_failure()
2131 def test_num_ctx_switches(self):
2132 value = self.read_status_file("voluntary_ctxt_switches:")
2133 self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
2134 value = self.read_status_file("nonvoluntary_ctxt_switches:")
2135 self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
2136
2137 def test_cpu_affinity(self):
2138 value = self.read_status_file("Cpus_allowed_list:")
2139 if '-' in str(value):
2140 min_, max_ = map(int, value.split('-'))
2141 self.assertEqual(
2142 self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
2143
2144 def test_cpu_affinity_eligible_cpus(self):
2145 value = self.read_status_file("Cpus_allowed_list:")
2146 with mock.patch("psutil._pslinux.per_cpu_times") as m:
2147 self.proc._proc._get_eligible_cpus()
2148 if '-' in str(value):
2149 assert not m.called
2150 else:
2151 assert m.called
2152
2153
2154 # =====================================================================
2155 # --- test utils
2156 # =====================================================================
2157
2158
2159 @unittest.skipIf(not LINUX, "LINUX only")
2160 class TestUtils(PsutilTestCase):
2161
2162 def test_readlink(self):
2163 with mock.patch("os.readlink", return_value="foo (deleted)") as m:
2164 self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
2165 assert m.called
2166
2167 def test_cat(self):
2168 testfn = self.get_testfn()
2169 with open(testfn, "wt") as f:
2170 f.write("foo ")
2171 self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo")
2172 self.assertEqual(psutil._psplatform.cat(testfn, binary=True), b"foo")
2173 self.assertEqual(
2174 psutil._psplatform.cat(testfn + '??', fallback="bar"), "bar")
2175
2176
2177 if __name__ == '__main__':
2178 from psutil.tests.runner import run_from_name
2179 run_from_name(__file__)