Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/psutil/tests/test_process.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """Tests for psutil.Process class.""" | |
8 | |
9 import collections | |
10 import errno | |
11 import getpass | |
12 import itertools | |
13 import os | |
14 import signal | |
15 import socket | |
16 import stat | |
17 import subprocess | |
18 import sys | |
19 import textwrap | |
20 import time | |
21 import types | |
22 | |
23 import psutil | |
24 | |
25 from psutil import AIX | |
26 from psutil import BSD | |
27 from psutil import LINUX | |
28 from psutil import MACOS | |
29 from psutil import NETBSD | |
30 from psutil import OPENBSD | |
31 from psutil import OSX | |
32 from psutil import POSIX | |
33 from psutil import SUNOS | |
34 from psutil import WINDOWS | |
35 from psutil._common import open_text | |
36 from psutil._compat import FileNotFoundError | |
37 from psutil._compat import long | |
38 from psutil._compat import PY3 | |
39 from psutil._compat import super | |
40 from psutil.tests import APPVEYOR | |
41 from psutil.tests import call_until | |
42 from psutil.tests import CI_TESTING | |
43 from psutil.tests import copyload_shared_lib | |
44 from psutil.tests import create_exe | |
45 from psutil.tests import GITHUB_ACTIONS | |
46 from psutil.tests import GLOBAL_TIMEOUT | |
47 from psutil.tests import HAS_CPU_AFFINITY | |
48 from psutil.tests import HAS_ENVIRON | |
49 from psutil.tests import HAS_IONICE | |
50 from psutil.tests import HAS_MEMORY_MAPS | |
51 from psutil.tests import HAS_PROC_CPU_NUM | |
52 from psutil.tests import HAS_PROC_IO_COUNTERS | |
53 from psutil.tests import HAS_RLIMIT | |
54 from psutil.tests import HAS_THREADS | |
55 from psutil.tests import mock | |
56 from psutil.tests import process_namespace | |
57 from psutil.tests import PsutilTestCase | |
58 from psutil.tests import PYPY | |
59 from psutil.tests import PYTHON_EXE | |
60 from psutil.tests import reap_children | |
61 from psutil.tests import retry_on_failure | |
62 from psutil.tests import sh | |
63 from psutil.tests import skip_on_access_denied | |
64 from psutil.tests import skip_on_not_implemented | |
65 from psutil.tests import ThreadTask | |
66 from psutil.tests import unittest | |
67 from psutil.tests import wait_for_pid | |
68 | |
69 | |
70 # =================================================================== | |
71 # --- psutil.Process class tests | |
72 # =================================================================== | |
73 | |
74 | |
75 class TestProcess(PsutilTestCase): | |
76 """Tests for psutil.Process class.""" | |
77 | |
def spawn_psproc(self, *args, **kwargs):
    """Spawn a test subprocess and return it as a psutil.Process.

    All positional and keyword arguments are forwarded unchanged to
    self.spawn_testproc().
    """
    pid = self.spawn_testproc(*args, **kwargs).pid
    return psutil.Process(pid)
81 | |
82 # --- | |
83 | |
def test_pid(self):
    """pid equals os.getpid() and cannot be reassigned."""
    proc = psutil.Process()
    self.assertEqual(proc.pid, os.getpid())
    # Attribute assignment must be rejected.
    self.assertRaises(AttributeError, setattr, proc, "pid", 33)
89 | |
def test_kill(self):
    """Kill a spawned process and check the code returned by wait()."""
    proc = self.spawn_psproc()
    proc.kill()
    exit_code = proc.wait()
    # Expected: SIGTERM on Windows, the negated SIGKILL number elsewhere.
    expected = signal.SIGTERM if WINDOWS else -signal.SIGKILL
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
99 | |
def test_terminate(self):
    """Terminate a spawned process and check the code returned by wait()."""
    proc = self.spawn_psproc()
    proc.terminate()
    exit_code = proc.wait()
    # Expected: SIGTERM on Windows, the negated SIGTERM number elsewhere.
    expected = signal.SIGTERM if WINDOWS else -signal.SIGTERM
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
109 | |
def test_send_signal(self):
    """send_signal() ends the process with the matching exit code."""
    if POSIX:
        signum = signal.SIGKILL
    else:
        signum = signal.SIGTERM
    proc = self.spawn_psproc()
    proc.send_signal(signum)
    exit_code = proc.wait()
    # Expected: the signal number on Windows, its negation elsewhere.
    expected = signum if WINDOWS else -signum
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
120 | |
@unittest.skipIf(not POSIX, "not POSIX")
def test_send_signal_mocked(self):
    """send_signal() maps OSError errnos from os.kill to psutil errors."""
    cases = (
        (errno.ESRCH, psutil.NoSuchProcess),
        (errno.EPERM, psutil.AccessDenied),
    )
    for err, expected_exc in cases:
        # A fresh process per case, with os.kill patched to fail.
        proc = self.spawn_psproc()
        with mock.patch('psutil.os.kill', side_effect=OSError(err, "")):
            with self.assertRaises(expected_exc):
                proc.send_signal(signal.SIGTERM)
133 | |
def test_wait_exited(self):
    """wait() returns the exit status of processes that exit normally."""
    # Test waitpid() + WIFEXITED -> WEXITSTATUS.
    scenarios = (
        # normal return, same as exit(0)
        ("pass", {}, 0),
        # exit(1), implicit in case of error
        ("1 / 0", {"stderr": subprocess.PIPE}, 1),
        # via sys.exit()
        ("import sys; sys.exit(5);", {}, 5),
        # via os._exit()
        ("import os; os._exit(5);", {}, 5),
    )
    for snippet, spawn_kwargs, expected in scenarios:
        proc = self.spawn_psproc([PYTHON_EXE, "-c", snippet], **spawn_kwargs)
        self.assertEqual(proc.wait(), expected)
        self.assertProcessGone(proc)
160 | |
def test_wait_stopped(self):
    """wait() times out while stopped/resumed, then reports termination."""
    proc = self.spawn_psproc()
    if POSIX:
        # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
        # Note: if a process is stopped it ignores SIGTERM.
        pause_resume = (
            (proc.send_signal, (signal.SIGSTOP,)),
            (proc.send_signal, (signal.SIGCONT,)),
        )
        finish, finish_args = proc.send_signal, (signal.SIGTERM,)
        expected = -signal.SIGTERM
    else:
        pause_resume = ((proc.suspend, ()), (proc.resume, ()))
        finish, finish_args = proc.terminate, ()
        expected = signal.SIGTERM

    for action, action_args in pause_resume:
        action(*action_args)
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout=0.001)

    finish(*finish_args)
    # wait() is called twice and must give the same code both times.
    for _ in range(2):
        self.assertEqual(proc.wait(), expected)
181 | |
def test_wait_non_children(self):
    """Test wait() against a process which is not our direct child."""
    child, grandchild = self.spawn_children_pair()
    self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
    self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
    # We also terminate the direct child otherwise the
    # grandchild will hang until the parent is gone.
    child.terminate()
    grandchild.terminate()
    child_ret = child.wait()
    grandchild_ret = grandchild.wait()
    if POSIX:
        self.assertEqual(child_ret, -signal.SIGTERM)
        # For processes which are not our children we're supposed
        # to get None.
        self.assertEqual(grandchild_ret, None)
    else:
        self.assertEqual(child_ret, signal.SIGTERM)
        # The original asserted child_ret twice, leaving the grandchild's
        # return value unchecked on this branch.
        self.assertEqual(grandchild_ret, signal.SIGTERM)
202 | |
def test_wait_timeout(self):
    """wait() raises TimeoutExpired on timeout, ValueError if negative."""
    proc = self.spawn_psproc()
    proc.name()
    for timeout in (0.01, 0):
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout)
    with self.assertRaises(ValueError):
        proc.wait(-1)
