Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/psutil/tests/test_memleaks.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
4 # Use of this source code is governed by a BSD-style license that can be | |
5 # found in the LICENSE file. | |
6 | |
7 """ | |
8 Tests for detecting function memory leaks (typically the ones | |
9 implemented in C). It does so by calling a function many times and | |
10 checking whether process memory usage keeps increasing between | |
11 calls or over time. | |
12 Note that this may produce false positives (especially on Windows | |
13 for some reason). | |
14 PyPy appears to be completely unstable for this framework, probably | |
15 because of how its JIT handles memory, so tests are skipped. | |
16 """ | |
17 | |
18 from __future__ import print_function | |
19 import functools | |
20 import os | |
21 | |
22 import psutil | |
23 import psutil._common | |
24 from psutil import LINUX | |
25 from psutil import MACOS | |
26 from psutil import OPENBSD | |
27 from psutil import POSIX | |
28 from psutil import SUNOS | |
29 from psutil import WINDOWS | |
30 from psutil._compat import ProcessLookupError | |
31 from psutil._compat import super | |
32 from psutil.tests import create_sockets | |
33 from psutil.tests import get_testfn | |
34 from psutil.tests import HAS_CPU_AFFINITY | |
35 from psutil.tests import HAS_CPU_FREQ | |
36 from psutil.tests import HAS_ENVIRON | |
37 from psutil.tests import HAS_IONICE | |
38 from psutil.tests import HAS_MEMORY_MAPS | |
39 from psutil.tests import HAS_NET_IO_COUNTERS | |
40 from psutil.tests import HAS_PROC_CPU_NUM | |
41 from psutil.tests import HAS_PROC_IO_COUNTERS | |
42 from psutil.tests import HAS_RLIMIT | |
43 from psutil.tests import HAS_SENSORS_BATTERY | |
44 from psutil.tests import HAS_SENSORS_FANS | |
45 from psutil.tests import HAS_SENSORS_TEMPERATURES | |
46 from psutil.tests import process_namespace | |
47 from psutil.tests import skip_on_access_denied | |
48 from psutil.tests import spawn_testproc | |
49 from psutil.tests import system_namespace | |
50 from psutil.tests import terminate | |
51 from psutil.tests import TestMemoryLeak | |
52 from psutil.tests import unittest | |
53 | |
54 | |
# Reduced iteration count, used instead of the test class default
# `times` by fewtimes_if_linux() and by a few tests on POSIX.
FEW_TIMES = 5
# Platform-specific C extension module of psutil; some tests call its
# functions directly.
cext = psutil._psplatform.cext
# Process object for the interpreter running this test suite.
thisproc = psutil.Process()
58 | |
59 | |
def fewtimes_if_linux():
    """Return a decorator which, on Linux, runs the decorated test
    method with the class-level `times` attribute temporarily set to
    FEW_TIMES. It is meant for those Linux functions which are
    implemented in pure Python, and which we want to run faster.
    On any other platform the method is called unchanged.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            if not LINUX:
                return fun(self, *args, **kwargs)
            klass = self.__class__
            saved = klass.times
            try:
                klass.times = FEW_TIMES
                return fun(self, *args, **kwargs)
            finally:
                # Put the original value back even if the test failed.
                klass.times = saved
        return wrapper
    return decorator
78 | |
79 | |
80 # =================================================================== | |
81 # Process class | |
82 # =================================================================== | |
83 | |
84 | |
class TestProcessObjectLeaks(TestMemoryLeak):
    """Look for memory leaks in the methods of the Process class.
    Each test hands a callable to execute(), which is provided by
    the TestMemoryLeak base class.
    """

    # Process instance the tests operate on (the current process).
    proc = thisproc

    def test_coverage(self):
        # Run the namespace's coverage check against this class for
        # all Process getters and setters.
        namespace = process_namespace(None)
        namespace.test_class_coverage(
            self, namespace.getters + namespace.setters)

    # --- identity

    @fewtimes_if_linux()
    def test_name(self):
        self.execute(self.proc.name)

    @fewtimes_if_linux()
    def test_cmdline(self):
        self.execute(self.proc.cmdline)

    @fewtimes_if_linux()
    def test_exe(self):
        self.execute(self.proc.exe)

    @fewtimes_if_linux()
    def test_ppid(self):
        self.execute(self.proc.ppid)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_uids(self):
        self.execute(self.proc.uids)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_gids(self):
        self.execute(self.proc.gids)

    @fewtimes_if_linux()
    def test_status(self):
        self.execute(self.proc.status)

    # --- priorities

    def test_nice(self):
        self.execute(self.proc.nice)

    def test_nice_set(self):
        # Re-apply the niceness value read from the current process.
        current = thisproc.nice()
        self.execute(lambda: self.proc.nice(current))

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice(self):
        self.execute(self.proc.ionice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_set(self):
        if not WINDOWS:
            self.execute(lambda: self.proc.ionice(psutil.IOPRIO_CLASS_NONE))
            # Also call the C function directly with arguments that
            # are expected to make it raise OSError.
            bad_call = functools.partial(
                cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, bad_call)
        else:
            # Re-apply the value read from the current process.
            current = thisproc.ionice()
            self.execute(lambda: self.proc.ionice(current))

    # --- counters and timings

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, "not supported")
    @fewtimes_if_linux()
    def test_io_counters(self):
        self.execute(self.proc.io_counters)

    @unittest.skipIf(POSIX, "worthless on POSIX")
    def test_username(self):
        # always open 1 handle on Windows (only once)
        psutil.Process().username()
        self.execute(self.proc.username)

    @fewtimes_if_linux()
    def test_create_time(self):
        self.execute(self.proc.create_time)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_num_threads(self):
        self.execute(self.proc.num_threads)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_num_handles(self):
        self.execute(self.proc.num_handles)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_num_fds(self):
        self.execute(self.proc.num_fds)

    @fewtimes_if_linux()
    def test_num_ctx_switches(self):
        self.execute(self.proc.num_ctx_switches)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_threads(self):
        self.execute(self.proc.threads)

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(self.proc.cpu_times)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        self.execute(self.proc.cpu_num)

    # --- memory

    @fewtimes_if_linux()
    def test_memory_info(self):
        self.execute(self.proc.memory_info)

    @fewtimes_if_linux()
    def test_memory_full_info(self):
        self.execute(self.proc.memory_full_info)

    # --- misc

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_terminal(self):
        self.execute(self.proc.terminal)

    def test_resume(self):
        # Fewer iterations on POSIX, class default elsewhere.
        if POSIX:
            times = FEW_TIMES
        else:
            times = self.times
        self.execute(self.proc.resume, times=times)

    @fewtimes_if_linux()
    def test_cwd(self):
        self.execute(self.proc.cwd)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity(self):
        self.execute(self.proc.cpu_affinity)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_set(self):
        # Re-apply the affinity read from the current process, then
        # check the failure path with an invalid CPU number.
        current = thisproc.cpu_affinity()
        self.execute(lambda: self.proc.cpu_affinity(current))
        self.execute_w_exc(
            ValueError, lambda: self.proc.cpu_affinity([-1]))

    @fewtimes_if_linux()
    def test_open_files(self):
        # Keep a test file open for the duration of the check.
        with open(get_testfn(), 'w'):
            self.execute(self.proc.open_files)

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @fewtimes_if_linux()
    def test_memory_maps(self):
        self.execute(self.proc.memory_maps)

    # --- resource limits

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit(self):
        self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE))

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_set(self):
        # Re-apply the limit read from the current process, then
        # check the failure path with an invalid resource.
        current = thisproc.rlimit(psutil.RLIMIT_NOFILE)
        self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE, current))
        self.execute_w_exc((OSError, ValueError), lambda: self.proc.rlimit(-1))

    # --- connections, environment, Windows-only info

    @fewtimes_if_linux()
    # Windows implementation is based on a single system-wide
    # function (tested later).
    @unittest.skipIf(WINDOWS, "worthless on WINDOWS")
    def test_connections(self):
        # TODO: UNIX sockets are temporarily implemented by parsing
        # 'pfiles' cmd output; we don't want that part of the code to
        # be executed.
        with create_sockets():
            if SUNOS:
                kind = 'inet'
            else:
                kind = 'all'
            self.execute(lambda: self.proc.connections(kind))

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    def test_environ(self):
        self.execute(self.proc.environ)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_proc_info(self):
        self.execute(lambda: cext.proc_info(os.getpid()))
265 | |
266 | |
class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
    """Run the inherited Process tests again, this time against a
    process which has already been killed, so that the calls raise
    NoSuchProcess. The C functions are still invoked but will follow
    different code paths, and those are the ones being checked here.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Spawn a child process, then kill it and wait for it, so
        # that `proc` refers to a process which is gone.
        child = spawn_testproc()
        cls.subp = child
        cls.proc = psutil.Process(child.pid)
        cls.proc.kill()
        cls.proc.wait()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()
        terminate(cls.subp)

    def call(self, fun):
        # NoSuchProcess is the expected outcome here: ignore it.
        try:
            fun()
        except psutil.NoSuchProcess:
            pass

    if WINDOWS:

        # These tests are only defined on Windows.

        def test_kill(self):
            self.execute(self.proc.kill)

        def test_terminate(self):
            self.execute(self.proc.terminate)

        def test_suspend(self):
            self.execute(self.proc.suspend)

        def test_resume(self):
            self.execute(self.proc.resume)

        def test_wait(self):
            self.execute(self.proc.wait)

        def test_proc_info(self):
            # test dual implementation
            def call():
                # The C function is called directly, hence it is
                # ProcessLookupError which gets ignored here.
                try:
                    return cext.proc_info(self.proc.pid)
                except ProcessLookupError:
                    pass

            self.execute(call)
