Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_process.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
comparison
equal
deleted
inserted
replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Tests for psutil.Process class.""" | |
| 8 | |
| 9 import collections | |
| 10 import errno | |
| 11 import getpass | |
| 12 import itertools | |
| 13 import os | |
| 14 import signal | |
| 15 import socket | |
| 16 import subprocess | |
| 17 import sys | |
| 18 import tempfile | |
| 19 import textwrap | |
| 20 import time | |
| 21 import types | |
| 22 | |
| 23 import psutil | |
| 24 | |
| 25 from psutil import AIX | |
| 26 from psutil import BSD | |
| 27 from psutil import LINUX | |
| 28 from psutil import MACOS | |
| 29 from psutil import NETBSD | |
| 30 from psutil import OPENBSD | |
| 31 from psutil import OSX | |
| 32 from psutil import POSIX | |
| 33 from psutil import SUNOS | |
| 34 from psutil import WINDOWS | |
| 35 from psutil._common import open_text | |
| 36 from psutil._compat import long | |
| 37 from psutil._compat import PY3 | |
| 38 from psutil.tests import APPVEYOR | |
| 39 from psutil.tests import call_until | |
| 40 from psutil.tests import CIRRUS | |
| 41 from psutil.tests import copyload_shared_lib | |
| 42 from psutil.tests import create_exe | |
| 43 from psutil.tests import create_proc_children_pair | |
| 44 from psutil.tests import create_zombie_proc | |
| 45 from psutil.tests import enum | |
| 46 from psutil.tests import get_test_subprocess | |
| 47 from psutil.tests import HAS_CPU_AFFINITY | |
| 48 from psutil.tests import HAS_ENVIRON | |
| 49 from psutil.tests import HAS_IONICE | |
| 50 from psutil.tests import HAS_MEMORY_MAPS | |
| 51 from psutil.tests import HAS_PROC_CPU_NUM | |
| 52 from psutil.tests import HAS_PROC_IO_COUNTERS | |
| 53 from psutil.tests import HAS_RLIMIT | |
| 54 from psutil.tests import HAS_THREADS | |
| 55 from psutil.tests import mock | |
| 56 from psutil.tests import PYPY | |
| 57 from psutil.tests import PYTHON_EXE | |
| 58 from psutil.tests import reap_children | |
| 59 from psutil.tests import retry_on_failure | |
| 60 from psutil.tests import safe_rmpath | |
| 61 from psutil.tests import sh | |
| 62 from psutil.tests import skip_on_access_denied | |
| 63 from psutil.tests import skip_on_not_implemented | |
| 64 from psutil.tests import TESTFILE_PREFIX | |
| 65 from psutil.tests import TESTFN | |
| 66 from psutil.tests import ThreadTask | |
| 67 from psutil.tests import TRAVIS | |
| 68 from psutil.tests import unittest | |
| 69 from psutil.tests import wait_for_pid | |
| 70 | |
| 71 | |
| 72 # =================================================================== | |
| 73 # --- psutil.Process class tests | |
| 74 # =================================================================== | |
| 75 | |
| 76 class TestProcess(unittest.TestCase): | |
| 77 """Tests for psutil.Process class.""" | |
| 78 | |
def setUp(self):
    """Remove the TESTFN path before each test runs."""
    safe_rmpath(TESTFN)
| 81 | |
def tearDown(self):
    """Call reap_children() after each test."""
    reap_children()
| 84 | |
def test_pid(self):
    """Check Process.pid for this process and a child; pid is read-only."""
    current = psutil.Process()
    self.assertEqual(current.pid, os.getpid())

    child = get_test_subprocess()
    child_proc = psutil.Process(child.pid)
    self.assertEqual(child_proc.pid, child.pid)

    # Assigning to the pid attribute must be rejected.
    with self.assertRaises(AttributeError):
        current.pid = 33
| 92 | |
def test_kill(self):
    """kill() removes the child; on POSIX wait() returns -SIGKILL."""
    child = get_test_subprocess()
    child_pid = child.pid
    proc = psutil.Process(child_pid)
    proc.kill()
    status = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if not POSIX:
        return
    self.assertEqual(status, -signal.SIGKILL)
| 102 | |
def test_terminate(self):
    """terminate() removes the child; on POSIX wait() returns -SIGTERM."""
    child = get_test_subprocess()
    child_pid = child.pid
    proc = psutil.Process(child_pid)
    proc.terminate()
    status = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if not POSIX:
        return
    self.assertEqual(status, -signal.SIGTERM)
| 112 | |
def test_send_signal(self):
    """Exercise send_signal(): delivery, error translation and PID 0."""
    signum = signal.SIGKILL if POSIX else signal.SIGTERM
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.send_signal(signum)
    status = proc.wait()
    self.assertFalse(psutil.pid_exists(proc.pid))
    if not POSIX:
        return

    self.assertEqual(status, -signum)

    # os.kill() failing with ESRCH must surface as NoSuchProcess.
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.send_signal(signum)
    no_such = OSError(errno.ESRCH, "")
    with mock.patch('psutil.os.kill', side_effect=no_such):
        with self.assertRaises(psutil.NoSuchProcess):
            proc.send_signal(signum)

    # os.kill() failing with EPERM must surface as AccessDenied.
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.send_signal(signum)
    not_permitted = OSError(errno.EPERM, "")
    with mock.patch('psutil.os.kill', side_effect=not_permitted):
        with self.assertRaises(psutil.AccessDenied):
            psutil.Process().send_signal(signum)

    # Sending a signal to process with PID 0 is not allowed as
    # it would affect every process in the process group of
    # the calling process (os.getpid()) instead of PID 0.
    if 0 in psutil.pids():
        pid_zero = psutil.Process(0)
        with self.assertRaises(ValueError):
            pid_zero.send_signal(signal.SIGTERM)
| 144 | |
def test_wait(self):
    """Check wait() exit codes, repeated calls, timeouts and bad input."""
    # Exit status after kill() and terminate().  The signal constant is
    # resolved lazily so signal.SIGKILL is only looked up when POSIX is
    # true.
    for method_name, signal_name in (("kill", "SIGKILL"),
                                     ("terminate", "SIGTERM")):
        child = get_test_subprocess()
        proc = psutil.Process(child.pid)
        getattr(proc, method_name)()
        status = proc.wait()
        if POSIX:
            self.assertEqual(status, -getattr(signal, signal_name))
        else:
            self.assertEqual(status, signal.SIGTERM)
        self.assertFalse(proc.is_running())

    # Exit status produced by sys.exit().
    script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    child = get_test_subprocess([PYTHON_EXE, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertFalse(proc.is_running())

    # wait() issued twice: it is not supposed to raise NSP when the
    # process is gone.  On UNIX the second call should return None, on
    # Windows it should keep returning the exit code.
    child = get_test_subprocess([PYTHON_EXE, "-c", script])
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.wait(), 5)
    self.assertIn(proc.wait(), (5, None))

    # A still-running process makes a short wait() time out.
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.name()
    with self.assertRaises(psutil.TimeoutExpired):
        proc.wait(0.01)

    # A negative timeout is rejected.
    with self.assertRaises(ValueError):
        proc.wait(-1)
| 191 | |
def test_wait_non_children(self):
    """Test wait() against a process which is not our direct child.

    create_proc_children_pair() yields two processes (p1, p2); both are
    terminated and the value returned by wait() is checked for each.
    """
    p1, p2 = create_proc_children_pair()
    self.assertRaises(psutil.TimeoutExpired, p1.wait, 0.01)
    self.assertRaises(psutil.TimeoutExpired, p2.wait, 0.01)
    # We also terminate the direct child otherwise the
    # grandchild will hang until the parent is gone.
    p1.terminate()
    p2.terminate()
    ret1 = p1.wait()
    ret2 = p2.wait()
    if POSIX:
        self.assertEqual(ret1, -signal.SIGTERM)
        # For processes which are not our children we're supposed
        # to get None.
        self.assertEqual(ret2, None)
    else:
        self.assertEqual(ret1, signal.SIGTERM)
        # Previously ret1 was asserted twice, leaving ret2 unchecked.
        self.assertEqual(ret2, signal.SIGTERM)
| 212 | |
def test_wait_timeout_0(self):
    """wait(0) times out while running and returns the code once dead."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    with self.assertRaises(psutil.TimeoutExpired):
        proc.wait(0)
    proc.kill()

    # Poll without blocking for up to 2 seconds; if the process is
    # still alive at the deadline, let TimeoutExpired propagate.
    deadline = time.time() + 2
    while True:
        try:
            status = proc.wait(0)
        except psutil.TimeoutExpired:
            if time.time() >= deadline:
                raise
            continue
        break

    if POSIX:
        expected = -signal.SIGKILL
    else:
        expected = signal.SIGTERM
    self.assertEqual(status, expected)
    self.assertFalse(proc.is_running())