209 | |
def test_wait_timeout_nonblocking(self):
    """Poll wait(0) after kill() until the exit code becomes available."""
    p = self.spawn_psproc()
    self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
    p.kill()
    stop_at = time.time() + GLOBAL_TIMEOUT
    while time.time() < stop_at:
        try:
            code = p.wait(0)
            break
        except psutil.TimeoutExpired:
            pass
    else:
        # fail() raises by itself; the original's leading `raise` was
        # never reached and only obscured that.
        self.fail('timeout')
    if POSIX:
        self.assertEqual(code, -signal.SIGKILL)
    else:
        self.assertEqual(code, signal.SIGTERM)
    self.assertProcessGone(p)
228 | |
def test_cpu_percent(self):
    """cpu_percent() returns non-negative floats, rejects interval < 0."""
    proc = psutil.Process()
    for _ in range(2):
        proc.cpu_percent(interval=0.001)
    for _ in range(100):
        value = proc.cpu_percent(interval=None)
        self.assertIsInstance(value, float)
        self.assertGreaterEqual(value, 0.0)
    self.assertRaises(ValueError, proc.cpu_percent, interval=-1)
239 | |
def test_cpu_percent_numcpus_none(self):
    """cpu_percent() works when psutil.cpu_count() returns None."""
    # See: https://github.com/giampaolo/psutil/issues/1087
    patcher = mock.patch('psutil.cpu_count', return_value=None)
    with patcher as fake_cpu_count:
        psutil.Process().cpu_percent()
        assert fake_cpu_count.called
245 | |
def test_cpu_times(self):
    """Sanity-check the fields returned by cpu_times()."""
    cpu = psutil.Process().cpu_times()
    assert cpu.user > 0.0 or cpu.system > 0.0, cpu
    for field in ("children_user", "children_system"):
        assert getattr(cpu, field) >= 0.0, cpu
    if LINUX:
        assert cpu.iowait >= 0.0, cpu
    # make sure returned values can be pretty printed with strftime
    for field in cpu._fields:
        as_struct = time.localtime(getattr(cpu, field))
        time.strftime("%H:%M:%S", as_struct)
256 | |
def test_cpu_times_2(self):
    """Compare the first two cpu_times() fields against os.times()."""
    user_time, kernel_time = psutil.Process().cpu_times()[:2]
    utime, ktime = os.times()[:2]

    # os.times()[:2] are the reference values; fail as soon as a pair
    # differs by more than 0.1 seconds.
    for expected, found in ((utime, user_time), (ktime, kernel_time)):
        if abs(found - expected) > 0.1:
            self.fail("expected: %s, found: %s" % (expected, found))
269 | |
@unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
def test_cpu_num(self):
    """cpu_num() returns an index within range(psutil.cpu_count())."""
    proc = psutil.Process()
    current = proc.cpu_num()
    self.assertGreaterEqual(current, 0)
    if psutil.cpu_count() == 1:
        self.assertEqual(current, 0)
    latest = proc.cpu_num()
    self.assertIn(latest, range(psutil.cpu_count()))
278 | |
def test_create_time(self):
    """create_time() of a just-spawned process is close to time.time()."""
    proc = self.spawn_psproc()
    reference = time.time()
    created = proc.create_time()

    # time.time() is the reference value; the test fails when the two
    # timestamps are more than 2 seconds apart.
    delta = abs(created - reference)
    if delta > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (reference, created, delta))

    # make sure returned value can be pretty printed with strftime
    as_struct = time.localtime(proc.create_time())
    time.strftime("%Y %m %d %H:%M:%S", as_struct)
294 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_terminal(self):
    """terminal(), when not None, matches the real path printed by tty."""
    reported = psutil.Process().terminal()
    if reported is None:
        return
    expected = os.path.realpath(sh('tty'))
    self.assertEqual(reported, expected)
301 | |
@unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
@skip_on_not_implemented(only_if=LINUX)
def test_io_counters(self):
    """Check that io_counters() reflects reads and writes done here."""
    p = psutil.Process()
    # test reads
    io1 = p.io_counters()
    with open(PYTHON_EXE, 'rb') as f:
        f.read()
    io2 = p.io_counters()
    if not BSD and not AIX:
        self.assertGreater(io2.read_count, io1.read_count)
        self.assertEqual(io2.write_count, io1.write_count)
        if LINUX:
            self.assertGreater(io2.read_chars, io1.read_chars)
            self.assertEqual(io2.write_chars, io1.write_chars)
    else:
        self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
        self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)

    # test writes
    io1 = p.io_counters()
    with open(self.get_testfn(), 'wb') as f:
        # A bytes literal yields the same payload on Python 2 and 3,
        # so no PY3 branch is needed.
        f.write(b"x" * 1000000)
    io2 = p.io_counters()
    self.assertGreaterEqual(io2.write_count, io1.write_count)
    self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
    self.assertGreaterEqual(io2.read_count, io1.read_count)
    self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
    if LINUX:
        self.assertGreater(io2.write_chars, io1.write_chars)
        self.assertGreaterEqual(io2.read_chars, io1.read_chars)

    # sanity check: both snapshots must hold non-negative counters
    # (the original asserted io2[i] twice and never looked at io1).
    for i, (before, after) in enumerate(zip(io1, io2)):
        if BSD and i >= 2:
            # On BSD read_bytes and write_bytes are always set to -1.
            continue
        self.assertGreaterEqual(before, 0)
        self.assertGreaterEqual(after, 0)
344 | |
@unittest.skipIf(not HAS_IONICE, "not supported")
@unittest.skipIf(not LINUX, "linux only")
def test_ionice_linux(self):
    """Exercise ionice() get/set and its argument validation on Linux."""
    proc = psutil.Process()
    if not CI_TESTING:
        self.assertEqual(proc.ionice()[0], psutil.IOPRIO_CLASS_NONE)
    expected_constants = (
        (psutil.IOPRIO_CLASS_NONE, 0),
        (psutil.IOPRIO_CLASS_RT, 1),    # high
        (psutil.IOPRIO_CLASS_BE, 2),    # normal
        (psutil.IOPRIO_CLASS_IDLE, 3),  # low
    )
    for constant, number in expected_constants:
        self.assertEqual(constant, number)

    saved = proc.ionice()
    try:
        # low
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)
        self.assertEqual(
            tuple(proc.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0))
        # accepts no value
        self.assertRaises(
            ValueError, proc.ionice, psutil.IOPRIO_CLASS_IDLE, value=7)
        # normal
        proc.ionice(psutil.IOPRIO_CLASS_BE)
        self.assertEqual(
            tuple(proc.ionice()), (psutil.IOPRIO_CLASS_BE, 0))
        proc.ionice(psutil.IOPRIO_CLASS_BE, value=7)
        self.assertEqual(
            tuple(proc.ionice()), (psutil.IOPRIO_CLASS_BE, 7))
        self.assertRaises(
            ValueError, proc.ionice, psutil.IOPRIO_CLASS_BE, value=8)
        # Setting the RT class may be refused; that is tolerated.
        try:
            proc.ionice(psutil.IOPRIO_CLASS_RT, value=7)
        except psutil.AccessDenied:
            pass
        # errs
        with self.assertRaisesRegex(ValueError, "ioclass accepts no value"):
            proc.ionice(psutil.IOPRIO_CLASS_NONE, 1)
        with self.assertRaisesRegex(ValueError, "ioclass accepts no value"):
            proc.ionice(psutil.IOPRIO_CLASS_IDLE, 1)
        with self.assertRaisesRegex(
                ValueError, "'ioclass' argument must be specified"):
            proc.ionice(value=1)
    finally:
        # Restore the initial setting, using 0 as the value when the
        # initial class was IOPRIO_CLASS_NONE.
        ioclass, value = saved
        if ioclass == psutil.IOPRIO_CLASS_NONE:
            value = 0
        proc.ionice(ioclass, value)
