comparison env/lib/python3.9/site-packages/psutil/tests/test_misc.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
5 # Use of this source code is governed by a BSD-style license that can be
6 # found in the LICENSE file.
7
8 """
9 Miscellaneous tests.
10 """
11
12 import ast
13 import collections
14 import errno
15 import json
16 import os
17 import pickle
18 import socket
19 import stat
20 import sys
21
22 from psutil import LINUX
23 from psutil import POSIX
24 from psutil import WINDOWS
25 from psutil._common import memoize
26 from psutil._common import memoize_when_activated
27 from psutil._common import supports_ipv6
28 from psutil._common import wrap_numbers
29 from psutil._compat import PY3
30 from psutil.tests import APPVEYOR
31 from psutil.tests import CI_TESTING
32 from psutil.tests import HAS_BATTERY
33 from psutil.tests import HAS_MEMORY_MAPS
34 from psutil.tests import HAS_NET_IO_COUNTERS
35 from psutil.tests import HAS_SENSORS_BATTERY
36 from psutil.tests import HAS_SENSORS_FANS
37 from psutil.tests import HAS_SENSORS_TEMPERATURES
38 from psutil.tests import import_module_by_path
39 from psutil.tests import mock
40 from psutil.tests import PsutilTestCase
41 from psutil.tests import PYTHON_EXE
42 from psutil.tests import reload_module
43 from psutil.tests import ROOT_DIR
44 from psutil.tests import SCRIPTS_DIR
45 from psutil.tests import sh
46 from psutil.tests import unittest
47 import psutil
48 import psutil.tests
49
50
# True only when the running interpreter is CPython/Python 3.9.x.
PYTHON_39 = (3, 9) == sys.version_info[:2]
52
53
54 # ===================================================================
55 # --- Misc / generic tests.
56 # ===================================================================
57
58
class TestMisc(PsutilTestCase):
    """Generic tests: string representations of Process and of the
    psutil exceptions, module-level sanity checks, and the small
    helpers imported from psutil._common.
    """

    def test_process__repr__(self, func=repr):
        # `func` is the stringifier under test; test_process__str__()
        # re-runs this same scenario with str().
        p = psutil.Process(self.spawn_testproc().pid)
        r = func(p)
        self.assertIn("psutil.Process", r)
        self.assertIn("pid=%s" % p.pid, r)
        # Normalize a possible u'' prefix before comparing the name.
        self.assertIn("name='%s'" % str(p.name()),
                      r.replace("name=u'", "name='"))
        self.assertIn("status=", r)
        self.assertNotIn("exitcode=", r)
        p.terminate()
        p.wait()
        r = func(p)
        self.assertIn("status='terminated'", r)
        self.assertIn("exitcode=", r)

        # name() raising must not break the representation: the name
        # is simply left out.
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.ZombieProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("status='zombie'", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.NoSuchProcess(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertIn("terminated", r)
            self.assertNotIn("name=", r)
        with mock.patch.object(psutil.Process, "name",
                               side_effect=psutil.AccessDenied(os.getpid())):
            p = psutil.Process()
            r = func(p)
            self.assertIn("pid=%s" % p.pid, r)
            self.assertNotIn("name=", r)

    def test_process__str__(self):
        self.test_process__repr__(func=str)

    # NOTE: the four exception tests below take a `func` argument just
    # like test_process__repr__(); it is now actually used instead of
    # a hard-coded repr(). The default keeps the behavior unchanged.

    def test_no_such_process__repr__(self, func=repr):
        self.assertEqual(
            func(psutil.NoSuchProcess(321)),
            "psutil.NoSuchProcess process no longer exists (pid=321)")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, name='foo')),
            "psutil.NoSuchProcess process no longer exists (pid=321, "
            "name='foo')")
        self.assertEqual(
            func(psutil.NoSuchProcess(321, msg='foo')),
            "psutil.NoSuchProcess foo")

    def test_zombie_process__repr__(self, func=repr):
        self.assertEqual(
            func(psutil.ZombieProcess(321)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo')),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo')")
        self.assertEqual(
            func(psutil.ZombieProcess(321, name='foo', ppid=1)),
            "psutil.ZombieProcess process still exists but it's a zombie "
            "(pid=321, name='foo', ppid=1)")
        self.assertEqual(
            func(psutil.ZombieProcess(321, msg='foo')),
            "psutil.ZombieProcess foo")

    def test_access_denied__repr__(self, func=repr):
        self.assertEqual(
            func(psutil.AccessDenied(321)),
            "psutil.AccessDenied (pid=321)")
        self.assertEqual(
            func(psutil.AccessDenied(321, name='foo')),
            "psutil.AccessDenied (pid=321, name='foo')")
        self.assertEqual(
            func(psutil.AccessDenied(321, msg='foo')),
            "psutil.AccessDenied foo")

    def test_timeout_expired__repr__(self, func=repr):
        self.assertEqual(
            func(psutil.TimeoutExpired(321)),
            "psutil.TimeoutExpired timeout after 321 seconds")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111)),
            "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
        self.assertEqual(
            func(psutil.TimeoutExpired(321, pid=111, name='foo')),
            "psutil.TimeoutExpired timeout after 321 seconds "
            "(pid=111, name='foo')")

    def test_process__eq__(self):
        p1 = psutil.Process()
        p2 = psutil.Process()
        self.assertEqual(p1, p2)
        # Tamper with the private identity tuple: the two objects must
        # stop comparing equal.
        p2._ident = (0, 0)
        self.assertNotEqual(p1, p2)
        self.assertNotEqual(p1, 'foo')

    def test_process__hash__(self):
        s = set([psutil.Process(), psutil.Process()])
        self.assertEqual(len(s), 1)

    def test__all__(self):
        dir_psutil = dir(psutil)
        for name in dir_psutil:
            if name in ('long', 'tests', 'test', 'PermissionError',
                        'ProcessLookupError'):
                continue
            if name.startswith('_'):
                continue
            try:
                # Names that are importable as top-level modules are
                # not checked against __all__.
                __import__(name)
            except ImportError:
                if name in psutil.__all__:
                    continue
                fun = getattr(psutil, name)
                if fun is None:
                    continue
                if (fun.__doc__ is not None and
                        'deprecated' not in fun.__doc__.lower()):
                    self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
                         psutil.__version__)

    def test_process_as_dict_no_new_names(self):
        # See https://github.com/giampaolo/psutil/issues/813
        p = psutil.Process()
        p.foo = '1'
        self.assertNotIn('foo', p.as_dict())

    def test_memoize(self):
        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        # Every real (non-cached) invocation of foo() appends here.
        calls = []
        # no args
        for _ in range(2):
            ret = foo()
            expected = ((), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 1)
        # with args
        for _ in range(2):
            ret = foo(1)
            expected = ((1, ), {})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 2)
        # with args + kwargs
        for _ in range(2):
            ret = foo(1, bar=2)
            expected = ((1, ), {'bar': 2})
            self.assertEqual(ret, expected)
            self.assertEqual(len(calls), 3)
        # clear cache
        foo.cache_clear()
        ret = foo()
        expected = ((), {})
        self.assertEqual(ret, expected)
        self.assertEqual(len(calls), 4)
        # docstring
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_memoize_when_activated(self):
        class Foo:

            @memoize_when_activated
            def foo(self):
                calls.append(None)

        f = Foo()
        # `calls` is rebound below; Foo.foo() looks it up at call time,
        # so it always appends to the most recent list.
        calls = []
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

        # activate
        calls = []
        f.foo.cache_activate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 1)

        # deactivate
        calls = []
        f.foo.cache_deactivate(f)
        f.foo()
        f.foo()
        self.assertEqual(len(calls), 2)

    def test_parse_environ_block(self):
        from psutil._common import parse_environ_block

        def k(s):
            # Keys are expected upper-cased on Windows.
            return s.upper() if WINDOWS else s

        self.assertEqual(parse_environ_block("a=1\0"),
                         {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
                         {k("a"): "1", k("b"): "2"})
        self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
                         {k("a"): "1", k("b"): ""})
        # ignore everything after \0\0
        self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
                         {k("a"): "1", k("b"): "2"})
        # ignore everything that is not an assignment
        self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
        self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
        # do not fail if the block is incomplete
        self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})

    def test_supports_ipv6(self):
        # supports_ipv6() is memoized: restore a clean cache afterwards
        # and clear it before every mocked scenario.
        self.addCleanup(supports_ipv6.cache_clear)
        if supports_ipv6():
            with mock.patch('psutil._common.socket') as s:
                s.has_ipv6 = False
                supports_ipv6.cache_clear()
                assert not supports_ipv6()

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.error) as s:
                assert not supports_ipv6()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called

            supports_ipv6.cache_clear()
            with mock.patch('psutil._common.socket.socket.bind',
                            side_effect=socket.gaierror) as s:
                assert not supports_ipv6()
                supports_ipv6.cache_clear()
                assert s.called
        else:
            # No IPv6 support reported: creating/binding an IPv6 socket
            # is expected to fail as well.
            with self.assertRaises(Exception):
                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                try:
                    sock.bind(("::1", 0))
                finally:
                    sock.close()

    def test_isfile_strict(self):
        from psutil._common import isfile_strict
        this_file = os.path.abspath(__file__)
        assert isfile_strict(this_file)
        assert not isfile_strict(os.path.dirname(this_file))
        # Permission errors are propagated...
        for code in (errno.EPERM, errno.EACCES):
            with mock.patch('psutil._common.os.stat',
                            side_effect=OSError(code, "foo")):
                self.assertRaises(OSError, isfile_strict, this_file)
        # ...while a missing file just means "not a file".
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.ENOENT, "foo")):
            assert not isfile_strict(this_file)
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            assert not isfile_strict(this_file)

    def test_serialization(self):
        def check(ret):
            # The value must survive a JSON round trip and come back
            # equal from a pickle round trip.
            json.loads(json.dumps(ret))
            a = pickle.dumps(ret)
            b = pickle.loads(a)
            self.assertEqual(ret, b)

        check(psutil.Process().as_dict())
        check(psutil.virtual_memory())
        check(psutil.swap_memory())
        check(psutil.cpu_times())
        check(psutil.cpu_times_percent(interval=0))
        check(psutil.net_io_counters())
        # Skip disk I/O counters on Linux without /proc/diskstats and
        # on APPVEYOR.
        no_diskstats = LINUX and not os.path.exists('/proc/diskstats')
        if not no_diskstats and not APPVEYOR:
            check(psutil.disk_io_counters())
        check(psutil.disk_partitions())
        check(psutil.disk_usage(os.getcwd()))
        check(psutil.users())

    def test_setup_script(self):
        setup_py = os.path.join(ROOT_DIR, 'setup.py')
        if CI_TESTING and not os.path.exists(setup_py):
            # skipTest() raises, so nothing below runs.
            self.skipTest("can't find setup.py")
        module = import_module_by_path(setup_py)
        self.assertRaises(SystemExit, module.setup)
        self.assertEqual(module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        for tolerated in (psutil.AccessDenied, psutil.ZombieProcess(1)):
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=tolerated) as meth:
                psutil.Process()
                assert meth.called
        # Any other exception must propagate.
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert meth.called

    def test_sanity_version_check(self):
        # see: https://github.com/giampaolo/psutil/issues/564
        with mock.patch(
                "psutil._psplatform.cext.version", return_value="0.0.0"):
            with self.assertRaises(ImportError) as cm:
                reload_module(psutil)
            self.assertIn("version conflict", str(cm.exception).lower())