| 232 | |
def test_cpu_percent(self):
    """cpu_percent() returns non-negative floats; negative interval fails."""
    proc = psutil.Process()
    for _ in range(2):
        proc.cpu_percent(interval=0.001)
    for _ in range(100):
        value = proc.cpu_percent(interval=None)
        self.assertIsInstance(value, float)
        self.assertGreaterEqual(value, 0.0)
    self.assertRaises(ValueError, proc.cpu_percent, interval=-1)
| 243 | |
def test_cpu_percent_numcpus_none(self):
    """cpu_percent() must work when psutil.cpu_count() returns None.

    See: https://github.com/giampaolo/psutil/issues/1087
    """
    patcher = mock.patch('psutil.cpu_count', return_value=None)
    with patcher as fake_cpu_count:
        psutil.Process().cpu_percent()
        assert fake_cpu_count.called
| 249 | |
def test_cpu_times(self):
    """Sanity-check the fields returned by Process.cpu_times()."""
    cpu = psutil.Process().cpu_times()
    assert (cpu.user > 0.0) or (cpu.system > 0.0), cpu
    assert (cpu.children_user >= 0.0), cpu
    assert (cpu.children_system >= 0.0), cpu
    if LINUX:
        assert cpu.iowait >= 0.0, cpu
    # Every field must be usable with localtime()/strftime().
    for field in cpu._fields:
        seconds = getattr(cpu, field)
        time.strftime("%H:%M:%S", time.localtime(seconds))
| 260 | |
def test_cpu_times_2(self):
    """Compare cpu_times() user/system values against os.times()."""
    ps_user, ps_system = psutil.Process().cpu_times()[:2]
    os_user, os_system = os.times()[:2]

    # os.times()[:2] are the reference values; each psutil value may
    # differ from its reference by at most 0.1 seconds.
    if abs(ps_user - os_user) > 0.1:
        self.fail("expected: %s, found: %s" % (os_user, ps_user))

    if abs(ps_system - os_system) > 0.1:
        self.fail("expected: %s, found: %s" % (os_system, ps_system))
| 273 | |
@unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
def test_cpu_num(self):
    """cpu_num() must be a valid index into the available CPUs."""
    proc = psutil.Process()
    cpu_index = proc.cpu_num()
    self.assertGreaterEqual(cpu_index, 0)
    if psutil.cpu_count() == 1:
        self.assertEqual(cpu_index, 0)
    valid_indexes = range(psutil.cpu_count())
    self.assertIn(proc.cpu_num(), valid_indexes)
| 282 | |
def test_create_time(self):
    """create_time() of a fresh child must be close to time.time()."""
    child = get_test_subprocess()
    reference = time.time()
    proc = psutil.Process(child.pid)
    started = proc.create_time()

    # time.time() taken right after spawning is the reference value;
    # the test fails if the two differ by more than 2 seconds.
    delta = abs(started - reference)
    if delta > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (reference, started, delta))

    # The returned value must be usable with localtime()/strftime().
    time.strftime("%Y %m %d %H:%M:%S", time.localtime(proc.create_time()))
| 299 | |
@unittest.skipIf(not POSIX, 'POSIX only')
@unittest.skipIf(TRAVIS or CIRRUS, 'not reliable on TRAVIS/CIRRUS')
def test_terminal(self):
    """terminal() matches the `tty` command, or is None without a tty."""
    reported = psutil.Process().terminal()
    has_tty = sys.stdin.isatty() or sys.stdout.isatty()
    if not has_tty:
        self.assertIsNone(reported)
        return
    expected = os.path.realpath(sh('tty'))
    self.assertEqual(reported, expected)
| 309 | |
@unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
@skip_on_not_implemented(only_if=LINUX)
def test_io_counters(self):
    """Check that io_counters() reacts to reads and writes we perform."""
    p = psutil.Process()
    # test reads
    io1 = p.io_counters()
    with open(PYTHON_EXE, 'rb') as f:
        f.read()
    io2 = p.io_counters()
    if not BSD and not AIX:
        self.assertGreater(io2.read_count, io1.read_count)
        self.assertEqual(io2.write_count, io1.write_count)
        if LINUX:
            self.assertGreater(io2.read_chars, io1.read_chars)
            self.assertEqual(io2.write_chars, io1.write_chars)
    else:
        self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
        self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)

    # test writes
    io1 = p.io_counters()
    with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
        if PY3:
            f.write(bytes("x" * 1000000, 'ascii'))
        else:
            f.write("x" * 1000000)
    io2 = p.io_counters()
    self.assertGreaterEqual(io2.write_count, io1.write_count)
    self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
    self.assertGreaterEqual(io2.read_count, io1.read_count)
    self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
    if LINUX:
        self.assertGreater(io2.write_chars, io1.write_chars)
        self.assertGreaterEqual(io2.read_chars, io1.read_chars)

    # sanity check: every counter is non-negative (the assertion used
    # to be written twice per field; once is enough).
    for index, counter in enumerate(io2):
        if BSD and index >= 2:
            # On BSD read_bytes and write_bytes are always set to -1.
            continue
        self.assertGreaterEqual(counter, 0)