388 | |
@unittest.skipIf(not HAS_IONICE, "not supported")
@unittest.skipIf(not WINDOWS, 'not supported on this win version')
def test_ionice_win(self):
    """Exercise ionice() get/set and its argument validation on Windows."""
    proc = psutil.Process()
    if not CI_TESTING:
        self.assertEqual(proc.ionice(), psutil.IOPRIO_NORMAL)
    saved = proc.ionice()
    try:
        # base
        proc.ionice(psutil.IOPRIO_VERYLOW)
        self.assertEqual(proc.ionice(), psutil.IOPRIO_VERYLOW)
        proc.ionice(psutil.IOPRIO_LOW)
        self.assertEqual(proc.ionice(), psutil.IOPRIO_LOW)
        # Raising to HIGH may be refused; only verify it when it works.
        try:
            proc.ionice(psutil.IOPRIO_HIGH)
        except psutil.AccessDenied:
            pass
        else:
            self.assertEqual(proc.ionice(), psutil.IOPRIO_HIGH)
        # errs
        with self.assertRaisesRegex(
                TypeError, "value argument not accepted on Windows"):
            proc.ionice(psutil.IOPRIO_NORMAL, value=1)
        with self.assertRaisesRegex(ValueError, "is not a valid priority"):
            proc.ionice(psutil.IOPRIO_HIGH + 1)
    finally:
        proc.ionice(saved)
417 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_get(self):
    """Compare psutil RLIMIT* constants and rlimit() with `resource`."""
    import resource
    proc = psutil.Process(os.getpid())
    names = [attr for attr in dir(psutil) if attr.startswith('RLIMIT')]
    assert names, names
    for name in names:
        value = getattr(psutil, name)
        self.assertGreaterEqual(value, 0)
        if name not in dir(resource):
            # No stdlib counterpart: only check the shape of the result.
            limits = proc.rlimit(value)
            self.assertEqual(len(limits), 2)
            self.assertGreaterEqual(limits[0], -1)
            self.assertGreaterEqual(limits[1], -1)
            continue
        self.assertEqual(value, getattr(resource, name))
        # XXX - On PyPy RLIMIT_INFINITY returned by
        # resource.getrlimit() is reported as a very big long
        # number instead of -1. It looks like a bug with PyPy.
        if PYPY:
            continue
        self.assertEqual(proc.rlimit(value), resource.getrlimit(value))
440 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_set(self):
    """rlimit() sets RLIMIT_NOFILE on a child and rejects bad input."""
    proc = self.spawn_psproc()
    new_limits = (5, 5)
    proc.rlimit(psutil.RLIMIT_NOFILE, new_limits)
    self.assertEqual(proc.rlimit(psutil.RLIMIT_NOFILE), new_limits)
    # If pid is 0 prlimit() applies to the calling process and
    # we don't want that.
    if LINUX:
        with self.assertRaisesRegex(ValueError, "can't use prlimit"):
            psutil._psplatform.Process(0).rlimit(0)
    # A limits tuple with three items is invalid.
    self.assertRaises(
        ValueError, proc.rlimit, psutil.RLIMIT_NOFILE, (5, 5, 5))