389
390
391 # ===================================================================
392 # --- Tests for wrap_numbers() function.
393 # ===================================================================
394
395
# Three-field record used as fake counters by the wrap_numbers() tests.
nt = collections.namedtuple('foo', ['a', 'b', 'c'])
397
398
class TestWrapNumbers(PsutilTestCase):
    """Tests for wrap_numbers(): a counter which decreases between two
    calls is expected to keep growing in the returned values. The
    function's internal cache is inspected as well.
    """

    def setUp(self):
        wrap_numbers.cache_clear()

    def tearDown(self):
        wrap_numbers.cache_clear()

    # --- helpers

    def _check(self, stats, expected=None):
        # Pass `stats` through wrap_numbers() under the 'disk_io' name.
        # Unless `expected` is given the output must equal the input.
        self.assertEqual(wrap_numbers(stats, 'disk_io'),
                         stats if expected is None else expected)

    def _check_cache(self, last_stats, offsets, wrapped):
        # Compare the three members of cache_info() for 'disk_io'.
        cached_stats, cached_offsets, cached_wrapped = \
            wrap_numbers.cache_info()
        self.assertEqual(cached_stats, {'disk_io': last_stats})
        self.assertEqual(cached_offsets, {'disk_io': offsets})
        self.assertEqual(cached_wrapped, {'disk_io': wrapped})

    @staticmethod
    def _disk1_offsets(third):
        # Expected per-field entries for 'disk1'; only the last field
        # ever differs from 0 in these tests.
        return {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): third}

    # --- return value tests

    def test_first_call(self):
        self._check({'disk1': nt(5, 5, 5)})

    def test_input_hasnt_changed(self):
        stats = {'disk1': nt(5, 5, 5)}
        self._check(stats)
        self._check(stats)

    def test_increase_but_no_wrap(self):
        for values in ((5, 5, 5), (10, 15, 20), (20, 25, 30), (20, 25, 30)):
            self._check({'disk1': nt(*values)})

    def test_wrap(self):
        steps = [
            # let's say 100 is the threshold
            ((100, 100, 100), (100, 100, 100)),
            # first wrap restarts from 10
            ((100, 100, 10), (100, 100, 110)),
            # then it remains the same
            ((100, 100, 10), (100, 100, 110)),
            # then it goes up
            ((100, 100, 90), (100, 100, 190)),
            # then it wraps again
            ((100, 100, 20), (100, 100, 210)),
            # and remains the same
            ((100, 100, 20), (100, 100, 210)),
            # now wrap another num
            ((50, 100, 20), (150, 100, 210)),
            # and again
            ((40, 100, 20), (190, 100, 210)),
            # keep it the same
            ((40, 100, 20), (190, 100, 210)),
        ]
        for given, wanted in steps:
            self._check({'disk1': nt(*given)}, {'disk1': nt(*wanted)})

    def test_changing_keys(self):
        # Emulate a case where the second call to disk_io()
        # (or whatever) provides a new disk, then the new disk
        # disappears on the third call.
        self._check({'disk1': nt(5, 5, 5)})
        self._check({'disk1': nt(5, 5, 5),
                     'disk2': nt(7, 7, 7)})
        self._check({'disk1': nt(8, 8, 8)})

    def test_changing_keys_w_wrap(self):
        def both(disk2_last):
            return {'disk1': nt(50, 50, 50),
                    'disk2': nt(100, 100, disk2_last)}

        self._check(both(100))
        # disk 2 wraps
        self._check(both(10), both(110))
        # disk 2 disappears
        self._check({'disk1': nt(50, 50, 50)})

        # then it appears again; the old wrap is supposed to be
        # gone.
        self._check(both(100))
        # remains the same
        self._check(both(100))
        # and then wraps again
        self._check(both(10), both(110))

    def test_real_data(self):
        before = {
            'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
        }
        self._check(before)
        self._check(before)
        # Same data, except that the very first counter of 'nvme0n1'
        # decreases from 300 to 100.
        after = dict(before)
        after['nvme0n1'] = (100, ) + before['nvme0n1'][1:]
        out = wrap_numbers(after, 'disk_io')
        self.assertEqual(out['nvme0n1'][0], 400)

    # --- cache tests

    def test_cache_first_call(self):
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        self._check_cache(stats, {}, {})

    def test_cache_call_twice(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        latest = {'disk1': nt(10, 10, 10)}
        wrap_numbers(latest, 'disk_io')
        self._check_cache(latest, self._disk1_offsets(0), {})

    def test_cache_wrap(self):
        wrapped = {'disk1': set([('disk1', 2)])}
        # let's say 100 is the threshold
        wrap_numbers({'disk1': nt(100, 100, 100)}, 'disk_io')

        # (last field passed in, expected cached entry for that field)
        steps = [
            (10, 100),   # first wrap restarts from 10
            (10, 100),   # then it remains the same
            (90, 100),   # then it goes up
            (20, 190),   # then it wraps again
        ]
        for last_field, offset in steps:
            stats = {'disk1': nt(100, 100, last_field)}
            wrap_numbers(stats, 'disk_io')
            self._check_cache(stats, self._disk1_offsets(offset), wrapped)

    def test_cache_changing_keys(self):
        wrap_numbers({'disk1': nt(5, 5, 5)}, 'disk_io')
        latest = {'disk1': nt(5, 5, 5),
                  'disk2': nt(7, 7, 7)}
        wrap_numbers(latest, 'disk_io')
        self._check_cache(latest, self._disk1_offsets(0), {})

    def test_cache_clear(self):
        stats = {'disk1': nt(5, 5, 5)}
        wrap_numbers(stats, 'disk_io')
        wrap_numbers(stats, 'disk_io')
        wrap_numbers.cache_clear('disk_io')
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
        # Clearing an already cleared or unknown name must not raise.
        wrap_numbers.cache_clear('disk_io')
        wrap_numbers.cache_clear('?!?')

    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_cache_clear_public_apis(self):
        if not (psutil.disk_io_counters() and psutil.net_io_counters()):
            return self.skipTest("no disks or NICs available")
        disk_key = 'psutil.disk_io_counters'
        net_key = 'psutil.net_io_counters'

        psutil.disk_io_counters()
        psutil.net_io_counters()
        for cache in wrap_numbers.cache_info():
            self.assertIn(disk_key, cache)
            self.assertIn(net_key, cache)

        psutil.disk_io_counters.cache_clear()
        for cache in wrap_numbers.cache_info():
            self.assertIn(net_key, cache)
            self.assertNotIn(disk_key, cache)

        psutil.net_io_counters.cache_clear()
        self.assertEqual(wrap_numbers.cache_info(), ({}, {}, {}))
631
632
633 # ===================================================================
634 # --- Example script tests
635 # ===================================================================
636
637
@unittest.skipIf(not os.path.exists(SCRIPTS_DIR),
                 "can't locate scripts directory")
class TestScripts(PsutilTestCase):
    """Tests for scripts in the "scripts" directory."""

    @staticmethod
    def assert_stdout(exe, *args, **kwargs):
        """Run script `exe` (relative to SCRIPTS_DIR) with PYTHON_EXE
        and return its stripped output, which must not be empty.

        Positional `args` are appended to the command line, `kwargs`
        are forwarded to sh(). If sh() raises a RuntimeError whose
        message mentions 'AccessDenied' the message is returned
        instead; any other RuntimeError is re-raised.
        """
        cmd = [PYTHON_EXE, os.path.join(SCRIPTS_DIR, exe)]
        cmd.extend(args)
        try:
            out = sh(cmd, **kwargs).strip()
        except RuntimeError as err:
            if 'AccessDenied' in str(err):
                return str(err)
            raise
        assert out, out
        return out

    @staticmethod
    def assert_syntax(exe, args=None):
        """Parse script `exe` (relative to SCRIPTS_DIR) with ast.parse()
        without executing it. `args` is accepted but not used.
        """
        path = os.path.join(SCRIPTS_DIR, exe)
        # The `encoding` argument is only passed when PY3 is set.
        open_kwargs = {'encoding': 'utf8'} if PY3 else {}
        with open(path, 'rt', **open_kwargs) as f:
            src = f.read()
        ast.parse(src)

    def test_coverage(self):
        # make sure all example scripts have a test method defined
        meths = dir(self)
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            if 'test_' + os.path.splitext(name)[0] not in meths:
                self.fail('no test defined for %r script'
                          % os.path.join(SCRIPTS_DIR, name))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_executable(self):
        for name in os.listdir(SCRIPTS_DIR):
            if not name.endswith('.py'):
                continue
            path = os.path.join(SCRIPTS_DIR, name)
            # The owner's execute bit must be set.
            if not stat.S_IXUSR & os.stat(path).st_mode:
                self.fail('%r is not executable' % path)

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_procinfo(self):
        self.assert_stdout('procinfo.py', str(os.getpid()))

    @unittest.skipIf(CI_TESTING and not psutil.users(), "no users")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    def test_pmap(self):
        self.assert_stdout('pmap.py', str(os.getpid()))

    def test_procsmem(self):
        if 'uss' not in psutil.Process().memory_full_info()._fields:
            # skipTest() raises by itself: no `raise` needed, same as
            # in test_temperatures() and test_fans().
            self.skipTest("not supported")
        self.assert_stdout('procsmem.py')

    # The scripts below are only checked for syntax, not executed.

    def test_killall(self):
        self.assert_syntax('killall.py')

    def test_nettop(self):
        self.assert_syntax('nettop.py')

    def test_top(self):
        self.assert_syntax('top.py')

    def test_iotop(self):
        self.assert_syntax('iotop.py')

    def test_pidof(self):
        output = self.assert_stdout('pidof.py', psutil.Process().name())
        self.assertIn(str(os.getpid()), output)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_winservices(self):
        self.assert_stdout('winservices.py')

    def test_cpu_distribution(self):
        self.assert_syntax('cpu_distribution.py')

    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_temperatures(self):
        if not psutil.sensors_temperatures():
            self.skipTest("no temperatures")
        self.assert_stdout('temperatures.py')

    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_fans(self):
        if not psutil.sensors_fans():
            self.skipTest("no fans")
        self.assert_stdout('fans.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_battery(self):
        self.assert_stdout('battery.py')

    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors(self):
        self.assert_stdout('sensors.py')
769
770
# Script entry point: hand this file over to psutil's own test runner.
if __name__ == '__main__':
    import psutil.tests.runner
    psutil.tests.runner.run_from_name(__file__)