| 352 | |
@unittest.skipIf(not HAS_IONICE, "not supported")
@unittest.skipIf(not LINUX, "linux only")
def test_ionice_linux(self):
    """Exercise ionice() get/set for each Linux I/O priority class.

    The priority is restored to IOPRIO_CLASS_BE in the ``finally``
    clause regardless of the outcome.
    """
    p = psutil.Process()
    self.assertEqual(p.ionice()[0], psutil.IOPRIO_CLASS_NONE)
    self.assertEqual(psutil.IOPRIO_CLASS_NONE, 0)
    self.assertEqual(psutil.IOPRIO_CLASS_RT, 1)  # high
    self.assertEqual(psutil.IOPRIO_CLASS_BE, 2)  # normal
    self.assertEqual(psutil.IOPRIO_CLASS_IDLE, 3)  # low
    try:
        # low
        p.ionice(psutil.IOPRIO_CLASS_IDLE)
        self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0))
        with self.assertRaises(ValueError):  # accepts no value
            p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7)
        # normal
        p.ionice(psutil.IOPRIO_CLASS_BE)
        self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 0))
        p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
        self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 7))
        with self.assertRaises(ValueError):
            p.ionice(psutil.IOPRIO_CLASS_BE, value=8)
        # high
        if os.getuid() == 0:  # root
            p.ionice(psutil.IOPRIO_CLASS_RT)
            self.assertEqual(tuple(p.ionice()),
                             (psutil.IOPRIO_CLASS_RT, 0))
            p.ionice(psutil.IOPRIO_CLASS_RT, value=7)
            self.assertEqual(tuple(p.ionice()),
                             (psutil.IOPRIO_CLASS_RT, 7))
            # Out-of-range value for the RT class, mirroring the BE
            # check above.  This used to pass IOPRIO_CLASS_IDLE, which
            # only re-tested the "accepts no value" case.
            # NOTE(review): assumes RT rejects value=8 like BE does.
            with self.assertRaises(ValueError):
                p.ionice(psutil.IOPRIO_CLASS_RT, value=8)
        # errs
        self.assertRaisesRegex(
            ValueError, "ioclass accepts no value",
            p.ionice, psutil.IOPRIO_CLASS_NONE, 1)
        self.assertRaisesRegex(
            ValueError, "ioclass accepts no value",
            p.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
        self.assertRaisesRegex(
            ValueError, "'ioclass' argument must be specified",
            p.ionice, value=1)
    finally:
        p.ionice(psutil.IOPRIO_CLASS_BE)
| 397 | |
@unittest.skipIf(not HAS_IONICE, "not supported")
@unittest.skipIf(not WINDOWS, 'not supported on this win version')
def test_ionice_win(self):
    """Exercise ionice() get/set with the Windows IOPRIO_* constants."""
    proc = psutil.Process()
    self.assertEqual(proc.ionice(), psutil.IOPRIO_NORMAL)
    try:
        # Priorities that are set and then read back.
        for level in (psutil.IOPRIO_VERYLOW, psutil.IOPRIO_LOW):
            proc.ionice(level)
            self.assertEqual(proc.ionice(), level)
        # Setting HIGH may be refused with AccessDenied; only verify
        # the value when the call went through.
        try:
            proc.ionice(psutil.IOPRIO_HIGH)
        except psutil.AccessDenied:
            pass
        else:
            self.assertEqual(proc.ionice(), psutil.IOPRIO_HIGH)
        # Invalid calls.
        self.assertRaisesRegex(
            TypeError, "value argument not accepted on Windows",
            proc.ionice, psutil.IOPRIO_NORMAL, value=1)
        self.assertRaisesRegex(
            ValueError, "is not a valid priority",
            proc.ionice, psutil.IOPRIO_HIGH + 1)
    finally:
        proc.ionice(psutil.IOPRIO_NORMAL)
        self.assertEqual(proc.ionice(), psutil.IOPRIO_NORMAL)
| 425 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_get(self):
    """Compare psutil RLIMIT* constants and rlimit() with `resource`."""
    import resource
    proc = psutil.Process(os.getpid())
    rlimit_names = [n for n in dir(psutil) if n.startswith('RLIMIT')]
    assert rlimit_names, rlimit_names
    for rlimit_name in rlimit_names:
        constant = getattr(psutil, rlimit_name)
        self.assertGreaterEqual(constant, 0)
        if rlimit_name not in dir(resource):
            # No stdlib counterpart: only check the shape of the result.
            limits = proc.rlimit(constant)
            self.assertEqual(len(limits), 2)
            self.assertGreaterEqual(limits[0], -1)
            self.assertGreaterEqual(limits[1], -1)
            continue
        self.assertEqual(constant, getattr(resource, rlimit_name))
        # XXX - On PyPy RLIMIT_INFINITY returned by
        # resource.getrlimit() is reported as a very big long
        # number instead of -1. It looks like a bug with PyPy.
        if PYPY:
            continue
        self.assertEqual(proc.rlimit(constant),
                         resource.getrlimit(constant))
| 448 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_set(self):
    """Set RLIMIT_NOFILE on a child and check invalid rlimit() calls."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    new_limits = (5, 5)
    proc.rlimit(psutil.RLIMIT_NOFILE, new_limits)
    self.assertEqual(proc.rlimit(psutil.RLIMIT_NOFILE), new_limits)
    # If pid is 0 prlimit() applies to the calling process and
    # we don't want that.
    with self.assertRaises(ValueError):
        psutil._psplatform.Process(0).rlimit(0)
    # A limits tuple with three items is rejected.
    with self.assertRaises(ValueError):
        proc.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
| 461 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit(self):
    """A 1024-byte RLIMIT_FSIZE must make a 1025-byte write fail."""
    proc = psutil.Process()
    orig_soft, orig_hard = proc.rlimit(psutil.RLIMIT_FSIZE)
    try:
        proc.rlimit(psutil.RLIMIT_FSIZE, (1024, orig_hard))
        # Writing exactly the limit is fine.
        with open(TESTFN, "wb") as fobj:
            fobj.write(b"X" * 1024)
        # write() or flush() doesn't always cause the exception
        # but close() will.
        with self.assertRaises(IOError) as caught:
            with open(TESTFN, "wb") as fobj:
                fobj.write(b"X" * 1025)
        self.assertEqual(
            caught.exception.errno if PY3 else caught.exception[0],
            errno.EFBIG)
    finally:
        # Put the original limits back and verify they took effect.
        proc.rlimit(psutil.RLIMIT_FSIZE, (orig_soft, orig_hard))
        self.assertEqual(proc.rlimit(psutil.RLIMIT_FSIZE),
                         (orig_soft, orig_hard))
| 480 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_infinity(self):
    """Resetting RLIMIT_FSIZE to RLIM_INFINITY lifts an earlier limit."""
    proc = psutil.Process()
    orig_soft, orig_hard = proc.rlimit(psutil.RLIMIT_FSIZE)
    try:
        # Set a 1024-byte limit first, then override it with INFINITY;
        # the 2048-byte write below must then go through.
        proc.rlimit(psutil.RLIMIT_FSIZE, (1024, orig_hard))
        proc.rlimit(psutil.RLIMIT_FSIZE,
                    (psutil.RLIM_INFINITY, orig_hard))
        with open(TESTFN, "wb") as fobj:
            fobj.write(b"X" * 2048)
    finally:
        proc.rlimit(psutil.RLIMIT_FSIZE, (orig_soft, orig_hard))
        self.assertEqual(proc.rlimit(psutil.RLIMIT_FSIZE),
                         (orig_soft, orig_hard))
| 495 | |
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit_infinity_value(self):
    """The hard RLIMIT_FSIZE equals RLIM_INFINITY and can be set back."""
    # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
    # big number on a platform with large file support. On these
    # platforms we need to test that the get/setrlimit functions
    # properly convert the number to a C long long and that the
    # conversion doesn't raise an error.
    proc = psutil.Process()
    limits = proc.rlimit(psutil.RLIMIT_FSIZE)
    cur_soft, cur_hard = limits
    self.assertEqual(psutil.RLIM_INFINITY, cur_hard)
    proc.rlimit(psutil.RLIMIT_FSIZE, (cur_soft, cur_hard))
| 507 | |
def test_num_threads(self):
    """num_threads() grows by one while a ThreadTask is running."""
    # On certain platforms such as Linux we might test for the exact
    # thread number, since there is always 1 thread per process, but
    # this does not apply across all platforms (MACOS, Windows).
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    with ThreadTask():
        during = proc.num_threads()
        self.assertEqual(during, before + 1)
| 524 | |
@unittest.skipIf(not WINDOWS, 'WINDOWS only')
def test_num_handles(self):
    """num_handles() of the current process is greater than zero."""
    # a better test is done later into test/_windows.py
    handle_count = psutil.Process().num_handles()
    self.assertGreater(handle_count, 0)
| 530 | |
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads(self):
    """threads() gains one entry while a ThreadTask is running."""
    proc = psutil.Process()
    try:
        before = proc.threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")

    with ThreadTask():
        during = proc.threads()
        self.assertEqual(len(during), len(before) + 1)
        # Named fields and positional indexes must agree.
        first = during[0]
        for position, field in enumerate(("id", "user_time",
                                          "system_time")):
            self.assertEqual(getattr(first, field), first[position])
| 550 | |
@retry_on_failure()
@skip_on_access_denied(only_if=MACOS)
@unittest.skipIf(not HAS_THREADS, 'not supported')
def test_threads_2(self):
    """Per-thread CPU times must add up to the process CPU times."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    if OPENBSD:
        try:
            proc.threads()
        except psutil.AccessDenied:
            raise unittest.SkipTest(
                "on OpenBSD this requires root access")
    # Compare process totals with the per-thread sums (tolerance 0.1).
    process_user = proc.cpu_times().user
    threads_user = sum([t.user_time for t in proc.threads()])
    self.assertAlmostEqual(process_user, threads_user, delta=0.1)
    process_system = proc.cpu_times().system
    threads_system = sum([t.system_time for t in proc.threads()])
    self.assertAlmostEqual(process_system, threads_system, delta=0.1)
| 569 | |
def test_memory_info(self):
    """memory_info()/memory_percent() must grow after an allocation."""
    proc = psutil.Process()

    # Baseline figures.
    rss_before, vms_before = proc.memory_info()[:2]
    percent_before = proc.memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # Allocate some memory and sample again while it is still alive.
    ballast = [None] * 1500000
    rss_after, vms_after = proc.memory_info()[:2]
    percent_after = proc.memory_percent()

    # Usage must have bumped up (vms might stay equal).
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)
    self.assertGreater(percent_after, percent_before)
    del ballast

    if WINDOWS:
        info = proc.memory_info()
        self.assertEqual(info.rss, info.wset)
        self.assertEqual(info.vms, info.pagefile)

    info = proc.memory_info()
    for field in info._fields:
        self.assertGreaterEqual(getattr(info, field), 0)