319 | |
320 | |
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcessDualImplementation(TestMemoryLeak):
    """Windows only: call cext.proc_cmdline() for the current process
    with both values of its `use_peb` argument.
    """

    def test_cmdline_peb_true(self):
        pid = os.getpid()
        self.execute(lambda: cext.proc_cmdline(pid, use_peb=True))

    def test_cmdline_peb_false(self):
        pid = os.getpid()
        self.execute(lambda: cext.proc_cmdline(pid, use_peb=False))
329 | |
330 | |
331 # =================================================================== | |
332 # system APIs | |
333 # =================================================================== | |
334 | |
335 | |
class TestModuleFunctionsLeaks(TestMemoryLeak):
    """Look for memory leaks in the module-level (system-wide)
    functions of psutil.
    """

    def test_coverage(self):
        # Run the namespace's coverage check against this class for
        # all the system functions it lists.
        namespace = system_namespace()
        namespace.test_class_coverage(self, namespace.all)

    # --- cpu

    @fewtimes_if_linux()
    def test_cpu_count(self):  # logical
        self.execute(lambda: psutil.cpu_count(logical=True))

    @fewtimes_if_linux()
    def test_cpu_count_physical(self):
        self.execute(lambda: psutil.cpu_count(logical=False))

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(psutil.cpu_times)

    @fewtimes_if_linux()
    def test_per_cpu_times(self):
        self.execute(lambda: psutil.cpu_times(percpu=True))

    @fewtimes_if_linux()
    def test_cpu_stats(self):
        self.execute(psutil.cpu_stats)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        self.execute(psutil.cpu_freq)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_getloadavg(self):
        # One call up front, outside of the measured runs.
        psutil.getloadavg()
        self.execute(psutil.getloadavg)

    # --- mem

    def test_virtual_memory(self):
        self.execute(psutil.virtual_memory)

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS, "worthless on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute(psutil.swap_memory)

    def test_pid_exists(self):
        # Fewer iterations on POSIX, class default elsewhere.
        if POSIX:
            times = FEW_TIMES
        else:
            times = self.times
        self.execute(lambda: psutil.pid_exists(os.getpid()), times=times)

    # --- disk

    def test_disk_usage(self):
        # Fewer iterations on POSIX, class default elsewhere.
        if POSIX:
            times = FEW_TIMES
        else:
            times = self.times
        self.execute(lambda: psutil.disk_usage('.'), times=times)

    def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @fewtimes_if_linux()
    def test_disk_io_counters(self):
        self.execute(lambda: psutil.disk_io_counters(nowrap=False))

    # --- proc

    @fewtimes_if_linux()
    def test_pids(self):
        self.execute(psutil.pids)

    # --- net

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_net_io_counters(self):
        self.execute(lambda: psutil.net_io_counters(nowrap=False))

    @fewtimes_if_linux()
    @unittest.skipIf(MACOS and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        # always opens a handle on Windows (once)
        psutil.net_connections(kind='all')
        with create_sockets():
            self.execute(lambda: psutil.net_connections(kind='all'))

    def test_net_if_addrs(self):
        # Note: verified that on Windows this was a false positive.
        if WINDOWS:
            tolerance = 80 * 1024
        else:
            tolerance = self.tolerance
        self.execute(psutil.net_if_addrs, tolerance=tolerance)

    def test_net_if_stats(self):
        self.execute(psutil.net_if_stats)

    # --- sensors

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    def test_sensors_battery(self):
        self.execute(psutil.sensors_battery)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        self.execute(psutil.sensors_temperatures)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        self.execute(psutil.sensors_fans)

    # --- others

    @fewtimes_if_linux()
    def test_boot_time(self):
        self.execute(psutil.boot_time)

    def test_users(self):
        self.execute(psutil.users)

    if WINDOWS:

        # --- win services

        def test_win_service_iter(self):
            self.execute(cext.winservice_enumerate)

        def test_win_service_get(self):
            # Intentionally empty.
            pass

        def test_win_service_get_config(self):
            # Use the first service returned by the iterator.
            service = next(psutil.win_service_iter())
            name = service.name()
            self.execute(lambda: cext.winservice_query_config(name))

        def test_win_service_get_status(self):
            # Use the first service returned by the iterator.
            service = next(psutil.win_service_iter())
            name = service.name()
            self.execute(lambda: cext.winservice_query_status(name))

        def test_win_service_get_description(self):
            # Use the first service returned by the iterator.
            service = next(psutil.win_service_iter())
            name = service.name()
            self.execute(lambda: cext.winservice_query_descr(name))
480 | |
481 | |
if __name__ == '__main__':
    # Executed as a script: hand this file over to the runner which
    # ships with the psutil test suite. The import is kept local so
    # that it only happens in this case.
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)