comparison env/lib/python3.7/site-packages/psutil/tests/test_linux.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Linux specific tests."""
8
9 from __future__ import division
10 import collections
11 import contextlib
12 import errno
13 import glob
14 import io
15 import os
16 import re
17 import shutil
18 import socket
19 import struct
20 import tempfile
21 import textwrap
22 import time
23 import warnings
24
25 import psutil
26 from psutil import LINUX
27 from psutil._compat import basestring
28 from psutil._compat import FileNotFoundError
29 from psutil._compat import PY3
30 from psutil._compat import u
31 from psutil.tests import call_until
32 from psutil.tests import HAS_BATTERY
33 from psutil.tests import HAS_CPU_FREQ
34 from psutil.tests import HAS_GETLOADAVG
35 from psutil.tests import HAS_RLIMIT
36 from psutil.tests import MEMORY_TOLERANCE
37 from psutil.tests import mock
38 from psutil.tests import PYPY
39 from psutil.tests import pyrun
40 from psutil.tests import reap_children
41 from psutil.tests import reload_module
42 from psutil.tests import retry_on_failure
43 from psutil.tests import safe_rmpath
44 from psutil.tests import sh
45 from psutil.tests import skip_on_not_implemented
46 from psutil.tests import TESTFN
47 from psutil.tests import ThreadTask
48 from psutil.tests import TRAVIS
49 from psutil.tests import unittest
50 from psutil.tests import which
51
52
53 HERE = os.path.abspath(os.path.dirname(__file__))
54 SIOCGIFADDR = 0x8915
55 SIOCGIFCONF = 0x8912
56 SIOCGIFHWADDR = 0x8927
57 if LINUX:
58 SECTOR_SIZE = 512
59 EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
60
61 # =====================================================================
62 # --- utils
63 # =====================================================================
64
65
66 def get_ipv4_address(ifname):
67 import fcntl
68 ifname = ifname[:15]
69 if PY3:
70 ifname = bytes(ifname, 'ascii')
71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
72 with contextlib.closing(s):
73 return socket.inet_ntoa(
74 fcntl.ioctl(s.fileno(),
75 SIOCGIFADDR,
76 struct.pack('256s', ifname))[20:24])
77
78
79 def get_mac_address(ifname):
80 import fcntl
81 ifname = ifname[:15]
82 if PY3:
83 ifname = bytes(ifname, 'ascii')
84 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
85 with contextlib.closing(s):
86 info = fcntl.ioctl(
87 s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
88 if PY3:
89 def ord(x):
90 return x
91 else:
92 import __builtin__
93 ord = __builtin__.ord
94 return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
95
96
97 def free_swap():
98 """Parse 'free' cmd and return swap memory's s total, used and free
99 values.
100 """
101 out = sh('free -b', env={"LANG": "C.UTF-8"})
102 lines = out.split('\n')
103 for line in lines:
104 if line.startswith('Swap'):
105 _, total, used, free = line.split()
106 nt = collections.namedtuple('free', 'total used free')
107 return nt(int(total), int(used), int(free))
108 raise ValueError(
109 "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
110
111
112 def free_physmem():
113 """Parse 'free' cmd and return physical memory's total, used
114 and free values.
115 """
116 # Note: free can have 2 different formats, invalidating 'shared'
117 # and 'cached' memory which may have different positions so we
118 # do not return them.
119 # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
120 out = sh('free -b', env={"LANG": "C.UTF-8"})
121 lines = out.split('\n')
122 for line in lines:
123 if line.startswith('Mem'):
124 total, used, free, shared = \
125 [int(x) for x in line.split()[1:5]]
126 nt = collections.namedtuple(
127 'free', 'total used free shared output')
128 return nt(total, used, free, shared, out)
129 raise ValueError(
130 "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
131
132
133 def vmstat(stat):
134 out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
135 for line in out.split("\n"):
136 line = line.strip()
137 if stat in line:
138 return int(line.split(' ')[0])
139 raise ValueError("can't find %r in 'vmstat' output" % stat)
140
141
142 def get_free_version_info():
143 out = sh("free -V").strip()
144 return tuple(map(int, out.split()[-1].split('.')))
145
146
147 @contextlib.contextmanager
148 def mock_open_content(for_path, content):
149 """Mock open() builtin and forces it to return a certain `content`
150 on read() if the path being opened matches `for_path`.
151 """
152 def open_mock(name, *args, **kwargs):
153 if name == for_path:
154 if PY3:
155 if isinstance(content, basestring):
156 return io.StringIO(content)
157 else:
158 return io.BytesIO(content)
159 else:
160 return io.BytesIO(content)
161 else:
162 return orig_open(name, *args, **kwargs)
163
164 orig_open = open
165 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
166 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
167 yield m
168
169
170 @contextlib.contextmanager
171 def mock_open_exception(for_path, exc):
172 """Mock open() builtin and raises `exc` if the path being opened
173 matches `for_path`.
174 """
175 def open_mock(name, *args, **kwargs):
176 if name == for_path:
177 raise exc
178 else:
179 return orig_open(name, *args, **kwargs)
180
181 orig_open = open
182 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
183 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
184 yield m
185
186
187 # =====================================================================
188 # --- system virtual memory
189 # =====================================================================
190
191
192 @unittest.skipIf(not LINUX, "LINUX only")
193 class TestSystemVirtualMemory(unittest.TestCase):
194
195 def test_total(self):
196 # free_value = free_physmem().total
197 # psutil_value = psutil.virtual_memory().total
198 # self.assertEqual(free_value, psutil_value)
199 vmstat_value = vmstat('total memory') * 1024
200 psutil_value = psutil.virtual_memory().total
201 self.assertAlmostEqual(vmstat_value, psutil_value)
202
203 # Older versions of procps used slab memory to calculate used memory.
204 # This got changed in:
205 # https://gitlab.com/procps-ng/procps/commit/
206 # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e
207 @unittest.skipIf(LINUX and get_free_version_info() < (3, 3, 12),
208 "old free version")
209 @retry_on_failure()
210 def test_used(self):
211 free = free_physmem()
212 free_value = free.used
213 psutil_value = psutil.virtual_memory().used
214 self.assertAlmostEqual(
215 free_value, psutil_value, delta=MEMORY_TOLERANCE,
216 msg='%s %s \n%s' % (free_value, psutil_value, free.output))
217
218 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
219 @retry_on_failure()
220 def test_free(self):
221 vmstat_value = vmstat('free memory') * 1024
222 psutil_value = psutil.virtual_memory().free
223 self.assertAlmostEqual(
224 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
225
226 @retry_on_failure()
227 def test_buffers(self):
228 vmstat_value = vmstat('buffer memory') * 1024
229 psutil_value = psutil.virtual_memory().buffers
230 self.assertAlmostEqual(
231 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
232
233 # https://travis-ci.org/giampaolo/psutil/jobs/226719664
234 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
235 @retry_on_failure()
236 def test_active(self):
237 vmstat_value = vmstat('active memory') * 1024
238 psutil_value = psutil.virtual_memory().active
239 self.assertAlmostEqual(
240 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
241
242 # https://travis-ci.org/giampaolo/psutil/jobs/227242952
243 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
244 @retry_on_failure()
245 def test_inactive(self):
246 vmstat_value = vmstat('inactive memory') * 1024
247 psutil_value = psutil.virtual_memory().inactive
248 self.assertAlmostEqual(
249 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
250
251 @retry_on_failure()
252 def test_shared(self):
253 free = free_physmem()
254 free_value = free.shared
255 if free_value == 0:
256 raise unittest.SkipTest("free does not support 'shared' column")
257 psutil_value = psutil.virtual_memory().shared
258 self.assertAlmostEqual(
259 free_value, psutil_value, delta=MEMORY_TOLERANCE,
260 msg='%s %s \n%s' % (free_value, psutil_value, free.output))
261
262 @retry_on_failure()
263 def test_available(self):
264 # "free" output format has changed at some point:
265 # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
266 out = sh("free -b")
267 lines = out.split('\n')
268 if 'available' not in lines[0]:
269 raise unittest.SkipTest("free does not support 'available' column")
270 else:
271 free_value = int(lines[1].split()[-1])
272 psutil_value = psutil.virtual_memory().available
273 self.assertAlmostEqual(
274 free_value, psutil_value, delta=MEMORY_TOLERANCE,
275 msg='%s %s \n%s' % (free_value, psutil_value, out))
276
277 def test_warnings_on_misses(self):
278 # Emulate a case where /proc/meminfo provides few info.
279 # psutil is supposed to set the missing fields to 0 and
280 # raise a warning.
281 with mock_open_content(
282 '/proc/meminfo',
283 textwrap.dedent("""\
284 Active(anon): 6145416 kB
285 Active(file): 2950064 kB
286 Inactive(anon): 574764 kB
287 Inactive(file): 1567648 kB
288 MemAvailable: -1 kB
289 MemFree: 2057400 kB
290 MemTotal: 16325648 kB
291 SReclaimable: 346648 kB
292 """).encode()) as m:
293 with warnings.catch_warnings(record=True) as ws:
294 warnings.simplefilter("always")
295 ret = psutil.virtual_memory()
296 assert m.called
297 self.assertEqual(len(ws), 1)
298 w = ws[0]
299 assert w.filename.endswith('psutil/_pslinux.py')
300 self.assertIn(
301 "memory stats couldn't be determined", str(w.message))
302 self.assertIn("cached", str(w.message))
303 self.assertIn("shared", str(w.message))
304 self.assertIn("active", str(w.message))
305 self.assertIn("inactive", str(w.message))
306 self.assertIn("buffers", str(w.message))
307 self.assertIn("available", str(w.message))
308 self.assertEqual(ret.cached, 0)
309 self.assertEqual(ret.active, 0)
310 self.assertEqual(ret.inactive, 0)
311 self.assertEqual(ret.shared, 0)
312 self.assertEqual(ret.buffers, 0)
313 self.assertEqual(ret.available, 0)
314 self.assertEqual(ret.slab, 0)
315
316 @retry_on_failure()
317 def test_avail_old_percent(self):
318 # Make sure that our calculation of avail mem for old kernels
319 # is off by max 15%.
320 from psutil._pslinux import calculate_avail_vmem
321 from psutil._pslinux import open_binary
322
323 mems = {}
324 with open_binary('/proc/meminfo') as f:
325 for line in f:
326 fields = line.split()
327 mems[fields[0]] = int(fields[1]) * 1024
328
329 a = calculate_avail_vmem(mems)
330 if b'MemAvailable:' in mems:
331 b = mems[b'MemAvailable:']
332 diff_percent = abs(a - b) / a * 100
333 self.assertLess(diff_percent, 15)
334
335 def test_avail_old_comes_from_kernel(self):
336 # Make sure "MemAvailable:" coluimn is used instead of relying
337 # on our internal algorithm to calculate avail mem.
338 with mock_open_content(
339 '/proc/meminfo',
340 textwrap.dedent("""\
341 Active: 9444728 kB
342 Active(anon): 6145416 kB
343 Active(file): 2950064 kB
344 Buffers: 287952 kB
345 Cached: 4818144 kB
346 Inactive(file): 1578132 kB
347 Inactive(anon): 574764 kB
348 Inactive(file): 1567648 kB
349 MemAvailable: 6574984 kB
350 MemFree: 2057400 kB
351 MemTotal: 16325648 kB
352 Shmem: 577588 kB
353 SReclaimable: 346648 kB
354 """).encode()) as m:
355 with warnings.catch_warnings(record=True) as ws:
356 ret = psutil.virtual_memory()
357 assert m.called
358 self.assertEqual(ret.available, 6574984 * 1024)
359 w = ws[0]
360 self.assertIn(
361 "inactive memory stats couldn't be determined", str(w.message))
362
363 def test_avail_old_missing_fields(self):
364 # Remove Active(file), Inactive(file) and SReclaimable
365 # from /proc/meminfo and make sure the fallback is used
366 # (free + cached),
367 with mock_open_content(
368 "/proc/meminfo",
369 textwrap.dedent("""\
370 Active: 9444728 kB
371 Active(anon): 6145416 kB
372 Buffers: 287952 kB
373 Cached: 4818144 kB
374 Inactive(file): 1578132 kB
375 Inactive(anon): 574764 kB
376 MemFree: 2057400 kB
377 MemTotal: 16325648 kB
378 Shmem: 577588 kB
379 """).encode()) as m:
380 with warnings.catch_warnings(record=True) as ws:
381 ret = psutil.virtual_memory()
382 assert m.called
383 self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
384 w = ws[0]
385 self.assertIn(
386 "inactive memory stats couldn't be determined", str(w.message))
387
388 def test_avail_old_missing_zoneinfo(self):
389 # Remove /proc/zoneinfo file. Make sure fallback is used
390 # (free + cached).
391 with mock_open_content(
392 "/proc/meminfo",
393 textwrap.dedent("""\
394 Active: 9444728 kB
395 Active(anon): 6145416 kB
396 Active(file): 2950064 kB
397 Buffers: 287952 kB
398 Cached: 4818144 kB
399 Inactive(file): 1578132 kB
400 Inactive(anon): 574764 kB
401 Inactive(file): 1567648 kB
402 MemFree: 2057400 kB
403 MemTotal: 16325648 kB
404 Shmem: 577588 kB
405 SReclaimable: 346648 kB
406 """).encode()):
407 with mock_open_exception(
408 "/proc/zoneinfo",
409 IOError(errno.ENOENT, 'no such file or directory')):
410 with warnings.catch_warnings(record=True) as ws:
411 ret = psutil.virtual_memory()
412 self.assertEqual(
413 ret.available, 2057400 * 1024 + 4818144 * 1024)
414 w = ws[0]
415 self.assertIn(
416 "inactive memory stats couldn't be determined",
417 str(w.message))
418
419 def test_virtual_memory_mocked(self):
420 # Emulate /proc/meminfo because neither vmstat nor free return slab.
421 def open_mock(name, *args, **kwargs):
422 if name == '/proc/meminfo':
423 return io.BytesIO(textwrap.dedent("""\
424 MemTotal: 100 kB
425 MemFree: 2 kB
426 MemAvailable: 3 kB
427 Buffers: 4 kB
428 Cached: 5 kB
429 SwapCached: 6 kB
430 Active: 7 kB
431 Inactive: 8 kB
432 Active(anon): 9 kB
433 Inactive(anon): 10 kB
434 Active(file): 11 kB
435 Inactive(file): 12 kB
436 Unevictable: 13 kB
437 Mlocked: 14 kB
438 SwapTotal: 15 kB
439 SwapFree: 16 kB
440 Dirty: 17 kB
441 Writeback: 18 kB
442 AnonPages: 19 kB
443 Mapped: 20 kB
444 Shmem: 21 kB
445 Slab: 22 kB
446 SReclaimable: 23 kB
447 SUnreclaim: 24 kB
448 KernelStack: 25 kB
449 PageTables: 26 kB
450 NFS_Unstable: 27 kB
451 Bounce: 28 kB
452 WritebackTmp: 29 kB
453 CommitLimit: 30 kB
454 Committed_AS: 31 kB
455 VmallocTotal: 32 kB
456 VmallocUsed: 33 kB
457 VmallocChunk: 34 kB
458 HardwareCorrupted: 35 kB
459 AnonHugePages: 36 kB
460 ShmemHugePages: 37 kB
461 ShmemPmdMapped: 38 kB
462 CmaTotal: 39 kB
463 CmaFree: 40 kB
464 HugePages_Total: 41 kB
465 HugePages_Free: 42 kB
466 HugePages_Rsvd: 43 kB
467 HugePages_Surp: 44 kB
468 Hugepagesize: 45 kB
469 DirectMap46k: 46 kB
470 DirectMap47M: 47 kB
471 DirectMap48G: 48 kB
472 """).encode())
473 else:
474 return orig_open(name, *args, **kwargs)
475
476 orig_open = open
477 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
478 with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
479 mem = psutil.virtual_memory()
480 assert m.called
481 self.assertEqual(mem.total, 100 * 1024)
482 self.assertEqual(mem.free, 2 * 1024)
483 self.assertEqual(mem.buffers, 4 * 1024)
484 # cached mem also includes reclaimable memory
485 self.assertEqual(mem.cached, (5 + 23) * 1024)
486 self.assertEqual(mem.shared, 21 * 1024)
487 self.assertEqual(mem.active, 7 * 1024)
488 self.assertEqual(mem.inactive, 8 * 1024)
489 self.assertEqual(mem.slab, 22 * 1024)
490 self.assertEqual(mem.available, 3 * 1024)
491
492
493 # =====================================================================
494 # --- system swap memory
495 # =====================================================================
496
497
498 @unittest.skipIf(not LINUX, "LINUX only")
499 class TestSystemSwapMemory(unittest.TestCase):
500
501 @staticmethod
502 def meminfo_has_swap_info():
503 """Return True if /proc/meminfo provides swap metrics."""
504 with open("/proc/meminfo") as f:
505 data = f.read()
506 return 'SwapTotal:' in data and 'SwapFree:' in data
507
508 def test_total(self):
509 free_value = free_swap().total
510 psutil_value = psutil.swap_memory().total
511 return self.assertAlmostEqual(
512 free_value, psutil_value, delta=MEMORY_TOLERANCE)
513
514 @retry_on_failure()
515 def test_used(self):
516 free_value = free_swap().used
517 psutil_value = psutil.swap_memory().used
518 return self.assertAlmostEqual(
519 free_value, psutil_value, delta=MEMORY_TOLERANCE)
520
521 @retry_on_failure()
522 def test_free(self):
523 free_value = free_swap().free
524 psutil_value = psutil.swap_memory().free
525 return self.assertAlmostEqual(
526 free_value, psutil_value, delta=MEMORY_TOLERANCE)
527
528 def test_missing_sin_sout(self):
529 with mock.patch('psutil._common.open', create=True) as m:
530 with warnings.catch_warnings(record=True) as ws:
531 warnings.simplefilter("always")
532 ret = psutil.swap_memory()
533 assert m.called
534 self.assertEqual(len(ws), 1)
535 w = ws[0]
536 assert w.filename.endswith('psutil/_pslinux.py')
537 self.assertIn(
538 "'sin' and 'sout' swap memory stats couldn't "
539 "be determined", str(w.message))
540 self.assertEqual(ret.sin, 0)
541 self.assertEqual(ret.sout, 0)
542
543 def test_no_vmstat_mocked(self):
544 # see https://github.com/giampaolo/psutil/issues/722
545 with mock_open_exception(
546 "/proc/vmstat",
547 IOError(errno.ENOENT, 'no such file or directory')) as m:
548 with warnings.catch_warnings(record=True) as ws:
549 warnings.simplefilter("always")
550 ret = psutil.swap_memory()
551 assert m.called
552 self.assertEqual(len(ws), 1)
553 w = ws[0]
554 assert w.filename.endswith('psutil/_pslinux.py')
555 self.assertIn(
556 "'sin' and 'sout' swap memory stats couldn't "
557 "be determined and were set to 0",
558 str(w.message))
559 self.assertEqual(ret.sin, 0)
560 self.assertEqual(ret.sout, 0)
561
562 def test_meminfo_against_sysinfo(self):
563 # Make sure the content of /proc/meminfo about swap memory
564 # matches sysinfo() syscall, see:
565 # https://github.com/giampaolo/psutil/issues/1015
566 if not self.meminfo_has_swap_info():
567 return unittest.skip("/proc/meminfo has no swap metrics")
568 with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
569 swap = psutil.swap_memory()
570 assert not m.called
571 import psutil._psutil_linux as cext
572 _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
573 total *= unit_multiplier
574 free *= unit_multiplier
575 self.assertEqual(swap.total, total)
576 self.assertAlmostEqual(swap.free, free, delta=MEMORY_TOLERANCE)
577
578 def test_emulate_meminfo_has_no_metrics(self):
579 # Emulate a case where /proc/meminfo provides no swap metrics
580 # in which case sysinfo() syscall is supposed to be used
581 # as a fallback.
582 with mock_open_content("/proc/meminfo", b"") as m:
583 psutil.swap_memory()
584 assert m.called
585
586
587 # =====================================================================
588 # --- system CPU
589 # =====================================================================
590
591
592 @unittest.skipIf(not LINUX, "LINUX only")
593 class TestSystemCPUTimes(unittest.TestCase):
594
595 @unittest.skipIf(TRAVIS, "unknown failure on travis")
596 def test_fields(self):
597 fields = psutil.cpu_times()._fields
598 kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
599 kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
600 if kernel_ver_info >= (2, 6, 11):
601 self.assertIn('steal', fields)
602 else:
603 self.assertNotIn('steal', fields)
604 if kernel_ver_info >= (2, 6, 24):
605 self.assertIn('guest', fields)
606 else:
607 self.assertNotIn('guest', fields)
608 if kernel_ver_info >= (3, 2, 0):
609 self.assertIn('guest_nice', fields)
610 else:
611 self.assertNotIn('guest_nice', fields)
612
613
614 @unittest.skipIf(not LINUX, "LINUX only")
615 class TestSystemCPUCountLogical(unittest.TestCase):
616
617 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
618 "/sys/devices/system/cpu/online does not exist")
619 def test_against_sysdev_cpu_online(self):
620 with open("/sys/devices/system/cpu/online") as f:
621 value = f.read().strip()
622 if "-" in str(value):
623 value = int(value.split('-')[1]) + 1
624 self.assertEqual(psutil.cpu_count(), value)
625
626 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
627 "/sys/devices/system/cpu does not exist")
628 def test_against_sysdev_cpu_num(self):
629 ls = os.listdir("/sys/devices/system/cpu")
630 count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
631 self.assertEqual(psutil.cpu_count(), count)
632
633 @unittest.skipIf(not which("nproc"), "nproc utility not available")
634 def test_against_nproc(self):
635 num = int(sh("nproc --all"))
636 self.assertEqual(psutil.cpu_count(logical=True), num)
637
638 @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
639 def test_against_lscpu(self):
640 out = sh("lscpu -p")
641 num = len([x for x in out.split('\n') if not x.startswith('#')])
642 self.assertEqual(psutil.cpu_count(logical=True), num)
643
644 def test_emulate_fallbacks(self):
645 import psutil._pslinux
646 original = psutil._pslinux.cpu_count_logical()
647 # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
648 # order to cause the parsing of /proc/cpuinfo and /proc/stat.
649 with mock.patch(
650 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
651 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
652 assert m.called
653
654 # Let's have open() return emtpy data and make sure None is
655 # returned ('cause we mimick os.cpu_count()).
656 with mock.patch('psutil._common.open', create=True) as m:
657 self.assertIsNone(psutil._pslinux.cpu_count_logical())
658 self.assertEqual(m.call_count, 2)
659 # /proc/stat should be the last one
660 self.assertEqual(m.call_args[0][0], '/proc/stat')
661
662 # Let's push this a bit further and make sure /proc/cpuinfo
663 # parsing works as expected.
664 with open('/proc/cpuinfo', 'rb') as f:
665 cpuinfo_data = f.read()
666 fake_file = io.BytesIO(cpuinfo_data)
667 with mock.patch('psutil._common.open',
668 return_value=fake_file, create=True) as m:
669 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
670
671 # Finally, let's make /proc/cpuinfo return meaningless data;
672 # this way we'll fall back on relying on /proc/stat
673 with mock_open_content('/proc/cpuinfo', b"") as m:
674 self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
675 m.called
676
677
678 @unittest.skipIf(not LINUX, "LINUX only")
679 class TestSystemCPUCountPhysical(unittest.TestCase):
680
681 @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
682 def test_against_lscpu(self):
683 out = sh("lscpu -p")
684 core_ids = set()
685 for line in out.split('\n'):
686 if not line.startswith('#'):
687 fields = line.split(',')
688 core_ids.add(fields[1])
689 self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
690
691 def test_emulate_none(self):
692 with mock.patch('glob.glob', return_value=[]) as m1:
693 with mock.patch('psutil._common.open', create=True) as m2:
694 self.assertIsNone(psutil._pslinux.cpu_count_physical())
695 assert m1.called
696 assert m2.called
697
698
699 @unittest.skipIf(not LINUX, "LINUX only")
700 class TestSystemCPUFrequency(unittest.TestCase):
701
702 @unittest.skipIf(TRAVIS, "fails on Travis")
703 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
704 def test_emulate_use_second_file(self):
705 # https://github.com/giampaolo/psutil/issues/981
706 def path_exists_mock(path):
707 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
708 return False
709 else:
710 return orig_exists(path)
711
712 orig_exists = os.path.exists
713 with mock.patch("os.path.exists", side_effect=path_exists_mock,
714 create=True):
715 assert psutil.cpu_freq()
716
717 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
718 def test_emulate_use_cpuinfo(self):
719 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
720 # exist and /proc/cpuinfo is used instead.
721 def path_exists_mock(path):
722 if path.startswith('/sys/devices/system/cpu/'):
723 return False
724 else:
725 if path == "/proc/cpuinfo":
726 flags.append(None)
727 return os_path_exists(path)
728
729 flags = []
730 os_path_exists = os.path.exists
731 try:
732 with mock.patch("os.path.exists", side_effect=path_exists_mock):
733 reload_module(psutil._pslinux)
734 ret = psutil.cpu_freq()
735 assert ret
736 assert flags
737 self.assertEqual(ret.max, 0.0)
738 self.assertEqual(ret.min, 0.0)
739 for freq in psutil.cpu_freq(percpu=True):
740 self.assertEqual(ret.max, 0.0)
741 self.assertEqual(ret.min, 0.0)
742 finally:
743 reload_module(psutil._pslinux)
744 reload_module(psutil)
745
746 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
747 def test_emulate_data(self):
748 def open_mock(name, *args, **kwargs):
749 if (name.endswith('/scaling_cur_freq') and
750 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
751 return io.BytesIO(b"500000")
752 elif (name.endswith('/scaling_min_freq') and
753 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
754 return io.BytesIO(b"600000")
755 elif (name.endswith('/scaling_max_freq') and
756 name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
757 return io.BytesIO(b"700000")
758 elif name == '/proc/cpuinfo':
759 return io.BytesIO(b"cpu MHz : 500")
760 else:
761 return orig_open(name, *args, **kwargs)
762
763 orig_open = open
764 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
765 with mock.patch(patch_point, side_effect=open_mock):
766 with mock.patch(
767 'os.path.exists', return_value=True):
768 freq = psutil.cpu_freq()
769 self.assertEqual(freq.current, 500.0)
770 # when /proc/cpuinfo is used min and max frequencies are not
771 # available and are set to 0.
772 if freq.min != 0.0:
773 self.assertEqual(freq.min, 600.0)
774 if freq.max != 0.0:
775 self.assertEqual(freq.max, 700.0)
776
777 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
778 def test_emulate_multi_cpu(self):
779 def open_mock(name, *args, **kwargs):
780 n = name
781 if (n.endswith('/scaling_cur_freq') and
782 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
783 return io.BytesIO(b"100000")
784 elif (n.endswith('/scaling_min_freq') and
785 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
786 return io.BytesIO(b"200000")
787 elif (n.endswith('/scaling_max_freq') and
788 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
789 return io.BytesIO(b"300000")
790 elif (n.endswith('/scaling_cur_freq') and
791 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
792 return io.BytesIO(b"400000")
793 elif (n.endswith('/scaling_min_freq') and
794 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
795 return io.BytesIO(b"500000")
796 elif (n.endswith('/scaling_max_freq') and
797 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
798 return io.BytesIO(b"600000")
799 elif name == '/proc/cpuinfo':
800 return io.BytesIO(b"cpu MHz : 100\n"
801 b"cpu MHz : 400")
802 else:
803 return orig_open(name, *args, **kwargs)
804
805 orig_open = open
806 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
807 with mock.patch(patch_point, side_effect=open_mock):
808 with mock.patch('os.path.exists', return_value=True):
809 with mock.patch('psutil._pslinux.cpu_count_logical',
810 return_value=2):
811 freq = psutil.cpu_freq(percpu=True)
812 self.assertEqual(freq[0].current, 100.0)
813 if freq[0].min != 0.0:
814 self.assertEqual(freq[0].min, 200.0)
815 if freq[0].max != 0.0:
816 self.assertEqual(freq[0].max, 300.0)
817 self.assertEqual(freq[1].current, 400.0)
818 if freq[1].min != 0.0:
819 self.assertEqual(freq[1].min, 500.0)
820 if freq[1].max != 0.0:
821 self.assertEqual(freq[1].max, 600.0)
822
823 @unittest.skipIf(TRAVIS, "fails on Travis")
824 @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
825 def test_emulate_no_scaling_cur_freq_file(self):
826 # See: https://github.com/giampaolo/psutil/issues/1071
827 def open_mock(name, *args, **kwargs):
828 if name.endswith('/scaling_cur_freq'):
829 raise IOError(errno.ENOENT, "")
830 elif name.endswith('/cpuinfo_cur_freq'):
831 return io.BytesIO(b"200000")
832 elif name == '/proc/cpuinfo':
833 return io.BytesIO(b"cpu MHz : 200")
834 else:
835 return orig_open(name, *args, **kwargs)
836
837 orig_open = open
838 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
839 with mock.patch(patch_point, side_effect=open_mock):
840 with mock.patch('os.path.exists', return_value=True):
841 with mock.patch('psutil._pslinux.cpu_count_logical',
842 return_value=1):
843 freq = psutil.cpu_freq()
844 self.assertEqual(freq.current, 200)
845
846
847 @unittest.skipIf(not LINUX, "LINUX only")
848 class TestSystemCPUStats(unittest.TestCase):
849
850 @unittest.skipIf(TRAVIS, "fails on Travis")
851 def test_ctx_switches(self):
852 vmstat_value = vmstat("context switches")
853 psutil_value = psutil.cpu_stats().ctx_switches
854 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
855
856 @unittest.skipIf(TRAVIS, "fails on Travis")
857 def test_interrupts(self):
858 vmstat_value = vmstat("interrupts")
859 psutil_value = psutil.cpu_stats().interrupts
860 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
861
862
863 @unittest.skipIf(not LINUX, "LINUX only")
864 class TestLoadAvg(unittest.TestCase):
865
866 @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
867 def test_getloadavg(self):
868 psutil_value = psutil.getloadavg()
869 with open("/proc/loadavg", "r") as f:
870 proc_value = f.read().split()
871
872 self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
873 self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
874 self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
875
876
877 # =====================================================================
878 # --- system network
879 # =====================================================================
880
881
882 @unittest.skipIf(not LINUX, "LINUX only")
883 class TestSystemNetIfAddrs(unittest.TestCase):
884
885 def test_ips(self):
886 for name, addrs in psutil.net_if_addrs().items():
887 for addr in addrs:
888 if addr.family == psutil.AF_LINK:
889 self.assertEqual(addr.address, get_mac_address(name))
890 elif addr.family == socket.AF_INET:
891 self.assertEqual(addr.address, get_ipv4_address(name))
892 # TODO: test for AF_INET6 family
893
894 # XXX - not reliable when having virtual NICs installed by Docker.
895 # @unittest.skipIf(not which('ip'), "'ip' utility not available")
896 # @unittest.skipIf(TRAVIS, "skipped on Travis")
897 # def test_net_if_names(self):
898 # out = sh("ip addr").strip()
899 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
900 # found = 0
901 # for line in out.split('\n'):
902 # line = line.strip()
903 # if re.search(r"^\d+:", line):
904 # found += 1
905 # name = line.split(':')[1].strip()
906 # self.assertIn(name, nics)
907 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
908 # pprint.pformat(nics), out))
909
910
911 @unittest.skipIf(not LINUX, "LINUX only")
912 class TestSystemNetIfStats(unittest.TestCase):
913
914 def test_against_ifconfig(self):
915 for name, stats in psutil.net_if_stats().items():
916 try:
917 out = sh("ifconfig %s" % name)
918 except RuntimeError:
919 pass
920 else:
921 # Not always reliable.
922 # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
923 self.assertEqual(stats.mtu,
924 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
925
926
927 @unittest.skipIf(not LINUX, "LINUX only")
928 class TestSystemNetIOCounters(unittest.TestCase):
929
930 @retry_on_failure()
931 def test_against_ifconfig(self):
932 def ifconfig(nic):
933 ret = {}
934 out = sh("ifconfig %s" % name)
935 ret['packets_recv'] = int(
936 re.findall(r'RX packets[: ](\d+)', out)[0])
937 ret['packets_sent'] = int(
938 re.findall(r'TX packets[: ](\d+)', out)[0])
939 ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
940 ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
941 ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
942 ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
943 ret['bytes_recv'] = int(
944 re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
945 ret['bytes_sent'] = int(
946 re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
947 return ret
948
949 nio = psutil.net_io_counters(pernic=True, nowrap=False)
950 for name, stats in nio.items():
951 try:
952 ifconfig_ret = ifconfig(name)
953 except RuntimeError:
954 continue
955 self.assertAlmostEqual(
956 stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
957 self.assertAlmostEqual(
958 stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
959 self.assertAlmostEqual(
960 stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
961 self.assertAlmostEqual(
962 stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
963 self.assertAlmostEqual(
964 stats.errin, ifconfig_ret['errin'], delta=10)
965 self.assertAlmostEqual(
966 stats.errout, ifconfig_ret['errout'], delta=10)
967 self.assertAlmostEqual(
968 stats.dropin, ifconfig_ret['dropin'], delta=10)
969 self.assertAlmostEqual(
970 stats.dropout, ifconfig_ret['dropout'], delta=10)
971
972
973 @unittest.skipIf(not LINUX, "LINUX only")
974 class TestSystemNetConnections(unittest.TestCase):
975
976 @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
977 @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
978 def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
979 # see: https://github.com/giampaolo/psutil/issues/623
980 try:
981 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
982 self.addCleanup(s.close)
983 s.bind(("::1", 0))
984 except socket.error:
985 pass
986 psutil.net_connections(kind='inet6')
987
988 def test_emulate_unix(self):
989 with mock_open_content(
990 '/proc/net/unix',
991 textwrap.dedent("""\
992 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
993 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
994 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
995 000000000000000000000000000000000000000000000000000000
996 """)) as m:
997 psutil.net_connections(kind='unix')
998 assert m.called
999
1000
1001 # =====================================================================
1002 # --- system disks
1003 # =====================================================================
1004
1005
1006 @unittest.skipIf(not LINUX, "LINUX only")
1007 class TestSystemDiskPartitions(unittest.TestCase):
1008
1009 @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
1010 @skip_on_not_implemented()
1011 def test_against_df(self):
1012 # test psutil.disk_usage() and psutil.disk_partitions()
1013 # against "df -a"
1014 def df(path):
1015 out = sh('df -P -B 1 "%s"' % path).strip()
1016 lines = out.split('\n')
1017 lines.pop(0)
1018 line = lines.pop(0)
1019 dev, total, used, free = line.split()[:4]
1020 if dev == 'none':
1021 dev = ''
1022 total, used, free = int(total), int(used), int(free)
1023 return dev, total, used, free
1024
1025 for part in psutil.disk_partitions(all=False):
1026 usage = psutil.disk_usage(part.mountpoint)
1027 dev, total, used, free = df(part.mountpoint)
1028 self.assertEqual(usage.total, total)
1029 # 10 MB tollerance
1030 if abs(usage.free - free) > 10 * 1024 * 1024:
1031 self.fail("psutil=%s, df=%s" % (usage.free, free))
1032 if abs(usage.used - used) > 10 * 1024 * 1024:
1033 self.fail("psutil=%s, df=%s" % (usage.used, used))
1034
1035 def test_zfs_fs(self):
1036 # Test that ZFS partitions are returned.
1037 with open("/proc/filesystems", "r") as f:
1038 data = f.read()
1039 if 'zfs' in data:
1040 for part in psutil.disk_partitions():
1041 if part.fstype == 'zfs':
1042 break
1043 else:
1044 self.fail("couldn't find any ZFS partition")
1045 else:
1046 # No ZFS partitions on this system. Let's fake one.
1047 fake_file = io.StringIO(u("nodev\tzfs\n"))
1048 with mock.patch('psutil._common.open',
1049 return_value=fake_file, create=True) as m1:
1050 with mock.patch(
1051 'psutil._pslinux.cext.disk_partitions',
1052 return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
1053 ret = psutil.disk_partitions()
1054 assert m1.called
1055 assert m2.called
1056 assert ret
1057 self.assertEqual(ret[0].fstype, 'zfs')
1058
1059 def test_emulate_realpath_fail(self):
1060 # See: https://github.com/giampaolo/psutil/issues/1307
1061 try:
1062 with mock.patch('os.path.realpath',
1063 return_value='/non/existent') as m:
1064 with self.assertRaises(FileNotFoundError):
1065 psutil.disk_partitions()
1066 assert m.called
1067 finally:
1068 psutil.PROCFS_PATH = "/proc"
1069
1070
1071 @unittest.skipIf(not LINUX, "LINUX only")
1072 class TestSystemDiskIoCounters(unittest.TestCase):
1073
1074 def test_emulate_kernel_2_4(self):
1075 # Tests /proc/diskstats parsing format for 2.4 kernels, see:
1076 # https://github.com/giampaolo/psutil/issues/767
1077 with mock_open_content(
1078 '/proc/diskstats',
1079 " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"):
1080 with mock.patch('psutil._pslinux.is_storage_device',
1081 return_value=True):
1082 ret = psutil.disk_io_counters(nowrap=False)
1083 self.assertEqual(ret.read_count, 1)
1084 self.assertEqual(ret.read_merged_count, 2)
1085 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1086 self.assertEqual(ret.read_time, 4)
1087 self.assertEqual(ret.write_count, 5)
1088 self.assertEqual(ret.write_merged_count, 6)
1089 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1090 self.assertEqual(ret.write_time, 8)
1091 self.assertEqual(ret.busy_time, 10)
1092
1093 def test_emulate_kernel_2_6_full(self):
1094 # Tests /proc/diskstats parsing format for 2.6 kernels,
1095 # lines reporting all metrics:
1096 # https://github.com/giampaolo/psutil/issues/767
1097 with mock_open_content(
1098 '/proc/diskstats',
1099 " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"):
1100 with mock.patch('psutil._pslinux.is_storage_device',
1101 return_value=True):
1102 ret = psutil.disk_io_counters(nowrap=False)
1103 self.assertEqual(ret.read_count, 1)
1104 self.assertEqual(ret.read_merged_count, 2)
1105 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1106 self.assertEqual(ret.read_time, 4)
1107 self.assertEqual(ret.write_count, 5)
1108 self.assertEqual(ret.write_merged_count, 6)
1109 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1110 self.assertEqual(ret.write_time, 8)
1111 self.assertEqual(ret.busy_time, 10)
1112
1113 def test_emulate_kernel_2_6_limited(self):
1114 # Tests /proc/diskstats parsing format for 2.6 kernels,
1115 # where one line of /proc/partitions return a limited
1116 # amount of metrics when it bumps into a partition
1117 # (instead of a disk). See:
1118 # https://github.com/giampaolo/psutil/issues/767
1119 with mock_open_content(
1120 '/proc/diskstats',
1121 " 3 1 hda 1 2 3 4"):
1122 with mock.patch('psutil._pslinux.is_storage_device',
1123 return_value=True):
1124 ret = psutil.disk_io_counters(nowrap=False)
1125 self.assertEqual(ret.read_count, 1)
1126 self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
1127 self.assertEqual(ret.write_count, 3)
1128 self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
1129
1130 self.assertEqual(ret.read_merged_count, 0)
1131 self.assertEqual(ret.read_time, 0)
1132 self.assertEqual(ret.write_merged_count, 0)
1133 self.assertEqual(ret.write_time, 0)
1134 self.assertEqual(ret.busy_time, 0)
1135
1136 def test_emulate_include_partitions(self):
1137 # Make sure that when perdisk=True disk partitions are returned,
1138 # see:
1139 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1140 with mock_open_content(
1141 '/proc/diskstats',
1142 textwrap.dedent("""\
1143 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1144 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1145 """)):
1146 with mock.patch('psutil._pslinux.is_storage_device',
1147 return_value=False):
1148 ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
1149 self.assertEqual(len(ret), 2)
1150 self.assertEqual(ret['nvme0n1'].read_count, 1)
1151 self.assertEqual(ret['nvme0n1p1'].read_count, 1)
1152 self.assertEqual(ret['nvme0n1'].write_count, 5)
1153 self.assertEqual(ret['nvme0n1p1'].write_count, 5)
1154
1155 def test_emulate_exclude_partitions(self):
1156 # Make sure that when perdisk=False partitions (e.g. 'sda1',
1157 # 'nvme0n1p1') are skipped and not included in the total count.
1158 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1159 with mock_open_content(
1160 '/proc/diskstats',
1161 textwrap.dedent("""\
1162 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1163 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1164 """)):
1165 with mock.patch('psutil._pslinux.is_storage_device',
1166 return_value=False):
1167 ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1168 self.assertIsNone(ret)
1169
1170 #
1171 def is_storage_device(name):
1172 return name == 'nvme0n1'
1173
1174 with mock_open_content(
1175 '/proc/diskstats',
1176 textwrap.dedent("""\
1177 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1178 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1179 """)):
1180 with mock.patch('psutil._pslinux.is_storage_device',
1181 create=True, side_effect=is_storage_device):
1182 ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1183 self.assertEqual(ret.read_count, 1)
1184 self.assertEqual(ret.write_count, 5)
1185
1186 def test_emulate_use_sysfs(self):
1187 def exists(path):
1188 if path == '/proc/diskstats':
1189 return False
1190 return True
1191
1192 wprocfs = psutil.disk_io_counters(perdisk=True)
1193 with mock.patch('psutil._pslinux.os.path.exists',
1194 create=True, side_effect=exists):
1195 wsysfs = psutil.disk_io_counters(perdisk=True)
1196 self.assertEqual(len(wprocfs), len(wsysfs))
1197
1198 def test_emulate_not_impl(self):
1199 def exists(path):
1200 return False
1201
1202 with mock.patch('psutil._pslinux.os.path.exists',
1203 create=True, side_effect=exists):
1204 self.assertRaises(NotImplementedError, psutil.disk_io_counters)
1205
1206
1207 # =====================================================================
1208 # --- misc
1209 # =====================================================================
1210
1211
1212 @unittest.skipIf(not LINUX, "LINUX only")
1213 class TestMisc(unittest.TestCase):
1214
1215 def test_boot_time(self):
1216 vmstat_value = vmstat('boot time')
1217 psutil_value = psutil.boot_time()
1218 self.assertEqual(int(vmstat_value), int(psutil_value))
1219
1220 def test_no_procfs_on_import(self):
1221 my_procfs = tempfile.mkdtemp()
1222
1223 with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1224 f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
1225 f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
1226 f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
1227
1228 try:
1229 orig_open = open
1230
1231 def open_mock(name, *args, **kwargs):
1232 if name.startswith('/proc'):
1233 raise IOError(errno.ENOENT, 'rejecting access for test')
1234 return orig_open(name, *args, **kwargs)
1235
1236 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1237 with mock.patch(patch_point, side_effect=open_mock):
1238 reload_module(psutil)
1239
1240 self.assertRaises(IOError, psutil.cpu_times)
1241 self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1242 self.assertRaises(IOError, psutil.cpu_percent)
1243 self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
1244 self.assertRaises(IOError, psutil.cpu_times_percent)
1245 self.assertRaises(
1246 IOError, psutil.cpu_times_percent, percpu=True)
1247
1248 psutil.PROCFS_PATH = my_procfs
1249
1250 self.assertEqual(psutil.cpu_percent(), 0)
1251 self.assertEqual(sum(psutil.cpu_times_percent()), 0)
1252
1253 # since we don't know the number of CPUs at import time,
1254 # we awkwardly say there are none until the second call
1255 per_cpu_percent = psutil.cpu_percent(percpu=True)
1256 self.assertEqual(sum(per_cpu_percent), 0)
1257
1258 # ditto awkward length
1259 per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
1260 self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
1261
1262 # much user, very busy
1263 with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1264 f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
1265 f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
1266 f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
1267
1268 self.assertNotEqual(psutil.cpu_percent(), 0)
1269 self.assertNotEqual(
1270 sum(psutil.cpu_percent(percpu=True)), 0)
1271 self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
1272 self.assertNotEqual(
1273 sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
1274 finally:
1275 shutil.rmtree(my_procfs)
1276 reload_module(psutil)
1277
1278 self.assertEqual(psutil.PROCFS_PATH, '/proc')
1279
1280 def test_cpu_steal_decrease(self):
1281 # Test cumulative cpu stats decrease. We should ignore this.
1282 # See issue #1210.
1283 with mock_open_content(
1284 "/proc/stat",
1285 textwrap.dedent("""\
1286 cpu 0 0 0 0 0 0 0 1 0 0
1287 cpu0 0 0 0 0 0 0 0 1 0 0
1288 cpu1 0 0 0 0 0 0 0 1 0 0
1289 """).encode()) as m:
1290 # first call to "percent" functions should read the new stat file
1291 # and compare to the "real" file read at import time - so the
1292 # values are meaningless
1293 psutil.cpu_percent()
1294 assert m.called
1295 psutil.cpu_percent(percpu=True)
1296 psutil.cpu_times_percent()
1297 psutil.cpu_times_percent(percpu=True)
1298
1299 with mock_open_content(
1300 "/proc/stat",
1301 textwrap.dedent("""\
1302 cpu 1 0 0 0 0 0 0 0 0 0
1303 cpu0 1 0 0 0 0 0 0 0 0 0
1304 cpu1 1 0 0 0 0 0 0 0 0 0
1305 """).encode()) as m:
1306 # Increase "user" while steal goes "backwards" to zero.
1307 cpu_percent = psutil.cpu_percent()
1308 assert m.called
1309 cpu_percent_percpu = psutil.cpu_percent(percpu=True)
1310 cpu_times_percent = psutil.cpu_times_percent()
1311 cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
1312 self.assertNotEqual(cpu_percent, 0)
1313 self.assertNotEqual(sum(cpu_percent_percpu), 0)
1314 self.assertNotEqual(sum(cpu_times_percent), 0)
1315 self.assertNotEqual(sum(cpu_times_percent), 100.0)
1316 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
1317 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
1318 self.assertEqual(cpu_times_percent.steal, 0)
1319 self.assertNotEqual(cpu_times_percent.user, 0)
1320
1321 def test_boot_time_mocked(self):
1322 with mock.patch('psutil._common.open', create=True) as m:
1323 self.assertRaises(
1324 RuntimeError,
1325 psutil._pslinux.boot_time)
1326 assert m.called
1327
1328 def test_users_mocked(self):
1329 # Make sure ':0' and ':0.0' (returned by C ext) are converted
1330 # to 'localhost'.
1331 with mock.patch('psutil._pslinux.cext.users',
1332 return_value=[('giampaolo', 'pts/2', ':0',
1333 1436573184.0, True, 2)]) as m:
1334 self.assertEqual(psutil.users()[0].host, 'localhost')
1335 assert m.called
1336 with mock.patch('psutil._pslinux.cext.users',
1337 return_value=[('giampaolo', 'pts/2', ':0.0',
1338 1436573184.0, True, 2)]) as m:
1339 self.assertEqual(psutil.users()[0].host, 'localhost')
1340 assert m.called
1341 # ...otherwise it should be returned as-is
1342 with mock.patch('psutil._pslinux.cext.users',
1343 return_value=[('giampaolo', 'pts/2', 'foo',
1344 1436573184.0, True, 2)]) as m:
1345 self.assertEqual(psutil.users()[0].host, 'foo')
1346 assert m.called
1347
1348 def test_procfs_path(self):
1349 tdir = tempfile.mkdtemp()
1350 try:
1351 psutil.PROCFS_PATH = tdir
1352 self.assertRaises(IOError, psutil.virtual_memory)
1353 self.assertRaises(IOError, psutil.cpu_times)
1354 self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1355 self.assertRaises(IOError, psutil.boot_time)
1356 # self.assertRaises(IOError, psutil.pids)
1357 self.assertRaises(IOError, psutil.net_connections)
1358 self.assertRaises(IOError, psutil.net_io_counters)
1359 self.assertRaises(IOError, psutil.net_if_stats)
1360 # self.assertRaises(IOError, psutil.disk_io_counters)
1361 self.assertRaises(IOError, psutil.disk_partitions)
1362 self.assertRaises(psutil.NoSuchProcess, psutil.Process)
1363 finally:
1364 psutil.PROCFS_PATH = "/proc"
1365 os.rmdir(tdir)
1366
1367 def test_issue_687(self):
1368 # In case of thread ID:
1369 # - pid_exists() is supposed to return False
1370 # - Process(tid) is supposed to work
1371 # - pids() should not return the TID
1372 # See: https://github.com/giampaolo/psutil/issues/687
1373 t = ThreadTask()
1374 t.start()
1375 try:
1376 p = psutil.Process()
1377 tid = p.threads()[1].id
1378 assert not psutil.pid_exists(tid), tid
1379 pt = psutil.Process(tid)
1380 pt.as_dict()
1381 self.assertNotIn(tid, psutil.pids())
1382 finally:
1383 t.stop()
1384
1385 def test_pid_exists_no_proc_status(self):
1386 # Internally pid_exists relies on /proc/{pid}/status.
1387 # Emulate a case where this file is empty in which case
1388 # psutil is supposed to fall back on using pids().
1389 with mock_open_content("/proc/%s/status", "") as m:
1390 assert psutil.pid_exists(os.getpid())
1391 assert m.called
1392
1393
1394 # =====================================================================
1395 # --- sensors
1396 # =====================================================================
1397
1398
1399 @unittest.skipIf(not LINUX, "LINUX only")
1400 @unittest.skipIf(not HAS_BATTERY, "no battery")
1401 class TestSensorsBattery(unittest.TestCase):
1402
1403 @unittest.skipIf(not which("acpi"), "acpi utility not available")
1404 def test_percent(self):
1405 out = sh("acpi -b")
1406 acpi_value = int(out.split(",")[1].strip().replace('%', ''))
1407 psutil_value = psutil.sensors_battery().percent
1408 self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
1409
1410 @unittest.skipIf(not which("acpi"), "acpi utility not available")
1411 def test_power_plugged(self):
1412 out = sh("acpi -b")
1413 if 'unknown' in out.lower():
1414 return unittest.skip("acpi output not reliable")
1415 if 'discharging at zero rate' in out:
1416 plugged = True
1417 else:
1418 plugged = "Charging" in out.split('\n')[0]
1419 self.assertEqual(psutil.sensors_battery().power_plugged, plugged)
1420
1421 def test_emulate_power_plugged(self):
1422 # Pretend the AC power cable is connected.
1423 def open_mock(name, *args, **kwargs):
1424 if name.endswith("AC0/online") or name.endswith("AC/online"):
1425 return io.BytesIO(b"1")
1426 else:
1427 return orig_open(name, *args, **kwargs)
1428
1429 orig_open = open
1430 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1431 with mock.patch(patch_point, side_effect=open_mock) as m:
1432 self.assertEqual(psutil.sensors_battery().power_plugged, True)
1433 self.assertEqual(
1434 psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
1435 assert m.called
1436
1437 def test_emulate_power_plugged_2(self):
1438 # Same as above but pretend /AC0/online does not exist in which
1439 # case code relies on /status file.
1440 def open_mock(name, *args, **kwargs):
1441 if name.endswith("AC0/online") or name.endswith("AC/online"):
1442 raise IOError(errno.ENOENT, "")
1443 elif name.endswith("/status"):
1444 return io.StringIO(u("charging"))
1445 else:
1446 return orig_open(name, *args, **kwargs)
1447
1448 orig_open = open
1449 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1450 with mock.patch(patch_point, side_effect=open_mock) as m:
1451 self.assertEqual(psutil.sensors_battery().power_plugged, True)
1452 assert m.called
1453
1454 def test_emulate_power_not_plugged(self):
1455 # Pretend the AC power cable is not connected.
1456 def open_mock(name, *args, **kwargs):
1457 if name.endswith("AC0/online") or name.endswith("AC/online"):
1458 return io.BytesIO(b"0")
1459 else:
1460 return orig_open(name, *args, **kwargs)
1461
1462 orig_open = open
1463 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1464 with mock.patch(patch_point, side_effect=open_mock) as m:
1465 self.assertEqual(psutil.sensors_battery().power_plugged, False)
1466 assert m.called
1467
1468 def test_emulate_power_not_plugged_2(self):
1469 # Same as above but pretend /AC0/online does not exist in which
1470 # case code relies on /status file.
1471 def open_mock(name, *args, **kwargs):
1472 if name.endswith("AC0/online") or name.endswith("AC/online"):
1473 raise IOError(errno.ENOENT, "")
1474 elif name.endswith("/status"):
1475 return io.StringIO(u("discharging"))
1476 else:
1477 return orig_open(name, *args, **kwargs)
1478
1479 orig_open = open
1480 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1481 with mock.patch(patch_point, side_effect=open_mock) as m:
1482 self.assertEqual(psutil.sensors_battery().power_plugged, False)
1483 assert m.called
1484
1485 def test_emulate_power_undetermined(self):
1486 # Pretend we can't know whether the AC power cable not
1487 # connected (assert fallback to False).
1488 def open_mock(name, *args, **kwargs):
1489 if name.startswith("/sys/class/power_supply/AC0/online") or \
1490 name.startswith("/sys/class/power_supply/AC/online"):
1491 raise IOError(errno.ENOENT, "")
1492 elif name.startswith("/sys/class/power_supply/BAT0/status"):
1493 return io.BytesIO(b"???")
1494 else:
1495 return orig_open(name, *args, **kwargs)
1496
1497 orig_open = open
1498 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1499 with mock.patch(patch_point, side_effect=open_mock) as m:
1500 self.assertIsNone(psutil.sensors_battery().power_plugged)
1501 assert m.called
1502
1503 def test_emulate_no_base_files(self):
1504 # Emulate a case where base metrics files are not present,
1505 # in which case we're supposed to get None.
1506 with mock_open_exception(
1507 "/sys/class/power_supply/BAT0/energy_now",
1508 IOError(errno.ENOENT, "")):
1509 with mock_open_exception(
1510 "/sys/class/power_supply/BAT0/charge_now",
1511 IOError(errno.ENOENT, "")):
1512 self.assertIsNone(psutil.sensors_battery())
1513
1514 def test_emulate_energy_full_0(self):
1515 # Emulate a case where energy_full files returns 0.
1516 with mock_open_content(
1517 "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
1518 self.assertEqual(psutil.sensors_battery().percent, 0)
1519 assert m.called
1520
1521 def test_emulate_energy_full_not_avail(self):
1522 # Emulate a case where energy_full file does not exist.
1523 # Expected fallback on /capacity.
1524 with mock_open_exception(
1525 "/sys/class/power_supply/BAT0/energy_full",
1526 IOError(errno.ENOENT, "")):
1527 with mock_open_exception(
1528 "/sys/class/power_supply/BAT0/charge_full",
1529 IOError(errno.ENOENT, "")):
1530 with mock_open_content(
1531 "/sys/class/power_supply/BAT0/capacity", b"88"):
1532 self.assertEqual(psutil.sensors_battery().percent, 88)
1533
1534 def test_emulate_no_power(self):
1535 # Emulate a case where /AC0/online file nor /BAT0/status exist.
1536 with mock_open_exception(
1537 "/sys/class/power_supply/AC/online",
1538 IOError(errno.ENOENT, "")):
1539 with mock_open_exception(
1540 "/sys/class/power_supply/AC0/online",
1541 IOError(errno.ENOENT, "")):
1542 with mock_open_exception(
1543 "/sys/class/power_supply/BAT0/status",
1544 IOError(errno.ENOENT, "")):
1545 self.assertIsNone(psutil.sensors_battery().power_plugged)
1546
1547
1548 @unittest.skipIf(not LINUX, "LINUX only")
1549 class TestSensorsTemperatures(unittest.TestCase):
1550
1551 def test_emulate_class_hwmon(self):
1552 def open_mock(name, *args, **kwargs):
1553 if name.endswith('/name'):
1554 return io.StringIO(u("name"))
1555 elif name.endswith('/temp1_label'):
1556 return io.StringIO(u("label"))
1557 elif name.endswith('/temp1_input'):
1558 return io.BytesIO(b"30000")
1559 elif name.endswith('/temp1_max'):
1560 return io.BytesIO(b"40000")
1561 elif name.endswith('/temp1_crit'):
1562 return io.BytesIO(b"50000")
1563 else:
1564 return orig_open(name, *args, **kwargs)
1565
1566 orig_open = open
1567 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1568 with mock.patch(patch_point, side_effect=open_mock):
1569 # Test case with /sys/class/hwmon
1570 with mock.patch('glob.glob',
1571 return_value=['/sys/class/hwmon/hwmon0/temp1']):
1572 temp = psutil.sensors_temperatures()['name'][0]
1573 self.assertEqual(temp.label, 'label')
1574 self.assertEqual(temp.current, 30.0)
1575 self.assertEqual(temp.high, 40.0)
1576 self.assertEqual(temp.critical, 50.0)
1577
1578 def test_emulate_class_thermal(self):
1579 def open_mock(name, *args, **kwargs):
1580 if name.endswith('0_temp'):
1581 return io.BytesIO(b"50000")
1582 elif name.endswith('temp'):
1583 return io.BytesIO(b"30000")
1584 elif name.endswith('0_type'):
1585 return io.StringIO(u("critical"))
1586 elif name.endswith('type'):
1587 return io.StringIO(u("name"))
1588 else:
1589 return orig_open(name, *args, **kwargs)
1590
1591 def glob_mock(path):
1592 if path == '/sys/class/hwmon/hwmon*/temp*_*':
1593 return []
1594 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
1595 return []
1596 elif path == '/sys/class/thermal/thermal_zone*':
1597 return ['/sys/class/thermal/thermal_zone0']
1598 elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
1599 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
1600 '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
1601 return []
1602
1603 orig_open = open
1604 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1605 with mock.patch(patch_point, side_effect=open_mock):
1606 with mock.patch('glob.glob', create=True, side_effect=glob_mock):
1607 temp = psutil.sensors_temperatures()['name'][0]
1608 self.assertEqual(temp.label, '')
1609 self.assertEqual(temp.current, 30.0)
1610 self.assertEqual(temp.high, 50.0)
1611 self.assertEqual(temp.critical, 50.0)
1612
1613
1614 @unittest.skipIf(not LINUX, "LINUX only")
1615 class TestSensorsFans(unittest.TestCase):
1616
1617 def test_emulate_data(self):
1618 def open_mock(name, *args, **kwargs):
1619 if name.endswith('/name'):
1620 return io.StringIO(u("name"))
1621 elif name.endswith('/fan1_label'):
1622 return io.StringIO(u("label"))
1623 elif name.endswith('/fan1_input'):
1624 return io.StringIO(u("2000"))
1625 else:
1626 return orig_open(name, *args, **kwargs)
1627
1628 orig_open = open
1629 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1630 with mock.patch(patch_point, side_effect=open_mock):
1631 with mock.patch('glob.glob',
1632 return_value=['/sys/class/hwmon/hwmon2/fan1']):
1633 fan = psutil.sensors_fans()['name'][0]
1634 self.assertEqual(fan.label, 'label')
1635 self.assertEqual(fan.current, 2000)
1636
1637
1638 # =====================================================================
1639 # --- test process
1640 # =====================================================================
1641
1642
1643 @unittest.skipIf(not LINUX, "LINUX only")
1644 class TestProcess(unittest.TestCase):
1645
1646 def setUp(self):
1647 safe_rmpath(TESTFN)
1648
1649 tearDown = setUp
1650
1651 def test_memory_full_info(self):
1652 src = textwrap.dedent("""
1653 import time
1654 with open("%s", "w") as f:
1655 time.sleep(10)
1656 """ % TESTFN)
1657 sproc = pyrun(src)
1658 self.addCleanup(reap_children)
1659 call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN)
1660 p = psutil.Process(sproc.pid)
1661 time.sleep(.1)
1662 mem = p.memory_full_info()
1663 maps = p.memory_maps(grouped=False)
1664 self.assertAlmostEqual(
1665 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
1666 delta=4096)
1667 self.assertAlmostEqual(
1668 mem.pss, sum([x.pss for x in maps]), delta=4096)
1669 self.assertAlmostEqual(
1670 mem.swap, sum([x.swap for x in maps]), delta=4096)
1671
1672 def test_memory_full_info_mocked(self):
1673 # See: https://github.com/giampaolo/psutil/issues/1222
1674 with mock_open_content(
1675 "/proc/%s/smaps" % os.getpid(),
1676 textwrap.dedent("""\
1677 fffff0 r-xp 00000000 00:00 0 [vsyscall]
1678 Size: 1 kB
1679 Rss: 2 kB
1680 Pss: 3 kB
1681 Shared_Clean: 4 kB
1682 Shared_Dirty: 5 kB
1683 Private_Clean: 6 kB
1684 Private_Dirty: 7 kB
1685 Referenced: 8 kB
1686 Anonymous: 9 kB
1687 LazyFree: 10 kB
1688 AnonHugePages: 11 kB
1689 ShmemPmdMapped: 12 kB
1690 Shared_Hugetlb: 13 kB
1691 Private_Hugetlb: 14 kB
1692 Swap: 15 kB
1693 SwapPss: 16 kB
1694 KernelPageSize: 17 kB
1695 MMUPageSize: 18 kB
1696 Locked: 19 kB
1697 VmFlags: rd ex
1698 """).encode()) as m:
1699 p = psutil.Process()
1700 mem = p.memory_full_info()
1701 assert m.called
1702 self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
1703 self.assertEqual(mem.pss, 3 * 1024)
1704 self.assertEqual(mem.swap, 15 * 1024)
1705
1706 # On PYPY file descriptors are not closed fast enough.
1707 @unittest.skipIf(PYPY, "unreliable on PYPY")
1708 def test_open_files_mode(self):
1709 def get_test_file():
1710 p = psutil.Process()
1711 giveup_at = time.time() + 2
1712 while True:
1713 for file in p.open_files():
1714 if file.path == os.path.abspath(TESTFN):
1715 return file
1716 elif time.time() > giveup_at:
1717 break
1718 raise RuntimeError("timeout looking for test file")
1719
1720 #
1721 with open(TESTFN, "w"):
1722 self.assertEqual(get_test_file().mode, "w")
1723 with open(TESTFN, "r"):
1724 self.assertEqual(get_test_file().mode, "r")
1725 with open(TESTFN, "a"):
1726 self.assertEqual(get_test_file().mode, "a")
1727 #
1728 with open(TESTFN, "r+"):
1729 self.assertEqual(get_test_file().mode, "r+")
1730 with open(TESTFN, "w+"):
1731 self.assertEqual(get_test_file().mode, "r+")
1732 with open(TESTFN, "a+"):
1733 self.assertEqual(get_test_file().mode, "a+")
1734 # note: "x" bit is not supported
1735 if PY3:
1736 safe_rmpath(TESTFN)
1737 with open(TESTFN, "x"):
1738 self.assertEqual(get_test_file().mode, "w")
1739 safe_rmpath(TESTFN)
1740 with open(TESTFN, "x+"):
1741 self.assertEqual(get_test_file().mode, "r+")
1742
1743 def test_open_files_file_gone(self):
1744 # simulates a file which gets deleted during open_files()
1745 # execution
1746 p = psutil.Process()
1747 files = p.open_files()
1748 with tempfile.NamedTemporaryFile():
1749 # give the kernel some time to see the new file
1750 call_until(p.open_files, "len(ret) != %i" % len(files))
1751 with mock.patch('psutil._pslinux.os.readlink',
1752 side_effect=OSError(errno.ENOENT, "")) as m:
1753 files = p.open_files()
1754 assert not files
1755 assert m.called
1756 # also simulate the case where os.readlink() returns EINVAL
1757 # in which case psutil is supposed to 'continue'
1758 with mock.patch('psutil._pslinux.os.readlink',
1759 side_effect=OSError(errno.EINVAL, "")) as m:
1760 self.assertEqual(p.open_files(), [])
1761 assert m.called
1762
1763 def test_open_files_fd_gone(self):
1764 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
1765 # while iterating through fds.
1766 # https://travis-ci.org/giampaolo/psutil/jobs/225694530
1767 p = psutil.Process()
1768 files = p.open_files()
1769 with tempfile.NamedTemporaryFile():
1770 # give the kernel some time to see the new file
1771 call_until(p.open_files, "len(ret) != %i" % len(files))
1772 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1773 with mock.patch(patch_point,
1774 side_effect=IOError(errno.ENOENT, "")) as m:
1775 files = p.open_files()
1776 assert not files
1777 assert m.called
1778
1779 # --- mocked tests
1780
1781 def test_terminal_mocked(self):
1782 with mock.patch('psutil._pslinux._psposix.get_terminal_map',
1783 return_value={}) as m:
1784 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
1785 assert m.called
1786
1787 # TODO: re-enable this test.
1788 # def test_num_ctx_switches_mocked(self):
1789 # with mock.patch('psutil._common.open', create=True) as m:
1790 # self.assertRaises(
1791 # NotImplementedError,
1792 # psutil._pslinux.Process(os.getpid()).num_ctx_switches)
1793 # assert m.called
1794
1795 def test_cmdline_mocked(self):
1796 # see: https://github.com/giampaolo/psutil/issues/639
1797 p = psutil.Process()
1798 fake_file = io.StringIO(u('foo\x00bar\x00'))
1799 with mock.patch('psutil._common.open',
1800 return_value=fake_file, create=True) as m:
1801 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1802 assert m.called
1803 fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
1804 with mock.patch('psutil._common.open',
1805 return_value=fake_file, create=True) as m:
1806 self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1807 assert m.called
1808
1809 def test_cmdline_spaces_mocked(self):
1810 # see: https://github.com/giampaolo/psutil/issues/1179
1811 p = psutil.Process()
1812 fake_file = io.StringIO(u('foo bar '))
1813 with mock.patch('psutil._common.open',
1814 return_value=fake_file, create=True) as m:
1815 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1816 assert m.called
1817 fake_file = io.StringIO(u('foo bar '))
1818 with mock.patch('psutil._common.open',
1819 return_value=fake_file, create=True) as m:
1820 self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1821 assert m.called
1822
1823 def test_cmdline_mixed_separators(self):
1824 # https://github.com/giampaolo/psutil/issues/
1825 # 1179#issuecomment-552984549
1826 p = psutil.Process()
1827 fake_file = io.StringIO(u('foo\x20bar\x00'))
1828 with mock.patch('psutil._common.open',
1829 return_value=fake_file, create=True) as m:
1830 self.assertEqual(p.cmdline(), ['foo', 'bar'])
1831 assert m.called
1832
1833 def test_readlink_path_deleted_mocked(self):
1834 with mock.patch('psutil._pslinux.os.readlink',
1835 return_value='/home/foo (deleted)'):
1836 self.assertEqual(psutil.Process().exe(), "/home/foo")
1837 self.assertEqual(psutil.Process().cwd(), "/home/foo")
1838
1839 def test_threads_mocked(self):
1840 # Test the case where os.listdir() returns a file (thread)
1841 # which no longer exists by the time we open() it (race
1842 # condition). threads() is supposed to ignore that instead
1843 # of raising NSP.
1844 def open_mock(name, *args, **kwargs):
1845 if name.startswith('/proc/%s/task' % os.getpid()):
1846 raise IOError(errno.ENOENT, "")
1847 else:
1848 return orig_open(name, *args, **kwargs)
1849
1850 orig_open = open
1851 patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1852 with mock.patch(patch_point, side_effect=open_mock) as m:
1853 ret = psutil.Process().threads()
1854 assert m.called
1855 self.assertEqual(ret, [])
1856
1857 # ...but if it bumps into something != ENOENT we want an
1858 # exception.
1859 def open_mock(name, *args, **kwargs):
1860 if name.startswith('/proc/%s/task' % os.getpid()):
1861 raise IOError(errno.EPERM, "")
1862 else:
1863 return orig_open(name, *args, **kwargs)
1864
1865 with mock.patch(patch_point, side_effect=open_mock):
1866 self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
1867
1868 def test_exe_mocked(self):
1869 with mock.patch('psutil._pslinux.readlink',
1870 side_effect=OSError(errno.ENOENT, "")) as m1:
1871 with mock.patch('psutil.Process.cmdline',
1872 side_effect=psutil.AccessDenied(0, "")) as m2:
1873 # No such file error; might be raised also if /proc/pid/exe
1874 # path actually exists for system processes with low pids
1875 # (about 0-20). In this case psutil is supposed to return
1876 # an empty string.
1877 ret = psutil.Process().exe()
1878 assert m1.called
1879 assert m2.called
1880 self.assertEqual(ret, "")
1881
1882 # ...but if /proc/pid no longer exist we're supposed to treat
1883 # it as an alias for zombie process
1884 with mock.patch('psutil._pslinux.os.path.lexists',
1885 return_value=False):
1886 self.assertRaises(
1887 psutil.ZombieProcess, psutil.Process().exe)
1888
1889 def test_issue_1014(self):
1890 # Emulates a case where smaps file does not exist. In this case
1891 # wrap_exception decorator should not raise NoSuchProcess.
1892 with mock_open_exception(
1893 '/proc/%s/smaps' % os.getpid(),
1894 IOError(errno.ENOENT, "")) as m:
1895 p = psutil.Process()
1896 with self.assertRaises(FileNotFoundError):
1897 p.memory_maps()
1898 assert m.called
1899
1900 @unittest.skipIf(not HAS_RLIMIT, "not supported")
1901 def test_rlimit_zombie(self):
1902 # Emulate a case where rlimit() raises ENOSYS, which may
1903 # happen in case of zombie process:
1904 # https://travis-ci.org/giampaolo/psutil/jobs/51368273
1905 with mock.patch("psutil._pslinux.cext.linux_prlimit",
1906 side_effect=OSError(errno.ENOSYS, "")) as m:
1907 p = psutil.Process()
1908 p.name()
1909 with self.assertRaises(psutil.ZombieProcess) as exc:
1910 p.rlimit(psutil.RLIMIT_NOFILE)
1911 assert m.called
1912 self.assertEqual(exc.exception.pid, p.pid)
1913 self.assertEqual(exc.exception.name, p.name())
1914
1915 def test_cwd_zombie(self):
1916 with mock.patch("psutil._pslinux.os.readlink",
1917 side_effect=OSError(errno.ENOENT, "")) as m:
1918 p = psutil.Process()
1919 p.name()
1920 with self.assertRaises(psutil.ZombieProcess) as exc:
1921 p.cwd()
1922 assert m.called
1923 self.assertEqual(exc.exception.pid, p.pid)
1924 self.assertEqual(exc.exception.name, p.name())
1925
1926 def test_stat_file_parsing(self):
1927 from psutil._pslinux import CLOCK_TICKS
1928
1929 args = [
1930 "0", # pid
1931 "(cat)", # name
1932 "Z", # status
1933 "1", # ppid
1934 "0", # pgrp
1935 "0", # session
1936 "0", # tty
1937 "0", # tpgid
1938 "0", # flags
1939 "0", # minflt
1940 "0", # cminflt
1941 "0", # majflt
1942 "0", # cmajflt
1943 "2", # utime
1944 "3", # stime
1945 "4", # cutime
1946 "5", # cstime
1947 "0", # priority
1948 "0", # nice
1949 "0", # num_threads
1950 "0", # itrealvalue
1951 "6", # starttime
1952 "0", # vsize
1953 "0", # rss
1954 "0", # rsslim
1955 "0", # startcode
1956 "0", # endcode
1957 "0", # startstack
1958 "0", # kstkesp
1959 "0", # kstkeip
1960 "0", # signal
1961 "0", # blocked
1962 "0", # sigignore
1963 "0", # sigcatch
1964 "0", # wchan
1965 "0", # nswap
1966 "0", # cnswap
1967 "0", # exit_signal
1968 "6", # processor
1969 "0", # rt priority
1970 "0", # policy
1971 "7", # delayacct_blkio_ticks
1972 ]
1973 content = " ".join(args).encode()
1974 with mock_open_content('/proc/%s/stat' % os.getpid(), content):
1975 p = psutil.Process()
1976 self.assertEqual(p.name(), 'cat')
1977 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
1978 self.assertEqual(p.ppid(), 1)
1979 self.assertEqual(
1980 p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
1981 cpu = p.cpu_times()
1982 self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
1983 self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
1984 self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
1985 self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
1986 self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
1987 self.assertEqual(p.cpu_num(), 6)
1988
1989 def test_status_file_parsing(self):
1990 with mock_open_content(
1991 '/proc/%s/status' % os.getpid(),
1992 textwrap.dedent("""\
1993 Uid:\t1000\t1001\t1002\t1003
1994 Gid:\t1004\t1005\t1006\t1007
1995 Threads:\t66
1996 Cpus_allowed:\tf
1997 Cpus_allowed_list:\t0-7
1998 voluntary_ctxt_switches:\t12
1999 nonvoluntary_ctxt_switches:\t13""").encode()):
2000 p = psutil.Process()
2001 self.assertEqual(p.num_ctx_switches().voluntary, 12)
2002 self.assertEqual(p.num_ctx_switches().involuntary, 13)
2003 self.assertEqual(p.num_threads(), 66)
2004 uids = p.uids()
2005 self.assertEqual(uids.real, 1000)
2006 self.assertEqual(uids.effective, 1001)
2007 self.assertEqual(uids.saved, 1002)
2008 gids = p.gids()
2009 self.assertEqual(gids.real, 1004)
2010 self.assertEqual(gids.effective, 1005)
2011 self.assertEqual(gids.saved, 1006)
2012 self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
2013
2014
2015 @unittest.skipIf(not LINUX, "LINUX only")
2016 class TestProcessAgainstStatus(unittest.TestCase):
2017 """/proc/pid/stat and /proc/pid/status have many values in common.
2018 Whenever possible, psutil uses /proc/pid/stat (it's faster).
2019 For all those cases we check that the value found in
2020 /proc/pid/stat (by psutil) matches the one found in
2021 /proc/pid/status.
2022 """
2023
2024 @classmethod
2025 def setUpClass(cls):
2026 cls.proc = psutil.Process()
2027
2028 def read_status_file(self, linestart):
2029 with psutil._psplatform.open_text(
2030 '/proc/%s/status' % self.proc.pid) as f:
2031 for line in f:
2032 line = line.strip()
2033 if line.startswith(linestart):
2034 value = line.partition('\t')[2]
2035 try:
2036 return int(value)
2037 except ValueError:
2038 return value
2039 raise ValueError("can't find %r" % linestart)
2040
2041 def test_name(self):
2042 value = self.read_status_file("Name:")
2043 self.assertEqual(self.proc.name(), value)
2044
2045 def test_status(self):
2046 value = self.read_status_file("State:")
2047 value = value[value.find('(') + 1:value.rfind(')')]
2048 value = value.replace(' ', '-')
2049 self.assertEqual(self.proc.status(), value)
2050
2051 def test_ppid(self):
2052 value = self.read_status_file("PPid:")
2053 self.assertEqual(self.proc.ppid(), value)
2054
2055 def test_num_threads(self):
2056 value = self.read_status_file("Threads:")
2057 self.assertEqual(self.proc.num_threads(), value)
2058
2059 def test_uids(self):
2060 value = self.read_status_file("Uid:")
2061 value = tuple(map(int, value.split()[1:4]))
2062 self.assertEqual(self.proc.uids(), value)
2063
2064 def test_gids(self):
2065 value = self.read_status_file("Gid:")
2066 value = tuple(map(int, value.split()[1:4]))
2067 self.assertEqual(self.proc.gids(), value)
2068
2069 @retry_on_failure()
2070 def test_num_ctx_switches(self):
2071 value = self.read_status_file("voluntary_ctxt_switches:")
2072 self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
2073 value = self.read_status_file("nonvoluntary_ctxt_switches:")
2074 self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
2075
2076 def test_cpu_affinity(self):
2077 value = self.read_status_file("Cpus_allowed_list:")
2078 if '-' in str(value):
2079 min_, max_ = map(int, value.split('-'))
2080 self.assertEqual(
2081 self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
2082
2083 def test_cpu_affinity_eligible_cpus(self):
2084 value = self.read_status_file("Cpus_allowed_list:")
2085 with mock.patch("psutil._pslinux.per_cpu_times") as m:
2086 self.proc._proc._get_eligible_cpus()
2087 if '-' in str(value):
2088 assert not m.called
2089 else:
2090 assert m.called
2091
2092
2093 # =====================================================================
2094 # --- test utils
2095 # =====================================================================
2096
2097
2098 @unittest.skipIf(not LINUX, "LINUX only")
2099 class TestUtils(unittest.TestCase):
2100
2101 def test_readlink(self):
2102 with mock.patch("os.readlink", return_value="foo (deleted)") as m:
2103 self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
2104 assert m.called
2105
2106 def test_cat(self):
2107 fname = os.path.abspath(TESTFN)
2108 with open(fname, "wt") as f:
2109 f.write("foo ")
2110 self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo")
2111 self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo")
2112 self.assertEqual(
2113 psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar")
2114
2115
2116 if __name__ == '__main__':
2117 from psutil.tests.runner import run
2118 run(__file__)