| 599 | |
def test_memory_full_info(self):
    """Sanity-check every field returned by memory_full_info().

    Each field must be >= 0 and, except for 'vms' on OSX/Linux, must
    not exceed the total physical memory reported by virtual_memory().
    """
    total = psutil.virtual_memory().total
    mem = psutil.Process().memory_full_info()
    for name in mem._fields:
        value = getattr(mem, name)
        self.assertGreaterEqual(value, 0, msg=(name, value))
        # Parenthesized on purpose: without the parentheses the test
        # parsed as `(name == 'vms' and OSX) or LINUX`, which skipped
        # the upper-bound check for every field on Linux.
        if name == 'vms' and (OSX or LINUX):
            continue
        self.assertLessEqual(value, total, msg=(name, value, total))
    if LINUX or WINDOWS or MACOS:
        self.assertGreaterEqual(mem.uss, 0)
    if LINUX:
        self.assertGreaterEqual(mem.pss, 0)
        self.assertGreaterEqual(mem.swap, 0)
| 614 | |
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps(self):
    """Validate grouped and ungrouped results of memory_maps()."""
    p = psutil.Process()
    maps = p.memory_maps()
    # Grouped maps must list each path only once.  Compare the paths
    # themselves: comparing whole entries (as done previously) never
    # detects a path that appears in two entries with different sizes.
    paths = [x.path for x in maps]
    self.assertEqual(len(paths), len(set(paths)))
    ext_maps = p.memory_maps(grouped=False)

    for nt in maps:
        if not nt.path.startswith('['):
            assert os.path.isabs(nt.path), nt.path
            if POSIX:
                try:
                    assert os.path.exists(nt.path) or \
                        os.path.islink(nt.path), nt.path
                except AssertionError:
                    if not LINUX:
                        raise
                    else:
                        # https://github.com/giampaolo/psutil/issues/759
                        with open_text('/proc/self/smaps') as f:
                            data = f.read()
                        if "%s (deleted)" % nt.path not in data:
                            raise
            else:
                # XXX - On Windows we have this strange behavior with
                # 64 bit dlls: they are visible via explorer but cannot
                # be accessed via os.stat() (wtf?).
                if '64' not in os.path.basename(nt.path):
                    assert os.path.exists(nt.path), nt.path
    for nt in ext_maps:
        for fname in nt._fields:
            value = getattr(nt, fname)
            if fname == 'path':
                continue
            elif fname in ('addr', 'perms'):
                assert value, value
            else:
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
| 655 | |
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
def test_memory_maps_lists_lib(self):
    """A newly loaded shared lib must show up in memory_maps()."""
    def canonical(path):
        # Case-normalize, then resolve symlinks, so paths compare equal.
        return os.path.realpath(os.path.normcase(path))

    with copyload_shared_lib() as lib_path:
        mapped = []
        for entry in psutil.Process().memory_maps():
            mapped.append(canonical(entry.path))
        self.assertIn(canonical(lib_path), mapped)
| 665 | |
def test_memory_percent(self):
    """memory_percent() runs, and rejects an unknown memtype."""
    proc = psutil.Process()
    proc.memory_percent()
    with self.assertRaises(ValueError):
        proc.memory_percent(memtype="?!?")
    if LINUX or MACOS or WINDOWS:
        proc.memory_percent(memtype='uss')
| 672 | |
def test_is_running(self):
    """is_running() is true for a live child and false once it is gone."""
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    # Each state is queried twice in a row, as in the original test.
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
| 682 | |
def test_exe(self):
    """exe() of a Python child points at a usable interpreter."""
    child = get_test_subprocess()
    exe_path = psutil.Process(child.pid).exe()
    try:
        self.assertEqual(exe_path, PYTHON_EXE)
    except AssertionError:
        if WINDOWS and len(exe_path) == len(PYTHON_EXE):
            # on Windows we don't care about case sensitivity
            self.assertEqual(os.path.normcase(exe_path),
                             os.path.normcase(PYTHON_EXE))
        else:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            version = "%s.%s" % (sys.version_info[0], sys.version_info[1])
            stripped_exe = exe_path.replace(version, '')
            stripped_ref = PYTHON_EXE.replace(version, '')
            try:
                self.assertEqual(stripped_exe, stripped_ref)
            except AssertionError:
                # Typically MACOS. Really not sure what to do here.
                pass

    # Whatever path was reported, it must be runnable as Python.
    output = sh([exe_path, "-c", "import os; print('hey')"])
    self.assertEqual(output, 'hey')
| 710 | |
def test_cmdline(self):
    """cmdline() of a child matches the arguments it was started with."""
    expected = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
    child = get_test_subprocess(expected)
    try:
        reported = ' '.join(psutil.Process(child.pid).cmdline())
        self.assertEqual(reported, ' '.join(expected))
    except AssertionError:
        # XXX - most of the times the underlying sysctl() call on Net
        # and Open BSD returns a truncated string.
        # Also /proc/pid/cmdline behaves the same so it looks
        # like this is a kernel bug.
        # XXX - AIX truncates long arguments in /proc/pid/cmdline
        if not (NETBSD or OPENBSD or AIX):
            raise
        # On those platforms only the executable is compared.
        first_arg = psutil.Process(child.pid).cmdline()[0]
        self.assertEqual(first_arg, PYTHON_EXE)
| 728 | |
@unittest.skipIf(PYPY, "broken on PYPY")
def test_long_cmdline(self):
    """cmdline() returns all arguments of a long command line."""
    create_exe(TESTFN)
    self.addCleanup(safe_rmpath, TESTFN)
    # The executable followed by twenty 10-character arguments.
    expected = [TESTFN]
    expected.extend(["0123456789"] * 20)
    child = get_test_subprocess(expected)
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.cmdline(), expected)
| 737 | |
def test_name(self):
    """name() of a Python child is a prefix of the interpreter file name."""
    child = get_test_subprocess(PYTHON_EXE)
    reported = psutil.Process(child.pid).name().lower()
    interpreter = os.path.realpath(sys.executable)
    expected = os.path.basename(interpreter).lower()
    assert expected.startswith(reported), (expected, reported)
| 743 | |
@unittest.skipIf(PYPY, "unreliable on PYPY")
def test_long_name(self):
    """name() returns the full basename of a long executable name."""
    suffix = "0123456789" * 2
    exe_path = TESTFN + suffix
    create_exe(exe_path)
    self.addCleanup(safe_rmpath, exe_path)
    child = get_test_subprocess(exe_path)
    reported = psutil.Process(child.pid).name()
    self.assertEqual(reported, os.path.basename(exe_path))
| 752 | |
# XXX
@unittest.skipIf(SUNOS, "broken on SUNOS")
@unittest.skipIf(AIX, "broken on AIX")
@unittest.skipIf(PYPY, "broken on PYPY")
def test_prog_w_funky_name(self):
    """name(), exe() and cmdline() handle spaces and ")" in the path.

    See: https://github.com/giampaolo/psutil/issues/628
    """
    exe_path = TESTFN + 'foo bar )'

    def remove_exe():
        # Try to limit occasional failures on Appveyor:
        # https://ci.appveyor.com/project/giampaolo/psutil/build/1350/
        # job/lbo3bkju55le850n
        try:
            safe_rmpath(exe_path)
        except OSError:
            pass

    create_exe(exe_path)
    self.addCleanup(remove_exe)
    expected = [exe_path, "-c",
                "import time; [time.sleep(0.01) for x in range(3000)];"
                "arg1", "arg2", "", "arg3", ""]
    child = get_test_subprocess(expected)
    proc = psutil.Process(child.pid)
    # ...in order to try to prevent occasional failures on travis
    if TRAVIS:
        wait_for_pid(proc.pid)
    self.assertEqual(proc.cmdline(), expected)
    self.assertEqual(proc.name(), os.path.basename(exe_path))
    reported_exe = os.path.normcase(proc.exe())
    self.assertEqual(reported_exe, os.path.normcase(exe_path))
| 786 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_uids(self):
    """uids() agrees with os.getuid(), os.geteuid() and os.getresuid()."""
    proc = psutil.Process()
    real_uid, effective_uid, _saved_uid = proc.uids()
    # "real" uid <-> os.getuid(), "effective" uid <-> os.geteuid().
    self.assertEqual(real_uid, os.getuid())
    self.assertEqual(effective_uid, os.geteuid())
    # There is no os.getsuid() for the "saved" uid, but where
    # os.getresuid() exists it returns all three at once.
    if not hasattr(os, "getresuid"):
        return
    self.assertEqual(os.getresuid(), proc.uids())
