Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # -*- coding: UTF-8 -*- | |
| 3 | |
| 4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 5 # Use of this source code is governed by a BSD-style license that can be | |
| 6 # found in the LICENSE file. | |
| 7 | |
| 8 """Windows specific tests.""" | |
| 9 | |
| 10 import datetime | |
| 11 import errno | |
| 12 import glob | |
| 13 import os | |
| 14 import platform | |
| 15 import re | |
| 16 import signal | |
| 17 import subprocess | |
| 18 import sys | |
| 19 import time | |
| 20 import warnings | |
| 21 | |
| 22 import psutil | |
| 23 from psutil import WINDOWS | |
| 24 from psutil._compat import FileNotFoundError | |
| 25 from psutil._compat import super | |
| 26 from psutil.tests import APPVEYOR | |
| 27 from psutil.tests import GITHUB_WHEELS | |
| 28 from psutil.tests import HAS_BATTERY | |
| 29 from psutil.tests import IS_64BIT | |
| 30 from psutil.tests import mock | |
| 31 from psutil.tests import PsutilTestCase | |
| 32 from psutil.tests import PY3 | |
| 33 from psutil.tests import PYPY | |
| 34 from psutil.tests import retry_on_failure | |
| 35 from psutil.tests import sh | |
| 36 from psutil.tests import spawn_testproc | |
| 37 from psutil.tests import terminate | |
| 38 from psutil.tests import TOLERANCE_DISK_USAGE | |
| 39 from psutil.tests import unittest | |
| 40 | |
| 41 | |
| 42 if WINDOWS and not PYPY: | |
| 43 with warnings.catch_warnings(): | |
| 44 warnings.simplefilter("ignore") | |
| 45 import win32api # requires "pip install pywin32" | |
| 46 import win32con | |
| 47 import win32process | |
| 48 import wmi # requires "pip install wmi" / "make setup-dev-env" | |
| 49 | |
| 50 | |
| 51 cext = psutil._psplatform.cext | |
| 52 | |
| 53 | |
| 54 def wrap_exceptions(fun): | |
| 55 def wrapper(self, *args, **kwargs): | |
| 56 try: | |
| 57 return fun(self, *args, **kwargs) | |
| 58 except OSError as err: | |
| 59 from psutil._pswindows import ACCESS_DENIED_SET | |
| 60 if err.errno in ACCESS_DENIED_SET: | |
| 61 raise psutil.AccessDenied(None, None) | |
| 62 if err.errno == errno.ESRCH: | |
| 63 raise psutil.NoSuchProcess(None, None) | |
| 64 raise | |
| 65 return wrapper | |
| 66 | |
| 67 | |
| 68 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 69 @unittest.skipIf(PYPY, "pywin32 not available on PYPY") | |
| 70 # https://github.com/giampaolo/psutil/pull/1762#issuecomment-632892692 | |
| 71 @unittest.skipIf(GITHUB_WHEELS and (not PY3 or not IS_64BIT), | |
| 72 "pywin32 broken on GITHUB + PY2") | |
| 73 class WindowsTestCase(PsutilTestCase): | |
| 74 pass | |
| 75 | |
| 76 | |
| 77 # =================================================================== | |
| 78 # System APIs | |
| 79 # =================================================================== | |
| 80 | |
| 81 | |
| 82 class TestCpuAPIs(WindowsTestCase): | |
| 83 | |
| 84 @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ, | |
| 85 'NUMBER_OF_PROCESSORS env var is not available') | |
| 86 def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self): | |
| 87 # Will likely fail on many-cores systems: | |
| 88 # https://stackoverflow.com/questions/31209256 | |
| 89 num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) | |
| 90 self.assertEqual(num_cpus, psutil.cpu_count()) | |
| 91 | |
| 92 def test_cpu_count_vs_GetSystemInfo(self): | |
| 93 # Will likely fail on many-cores systems: | |
| 94 # https://stackoverflow.com/questions/31209256 | |
| 95 sys_value = win32api.GetSystemInfo()[5] | |
| 96 psutil_value = psutil.cpu_count() | |
| 97 self.assertEqual(sys_value, psutil_value) | |
| 98 | |
| 99 def test_cpu_count_logical_vs_wmi(self): | |
| 100 w = wmi.WMI() | |
| 101 proc = w.Win32_Processor()[0] | |
| 102 self.assertEqual(psutil.cpu_count(), proc.NumberOfLogicalProcessors) | |
| 103 | |
| 104 def test_cpu_count_phys_vs_wmi(self): | |
| 105 w = wmi.WMI() | |
| 106 proc = w.Win32_Processor()[0] | |
| 107 self.assertEqual(psutil.cpu_count(logical=False), proc.NumberOfCores) | |
| 108 | |
| 109 def test_cpu_count_vs_cpu_times(self): | |
| 110 self.assertEqual(psutil.cpu_count(), | |
| 111 len(psutil.cpu_times(percpu=True))) | |
| 112 | |
| 113 def test_cpu_freq(self): | |
| 114 w = wmi.WMI() | |
| 115 proc = w.Win32_Processor()[0] | |
| 116 self.assertEqual(proc.CurrentClockSpeed, psutil.cpu_freq().current) | |
| 117 self.assertEqual(proc.MaxClockSpeed, psutil.cpu_freq().max) | |
| 118 | |
| 119 | |
| 120 class TestSystemAPIs(WindowsTestCase): | |
| 121 | |
| 122 def test_nic_names(self): | |
| 123 out = sh('ipconfig /all') | |
| 124 nics = psutil.net_io_counters(pernic=True).keys() | |
| 125 for nic in nics: | |
| 126 if "pseudo-interface" in nic.replace(' ', '-').lower(): | |
| 127 continue | |
| 128 if nic not in out: | |
| 129 self.fail( | |
| 130 "%r nic wasn't found in 'ipconfig /all' output" % nic) | |
| 131 | |
| 132 def test_total_phymem(self): | |
| 133 w = wmi.WMI().Win32_ComputerSystem()[0] | |
| 134 self.assertEqual(int(w.TotalPhysicalMemory), | |
| 135 psutil.virtual_memory().total) | |
| 136 | |
| 137 # @unittest.skipIf(wmi is None, "wmi module is not installed") | |
| 138 # def test__UPTIME(self): | |
| 139 # # _UPTIME constant is not public but it is used internally | |
| 140 # # as value to return for pid 0 creation time. | |
| 141 # # WMI behaves the same. | |
| 142 # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 143 # p = psutil.Process(0) | |
| 144 # wmic_create = str(w.CreationDate.split('.')[0]) | |
| 145 # psutil_create = time.strftime("%Y%m%d%H%M%S", | |
| 146 # time.localtime(p.create_time())) | |
| 147 | |
| 148 # Note: this test is not very reliable | |
| 149 @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") | |
| 150 @retry_on_failure() | |
| 151 def test_pids(self): | |
| 152 # Note: this test might fail if the OS is starting/killing | |
| 153 # other processes in the meantime | |
| 154 w = wmi.WMI().Win32_Process() | |
| 155 wmi_pids = set([x.ProcessId for x in w]) | |
| 156 psutil_pids = set(psutil.pids()) | |
| 157 self.assertEqual(wmi_pids, psutil_pids) | |
| 158 | |
| 159 @retry_on_failure() | |
| 160 def test_disks(self): | |
| 161 ps_parts = psutil.disk_partitions(all=True) | |
| 162 wmi_parts = wmi.WMI().Win32_LogicalDisk() | |
| 163 for ps_part in ps_parts: | |
| 164 for wmi_part in wmi_parts: | |
| 165 if ps_part.device.replace('\\', '') == wmi_part.DeviceID: | |
| 166 if not ps_part.mountpoint: | |
| 167 # this is usually a CD-ROM with no disk inserted | |
| 168 break | |
| 169 if 'cdrom' in ps_part.opts: | |
| 170 break | |
| 171 if ps_part.mountpoint.startswith('A:'): | |
| 172 break # floppy | |
| 173 try: | |
| 174 usage = psutil.disk_usage(ps_part.mountpoint) | |
| 175 except FileNotFoundError: | |
| 176 # usually this is the floppy | |
| 177 break | |
| 178 self.assertEqual(usage.total, int(wmi_part.Size)) | |
| 179 wmi_free = int(wmi_part.FreeSpace) | |
| 180 self.assertEqual(usage.free, wmi_free) | |
| 181 # 10 MB tolerance | |
| 182 if abs(usage.free - wmi_free) > 10 * 1024 * 1024: | |
| 183 self.fail("psutil=%s, wmi=%s" % ( | |
| 184 usage.free, wmi_free)) | |
| 185 break | |
| 186 else: | |
| 187 self.fail("can't find partition %s" % repr(ps_part)) | |
| 188 | |
| 189 @retry_on_failure() | |
| 190 def test_disk_usage(self): | |
| 191 for disk in psutil.disk_partitions(): | |
| 192 if 'cdrom' in disk.opts: | |
| 193 continue | |
| 194 sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint) | |
| 195 psutil_value = psutil.disk_usage(disk.mountpoint) | |
| 196 self.assertAlmostEqual(sys_value[0], psutil_value.free, | |
| 197 delta=TOLERANCE_DISK_USAGE) | |
| 198 self.assertAlmostEqual(sys_value[1], psutil_value.total, | |
| 199 delta=TOLERANCE_DISK_USAGE) | |
| 200 self.assertEqual(psutil_value.used, | |
| 201 psutil_value.total - psutil_value.free) | |
| 202 | |
| 203 def test_disk_partitions(self): | |
| 204 sys_value = [ | |
| 205 x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00") | |
| 206 if x and not x.startswith('A:')] | |
| 207 psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True) | |
| 208 if not x.mountpoint.startswith('A:')] | |
| 209 self.assertEqual(sys_value, psutil_value) | |
| 210 | |
| 211 def test_net_if_stats(self): | |
| 212 ps_names = set(cext.net_if_stats()) | |
| 213 wmi_adapters = wmi.WMI().Win32_NetworkAdapter() | |
| 214 wmi_names = set() | |
| 215 for wmi_adapter in wmi_adapters: | |
| 216 wmi_names.add(wmi_adapter.Name) | |
| 217 wmi_names.add(wmi_adapter.NetConnectionID) | |
| 218 self.assertTrue(ps_names & wmi_names, | |
| 219 "no common entries in %s, %s" % (ps_names, wmi_names)) | |
| 220 | |
| 221 def test_boot_time(self): | |
| 222 wmi_os = wmi.WMI().Win32_OperatingSystem() | |
| 223 wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0] | |
| 224 wmi_btime_dt = datetime.datetime.strptime( | |
| 225 wmi_btime_str, "%Y%m%d%H%M%S") | |
| 226 psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time()) | |
| 227 diff = abs((wmi_btime_dt - psutil_dt).total_seconds()) | |
| 228 self.assertLessEqual(diff, 3) | |
| 229 | |
| 230 def test_boot_time_fluctuation(self): | |
| 231 # https://github.com/giampaolo/psutil/issues/1007 | |
| 232 with mock.patch('psutil._pswindows.cext.boot_time', return_value=5): | |
| 233 self.assertEqual(psutil.boot_time(), 5) | |
| 234 with mock.patch('psutil._pswindows.cext.boot_time', return_value=4): | |
| 235 self.assertEqual(psutil.boot_time(), 5) | |
| 236 with mock.patch('psutil._pswindows.cext.boot_time', return_value=6): | |
| 237 self.assertEqual(psutil.boot_time(), 5) | |
| 238 with mock.patch('psutil._pswindows.cext.boot_time', return_value=333): | |
| 239 self.assertEqual(psutil.boot_time(), 333) | |
| 240 | |
| 241 | |
| 242 # =================================================================== | |
| 243 # sensors_battery() | |
| 244 # =================================================================== | |
| 245 | |
| 246 | |
| 247 class TestSensorsBattery(WindowsTestCase): | |
| 248 | |
| 249 def test_has_battery(self): | |
| 250 if win32api.GetPwrCapabilities()['SystemBatteriesPresent']: | |
| 251 self.assertIsNotNone(psutil.sensors_battery()) | |
| 252 else: | |
| 253 self.assertIsNone(psutil.sensors_battery()) | |
| 254 | |
| 255 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 256 def test_percent(self): | |
| 257 w = wmi.WMI() | |
| 258 battery_wmi = w.query('select * from Win32_Battery')[0] | |
| 259 battery_psutil = psutil.sensors_battery() | |
| 260 self.assertAlmostEqual( | |
| 261 battery_psutil.percent, battery_wmi.EstimatedChargeRemaining, | |
| 262 delta=1) | |
| 263 | |
| 264 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 265 def test_power_plugged(self): | |
| 266 w = wmi.WMI() | |
| 267 battery_wmi = w.query('select * from Win32_Battery')[0] | |
| 268 battery_psutil = psutil.sensors_battery() | |
| 269 # Status codes: | |
| 270 # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx | |
| 271 self.assertEqual(battery_psutil.power_plugged, | |
| 272 battery_wmi.BatteryStatus == 2) | |
| 273 | |
| 274 def test_emulate_no_battery(self): | |
| 275 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 276 return_value=(0, 128, 0, 0)) as m: | |
| 277 self.assertIsNone(psutil.sensors_battery()) | |
| 278 assert m.called | |
| 279 | |
| 280 def test_emulate_power_connected(self): | |
| 281 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 282 return_value=(1, 0, 0, 0)) as m: | |
| 283 self.assertEqual(psutil.sensors_battery().secsleft, | |
| 284 psutil.POWER_TIME_UNLIMITED) | |
| 285 assert m.called | |
| 286 | |
| 287 def test_emulate_power_charging(self): | |
| 288 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 289 return_value=(0, 8, 0, 0)) as m: | |
| 290 self.assertEqual(psutil.sensors_battery().secsleft, | |
| 291 psutil.POWER_TIME_UNLIMITED) | |
| 292 assert m.called | |
| 293 | |
| 294 def test_emulate_secs_left_unknown(self): | |
| 295 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 296 return_value=(0, 0, 0, -1)) as m: | |
| 297 self.assertEqual(psutil.sensors_battery().secsleft, | |
| 298 psutil.POWER_TIME_UNKNOWN) | |
| 299 assert m.called | |
| 300 | |
| 301 | |
| 302 # =================================================================== | |
| 303 # Process APIs | |
| 304 # =================================================================== | |
| 305 | |
| 306 | |
| 307 class TestProcess(WindowsTestCase): | |
| 308 | |
| 309 @classmethod | |
| 310 def setUpClass(cls): | |
| 311 cls.pid = spawn_testproc().pid | |
| 312 | |
| 313 @classmethod | |
| 314 def tearDownClass(cls): | |
| 315 terminate(cls.pid) | |
| 316 | |
| 317 def test_issue_24(self): | |
| 318 p = psutil.Process(0) | |
| 319 self.assertRaises(psutil.AccessDenied, p.kill) | |
| 320 | |
| 321 def test_special_pid(self): | |
| 322 p = psutil.Process(4) | |
| 323 self.assertEqual(p.name(), 'System') | |
| 324 # use __str__ to access all common Process properties to check | |
| 325 # that nothing strange happens | |
| 326 str(p) | |
| 327 p.username() | |
| 328 self.assertTrue(p.create_time() >= 0.0) | |
| 329 try: | |
| 330 rss, vms = p.memory_info()[:2] | |
| 331 except psutil.AccessDenied: | |
| 332 # expected on Windows Vista and Windows 7 | |
| 333 if not platform.uname()[1] in ('vista', 'win-7', 'win7'): | |
| 334 raise | |
| 335 else: | |
| 336 self.assertTrue(rss > 0) | |
| 337 | |
| 338 def test_send_signal(self): | |
| 339 p = psutil.Process(self.pid) | |
| 340 self.assertRaises(ValueError, p.send_signal, signal.SIGINT) | |
| 341 | |
| 342 def test_num_handles_increment(self): | |
| 343 p = psutil.Process(os.getpid()) | |
| 344 before = p.num_handles() | |
| 345 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 346 win32con.FALSE, os.getpid()) | |
| 347 after = p.num_handles() | |
| 348 self.assertEqual(after, before + 1) | |
| 349 win32api.CloseHandle(handle) | |
| 350 self.assertEqual(p.num_handles(), before) | |
| 351 | |
| 352 @unittest.skipIf(not sys.version_info >= (2, 7), | |
| 353 "CTRL_* signals not supported") | |
| 354 def test_ctrl_signals(self): | |
| 355 p = psutil.Process(self.spawn_testproc().pid) | |
| 356 p.send_signal(signal.CTRL_C_EVENT) | |
| 357 p.send_signal(signal.CTRL_BREAK_EVENT) | |
| 358 p.kill() | |
| 359 p.wait() | |
| 360 self.assertRaises(psutil.NoSuchProcess, | |
| 361 p.send_signal, signal.CTRL_C_EVENT) | |
| 362 self.assertRaises(psutil.NoSuchProcess, | |
| 363 p.send_signal, signal.CTRL_BREAK_EVENT) | |
| 364 | |
| 365 def test_username(self): | |
| 366 self.assertEqual(psutil.Process().username(), | |
| 367 win32api.GetUserNameEx(win32con.NameSamCompatible)) | |
| 368 | |
| 369 def test_cmdline(self): | |
| 370 sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip() | |
| 371 psutil_value = ' '.join(psutil.Process().cmdline()) | |
| 372 self.assertEqual(sys_value, psutil_value) | |
| 373 | |
| 374 # XXX - occasional failures | |
| 375 | |
| 376 # def test_cpu_times(self): | |
| 377 # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 378 # win32con.FALSE, os.getpid()) | |
| 379 # self.addCleanup(win32api.CloseHandle, handle) | |
| 380 # sys_value = win32process.GetProcessTimes(handle) | |
| 381 # psutil_value = psutil.Process().cpu_times() | |
| 382 # self.assertAlmostEqual( | |
| 383 # psutil_value.user, sys_value['UserTime'] / 10000000.0, | |
| 384 # delta=0.2) | |
| 385 # self.assertAlmostEqual( | |
| 386 # psutil_value.user, sys_value['KernelTime'] / 10000000.0, | |
| 387 # delta=0.2) | |
| 388 | |
| 389 def test_nice(self): | |
| 390 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 391 win32con.FALSE, os.getpid()) | |
| 392 self.addCleanup(win32api.CloseHandle, handle) | |
| 393 sys_value = win32process.GetPriorityClass(handle) | |
| 394 psutil_value = psutil.Process().nice() | |
| 395 self.assertEqual(psutil_value, sys_value) | |
| 396 | |
| 397 def test_memory_info(self): | |
| 398 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 399 win32con.FALSE, self.pid) | |
| 400 self.addCleanup(win32api.CloseHandle, handle) | |
| 401 sys_value = win32process.GetProcessMemoryInfo(handle) | |
| 402 psutil_value = psutil.Process(self.pid).memory_info() | |
| 403 self.assertEqual( | |
| 404 sys_value['PeakWorkingSetSize'], psutil_value.peak_wset) | |
| 405 self.assertEqual( | |
| 406 sys_value['WorkingSetSize'], psutil_value.wset) | |
| 407 self.assertEqual( | |
| 408 sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool) | |
| 409 self.assertEqual( | |
| 410 sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool) | |
| 411 self.assertEqual( | |
| 412 sys_value['QuotaPeakNonPagedPoolUsage'], | |
| 413 psutil_value.peak_nonpaged_pool) | |
| 414 self.assertEqual( | |
| 415 sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool) | |
| 416 self.assertEqual( | |
| 417 sys_value['PagefileUsage'], psutil_value.pagefile) | |
| 418 self.assertEqual( | |
| 419 sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile) | |
| 420 | |
| 421 self.assertEqual(psutil_value.rss, psutil_value.wset) | |
| 422 self.assertEqual(psutil_value.vms, psutil_value.pagefile) | |
| 423 | |
| 424 def test_wait(self): | |
| 425 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 426 win32con.FALSE, self.pid) | |
| 427 self.addCleanup(win32api.CloseHandle, handle) | |
| 428 p = psutil.Process(self.pid) | |
| 429 p.terminate() | |
| 430 psutil_value = p.wait() | |
| 431 sys_value = win32process.GetExitCodeProcess(handle) | |
| 432 self.assertEqual(psutil_value, sys_value) | |
| 433 | |
| 434 def test_cpu_affinity(self): | |
| 435 def from_bitmask(x): | |
| 436 return [i for i in range(64) if (1 << i) & x] | |
| 437 | |
| 438 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 439 win32con.FALSE, self.pid) | |
| 440 self.addCleanup(win32api.CloseHandle, handle) | |
| 441 sys_value = from_bitmask( | |
| 442 win32process.GetProcessAffinityMask(handle)[0]) | |
| 443 psutil_value = psutil.Process(self.pid).cpu_affinity() | |
| 444 self.assertEqual(psutil_value, sys_value) | |
| 445 | |
| 446 def test_io_counters(self): | |
| 447 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 448 win32con.FALSE, os.getpid()) | |
| 449 self.addCleanup(win32api.CloseHandle, handle) | |
| 450 sys_value = win32process.GetProcessIoCounters(handle) | |
| 451 psutil_value = psutil.Process().io_counters() | |
| 452 self.assertEqual( | |
| 453 psutil_value.read_count, sys_value['ReadOperationCount']) | |
| 454 self.assertEqual( | |
| 455 psutil_value.write_count, sys_value['WriteOperationCount']) | |
| 456 self.assertEqual( | |
| 457 psutil_value.read_bytes, sys_value['ReadTransferCount']) | |
| 458 self.assertEqual( | |
| 459 psutil_value.write_bytes, sys_value['WriteTransferCount']) | |
| 460 self.assertEqual( | |
| 461 psutil_value.other_count, sys_value['OtherOperationCount']) | |
| 462 self.assertEqual( | |
| 463 psutil_value.other_bytes, sys_value['OtherTransferCount']) | |
| 464 | |
| 465 def test_num_handles(self): | |
| 466 import ctypes | |
| 467 import ctypes.wintypes | |
| 468 PROCESS_QUERY_INFORMATION = 0x400 | |
| 469 handle = ctypes.windll.kernel32.OpenProcess( | |
| 470 PROCESS_QUERY_INFORMATION, 0, self.pid) | |
| 471 self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle) | |
| 472 | |
| 473 hndcnt = ctypes.wintypes.DWORD() | |
| 474 ctypes.windll.kernel32.GetProcessHandleCount( | |
| 475 handle, ctypes.byref(hndcnt)) | |
| 476 sys_value = hndcnt.value | |
| 477 psutil_value = psutil.Process(self.pid).num_handles() | |
| 478 self.assertEqual(psutil_value, sys_value) | |
| 479 | |
| 480 def test_error_partial_copy(self): | |
| 481 # https://github.com/giampaolo/psutil/issues/875 | |
| 482 exc = WindowsError() | |
| 483 exc.winerror = 299 | |
| 484 with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc): | |
| 485 with mock.patch("time.sleep") as m: | |
| 486 p = psutil.Process() | |
| 487 self.assertRaises(psutil.AccessDenied, p.cwd) | |
| 488 self.assertGreaterEqual(m.call_count, 5) | |
| 489 | |
| 490 def test_exe(self): | |
| 491 # NtQuerySystemInformation succeeds if process is gone. Make sure | |
| 492 # it raises NSP for a non existent pid. | |
| 493 pid = psutil.pids()[-1] + 99999 | |
| 494 proc = psutil._psplatform.Process(pid) | |
| 495 self.assertRaises(psutil.NoSuchProcess, proc.exe) | |
| 496 | |
| 497 | |
| 498 class TestProcessWMI(WindowsTestCase): | |
| 499 """Compare Process API results with WMI.""" | |
| 500 | |
| 501 @classmethod | |
| 502 def setUpClass(cls): | |
| 503 cls.pid = spawn_testproc().pid | |
| 504 | |
| 505 @classmethod | |
| 506 def tearDownClass(cls): | |
| 507 terminate(cls.pid) | |
| 508 | |
| 509 def test_name(self): | |
| 510 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 511 p = psutil.Process(self.pid) | |
| 512 self.assertEqual(p.name(), w.Caption) | |
| 513 | |
| 514 # This fails on GitHub because a virtualenv is used for the test environment | |
| 515 @unittest.skipIf(GITHUB_WHEELS, "unreliable path on GITHUB_WHEELS") | |
| 516 def test_exe(self): | |
| 517 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 518 p = psutil.Process(self.pid) | |
| 519 # Note: wmi reports the exe as a lower case string. | |
| 520 # Since Windows paths are case-insensitive, we ignore that. | |
| 521 self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) | |
| 522 | |
| 523 def test_cmdline(self): | |
| 524 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 525 p = psutil.Process(self.pid) | |
| 526 self.assertEqual(' '.join(p.cmdline()), | |
| 527 w.CommandLine.replace('"', '')) | |
| 528 | |
| 529 def test_username(self): | |
| 530 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 531 p = psutil.Process(self.pid) | |
| 532 domain, _, username = w.GetOwner() | |
| 533 username = "%s\\%s" % (domain, username) | |
| 534 self.assertEqual(p.username(), username) | |
| 535 | |
| 536 @retry_on_failure() | |
| 537 def test_memory_rss(self): | |
| 538 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 539 p = psutil.Process(self.pid) | |
| 540 rss = p.memory_info().rss | |
| 541 self.assertEqual(rss, int(w.WorkingSetSize)) | |
| 542 | |
| 543 @retry_on_failure() | |
| 544 def test_memory_vms(self): | |
| 545 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 546 p = psutil.Process(self.pid) | |
| 547 vms = p.memory_info().vms | |
| 548 # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx | |
| 549 # ...claims that PageFileUsage is represented in kilobytes | |
| 550 # but funnily enough on certain platforms bytes are | |
| 551 # returned instead. | |
| 552 wmi_usage = int(w.PageFileUsage) | |
| 553 if (vms != wmi_usage) and (vms != wmi_usage * 1024): | |
| 554 self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) | |
| 555 | |
| 556 def test_create_time(self): | |
| 557 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 558 p = psutil.Process(self.pid) | |
| 559 wmic_create = str(w.CreationDate.split('.')[0]) | |
| 560 psutil_create = time.strftime("%Y%m%d%H%M%S", | |
| 561 time.localtime(p.create_time())) | |
| 562 self.assertEqual(wmic_create, psutil_create) | |
| 563 | |
| 564 | |
| 565 # --- | |
| 566 | |
| 567 | |
| 568 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 569 class TestDualProcessImplementation(PsutilTestCase): | |
| 570 """ | |
| 571 Certain APIs on Windows have 2 internal implementations, one | |
| 572 based on documented Windows APIs, another one based on | |
| 573 NtQuerySystemInformation() which gets called as a fallback in | |
| 574 case the first fails because of a limited permission error. | |
| 575 Here we test that the two methods return the exact same value, | |
| 576 see: | |
| 577 https://github.com/giampaolo/psutil/issues/304 | |
| 578 """ | |
| 579 | |
| 580 @classmethod | |
| 581 def setUpClass(cls): | |
| 582 cls.pid = spawn_testproc().pid | |
| 583 | |
| 584 @classmethod | |
| 585 def tearDownClass(cls): | |
| 586 terminate(cls.pid) | |
| 587 | |
| 588 def test_memory_info(self): | |
| 589 mem_1 = psutil.Process(self.pid).memory_info() | |
| 590 with mock.patch("psutil._psplatform.cext.proc_memory_info", | |
| 591 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 592 mem_2 = psutil.Process(self.pid).memory_info() | |
| 593 self.assertEqual(len(mem_1), len(mem_2)) | |
| 594 for i in range(len(mem_1)): | |
| 595 self.assertGreaterEqual(mem_1[i], 0) | |
| 596 self.assertGreaterEqual(mem_2[i], 0) | |
| 597 self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512) | |
| 598 assert fun.called | |
| 599 | |
| 600 def test_create_time(self): | |
| 601 ctime = psutil.Process(self.pid).create_time() | |
| 602 with mock.patch("psutil._psplatform.cext.proc_times", | |
| 603 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 604 self.assertEqual(psutil.Process(self.pid).create_time(), ctime) | |
| 605 assert fun.called | |
| 606 | |
| 607 def test_cpu_times(self): | |
| 608 cpu_times_1 = psutil.Process(self.pid).cpu_times() | |
| 609 with mock.patch("psutil._psplatform.cext.proc_times", | |
| 610 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 611 cpu_times_2 = psutil.Process(self.pid).cpu_times() | |
| 612 assert fun.called | |
| 613 self.assertAlmostEqual( | |
| 614 cpu_times_1.user, cpu_times_2.user, delta=0.01) | |
| 615 self.assertAlmostEqual( | |
| 616 cpu_times_1.system, cpu_times_2.system, delta=0.01) | |
| 617 | |
| 618 def test_io_counters(self): | |
| 619 io_counters_1 = psutil.Process(self.pid).io_counters() | |
| 620 with mock.patch("psutil._psplatform.cext.proc_io_counters", | |
| 621 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 622 io_counters_2 = psutil.Process(self.pid).io_counters() | |
| 623 for i in range(len(io_counters_1)): | |
| 624 self.assertAlmostEqual( | |
| 625 io_counters_1[i], io_counters_2[i], delta=5) | |
| 626 assert fun.called | |
| 627 | |
| 628 def test_num_handles(self): | |
| 629 num_handles = psutil.Process(self.pid).num_handles() | |
| 630 with mock.patch("psutil._psplatform.cext.proc_num_handles", | |
| 631 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 632 self.assertEqual(psutil.Process(self.pid).num_handles(), | |
| 633 num_handles) | |
| 634 assert fun.called | |
| 635 | |
| 636 def test_cmdline(self): | |
| 637 from psutil._pswindows import convert_oserror | |
| 638 for pid in psutil.pids(): | |
| 639 try: | |
| 640 a = cext.proc_cmdline(pid, use_peb=True) | |
| 641 b = cext.proc_cmdline(pid, use_peb=False) | |
| 642 except OSError as err: | |
| 643 err = convert_oserror(err) | |
| 644 if not isinstance(err, (psutil.AccessDenied, | |
| 645 psutil.NoSuchProcess)): | |
| 646 raise | |
| 647 else: | |
| 648 self.assertEqual(a, b) | |
| 649 | |
| 650 | |
| 651 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 652 class RemoteProcessTestCase(PsutilTestCase): | |
| 653 """Certain functions require calling ReadProcessMemory. | |
| 654 This trivially works when called on the current process. | |
| 655 Check that this works on other processes, especially when they | |
| 656 have a different bitness. | |
| 657 """ | |
| 658 | |
| 659 @staticmethod | |
| 660 def find_other_interpreter(): | |
| 661 # find a python interpreter that is of the opposite bitness from us | |
| 662 code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))" | |
| 663 | |
| 664 # XXX: a different and probably more stable approach might be to access | |
| 665 # the registry but accessing 64 bit paths from a 32 bit process | |
| 666 for filename in glob.glob(r"C:\Python*\python.exe"): | |
| 667 proc = subprocess.Popen(args=[filename, "-c", code], | |
| 668 stdout=subprocess.PIPE, | |
| 669 stderr=subprocess.STDOUT) | |
| 670 output, _ = proc.communicate() | |
| 671 proc.wait() | |
| 672 if output == str(not IS_64BIT): | |
| 673 return filename | |
| 674 | |
| 675 test_args = ["-c", "import sys; sys.stdin.read()"] | |
| 676 | |
| 677 def setUp(self): | |
| 678 super().setUp() | |
| 679 | |
| 680 other_python = self.find_other_interpreter() | |
| 681 if other_python is None: | |
| 682 raise unittest.SkipTest( | |
| 683 "could not find interpreter with opposite bitness") | |
| 684 if IS_64BIT: | |
| 685 self.python64 = sys.executable | |
| 686 self.python32 = other_python | |
| 687 else: | |
| 688 self.python64 = other_python | |
| 689 self.python32 = sys.executable | |
| 690 | |
| 691 env = os.environ.copy() | |
| 692 env["THINK_OF_A_NUMBER"] = str(os.getpid()) | |
| 693 self.proc32 = self.spawn_testproc( | |
| 694 [self.python32] + self.test_args, | |
| 695 env=env, | |
| 696 stdin=subprocess.PIPE) | |
| 697 self.proc64 = self.spawn_testproc( | |
| 698 [self.python64] + self.test_args, | |
| 699 env=env, | |
| 700 stdin=subprocess.PIPE) | |
| 701 | |
| 702 def tearDown(self): | |
| 703 super().tearDown() | |
| 704 self.proc32.communicate() | |
| 705 self.proc64.communicate() | |
| 706 | |
| 707 def test_cmdline_32(self): | |
| 708 p = psutil.Process(self.proc32.pid) | |
| 709 self.assertEqual(len(p.cmdline()), 3) | |
| 710 self.assertEqual(p.cmdline()[1:], self.test_args) | |
| 711 | |
| 712 def test_cmdline_64(self): | |
| 713 p = psutil.Process(self.proc64.pid) | |
| 714 self.assertEqual(len(p.cmdline()), 3) | |
| 715 self.assertEqual(p.cmdline()[1:], self.test_args) | |
| 716 | |
| 717 def test_cwd_32(self): | |
| 718 p = psutil.Process(self.proc32.pid) | |
| 719 self.assertEqual(p.cwd(), os.getcwd()) | |
| 720 | |
| 721 def test_cwd_64(self): | |
| 722 p = psutil.Process(self.proc64.pid) | |
| 723 self.assertEqual(p.cwd(), os.getcwd()) | |
| 724 | |
| 725 def test_environ_32(self): | |
| 726 p = psutil.Process(self.proc32.pid) | |
| 727 e = p.environ() | |
| 728 self.assertIn("THINK_OF_A_NUMBER", e) | |
| 729 self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid())) | |
| 730 | |
| 731 def test_environ_64(self): | |
| 732 p = psutil.Process(self.proc64.pid) | |
| 733 try: | |
| 734 p.environ() | |
| 735 except psutil.AccessDenied: | |
| 736 pass | |
| 737 | |
| 738 | |
| 739 # =================================================================== | |
| 740 # Windows services | |
| 741 # =================================================================== | |
| 742 | |
| 743 | |
| 744 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 745 class TestServices(PsutilTestCase): | |
| 746 | |
| 747 def test_win_service_iter(self): | |
| 748 valid_statuses = set([ | |
| 749 "running", | |
| 750 "paused", | |
| 751 "start", | |
| 752 "pause", | |
| 753 "continue", | |
| 754 "stop", | |
| 755 "stopped", | |
| 756 ]) | |
| 757 valid_start_types = set([ | |
| 758 "automatic", | |
| 759 "manual", | |
| 760 "disabled", | |
| 761 ]) | |
| 762 valid_statuses = set([ | |
| 763 "running", | |
| 764 "paused", | |
| 765 "start_pending", | |
| 766 "pause_pending", | |
| 767 "continue_pending", | |
| 768 "stop_pending", | |
| 769 "stopped" | |
| 770 ]) | |
| 771 for serv in psutil.win_service_iter(): | |
| 772 data = serv.as_dict() | |
| 773 self.assertIsInstance(data['name'], str) | |
| 774 self.assertNotEqual(data['name'].strip(), "") | |
| 775 self.assertIsInstance(data['display_name'], str) | |
| 776 self.assertIsInstance(data['username'], str) | |
| 777 self.assertIn(data['status'], valid_statuses) | |
| 778 if data['pid'] is not None: | |
| 779 psutil.Process(data['pid']) | |
| 780 self.assertIsInstance(data['binpath'], str) | |
| 781 self.assertIsInstance(data['username'], str) | |
| 782 self.assertIsInstance(data['start_type'], str) | |
| 783 self.assertIn(data['start_type'], valid_start_types) | |
| 784 self.assertIn(data['status'], valid_statuses) | |
| 785 self.assertIsInstance(data['description'], str) | |
| 786 pid = serv.pid() | |
| 787 if pid is not None: | |
| 788 p = psutil.Process(pid) | |
| 789 self.assertTrue(p.is_running()) | |
| 790 # win_service_get | |
| 791 s = psutil.win_service_get(serv.name()) | |
| 792 # test __eq__ | |
| 793 self.assertEqual(serv, s) | |
| 794 | |
| 795 def test_win_service_get(self): | |
| 796 ERROR_SERVICE_DOES_NOT_EXIST = \ | |
| 797 psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST | |
| 798 ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED | |
| 799 | |
| 800 name = next(psutil.win_service_iter()).name() | |
| 801 with self.assertRaises(psutil.NoSuchProcess) as cm: | |
| 802 psutil.win_service_get(name + '???') | |
| 803 self.assertEqual(cm.exception.name, name + '???') | |
| 804 | |
| 805 # test NoSuchProcess | |
| 806 service = psutil.win_service_get(name) | |
| 807 if PY3: | |
| 808 args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST) | |
| 809 else: | |
| 810 args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg") | |
| 811 exc = WindowsError(*args) | |
| 812 with mock.patch("psutil._psplatform.cext.winservice_query_status", | |
| 813 side_effect=exc): | |
| 814 self.assertRaises(psutil.NoSuchProcess, service.status) | |
| 815 with mock.patch("psutil._psplatform.cext.winservice_query_config", | |
| 816 side_effect=exc): | |
| 817 self.assertRaises(psutil.NoSuchProcess, service.username) | |
| 818 | |
| 819 # test AccessDenied | |
| 820 if PY3: | |
| 821 args = (0, "msg", 0, ERROR_ACCESS_DENIED) | |
| 822 else: | |
| 823 args = (ERROR_ACCESS_DENIED, "msg") | |
| 824 exc = WindowsError(*args) | |
| 825 with mock.patch("psutil._psplatform.cext.winservice_query_status", | |
| 826 side_effect=exc): | |
| 827 self.assertRaises(psutil.AccessDenied, service.status) | |
| 828 with mock.patch("psutil._psplatform.cext.winservice_query_config", | |
| 829 side_effect=exc): | |
| 830 self.assertRaises(psutil.AccessDenied, service.username) | |
| 831 | |
| 832 # test __str__ and __repr__ | |
| 833 self.assertIn(service.name(), str(service)) | |
| 834 self.assertIn(service.display_name(), str(service)) | |
| 835 self.assertIn(service.name(), repr(service)) | |
| 836 self.assertIn(service.display_name(), repr(service)) | |
| 837 | |
| 838 | |
| 839 if __name__ == '__main__': | |
| 840 from psutil.tests.runner import run_from_name | |
| 841 run_from_name(__file__) |