453 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit(self):
    """A 1024-byte RLIMIT_FSIZE soft limit makes a 1025-byte write fail."""
    proc = psutil.Process()
    path = self.get_testfn()
    original = proc.rlimit(psutil.RLIMIT_FSIZE)
    soft, hard = original
    try:
        proc.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
        # Exactly at the limit: allowed.
        with open(path, "wb") as fobj:
            fobj.write(b"X" * 1024)
        # write() or flush() doesn't always cause the exception
        # but close() will.
        with self.assertRaises(IOError) as ctx:
            with open(path, "wb") as fobj:
                fobj.write(b"X" * 1025)
        raised = ctx.exception
        self.assertEqual(raised.errno if PY3 else raised[0], errno.EFBIG)
    finally:
        proc.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
        self.assertEqual(proc.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
473 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_infinity(self):
    """Resetting the soft limit to RLIM_INFINITY lifts an earlier cap."""
    # First set a limit, then re-set it by specifying INFINITY
    # and assume we overridden the previous limit.
    proc = psutil.Process()
    soft, hard = proc.rlimit(psutil.RLIMIT_FSIZE)
    try:
        for new_soft in (1024, psutil.RLIM_INFINITY):
            proc.rlimit(psutil.RLIMIT_FSIZE, (new_soft, hard))
        # 2048 bytes exceeds the 1024 cap that was just overridden.
        with open(self.get_testfn(), "wb") as fobj:
            fobj.write(b"X" * 2048)
    finally:
        proc.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
        self.assertEqual(proc.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
488 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_infinity_value(self):
    """The RLIMIT_FSIZE hard limit is RLIM_INFINITY and can be re-set."""
    # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
    # big number on a platform with large file support. On these
    # platforms we need to test that the get/setrlimit functions
    # properly convert the number to a C long long and that the
    # conversion doesn't raise an error.
    proc = psutil.Process()
    limits = proc.rlimit(psutil.RLIMIT_FSIZE)
    soft, hard = limits
    self.assertEqual(psutil.RLIM_INFINITY, hard)
    proc.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
500 | |
def test_num_threads(self):
    """num_threads() grows by one while a ThreadTask is running."""
    # on certain platforms such as Linux we might test for exact
    # thread number, since we always have with 1 thread per process,
    # but this does not apply across all platforms (MACOS, Windows)
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        # Only OpenBSD turns the denial into a skip; elsewhere it
        # propagates unchanged.
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    with ThreadTask():
        after = proc.num_threads()
        self.assertEqual(after, before + 1)
517 | |
@unittest.skipIf(not WINDOWS, 'WINDOWS only')
def test_num_handles(self):
    """num_handles() of the current process is positive."""
    # a better test is done later into test/_windows.py
    handles = psutil.Process().num_handles()
    self.assertGreater(handles, 0)
523 | |
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads(self):
    """threads() gains one entry while a ThreadTask is running."""
    proc = psutil.Process()
    try:
        before = proc.threads()
    except psutil.AccessDenied:
        # Only OpenBSD turns the denial into a skip; elsewhere it
        # propagates unchanged.
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    with ThreadTask():
        after = proc.threads()
        self.assertEqual(len(after), len(before) + 1)
        first = after[0]
        # test named tuple: attribute access must agree with indexing
        for index, attr in enumerate(("id", "user_time", "system_time")):
            self.assertEqual(getattr(first, attr), first[index])
543 | |
@retry_on_failure()
@skip_on_access_denied(only_if=MACOS)
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads_2(self):
    """Per-thread CPU times add up to the process totals (+/- 0.1)."""
    proc = self.spawn_psproc()
    if OPENBSD:
        try:
            proc.threads()
        except psutil.AccessDenied:
            raise unittest.SkipTest(
                "on OpenBSD this requires root access")

    process_user = proc.cpu_times().user
    threads_user = sum(t.user_time for t in proc.threads())
    self.assertAlmostEqual(process_user, threads_user, delta=0.1)

    process_system = proc.cpu_times().system
    threads_system = sum(t.system_time for t in proc.threads())
    self.assertAlmostEqual(process_system, threads_system, delta=0.1)
561 | |
@retry_on_failure()
def test_memory_info(self):
    """memory_info()/memory_percent() grow after a large allocation."""
    proc = psutil.Process()

    # step 1 - get a base value to compare our results
    rss_before, vms_before = proc.memory_info()[:2]
    percent_before = proc.memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # step 2 - allocate some memory; the list must stay referenced
    # until the second measurement has been taken
    ballast = [None] * 1500000

    rss_after, vms_after = proc.memory_info()[:2]
    percent_after = proc.memory_percent()

    # step 3 - make sure that the memory usage bumped up
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)  # vms might be equal
    self.assertGreater(percent_after, percent_before)
    del ballast

    if WINDOWS:
        win_mem = proc.memory_info()
        self.assertEqual(win_mem.rss, win_mem.wset)
        self.assertEqual(win_mem.vms, win_mem.pagefile)

    snapshot = proc.memory_info()
    for field in snapshot._fields:
        self.assertGreaterEqual(getattr(snapshot, field), 0)
592 | |
def test_memory_full_info(self):
    """Every memory_full_info() field is >= 0 and, except for 'vms' on
    OSX/Linux, no larger than total physical memory.
    """
    p = psutil.Process()
    total = psutil.virtual_memory().total
    mem = p.memory_full_info()
    for name in mem._fields:
        value = getattr(mem, name)
        self.assertGreaterEqual(value, 0, msg=(name, value))
        # Parenthesised on purpose: `and` binds tighter than `or`, so
        # the original `name == 'vms' and OSX or LINUX` skipped the
        # upper-bound check for every field on Linux.
        if name == 'vms' and (OSX or LINUX):
            continue
        self.assertLessEqual(value, total, msg=(name, value, total))
    if LINUX or WINDOWS or MACOS:
        self.assertGreaterEqual(mem.uss, 0)
    if LINUX:
        self.assertGreaterEqual(mem.pss, 0)
        self.assertGreaterEqual(mem.swap, 0)
608 | |
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps(self):
    """Sanity-check grouped and ungrouped memory_maps() output."""
    proc = psutil.Process()
    grouped = proc.memory_maps()
    # Grouped entries must all be distinct.
    entries = list(grouped)
    self.assertEqual(len(entries), len(set(entries)))
    ungrouped = proc.memory_maps(grouped=False)

    for region in grouped:
        path = region.path
        if path.startswith('['):
            # Bracketed names are not checked as filesystem paths.
            continue
        assert os.path.isabs(path), path
        if POSIX:
            try:
                assert os.path.exists(path) or \
                    os.path.islink(path), path
            except AssertionError:
                if not LINUX:
                    raise
                # https://github.com/giampaolo/psutil/issues/759
                with open_text('/proc/self/smaps') as f:
                    smaps = f.read()
                if "%s (deleted)" % path not in smaps:
                    raise
        else:
            # XXX - On Windows we have this strange behavior with
            # 64 bit dlls: they are visible via explorer but cannot
            # be accessed via os.stat() (wtf?).
            if '64' not in os.path.basename(path):
                try:
                    st = os.stat(path)
                except FileNotFoundError:
                    pass
                else:
                    assert stat.S_ISREG(st.st_mode), path

    for region in ungrouped:
        for field in region._fields:
            if field == 'path':
                # getattr is still performed, as in the field loop's
                # other branches, before the field is skipped.
                getattr(region, field)
                continue
            value = getattr(region, field)
            if field in ('addr', 'perms'):
                assert value, value
            else:
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
654 | |
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps_lists_lib(self):
    """Make sure a newly loaded shared lib is listed."""
    proc = psutil.Process()

    def canonical(path):
        # Same normalisation applied to both sides of the comparison.
        return os.path.realpath(os.path.normcase(path))

    with copyload_shared_lib() as lib_path:
        mapped = [canonical(region.path) for region in proc.memory_maps()]
        self.assertIn(canonical(lib_path), mapped)
665 | |
def test_memory_percent(self):
    """memory_percent() runs and rejects an unknown memtype."""
    proc = psutil.Process()
    proc.memory_percent()
    with self.assertRaises(ValueError):
        proc.memory_percent(memtype="?!?")
    if LINUX or MACOS or WINDOWS:
        proc.memory_percent(memtype='uss')
672 | |
def test_is_running(self):
    """is_running() is True before kill()+wait() and False afterwards."""
    proc = self.spawn_psproc()
    # Each state is queried twice in a row.
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
681 | |
def test_exe(self):
    """exe() matches PYTHON_EXE (loosely) and points to a runnable file."""
    proc = self.spawn_psproc()
    exe = proc.exe()
    try:
        self.assertEqual(exe, PYTHON_EXE)
    except AssertionError:
        same_length = len(exe) == len(PYTHON_EXE)
        if WINDOWS and same_length:
            # on Windows we don't care about case sensitivity
            self.assertEqual(
                os.path.normcase(exe), os.path.normcase(PYTHON_EXE))
        else:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            ver = "%s.%s" % sys.version_info[:2]
            try:
                self.assertEqual(exe.replace(ver, ''),
                                 PYTHON_EXE.replace(ver, ''))
            except AssertionError:
                # Typically MACOS. Really not sure what to do here.
                pass

    output = sh([exe, "-c", "import os; print('hey')"])
    self.assertEqual(output, 'hey')
709 | |
def test_cmdline(self):
    """cmdline() reports the arguments the process was started with."""
    expected = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
    proc = self.spawn_psproc(expected)
    # XXX - most of the times the underlying sysctl() call on Net
    # and Open BSD returns a truncated string.
    # Also /proc/pid/cmdline behaves the same so it looks
    # like this is a kernel bug.
    # XXX - AIX truncates long arguments in /proc/pid/cmdline
    only_first_arg = NETBSD or OPENBSD or AIX
    if not only_first_arg:
        self.assertEqual(' '.join(proc.cmdline()), ' '.join(expected))
    else:
        self.assertEqual(proc.cmdline()[0], PYTHON_EXE)
722 | |
@unittest.skipIf(PYPY, "broken on PYPY")
def test_long_cmdline(self):
    """cmdline() returns all twenty extra arguments unchanged."""
    exe_path = self.get_testfn()
    create_exe(exe_path)
    expected = [exe_path]
    expected.extend(["0123456789"] * 20)
    proc = self.spawn_psproc(expected)
    self.assertEqual(proc.cmdline(), expected)
730 | |
def test_name(self):
    """name() is a case-insensitive prefix of the interpreter basename."""
    proc = self.spawn_psproc(PYTHON_EXE)
    reported = proc.name().lower()
    real_exe = os.path.realpath(sys.executable)
    exe_basename = os.path.basename(real_exe).lower()
    assert exe_basename.startswith(reported), (exe_basename, reported)
736 | |
@unittest.skipIf(PYPY, "unreliable on PYPY")
def test_long_name(self):
    """name() returns the full basename of a long executable name."""
    exe_path = self.get_testfn(suffix="0123456789" * 2)
    create_exe(exe_path)
    proc = self.spawn_psproc(exe_path)
    expected = os.path.basename(exe_path)
    self.assertEqual(proc.name(), expected)
743 | |
# XXX
@unittest.skipIf(SUNOS, "broken on SUNOS")
@unittest.skipIf(AIX, "broken on AIX")
@unittest.skipIf(PYPY, "broken on PYPY")
def test_prog_w_funky_name(self):
    """name(), exe() and cmdline() handle an executable whose file name
    contains spaces and ")".
    """
    # See: https://github.com/giampaolo/psutil/issues/628
    funky_path = self.get_testfn(suffix='foo bar )')
    create_exe(funky_path)
    # The comma after the "-c" program is required: without it Python
    # concatenates the program with "arg1" into a single argument.
    cmdline = [funky_path, "-c",
               "import time; [time.sleep(0.01) for x in range(3000)];",
               "arg1", "arg2", "", "arg3", ""]
    p = self.spawn_psproc(cmdline)
    self.assertEqual(p.cmdline(), cmdline)
    self.assertEqual(p.name(), os.path.basename(funky_path))
    self.assertEqual(os.path.normcase(p.exe()),
                     os.path.normcase(funky_path))
762 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_uids(self):
    """uids() agrees with os.getuid(), os.geteuid() and os.getresuid()."""
    proc = psutil.Process()
    real, effective, saved = proc.uids()
    checks = (
        (real, os.getuid()),        # "real" uid
        (effective, os.geteuid()),  # "effective" uid
    )
    for reported, expected in checks:
        self.assertEqual(reported, expected)
    # No such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid() which returns all
    # of them.
    if hasattr(os, "getresuid"):
        self.assertEqual(os.getresuid(), proc.uids())
776 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_gids(self):
    """gids() agrees with os.getgid(), os.getegid() and os.getresgid()."""
    p = psutil.Process()
    real, effective, saved = p.gids()
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # No such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid() which returns all
    # of them.
    # Guard on the function actually called (the original checked for
    # "getresuid" here).
    if hasattr(os, "getresgid"):
        self.assertEqual(os.getresgid(), p.gids())
790 | |
def test_nice(self):
    """Exercise nice() get/set, tolerating AccessDenied on changes."""
    proc = psutil.Process()
    self.assertRaises(TypeError, proc.nice, "str")
    original = proc.nice()

    def check_against_os():
        # Cross-check with os.getpriority() where it is available.
        if hasattr(os, "getpriority"):
            self.assertEqual(
                os.getpriority(os.PRIO_PROCESS, os.getpid()),
                proc.nice())

    try:
        if WINDOWS:
            priority_classes = [
                psutil.NORMAL_PRIORITY_CLASS,
                psutil.IDLE_PRIORITY_CLASS,
                psutil.BELOW_NORMAL_PRIORITY_CLASS,
                psutil.REALTIME_PRIORITY_CLASS,
                psutil.HIGH_PRIORITY_CLASS,
                psutil.ABOVE_NORMAL_PRIORITY_CLASS,
            ]
            for prio in priority_classes:
                with self.subTest(prio=prio):
                    try:
                        proc.nice(prio)
                    except psutil.AccessDenied:
                        pass
                    else:
                        self.assertEqual(proc.nice(), prio)
        else:
            try:
                check_against_os()
                proc.nice(1)
                self.assertEqual(proc.nice(), 1)
                check_against_os()
                # XXX - going back to previous nice value raises
                # AccessDenied on MACOS
                if not MACOS:
                    proc.nice(0)
                    self.assertEqual(proc.nice(), 0)
            except psutil.AccessDenied:
                pass
    finally:
        # Best-effort restore of the initial value.
        try:
            proc.nice(original)
        except psutil.AccessDenied:
            pass
834 | |
def test_status(self):
    """The current process reports STATUS_RUNNING."""
    current_status = psutil.Process().status()
    self.assertEqual(current_status, psutil.STATUS_RUNNING)
838 | |
def test_username(self):
    """username() of a child process matches getpass.getuser()."""
    proc = self.spawn_psproc()
    reported = proc.username()
    domain = None
    if WINDOWS:
        # On Windows the value is split on the backslash into a domain
        # part and a user part.
        domain, reported = reported.split('\\')
    self.assertEqual(reported, getpass.getuser())
    if WINDOWS and 'USERDOMAIN' in os.environ:
        self.assertEqual(domain, os.environ['USERDOMAIN'])
849 | |
def test_cwd(self):
    """A spawned process reports the same cwd as os.getcwd()."""
    child_cwd = self.spawn_psproc().cwd()
    self.assertEqual(child_cwd, os.getcwd())
853 | |
def test_cwd_2(self):
    """Process.cwd() must follow an os.chdir('..') done by the process."""
    script = "import os, time; os.chdir('..'); time.sleep(60)"
    proc = self.spawn_psproc([PYTHON_EXE, "-c", script])
    # Poll until the reported cwd is the parent of our own cwd.
    call_until(proc.cwd, "ret == os.path.dirname(os.getcwd())")
859 | |
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity(self):
    """Exercise Process.cpu_affinity() as getter and setter on the
    current process, cross-checking against os.sched_getaffinity()
    where that function exists.
    """
    p = psutil.Process()
    initial = p.cpu_affinity()
    assert initial, initial
    # Restore the original affinity whatever happens below.
    self.addCleanup(p.cpu_affinity, initial)

    if hasattr(os, "sched_getaffinity"):
        self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
    # The returned list must not contain duplicates.
    self.assertEqual(len(initial), len(set(initial)))

    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
    # Pin the process to each CPU in turn and read the value back.
    for n in all_cpus:
        p.cpu_affinity([n])
        self.assertEqual(p.cpu_affinity(), [n])
        if hasattr(os, "sched_getaffinity"):
            self.assertEqual(p.cpu_affinity(),
                             list(os.sched_getaffinity(p.pid)))
        # also test num_cpu()
        if hasattr(p, "num_cpu"):
            self.assertEqual(p.cpu_affinity()[0], p.num_cpu())

    # [] is an alias for "all eligible CPUs"; on Linux this may
    # not be equal to all available CPUs, see:
    # https://github.com/giampaolo/psutil/issues/956
    p.cpu_affinity([])
    if LINUX:
        self.assertEqual(p.cpu_affinity(), p._proc._get_eligible_cpus())
    else:
        self.assertEqual(p.cpu_affinity(), all_cpus)
    if hasattr(os, "sched_getaffinity"):
        self.assertEqual(p.cpu_affinity(),
                         list(os.sched_getaffinity(p.pid)))
    # A bare int is not an acceptable argument.
    self.assertRaises(TypeError, p.cpu_affinity, 1)
    p.cpu_affinity(initial)
    # it should work with all iterables, not only lists
    p.cpu_affinity(set(all_cpus))
    p.cpu_affinity(tuple(all_cpus))
899 | |
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_errs(self):
    """Invalid CPU lists passed to cpu_affinity() must raise the
    expected exception type.
    """
    proc = self.spawn_psproc()
    cpu_count = len(psutil.cpu_times(percpu=True))
    # (expected exception, argument) pairs, checked in this order.
    bad_inputs = (
        (ValueError, [cpu_count + 10]),
        (ValueError, range(10000, 11000)),
        (TypeError, [0, "1"]),
        (ValueError, [0, -1]),
    )
    for expected, cpus in bad_inputs:
        self.assertRaises(expected, proc.cpu_affinity, cpus)
908 | |
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_all_combinations(self):
    """Set every non-empty subset of the initial CPU set as the
    affinity of the current process and read it back.
    """
    proc = psutil.Process()
    initial = proc.cpu_affinity()
    assert initial, initial
    self.addCleanup(proc.cpu_affinity, initial)

    # Cap at 12 CPUs ...otherwise it will take forever.
    cpus = initial[:12]
    subsets = [
        list(chosen)
        for size in range(1, len(cpus) + 1)
        for chosen in itertools.combinations(cpus, size)
    ]

    for cpu_set in subsets:
        proc.cpu_affinity(cpu_set)
        self.assertEqual(sorted(proc.cpu_affinity()), sorted(cpu_set))
928 | |
# TODO: #595
@unittest.skipIf(BSD, "broken on BSD")
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files(self):
    """Check that Process.open_files() lists a file opened by the
    current process and by another process.

    Paths are compared after os.path.normcase() on both sides, so the
    checks behave the same on platforms that normalise path case.
    """
    normcase = os.path.normcase
    p = psutil.Process()
    testfn = self.get_testfn()
    wanted = normcase(testfn)
    files = p.open_files()
    # Compare against paths: open_files() returns namedtuples, so a bare
    # string could never be "in" the list and the check would be vacuous.
    self.assertNotIn(wanted, [normcase(x.path) for x in files])
    with open(testfn, 'wb') as f:
        f.write(b'x' * 1024)
        f.flush()
        # give the kernel some time to see the new file
        files = call_until(p.open_files, "len(ret) != %i" % len(files))
        filenames = [normcase(x.path) for x in files]
        self.assertIn(wanted, filenames)
        if LINUX:
            for file in files:
                if file.path == testfn:
                    self.assertEqual(file.position, 1024)
    for file in files:
        assert os.path.isfile(file.path), file

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
    p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])

    for _ in range(100):
        filenames = [normcase(x.path) for x in p.open_files()]
        # Both sides are normcased; comparing the raw testfn would never
        # match where normcase() changes the path.
        if wanted in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(wanted, filenames)
    for file in filenames:
        assert os.path.isfile(file), file