| 800 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_gids(self):
    # Compare the (real, effective, saved) gids reported by psutil
    # with what the os module says about the current process.
    p = psutil.Process()
    real, effective, saved = p.gids()
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # No such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid() which returns all
    # of them.  Guard on the function that is actually called.
    if hasattr(os, "getresgid"):
        self.assertEqual(os.getresgid(), p.gids())
| 814 | |
def test_nice(self):
    # Get/set the priority of the current process and make sure the
    # original value is restored afterwards.
    proc = psutil.Process()
    self.assertRaises(TypeError, proc.nice, "str")

    if WINDOWS:
        normal = psutil.NORMAL_PRIORITY_CLASS
        high = psutil.HIGH_PRIORITY_CLASS
        try:
            initial = proc.nice()
            if sys.version_info > (3, 4):
                self.assertIsInstance(initial, enum.IntEnum)
            else:
                self.assertIsInstance(initial, int)
            self.assertEqual(initial, normal)
            proc.nice(high)
            self.assertEqual(proc.nice(), high)
            proc.nice(normal)
            self.assertEqual(proc.nice(), normal)
        finally:
            proc.nice(normal)
        return

    original = proc.nice()
    has_getpriority = hasattr(os, "getpriority")
    try:
        if has_getpriority:
            self.assertEqual(
                os.getpriority(os.PRIO_PROCESS, os.getpid()), proc.nice())
        proc.nice(1)
        self.assertEqual(proc.nice(), 1)
        if has_getpriority:
            self.assertEqual(
                os.getpriority(os.PRIO_PROCESS, os.getpid()), proc.nice())
        # XXX - going back to previous nice value raises
        # AccessDenied on MACOS
        if not MACOS:
            proc.nice(0)
            self.assertEqual(proc.nice(), 0)
    except psutil.AccessDenied:
        pass
    finally:
        # Restoring the initial value may itself be denied.
        try:
            proc.nice(original)
        except psutil.AccessDenied:
            pass
| 855 | |
def test_status(self):
    # The process running this test is expected to report "running".
    current = psutil.Process()
    self.assertEqual(current.status(), psutil.STATUS_RUNNING)
| 859 | |
def test_username(self):
    # username() of a child process must match the current user.
    child = get_test_subprocess()
    name = psutil.Process(child.pid).username()
    if not WINDOWS:
        self.assertEqual(name, getpass.getuser())
        return
    # On Windows the value has the form "DOMAIN\\user".
    domain, name = name.split('\\')
    self.assertEqual(name, getpass.getuser())
    if 'USERDOMAIN' in os.environ:
        self.assertEqual(domain, os.environ['USERDOMAIN'])
| 871 | |
def test_cwd(self):
    # A freshly spawned child reports our working directory.
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.cwd(), os.getcwd())
| 876 | |
def test_cwd_2(self):
    # The child changes to the parent directory; poll cwd() until it
    # reflects the change.
    code = "import os, time; os.chdir('..'); time.sleep(60)"
    child = get_test_subprocess([PYTHON_EXE, "-c", code])
    proc = psutil.Process(child.pid)
    call_until(proc.cwd, "ret == os.path.dirname(os.getcwd())")
| 883 | |
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity(self):
    # Get/set the CPU affinity of the current process, cross-checking
    # against os.sched_getaffinity() where it exists.
    proc = psutil.Process()
    initial = proc.cpu_affinity()
    assert initial, initial
    self.addCleanup(proc.cpu_affinity, initial)
    has_sched = hasattr(os, "sched_getaffinity")

    if has_sched:
        self.assertEqual(initial, list(os.sched_getaffinity(proc.pid)))
    self.assertEqual(len(initial), len(set(initial)))

    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
    # Work around travis failure:
    # https://travis-ci.org/giampaolo/psutil/builds/284173194
    candidates = initial if TRAVIS else all_cpus
    for cpu in candidates:
        proc.cpu_affinity([cpu])
        self.assertEqual(proc.cpu_affinity(), [cpu])
        if has_sched:
            self.assertEqual(proc.cpu_affinity(),
                             list(os.sched_getaffinity(proc.pid)))
        # also test num_cpu()
        if hasattr(proc, "num_cpu"):
            self.assertEqual(proc.cpu_affinity()[0], proc.num_cpu())

    # [] is an alias for "all eligible CPUs"; on Linux this may
    # not be equal to all available CPUs, see:
    # https://github.com/giampaolo/psutil/issues/956
    proc.cpu_affinity([])
    if LINUX:
        self.assertEqual(proc.cpu_affinity(),
                         proc._proc._get_eligible_cpus())
    else:
        self.assertEqual(proc.cpu_affinity(), all_cpus)
    if has_sched:
        self.assertEqual(proc.cpu_affinity(),
                         list(os.sched_getaffinity(proc.pid)))

    self.assertRaises(TypeError, proc.cpu_affinity, 1)
    proc.cpu_affinity(initial)
    # it should work with all iterables, not only lists
    if not TRAVIS:
        proc.cpu_affinity(set(all_cpus))
        proc.cpu_affinity(tuple(all_cpus))
| 926 | |
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_errs(self):
    # Invalid CPU lists must be rejected with the proper exception.
    child = get_test_subprocess()
    set_affinity = psutil.Process(child.pid).cpu_affinity
    out_of_range = [len(psutil.cpu_times(percpu=True)) + 10]
    self.assertRaises(ValueError, set_affinity, out_of_range)
    self.assertRaises(ValueError, set_affinity, range(10000, 11000))
    self.assertRaises(TypeError, set_affinity, [0, "1"])
    self.assertRaises(ValueError, set_affinity, [0, -1])
| 936 | |
@unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
def test_cpu_affinity_all_combinations(self):
    # Set every non-empty subset of the initial CPU set and read it back.
    proc = psutil.Process()
    initial = proc.cpu_affinity()
    assert initial, initial
    self.addCleanup(proc.cpu_affinity, initial)

    # Cap at 12 CPUs ...otherwise it will take forever.
    cpus = initial[:12]
    combos = [
        list(subset)
        for size in range(1, len(cpus) + 1)
        for subset in itertools.combinations(cpus, size)
    ]

    for combo in combos:
        proc.cpu_affinity(combo)
        self.assertEqual(proc.cpu_affinity(), combo)
| 956 | |
# TODO: #595
@unittest.skipIf(BSD, "broken on BSD")
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files(self):
    # open_files() must list TESTFN once it is opened, both for the
    # current process and for a child process.
    normcase = os.path.normcase
    testfn = normcase(TESTFN)

    # current process
    p = psutil.Process()
    files = p.open_files()
    # Compare paths with paths: open_files() returns namedtuples, so
    # testing the bare string against that list could never fail.
    self.assertNotIn(testfn, [normcase(x.path) for x in files])
    with open(TESTFN, 'wb') as f:
        f.write(b'x' * 1024)
        f.flush()
        # give the kernel some time to see the new file
        files = call_until(p.open_files, "len(ret) != %i" % len(files))
        filenames = [normcase(x.path) for x in files]
        self.assertIn(testfn, filenames)
        if LINUX:
            for file in files:
                if normcase(file.path) == testfn:
                    self.assertEqual(file.position, 1024)
        for file in files:
            assert os.path.isfile(file.path), file

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN
    sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline])
    p = psutil.Process(sproc.pid)

    # Poll for up to ~1 second.  The names are normcase()d, so the
    # lookup key must be as well, otherwise the loop never exits early
    # on platforms where normcase() changes the path.
    for _ in range(100):
        filenames = [normcase(x.path) for x in p.open_files()]
        if testfn in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(testfn, filenames)
    for path in filenames:
        assert os.path.isfile(path), path
