Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/psutil/tests/test_misc.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
5 # Use of this source code is governed by a BSD-style license that can be | |
6 # found in the LICENSE file. | |
7 | |
8 """ | |
9 Miscellaneous tests. | |
10 """ | |
11 | |
12 import ast | |
13 import collections | |
14 import errno | |
15 import json | |
16 import os | |
17 import pickle | |
18 import socket | |
19 import stat | |
20 import sys | |
21 | |
22 from psutil import LINUX | |
23 from psutil import POSIX | |
24 from psutil import WINDOWS | |
25 from psutil._common import memoize | |
26 from psutil._common import memoize_when_activated | |
27 from psutil._common import supports_ipv6 | |
28 from psutil._common import wrap_numbers | |
29 from psutil._compat import PY3 | |
30 from psutil.tests import APPVEYOR | |
31 from psutil.tests import CI_TESTING | |
32 from psutil.tests import HAS_BATTERY | |
33 from psutil.tests import HAS_MEMORY_MAPS | |
34 from psutil.tests import HAS_NET_IO_COUNTERS | |
35 from psutil.tests import HAS_SENSORS_BATTERY | |
36 from psutil.tests import HAS_SENSORS_FANS | |
37 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
38 from psutil.tests import import_module_by_path | |
39 from psutil.tests import mock | |
40 from psutil.tests import PsutilTestCase | |
41 from psutil.tests import PYTHON_EXE | |
42 from psutil.tests import reload_module | |
43 from psutil.tests import ROOT_DIR | |
44 from psutil.tests import SCRIPTS_DIR | |
45 from psutil.tests import sh | |
46 from psutil.tests import unittest | |
47 import psutil | |
48 import psutil.tests | |
49 | |
50 | |
# True only when the running interpreter reports version 3.9.x.
PYTHON_39 = sys.version_info[0] == 3 and sys.version_info[1] == 9
52 | |
53 | |
54 # =================================================================== | |
55 # --- Misc / generic tests. | |
56 # =================================================================== | |
57 | |
58 | |
class TestMisc(PsutilTestCase):
    """Miscellaneous tests for psutil's public API and ``_common`` helpers."""

    def test_process__repr__(self, func=repr):
        """Check how a Process is rendered while alive, dead or erroring.

        ``func`` is the callable used to render the process (``repr`` by
        default); ``test_process__str__`` re-runs this test with ``str``.
        """
        p = psutil.Process(self.spawn_testproc().pid)
        r = func(p)
        self.assertIn("psutil.Process", r)
        self.assertIn("pid=%s" % p.pid, r)
        # Strip a possible u'' prefix from the rendered name before
        # comparing.
        self.assertIn("name='%s'" % str(p.name()),
                      r.replace("name=u'", "name='"))
        self.assertIn("status=", r)
        self.assertNotIn("exitcode=", r)
        p.terminate()
        p.wait()
        r = func(p)
        self.assertIn("status='terminated'", r)
        self.assertIn("exitcode=", r)

        # name() raising must not break the rendering; the name is
        # simply left out.
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.ZombieProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("status='zombie'", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.NoSuchProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("terminated", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.AccessDenied(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertNotIn("name=", r)

    def test_process__str__(self):
        """Run the Process rendering checks through ``str`` as well."""
        self.test_process__repr__(func=str)

    def test_no_such_process__repr__(self, func=repr):
        """Check the rendering of NoSuchProcess via ``func`` (``repr``)."""
        self.assertEqual(
            func(psutil.NoSuchProcess(321)),
            "psutil.NoSuchProcess process no longer exists (pid=321)")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, name='foo')),
            "psutil.NoSuchProcess process no longer exists (pid=321, "
            "name='foo')")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, msg='foo')),
            "psutil.NoSuchProcess foo")

    def test_zombie_process__repr__(self, func=repr):
        """Check the rendering of ZombieProcess via ``func`` (``repr``)."""
        self.assertEqual(
            func(psutil.ZombieProcess(321)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo')),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo')")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo', ppid=1)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo', ppid=1)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, msg='foo')),
            "psutil.ZombieProcess foo")

    def test_access_denied__repr__(self, func=repr):
        """Check the rendering of AccessDenied via ``func`` (``repr``)."""
        self.assertEqual(
            func(psutil.AccessDenied(321)),
            "psutil.AccessDenied (pid=321)")
        self.assertEqual(
            func(psutil.AccessDenied(321, name='foo')),
            "psutil.AccessDenied (pid=321, name='foo')")
        self.assertEqual(
            func(psutil.AccessDenied(321, msg='foo')),
            "psutil.AccessDenied foo")

    def test_timeout_expired__repr__(self, func=repr):
        """Check the rendering of TimeoutExpired via ``func`` (``repr``)."""
        self.assertEqual(
            func(psutil.TimeoutExpired(321)),
            "psutil.TimeoutExpired timeout after 321 seconds")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111)),
            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111, name='foo')),
            "psutil.TimeoutExpired timeout after 321 seconds "
            "(pid=111, name='foo')")

    def test_process__eq__(self):
        """Two Process objects compare equal until their ident differs."""
        p1 = psutil.Process()
        p2 = psutil.Process()
        self.assertEqual(p1, p2)
        p2._ident = (0, 0)
        self.assertNotEqual(p1, p2)
        self.assertNotEqual(p1, 'foo')

    def test_process__hash__(self):
        """Two Process objects for the same process collapse in a set."""
        s = set([psutil.Process(), psutil.Process()])
        self.assertEqual(len(s), 1)

    def test__all__(self):
        """Check that ``psutil.__all__`` agrees with ``dir(psutil)``."""
        dir_psutil = dir(psutil)
        for name in dir_psutil:
            if name in ('long', 'tests', 'test', 'PermissionError',
                        'ProcessLookupError'):
                continue
            if name.startswith('_'):
                continue
            try:
                # Names importable as top-level modules are not checked
                # any further.
                __import__(name)
            except ImportError:
                if name not in psutil.__all__:
                    fun = getattr(psutil, name)
                    if fun is None:
                        continue
                    # Undocumented or deprecated objects may be left
                    # out of __all__.
                    if (fun.__doc__ is not None and
                            'deprecated' not in fun.__doc__.lower()):
                        self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        """``version_info`` joined with dots must equal ``__version__``."""
        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
                         psutil.__version__)

    def test_process_as_dict_no_new_names(self):
        """Attributes set on a Process must not show up in ``as_dict()``."""
        # See https://github.com/giampaolo/psutil/issues/813
        p = psutil.Process()
        p.foo = '1'
        self.assertNotIn('foo', p.as_dict())

    def test_memoize(self):
        """Check caching, ``cache_clear`` and docstring of ``@memoize``."""
        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        # One entry is appended per real (non-cached) invocation.
        calls = []
        # no args
        for _ in range(2):
            ret = foo()
            expected = ((), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 1)
        # with args
        for _ in range(2):
            ret = foo(1)
            expected = ((1, ), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 2)
        # with args + kwargs
        for _ in range(2):
            ret = foo(1, bar=2)
            expected = ((1, ), {'bar': 2})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 3)
        # clear cache
        foo.cache_clear()
        ret = foo()
        expected = ((), {})
        self.assertEqual(ret, expected)
        self.assertEqual(len(calls), 4)
        # docstring
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_memoize_when_activated(self):
        """``@memoize_when_activated`` caches only between activate and
        deactivate.
        """
        class Foo:

            @memoize_when_activated
            def foo(self):
                calls.append(None)

        f = Foo()
        # not activated yet: every call runs the method
        calls = []
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

        # activate
        calls = []
        f.foo.cache_activate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 1)

        # deactivate
        calls = []
        f.foo.cache_deactivate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

    def test_parse_environ_block(self):
        """Check ``parse_environ_block`` on well-formed and odd blocks."""
        from psutil._common import parse_environ_block

        def k(s):
            # Expected keys are upper-cased on Windows.
            return s.upper() if WINDOWS else s

        self.assertEqual(parse_environ_block("a=1\0"),
                         {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
                         {k("a"): "1", k("b"): "2"})
        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
                         {k("a"): "1", k("b"): ""})
        # ignore everything after \0\0
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
                         {k("a"): "1", k("b"): "2"})
        # ignore everything that is not an assignment
        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
        # do not fail if the block is incomplete
        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})

    def test_supports_ipv6(self):
        """``supports_ipv6()`` must return False when sockets misbehave."""
        self.addCleanup(supports_ipv6.cache_clear)
        if supports_ipv6():
            # The result is cached, hence the cache_clear() before
            # each scenario.
            with mock.patch('psutil._common.socket') as s:
                s.has_ipv6 = False
                supports_ipv6.cache_clear()
                self.assertFalse(supports_ipv6())

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.error) as s:
                self.assertFalse(supports_ipv6())
                self.assertTrue(s.called)

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.gaierror) as s:
                self.assertFalse(supports_ipv6())
                supports_ipv6.cache_clear()
                self.assertTrue(s.called)

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket.bind',
                            side_effect=socket.gaierror) as s:
                self.assertFalse(supports_ipv6())
                supports_ipv6.cache_clear()
                self.assertTrue(s.called)
        else:
            # Without IPv6 support, creating or binding an IPv6 socket
            # is expected to fail.
            with self.assertRaises(Exception):
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                try:
                    sock.bind(("::1", 0))
                finally:
                    sock.close()

    def test_isfile_strict(self):
        """Check ``isfile_strict`` results and which OSErrors propagate."""
        from psutil._common import isfile_strict
        this_file = os.path.abspath(__file__)
        self.assertTrue(isfile_strict(this_file))
        self.assertFalse(isfile_strict(os.path.dirname(this_file)))
        # EPERM and EACCES are re-raised...
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EPERM, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EACCES, "foo")):
            self.assertRaises(OSError, isfile_strict, this_file)
        # ...whereas ENOENT just yields a falsy result.
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.ENOENT, "foo")):
            self.assertFalse(isfile_strict(this_file))
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            self.assertFalse(isfile_strict(this_file))

    def test_serialization(self):
        """Public API return values must survive JSON and pickle."""
        def check(ret):
            # JSON only has to round-trip without raising; pickle must
            # give back an equal object.
            json.loads(json.dumps(ret))
            a = pickle.dumps(ret)
            b = pickle.loads(a)
            self.assertEqual(ret, b)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # Disk I/O counters are skipped on Linux without /proc/diskstats
        # and on APPVEYOR.
        has_diskstats = not LINUX or os.path.exists('/proc/diskstats')
        if has_diskstats and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())

    def test_setup_script(self):
        """setup.py must be importable and report psutil's version."""
        setup_py = os.path.join(ROOT_DIR, 'setup.py')
        if CI_TESTING and not os.path.exists(setup_py):
            # skipTest() raises, so nothing below runs.
            self.skipTest("can't find setup.py")
        module = import_module_by_path(setup_py)
        self.assertRaises(SystemExit, module.setup)
        self.assertEqual(module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        """Process() tolerates AccessDenied/ZombieProcess at creation."""
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.AccessDenied) as meth:
            psutil.Process()
            self.assertTrue(meth.called)
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=psutil.ZombieProcess(1)) as meth:
            psutil.Process()
            self.assertTrue(meth.called)
        # Any other exception must propagate.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            self.assertTrue(meth.called)

    def test_sanity_version_check(self):
        """Reloading psutil with a mismatching cext version must fail."""
        # see: https://github.com/giampaolo/psutil/issues/564
        with mock.patch(
                "psutil._psplatform.cext.version", return_value="0.0.0"):
            with self.assertRaises(ImportError) as cm:
                reload_module(psutil)
            self.assertIn("version conflict", str(cm.exception).lower())