965 | |
# TODO: #595
@unittest.skipIf(BSD, "broken on BSD")
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files_2(self):
    """Check the ``path`` and ``fd`` fields of open_files() entries for
    a file opened by the current process, and that the file is no
    longer listed once it has been closed.
    """
    # test fd and path fields
    p = psutil.Process()
    normcase = os.path.normcase
    testfn = self.get_testfn()
    with open(testfn, 'w') as fileobj:
        for file in p.open_files():
            if normcase(file.path) == normcase(fileobj.name) or \
                    file.fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.open_files()))
        self.assertEqual(normcase(file.path), normcase(fileobj.name))
        if WINDOWS:
            self.assertEqual(file.fd, -1)
        else:
            self.assertEqual(file.fd, fileobj.fileno())
        # test positions
        ntuple = p.open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    # test file is gone: done after the file is closed and against the
    # entries' paths, since a string is never equal to a namedtuple.
    remaining = [normcase(x.path) for x in p.open_files()]
    self.assertNotIn(normcase(fileobj.name), remaining)
993 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_num_fds(self):
    """num_fds() must grow by one for each opened file or socket and
    return to the starting value once both are closed.
    """
    proc = psutil.Process()
    path = self.get_testfn()
    baseline = proc.num_fds()

    handle = open(path, 'w')
    self.addCleanup(handle.close)
    self.assertEqual(proc.num_fds(), baseline + 1)

    sock = socket.socket()
    self.addCleanup(sock.close)
    self.assertEqual(proc.num_fds(), baseline + 2)

    # Close explicitly; the cleanups registered above only act as a
    # safety net if an assertion fails earlier.
    handle.close()
    sock.close()
    self.assertEqual(proc.num_fds(), baseline)