| 994 | |
# TODO: #595
@unittest.skipIf(BSD, "broken on BSD")
# can't find any process file on Appveyor
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files_2(self):
    # test fd and path fields
    normcase = os.path.normcase
    with open(TESTFN, 'w') as fileobj:
        p = psutil.Process()
        for file in p.open_files():
            if normcase(file.path) == normcase(fileobj.name) or \
                    file.fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.open_files()))
        self.assertEqual(normcase(file.path), normcase(fileobj.name))
        if WINDOWS:
            self.assertEqual(file.fd, -1)
        else:
            self.assertEqual(file.fd, fileobj.fileno())
        # test positions
        ntuple = p.open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    # test file is gone: checked after the file has been closed, and
    # against the reported paths (open_files() returns namedtuples, so
    # looking up a bare string in that list could never fail).
    self.assertNotIn(
        normcase(fileobj.name),
        [normcase(x.path) for x in p.open_files()])
| 1021 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_num_fds(self):
    # num_fds() must track descriptors as they are opened and closed.
    proc = psutil.Process()
    baseline = proc.num_fds()

    fobj = open(TESTFN, 'w')
    self.addCleanup(fobj.close)
    self.assertEqual(proc.num_fds(), baseline + 1)

    sock = socket.socket()
    self.addCleanup(sock.close)
    self.assertEqual(proc.num_fds(), baseline + 2)

    fobj.close()
    sock.close()
    self.assertEqual(proc.num_fds(), baseline)
| 1035 | |
@skip_on_not_implemented(only_if=LINUX)
@unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD")
def test_num_ctx_switches(self):
    # The context switch counter of the current process is expected
    # to grow while we keep polling it.
    p = psutil.Process()
    before = sum(p.num_ctx_switches())
    iterations = 500000
    for x in range(iterations):
        after = sum(p.num_ctx_switches())
        if after > before:
            return
    # Report the real loop bound (the old message said "50.000").
    self.fail("num ctx switches still the same after %d iterations" %
              iterations)
| 1046 | |
def test_ppid(self):
    # ppid() of ourselves and of a child, then make sure nobody else
    # claims us as parent once the children are reaped.
    if hasattr(os, 'getppid'):
        self.assertEqual(psutil.Process().ppid(), os.getppid())
    my_pid = os.getpid()
    child = get_test_subprocess()
    self.assertEqual(psutil.Process(child.pid).ppid(), my_pid)
    # no other process is supposed to have us as parent
    reap_children(recursive=True)
    if APPVEYOR:
        # Occasional failures, see:
        # https://ci.appveyor.com/project/giampaolo/psutil/build/
        # job/0hs623nenj7w4m33
        return
    for proc in psutil.process_iter():
        if proc.pid != child.pid:
            # XXX: sometimes this fails on Windows; not sure why.
            self.assertNotEqual(proc.ppid(), my_pid, msg=proc)
| 1066 | |
def test_parent(self):
    # parent() of a child is us; the first PID returned by
    # psutil.pids() is expected to have no parent.
    my_pid = os.getpid()
    child = psutil.Process(get_test_subprocess().pid)
    self.assertEqual(child.parent().pid, my_pid)

    first_pid = psutil.pids()[0]
    self.assertIsNone(psutil.Process(first_pid).parent())
| 1075 | |
def test_parent_multi(self):
    # The pair's second process has the first as parent, and the first
    # has us as parent.
    child, grandchild = create_proc_children_pair()
    self.assertEqual(grandchild.parent(), child)
    self.assertEqual(child.parent(), psutil.Process())
| 1080 | |
def test_parent_disappeared(self):
    # Emulate a case where the parent process disappeared: creating
    # any Process raises NoSuchProcess, so parent() must return None.
    child = psutil.Process(get_test_subprocess().pid)
    gone = psutil.NoSuchProcess(0, 'foo')
    with mock.patch("psutil.Process", side_effect=gone):
        self.assertIsNone(child.parent())
| 1088 | |
@retry_on_failure()
def test_parents(self):
    # parents() lists ancestors starting from the closest one.
    assert psutil.Process().parents()
    child, grandchild = create_proc_children_pair()
    self.assertEqual(child.parents()[0], psutil.Process())
    self.assertEqual(grandchild.parents()[0], child)
    self.assertEqual(grandchild.parents()[1], psutil.Process())
| 1096 | |
def test_children(self):
    # With no children alive children() is empty; after spawning one
    # process it is the only entry, recursive or not.
    reap_children(recursive=True)
    me = psutil.Process()
    self.assertEqual(me.children(), [])
    self.assertEqual(me.children(recursive=True), [])
    # On Windows we set the flag to 0 in order to cancel out the
    # CREATE_NO_WINDOW flag (enabled by default) which creates
    # an extra "conhost.exe" child.
    child = get_test_subprocess(creationflags=0)
    direct = me.children()
    recursive = me.children(recursive=True)
    for found in (direct, recursive):
        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only.pid, child.pid)
        self.assertEqual(only.ppid(), os.getpid())
| 1112 | |
def test_children_recursive(self):
    # Test children() against two sub processes, where the first (our
    # child) spawned the second (our grandchild).
    child, grandchild = create_proc_children_pair()
    me = psutil.Process()
    self.assertEqual(me.children(), [child])
    self.assertEqual(me.children(recursive=True), [child, grandchild])
    # If the intermediate process is gone there's no way for
    # children() to recursively find it.
    child.terminate()
    child.wait()
    self.assertEqual(me.children(recursive=True), [])
| 1125 | |
def test_children_duplicates(self):
    # Count children per parent PID to find the process which has the
    # highest number of children.
    counts = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            counts[proc.ppid()] += 1
        except psutil.Error:
            pass
    # this is the one, now let's make sure there are no duplicates
    busiest_pid = sorted(counts, key=counts.get)[-1]
    busiest = psutil.Process(busiest_pid)
    try:
        descendants = busiest.children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(descendants), len(set(descendants)))
| 1143 | |
def test_parents_and_children(self):
    # children(recursive=True) and parents() must describe the same
    # chain, walked in opposite directions.
    child, grandchild = create_proc_children_pair()
    me = psutil.Process()
    # forward
    descendants = me.children(recursive=True)
    self.assertEqual(len(descendants), 2)
    self.assertEqual(descendants[0], child)
    self.assertEqual(descendants[1], grandchild)
    # backward
    ancestors = grandchild.parents()
    self.assertEqual(ancestors[0], child)
    self.assertEqual(ancestors[1], me)
| 1156 | |
def test_suspend_resume(self):
    # Suspend a child, wait (up to ~1 second) for it to show up as
    # stopped, then resume it and check it is no longer stopped.
    child = psutil.Process(get_test_subprocess().pid)
    child.suspend()
    for _ in range(100):
        if child.status() == psutil.STATUS_STOPPED:
            break
        time.sleep(0.01)
    child.resume()
    self.assertNotEqual(child.status(), psutil.STATUS_STOPPED)
| 1167 | |
def test_invalid_pid(self):
    # A non-integer PID is a TypeError, a negative one a ValueError.
    for exc, bad_pid in ((TypeError, "1"), (ValueError, -1)):
        self.assertRaises(exc, psutil.Process, bad_pid)
| 1171 | |
def test_as_dict(self):
    # as_dict(): attribute selection, ad_value handling and argument
    # validation.
    me = psutil.Process()
    info = me.as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(info.keys()), ['exe', 'name'])

    p = psutil.Process(min(psutil.pids()))
    info = p.as_dict(attrs=['connections'], ad_value='foo')
    if not isinstance(info['connections'], list):
        self.assertEqual(info['connections'], 'foo')

    target = 'psutil.Process.nice'

    # Test ad_value is set on AccessDenied.
    with mock.patch(target, create=True,
                    side_effect=psutil.AccessDenied):
        result = p.as_dict(attrs=["nice"], ad_value=1)
        self.assertEqual(result, {"nice": 1})

    # Test that NoSuchProcess bubbles up.
    with mock.patch(target, create=True,
                    side_effect=psutil.NoSuchProcess(p.pid, "name")):
        self.assertRaises(
            psutil.NoSuchProcess, p.as_dict, attrs=["nice"])

    # Test that ZombieProcess is swallowed.
    with mock.patch(target, create=True,
                    side_effect=psutil.ZombieProcess(p.pid, "name")):
        result = p.as_dict(attrs=["nice"], ad_value="foo")
        self.assertEqual(result, {"nice": "foo"})

    # By default APIs raising NotImplementedError are
    # supposed to be skipped.
    with mock.patch(target, create=True,
                    side_effect=NotImplementedError):
        info = p.as_dict()
        self.assertNotIn('nice', list(info.keys()))
        # ...unless the user explicitly asked for some attr.
        with self.assertRaises(NotImplementedError):
            p.as_dict(attrs=["nice"])

    # errors
    with self.assertRaises(TypeError):
        p.as_dict('name')
    with self.assertRaises(ValueError):
        p.as_dict(['foo'])
    with self.assertRaises(ValueError):
        p.as_dict(['foo', 'bar'])
