Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/psutil/tests/test_linux.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """Linux specific tests.""" | |
8 | |
9 from __future__ import division | |
10 import collections | |
11 import contextlib | |
12 import errno | |
13 import glob | |
14 import io | |
15 import os | |
16 import re | |
17 import shutil | |
18 import socket | |
19 import struct | |
20 import textwrap | |
21 import time | |
22 import warnings | |
23 | |
24 import psutil | |
25 from psutil import LINUX | |
26 from psutil._compat import basestring | |
27 from psutil._compat import FileNotFoundError | |
28 from psutil._compat import PY3 | |
29 from psutil._compat import u | |
30 from psutil.tests import call_until | |
31 from psutil.tests import GLOBAL_TIMEOUT | |
32 from psutil.tests import HAS_BATTERY | |
33 from psutil.tests import HAS_CPU_FREQ | |
34 from psutil.tests import HAS_GETLOADAVG | |
35 from psutil.tests import HAS_RLIMIT | |
36 from psutil.tests import mock | |
37 from psutil.tests import PsutilTestCase | |
38 from psutil.tests import PYPY | |
39 from psutil.tests import reload_module | |
40 from psutil.tests import retry_on_failure | |
41 from psutil.tests import safe_rmpath | |
42 from psutil.tests import sh | |
43 from psutil.tests import skip_on_not_implemented | |
44 from psutil.tests import ThreadTask | |
45 from psutil.tests import TOLERANCE_DISK_USAGE | |
46 from psutil.tests import TOLERANCE_SYS_MEM | |
47 from psutil.tests import unittest | |
48 from psutil.tests import which | |
49 | |
50 | |
# Absolute path of the directory containing this test module.
HERE = os.path.abspath(os.path.dirname(__file__))
# ioctl request codes handed to fcntl.ioctl() when querying network
# interface properties.
SIOCGIFADDR = 0x8915
SIOCGIFCONF = 0x8912
SIOCGIFHWADDR = 0x8927
SIOCGIFNETMASK = 0x891b
SIOCGIFBRDADDR = 0x8919
if LINUX:
    SECTOR_SIZE = 512
# True when no /sys/class/hwmon/hwmon* entry exists on this machine.
# NOTE(review): assumed to live at module level (not under `if LINUX:`)
# so the name is always defined -- confirm against the upstream file.
EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
60 | |
61 # ===================================================================== | |
62 # --- utils | |
63 # ===================================================================== | |
64 | |
65 | |
def get_ipv4_address(ifname):
    """Return the IPv4 address of interface `ifname` as a dotted-quad
    string, obtained through the SIOCGIFADDR ioctl.
    """
    import fcntl
    # Only the first 15 characters of the interface name are used.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        request = struct.pack('256s', name)
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFADDR, request)
        # Bytes 20..23 of the reply carry the packed address.
        return socket.inet_ntoa(reply[20:24])
77 | |
78 | |
def get_ipv4_netmask(ifname):
    """Return the IPv4 netmask of interface `ifname` as a dotted-quad
    string, obtained through the SIOCGIFNETMASK ioctl.
    """
    import fcntl
    # Only the first 15 characters of the interface name are used.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        request = struct.pack('256s', name)
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFNETMASK, request)
        # Bytes 20..23 of the reply carry the packed netmask.
        return socket.inet_ntoa(reply[20:24])
90 | |
91 | |
def get_ipv4_broadcast(ifname):
    """Return the IPv4 broadcast address of interface `ifname` as a
    dotted-quad string, obtained through the SIOCGIFBRDADDR ioctl.
    """
    import fcntl
    # Only the first 15 characters of the interface name are used.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        request = struct.pack('256s', name)
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFBRDADDR, request)
        # Bytes 20..23 of the reply carry the packed broadcast address.
        return socket.inet_ntoa(reply[20:24])
103 | |
104 | |
def get_ipv6_address(ifname):
    """Return the IPv6 address of interface `ifname` in the normalized
    form produced by socket.inet_ntop(), looked up in /proc/net/if_inet6.

    The first line whose last column equals `ifname` is used.
    Raise ValueError if no line matches.
    """
    with open("/proc/net/if_inet6", 'rt') as f:
        entries = f.readlines()

    raw_hex = None
    for entry in entries:
        columns = entry.split()
        if columns[-1] == ifname:
            raw_hex = columns[0]
            break
    if raw_hex is None:
        raise ValueError("could not find interface %r" % ifname)

    # The first column is the address as one run of hex digits; insert
    # a colon every 4 digits so inet_pton() can parse it.
    colon_form = ":".join(
        [raw_hex[pos:pos + 4] for pos in range(0, len(raw_hex), 4)])
    binary = socket.inet_pton(socket.AF_INET6, colon_form)
    return socket.inet_ntop(socket.AF_INET6, binary)
120 | |
121 | |
def get_mac_address(ifname):
    """Return the hardware address of interface `ifname` as six
    colon-separated, lower-case, two-digit hex octets, obtained
    through the SIOCGIFHWADDR ioctl.
    """
    import fcntl
    # Only the first 15 characters of the interface name are used.
    name = ifname[:15]
    if PY3:
        name = bytes(name, 'ascii')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with contextlib.closing(sock):
        reply = fcntl.ioctl(
            sock.fileno(), SIOCGIFHWADDR, struct.pack('256s', name))
        hw_bytes = reply[18:24]
        if PY3:
            # Iterating over bytes already yields integers.
            octets = list(hw_bytes)
        else:
            octets = [ord(char) for char in hw_bytes]
        return ':'.join(['%02x' % octet for octet in octets])