1008 | |
@skip_on_not_implemented(only_if=LINUX)
@unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD")
def test_num_ctx_switches(self):
    """The sum of the context-switch counters of the current process
    must eventually increase while the test keeps polling it.
    """
    iterations = 500000
    p = psutil.Process()
    before = sum(p.num_ctx_switches())
    for _ in range(iterations):
        after = sum(p.num_ctx_switches())
        if after > before:
            return
    # Build the message from the real loop bound so the two cannot
    # drift apart.
    self.fail("num ctx switches still the same after %s iterations"
              % iterations)
1019 | |
def test_ppid(self):
    """Process.ppid() must match os.getppid() for the current process,
    and must equal our own PID for the process returned by
    spawn_psproc().
    """
    p = psutil.Process()
    if hasattr(os, 'getppid'):
        self.assertEqual(p.ppid(), os.getppid())
    p = self.spawn_psproc()
    self.assertEqual(p.ppid(), os.getpid())
    # The trailing "if APPVEYOR: return" was removed: nothing followed
    # it, so it had no effect.
1031 | |
def test_parent(self):
    """parent() of the process returned by spawn_psproc() must be this
    process; parent() of the first PID listed by psutil.pids() must be
    None.
    """
    spawned = self.spawn_psproc()
    parent = spawned.parent()
    self.assertEqual(parent.pid, os.getpid())

    first_pid = psutil.pids()[0]
    first_proc = psutil.Process(first_pid)
    self.assertIsNone(first_proc.parent())
1038 | |
def test_parent_multi(self):
    """parent() must link grandchild -> child -> this process."""
    me = psutil.Process()
    child, grandchild = self.spawn_children_pair()
    # Walk the chain bottom-up, same order as the expected ancestry.
    for proc, expected_parent in ((grandchild, child), (child, me)):
        self.assertEqual(proc.parent(), expected_parent)
1044 | |
def test_parent_disappeared(self):
    """parent() must return None when instantiating the parent raises
    NoSuchProcess.
    """
    # Emulate a case where the parent process disappeared.
    proc = self.spawn_psproc()
    gone = psutil.NoSuchProcess(0, 'foo')
    with mock.patch("psutil.Process", side_effect=gone):
        self.assertIsNone(proc.parent())
1051 | |
@retry_on_failure()
def test_parents(self):
    """parents() must list ancestors starting from the closest one."""
    me = psutil.Process()
    assert me.parents()
    child, grandchild = self.spawn_children_pair()
    # (process, index into its parents() list, expected ancestor)
    expectations = (
        (child, 0, me),
        (grandchild, 0, child),
        (grandchild, 1, me),
    )
    for proc, position, ancestor in expectations:
        self.assertEqual(proc.parents()[position], ancestor)
1060 | |
def test_children(self):
    """children() must be empty before spawning and contain exactly the
    spawned process afterwards, with and without recursive=True.
    """
    me = psutil.Process()
    self.assertEqual(me.children(), [])
    self.assertEqual(me.children(recursive=True), [])
    # On Windows we set the flag to 0 in order to cancel out the
    # CREATE_NO_WINDOW flag (enabled by default) which creates
    # an extra "conhost.exe" child.
    spawned = self.spawn_psproc(creationflags=0)
    direct = me.children()
    recursive = me.children(recursive=True)
    for found in (direct, recursive):
        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only.pid, spawned.pid)
        self.assertEqual(only.ppid(), me.pid)
1075 | |
def test_children_recursive(self):
    """children() must return the child alone by default and child plus
    grandchild with recursive=True.
    """
    # Two sub processes are involved: our child, which in turn spawned
    # our grandchild.
    me = psutil.Process()
    child, grandchild = self.spawn_children_pair()
    self.assertEqual(me.children(), [child])
    descendants = me.children(recursive=True)
    self.assertEqual(descendants, [child, grandchild])
    # If the intermediate process is gone there's no way for
    # children() to recursively find it.
    child.terminate()
    child.wait()
    self.assertEqual(me.children(recursive=True), [])