| 1217 | |
def test_oneshot(self):
    # Inside oneshot() the platform cpu_times() is hit once for two
    # calls; outside of it every call goes through.
    target = "psutil._psplatform.Process.cpu_times"

    with mock.patch(target) as patched:
        proc = psutil.Process()
        with proc.oneshot():
            proc.cpu_times()
            proc.cpu_times()
        self.assertEqual(patched.call_count, 1)

    with mock.patch(target) as patched:
        proc.cpu_times()
        proc.cpu_times()
        self.assertEqual(patched.call_count, 2)
| 1230 | |
def test_oneshot_twice(self):
    # Test the case where the ctx manager is __enter__ed twice.
    # The second __enter__ is supposed to result in a NOOP.
    cpu_times_target = "psutil._psplatform.Process.cpu_times"
    enter_target = "psutil._psplatform.Process.oneshot_enter"

    with mock.patch(cpu_times_target) as cpu_times_mock:
        with mock.patch(enter_target) as enter_mock:
            proc = psutil.Process()
            with proc.oneshot():
                proc.cpu_times()
                proc.cpu_times()
                with proc.oneshot():
                    proc.cpu_times()
                    proc.cpu_times()
            self.assertEqual(cpu_times_mock.call_count, 1)
            self.assertEqual(enter_mock.call_count, 1)

    with mock.patch(cpu_times_target) as cpu_times_mock:
        proc.cpu_times()
        proc.cpu_times()
        self.assertEqual(cpu_times_mock.call_count, 2)
| 1250 | |
def test_oneshot_cache(self):
    # Make sure oneshot() cache is nonglobal. Instead it's
    # supposed to be bound to the Process instance, see:
    # https://github.com/giampaolo/psutil/issues/1373
    first, second = create_proc_children_pair()
    first_ppid = first.ppid()
    second_ppid = second.ppid()
    self.assertNotEqual(first_ppid, second_ppid)
    # Whichever instance owns the active oneshot() context, both
    # processes must keep reporting their own ppid.
    for owner in (first, second):
        with owner.oneshot():
            self.assertEqual(first.ppid(), first_ppid)
            self.assertEqual(second.ppid(), second_ppid)
| 1265 | |
def test_halfway_terminated_process(self):
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    # >>> proc = Process(1234)
    # >>> time.sleep(2)  # time-consuming task, process dies in meantime
    # >>> proc.name()
    # Refers to Issue #15
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    p.terminate()
    p.wait()
    if WINDOWS:
        # Wait until the PID is no longer listed before going on.
        call_until(psutil.pids, "%s not in ret" % p.pid)
    assert not p.is_running()

    if WINDOWS:
        with self.assertRaises(psutil.NoSuchProcess):
            p.send_signal(signal.CTRL_C_EVENT)
        with self.assertRaises(psutil.NoSuchProcess):
            p.send_signal(signal.CTRL_BREAK_EVENT)

    # Public names which are not exercised by the loop below.
    excluded_names = ['pid', 'is_running', 'wait', 'create_time',
                      'oneshot', 'memory_info_ex']
    if LINUX and not HAS_RLIMIT:
        excluded_names.append('rlimit')
    # Call every remaining public attribute of the dead process; each
    # one is expected to raise NoSuchProcess.
    for name in dir(p):
        if (name.startswith('_') or
                name in excluded_names):
            continue
        try:
            meth = getattr(p, name)
            # get/set methods: these need arguments, and where two
            # calls are made both the getter and the setter form are
            # exercised.
            if name == 'nice':
                if POSIX:
                    ret = meth(1)
                else:
                    ret = meth(psutil.NORMAL_PRIORITY_CLASS)
            elif name == 'ionice':
                ret = meth()
                ret = meth(2)
            elif name == 'rlimit':
                ret = meth(psutil.RLIMIT_NOFILE)
                ret = meth(psutil.RLIMIT_NOFILE, (5, 5))
            elif name == 'cpu_affinity':
                ret = meth()
                ret = meth([0])
            elif name == 'send_signal':
                ret = meth(signal.SIGTERM)
            else:
                ret = meth()
        except psutil.ZombieProcess:
            # Listed before NoSuchProcess so that a ZombieProcess is
            # reported as a failure instead of being accepted below.
            self.fail("ZombieProcess for %r was not supposed to happen" %
                      name)
        except psutil.NoSuchProcess:
            # The expected outcome.
            pass
        except psutil.AccessDenied:
            # Tolerated only for these two methods on OpenBSD.
            if OPENBSD and name in ('threads', 'num_threads'):
                pass
            else:
                raise
        except NotImplementedError:
            pass
        else:
            # The call returned without raising.
            # NtQuerySystemInformation succeeds if process is gone.
            if WINDOWS and name in ('exe', 'name'):
                normcase = os.path.normcase
                if name == 'exe':
                    self.assertEqual(normcase(ret), normcase(PYTHON_EXE))
                else:
                    self.assertEqual(
                        normcase(ret),
                        normcase(os.path.basename(PYTHON_EXE)))
                continue
            self.fail(
                "NoSuchProcess exception not raised for %r, retval=%s" % (
                    name, ret))
| 1343 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process(self):
    # Exercise the Process API against a zombie process: every call
    # must either succeed or raise ZombieProcess / AccessDenied.
    def succeed_or_zombie_p_exc(fun, *args, **kwargs):
        # Return fun's result, or None if it raised one of the two
        # exceptions which are acceptable for a zombie.
        try:
            return fun(*args, **kwargs)
        except (psutil.ZombieProcess, psutil.AccessDenied):
            pass

    zpid = create_zombie_proc()
    self.addCleanup(reap_children, recursive=True)
    # A zombie process should always be instantiable
    zproc = psutil.Process(zpid)
    # ...and at least its status always be queryable
    self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
    # ...and it should be considered 'running'
    self.assertTrue(zproc.is_running())
    # ...and as_dict() shouldn't crash
    zproc.as_dict()
    # if cmdline succeeds it should be an empty list.  This used to
    # call suspend(), which returns None, so the check never ran.
    ret = succeed_or_zombie_p_exc(zproc.cmdline)
    if ret is not None:
        self.assertEqual(ret, [])

    if hasattr(zproc, "rlimit"):
        succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
        succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE,
                                (5, 5))
    succeed_or_zombie_p_exc(zproc.parent)
    # set methods
    if hasattr(zproc, 'cpu_affinity'):
        try:
            succeed_or_zombie_p_exc(zproc.cpu_affinity, [0])
        except ValueError as err:
            if TRAVIS and LINUX and "not eligible" in str(err):
                # https://travis-ci.org/giampaolo/psutil/jobs/279890461
                pass
            else:
                raise

    succeed_or_zombie_p_exc(zproc.nice, 0)
    if hasattr(zproc, 'ionice'):
        if LINUX:
            succeed_or_zombie_p_exc(zproc.ionice, 2, 0)
        else:
            succeed_or_zombie_p_exc(zproc.ionice, 0)  # Windows
    if hasattr(zproc, 'rlimit'):
        succeed_or_zombie_p_exc(zproc.rlimit,
                                psutil.RLIMIT_NOFILE, (5, 5))
    succeed_or_zombie_p_exc(zproc.suspend)
    succeed_or_zombie_p_exc(zproc.resume)
    succeed_or_zombie_p_exc(zproc.terminate)
    succeed_or_zombie_p_exc(zproc.kill)

    # ...its parent should 'see' it
    # edit: not true on BSD and MACOS
    # descendants = [x.pid for x in psutil.Process().children(
    #                recursive=True)]
    # self.assertIn(zpid, descendants)
    # XXX should we also assume ppid be usable?  Note: this
    # would be an important use case as the only way to get
    # rid of a zombie is to kill its parent.
    # self.assertEqual(zpid.ppid(), os.getpid())
    # ...and all other APIs should be able to deal with it
    self.assertTrue(psutil.pid_exists(zpid))
    # NOTE(review): as written this block runs only on MACOS outside
    # of Travis; the comment below suggests "not (TRAVIS and MACOS)"
    # may have been intended.  Left unchanged; confirm.
    if not TRAVIS and MACOS:
        # For some reason this started failing all of the sudden.
        # Maybe they upgraded MACOS version?
        # https://travis-ci.org/giampaolo/psutil/jobs/310896404
        self.assertIn(zpid, psutil.pids())
        self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
        psutil._pmap = {}
        self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