138 | |
139 | |
def free_swap():
    """Run 'free -b' and return the swap memory's total, used and free
    values as a namedtuple of integers.

    Raise ValueError if the output has no line starting with 'Swap'.
    """
    output = sh('free -b', env={"LANG": "C.UTF-8"})
    rows = output.split('\n')
    swap_rows = [row for row in rows if row.startswith('Swap')]
    if not swap_rows:
        raise ValueError(
            "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(rows))
    _, total, used, unused = swap_rows[0].split()
    result_type = collections.namedtuple('free', 'total used free')
    return result_type(int(total), int(used), int(unused))
153 | |
154 | |
def free_physmem():
    """Run 'free -b' and return the physical memory's total, used, free
    and shared values, plus the raw command output, as a namedtuple.

    Raise ValueError if the output has no line starting with 'Mem'.
    """
    # Note: free can have 2 different formats, invalidating 'shared'
    # and 'cached' memory which may have different positions so we
    # do not return them.
    # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
    output = sh('free -b', env={"LANG": "C.UTF-8"})
    rows = output.split('\n')
    mem_rows = [row for row in rows if row.startswith('Mem')]
    if not mem_rows:
        raise ValueError(
            "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(rows))
    # Columns 1..4 follow the 'Mem:' label.
    numbers = [int(column) for column in mem_rows[0].split()[1:5]]
    total, used, unused, shared = numbers
    result_type = collections.namedtuple(
        'free', 'total used free shared output')
    return result_type(total, used, unused, shared, output)
174 | |
175 | |
def vmstat(stat):
    """Run 'vmstat -s' and return, as an int, the leading number of the
    first output line containing the string `stat`.

    Raise ValueError if no line contains `stat`.
    """
    report = sh("vmstat -s", env={"LANG": "C.UTF-8"})
    stripped_rows = (row.strip() for row in report.split("\n"))
    matching = next((row for row in stripped_rows if stat in row), None)
    if matching is None:
        raise ValueError("can't find %r in 'vmstat' output" % stat)
    return int(matching.split(' ')[0])
183 | |
184 | |
def get_free_version_info():
    """Return the version printed by 'free -V' as a tuple of ints.

    The version is taken from the last whitespace-separated word of the
    output. Raise unittest.SkipTest if the output contains 'UNKNOWN'.
    """
    banner = sh("free -V").strip()
    if 'UNKNOWN' in banner:
        raise unittest.SkipTest("can't determine free version")
    version_text = banner.split()[-1]
    return tuple([int(part) for part in version_text.split('.')])
190 | |
191 | |
@contextlib.contextmanager
def mock_open_content(for_path, content):
    """Context manager which patches the open() builtin so that opening
    `for_path` yields an in-memory file holding `content`; every other
    path is forwarded to the real open(). Yields the mock object.
    """
    real_open = open

    def fake_open(name, *args, **kwargs):
        if name != for_path:
            return real_open(name, *args, **kwargs)
        # On Python 3 text content needs a text stream; anything else
        # (and everything on Python 2) is served as bytes.
        if PY3 and isinstance(content, basestring):
            return io.StringIO(content)
        return io.BytesIO(content)

    target = '__builtin__.open' if not PY3 else 'builtins.open'
    with mock.patch(target, create=True, side_effect=fake_open) as patched:
        yield patched
213 | |
214 | |
@contextlib.contextmanager
def mock_open_exception(for_path, exc):
    """Context manager which patches the open() builtin so that opening
    `for_path` raises `exc`; every other path is forwarded to the real
    open(). Yields the mock object.
    """
    real_open = open

    def raising_open(name, *args, **kwargs):
        if name != for_path:
            return real_open(name, *args, **kwargs)
        raise exc

    target = '__builtin__.open' if not PY3 else 'builtins.open'
    with mock.patch(
            target, create=True, side_effect=raising_open) as patched:
        yield patched
230 | |
231 | |
232 # ===================================================================== | |
233 # --- system virtual memory | |
234 # ===================================================================== | |
235 | |
236 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemVirtualMemory(PsutilTestCase):
    """Check psutil.virtual_memory() against the 'free' and 'vmstat'
    utilities and against emulated /proc/meminfo content.
    """

    def test_total(self):
        vmstat_value = vmstat('total memory') * 1024
        psutil_value = psutil.virtual_memory().total
        self.assertAlmostEqual(vmstat_value, psutil_value)

    @retry_on_failure()
    def test_used(self):
        # Older versions of procps used slab memory to calculate used memory.
        # This got changed in:
        # https://gitlab.com/procps-ng/procps/commit/
        #     05d751c4f076a2f0118b914c5e51cfbb4762ad8e
        if get_free_version_info() < (3, 3, 12):
            # skipTest() raises SkipTest by itself.
            self.skipTest("old free version")
        free = free_physmem()
        free_value = free.used
        psutil_value = psutil.virtual_memory().used
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, free.output))

    @retry_on_failure()
    def test_free(self):
        vmstat_value = vmstat('free memory') * 1024
        psutil_value = psutil.virtual_memory().free
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_buffers(self):
        vmstat_value = vmstat('buffer memory') * 1024
        psutil_value = psutil.virtual_memory().buffers
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_active(self):
        vmstat_value = vmstat('active memory') * 1024
        psutil_value = psutil.virtual_memory().active
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_inactive(self):
        vmstat_value = vmstat('inactive memory') * 1024
        psutil_value = psutil.virtual_memory().inactive
        self.assertAlmostEqual(
            vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_shared(self):
        free = free_physmem()
        free_value = free.shared
        if free_value == 0:
            raise unittest.SkipTest("free does not support 'shared' column")
        psutil_value = psutil.virtual_memory().shared
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, free.output))

    @retry_on_failure()
    def test_available(self):
        # "free" output format has changed at some point:
        # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
        # Force the C locale (as the other 'free' helpers do) so the
        # header we look for is not translated.
        out = sh("free -b", env={"LANG": "C.UTF-8"})
        lines = out.split('\n')
        if 'available' not in lines[0]:
            raise unittest.SkipTest("free does not support 'available' column")
        free_value = int(lines[1].split()[-1])
        psutil_value = psutil.virtual_memory().available
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
            msg='%s %s \n%s' % (free_value, psutil_value, out))

    def test_warnings_on_misses(self):
        # Emulate a case where /proc/meminfo provides few info.
        # psutil is supposed to set the missing fields to 0 and
        # raise a warning.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemAvailable: -1 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                SReclaimable: 346648 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "memory stats couldn't be determined", str(w.message))
                self.assertIn("cached", str(w.message))
                self.assertIn("shared", str(w.message))
                self.assertIn("active", str(w.message))
                self.assertIn("inactive", str(w.message))
                self.assertIn("buffers", str(w.message))
                self.assertIn("available", str(w.message))
                self.assertEqual(ret.cached, 0)
                self.assertEqual(ret.active, 0)
                self.assertEqual(ret.inactive, 0)
                self.assertEqual(ret.shared, 0)
                self.assertEqual(ret.buffers, 0)
                self.assertEqual(ret.available, 0)
                self.assertEqual(ret.slab, 0)

    @retry_on_failure()
    def test_avail_old_percent(self):
        # Make sure that our calculation of avail mem for old kernels
        # is off by max 15%.
        from psutil._pslinux import calculate_avail_vmem
        from psutil._pslinux import open_binary

        mems = {}
        with open_binary('/proc/meminfo') as f:
            for line in f:
                fields = line.split()
                mems[fields[0]] = int(fields[1]) * 1024

        a = calculate_avail_vmem(mems)
        if b'MemAvailable:' in mems:
            b = mems[b'MemAvailable:']
            diff_percent = abs(a - b) / a * 100
            self.assertLess(diff_percent, 15)

    def test_avail_old_comes_from_kernel(self):
        # Make sure "MemAvailable:" column is used instead of relying
        # on our internal algorithm to calculate avail mem.
        with mock_open_content(
            '/proc/meminfo',
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemAvailable: 6574984 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                SReclaimable: 346648 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                # Record the warning even if an identical one was
                # already emitted earlier in this process.
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
            assert m.called
            self.assertEqual(ret.available, 6574984 * 1024)
            w = ws[0]
            self.assertIn(
                "inactive memory stats couldn't be determined", str(w.message))

    def test_avail_old_missing_fields(self):
        # Remove Active(file), Inactive(file) and SReclaimable
        # from /proc/meminfo and make sure the fallback is used
        # (free + cached).
        with mock_open_content(
            "/proc/meminfo",
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                """).encode()) as m:
            with warnings.catch_warnings(record=True) as ws:
                # Record the warning even if an identical one was
                # already emitted earlier in this process.
                warnings.simplefilter("always")
                ret = psutil.virtual_memory()
            assert m.called
            self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
            w = ws[0]
            self.assertIn(
                "inactive memory stats couldn't be determined", str(w.message))

    def test_avail_old_missing_zoneinfo(self):
        # Remove /proc/zoneinfo file. Make sure fallback is used
        # (free + cached).
        with mock_open_content(
            "/proc/meminfo",
            textwrap.dedent("""\
                Active: 9444728 kB
                Active(anon): 6145416 kB
                Active(file): 2950064 kB
                Buffers: 287952 kB
                Cached: 4818144 kB
                Inactive(file): 1578132 kB
                Inactive(anon): 574764 kB
                Inactive(file): 1567648 kB
                MemFree: 2057400 kB
                MemTotal: 16325648 kB
                Shmem: 577588 kB
                SReclaimable: 346648 kB
                """).encode()):
            with mock_open_exception(
                    "/proc/zoneinfo",
                    IOError(errno.ENOENT, 'no such file or directory')):
                with warnings.catch_warnings(record=True) as ws:
                    # Record the warning even if an identical one was
                    # already emitted earlier in this process.
                    warnings.simplefilter("always")
                    ret = psutil.virtual_memory()
                self.assertEqual(
                    ret.available, 2057400 * 1024 + 4818144 * 1024)
                w = ws[0]
                self.assertIn(
                    "inactive memory stats couldn't be determined",
                    str(w.message))

    def test_virtual_memory_mocked(self):
        # Emulate /proc/meminfo because neither vmstat nor free return slab.
        meminfo = textwrap.dedent("""\
            MemTotal: 100 kB
            MemFree: 2 kB
            MemAvailable: 3 kB
            Buffers: 4 kB
            Cached: 5 kB
            SwapCached: 6 kB
            Active: 7 kB
            Inactive: 8 kB
            Active(anon): 9 kB
            Inactive(anon): 10 kB
            Active(file): 11 kB
            Inactive(file): 12 kB
            Unevictable: 13 kB
            Mlocked: 14 kB
            SwapTotal: 15 kB
            SwapFree: 16 kB
            Dirty: 17 kB
            Writeback: 18 kB
            AnonPages: 19 kB
            Mapped: 20 kB
            Shmem: 21 kB
            Slab: 22 kB
            SReclaimable: 23 kB
            SUnreclaim: 24 kB
            KernelStack: 25 kB
            PageTables: 26 kB
            NFS_Unstable: 27 kB
            Bounce: 28 kB
            WritebackTmp: 29 kB
            CommitLimit: 30 kB
            Committed_AS: 31 kB
            VmallocTotal: 32 kB
            VmallocUsed: 33 kB
            VmallocChunk: 34 kB
            HardwareCorrupted: 35 kB
            AnonHugePages: 36 kB
            ShmemHugePages: 37 kB
            ShmemPmdMapped: 38 kB
            CmaTotal: 39 kB
            CmaFree: 40 kB
            HugePages_Total: 41 kB
            HugePages_Free: 42 kB
            HugePages_Rsvd: 43 kB
            HugePages_Surp: 44 kB
            Hugepagesize: 45 kB
            DirectMap46k: 46 kB
            DirectMap47M: 47 kB
            DirectMap48G: 48 kB
            """).encode()
        # Bytes content makes mock_open_content() serve an io.BytesIO.
        with mock_open_content('/proc/meminfo', meminfo) as m:
            mem = psutil.virtual_memory()
            assert m.called
            self.assertEqual(mem.total, 100 * 1024)
            self.assertEqual(mem.free, 2 * 1024)
            self.assertEqual(mem.buffers, 4 * 1024)
            # cached mem also includes reclaimable memory
            self.assertEqual(mem.cached, (5 + 23) * 1024)
            self.assertEqual(mem.shared, 21 * 1024)
            self.assertEqual(mem.active, 7 * 1024)
            self.assertEqual(mem.inactive, 8 * 1024)
            self.assertEqual(mem.slab, 22 * 1024)
            self.assertEqual(mem.available, 3 * 1024)
531 | |
532 | |
533 # ===================================================================== | |
534 # --- system swap memory | |
535 # ===================================================================== | |
536 | |
537 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemSwapMemory(PsutilTestCase):
    """Check psutil.swap_memory() against the 'free' utility, the
    sysinfo() syscall and emulated /proc files.
    """

    @staticmethod
    def meminfo_has_swap_info():
        """Return True if /proc/meminfo provides swap metrics."""
        with open("/proc/meminfo") as f:
            data = f.read()
        return 'SwapTotal:' in data and 'SwapFree:' in data

    def test_total(self):
        free_value = free_swap().total
        psutil_value = psutil.swap_memory().total
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_used(self):
        free_value = free_swap().used
        psutil_value = psutil.swap_memory().used
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_free(self):
        free_value = free_swap().free
        psutil_value = psutil.swap_memory().free
        self.assertAlmostEqual(
            free_value, psutil_value, delta=TOLERANCE_SYS_MEM)

    def test_missing_sin_sout(self):
        with mock.patch('psutil._common.open', create=True) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.swap_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined", str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_no_vmstat_mocked(self):
        # see https://github.com/giampaolo/psutil/issues/722
        with mock_open_exception(
                "/proc/vmstat",
                IOError(errno.ENOENT, 'no such file or directory')) as m:
            with warnings.catch_warnings(record=True) as ws:
                warnings.simplefilter("always")
                ret = psutil.swap_memory()
                assert m.called
                self.assertEqual(len(ws), 1)
                w = ws[0]
                assert w.filename.endswith('psutil/_pslinux.py')
                self.assertIn(
                    "'sin' and 'sout' swap memory stats couldn't "
                    "be determined and were set to 0",
                    str(w.message))
                self.assertEqual(ret.sin, 0)
                self.assertEqual(ret.sout, 0)

    def test_meminfo_against_sysinfo(self):
        # Make sure the content of /proc/meminfo about swap memory
        # matches sysinfo() syscall, see:
        # https://github.com/giampaolo/psutil/issues/1015
        if not self.meminfo_has_swap_info():
            # unittest.skip() only builds a decorator; returning it would
            # let the test pass silently. SkipTest must be raised.
            raise unittest.SkipTest("/proc/meminfo has no swap metrics")
        with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
            swap = psutil.swap_memory()
        # /proc/meminfo has the metrics, so sysinfo() must not be used.
        assert not m.called
        import psutil._psutil_linux as cext
        _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
        total *= unit_multiplier
        free *= unit_multiplier
        self.assertEqual(swap.total, total)
        self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)

    def test_emulate_meminfo_has_no_metrics(self):
        # Emulate a case where /proc/meminfo provides no swap metrics
        # in which case sysinfo() syscall is supposed to be used
        # as a fallback.
        with mock_open_content("/proc/meminfo", b"") as m:
            psutil.swap_memory()
            assert m.called
625 | |
626 | |
627 # ===================================================================== | |
628 # --- system CPU | |
629 # ===================================================================== | |
630 | |
631 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUTimes(PsutilTestCase):
    """Check the fields exposed by psutil.cpu_times() on Linux."""

    def test_fields(self):
        """Each version-dependent field must be present exactly when the
        running kernel is at least the version listed for it.
        """
        present = psutil.cpu_times()._fields
        release = os.uname()[2]
        version_text = re.findall(r'\d+\.\d+\.\d+', release)[0]
        running = tuple([int(num) for num in version_text.split('.')])
        minimum_kernel = (
            ('steal', (2, 6, 11)),
            ('guest', (2, 6, 24)),
            ('guest_nice', (3, 2, 0)),
        )
        for field, required in minimum_kernel:
            if running >= required:
                self.assertIn(field, present)
            else:
                self.assertNotIn(field, present)
651 | |
652 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountLogical(PsutilTestCase):
    """Check the logical CPU count against sysfs, 'nproc', 'lscpu' and
    psutil's own fallback code paths.
    """

    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
                     "/sys/devices/system/cpu/online does not exist")
    def test_against_sysdev_cpu_online(self):
        with open("/sys/devices/system/cpu/online") as f:
            value = f.read().strip()
        # The file holds a CPU list such as "0", "0-7" or "0-3,8-11":
        # count the CPUs in every comma-separated item so that an
        # assertion is made for all of these forms.
        online = 0
        for item in value.split(','):
            first, _, last = item.partition('-')
            online += int(last or first) - int(first) + 1
        self.assertEqual(psutil.cpu_count(), online)

    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
                     "/sys/devices/system/cpu does not exist")
    def test_against_sysdev_cpu_num(self):
        ls = os.listdir("/sys/devices/system/cpu")
        count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
        self.assertEqual(psutil.cpu_count(), count)

    @unittest.skipIf(not which("nproc"), "nproc utility not available")
    def test_against_nproc(self):
        num = int(sh("nproc --all"))
        self.assertEqual(psutil.cpu_count(logical=True), num)

    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
    def test_against_lscpu(self):
        out = sh("lscpu -p")
        num = len([x for x in out.split('\n') if not x.startswith('#')])
        self.assertEqual(psutil.cpu_count(logical=True), num)

    def test_emulate_fallbacks(self):
        import psutil._pslinux
        original = psutil._pslinux.cpu_count_logical()
        # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
        # order to cause the parsing of /proc/cpuinfo and /proc/stat.
        with mock.patch(
                'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
            self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
            assert m.called

            # Let's have open() return empty data and make sure None is
            # returned ('cause we mimic os.cpu_count()).
            with mock.patch('psutil._common.open', create=True) as m_open:
                self.assertIsNone(psutil._pslinux.cpu_count_logical())
                self.assertEqual(m_open.call_count, 2)
                # /proc/stat should be the last one
                self.assertEqual(m_open.call_args[0][0], '/proc/stat')

            # Let's push this a bit further and make sure /proc/cpuinfo
            # parsing works as expected.
            with open('/proc/cpuinfo', 'rb') as f:
                cpuinfo_data = f.read()
            fake_file = io.BytesIO(cpuinfo_data)
            with mock.patch('psutil._common.open',
                            return_value=fake_file, create=True):
                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)

            # Finally, let's make /proc/cpuinfo return meaningless data;
            # this way we'll fall back on relying on /proc/stat
            with mock_open_content('/proc/cpuinfo', b"") as m_cpuinfo:
                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
                # A bare `m.called` expression would check nothing.
                assert m_cpuinfo.called
715 | |
716 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountPhysical(PsutilTestCase):
    """Check the physical CPU core count against 'lscpu' and psutil's
    own alternative code paths.
    """

    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
    def test_against_lscpu(self):
        """The number of distinct values in the second column of
        'lscpu -p' must equal the physical core count.
        """
        listing = sh("lscpu -p")
        distinct_cores = {
            row.split(',')[1]
            for row in listing.split('\n')
            if not row.startswith('#')
        }
        self.assertEqual(
            psutil.cpu_count(logical=False), len(distinct_cores))

    def test_method_2(self):
        """With glob.glob() returning nothing the result must match the
        regular one, whenever the regular one is available.
        """
        regular = psutil._pslinux.cpu_count_physical()
        with mock.patch('glob.glob', return_value=[]) as glob_mock:
            alternative = psutil._pslinux.cpu_count_physical()
            assert glob_mock.called
        if regular is None:
            return
        self.assertEqual(regular, alternative)

    def test_emulate_none(self):
        """With both glob.glob() and open() mocked out, None is returned."""
        glob_patch = mock.patch('glob.glob', return_value=[])
        open_patch = mock.patch('psutil._common.open', create=True)
        with glob_patch as glob_mock:
            with open_patch as open_mock:
                self.assertIsNone(psutil._pslinux.cpu_count_physical())
        assert glob_mock.called
        assert open_mock.called
744 | |
745 | |
746 @unittest.skipIf(not LINUX, "LINUX only") | |
747 class TestSystemCPUFrequency(PsutilTestCase): | |
748 | |
749 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
750 def test_emulate_use_second_file(self): | |
751 # https://github.com/giampaolo/psutil/issues/981 | |
752 def path_exists_mock(path): | |
753 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"): | |
754 return False | |
755 else: | |
756 return orig_exists(path) | |
757 | |
758 orig_exists = os.path.exists | |
759 with mock.patch("os.path.exists", side_effect=path_exists_mock, | |
760 create=True): | |
761 assert psutil.cpu_freq() | |
762 | |
763 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
764 def test_emulate_use_cpuinfo(self): | |
765 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not | |
766 # exist and /proc/cpuinfo is used instead. | |
767 def path_exists_mock(path): | |
768 if path.startswith('/sys/devices/system/cpu/'): | |
769 return False | |
770 else: | |
771 if path == "/proc/cpuinfo": | |
772 flags.append(None) | |
773 return os_path_exists(path) | |
774 | |
775 flags = [] | |
776 os_path_exists = os.path.exists | |
777 try: | |
778 with mock.patch("os.path.exists", side_effect=path_exists_mock): | |
779 reload_module(psutil._pslinux) | |
780 ret = psutil.cpu_freq() | |
781 assert ret | |
782 assert flags | |
783 self.assertEqual(ret.max, 0.0) | |
784 self.assertEqual(ret.min, 0.0) | |
785 for freq in psutil.cpu_freq(percpu=True): | |
786 self.assertEqual(ret.max, 0.0) | |
787 self.assertEqual(ret.min, 0.0) | |
788 finally: | |
789 reload_module(psutil._pslinux) | |
790 reload_module(psutil) | |
791 | |
792 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") | |
793 def test_emulate_data(self): | |
794 def open_mock(name, *args, **kwargs): | |
795 if (name.endswith('/scaling_cur_freq') and | |
796 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
797 return io.BytesIO(b"500000") | |
798 elif (name.endswith('/scaling_min_freq') and | |
799 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
800 return io.BytesIO(b"600000") | |
801 elif (name.endswith('/scaling_max_freq') and | |
802 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): | |
803 return io.BytesIO(b"700000") | |
804 elif name == '/proc/cpuinfo': | |
805 return io.BytesIO(b"cpu MHz : 500") | |
806 else: | |
807 return orig_open(name, *args, **kwargs) | |
808 | |
809 orig_open = open | |
810 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
811 with mock.patch(patch_point, side_effect=open_mock): | |
812 with mock.patch( | |
813 'os.path.exists', return_value=True): | |
814 freq = psutil.cpu_freq() | |
815 self.assertEqual(freq.current, 500.0) | |
816 # when /proc/cpuinfo is used min and max frequencies are not | |
817 # available and are set to 0. | |
818 if freq.min != 0.0: | |
819 self.assertEqual(freq.min, 600.0) | |
820 if freq.max != 0.0: | |
821 self.assertEqual(freq.max, 700.0) | |
822 | |
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_multi_cpu(self):
    # Fake the cpufreq sysfs files of "policy0" and "policy1" plus
    # /proc/cpuinfo, pretend there are 2 logical CPUs and check the
    # values returned by cpu_freq(percpu=True).
    cpufreq_dir = "/sys/devices/system/cpu/cpufreq/"
    # (policy directory, file name suffix, fake file content)
    fake_files = (
        ("policy0", "/scaling_cur_freq", b"100000"),
        ("policy0", "/scaling_min_freq", b"200000"),
        ("policy0", "/scaling_max_freq", b"300000"),
        ("policy1", "/scaling_cur_freq", b"400000"),
        ("policy1", "/scaling_min_freq", b"500000"),
        ("policy1", "/scaling_max_freq", b"600000"),
    )
    orig_open = open

    def open_mock(name, *args, **kwargs):
        for policy, suffix, content in fake_files:
            if name.endswith(suffix) and \
                    name.startswith(cpufreq_dir + policy):
                return io.BytesIO(content)
        if name == '/proc/cpuinfo':
            return io.BytesIO(b"cpu MHz : 100\n"
                              b"cpu MHz : 400")
        # Anything else is opened for real.
        return orig_open(name, *args, **kwargs)

    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock), \
            mock.patch('os.path.exists', return_value=True), \
            mock.patch('psutil._pslinux.cpu_count_logical',
                       return_value=2):
        freq = psutil.cpu_freq(percpu=True)
        # (current, min, max) expected for each emulated CPU.
        expected = ((100.0, 200.0, 300.0), (400.0, 500.0, 600.0))
        for idx, (cur, low, high) in enumerate(expected):
            cpu = freq[idx]
            self.assertEqual(cpu.current, cur)
            # min / max are only checked when they are not reported
            # as 0.
            if cpu.min != 0.0:
                self.assertEqual(cpu.min, low)
            if cpu.max != 0.0:
                self.assertEqual(cpu.max, high)
868 | |
@unittest.skipIf(not HAS_CPU_FREQ, "not supported")
def test_emulate_no_scaling_cur_freq_file(self):
    # See: https://github.com/giampaolo/psutil/issues/1071
    # Opening "scaling_cur_freq" fails with ENOENT while
    # "cpuinfo_cur_freq" and /proc/cpuinfo both report 200 MHz.
    orig_open = open

    def open_mock(name, *args, **kwargs):
        if name.endswith('/scaling_cur_freq'):
            raise IOError(errno.ENOENT, "")
        if name.endswith('/cpuinfo_cur_freq'):
            return io.BytesIO(b"200000")
        if name == '/proc/cpuinfo':
            return io.BytesIO(b"cpu MHz : 200")
        return orig_open(name, *args, **kwargs)

    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    with mock.patch(patch_point, side_effect=open_mock), \
            mock.patch('os.path.exists', return_value=True), \
            mock.patch('psutil._pslinux.cpu_count_logical',
                       return_value=1):
        freq = psutil.cpu_freq()
        self.assertEqual(freq.current, 200)
890 | |
891 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUStats(PsutilTestCase):
    # Compare psutil.cpu_stats() fields against the values returned
    # by the vmstat() helper, tolerating a difference of 500.

    def test_ctx_switches(self):
        expected = vmstat("context switches")
        actual = psutil.cpu_stats().ctx_switches
        self.assertAlmostEqual(expected, actual, delta=500)

    def test_interrupts(self):
        expected = vmstat("interrupts")
        actual = psutil.cpu_stats().interrupts
        self.assertAlmostEqual(expected, actual, delta=500)
904 | |
905 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestLoadAvg(PsutilTestCase):

    @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
    def test_getloadavg(self):
        # The first 3 fields of /proc/loadavg are compared, one by one,
        # against the 3 values returned by psutil.getloadavg().
        psutil_value = psutil.getloadavg()
        with open("/proc/loadavg", "r") as f:
            fields = f.read().split()

        for idx in range(3):
            self.assertAlmostEqual(
                float(fields[idx]), psutil_value[idx], delta=1)
918 | |
919 | |
920 # ===================================================================== | |
921 # --- system network | |
922 # ===================================================================== | |
923 | |
924 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIfAddrs(PsutilTestCase):

    def test_ips(self):
        # Check every address returned by psutil.net_if_addrs()
        # against the get_*() helpers for the same NIC.
        for nic, nic_addrs in psutil.net_if_addrs().items():
            for entry in nic_addrs:
                family = entry.family
                if family == psutil.AF_LINK:
                    self.assertEqual(entry.address, get_mac_address(nic))
                elif family == socket.AF_INET:
                    self.assertEqual(entry.address, get_ipv4_address(nic))
                    self.assertEqual(entry.netmask, get_ipv4_netmask(nic))
                    if entry.broadcast is None:
                        # No broadcast reported by psutil: the helper
                        # is expected to return the null address.
                        self.assertEqual(
                            get_ipv4_broadcast(nic), '0.0.0.0')
                    else:
                        self.assertEqual(entry.broadcast,
                                         get_ipv4_broadcast(nic))
                elif family == socket.AF_INET6:
                    # IPv6 addresses can have a percent symbol at the end.
                    # E.g. these 2 are equivalent:
                    # "fe80::1ff:fe23:4567:890a"
                    # "fe80::1ff:fe23:4567:890a%eth0"
                    # That is the "zone id" portion, which usually is the
                    # name of the network interface. Strip it before
                    # comparing.
                    bare_address = entry.address.split('%')[0]
                    self.assertEqual(bare_address, get_ipv6_address(nic))
950 | |
951 # XXX - not reliable when having virtual NICs installed by Docker. | |
952 # @unittest.skipIf(not which('ip'), "'ip' utility not available") | |
953 # def test_net_if_names(self): | |
954 # out = sh("ip addr").strip() | |
955 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x] | |
956 # found = 0 | |
957 # for line in out.split('\n'): | |
958 # line = line.strip() | |
959 # if re.search(r"^\d+:", line): | |
960 # found += 1 | |
961 # name = line.split(':')[1].strip() | |
962 # self.assertIn(name, nics) | |
963 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( | |
964 # pprint.pformat(nics), out)) | |
965 | |
966 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIfStats(PsutilTestCase):

    def test_against_ifconfig(self):
        # Compare "isup" and "mtu" against the output of
        # "ifconfig <nic>". NICs for which sh() raises RuntimeError
        # are skipped.
        for nic, nic_stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % nic)
            except RuntimeError:
                continue
            self.assertEqual(nic_stats.isup, 'RUNNING' in out, msg=out)
            mtu_matches = re.findall(r'(?i)MTU[: ](\d+)', out)
            self.assertEqual(nic_stats.mtu, int(mtu_matches[0]))

    def test_mtu(self):
        # Compare "mtu" against /sys/class/net/<nic>/mtu.
        for nic, nic_stats in psutil.net_if_stats().items():
            with open("/sys/class/net/%s/mtu" % nic, "rt") as f:
                sysfs_mtu = int(f.read().strip())
                self.assertEqual(nic_stats.mtu, sysfs_mtu)
985 | |
986 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetIOCounters(PsutilTestCase):

    @retry_on_failure()
    def test_against_ifconfig(self):
        # Compare the per-NIC counters returned by
        # psutil.net_io_counters() against the ones printed by
        # "ifconfig <nic>". NICs for which sh() raises RuntimeError
        # are skipped.
        def ifconfig(nic):
            # Return a dict of the counters parsed from the output of
            # "ifconfig <nic>". The NIC queried is the one passed as
            # argument (not a variable of the enclosing scope).
            out = sh("ifconfig %s" % nic)
            # "errors" and "dropped" appear twice in the output: the
            # first occurrence is used for the incoming counter, the
            # second for the outgoing one.
            errors = re.findall(r'errors[: ](\d+)', out)
            dropped = re.findall(r'dropped[: ](\d+)', out)
            ret = {}
            ret['packets_recv'] = int(
                re.findall(r'RX packets[: ](\d+)', out)[0])
            ret['packets_sent'] = int(
                re.findall(r'TX packets[: ](\d+)', out)[0])
            ret['errin'] = int(errors[0])
            ret['errout'] = int(errors[1])
            ret['dropin'] = int(dropped[0])
            ret['dropout'] = int(dropped[1])
            ret['bytes_recv'] = int(
                re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
            ret['bytes_sent'] = int(
                re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
            return ret

        # (counter name, tolerated difference between the two sources)
        tolerances = (
            ('bytes_recv', 1024 * 5),
            ('bytes_sent', 1024 * 5),
            ('packets_recv', 1024),
            ('packets_sent', 1024),
            ('errin', 10),
            ('errout', 10),
            ('dropin', 10),
            ('dropout', 10),
        )

        nio = psutil.net_io_counters(pernic=True, nowrap=False)
        for name, stats in nio.items():
            try:
                ifconfig_ret = ifconfig(name)
            except RuntimeError:
                continue
            for field, delta in tolerances:
                self.assertAlmostEqual(
                    getattr(stats, field), ifconfig_ret[field], delta=delta)
1031 | |
1032 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemNetConnections(PsutilTestCase):

    @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
    @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
    def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
        # see: https://github.com/giampaolo/psutil/issues/623
        # Try to have an IPv6 socket around (failures are ignored),
        # then make sure net_connections() does not raise.
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            self.addCleanup(sock.close)
            sock.bind(("::1", 0))
        except socket.error:
            pass
        psutil.net_connections(kind='inet6')

    def test_emulate_unix(self):
        # Feed a fake /proc/net/unix (its last line has an unusual
        # format) and make sure net_connections() reads it without
        # raising.
        fake_content = textwrap.dedent("""\
            0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
            0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
            0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
            000000000000000000000000000000000000000000000000000000
            """)
        with mock_open_content('/proc/net/unix', fake_content) as m:
            psutil.net_connections(kind='unix')
            assert m.called
1059 | |
1060 | |
1061 # ===================================================================== | |
1062 # --- system disks | |
1063 # ===================================================================== | |
1064 | |
1065 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemDiskPartitions(PsutilTestCase):

    @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
    @skip_on_not_implemented()
    def test_against_df(self):
        # Test psutil.disk_usage() of every partition returned by
        # psutil.disk_partitions() against the output of
        # 'df -P -B 1 <mountpoint>'.
        def df(path):
            # Return (device, total, used, free) as parsed from the
            # second line of df's output (the first one is skipped).
            out = sh('df -P -B 1 "%s"' % path).strip()
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total), int(used), int(free)

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            _, total, used, free = df(part.mountpoint)
            self.assertEqual(usage.total, total)
            self.assertAlmostEqual(usage.free, free,
                                   delta=TOLERANCE_DISK_USAGE)
            self.assertAlmostEqual(usage.used, used,
                                   delta=TOLERANCE_DISK_USAGE)

    def test_zfs_fs(self):
        # Test that ZFS partitions are returned.
        with open("/proc/filesystems", "r") as f:
            data = f.read()

        if 'zfs' in data:
            found = any(part.fstype == 'zfs'
                        for part in psutil.disk_partitions())
            if not found:
                self.fail("couldn't find any ZFS partition")
            return

        # No ZFS partitions on this system. Let's fake one.
        fake_file = io.StringIO(u("nodev\tzfs\n"))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m1, \
                mock.patch(
                    'psutil._pslinux.cext.disk_partitions',
                    return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
            ret = psutil.disk_partitions()
            assert m1.called
            assert m2.called
            assert ret
            self.assertEqual(ret[0].fstype, 'zfs')

    def test_emulate_realpath_fail(self):
        # See: https://github.com/giampaolo/psutil/issues/1307
        # os.path.realpath() is made to return a non existent path,
        # in which case disk_partitions() is expected to raise
        # FileNotFoundError.
        try:
            with mock.patch('os.path.realpath',
                            return_value='/non/existent') as m:
                with self.assertRaises(FileNotFoundError):
                    psutil.disk_partitions()
                assert m.called
        finally:
            psutil.PROCFS_PATH = "/proc"
1128 | |
1129 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemDiskIoCounters(PsutilTestCase):

    def test_emulate_kernel_2_4(self):
        # Tests /proc/diskstats parsing format for 2.4 kernels, see:
        # https://github.com/giampaolo/psutil/issues/767
        content = " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"
        with mock_open_content('/proc/diskstats', content), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            ret = psutil.disk_io_counters(nowrap=False)
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(ret, field), value)

    def test_emulate_kernel_2_6_full(self):
        # Tests /proc/diskstats parsing format for 2.6 kernels,
        # lines reporting all metrics:
        # https://github.com/giampaolo/psutil/issues/767
        content = " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"
        with mock_open_content('/proc/diskstats', content), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            ret = psutil.disk_io_counters(nowrap=False)
            expected = (
                ('read_count', 1),
                ('read_merged_count', 2),
                ('read_bytes', 3 * SECTOR_SIZE),
                ('read_time', 4),
                ('write_count', 5),
                ('write_merged_count', 6),
                ('write_bytes', 7 * SECTOR_SIZE),
                ('write_time', 8),
                ('busy_time', 10),
            )
            for field, value in expected:
                self.assertEqual(getattr(ret, field), value)

    def test_emulate_kernel_2_6_limited(self):
        # Tests /proc/diskstats parsing format for 2.6 kernels,
        # where one line of /proc/partitions return a limited
        # amount of metrics when it bumps into a partition
        # (instead of a disk). See:
        # https://github.com/giampaolo/psutil/issues/767
        content = " 3 1 hda 1 2 3 4"
        with mock_open_content('/proc/diskstats', content), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=True):
            ret = psutil.disk_io_counters(nowrap=False)
            # Metrics which are present on the line...
            self.assertEqual(ret.read_count, 1)
            self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
            self.assertEqual(ret.write_count, 3)
            self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)

            # ...and the ones which are not, expected to be 0.
            for field in ('read_merged_count', 'read_time',
                          'write_merged_count', 'write_time',
                          'busy_time'):
                self.assertEqual(getattr(ret, field), 0)

    def test_emulate_include_partitions(self):
        # Make sure that when perdisk=True disk partitions are returned,
        # see:
        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
        content = textwrap.dedent("""\
            3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
            3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
            """)
        with mock_open_content('/proc/diskstats', content), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=False):
            ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
            self.assertEqual(len(ret), 2)
            self.assertEqual(ret['nvme0n1'].read_count, 1)
            self.assertEqual(ret['nvme0n1p1'].read_count, 1)
            self.assertEqual(ret['nvme0n1'].write_count, 5)
            self.assertEqual(ret['nvme0n1p1'].write_count, 5)

    def test_emulate_exclude_partitions(self):
        # Make sure that when perdisk=False partitions (e.g. 'sda1',
        # 'nvme0n1p1') are skipped and not included in the total count.
        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
        content = textwrap.dedent("""\
            3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
            3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
            """)
        # First round: nothing is considered a storage device.
        with mock_open_content('/proc/diskstats', content), \
                mock.patch('psutil._pslinux.is_storage_device',
                           return_value=False):
            ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
            self.assertIsNone(ret)

        # Second round: only 'nvme0n1' is considered a storage device.
        def is_storage_device(name):
            return name == 'nvme0n1'

        content = textwrap.dedent("""\
            3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
            3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
            """)
        with mock_open_content('/proc/diskstats', content), \
                mock.patch('psutil._pslinux.is_storage_device',
                           create=True, side_effect=is_storage_device):
            ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
            self.assertEqual(ret.read_count, 1)
            self.assertEqual(ret.write_count, 5)

    def test_emulate_use_sysfs(self):
        # Pretend /proc/diskstats does not exist and check that the
        # same number of disks is returned as when it does.
        def exists(path):
            return path != '/proc/diskstats'

        wprocfs = psutil.disk_io_counters(perdisk=True)
        with mock.patch('psutil._pslinux.os.path.exists',
                        create=True, side_effect=exists):
            wsysfs = psutil.disk_io_counters(perdisk=True)
        self.assertEqual(len(wprocfs), len(wsysfs))

    def test_emulate_not_impl(self):
        # Pretend no path exists at all, in which case
        # NotImplementedError is expected.
        def exists(path):
            return False

        with mock.patch('psutil._pslinux.os.path.exists',
                        create=True, side_effect=exists):
            self.assertRaises(NotImplementedError, psutil.disk_io_counters)
1264 | |
1265 | |
1266 # ===================================================================== | |
1267 # --- misc | |
1268 # ===================================================================== | |
1269 | |
1270 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestMisc(PsutilTestCase):

    def test_boot_time(self):
        # Compare psutil.boot_time() against vmstat's 'boot time'.
        vmstat_value = vmstat('boot time')
        psutil_value = psutil.boot_time()
        self.assertEqual(int(vmstat_value), int(psutil_value))

    def test_no_procfs_on_import(self):
        # Reload psutil while every open() of a path starting with
        # '/proc' fails, then point PROCFS_PATH to a fake procfs
        # directory containing only a 'stat' file and check the CPU
        # functions against it.
        my_procfs = self.get_testfn()
        os.mkdir(my_procfs)

        with open(os.path.join(my_procfs, 'stat'), 'w') as f:
            f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
            f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
            f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')

        try:
            orig_open = open

            def open_mock(name, *args, **kwargs):
                if name.startswith('/proc'):
                    raise IOError(errno.ENOENT, 'rejecting access for test')
                return orig_open(name, *args, **kwargs)

            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
            with mock.patch(patch_point, side_effect=open_mock):
                reload_module(psutil)

                self.assertRaises(IOError, psutil.cpu_times)
                self.assertRaises(IOError, psutil.cpu_times, percpu=True)
                self.assertRaises(IOError, psutil.cpu_percent)
                self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
                self.assertRaises(IOError, psutil.cpu_times_percent)
                self.assertRaises(
                    IOError, psutil.cpu_times_percent, percpu=True)

                psutil.PROCFS_PATH = my_procfs

                self.assertEqual(psutil.cpu_percent(), 0)
                self.assertEqual(sum(psutil.cpu_times_percent()), 0)

                # since we don't know the number of CPUs at import time,
                # we awkwardly say there are none until the second call
                per_cpu_percent = psutil.cpu_percent(percpu=True)
                self.assertEqual(sum(per_cpu_percent), 0)

                # ditto awkward length
                per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
                self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)

                # much user, very busy
                with open(os.path.join(my_procfs, 'stat'), 'w') as f:
                    f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
                    f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
                    f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')

                self.assertNotEqual(psutil.cpu_percent(), 0)
                self.assertNotEqual(
                    sum(psutil.cpu_percent(percpu=True)), 0)
                self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
                self.assertNotEqual(
                    sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
        finally:
            # Restore a pristine psutil module even if removing the
            # fake procfs directory fails, so that the tests which run
            # afterwards are not affected.
            try:
                shutil.rmtree(my_procfs)
            finally:
                reload_module(psutil)

        self.assertEqual(psutil.PROCFS_PATH, '/proc')

    def test_cpu_steal_decrease(self):
        # Test cumulative cpu stats decrease. We should ignore this.
        # See issue #1210.
        with mock_open_content(
                "/proc/stat",
                textwrap.dedent("""\
                    cpu 0 0 0 0 0 0 0 1 0 0
                    cpu0 0 0 0 0 0 0 0 1 0 0
                    cpu1 0 0 0 0 0 0 0 1 0 0
                    """).encode()) as m:
            # first call to "percent" functions should read the new stat file
            # and compare to the "real" file read at import time - so the
            # values are meaningless
            psutil.cpu_percent()
            assert m.called
            psutil.cpu_percent(percpu=True)
            psutil.cpu_times_percent()
            psutil.cpu_times_percent(percpu=True)

        with mock_open_content(
                "/proc/stat",
                textwrap.dedent("""\
                    cpu 1 0 0 0 0 0 0 0 0 0
                    cpu0 1 0 0 0 0 0 0 0 0 0
                    cpu1 1 0 0 0 0 0 0 0 0 0
                    """).encode()) as m:
            # Increase "user" while steal goes "backwards" to zero.
            cpu_percent = psutil.cpu_percent()
            assert m.called
            cpu_percent_percpu = psutil.cpu_percent(percpu=True)
            cpu_times_percent = psutil.cpu_times_percent()
            cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
            self.assertNotEqual(cpu_percent, 0)
            self.assertNotEqual(sum(cpu_percent_percpu), 0)
            self.assertNotEqual(sum(cpu_times_percent), 0)
            self.assertNotEqual(sum(cpu_times_percent), 100.0)
            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
            self.assertNotEqual(
                sum(map(sum, cpu_times_percent_percpu)), 100.0)
            self.assertEqual(cpu_times_percent.steal, 0)
            self.assertNotEqual(cpu_times_percent.user, 0)

    def test_boot_time_mocked(self):
        # With psutil._common.open replaced by a mock, boot_time() is
        # expected to raise RuntimeError.
        with mock.patch('psutil._common.open', create=True) as m:
            self.assertRaises(
                RuntimeError,
                psutil._pslinux.boot_time)
            assert m.called

    def test_users_mocked(self):
        # Make sure ':0' and ':0.0' (returned by C ext) are converted
        # to 'localhost', otherwise the host should be returned as-is.
        cases = (
            (':0', 'localhost'),
            (':0.0', 'localhost'),
            ('foo', 'foo'),
        )
        for raw_host, expected_host in cases:
            with mock.patch('psutil._pslinux.cext.users',
                            return_value=[('giampaolo', 'pts/2', raw_host,
                                           1436573184.0, True, 2)]) as m:
                self.assertEqual(psutil.users()[0].host, expected_host)
                assert m.called

    def test_procfs_path(self):
        # Point PROCFS_PATH to an empty directory and make sure the
        # functions relying on procfs raise; the original path is
        # restored at the end.
        tdir = self.get_testfn()
        os.mkdir(tdir)
        try:
            psutil.PROCFS_PATH = tdir
            self.assertRaises(IOError, psutil.virtual_memory)
            self.assertRaises(IOError, psutil.cpu_times)
            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
            self.assertRaises(IOError, psutil.boot_time)
            # self.assertRaises(IOError, psutil.pids)
            self.assertRaises(IOError, psutil.net_connections)
            self.assertRaises(IOError, psutil.net_io_counters)
            self.assertRaises(IOError, psutil.net_if_stats)
            # self.assertRaises(IOError, psutil.disk_io_counters)
            self.assertRaises(IOError, psutil.disk_partitions)
            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
        finally:
            psutil.PROCFS_PATH = "/proc"

    @retry_on_failure()
    def test_issue_687(self):
        # In case of thread ID:
        # - pid_exists() is supposed to return False
        # - Process(tid) is supposed to work
        # - pids() should not return the TID
        # See: https://github.com/giampaolo/psutil/issues/687
        t = ThreadTask()
        t.start()
        try:
            p = psutil.Process()
            threads = p.threads()
            self.assertEqual(len(threads), 2)
            tid = sorted(threads, key=lambda x: x.id)[1].id
            self.assertNotEqual(p.pid, tid)
            pt = psutil.Process(tid)
            pt.as_dict()
            self.assertNotIn(tid, psutil.pids())
        finally:
            t.stop()

    def test_pid_exists_no_proc_status(self):
        # Internally pid_exists relies on /proc/{pid}/status.
        # Emulate a case where this file is empty in which case
        # psutil is supposed to fall back on using pids().
        # The path must be formatted with the actual PID, otherwise
        # the mock would never match the file being opened and the
        # real status file would be read instead.
        status_path = "/proc/%s/status" % os.getpid()
        with mock_open_content(status_path, "") as m:
            assert psutil.pid_exists(os.getpid())
            assert m.called
1455 | |
1456 | |
1457 # ===================================================================== | |
1458 # --- sensors | |
1459 # ===================================================================== | |
1460 | |
1461 | |
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_BATTERY, "no battery")
class TestSensorsBattery(PsutilTestCase):

    @unittest.skipIf(not which("acpi"), "acpi utility not available")
    def test_percent(self):
        # Compare the battery percentage against the second
        # comma-separated field printed by "acpi -b".
        out = sh("acpi -b")
        percent_field = out.split(",")[1].strip()
        acpi_value = int(percent_field.replace('%', ''))
        psutil_value = psutil.sensors_battery().percent
        self.assertAlmostEqual(acpi_value, psutil_value, delta=1)

    def test_emulate_power_plugged(self):
        # Pretend the AC power cable is connected.
        orig_open = open

        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                return io.BytesIO(b"1")
            return orig_open(name, *args, **kwargs)

        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, True)
            self.assertEqual(
                psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
            assert m.called

    def test_emulate_power_plugged_2(self):
        # Same as above but pretend /AC0/online does not exist in which
        # case code relies on /status file.
        orig_open = open

        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                raise IOError(errno.ENOENT, "")
            if name.endswith("/status"):
                return io.StringIO(u("charging"))
            return orig_open(name, *args, **kwargs)

        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, True)
            assert m.called

    def test_emulate_power_not_plugged(self):
        # Pretend the AC power cable is not connected.
        orig_open = open

        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                return io.BytesIO(b"0")
            return orig_open(name, *args, **kwargs)

        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, False)
            assert m.called

    def test_emulate_power_not_plugged_2(self):
        # Same as above but pretend /AC0/online does not exist in which
        # case code relies on /status file.
        orig_open = open

        def open_mock(name, *args, **kwargs):
            if name.endswith(("AC0/online", "AC/online")):
                raise IOError(errno.ENOENT, "")
            if name.endswith("/status"):
                return io.StringIO(u("discharging"))
            return orig_open(name, *args, **kwargs)

        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertEqual(psutil.sensors_battery().power_plugged, False)
            assert m.called

    def test_emulate_power_undetermined(self):
        # Pretend we can't know whether the AC power cable is
        # connected: the "online" files can't be opened and the battery
        # status file has an unrecognized content. power_plugged is
        # expected to be None.
        orig_open = open
        online_files = ("/sys/class/power_supply/AC0/online",
                        "/sys/class/power_supply/AC/online")

        def open_mock(name, *args, **kwargs):
            if name.startswith(online_files):
                raise IOError(errno.ENOENT, "")
            if name.startswith("/sys/class/power_supply/BAT0/status"):
                return io.BytesIO(b"???")
            return orig_open(name, *args, **kwargs)

        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch(patch_point, side_effect=open_mock) as m:
            self.assertIsNone(psutil.sensors_battery().power_plugged)
            assert m.called

    def test_emulate_energy_full_0(self):
        # Emulate a case where energy_full files returns 0.
        energy_full = "/sys/class/power_supply/BAT0/energy_full"
        with mock_open_content(energy_full, b"0") as m:
            self.assertEqual(psutil.sensors_battery().percent, 0)
            assert m.called

    def test_emulate_energy_full_not_avail(self):
        # Emulate a case where energy_full file does not exist.
        # Expected fallback on /capacity.
        bat0 = "/sys/class/power_supply/BAT0/"
        with mock_open_exception(
                bat0 + "energy_full", IOError(errno.ENOENT, "")), \
                mock_open_exception(
                    bat0 + "charge_full", IOError(errno.ENOENT, "")), \
                mock_open_content(bat0 + "capacity", b"88"):
            self.assertEqual(psutil.sensors_battery().percent, 88)

    def test_emulate_no_power(self):
        # Emulate a case where /AC0/online file nor /BAT0/status exist.
        power_supply = "/sys/class/power_supply/"
        with mock_open_exception(
                power_supply + "AC/online", IOError(errno.ENOENT, "")), \
                mock_open_exception(
                    power_supply + "AC0/online",
                    IOError(errno.ENOENT, "")), \
                mock_open_exception(
                    power_supply + "BAT0/status",
                    IOError(errno.ENOENT, "")):
            self.assertIsNone(psutil.sensors_battery().power_plugged)
1587 | |
1588 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsBatteryEmulated(PsutilTestCase):

    def test_it(self):
        # Emulate a "BAT0" battery by faking os.listdir() and the
        # energy_now / power_now / energy_full files, then make sure
        # sensors_battery() returns something.
        orig_open = open
        # (file name suffix, fake file content)
        fake_files = (
            ("/energy_now", "60000000"),
            ("/power_now", "0"),
            ("/energy_full", "60000001"),
        )

        def open_mock(name, *args, **kwargs):
            for suffix, content in fake_files:
                if name.endswith(suffix):
                    return io.StringIO(u(content))
            return orig_open(name, *args, **kwargs)

        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
        with mock.patch('os.listdir', return_value=["BAT0"]) as mlistdir, \
                mock.patch(patch_point, side_effect=open_mock) as mopen:
            self.assertIsNotNone(psutil.sensors_battery())
            assert mlistdir.called
            assert mopen.called
1610 | |
1611 | |
1612 @unittest.skipIf(not LINUX, "LINUX only") | |
1613 class TestSensorsTemperatures(PsutilTestCase): | |
1614 | |
def test_emulate_class_hwmon(self):
    # Fake a single temperature sensor exposed via /sys/class/hwmon
    # and check the values returned by sensors_temperatures().
    orig_open = open
    # (file name suffix, fake file factory); a new file object is
    # created on every open() call.
    fake_files = (
        ('/name', lambda: io.StringIO(u("name"))),
        ('/temp1_label', lambda: io.StringIO(u("label"))),
        ('/temp1_input', lambda: io.BytesIO(b"30000")),
        ('/temp1_max', lambda: io.BytesIO(b"40000")),
        ('/temp1_crit', lambda: io.BytesIO(b"50000")),
    )

    def open_mock(name, *args, **kwargs):
        for suffix, make_file in fake_files:
            if name.endswith(suffix):
                return make_file()
        return orig_open(name, *args, **kwargs)

    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
    # Test case with /sys/class/hwmon
    with mock.patch(patch_point, side_effect=open_mock), \
            mock.patch('glob.glob',
                       return_value=['/sys/class/hwmon/hwmon0/temp1']):
        temp = psutil.sensors_temperatures()['name'][0]
        self.assertEqual(temp.label, 'label')
        self.assertEqual(temp.current, 30.0)
        self.assertEqual(temp.high, 40.0)
        self.assertEqual(temp.critical, 50.0)
1641 | |
1642 def test_emulate_class_thermal(self): | |
1643 def open_mock(name, *args, **kwargs): | |
1644 if name.endswith('0_temp'): | |
1645 return io.BytesIO(b"50000") | |
1646 elif name.endswith('temp'): | |
1647 return io.BytesIO(b"30000") | |
1648 elif name.endswith('0_type'): | |
1649 return io.StringIO(u("critical")) | |
1650 elif name.endswith('type'): | |
1651 return io.StringIO(u("name")) | |
1652 else: | |
1653 return orig_open(name, *args, **kwargs) | |
1654 | |
1655 def glob_mock(path): | |
1656 if path == '/sys/class/hwmon/hwmon*/temp*_*': | |
1657 return [] | |
1658 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*': | |
1659 return [] | |
1660 elif path == '/sys/class/thermal/thermal_zone*': | |
1661 return ['/sys/class/thermal/thermal_zone0'] | |
1662 elif path == '/sys/class/thermal/thermal_zone0/trip_point*': | |
1663 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type', | |
1664 '/sys/class/thermal/thermal_zone1/trip_point_0_temp'] | |
1665 return [] | |
1666 | |
1667 orig_open = open | |
1668 patch_point = 'builtins.open' if PY3 else '__builtin__.open' | |
1669 with mock.patch(patch_point, side_effect=open_mock): | |
1670 with mock.patch('glob.glob', create=True, side_effect=glob_mock): | |
1671 temp = psutil.sensors_temperatures()['name'][0] | |
1672 self.assertEqual(temp.label, '') | |
1673 self.assertEqual(temp.current, 30.0) | |
1674 self.assertEqual(temp.high, 50.0) | |
1675 self.assertEqual(temp.critical, 50.0) | |
1676 | |
1677 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestSensorsFans(PsutilTestCase):
    """Exercise psutil.sensors_fans() against faked sysfs files."""

    def test_emulate_data(self):
        """Fake a single hwmon fan and check its label and speed."""
        # File-name suffix -> fake file content served by open().
        fake_content = {
            '/name': "name",
            '/fan1_label': "label",
            '/fan1_input': "2000",
        }
        real_open = open

        def open_mock(name, *args, **kwargs):
            for suffix in fake_content:
                if name.endswith(suffix):
                    return io.StringIO(u(fake_content[suffix]))
            # Anything else goes to the real open().
            return real_open(name, *args, **kwargs)

        if PY3:
            patch_point = 'builtins.open'
        else:
            patch_point = '__builtin__.open'
        fake_glob = ['/sys/class/hwmon/hwmon2/fan1']
        with mock.patch(patch_point, side_effect=open_mock):
            with mock.patch('glob.glob', return_value=fake_glob):
                fans = psutil.sensors_fans()
                fan = fans['name'][0]
                self.assertEqual(fan.label, 'label')
                self.assertEqual(fan.current, 2000)
1700 | |
1701 | |
1702 # ===================================================================== | |
1703 # --- test process | |
1704 # ===================================================================== | |
1705 | |
1706 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcess(PsutilTestCase):
    """Linux specific tests for psutil.Process, mostly based on mocks."""

    @retry_on_failure()
    def test_memory_full_info(self):
        """uss / pss / swap must match the sums computed from memory_maps()."""
        testfn = self.get_testfn()
        src = textwrap.dedent("""
            import time
            with open("%s", "w") as f:
                time.sleep(10)
            """ % testfn)
        sproc = self.pyrun(src)
        # NOTE(review): this waits until testfn is *absent* from the cwd
        # listing; presumably the intent was to wait for the child to
        # create the file - confirm against get_testfn() / call_until().
        call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
        p = psutil.Process(sproc.pid)
        time.sleep(.1)
        mem = p.memory_full_info()
        maps = p.memory_maps(grouped=False)
        self.assertAlmostEqual(
            mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
            delta=4096)
        self.assertAlmostEqual(
            mem.pss, sum([x.pss for x in maps]), delta=4096)
        self.assertAlmostEqual(
            mem.swap, sum([x.swap for x in maps]), delta=4096)

    def test_memory_full_info_mocked(self):
        """Parse a faked smaps file and check uss / pss / swap."""
        # See: https://github.com/giampaolo/psutil/issues/1222
        with mock_open_content(
            "/proc/%s/smaps" % os.getpid(),
            textwrap.dedent("""\
                fffff0 r-xp 00000000 00:00 0 [vsyscall]
                Size: 1 kB
                Rss: 2 kB
                Pss: 3 kB
                Shared_Clean: 4 kB
                Shared_Dirty: 5 kB
                Private_Clean: 6 kB
                Private_Dirty: 7 kB
                Referenced: 8 kB
                Anonymous: 9 kB
                LazyFree: 10 kB
                AnonHugePages: 11 kB
                ShmemPmdMapped: 12 kB
                Shared_Hugetlb: 13 kB
                Private_Hugetlb: 14 kB
                Swap: 15 kB
                SwapPss: 16 kB
                KernelPageSize: 17 kB
                MMUPageSize: 18 kB
                Locked: 19 kB
                VmFlags: rd ex
                """).encode()) as m:
            p = psutil.Process()
            mem = p.memory_full_info()
            assert m.called
            # uss = Private_Clean + Private_Dirty + Private_Hugetlb
            self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
            self.assertEqual(mem.pss, 3 * 1024)
            self.assertEqual(mem.swap, 15 * 1024)

    # On PYPY file descriptors are not closed fast enough.
    @unittest.skipIf(PYPY, "unreliable on PYPY")
    def test_open_files_mode(self):
        """open_files() must report the mode each file was opened with."""
        def get_test_file(fname):
            """Return the open_files() entry for fname.

            Polls until the file shows up; raises RuntimeError once
            GLOBAL_TIMEOUT seconds have elapsed without finding it.
            """
            p = psutil.Process()
            wanted = os.path.abspath(fname)
            giveup_at = time.time() + GLOBAL_TIMEOUT
            while True:
                for f in p.open_files():
                    if f.path == wanted:
                        return f
                # Check the deadline after every full scan so that the
                # timeout also applies when open_files() returns nothing.
                if time.time() > giveup_at:
                    raise RuntimeError("timeout looking for test file")

        testfn = self.get_testfn()
        # (mode passed to open(), mode expected from open_files()).
        # "w" comes first so that the file exists for "r".
        expectations = [
            ("w", "w"),
            ("r", "r"),
            ("a", "a"),
            ("r+", "r+"),
            ("w+", "r+"),
            ("a+", "a+"),
        ]
        for open_mode, expected in expectations:
            with open(testfn, open_mode):
                self.assertEqual(get_test_file(testfn).mode, expected)
        # note: "x" bit is not supported
        if PY3:
            safe_rmpath(testfn)
            with open(testfn, "x"):
                self.assertEqual(get_test_file(testfn).mode, "w")
            safe_rmpath(testfn)
            with open(testfn, "x+"):
                self.assertEqual(get_test_file(testfn).mode, "r+")

    def test_open_files_file_gone(self):
        """open_files() returns [] if readlink() fails for every fd."""
        # simulates a file which gets deleted during open_files()
        # execution
        p = psutil.Process()
        files = p.open_files()
        with open(self.get_testfn(), 'w'):
            # give the kernel some time to see the new file
            call_until(p.open_files, "len(ret) != %i" % len(files))
            with mock.patch('psutil._pslinux.os.readlink',
                            side_effect=OSError(errno.ENOENT, "")) as m:
                files = p.open_files()
                assert not files
                assert m.called
            # also simulate the case where os.readlink() returns EINVAL
            # in which case psutil is supposed to 'continue'
            with mock.patch('psutil._pslinux.os.readlink',
                            side_effect=OSError(errno.EINVAL, "")) as m:
                self.assertEqual(p.open_files(), [])
                assert m.called

    def test_open_files_fd_gone(self):
        """open_files() returns [] if every open() fails with ENOENT."""
        # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
        # while iterating through fds.
        # https://travis-ci.org/giampaolo/psutil/jobs/225694530
        p = psutil.Process()
        files = p.open_files()
        with open(self.get_testfn(), 'w'):
            # give the kernel some time to see the new file
            call_until(p.open_files, "len(ret) != %i" % len(files))
            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
            with mock.patch(patch_point,
                            side_effect=IOError(errno.ENOENT, "")) as m:
                files = p.open_files()
                assert not files
                assert m.called

    # --- mocked tests

    def test_terminal_mocked(self):
        """terminal() is None when the terminal map is empty."""
        with mock.patch('psutil._pslinux._psposix.get_terminal_map',
                        return_value={}) as m:
            self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
            assert m.called

    # TODO: re-enable this test.
    # def test_num_ctx_switches_mocked(self):
    #     with mock.patch('psutil._common.open', create=True) as m:
    #         self.assertRaises(
    #             NotImplementedError,
    #             psutil._pslinux.Process(os.getpid()).num_ctx_switches)
    #         assert m.called

    def test_cmdline_mocked(self):
        """cmdline() splits on NUL; a double trailing NUL yields ''."""
        # see: https://github.com/giampaolo/psutil/issues/639
        p = psutil.Process()
        fake_file = io.StringIO(u('foo\x00bar\x00'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar'])
            assert m.called
        fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
            assert m.called

    def test_cmdline_spaces_mocked(self):
        """Same as test_cmdline_mocked, with spaces as separators."""
        # see: https://github.com/giampaolo/psutil/issues/1179
        p = psutil.Process()
        fake_file = io.StringIO(u('foo bar '))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar'])
            assert m.called
        # Two trailing separators (mirrors the '\x00\x00' case of
        # test_cmdline_mocked). They are spelled as escapes so that they
        # cannot be silently collapsed into a single space.
        fake_file = io.StringIO(u('foo\x20bar\x20\x20'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
            assert m.called

    def test_cmdline_mixed_separators(self):
        """A space followed by a trailing NUL still yields two args."""
        # https://github.com/giampaolo/psutil/issues/
        #    1179#issuecomment-552984549
        p = psutil.Process()
        fake_file = io.StringIO(u('foo\x20bar\x00'))
        with mock.patch('psutil._common.open',
                        return_value=fake_file, create=True) as m:
            self.assertEqual(p.cmdline(), ['foo', 'bar'])
            assert m.called

    def test_readlink_path_deleted_mocked(self):
        """The ' (deleted)' suffix is stripped from exe() and cwd()."""
        with mock.patch('psutil._pslinux.os.readlink',
                        return_value='/home/foo (deleted)'):
            self.assertEqual(psutil.Process().exe(), "/home/foo")
            self.assertEqual(psutil.Process().cwd(), "/home/foo")

    def test_threads_mocked(self):
        """threads() ignores ENOENT but turns EPERM into AccessDenied."""
        orig_open = open
        patch_point = 'builtins.open' if PY3 else '__builtin__.open'

        def failing_open(err):
            # Build an open() replacement failing with ``err`` for every
            # file under /proc/{pid}/task and delegating otherwise.
            def open_mock(name, *args, **kwargs):
                if name.startswith('/proc/%s/task' % os.getpid()):
                    raise IOError(err, "")
                else:
                    return orig_open(name, *args, **kwargs)
            return open_mock

        # Test the case where os.listdir() returns a file (thread)
        # which no longer exists by the time we open() it (race
        # condition). threads() is supposed to ignore that instead
        # of raising NSP.
        with mock.patch(patch_point,
                        side_effect=failing_open(errno.ENOENT)) as m:
            ret = psutil.Process().threads()
            assert m.called
            self.assertEqual(ret, [])

        # ...but if it bumps into something != ENOENT we want an
        # exception.
        with mock.patch(patch_point, side_effect=failing_open(errno.EPERM)):
            self.assertRaises(psutil.AccessDenied, psutil.Process().threads)

    def test_exe_mocked(self):
        """exe() is '' on ENOENT, or ZombieProcess if /proc/pid is gone."""
        with mock.patch('psutil._pslinux.readlink',
                        side_effect=OSError(errno.ENOENT, "")) as m1:
            with mock.patch('psutil.Process.cmdline',
                            side_effect=psutil.AccessDenied(0, "")) as m2:
                # No such file error; might be raised also if /proc/pid/exe
                # path actually exists for system processes with low pids
                # (about 0-20). In this case psutil is supposed to return
                # an empty string.
                ret = psutil.Process().exe()
                assert m1.called
                assert m2.called
                self.assertEqual(ret, "")

                # ...but if /proc/pid no longer exist we're supposed to treat
                # it as an alias for zombie process
                with mock.patch('psutil._pslinux.os.path.lexists',
                                return_value=False):
                    self.assertRaises(
                        psutil.ZombieProcess, psutil.Process().exe)

    def test_issue_1014(self):
        """A missing smaps file surfaces as FileNotFoundError."""
        # Emulates a case where smaps file does not exist. In this case
        # wrap_exception decorator should not raise NoSuchProcess.
        with mock_open_exception(
                '/proc/%s/smaps' % os.getpid(),
                IOError(errno.ENOENT, "")) as m:
            p = psutil.Process()
            with self.assertRaises(FileNotFoundError):
                p.memory_maps()
            assert m.called

    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_zombie(self):
        """ENOSYS from prlimit() is reported as ZombieProcess."""
        # Emulate a case where rlimit() raises ENOSYS, which may
        # happen in case of zombie process:
        # https://travis-ci.org/giampaolo/psutil/jobs/51368273
        with mock.patch("psutil._pslinux.prlimit",
                        side_effect=OSError(errno.ENOSYS, "")) as m:
            p = psutil.Process()
            p.name()
            with self.assertRaises(psutil.ZombieProcess) as exc:
                p.rlimit(psutil.RLIMIT_NOFILE)
            assert m.called
        self.assertEqual(exc.exception.pid, p.pid)
        self.assertEqual(exc.exception.name, p.name())

    def test_cwd_zombie(self):
        """ENOENT from readlink() makes cwd() raise ZombieProcess."""
        with mock.patch("psutil._pslinux.os.readlink",
                        side_effect=OSError(errno.ENOENT, "")) as m:
            p = psutil.Process()
            p.name()
            with self.assertRaises(psutil.ZombieProcess) as exc:
                p.cwd()
            assert m.called
        self.assertEqual(exc.exception.pid, p.pid)
        self.assertEqual(exc.exception.name, p.name())

    def test_stat_file_parsing(self):
        """Check the values extracted from a faked /proc/pid/stat."""
        from psutil._pslinux import CLOCK_TICKS

        args = [
            "0",      # pid
            "(cat)",  # name
            "Z",      # status
            "1",      # ppid
            "0",      # pgrp
            "0",      # session
            "0",      # tty
            "0",      # tpgid
            "0",      # flags
            "0",      # minflt
            "0",      # cminflt
            "0",      # majflt
            "0",      # cmajflt
            "2",      # utime
            "3",      # stime
            "4",      # cutime
            "5",      # cstime
            "0",      # priority
            "0",      # nice
            "0",      # num_threads
            "0",      # itrealvalue
            "6",      # starttime
            "0",      # vsize
            "0",      # rss
            "0",      # rsslim
            "0",      # startcode
            "0",      # endcode
            "0",      # startstack
            "0",      # kstkesp
            "0",      # kstkeip
            "0",      # signal
            "0",      # blocked
            "0",      # sigignore
            "0",      # sigcatch
            "0",      # wchan
            "0",      # nswap
            "0",      # cnswap
            "0",      # exit_signal
            "6",      # processor
            "0",      # rt priority
            "0",      # policy
            "7",      # delayacct_blkio_ticks
        ]
        content = " ".join(args).encode()
        with mock_open_content('/proc/%s/stat' % os.getpid(), content):
            p = psutil.Process()
            self.assertEqual(p.name(), 'cat')
            self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
            self.assertEqual(p.ppid(), 1)
            self.assertEqual(
                p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
            cpu = p.cpu_times()
            self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
            self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
            self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
            self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
            self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
            self.assertEqual(p.cpu_num(), 6)

    def test_status_file_parsing(self):
        """Check the values extracted from a faked /proc/pid/status."""
        with mock_open_content(
            '/proc/%s/status' % os.getpid(),
            textwrap.dedent("""\
                Uid:\t1000\t1001\t1002\t1003
                Gid:\t1004\t1005\t1006\t1007
                Threads:\t66
                Cpus_allowed:\tf
                Cpus_allowed_list:\t0-7
                voluntary_ctxt_switches:\t12
                nonvoluntary_ctxt_switches:\t13""").encode()):
            p = psutil.Process()
            self.assertEqual(p.num_ctx_switches().voluntary, 12)
            self.assertEqual(p.num_ctx_switches().involuntary, 13)
            self.assertEqual(p.num_threads(), 66)
            uids = p.uids()
            self.assertEqual(uids.real, 1000)
            self.assertEqual(uids.effective, 1001)
            self.assertEqual(uids.saved, 1002)
            gids = p.gids()
            self.assertEqual(gids.real, 1004)
            self.assertEqual(gids.effective, 1005)
            self.assertEqual(gids.saved, 1006)
            self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
2074 | |
2075 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcessAgainstStatus(PsutilTestCase):
    """/proc/pid/stat and /proc/pid/status have many values in common.
    Whenever possible, psutil uses /proc/pid/stat (it's faster).
    For all those cases we check that the value found in
    /proc/pid/stat (by psutil) matches the one found in
    /proc/pid/status.
    """

    @classmethod
    def setUpClass(cls):
        cls.proc = psutil.Process()

    def read_status_file(self, linestart):
        """Return the value of the /proc/pid/status line starting with
        `linestart`.

        The value is whatever follows the first tab of the line (hence
        the "Name:" style label is not part of it). It is returned as an
        int if it can be converted, else as a string. ValueError is
        raised if no line matches.
        """
        with psutil._psplatform.open_text(
                '/proc/%s/status' % self.proc.pid) as f:
            for line in f:
                line = line.strip()
                if line.startswith(linestart):
                    value = line.partition('\t')[2]
                    try:
                        return int(value)
                    except ValueError:
                        return value
            raise ValueError("can't find %r" % linestart)

    def test_name(self):
        value = self.read_status_file("Name:")
        self.assertEqual(self.proc.name(), value)

    def test_status(self):
        value = self.read_status_file("State:")
        # "S (sleeping)" -> "sleeping"; "t (tracing stop)" -> "tracing-stop"
        value = value[value.find('(') + 1:value.rfind(')')]
        value = value.replace(' ', '-')
        self.assertEqual(self.proc.status(), value)

    def test_ppid(self):
        value = self.read_status_file("PPid:")
        self.assertEqual(self.proc.ppid(), value)

    def test_num_threads(self):
        value = self.read_status_file("Threads:")
        self.assertEqual(self.proc.num_threads(), value)

    def test_uids(self):
        value = self.read_status_file("Uid:")
        # The label was already stripped by read_status_file(), so the
        # fields are: real, effective, saved, filesystem. Only the first
        # three are returned by uids().
        value = tuple(map(int, value.split()[:3]))
        self.assertEqual(self.proc.uids(), value)

    def test_gids(self):
        value = self.read_status_file("Gid:")
        # Same layout as "Uid:": real, effective, saved, filesystem.
        value = tuple(map(int, value.split()[:3]))
        self.assertEqual(self.proc.gids(), value)

    @retry_on_failure()
    def test_num_ctx_switches(self):
        value = self.read_status_file("voluntary_ctxt_switches:")
        self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
        value = self.read_status_file("nonvoluntary_ctxt_switches:")
        self.assertEqual(self.proc.num_ctx_switches().involuntary, value)

    def test_cpu_affinity(self):
        value = self.read_status_file("Cpus_allowed_list:")
        # Only a plain "min-max" range is compared; any other format is
        # not checked by this test.
        if '-' in str(value):
            min_, max_ = map(int, value.split('-'))
            self.assertEqual(
                self.proc.cpu_affinity(), list(range(min_, max_ + 1)))

    def test_cpu_affinity_eligible_cpus(self):
        value = self.read_status_file("Cpus_allowed_list:")
        with mock.patch("psutil._pslinux.per_cpu_times") as m:
            self.proc._proc._get_eligible_cpus()
        # per_cpu_times() is expected to be consulted only when the
        # status file does not provide a "min-max" range.
        if '-' in str(value):
            assert not m.called
        else:
            assert m.called
2152 | |
2153 | |
2154 # ===================================================================== | |
2155 # --- test utils | |
2156 # ===================================================================== | |
2157 | |
2158 | |
@unittest.skipIf(not LINUX, "LINUX only")
class TestUtils(PsutilTestCase):
    """Tests for the helper functions of the Linux platform module."""

    def test_readlink(self):
        """readlink() drops the ' (deleted)' suffix from the target."""
        patcher = mock.patch("os.readlink", return_value="foo (deleted)")
        with patcher as m:
            target = psutil._psplatform.readlink("bar")
            self.assertEqual(target, "foo")
            assert m.called

    def test_cat(self):
        """cat() strips the content and supports text, binary, fallback."""
        path = self.get_testfn()
        with open(path, "wt") as fobj:
            fobj.write("foo ")
        as_text = psutil._psplatform.cat(path, binary=False)
        self.assertEqual(as_text, "foo")
        as_bytes = psutil._psplatform.cat(path, binary=True)
        self.assertEqual(as_bytes, b"foo")
        # Non-existent file: the fallback value is returned instead.
        missing = psutil._psplatform.cat(path + '??', fallback="bar")
        self.assertEqual(missing, "bar")
2175 | |
2176 | |
if __name__ == '__main__':
    # Run this test module through psutil's own test runner when the
    # file is executed directly.
    from psutil.tests import runner
    runner.run_from_name(__file__)