comparison env/lib/python3.9/site-packages/psutil/tests/test_process.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """Tests for psutil.Process class."""
8
9 import collections
10 import errno
11 import getpass
12 import itertools
13 import os
14 import signal
15 import socket
16 import stat
17 import subprocess
18 import sys
19 import textwrap
20 import time
21 import types
22
23 import psutil
24
25 from psutil import AIX
26 from psutil import BSD
27 from psutil import LINUX
28 from psutil import MACOS
29 from psutil import NETBSD
30 from psutil import OPENBSD
31 from psutil import OSX
32 from psutil import POSIX
33 from psutil import SUNOS
34 from psutil import WINDOWS
35 from psutil._common import open_text
36 from psutil._compat import FileNotFoundError
37 from psutil._compat import long
38 from psutil._compat import PY3
39 from psutil._compat import super
40 from psutil.tests import APPVEYOR
41 from psutil.tests import call_until
42 from psutil.tests import CI_TESTING
43 from psutil.tests import copyload_shared_lib
44 from psutil.tests import create_exe
45 from psutil.tests import GITHUB_ACTIONS
46 from psutil.tests import GLOBAL_TIMEOUT
47 from psutil.tests import HAS_CPU_AFFINITY
48 from psutil.tests import HAS_ENVIRON
49 from psutil.tests import HAS_IONICE
50 from psutil.tests import HAS_MEMORY_MAPS
51 from psutil.tests import HAS_PROC_CPU_NUM
52 from psutil.tests import HAS_PROC_IO_COUNTERS
53 from psutil.tests import HAS_RLIMIT
54 from psutil.tests import HAS_THREADS
55 from psutil.tests import mock
56 from psutil.tests import process_namespace
57 from psutil.tests import PsutilTestCase
58 from psutil.tests import PYPY
59 from psutil.tests import PYTHON_EXE
60 from psutil.tests import reap_children
61 from psutil.tests import retry_on_failure
62 from psutil.tests import sh
63 from psutil.tests import skip_on_access_denied
64 from psutil.tests import skip_on_not_implemented
65 from psutil.tests import ThreadTask
66 from psutil.tests import unittest
67 from psutil.tests import wait_for_pid
68
69
70 # ===================================================================
71 # --- psutil.Process class tests
72 # ===================================================================
73
74
75 class TestProcess(PsutilTestCase):
76 """Tests for psutil.Process class."""
77
78 def spawn_psproc(self, *args, **kwargs):
79 sproc = self.spawn_testproc(*args, **kwargs)
80 return psutil.Process(sproc.pid)
81
82 # ---
83
84 def test_pid(self):
85 p = psutil.Process()
86 self.assertEqual(p.pid, os.getpid())
87 with self.assertRaises(AttributeError):
88 p.pid = 33
89
90 def test_kill(self):
91 p = self.spawn_psproc()
92 p.kill()
93 code = p.wait()
94 if WINDOWS:
95 self.assertEqual(code, signal.SIGTERM)
96 else:
97 self.assertEqual(code, -signal.SIGKILL)
98 self.assertProcessGone(p)
99
100 def test_terminate(self):
101 p = self.spawn_psproc()
102 p.terminate()
103 code = p.wait()
104 if WINDOWS:
105 self.assertEqual(code, signal.SIGTERM)
106 else:
107 self.assertEqual(code, -signal.SIGTERM)
108 self.assertProcessGone(p)
109
110 def test_send_signal(self):
111 sig = signal.SIGKILL if POSIX else signal.SIGTERM
112 p = self.spawn_psproc()
113 p.send_signal(sig)
114 code = p.wait()
115 if WINDOWS:
116 self.assertEqual(code, sig)
117 else:
118 self.assertEqual(code, -sig)
119 self.assertProcessGone(p)
120
121 @unittest.skipIf(not POSIX, "not POSIX")
122 def test_send_signal_mocked(self):
123 sig = signal.SIGTERM
124 p = self.spawn_psproc()
125 with mock.patch('psutil.os.kill',
126 side_effect=OSError(errno.ESRCH, "")):
127 self.assertRaises(psutil.NoSuchProcess, p.send_signal, sig)
128
129 p = self.spawn_psproc()
130 with mock.patch('psutil.os.kill',
131 side_effect=OSError(errno.EPERM, "")):
132 self.assertRaises(psutil.AccessDenied, p.send_signal, sig)
133
134 def test_wait_exited(self):
135 # Test waitpid() + WIFEXITED -> WEXITSTATUS.
136 # normal return, same as exit(0)
137 cmd = [PYTHON_EXE, "-c", "pass"]
138 p = self.spawn_psproc(cmd)
139 code = p.wait()
140 self.assertEqual(code, 0)
141 self.assertProcessGone(p)
142 # exit(1), implicit in case of error
143 cmd = [PYTHON_EXE, "-c", "1 / 0"]
144 p = self.spawn_psproc(cmd, stderr=subprocess.PIPE)
145 code = p.wait()
146 self.assertEqual(code, 1)
147 self.assertProcessGone(p)
148 # via sys.exit()
149 cmd = [PYTHON_EXE, "-c", "import sys; sys.exit(5);"]
150 p = self.spawn_psproc(cmd)
151 code = p.wait()
152 self.assertEqual(code, 5)
153 self.assertProcessGone(p)
154 # via os._exit()
155 cmd = [PYTHON_EXE, "-c", "import os; os._exit(5);"]
156 p = self.spawn_psproc(cmd)
157 code = p.wait()
158 self.assertEqual(code, 5)
159 self.assertProcessGone(p)
160
161 def test_wait_stopped(self):
162 p = self.spawn_psproc()
163 if POSIX:
164 # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
165 # Note: if a process is stopped it ignores SIGTERM.
166 p.send_signal(signal.SIGSTOP)
167 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
168 p.send_signal(signal.SIGCONT)
169 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
170 p.send_signal(signal.SIGTERM)
171 self.assertEqual(p.wait(), -signal.SIGTERM)
172 self.assertEqual(p.wait(), -signal.SIGTERM)
173 else:
174 p.suspend()
175 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
176 p.resume()
177 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
178 p.terminate()
179 self.assertEqual(p.wait(), signal.SIGTERM)
180 self.assertEqual(p.wait(), signal.SIGTERM)
181
182 def test_wait_non_children(self):
183 # Test wait() against a process which is not our direct
184 # child.
185 child, grandchild = self.spawn_children_pair()
186 self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
187 self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
188 # We also terminate the direct child otherwise the
189 # grandchild will hang until the parent is gone.
190 child.terminate()
191 grandchild.terminate()
192 child_ret = child.wait()
193 grandchild_ret = grandchild.wait()
194 if POSIX:
195 self.assertEqual(child_ret, -signal.SIGTERM)
196 # For processes which are not our children we're supposed
197 # to get None.
198 self.assertEqual(grandchild_ret, None)
199 else:
200 self.assertEqual(child_ret, signal.SIGTERM)
201 self.assertEqual(child_ret, signal.SIGTERM)
202
203 def test_wait_timeout(self):
204 p = self.spawn_psproc()
205 p.name()
206 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
207 self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
208 self.assertRaises(ValueError, p.wait, -1)
209
210 def test_wait_timeout_nonblocking(self):
211 p = self.spawn_psproc()
212 self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
213 p.kill()
214 stop_at = time.time() + GLOBAL_TIMEOUT
215 while time.time() < stop_at:
216 try:
217 code = p.wait(0)
218 break
219 except psutil.TimeoutExpired:
220 pass
221 else:
222 raise self.fail('timeout')
223 if POSIX:
224 self.assertEqual(code, -signal.SIGKILL)
225 else:
226 self.assertEqual(code, signal.SIGTERM)
227 self.assertProcessGone(p)
228
229 def test_cpu_percent(self):
230 p = psutil.Process()
231 p.cpu_percent(interval=0.001)
232 p.cpu_percent(interval=0.001)
233 for x in range(100):
234 percent = p.cpu_percent(interval=None)
235 self.assertIsInstance(percent, float)
236 self.assertGreaterEqual(percent, 0.0)
237 with self.assertRaises(ValueError):
238 p.cpu_percent(interval=-1)
239
240 def test_cpu_percent_numcpus_none(self):
241 # See: https://github.com/giampaolo/psutil/issues/1087
242 with mock.patch('psutil.cpu_count', return_value=None) as m:
243 psutil.Process().cpu_percent()
244 assert m.called
245
246 def test_cpu_times(self):
247 times = psutil.Process().cpu_times()
248 assert (times.user > 0.0) or (times.system > 0.0), times
249 assert (times.children_user >= 0.0), times
250 assert (times.children_system >= 0.0), times
251 if LINUX:
252 assert times.iowait >= 0.0, times
253 # make sure returned values can be pretty printed with strftime
254 for name in times._fields:
255 time.strftime("%H:%M:%S", time.localtime(getattr(times, name)))
256
257 def test_cpu_times_2(self):
258 user_time, kernel_time = psutil.Process().cpu_times()[:2]
259 utime, ktime = os.times()[:2]
260
261 # Use os.times()[:2] as base values to compare our results
262 # using a tolerance of +/- 0.1 seconds.
263 # It will fail if the difference between the values is > 0.1s.
264 if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
265 self.fail("expected: %s, found: %s" % (utime, user_time))
266
267 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
268 self.fail("expected: %s, found: %s" % (ktime, kernel_time))
269
270 @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
271 def test_cpu_num(self):
272 p = psutil.Process()
273 num = p.cpu_num()
274 self.assertGreaterEqual(num, 0)
275 if psutil.cpu_count() == 1:
276 self.assertEqual(num, 0)
277 self.assertIn(p.cpu_num(), range(psutil.cpu_count()))
278
279 def test_create_time(self):
280 p = self.spawn_psproc()
281 now = time.time()
282 create_time = p.create_time()
283
284 # Use time.time() as base value to compare our result using a
285 # tolerance of +/- 1 second.
286 # It will fail if the difference between the values is > 2s.
287 difference = abs(create_time - now)
288 if difference > 2:
289 self.fail("expected: %s, found: %s, difference: %s"
290 % (now, create_time, difference))
291
292 # make sure returned value can be pretty printed with strftime
293 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
294
295 @unittest.skipIf(not POSIX, 'POSIX only')
296 def test_terminal(self):
297 terminal = psutil.Process().terminal()
298 if terminal is not None:
299 tty = os.path.realpath(sh('tty'))
300 self.assertEqual(terminal, tty)
301
302 @unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
303 @skip_on_not_implemented(only_if=LINUX)
304 def test_io_counters(self):
305 p = psutil.Process()
306 # test reads
307 io1 = p.io_counters()
308 with open(PYTHON_EXE, 'rb') as f:
309 f.read()
310 io2 = p.io_counters()
311 if not BSD and not AIX:
312 self.assertGreater(io2.read_count, io1.read_count)
313 self.assertEqual(io2.write_count, io1.write_count)
314 if LINUX:
315 self.assertGreater(io2.read_chars, io1.read_chars)
316 self.assertEqual(io2.write_chars, io1.write_chars)
317 else:
318 self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
319 self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
320
321 # test writes
322 io1 = p.io_counters()
323 with open(self.get_testfn(), 'wb') as f:
324 if PY3:
325 f.write(bytes("x" * 1000000, 'ascii'))
326 else:
327 f.write("x" * 1000000)
328 io2 = p.io_counters()
329 self.assertGreaterEqual(io2.write_count, io1.write_count)
330 self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
331 self.assertGreaterEqual(io2.read_count, io1.read_count)
332 self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
333 if LINUX:
334 self.assertGreater(io2.write_chars, io1.write_chars)
335 self.assertGreaterEqual(io2.read_chars, io1.read_chars)
336
337 # sanity check
338 for i in range(len(io2)):
339 if BSD and i >= 2:
340 # On BSD read_bytes and write_bytes are always set to -1.
341 continue
342 self.assertGreaterEqual(io2[i], 0)
343 self.assertGreaterEqual(io2[i], 0)
344
345 @unittest.skipIf(not HAS_IONICE, "not supported")
346 @unittest.skipIf(not LINUX, "linux only")
347 def test_ionice_linux(self):
348 p = psutil.Process()
349 if not CI_TESTING:
350 self.assertEqual(p.ionice()[0], psutil.IOPRIO_CLASS_NONE)
351 self.assertEqual(psutil.IOPRIO_CLASS_NONE, 0)
352 self.assertEqual(psutil.IOPRIO_CLASS_RT, 1) # high
353 self.assertEqual(psutil.IOPRIO_CLASS_BE, 2) # normal
354 self.assertEqual(psutil.IOPRIO_CLASS_IDLE, 3) # low
355 init = p.ionice()
356 try:
357 # low
358 p.ionice(psutil.IOPRIO_CLASS_IDLE)
359 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0))
360 with self.assertRaises(ValueError): # accepts no value
361 p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7)
362 # normal
363 p.ionice(psutil.IOPRIO_CLASS_BE)
364 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 0))
365 p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
366 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 7))
367 with self.assertRaises(ValueError):
368 p.ionice(psutil.IOPRIO_CLASS_BE, value=8)
369 try:
370 p.ionice(psutil.IOPRIO_CLASS_RT, value=7)
371 except psutil.AccessDenied:
372 pass
373 # errs
374 self.assertRaisesRegex(
375 ValueError, "ioclass accepts no value",
376 p.ionice, psutil.IOPRIO_CLASS_NONE, 1)
377 self.assertRaisesRegex(
378 ValueError, "ioclass accepts no value",
379 p.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
380 self.assertRaisesRegex(
381 ValueError, "'ioclass' argument must be specified",
382 p.ionice, value=1)
383 finally:
384 ioclass, value = init
385 if ioclass == psutil.IOPRIO_CLASS_NONE:
386 value = 0
387 p.ionice(ioclass, value)
388
389 @unittest.skipIf(not HAS_IONICE, "not supported")
390 @unittest.skipIf(not WINDOWS, 'not supported on this win version')
391 def test_ionice_win(self):
392 p = psutil.Process()
393 if not CI_TESTING:
394 self.assertEqual(p.ionice(), psutil.IOPRIO_NORMAL)
395 init = p.ionice()
396 try:
397 # base
398 p.ionice(psutil.IOPRIO_VERYLOW)
399 self.assertEqual(p.ionice(), psutil.IOPRIO_VERYLOW)
400 p.ionice(psutil.IOPRIO_LOW)
401 self.assertEqual(p.ionice(), psutil.IOPRIO_LOW)
402 try:
403 p.ionice(psutil.IOPRIO_HIGH)
404 except psutil.AccessDenied:
405 pass
406 else:
407 self.assertEqual(p.ionice(), psutil.IOPRIO_HIGH)
408 # errs
409 self.assertRaisesRegex(
410 TypeError, "value argument not accepted on Windows",
411 p.ionice, psutil.IOPRIO_NORMAL, value=1)
412 self.assertRaisesRegex(
413 ValueError, "is not a valid priority",
414 p.ionice, psutil.IOPRIO_HIGH + 1)
415 finally:
416 p.ionice(init)
417
418 @unittest.skipIf(not HAS_RLIMIT, "not supported")
419 def test_rlimit_get(self):
420 import resource
421 p = psutil.Process(os.getpid())
422 names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
423 assert names, names
424 for name in names:
425 value = getattr(psutil, name)
426 self.assertGreaterEqual(value, 0)
427 if name in dir(resource):
428 self.assertEqual(value, getattr(resource, name))
429 # XXX - On PyPy RLIMIT_INFINITY returned by
430 # resource.getrlimit() is reported as a very big long
431 # number instead of -1. It looks like a bug with PyPy.
432 if PYPY:
433 continue
434 self.assertEqual(p.rlimit(value), resource.getrlimit(value))
435 else:
436 ret = p.rlimit(value)
437 self.assertEqual(len(ret), 2)
438 self.assertGreaterEqual(ret[0], -1)
439 self.assertGreaterEqual(ret[1], -1)
440
441 @unittest.skipIf(not HAS_RLIMIT, "not supported")
442 def test_rlimit_set(self):
443 p = self.spawn_psproc()
444 p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
445 self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
446 # If pid is 0 prlimit() applies to the calling process and
447 # we don't want that.
448 if LINUX:
449 with self.assertRaisesRegex(ValueError, "can't use prlimit"):
450 psutil._psplatform.Process(0).rlimit(0)
451 with self.assertRaises(ValueError):
452 p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
453
454 @unittest.skipIf(not HAS_RLIMIT, "not supported")
455 def test_rlimit(self):
456 p = psutil.Process()
457 testfn = self.get_testfn()
458 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
459 try:
460 p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
461 with open(testfn, "wb") as f:
462 f.write(b"X" * 1024)
463 # write() or flush() doesn't always cause the exception
464 # but close() will.
465 with self.assertRaises(IOError) as exc:
466 with open(testfn, "wb") as f:
467 f.write(b"X" * 1025)
468 self.assertEqual(exc.exception.errno if PY3 else exc.exception[0],
469 errno.EFBIG)
470 finally:
471 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
472 self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
473
474 @unittest.skipIf(not HAS_RLIMIT, "not supported")
475 def test_rlimit_infinity(self):
476 # First set a limit, then re-set it by specifying INFINITY
477 # and assume we overridden the previous limit.
478 p = psutil.Process()
479 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
480 try:
481 p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
482 p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
483 with open(self.get_testfn(), "wb") as f:
484 f.write(b"X" * 2048)
485 finally:
486 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
487 self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
488
489 @unittest.skipIf(not HAS_RLIMIT, "not supported")
490 def test_rlimit_infinity_value(self):
491 # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
492 # big number on a platform with large file support. On these
493 # platforms we need to test that the get/setrlimit functions
494 # properly convert the number to a C long long and that the
495 # conversion doesn't raise an error.
496 p = psutil.Process()
497 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
498 self.assertEqual(psutil.RLIM_INFINITY, hard)
499 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
500
501 def test_num_threads(self):
502 # on certain platforms such as Linux we might test for exact
503 # thread number, since we always have with 1 thread per process,
504 # but this does not apply across all platforms (MACOS, Windows)
505 p = psutil.Process()
506 if OPENBSD:
507 try:
508 step1 = p.num_threads()
509 except psutil.AccessDenied:
510 raise unittest.SkipTest("on OpenBSD this requires root access")
511 else:
512 step1 = p.num_threads()
513
514 with ThreadTask():
515 step2 = p.num_threads()
516 self.assertEqual(step2, step1 + 1)
517
518 @unittest.skipIf(not WINDOWS, 'WINDOWS only')
519 def test_num_handles(self):
520 # a better test is done later into test/_windows.py
521 p = psutil.Process()
522 self.assertGreater(p.num_handles(), 0)
523
524 @unittest.skipIf(not HAS_THREADS, 'not supported')
525 def test_threads(self):
526 p = psutil.Process()
527 if OPENBSD:
528 try:
529 step1 = p.threads()
530 except psutil.AccessDenied:
531 raise unittest.SkipTest("on OpenBSD this requires root access")
532 else:
533 step1 = p.threads()
534
535 with ThreadTask():
536 step2 = p.threads()
537 self.assertEqual(len(step2), len(step1) + 1)
538 athread = step2[0]
539 # test named tuple
540 self.assertEqual(athread.id, athread[0])
541 self.assertEqual(athread.user_time, athread[1])
542 self.assertEqual(athread.system_time, athread[2])
543
544 @retry_on_failure()
545 @skip_on_access_denied(only_if=MACOS)
546 @unittest.skipIf(not HAS_THREADS, 'not supported')
547 def test_threads_2(self):
548 p = self.spawn_psproc()
549 if OPENBSD:
550 try:
551 p.threads()
552 except psutil.AccessDenied:
553 raise unittest.SkipTest(
554 "on OpenBSD this requires root access")
555 self.assertAlmostEqual(
556 p.cpu_times().user,
557 sum([x.user_time for x in p.threads()]), delta=0.1)
558 self.assertAlmostEqual(
559 p.cpu_times().system,
560 sum([x.system_time for x in p.threads()]), delta=0.1)
561
562 @retry_on_failure()
563 def test_memory_info(self):
564 p = psutil.Process()
565
566 # step 1 - get a base value to compare our results
567 rss1, vms1 = p.memory_info()[:2]
568 percent1 = p.memory_percent()
569 self.assertGreater(rss1, 0)
570 self.assertGreater(vms1, 0)
571
572 # step 2 - allocate some memory
573 memarr = [None] * 1500000
574
575 rss2, vms2 = p.memory_info()[:2]
576 percent2 = p.memory_percent()
577
578 # step 3 - make sure that the memory usage bumped up
579 self.assertGreater(rss2, rss1)
580 self.assertGreaterEqual(vms2, vms1) # vms might be equal
581 self.assertGreater(percent2, percent1)
582 del memarr
583
584 if WINDOWS:
585 mem = p.memory_info()
586 self.assertEqual(mem.rss, mem.wset)
587 self.assertEqual(mem.vms, mem.pagefile)
588
589 mem = p.memory_info()
590 for name in mem._fields:
591 self.assertGreaterEqual(getattr(mem, name), 0)
592
593 def test_memory_full_info(self):
594 p = psutil.Process()
595 total = psutil.virtual_memory().total
596 mem = p.memory_full_info()
597 for name in mem._fields:
598 value = getattr(mem, name)
599 self.assertGreaterEqual(value, 0, msg=(name, value))
600 if name == 'vms' and OSX or LINUX:
601 continue
602 self.assertLessEqual(value, total, msg=(name, value, total))
603 if LINUX or WINDOWS or MACOS:
604 self.assertGreaterEqual(mem.uss, 0)
605 if LINUX:
606 self.assertGreaterEqual(mem.pss, 0)
607 self.assertGreaterEqual(mem.swap, 0)
608
609 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
610 def test_memory_maps(self):
611 p = psutil.Process()
612 maps = p.memory_maps()
613 paths = [x for x in maps]
614 self.assertEqual(len(paths), len(set(paths)))
615 ext_maps = p.memory_maps(grouped=False)
616
617 for nt in maps:
618 if not nt.path.startswith('['):
619 assert os.path.isabs(nt.path), nt.path
620 if POSIX:
621 try:
622 assert os.path.exists(nt.path) or \
623 os.path.islink(nt.path), nt.path
624 except AssertionError:
625 if not LINUX:
626 raise
627 else:
628 # https://github.com/giampaolo/psutil/issues/759
629 with open_text('/proc/self/smaps') as f:
630 data = f.read()
631 if "%s (deleted)" % nt.path not in data:
632 raise
633 else:
634 # XXX - On Windows we have this strange behavior with
635 # 64 bit dlls: they are visible via explorer but cannot
636 # be accessed via os.stat() (wtf?).
637 if '64' not in os.path.basename(nt.path):
638 try:
639 st = os.stat(nt.path)
640 except FileNotFoundError:
641 pass
642 else:
643 assert stat.S_ISREG(st.st_mode), nt.path
644 for nt in ext_maps:
645 for fname in nt._fields:
646 value = getattr(nt, fname)
647 if fname == 'path':
648 continue
649 elif fname in ('addr', 'perms'):
650 assert value, value
651 else:
652 self.assertIsInstance(value, (int, long))
653 assert value >= 0, value
654
655 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
656 def test_memory_maps_lists_lib(self):
657 # Make sure a newly loaded shared lib is listed.
658 p = psutil.Process()
659 with copyload_shared_lib() as path:
660 def normpath(p):
661 return os.path.realpath(os.path.normcase(p))
662 libpaths = [normpath(x.path)
663 for x in p.memory_maps()]
664 self.assertIn(normpath(path), libpaths)
665
666 def test_memory_percent(self):
667 p = psutil.Process()
668 p.memory_percent()
669 self.assertRaises(ValueError, p.memory_percent, memtype="?!?")
670 if LINUX or MACOS or WINDOWS:
671 p.memory_percent(memtype='uss')
672
673 def test_is_running(self):
674 p = self.spawn_psproc()
675 assert p.is_running()
676 assert p.is_running()
677 p.kill()
678 p.wait()
679 assert not p.is_running()
680 assert not p.is_running()
681
682 def test_exe(self):
683 p = self.spawn_psproc()
684 exe = p.exe()
685 try:
686 self.assertEqual(exe, PYTHON_EXE)
687 except AssertionError:
688 if WINDOWS and len(exe) == len(PYTHON_EXE):
689 # on Windows we don't care about case sensitivity
690 normcase = os.path.normcase
691 self.assertEqual(normcase(exe), normcase(PYTHON_EXE))
692 else:
693 # certain platforms such as BSD are more accurate returning:
694 # "/usr/local/bin/python2.7"
695 # ...instead of:
696 # "/usr/local/bin/python"
697 # We do not want to consider this difference in accuracy
698 # an error.
699 ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
700 try:
701 self.assertEqual(exe.replace(ver, ''),
702 PYTHON_EXE.replace(ver, ''))
703 except AssertionError:
704 # Tipically MACOS. Really not sure what to do here.
705 pass
706
707 out = sh([exe, "-c", "import os; print('hey')"])
708 self.assertEqual(out, 'hey')
709
710 def test_cmdline(self):
711 cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
712 p = self.spawn_psproc(cmdline)
713 # XXX - most of the times the underlying sysctl() call on Net
714 # and Open BSD returns a truncated string.
715 # Also /proc/pid/cmdline behaves the same so it looks
716 # like this is a kernel bug.
717 # XXX - AIX truncates long arguments in /proc/pid/cmdline
718 if NETBSD or OPENBSD or AIX:
719 self.assertEqual(p.cmdline()[0], PYTHON_EXE)
720 else:
721 self.assertEqual(' '.join(p.cmdline()), ' '.join(cmdline))
722
723 @unittest.skipIf(PYPY, "broken on PYPY")
724 def test_long_cmdline(self):
725 testfn = self.get_testfn()
726 create_exe(testfn)
727 cmdline = [testfn] + (["0123456789"] * 20)
728 p = self.spawn_psproc(cmdline)
729 self.assertEqual(p.cmdline(), cmdline)
730
731 def test_name(self):
732 p = self.spawn_psproc(PYTHON_EXE)
733 name = p.name().lower()
734 pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
735 assert pyexe.startswith(name), (pyexe, name)
736
737 @unittest.skipIf(PYPY, "unreliable on PYPY")
738 def test_long_name(self):
739 testfn = self.get_testfn(suffix="0123456789" * 2)
740 create_exe(testfn)
741 p = self.spawn_psproc(testfn)
742 self.assertEqual(p.name(), os.path.basename(testfn))
743
744 # XXX
745 @unittest.skipIf(SUNOS, "broken on SUNOS")
746 @unittest.skipIf(AIX, "broken on AIX")
747 @unittest.skipIf(PYPY, "broken on PYPY")
748 def test_prog_w_funky_name(self):
749 # Test that name(), exe() and cmdline() correctly handle programs
750 # with funky chars such as spaces and ")", see:
751 # https://github.com/giampaolo/psutil/issues/628
752 funky_path = self.get_testfn(suffix='foo bar )')
753 create_exe(funky_path)
754 cmdline = [funky_path, "-c",
755 "import time; [time.sleep(0.01) for x in range(3000)];"
756 "arg1", "arg2", "", "arg3", ""]
757 p = self.spawn_psproc(cmdline)
758 self.assertEqual(p.cmdline(), cmdline)
759 self.assertEqual(p.name(), os.path.basename(funky_path))
760 self.assertEqual(os.path.normcase(p.exe()),
761 os.path.normcase(funky_path))
762
763 @unittest.skipIf(not POSIX, 'POSIX only')
764 def test_uids(self):
765 p = psutil.Process()
766 real, effective, saved = p.uids()
767 # os.getuid() refers to "real" uid
768 self.assertEqual(real, os.getuid())
769 # os.geteuid() refers to "effective" uid
770 self.assertEqual(effective, os.geteuid())
771 # No such thing as os.getsuid() ("saved" uid), but starting
772 # from python 2.7 we have os.getresuid() which returns all
773 # of them.
774 if hasattr(os, "getresuid"):
775 self.assertEqual(os.getresuid(), p.uids())
776
777 @unittest.skipIf(not POSIX, 'POSIX only')
778 def test_gids(self):
779 p = psutil.Process()
780 real, effective, saved = p.gids()
781 # os.getuid() refers to "real" uid
782 self.assertEqual(real, os.getgid())
783 # os.geteuid() refers to "effective" uid
784 self.assertEqual(effective, os.getegid())
785 # No such thing as os.getsgid() ("saved" gid), but starting
786 # from python 2.7 we have os.getresgid() which returns all
787 # of them.
788 if hasattr(os, "getresuid"):
789 self.assertEqual(os.getresgid(), p.gids())
790
791 def test_nice(self):
792 p = psutil.Process()
793 self.assertRaises(TypeError, p.nice, "str")
794 init = p.nice()
795 try:
796 if WINDOWS:
797 for prio in [psutil.NORMAL_PRIORITY_CLASS,
798 psutil.IDLE_PRIORITY_CLASS,
799 psutil.BELOW_NORMAL_PRIORITY_CLASS,
800 psutil.REALTIME_PRIORITY_CLASS,
801 psutil.HIGH_PRIORITY_CLASS,
802 psutil.ABOVE_NORMAL_PRIORITY_CLASS]:
803 with self.subTest(prio=prio):
804 try:
805 p.nice(prio)
806 except psutil.AccessDenied:
807 pass
808 else:
809 self.assertEqual(p.nice(), prio)
810 else:
811 try:
812 if hasattr(os, "getpriority"):
813 self.assertEqual(
814 os.getpriority(os.PRIO_PROCESS, os.getpid()),
815 p.nice())
816 p.nice(1)
817 self.assertEqual(p.nice(), 1)
818 if hasattr(os, "getpriority"):
819 self.assertEqual(
820 os.getpriority(os.PRIO_PROCESS, os.getpid()),
821 p.nice())
822 # XXX - going back to previous nice value raises
823 # AccessDenied on MACOS
824 if not MACOS:
825 p.nice(0)
826 self.assertEqual(p.nice(), 0)
827 except psutil.AccessDenied:
828 pass
829 finally:
830 try:
831 p.nice(init)
832 except psutil.AccessDenied:
833 pass
834
835 def test_status(self):
836 p = psutil.Process()
837 self.assertEqual(p.status(), psutil.STATUS_RUNNING)
838
839 def test_username(self):
840 p = self.spawn_psproc()
841 username = p.username()
842 if WINDOWS:
843 domain, username = username.split('\\')
844 self.assertEqual(username, getpass.getuser())
845 if 'USERDOMAIN' in os.environ:
846 self.assertEqual(domain, os.environ['USERDOMAIN'])
847 else:
848 self.assertEqual(username, getpass.getuser())
849
850 def test_cwd(self):
851 p = self.spawn_psproc()
852 self.assertEqual(p.cwd(), os.getcwd())
853
854 def test_cwd_2(self):
855 cmd = [PYTHON_EXE, "-c",
856 "import os, time; os.chdir('..'); time.sleep(60)"]
857 p = self.spawn_psproc(cmd)
858 call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
859
860 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
861 def test_cpu_affinity(self):
862 p = psutil.Process()
863 initial = p.cpu_affinity()
864 assert initial, initial
865 self.addCleanup(p.cpu_affinity, initial)
866
867 if hasattr(os, "sched_getaffinity"):
868 self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
869 self.assertEqual(len(initial), len(set(initial)))
870
871 all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
872 for n in all_cpus:
873 p.cpu_affinity([n])
874 self.assertEqual(p.cpu_affinity(), [n])
875 if hasattr(os, "sched_getaffinity"):
876 self.assertEqual(p.cpu_affinity(),
877 list(os.sched_getaffinity(p.pid)))
878 # also test num_cpu()
879 if hasattr(p, "num_cpu"):
880 self.assertEqual(p.cpu_affinity()[0], p.num_cpu())
881
882 # [] is an alias for "all eligible CPUs"; on Linux this may
883 # not be equal to all available CPUs, see:
884 # https://github.com/giampaolo/psutil/issues/956
885 p.cpu_affinity([])
886 if LINUX:
887 self.assertEqual(p.cpu_affinity(), p._proc._get_eligible_cpus())
888 else:
889 self.assertEqual(p.cpu_affinity(), all_cpus)
890 if hasattr(os, "sched_getaffinity"):
891 self.assertEqual(p.cpu_affinity(),
892 list(os.sched_getaffinity(p.pid)))
893 #
894 self.assertRaises(TypeError, p.cpu_affinity, 1)
895 p.cpu_affinity(initial)
896 # it should work with all iterables, not only lists
897 p.cpu_affinity(set(all_cpus))
898 p.cpu_affinity(tuple(all_cpus))
899
900 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
901 def test_cpu_affinity_errs(self):
902 p = self.spawn_psproc()
903 invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
904 self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
905 self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
906 self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
907 self.assertRaises(ValueError, p.cpu_affinity, [0, -1])
908
909 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
910 def test_cpu_affinity_all_combinations(self):
911 p = psutil.Process()
912 initial = p.cpu_affinity()
913 assert initial, initial
914 self.addCleanup(p.cpu_affinity, initial)
915
916 # All possible CPU set combinations.
917 if len(initial) > 12:
918 initial = initial[:12] # ...otherwise it will take forever
919 combos = []
920 for i in range(0, len(initial) + 1):
921 for subset in itertools.combinations(initial, i):
922 if subset:
923 combos.append(list(subset))
924
925 for combo in combos:
926 p.cpu_affinity(combo)
927 self.assertEqual(sorted(p.cpu_affinity()), sorted(combo))
928
929 # TODO: #595
930 @unittest.skipIf(BSD, "broken on BSD")
931 # can't find any process file on Appveyor
932 @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
933 def test_open_files(self):
934 p = psutil.Process()
935 testfn = self.get_testfn()
936 files = p.open_files()
937 self.assertNotIn(testfn, files)
938 with open(testfn, 'wb') as f:
939 f.write(b'x' * 1024)
940 f.flush()
941 # give the kernel some time to see the new file
942 files = call_until(p.open_files, "len(ret) != %i" % len(files))
943 filenames = [os.path.normcase(x.path) for x in files]
944 self.assertIn(os.path.normcase(testfn), filenames)
945 if LINUX:
946 for file in files:
947 if file.path == testfn:
948 self.assertEqual(file.position, 1024)
949 for file in files:
950 assert os.path.isfile(file.path), file
951
952 # another process
953 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
954 p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])
955
956 for x in range(100):
957 filenames = [os.path.normcase(x.path) for x in p.open_files()]
958 if testfn in filenames:
959 break
960 time.sleep(.01)
961 else:
962 self.assertIn(os.path.normcase(testfn), filenames)
963 for file in filenames:
964 assert os.path.isfile(file), file
965
966 # TODO: #595
967 @unittest.skipIf(BSD, "broken on BSD")
968 # can't find any process file on Appveyor
969 @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
970 def test_open_files_2(self):
971 # test fd and path fields
972 p = psutil.Process()
973 normcase = os.path.normcase
974 testfn = self.get_testfn()
975 with open(testfn, 'w') as fileobj:
976 for file in p.open_files():
977 if normcase(file.path) == normcase(fileobj.name) or \
978 file.fd == fileobj.fileno():
979 break
980 else:
981 self.fail("no file found; files=%s" % repr(p.open_files()))
982 self.assertEqual(normcase(file.path), normcase(fileobj.name))
983 if WINDOWS:
984 self.assertEqual(file.fd, -1)
985 else:
986 self.assertEqual(file.fd, fileobj.fileno())
987 # test positions
988 ntuple = p.open_files()[0]
989 self.assertEqual(ntuple[0], ntuple.path)
990 self.assertEqual(ntuple[1], ntuple.fd)
991 # test file is gone
992 self.assertNotIn(fileobj.name, p.open_files())
993
994 @unittest.skipIf(not POSIX, 'POSIX only')
995 def test_num_fds(self):
996 p = psutil.Process()
997 testfn = self.get_testfn()
998 start = p.num_fds()
999 file = open(testfn, 'w')
1000 self.addCleanup(file.close)
1001 self.assertEqual(p.num_fds(), start + 1)
1002 sock = socket.socket()
1003 self.addCleanup(sock.close)
1004 self.assertEqual(p.num_fds(), start + 2)
1005 file.close()
1006 sock.close()
1007 self.assertEqual(p.num_fds(), start)
1008
1009 @skip_on_not_implemented(only_if=LINUX)
1010 @unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD")
1011 def test_num_ctx_switches(self):
1012 p = psutil.Process()
1013 before = sum(p.num_ctx_switches())
1014 for x in range(500000):
1015 after = sum(p.num_ctx_switches())
1016 if after > before:
1017 return
1018 self.fail("num ctx switches still the same after 50.000 iterations")
1019
1020 def test_ppid(self):
1021 p = psutil.Process()
1022 if hasattr(os, 'getppid'):
1023 self.assertEqual(p.ppid(), os.getppid())
1024 p = self.spawn_psproc()
1025 self.assertEqual(p.ppid(), os.getpid())
1026 if APPVEYOR:
1027 # Occasional failures, see:
1028 # https://ci.appveyor.com/project/giampaolo/psutil/build/
1029 # job/0hs623nenj7w4m33
1030 return
1031
1032 def test_parent(self):
1033 p = self.spawn_psproc()
1034 self.assertEqual(p.parent().pid, os.getpid())
1035
1036 lowest_pid = psutil.pids()[0]
1037 self.assertIsNone(psutil.Process(lowest_pid).parent())
1038
1039 def test_parent_multi(self):
1040 parent = psutil.Process()
1041 child, grandchild = self.spawn_children_pair()
1042 self.assertEqual(grandchild.parent(), child)
1043 self.assertEqual(child.parent(), parent)
1044
1045 def test_parent_disappeared(self):
1046 # Emulate a case where the parent process disappeared.
1047 p = self.spawn_psproc()
1048 with mock.patch("psutil.Process",
1049 side_effect=psutil.NoSuchProcess(0, 'foo')):
1050 self.assertIsNone(p.parent())
1051
1052 @retry_on_failure()
1053 def test_parents(self):
1054 parent = psutil.Process()
1055 assert parent.parents()
1056 child, grandchild = self.spawn_children_pair()
1057 self.assertEqual(child.parents()[0], parent)
1058 self.assertEqual(grandchild.parents()[0], child)
1059 self.assertEqual(grandchild.parents()[1], parent)
1060
1061 def test_children(self):
1062 parent = psutil.Process()
1063 self.assertEqual(parent.children(), [])
1064 self.assertEqual(parent.children(recursive=True), [])
1065 # On Windows we set the flag to 0 in order to cancel out the
1066 # CREATE_NO_WINDOW flag (enabled by default) which creates
1067 # an extra "conhost.exe" child.
1068 child = self.spawn_psproc(creationflags=0)
1069 children1 = parent.children()
1070 children2 = parent.children(recursive=True)
1071 for children in (children1, children2):
1072 self.assertEqual(len(children), 1)
1073 self.assertEqual(children[0].pid, child.pid)
1074 self.assertEqual(children[0].ppid(), parent.pid)
1075
1076 def test_children_recursive(self):
1077 # Test children() against two sub processes, p1 and p2, where
1078 # p1 (our child) spawned p2 (our grandchild).
1079 parent = psutil.Process()
1080 child, grandchild = self.spawn_children_pair()
1081 self.assertEqual(parent.children(), [child])
1082 self.assertEqual(parent.children(recursive=True), [child, grandchild])
1083 # If the intermediate process is gone there's no way for
1084 # children() to recursively find it.
1085 child.terminate()
1086 child.wait()
1087 self.assertEqual(parent.children(recursive=True), [])
1088
1089 def test_children_duplicates(self):
1090 # find the process which has the highest number of children
1091 table = collections.defaultdict(int)
1092 for p in psutil.process_iter():
1093 try:
1094 table[p.ppid()] += 1
1095 except psutil.Error:
1096 pass
1097 # this is the one, now let's make sure there are no duplicates
1098 pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
1099 if LINUX and pid == 0:
1100 raise self.skipTest("PID 0")
1101 p = psutil.Process(pid)
1102 try:
1103 c = p.children(recursive=True)
1104 except psutil.AccessDenied: # windows
1105 pass
1106 else:
1107 self.assertEqual(len(c), len(set(c)))
1108
1109 def test_parents_and_children(self):
1110 parent = psutil.Process()
1111 child, grandchild = self.spawn_children_pair()
1112 # forward
1113 children = parent.children(recursive=True)
1114 self.assertEqual(len(children), 2)
1115 self.assertEqual(children[0], child)
1116 self.assertEqual(children[1], grandchild)
1117 # backward
1118 parents = grandchild.parents()
1119 self.assertEqual(parents[0], child)
1120 self.assertEqual(parents[1], parent)
1121
1122 def test_suspend_resume(self):
1123 p = self.spawn_psproc()
1124 p.suspend()
1125 for x in range(100):
1126 if p.status() == psutil.STATUS_STOPPED:
1127 break
1128 time.sleep(0.01)
1129 p.resume()
1130 self.assertNotEqual(p.status(), psutil.STATUS_STOPPED)
1131
1132 def test_invalid_pid(self):
1133 self.assertRaises(TypeError, psutil.Process, "1")
1134 self.assertRaises(ValueError, psutil.Process, -1)
1135
1136 def test_as_dict(self):
1137 p = psutil.Process()
1138 d = p.as_dict(attrs=['exe', 'name'])
1139 self.assertEqual(sorted(d.keys()), ['exe', 'name'])
1140
1141 p = psutil.Process(min(psutil.pids()))
1142 d = p.as_dict(attrs=['connections'], ad_value='foo')
1143 if not isinstance(d['connections'], list):
1144 self.assertEqual(d['connections'], 'foo')
1145
1146 # Test ad_value is set on AccessDenied.
1147 with mock.patch('psutil.Process.nice', create=True,
1148 side_effect=psutil.AccessDenied):
1149 self.assertEqual(
1150 p.as_dict(attrs=["nice"], ad_value=1), {"nice": 1})
1151
1152 # Test that NoSuchProcess bubbles up.
1153 with mock.patch('psutil.Process.nice', create=True,
1154 side_effect=psutil.NoSuchProcess(p.pid, "name")):
1155 self.assertRaises(
1156 psutil.NoSuchProcess, p.as_dict, attrs=["nice"])
1157
1158 # Test that ZombieProcess is swallowed.
1159 with mock.patch('psutil.Process.nice', create=True,
1160 side_effect=psutil.ZombieProcess(p.pid, "name")):
1161 self.assertEqual(
1162 p.as_dict(attrs=["nice"], ad_value="foo"), {"nice": "foo"})
1163
1164 # By default APIs raising NotImplementedError are
1165 # supposed to be skipped.
1166 with mock.patch('psutil.Process.nice', create=True,
1167 side_effect=NotImplementedError):
1168 d = p.as_dict()
1169 self.assertNotIn('nice', list(d.keys()))
1170 # ...unless the user explicitly asked for some attr.
1171 with self.assertRaises(NotImplementedError):
1172 p.as_dict(attrs=["nice"])
1173
1174 # errors
1175 with self.assertRaises(TypeError):
1176 p.as_dict('name')
1177 with self.assertRaises(ValueError):
1178 p.as_dict(['foo'])
1179 with self.assertRaises(ValueError):
1180 p.as_dict(['foo', 'bar'])
1181
1182 def test_oneshot(self):
1183 p = psutil.Process()
1184 with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1185 with p.oneshot():
1186 p.cpu_times()
1187 p.cpu_times()
1188 self.assertEqual(m.call_count, 1)
1189
1190 with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1191 p.cpu_times()
1192 p.cpu_times()
1193 self.assertEqual(m.call_count, 2)
1194
1195 def test_oneshot_twice(self):
1196 # Test the case where the ctx manager is __enter__ed twice.
1197 # The second __enter__ is supposed to resut in a NOOP.
1198 p = psutil.Process()
1199 with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
1200 with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
1201 with p.oneshot():
1202 p.cpu_times()
1203 p.cpu_times()
1204 with p.oneshot():
1205 p.cpu_times()
1206 p.cpu_times()
1207 self.assertEqual(m1.call_count, 1)
1208 self.assertEqual(m2.call_count, 1)
1209
1210 with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1211 p.cpu_times()
1212 p.cpu_times()
1213 self.assertEqual(m.call_count, 2)
1214
1215 def test_oneshot_cache(self):
1216 # Make sure oneshot() cache is nonglobal. Instead it's
1217 # supposed to be bound to the Process instance, see:
1218 # https://github.com/giampaolo/psutil/issues/1373
1219 p1, p2 = self.spawn_children_pair()
1220 p1_ppid = p1.ppid()
1221 p2_ppid = p2.ppid()
1222 self.assertNotEqual(p1_ppid, p2_ppid)
1223 with p1.oneshot():
1224 self.assertEqual(p1.ppid(), p1_ppid)
1225 self.assertEqual(p2.ppid(), p2_ppid)
1226 with p2.oneshot():
1227 self.assertEqual(p1.ppid(), p1_ppid)
1228 self.assertEqual(p2.ppid(), p2_ppid)
1229
1230 def test_halfway_terminated_process(self):
1231 # Test that NoSuchProcess exception gets raised in case the
1232 # process dies after we create the Process object.
1233 # Example:
1234 # >>> proc = Process(1234)
1235 # >>> time.sleep(2) # time-consuming task, process dies in meantime
1236 # >>> proc.name()
1237 # Refers to Issue #15
1238 def assert_raises_nsp(fun, fun_name):
1239 try:
1240 ret = fun()
1241 except psutil.ZombieProcess: # differentiate from NSP
1242 raise
1243 except psutil.NoSuchProcess:
1244 pass
1245 except psutil.AccessDenied:
1246 if OPENBSD and fun_name in ('threads', 'num_threads'):
1247 return
1248 raise
1249 else:
1250 # NtQuerySystemInformation succeeds even if process is gone.
1251 if WINDOWS and fun_name in ('exe', 'name'):
1252 return
1253 raise self.fail("%r didn't raise NSP and returned %r "
1254 "instead" % (fun, ret))
1255
1256 p = self.spawn_psproc()
1257 p.terminate()
1258 p.wait()
1259 if WINDOWS: # XXX
1260 call_until(psutil.pids, "%s not in ret" % p.pid)
1261 self.assertProcessGone(p)
1262
1263 ns = process_namespace(p)
1264 for fun, name in ns.iter(ns.all):
1265 assert_raises_nsp(fun, name)
1266
1267 # NtQuerySystemInformation succeeds even if process is gone.
1268 if WINDOWS and not GITHUB_ACTIONS:
1269 normcase = os.path.normcase
1270 self.assertEqual(normcase(p.exe()), normcase(PYTHON_EXE))
1271
1272 @unittest.skipIf(not POSIX, 'POSIX only')
1273 def test_zombie_process(self):
1274 def succeed_or_zombie_p_exc(fun):
1275 try:
1276 return fun()
1277 except (psutil.ZombieProcess, psutil.AccessDenied):
1278 pass
1279
1280 parent, zombie = self.spawn_zombie()
1281 # A zombie process should always be instantiable
1282 zproc = psutil.Process(zombie.pid)
1283 # ...and at least its status always be querable
1284 self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
1285 # ...and it should be considered 'running'
1286 assert zproc.is_running()
1287 # ...and as_dict() shouldn't crash
1288 zproc.as_dict()
1289 # ...its parent should 'see' it (edit: not true on BSD and MACOS
1290 # descendants = [x.pid for x in psutil.Process().children(
1291 # recursive=True)]
1292 # self.assertIn(zpid, descendants)
1293 # XXX should we also assume ppid be usable? Note: this
1294 # would be an important use case as the only way to get
1295 # rid of a zombie is to kill its parent.
1296 # self.assertEqual(zpid.ppid(), os.getpid())
1297 # ...and all other APIs should be able to deal with it
1298
1299 ns = process_namespace(zproc)
1300 for fun, name in ns.iter(ns.all):
1301 succeed_or_zombie_p_exc(fun)
1302
1303 assert psutil.pid_exists(zproc.pid)
1304 self.assertIn(zproc.pid, psutil.pids())
1305 self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
1306 psutil._pmap = {}
1307 self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
1308
1309 @unittest.skipIf(not POSIX, 'POSIX only')
1310 def test_zombie_process_is_running_w_exc(self):
1311 # Emulate a case where internally is_running() raises
1312 # ZombieProcess.
1313 p = psutil.Process()
1314 with mock.patch("psutil.Process",
1315 side_effect=psutil.ZombieProcess(0)) as m:
1316 assert p.is_running()
1317 assert m.called
1318
1319 @unittest.skipIf(not POSIX, 'POSIX only')
1320 def test_zombie_process_status_w_exc(self):
1321 # Emulate a case where internally status() raises
1322 # ZombieProcess.
1323 p = psutil.Process()
1324 with mock.patch("psutil._psplatform.Process.status",
1325 side_effect=psutil.ZombieProcess(0)) as m:
1326 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
1327 assert m.called
1328
1329 def test_pid_0(self):
1330 # Process(0) is supposed to work on all platforms except Linux
1331 if 0 not in psutil.pids():
1332 self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
1333 # These 2 are a contradiction, but "ps" says PID 1's parent
1334 # is PID 0.
1335 assert not psutil.pid_exists(0)
1336 self.assertEqual(psutil.Process(1).ppid(), 0)
1337 return
1338
1339 p = psutil.Process(0)
1340 exc = psutil.AccessDenied if WINDOWS else ValueError
1341 self.assertRaises(exc, p.wait)
1342 self.assertRaises(exc, p.terminate)
1343 self.assertRaises(exc, p.suspend)
1344 self.assertRaises(exc, p.resume)
1345 self.assertRaises(exc, p.kill)
1346 self.assertRaises(exc, p.send_signal, signal.SIGTERM)
1347
1348 # test all methods
1349 ns = process_namespace(p)
1350 for fun, name in ns.iter(ns.getters + ns.setters):
1351 try:
1352 ret = fun()
1353 except psutil.AccessDenied:
1354 pass
1355 else:
1356 if name in ("uids", "gids"):
1357 self.assertEqual(ret.real, 0)
1358 elif name == "username":
1359 user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
1360 self.assertEqual(p.username(), user)
1361 elif name == "name":
1362 assert name, name
1363
1364 if not OPENBSD:
1365 self.assertIn(0, psutil.pids())
1366 assert psutil.pid_exists(0)
1367
1368 @unittest.skipIf(not HAS_ENVIRON, "not supported")
1369 def test_environ(self):
1370 def clean_dict(d):
1371 # Most of these are problematic on Travis.
1372 d.pop("PSUTIL_TESTING", None)
1373 d.pop("PLAT", None)
1374 d.pop("HOME", None)
1375 if MACOS:
1376 d.pop("__CF_USER_TEXT_ENCODING", None)
1377 d.pop("VERSIONER_PYTHON_PREFER_32_BIT", None)
1378 d.pop("VERSIONER_PYTHON_VERSION", None)
1379 return dict(
1380 [(k.replace("\r", "").replace("\n", ""),
1381 v.replace("\r", "").replace("\n", ""))
1382 for k, v in d.items()])
1383
1384 self.maxDiff = None
1385 p = psutil.Process()
1386 d1 = clean_dict(p.environ())
1387 d2 = clean_dict(os.environ.copy())
1388 if not OSX and GITHUB_ACTIONS:
1389 self.assertEqual(d1, d2)
1390
1391 @unittest.skipIf(not HAS_ENVIRON, "not supported")
1392 @unittest.skipIf(not POSIX, "POSIX only")
1393 def test_weird_environ(self):
1394 # environment variables can contain values without an equals sign
1395 code = textwrap.dedent("""
1396 #include <unistd.h>
1397 #include <fcntl.h>
1398 char * const argv[] = {"cat", 0};
1399 char * const envp[] = {"A=1", "X", "C=3", 0};
1400 int main(void) {
1401 /* Close stderr on exec so parent can wait for the execve to
1402 * finish. */
1403 if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
1404 return 0;
1405 return execve("/bin/cat", argv, envp);
1406 }
1407 """)
1408 path = self.get_testfn()
1409 create_exe(path, c_code=code)
1410 sproc = self.spawn_testproc(
1411 [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
1412 p = psutil.Process(sproc.pid)
1413 wait_for_pid(p.pid)
1414 assert p.is_running()
1415 # Wait for process to exec or exit.
1416 self.assertEqual(sproc.stderr.read(), b"")
1417 if MACOS and CI_TESTING:
1418 try:
1419 env = p.environ()
1420 except psutil.AccessDenied:
1421 # XXX: fails sometimes with:
1422 # PermissionError from 'sysctl(KERN_PROCARGS2) -> EIO'
1423 return
1424 else:
1425 env = p.environ()
1426 self.assertEqual(env, {"A": "1", "C": "3"})
1427 sproc.communicate()
1428 self.assertEqual(sproc.returncode, 0)
1429
1430
1431 # ===================================================================
1432 # --- Limited user tests
1433 # ===================================================================
1434
1435
1436 if POSIX and os.getuid() == 0:
1437
1438 class LimitedUserTestCase(TestProcess):
1439 """Repeat the previous tests by using a limited user.
1440 Executed only on UNIX and only if the user who run the test script
1441 is root.
1442 """
1443 # the uid/gid the test suite runs under
1444 if hasattr(os, 'getuid'):
1445 PROCESS_UID = os.getuid()
1446 PROCESS_GID = os.getgid()
1447
1448 def __init__(self, *args, **kwargs):
1449 super().__init__(*args, **kwargs)
1450 # re-define all existent test methods in order to
1451 # ignore AccessDenied exceptions
1452 for attr in [x for x in dir(self) if x.startswith('test')]:
1453 meth = getattr(self, attr)
1454
1455 def test_(self):
1456 try:
1457 meth()
1458 except psutil.AccessDenied:
1459 pass
1460 setattr(self, attr, types.MethodType(test_, self))
1461
1462 def setUp(self):
1463 super().setUp()
1464 os.setegid(1000)
1465 os.seteuid(1000)
1466
1467 def tearDown(self):
1468 os.setegid(self.PROCESS_UID)
1469 os.seteuid(self.PROCESS_GID)
1470 super().tearDown()
1471
1472 def test_nice(self):
1473 try:
1474 psutil.Process().nice(-1)
1475 except psutil.AccessDenied:
1476 pass
1477 else:
1478 self.fail("exception not raised")
1479
1480 @unittest.skipIf(1, "causes problem as root")
1481 def test_zombie_process(self):
1482 pass
1483
1484
1485 # ===================================================================
1486 # --- psutil.Popen tests
1487 # ===================================================================
1488
1489
1490 class TestPopen(PsutilTestCase):
1491 """Tests for psutil.Popen class."""
1492
1493 @classmethod
1494 def tearDownClass(cls):
1495 reap_children()
1496
1497 def test_misc(self):
1498 # XXX this test causes a ResourceWarning on Python 3 because
1499 # psutil.__subproc instance doesn't get propertly freed.
1500 # Not sure what to do though.
1501 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
1502 with psutil.Popen(cmd, stdout=subprocess.PIPE,
1503 stderr=subprocess.PIPE) as proc:
1504 proc.name()
1505 proc.cpu_times()
1506 proc.stdin
1507 self.assertTrue(dir(proc))
1508 self.assertRaises(AttributeError, getattr, proc, 'foo')
1509 proc.terminate()
1510 if POSIX:
1511 self.assertEqual(proc.wait(5), -signal.SIGTERM)
1512 else:
1513 self.assertEqual(proc.wait(5), signal.SIGTERM)
1514
1515 def test_ctx_manager(self):
1516 with psutil.Popen([PYTHON_EXE, "-V"],
1517 stdout=subprocess.PIPE,
1518 stderr=subprocess.PIPE,
1519 stdin=subprocess.PIPE) as proc:
1520 proc.communicate()
1521 assert proc.stdout.closed
1522 assert proc.stderr.closed
1523 assert proc.stdin.closed
1524 self.assertEqual(proc.returncode, 0)
1525
1526 def test_kill_terminate(self):
1527 # subprocess.Popen()'s terminate(), kill() and send_signal() do
1528 # not raise exception after the process is gone. psutil.Popen
1529 # diverges from that.
1530 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
1531 with psutil.Popen(cmd, stdout=subprocess.PIPE,
1532 stderr=subprocess.PIPE) as proc:
1533 proc.terminate()
1534 proc.wait()
1535 self.assertRaises(psutil.NoSuchProcess, proc.terminate)
1536 self.assertRaises(psutil.NoSuchProcess, proc.kill)
1537 self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
1538 signal.SIGTERM)
1539 if WINDOWS and sys.version_info >= (2, 7):
1540 self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
1541 signal.CTRL_C_EVENT)
1542 self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
1543 signal.CTRL_BREAK_EVENT)
1544
1545
1546 if __name__ == '__main__':
1547 from psutil.tests.runner import run_from_name
1548 run_from_name(__file__)