389 | |
390 | |
391 # =================================================================== | |
392 # --- Tests for wrap_numbers() function. | |
393 # =================================================================== | |
394 | |
395 | |
# Three-field record ('a', 'b', 'c') used as sample counters by the
# wrap_numbers() tests below.
nt = collections.namedtuple('foo', ['a', 'b', 'c'])
397 | |
398 | |
class TestWrapNumbers(PsutilTestCase):
    """Tests for ``wrap_numbers()``.

    As exercised below: when a counter goes down between two calls made
    under the same name, the returned value keeps growing instead (the
    previous value is added on top).
    """

    def setUp(self):
        # Start (and, via tearDown, finish) every test with an empty
        # cache.
        wrap_numbers.cache_clear()

    tearDown = setUp

    def test_first_call(self):
        """The first call returns its input unchanged."""
        data = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)

    def test_input_hasnt_changed(self):
        """Identical consecutive inputs are returned unchanged."""
        data = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)

    def test_increase_but_no_wrap(self):
        """Values that only grow (or stay put) are returned unchanged."""
        data = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        data = {'disk1': nt(10, 15, 20)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        data = {'disk1': nt(20, 25, 30)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        data = {'disk1': nt(20, 25, 30)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)

    def test_wrap(self):
        """Decreasing values are compensated, repeatedly and per field."""
        # let's say 100 is the threshold
        data = {'disk1': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        # first wrap restarts from 10
        data = {'disk1': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(100, 100, 110)})
        # then it remains the same
        data = {'disk1': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(100, 100, 110)})
        # then it goes up
        data = {'disk1': nt(100, 100, 90)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(100, 100, 190)})
        # then it wraps again
        data = {'disk1': nt(100, 100, 20)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(100, 100, 210)})
        # and remains the same
        data = {'disk1': nt(100, 100, 20)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(100, 100, 210)})
        # now wrap another num
        data = {'disk1': nt(50, 100, 20)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(150, 100, 210)})
        # and again
        data = {'disk1': nt(40, 100, 20)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(190, 100, 210)})
        # keep it the same
        data = {'disk1': nt(40, 100, 20)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(190, 100, 210)})

    def test_changing_keys(self):
        """Keys appearing and disappearing must not alter the values."""
        # Emulate a case where the second call to disk_io()
        # (or whatever) provides a new disk, then the new disk
        # disappears on the third call.
        data = {'disk1': nt(5, 5, 5)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        data = {'disk1': nt(5, 5, 5),
                'disk2': nt(7, 7, 7)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        data = {'disk1': nt(8, 8, 8)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)

    def test_changing_keys_w_wrap(self):
        """A key that disappears loses its accumulated wrap."""
        data = {'disk1': nt(50, 50, 50),
                'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        # disk 2 wraps
        data = {'disk1': nt(50, 50, 50),
                'disk2': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(50, 50, 50),
                          'disk2': nt(100, 100, 110)})
        # disk 2 disappears
        data = {'disk1': nt(50, 50, 50)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)

        # then it appears again; the old wrap is supposed to be
        # gone.
        data = {'disk1': nt(50, 50, 50),
                'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        # remains the same
        data = {'disk1': nt(50, 50, 50),
                'disk2': nt(100, 100, 100)}
        self.assertEqual(wrap_numbers(data, 'disk_io'), data)
        # and then wraps again
        data = {'disk1': nt(50, 50, 50),
                'disk2': nt(100, 100, 10)}
        self.assertEqual(wrap_numbers(data, 'disk_io'),
                         {'disk1': nt(50, 50, 50),
                          'disk2': nt(100, 100, 110)})

    def test_real_data(self):
        """Same wrap logic applied to plain tuples with several keys."""
        d = {'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
        self.assertEqual(wrap_numbers(d, 'disk_io'), d)
        # decrease this   ↓
        d = {'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
             'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
             'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
             'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348)}
        out = wrap_numbers(d, 'disk_io')
        # 300 (previous value) + 100 (new value)
        self.assertEqual(out['nvme0n1'][0], 400)

    # --- cache tests

    def test_cache_first_call(self):
        """After one call only the first cache holds data for the name."""
        data = {'disk1': nt(5, 5, 5)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        self.assertEqual(cache[1], {'disk_io': {}})
        self.assertEqual(cache[2], {'disk_io': {}})

    def test_cache_call_twice(self):
        """A second call fills the second cache with zeroed entries."""
        data = {'disk1': nt(5, 5, 5)}
        wrap_numbers(data, 'disk_io')
        data = {'disk1': nt(10, 10, 10)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(cache[2], {'disk_io': {}})

    def test_cache_wrap(self):
        """Track the three caches across repeated wraps of one field."""
        # let's say 100 is the threshold
        data = {'disk1': nt(100, 100, 100)}
        wrap_numbers(data, 'disk_io')

        # first wrap restarts from 10
        data = {'disk1': nt(100, 100, 10)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}})
        self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})

        def check_after_first_wrap():
            # The second and third caches must still describe a single
            # wrap of field 2.
            cache = wrap_numbers.cache_info()
            self.assertEqual(
                cache[1],
                {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0,
                             ('disk1', 2): 100}})
            self.assertEqual(cache[2],
                             {'disk_io': {'disk1': set([('disk1', 2)])}})

        # then it remains the same
        data = {'disk1': nt(100, 100, 10)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        check_after_first_wrap()

        # then it goes up
        data = {'disk1': nt(100, 100, 90)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        check_after_first_wrap()

        # then it wraps again
        data = {'disk1': nt(100, 100, 20)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}})
        self.assertEqual(cache[2], {'disk_io': {'disk1': set([('disk1', 2)])}})

    def test_cache_changing_keys(self):
        """A key first seen on the second call has no second-cache entry."""
        data = {'disk1': nt(5, 5, 5)}
        wrap_numbers(data, 'disk_io')
        data = {'disk1': nt(5, 5, 5),
                'disk2': nt(7, 7, 7)}
        wrap_numbers(data, 'disk_io')
        cache = wrap_numbers.cache_info()
        self.assertEqual(cache[0], {'disk_io': data})
        self.assertEqual(
            cache[1],
            {'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}})
        self.assertEqual(cache[2], {'disk_io': {}})

    def test_cache_clear(self):
        """``cache_clear(name)`` empties the caches and tolerates repeats."""
        data = {'disk1': nt(5, 5, 5)}
        wrap_numbers(data, 'disk_io')
        wrap_numbers(data, 'disk_io')
        wrap_numbers.cache_clear('disk_io')
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
        # Clearing an already-cleared or unknown name must not raise.
        wrap_numbers.cache_clear('disk_io')
        wrap_numbers.cache_clear('?!?')

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_cache_clear_public_apis(self):
        """The public I/O counter functions clear only their own cache."""
        if not psutil.disk_io_counters() or not psutil.net_io_counters():
            # skipTest() raises, so nothing below runs.
            self.skipTest("no disks or NICs available")
        psutil.disk_io_counters()
        psutil.net_io_counters()
        caches = wrap_numbers.cache_info()
        for cache in caches:
            self.assertIn('psutil.disk_io_counters', cache)
            self.assertIn('psutil.net_io_counters', cache)

        psutil.disk_io_counters.cache_clear()
        caches = wrap_numbers.cache_info()
        for cache in caches:
            self.assertIn('psutil.net_io_counters', cache)
            self.assertNotIn('psutil.disk_io_counters', cache)

        psutil.net_io_counters.cache_clear()
        caches = wrap_numbers.cache_info()
        self.assertEqual(caches, ({}, {}, {}))
631 | |
632 | |
633 # =================================================================== | |
634 # --- Example script tests | |
635 # =================================================================== | |
636 | |
637 | |
@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
                 "can't locate scripts directory")
class TestScripts(PsutilTestCase):
    """Tests for scripts in the "scripts" directory."""

    @staticmethod
    def assert_stdout(exe, *args, **kwargs):
        """Run a script from SCRIPTS_DIR and return its stripped output.

        ``exe`` is the script file name, ``args`` are appended to the
        command line and ``kwargs`` are forwarded to ``sh()``.

        If ``sh()`` raises a RuntimeError mentioning 'AccessDenied' the
        error text is returned instead; any other RuntimeError
        propagates. Raises AssertionError if the output is empty.
        """
        script = os.path.join(SCRIPTS_DIR, exe)
        cmd = [PYTHON_EXE, script] + list(args)
        try:
            out = sh(cmd, **kwargs).strip()
        except RuntimeError as err:
            if 'AccessDenied' in str(err):
                return str(err)
            raise
        # Explicit raise (not ``assert``) so the check survives
        # ``python -O``.
        if not out:
            raise AssertionError(out)
        return out

    @staticmethod
    def assert_syntax(exe, args=None):
        """Parse a script from SCRIPTS_DIR with ``ast`` without running it.

        ``args`` is accepted but not used. ``ast.parse`` raises if the
        source is not valid Python.
        """
        path = os.path.join(SCRIPTS_DIR, exe)
        # The explicit encoding is only passed when PY3 is set.
        open_kwargs = {'encoding': 'utf8'} if PY3 else {}
        with open(path, 'rt', **open_kwargs) as f:
            src = f.read()
        ast.parse(src)

    def test_coverage(self):
        """Every ``*.py`` script needs a matching ``test_<name>`` method."""
        # make sure all example scripts have a test method defined
        meths = dir(self)
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            if 'test_' + os.path.splitext(name)[0] not in meths:
                self.fail('no test defined for %r script'
                          % os.path.join(SCRIPTS_DIR, name))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_executable(self):
        """Every ``*.py`` script must have the owner-executable bit set."""
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            path = os.path.join(SCRIPTS_DIR, name)
            if not os.stat(path).st_mode & stat.S_IXUSR:
                self.fail('%r is not executable' % path)

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_procinfo(self):
        self.assert_stdout('procinfo.py', str(os.getpid()))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    def test_pmap(self):
        self.assert_stdout('pmap.py', str(os.getpid()))

    def test_procsmem(self):
        if 'uss' not in psutil.Process().memory_full_info()._fields:
            # skipTest() raises by itself; no ``raise`` needed.
            self.skipTest("not supported")
        self.assert_stdout('procsmem.py')

    # The scripts below are only checked for valid syntax, not executed.

    def test_killall(self):
        self.assert_syntax('killall.py')

    def test_nettop(self):
        self.assert_syntax('nettop.py')

    def test_top(self):
        self.assert_syntax('top.py')

    def test_iotop(self):
        self.assert_syntax('iotop.py')

    def test_pidof(self):
        """pidof.py, given our process name, must list our own PID."""
        output = self.assert_stdout('pidof.py', psutil.Process().name())
        self.assertIn(str(os.getpid()), output)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_winservices(self):
        self.assert_stdout('winservices.py')

    def test_cpu_distribution(self):
        self.assert_syntax('cpu_distribution.py')

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_temperatures(self):
        if not psutil.sensors_temperatures():
            self.skipTest("no temperatures")
        self.assert_stdout('temperatures.py')

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_fans(self):
        if not psutil.sensors_fans():
            self.skipTest("no fans")
        self.assert_stdout('fans.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_battery(self):
        self.assert_stdout('battery.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors(self):
        self.assert_stdout('sensors.py')
770 | |
if __name__ == '__main__':
    # Executed as a script: hand this file over to psutil's test runner.
    from psutil.tests import runner
    runner.run_from_name(__file__)