| 1416 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process_is_running_w_exc(self):
    # Emulate a case where internally is_running() raises
    # ZombieProcess.
    proc = psutil.Process()
    zombie_exc = psutil.ZombieProcess(0)
    with mock.patch("psutil.Process", side_effect=zombie_exc) as patched:
        assert proc.is_running()
        assert patched.called
| 1426 | |
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process_status_w_exc(self):
    # Emulate a case where internally status() raises
    # ZombieProcess.
    proc = psutil.Process()
    zombie_exc = psutil.ZombieProcess(0)
    with mock.patch("psutil._psplatform.Process.status",
                    side_effect=zombie_exc) as patched:
        self.assertEqual(proc.status(), psutil.STATUS_ZOMBIE)
        assert patched.called
| 1436 | |
def test_pid_0(self):
    # Process(0) is supposed to work on all platforms except Linux
    if 0 not in psutil.pids():
        self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
        return

    # test all methods
    proc = psutil.Process(0)
    for name in psutil._as_dict_attrnames:
        if name == 'pid':
            continue
        getter = getattr(proc, name)
        try:
            value = getter()
        except psutil.AccessDenied:
            continue
        if name in ("uids", "gids"):
            self.assertEqual(value.real, 0)
        elif name == "username":
            if POSIX:
                self.assertEqual(proc.username(), 'root')
            elif WINDOWS:
                self.assertEqual(proc.username(), 'NT AUTHORITY\\SYSTEM')
        elif name == "name":
            # NOTE(review): this checks the attribute name itself, not
            # the returned value, so it can never fail; confirm whether
            # the return value was meant to be checked.
            assert name, name

    if hasattr(proc, 'rlimit'):
        try:
            proc.rlimit(psutil.RLIMIT_FSIZE)
        except psutil.AccessDenied:
            pass

    proc.as_dict()

    if not OPENBSD:
        self.assertIn(0, psutil.pids())
        self.assertTrue(psutil.pid_exists(0))
| 1475 | |
@unittest.skipIf(not HAS_ENVIRON, "not supported")
def test_environ(self):
    # environ() of the current process must match os.environ once
    # known troublemakers are removed.
    def strip_newlines(text):
        return text.replace("\r", "").replace("\n", "")

    def clean_dict(d):
        # Most of these are problematic on Travis.
        unwanted = ["PSUTIL_TESTING", "PLAT", "HOME"]
        if MACOS:
            unwanted += ["__CF_USER_TEXT_ENCODING",
                         "VERSIONER_PYTHON_PREFER_32_BIT",
                         "VERSIONER_PYTHON_VERSION"]
        for key in unwanted:
            d.pop(key, None)
        return {strip_newlines(k): strip_newlines(v)
                for k, v in d.items()}

    self.maxDiff = None
    from_psutil = clean_dict(psutil.Process().environ())
    from_os = clean_dict(os.environ.copy())
    self.assertEqual(from_psutil, from_os)
| 1497 | |
@unittest.skipIf(not HAS_ENVIRON, "not supported")
@unittest.skipIf(not POSIX, "POSIX only")
def test_weird_environ(self):
    # environment variables can contain values without an equals sign
    # The C program below marks stderr close-on-exec and then
    # execve()s /bin/cat with an environment holding the malformed
    # entry "X" next to "A=1" and "C=3".
    code = textwrap.dedent("""
    #include <unistd.h>
    #include <fcntl.h>
    char * const argv[] = {"cat", 0};
    char * const envp[] = {"A=1", "X", "C=3", 0};
    int main(void) {
        /* Close stderr on exec so parent can wait for the execve to
         * finish. */
        if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
            return 0;
        return execve("/bin/cat", argv, envp);
    }
    """)
    path = TESTFN
    create_exe(path, c_code=code)
    self.addCleanup(safe_rmpath, path)
    # stdin is a pipe which is only closed by communicate() below.
    sproc = get_test_subprocess([path],
                                stdin=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    p = psutil.Process(sproc.pid)
    wait_for_pid(p.pid)
    self.assertTrue(p.is_running())
    # Wait for process to exec or exit.
    self.assertEqual(sproc.stderr.read(), b"")
    # The entry without "=" is expected to be left out.
    self.assertEqual(p.environ(), {"A": "1", "C": "3"})
    sproc.communicate()
    self.assertEqual(sproc.returncode, 0)
| 1529 | |
| 1530 | |
| 1531 # =================================================================== | |
| 1532 # --- Limited user tests | |
| 1533 # =================================================================== | |
| 1534 | |
| 1535 | |
if POSIX and os.getuid() == 0:
    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.
        """
        # the uid/gid the test suite runs under
        if hasattr(os, 'getuid'):
            PROCESS_UID = os.getuid()
            PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestProcess.__init__(self, *args, **kwargs)

            def ignoring_access_denied(meth):
                # Build the wrapper in its own scope so that each one
                # keeps its own `meth`.  Defining it directly in the
                # loop made every wrapper call the last method.
                def test_(self):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                return test_

            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                wrapper = ignoring_access_denied(getattr(self, attr))
                setattr(self, attr, types.MethodType(wrapper, self))

        def setUp(self):
            # Switch the effective group and user id to 1000 for the
            # duration of the test.
            safe_rmpath(TESTFN)
            TestProcess.setUp(self)
            os.setegid(1000)
            os.seteuid(1000)

        def tearDown(self):
            # Restore the original ids.  The uid goes to seteuid() and
            # the gid to setegid() (they used to be swapped); the uid
            # is restored first so the gid change is done with the
            # original effective uid.
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            TestProcess.tearDown(self)

        def test_nice(self):
            # A limited user must not be able to raise its priority.
            try:
                psutil.Process().nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")

        def test_zombie_process(self):
            # causes problems if the test suite is run as root
            pass
| 1583 | |
| 1584 | |
| 1585 # =================================================================== | |
| 1586 # --- psutil.Popen tests | |
| 1587 # =================================================================== | |
| 1588 | |
| 1589 | |
class TestPopen(unittest.TestCase):
    """Tests for psutil.Popen class."""

    def tearDown(self):
        reap_children()

    def test_misc(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        sleeper = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        pipes = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        with psutil.Popen(sleeper, **pipes) as proc:
            # Attributes of both psutil.Process and subprocess.Popen
            # must be reachable through the same object.
            proc.name()
            proc.cpu_times()
            proc.stdin
            self.assertTrue(dir(proc))
            self.assertRaises(AttributeError, getattr, proc, 'foo')
            proc.terminate()

    def test_ctx_manager(self):
        # Leaving the "with" block must close all three pipes.
        pipes = dict(stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     stdin=subprocess.PIPE)
        with psutil.Popen([PYTHON_EXE, "-V"], **pipes) as proc:
            proc.communicate()
        for stream in (proc.stdout, proc.stderr, proc.stdin):
            assert stream.closed
        self.assertEqual(proc.returncode, 0)

    def test_kill_terminate(self):
        # subprocess.Popen()'s terminate(), kill() and send_signal() do
        # not raise exception after the process is gone. psutil.Popen
        # diverges from that.
        sleeper = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        gone = psutil.NoSuchProcess
        with psutil.Popen(sleeper, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
            proc.terminate()
            proc.wait()
            self.assertRaises(gone, proc.terminate)
            self.assertRaises(gone, proc.kill)
            self.assertRaises(gone, proc.send_signal, signal.SIGTERM)
            if WINDOWS and sys.version_info >= (2, 7):
                self.assertRaises(gone, proc.send_signal,
                                  signal.CTRL_C_EVENT)
                self.assertRaises(gone, proc.send_signal,
                                  signal.CTRL_BREAK_EVENT)
| 1639 | |
| 1640 | |
if __name__ == '__main__':
    # Run this file through the psutil test runner when executed
    # directly.
    from psutil.tests import runner
    runner.run(__file__)