1088 | |
def test_children_duplicates(self):
    """children(recursive=True) of the process with the most direct
    children must not contain duplicates.
    """
    # Count how many processes name each PID as their parent.
    children_per_ppid = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            ppid = proc.ppid()
        except psutil.Error:
            continue
        children_per_ppid[ppid] += 1
    # The PID with the highest count is the last one after sorting.
    ranked = sorted(children_per_ppid.items(), key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    if LINUX and busiest_pid == 0:
        # skipTest() raises by itself.
        self.skipTest("PID 0")
    busiest = psutil.Process(busiest_pid)
    try:
        descendants = busiest.children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
1108 | |
def test_parents_and_children(self):
    """children(recursive=True) and parents() must describe the same
    chain from opposite ends.
    """
    me = psutil.Process()
    child, grandchild = self.spawn_children_pair()
    # forward: from this process down to the grandchild
    descendants = me.children(recursive=True)
    self.assertEqual(len(descendants), 2)
    for position, expected in enumerate((child, grandchild)):
        self.assertEqual(descendants[position], expected)
    # backward: from the grandchild up to this process
    ancestors = grandchild.parents()
    for position, expected in enumerate((child, me)):
        self.assertEqual(ancestors[position], expected)
1121 | |
def test_suspend_resume(self):
    """After suspend() followed by resume() the process must not be
    reported as stopped.
    """
    proc = self.spawn_psproc()
    proc.suspend()
    # Poll up to 100 times, 10 ms apart, for the stopped state.
    attempts_left = 100
    while attempts_left and proc.status() != psutil.STATUS_STOPPED:
        time.sleep(0.01)
        attempts_left -= 1
    proc.resume()
    self.assertNotEqual(proc.status(), psutil.STATUS_STOPPED)
1131 | |
def test_invalid_pid(self):
    """Process() must reject a string PID with TypeError and a negative
    PID with ValueError.
    """
    with self.assertRaises(TypeError):
        psutil.Process("1")
    with self.assertRaises(ValueError):
        psutil.Process(-1)
1135 | |
def test_as_dict(self):
    """Exercise Process.as_dict(): attribute selection, the ``ad_value``
    placeholder, exception propagation and argument validation.
    """
    p = psutil.Process()
    d = p.as_dict(attrs=['exe', 'name'])
    # Only the requested attributes must be returned.
    self.assertEqual(sorted(d.keys()), ['exe', 'name'])

    p = psutil.Process(min(psutil.pids()))
    d = p.as_dict(attrs=['connections'], ad_value='foo')
    # Either a real list came back or the placeholder was used.
    if not isinstance(d['connections'], list):
        self.assertEqual(d['connections'], 'foo')

    # Test ad_value is set on AccessDenied.
    with mock.patch('psutil.Process.nice', create=True,
                    side_effect=psutil.AccessDenied):
        self.assertEqual(
            p.as_dict(attrs=["nice"], ad_value=1), {"nice": 1})

    # Test that NoSuchProcess bubbles up.
    with mock.patch('psutil.Process.nice', create=True,
                    side_effect=psutil.NoSuchProcess(p.pid, "name")):
        self.assertRaises(
            psutil.NoSuchProcess, p.as_dict, attrs=["nice"])

    # Test that ZombieProcess is swallowed.
    with mock.patch('psutil.Process.nice', create=True,
                    side_effect=psutil.ZombieProcess(p.pid, "name")):
        self.assertEqual(
            p.as_dict(attrs=["nice"], ad_value="foo"), {"nice": "foo"})

    # By default APIs raising NotImplementedError are
    # supposed to be skipped.
    with mock.patch('psutil.Process.nice', create=True,
                    side_effect=NotImplementedError):
        d = p.as_dict()
        self.assertNotIn('nice', list(d.keys()))
        # ...unless the user explicitly asked for some attr.
        with self.assertRaises(NotImplementedError):
            p.as_dict(attrs=["nice"])

    # errors: a bare string and unknown attribute names are rejected
    with self.assertRaises(TypeError):
        p.as_dict('name')
    with self.assertRaises(ValueError):
        p.as_dict(['foo'])
    with self.assertRaises(ValueError):
        p.as_dict(['foo', 'bar'])
1181 | |
def test_oneshot(self):
    """Inside oneshot() two cpu_times() calls must reach the platform
    implementation once; outside of it, twice.
    """
    p = psutil.Process()
    with mock.patch("psutil._psplatform.Process.cpu_times") as m:
        with p.oneshot():
            p.cpu_times()
            p.cpu_times()
        # The second call was served without hitting the mock again.
        self.assertEqual(m.call_count, 1)

    with mock.patch("psutil._psplatform.Process.cpu_times") as m:
        p.cpu_times()
        p.cpu_times()
        # Without oneshot() every call reaches the mock.
        self.assertEqual(m.call_count, 2)
1194 | |
def test_oneshot_twice(self):
    """Nesting oneshot() inside itself must not call the platform
    ``oneshot_enter`` or ``cpu_times`` a second time.
    """
    # Test the case where the ctx manager is __enter__ed twice.
    # The second __enter__ is supposed to result in a NOOP.
    p = psutil.Process()
    with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
        with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
            with p.oneshot():
                p.cpu_times()
                p.cpu_times()
                with p.oneshot():
                    p.cpu_times()
                    p.cpu_times()
            # Four cpu_times() calls and two oneshot() entries, but the
            # platform layer was hit once for each.
            self.assertEqual(m1.call_count, 1)
            self.assertEqual(m2.call_count, 1)

    with mock.patch("psutil._psplatform.Process.cpu_times") as m:
        p.cpu_times()
        p.cpu_times()
        self.assertEqual(m.call_count, 2)
1214 | |
def test_oneshot_cache(self):
    """ppid() of two different processes must stay distinct whichever
    of them has an active oneshot() context.
    """
    # Make sure oneshot() cache is nonglobal. Instead it's
    # supposed to be bound to the Process instance, see:
    # https://github.com/giampaolo/psutil/issues/1373
    p1, p2 = self.spawn_children_pair()
    p1_ppid = p1.ppid()
    p2_ppid = p2.ppid()
    self.assertNotEqual(p1_ppid, p2_ppid)
    # Activate oneshot() on each process in turn and check both.
    for active in (p1, p2):
        with active.oneshot():
            self.assertEqual(p1.ppid(), p1_ppid)
            self.assertEqual(p2.ppid(), p2_ppid)
1229 | |
def test_halfway_terminated_process(self):
    """Every method yielded by process_namespace(p).iter(ns.all) must
    raise NoSuchProcess once the process has been terminated and
    waited for, apart from the platform exceptions handled below.
    """
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    # >>> proc = Process(1234)
    # >>> time.sleep(2)  # time-consuming task, process dies in meantime
    # >>> proc.name()
    # Refers to Issue #15
    def assert_raises_nsp(fun, fun_name):
        # Call fun() and fail unless it raises NoSuchProcess.
        try:
            ret = fun()
        except psutil.ZombieProcess:  # differentiate from NSP
            raise
        except psutil.NoSuchProcess:
            pass
        except psutil.AccessDenied:
            # Tolerated only for these two methods on OpenBSD.
            if OPENBSD and fun_name in ('threads', 'num_threads'):
                return
            raise
        else:
            # NtQuerySystemInformation succeeds even if process is gone.
            if WINDOWS and fun_name in ('exe', 'name'):
                return
            raise self.fail("%r didn't raise NSP and returned %r "
                            "instead" % (fun, ret))

    p = self.spawn_psproc()
    p.terminate()
    p.wait()
    if WINDOWS:  # XXX
        # Wait until the PID is no longer listed.
        call_until(psutil.pids, "%s not in ret" % p.pid)
    self.assertProcessGone(p)

    ns = process_namespace(p)
    for fun, name in ns.iter(ns.all):
        assert_raises_nsp(fun, name)

    # NtQuerySystemInformation succeeds even if process is gone.
    if WINDOWS and not GITHUB_ACTIONS:
        normcase = os.path.normcase
        self.assertEqual(normcase(p.exe()), normcase(PYTHON_EXE))
1271 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process(self):
    """A zombie process must be instantiable, report STATUS_ZOMBIE,
    count as running, and be listed by the PID enumeration APIs.
    """
    def succeed_or_zombie_p_exc(fun):
        # Call fun(); ZombieProcess and AccessDenied are acceptable
        # outcomes, anything else propagates.
        try:
            return fun()
        except (psutil.ZombieProcess, psutil.AccessDenied):
            pass

    parent, zombie = self.spawn_zombie()
    # A zombie process should always be instantiable
    zproc = psutil.Process(zombie.pid)
    # ...and at least its status always be queryable
    self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
    # ...and it should be considered 'running'
    assert zproc.is_running()
    # ...and as_dict() shouldn't crash
    zproc.as_dict()
    # ...its parent should 'see' it (edit: not true on BSD and MACOS)
    # descendants = [x.pid for x in psutil.Process().children(
    #    recursive=True)]
    # self.assertIn(zpid, descendants)
    # XXX should we also assume ppid be usable?  Note: this
    # would be an important use case as the only way to get
    # rid of a zombie is to kill its parent.
    # self.assertEqual(zpid.ppid(), os.getpid())
    # ...and all other APIs should be able to deal with it

    ns = process_namespace(zproc)
    for fun, name in ns.iter(ns.all):
        succeed_or_zombie_p_exc(fun)

    assert psutil.pid_exists(zproc.pid)
    self.assertIn(zproc.pid, psutil.pids())
    self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
    # Reset psutil._pmap and check process_iter() again.
    # NOTE(review): _pmap looks like process_iter()'s internal cache -
    # confirm.
    psutil._pmap = {}
    self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
1308 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process_is_running_w_exc(self):
    """is_running() must return True when psutil.Process raises
    ZombieProcess while it is being evaluated.
    """
    # Emulate a case where internally is_running() raises
    # ZombieProcess.
    proc = psutil.Process()
    zombie_exc = psutil.ZombieProcess(0)
    with mock.patch("psutil.Process", side_effect=zombie_exc) as patched:
        assert proc.is_running()
        assert patched.called
1318 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process_status_w_exc(self):
    """status() must return STATUS_ZOMBIE when the platform
    implementation raises ZombieProcess.
    """
    # Emulate a case where internally status() raises
    # ZombieProcess.
    proc = psutil.Process()
    target = "psutil._psplatform.Process.status"
    zombie_exc = psutil.ZombieProcess(0)
    with mock.patch(target, side_effect=zombie_exc) as patched:
        self.assertEqual(proc.status(), psutil.STATUS_ZOMBIE)
        assert patched.called
1328 | |
def test_pid_0(self):
    """Check how PID 0 is handled: Process(0) must raise NoSuchProcess
    where PID 0 is not listed, otherwise signal-related methods must be
    refused and the getters/setters must run or raise AccessDenied.
    """
    # Process(0) is supposed to work on all platforms except Linux
    if 0 not in psutil.pids():
        self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
        # These 2 are a contradiction, but "ps" says PID 1's parent
        # is PID 0.
        assert not psutil.pid_exists(0)
        self.assertEqual(psutil.Process(1).ppid(), 0)
        return

    p = psutil.Process(0)
    # Waiting on or signalling PID 0 must be refused.
    exc = psutil.AccessDenied if WINDOWS else ValueError
    self.assertRaises(exc, p.wait)
    self.assertRaises(exc, p.terminate)
    self.assertRaises(exc, p.suspend)
    self.assertRaises(exc, p.resume)
    self.assertRaises(exc, p.kill)
    self.assertRaises(exc, p.send_signal, signal.SIGTERM)

    # test all methods
    ns = process_namespace(p)
    for fun, name in ns.iter(ns.getters + ns.setters):
        try:
            ret = fun()
        except psutil.AccessDenied:
            pass
        else:
            if name in ("uids", "gids"):
                self.assertEqual(ret.real, 0)
            elif name == "username":
                user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
                self.assertEqual(p.username(), user)
            elif name == "name":
                # NOTE(review): this asserts the loop variable `name`,
                # which is always truthy here; `ret` was presumably
                # intended - confirm before changing.
                assert name, name

    if not OPENBSD:
        self.assertIn(0, psutil.pids())
        assert psutil.pid_exists(0)
1367 | |
@unittest.skipIf(not HAS_ENVIRON, "not supported")
def test_environ(self):
    """Process.environ() of the current process must match os.environ
    once a few unstable variables are dropped and CR/LF characters are
    stripped from keys and values.

    The comparison is skipped only on OSX when running on GitHub
    Actions.
    """
    def strip_newlines(s):
        return s.replace("\r", "").replace("\n", "")

    def clean_dict(d):
        # Most of these are problematic on Travis.
        for key in ("PSUTIL_TESTING", "PLAT", "HOME"):
            d.pop(key, None)
        if MACOS:
            for key in ("__CF_USER_TEXT_ENCODING",
                        "VERSIONER_PYTHON_PREFER_32_BIT",
                        "VERSIONER_PYTHON_VERSION"):
                d.pop(key, None)
        return dict(
            (strip_newlines(k), strip_newlines(v)) for k, v in d.items())

    self.maxDiff = None
    p = psutil.Process()
    d1 = clean_dict(p.environ())
    d2 = clean_dict(os.environ.copy())
    # Parenthesised on purpose: "not OSX and GITHUB_ACTIONS" parses as
    # "(not OSX) and GITHUB_ACTIONS", which silently disabled the
    # comparison everywhere except on GitHub Actions.
    if not (OSX and GITHUB_ACTIONS):
        self.assertEqual(d1, d2)
1390 | |
@unittest.skipIf(not HAS_ENVIRON, "not supported")
@unittest.skipIf(not POSIX, "POSIX only")
def test_weird_environ(self):
    """Process.environ() must skip an environment entry that has no
    equals sign ("X") and still return the well-formed ones.
    """
    # environment variables can contain values without an equals sign
    code = textwrap.dedent("""
    #include <unistd.h>
    #include <fcntl.h>
    char * const argv[] = {"cat", 0};
    char * const envp[] = {"A=1", "X", "C=3", 0};
    int main(void) {
        /* Close stderr on exec so parent can wait for the execve to
         * finish. */
        if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
            return 0;
        return execve("/bin/cat", argv, envp);
    }
    """)
    path = self.get_testfn()
    create_exe(path, c_code=code)
    sproc = self.spawn_testproc(
        [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    p = psutil.Process(sproc.pid)
    wait_for_pid(p.pid)
    assert p.is_running()
    # Wait for process to exec or exit.
    self.assertEqual(sproc.stderr.read(), b"")
    if MACOS and CI_TESTING:
        try:
            env = p.environ()
        except psutil.AccessDenied:
            # XXX: fails sometimes with:
            # PermissionError from 'sysctl(KERN_PROCARGS2) -> EIO'
            return
    else:
        env = p.environ()
    # The malformed "X" entry must not show up.
    self.assertEqual(env, {"A": "1", "C": "3"})
    sproc.communicate()
    self.assertEqual(sproc.returncode, 0)
1429 | |
1430 | |
1431 # =================================================================== | |
1432 # --- Limited user tests | |
1433 # =================================================================== | |
1434 | |
1435 | |
if POSIX and os.getuid() == 0:

    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.

        Each inherited test method is wrapped so that an AccessDenied
        raised while running with reduced privileges is ignored.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # Bind `meth` as a default argument: a plain closure
                # would look the name up when called, so every wrapper
                # would end up running the last method of the loop.
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            """Switch to the effective gid/uid 1000 for the test."""
            super().setUp()
            # Drop the group first, while we still have the privilege.
            os.setegid(1000)
            os.seteuid(1000)

        def tearDown(self):
            """Restore the effective uid/gid the suite started with."""
            # Restore the user first, then the group; each call gets the
            # matching id (they used to be passed the other way round).
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            super().tearDown()

        def test_nice(self):
            """Setting a negative niceness must raise AccessDenied."""
            try:
                psutil.Process().nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")

        @unittest.skipIf(1, "causes problem as root")
        def test_zombie_process(self):
            pass
1483 | |
1484 | |
1485 # =================================================================== | |
1486 # --- psutil.Popen tests | |
1487 # =================================================================== | |
1488 | |
1489 | |
class TestPopen(PsutilTestCase):
    """Tests for psutil.Popen class."""

    @classmethod
    def tearDownClass(cls):
        # Clean up whatever the tests of this class left behind.
        reap_children()

    def test_misc(self):
        """psutil.Popen must expose both Process and subprocess.Popen
        attributes, and wait() must report the termination signal.
        """
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        with psutil.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            # Unknown attributes must still raise AttributeError.
            self.assertRaises(AttributeError, getattr, proc, 'foo')
            proc.terminate()
        if POSIX:
            self.assertEqual(proc.wait(5), -signal.SIGTERM)
        else:
            self.assertEqual(proc.wait(5), signal.SIGTERM)

    def test_ctx_manager(self):
        """Leaving the ``with`` block must close all three pipes and
        leave the return code set.
        """
        with psutil.Popen([PYTHON_EXE, "-V"],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          stdin=subprocess.PIPE) as proc:
            proc.communicate()
        assert proc.stdout.closed
        assert proc.stderr.closed
        assert proc.stdin.closed
        self.assertEqual(proc.returncode, 0)

    def test_kill_terminate(self):
        """terminate(), kill() and send_signal() must raise
        NoSuchProcess once the process is gone.
        """
        # subprocess.Popen()'s terminate(), kill() and send_signal() do
        # not raise exception after the process is gone. psutil.Popen
        # diverges from that.
        cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        with psutil.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
            proc.terminate()
            proc.wait()
            self.assertRaises(psutil.NoSuchProcess, proc.terminate)
            self.assertRaises(psutil.NoSuchProcess, proc.kill)
            self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
                              signal.SIGTERM)
            if WINDOWS and sys.version_info >= (2, 7):
                self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
                                  signal.CTRL_C_EVENT)
                self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
                                  signal.CTRL_BREAK_EVENT)
1544 | |
1545 | |
# Run this module's tests when the file is executed directly.
if __name__ == '__main__':
    from psutil.tests import runner
    runner.run_from_name(__file__)