Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_windows.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # -*- coding: UTF-8 -* | |
| 3 | |
| 4 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 5 # Use of this source code is governed by a BSD-style license that can be | |
| 6 # found in the LICENSE file. | |
| 7 | |
| 8 """Windows specific tests.""" | |
| 9 | |
| 10 import datetime | |
| 11 import errno | |
| 12 import glob | |
| 13 import os | |
| 14 import platform | |
| 15 import re | |
| 16 import signal | |
| 17 import subprocess | |
| 18 import sys | |
| 19 import time | |
| 20 import warnings | |
| 21 | |
| 22 import psutil | |
| 23 from psutil import WINDOWS | |
| 24 from psutil._compat import FileNotFoundError | |
| 25 from psutil.tests import APPVEYOR | |
| 26 from psutil.tests import get_test_subprocess | |
| 27 from psutil.tests import HAS_BATTERY | |
| 28 from psutil.tests import mock | |
| 29 from psutil.tests import PY3 | |
| 30 from psutil.tests import PYPY | |
| 31 from psutil.tests import reap_children | |
| 32 from psutil.tests import retry_on_failure | |
| 33 from psutil.tests import sh | |
| 34 from psutil.tests import unittest | |
| 35 | |
| 36 | |
| 37 if WINDOWS and not PYPY: | |
| 38 with warnings.catch_warnings(): | |
| 39 warnings.simplefilter("ignore") | |
| 40 import win32api # requires "pip install pypiwin32" | |
| 41 import win32con | |
| 42 import win32process | |
| 43 import wmi # requires "pip install wmi" / "make setup-dev-env" | |
| 44 | |
| 45 | |
| 46 cext = psutil._psplatform.cext | |
| 47 | |
| 48 # are we a 64 bit process | |
| 49 IS_64_BIT = sys.maxsize > 2**32 | |
| 50 | |
| 51 | |
| 52 def wrap_exceptions(fun): | |
| 53 def wrapper(self, *args, **kwargs): | |
| 54 try: | |
| 55 return fun(self, *args, **kwargs) | |
| 56 except OSError as err: | |
| 57 from psutil._pswindows import ACCESS_DENIED_SET | |
| 58 if err.errno in ACCESS_DENIED_SET: | |
| 59 raise psutil.AccessDenied(None, None) | |
| 60 if err.errno == errno.ESRCH: | |
| 61 raise psutil.NoSuchProcess(None, None) | |
| 62 raise | |
| 63 return wrapper | |
| 64 | |
| 65 | |
| 66 @unittest.skipIf(PYPY, "pywin32 not available on PYPY") # skip whole module | |
| 67 class TestCase(unittest.TestCase): | |
| 68 pass | |
| 69 | |
| 70 | |
| 71 # =================================================================== | |
| 72 # System APIs | |
| 73 # =================================================================== | |
| 74 | |
| 75 | |
| 76 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 77 class TestCpuAPIs(TestCase): | |
| 78 | |
| 79 @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ, | |
| 80 'NUMBER_OF_PROCESSORS env var is not available') | |
| 81 def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self): | |
| 82 # Will likely fail on many-cores systems: | |
| 83 # https://stackoverflow.com/questions/31209256 | |
| 84 num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) | |
| 85 self.assertEqual(num_cpus, psutil.cpu_count()) | |
| 86 | |
| 87 def test_cpu_count_vs_GetSystemInfo(self): | |
| 88 # Will likely fail on many-cores systems: | |
| 89 # https://stackoverflow.com/questions/31209256 | |
| 90 sys_value = win32api.GetSystemInfo()[5] | |
| 91 psutil_value = psutil.cpu_count() | |
| 92 self.assertEqual(sys_value, psutil_value) | |
| 93 | |
| 94 def test_cpu_count_logical_vs_wmi(self): | |
| 95 w = wmi.WMI() | |
| 96 proc = w.Win32_Processor()[0] | |
| 97 self.assertEqual(psutil.cpu_count(), proc.NumberOfLogicalProcessors) | |
| 98 | |
| 99 def test_cpu_count_phys_vs_wmi(self): | |
| 100 w = wmi.WMI() | |
| 101 proc = w.Win32_Processor()[0] | |
| 102 self.assertEqual(psutil.cpu_count(logical=False), proc.NumberOfCores) | |
| 103 | |
| 104 def test_cpu_count_vs_cpu_times(self): | |
| 105 self.assertEqual(psutil.cpu_count(), | |
| 106 len(psutil.cpu_times(percpu=True))) | |
| 107 | |
| 108 def test_cpu_freq(self): | |
| 109 w = wmi.WMI() | |
| 110 proc = w.Win32_Processor()[0] | |
| 111 self.assertEqual(proc.CurrentClockSpeed, psutil.cpu_freq().current) | |
| 112 self.assertEqual(proc.MaxClockSpeed, psutil.cpu_freq().max) | |
| 113 | |
| 114 | |
| 115 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 116 class TestSystemAPIs(TestCase): | |
| 117 | |
| 118 def test_nic_names(self): | |
| 119 out = sh('ipconfig /all') | |
| 120 nics = psutil.net_io_counters(pernic=True).keys() | |
| 121 for nic in nics: | |
| 122 if "pseudo-interface" in nic.replace(' ', '-').lower(): | |
| 123 continue | |
| 124 if nic not in out: | |
| 125 self.fail( | |
| 126 "%r nic wasn't found in 'ipconfig /all' output" % nic) | |
| 127 | |
| 128 def test_total_phymem(self): | |
| 129 w = wmi.WMI().Win32_ComputerSystem()[0] | |
| 130 self.assertEqual(int(w.TotalPhysicalMemory), | |
| 131 psutil.virtual_memory().total) | |
| 132 | |
| 133 # @unittest.skipIf(wmi is None, "wmi module is not installed") | |
| 134 # def test__UPTIME(self): | |
| 135 # # _UPTIME constant is not public but it is used internally | |
| 136 # # as value to return for pid 0 creation time. | |
| 137 # # WMI behaves the same. | |
| 138 # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 139 # p = psutil.Process(0) | |
| 140 # wmic_create = str(w.CreationDate.split('.')[0]) | |
| 141 # psutil_create = time.strftime("%Y%m%d%H%M%S", | |
| 142 # time.localtime(p.create_time())) | |
| 143 | |
| 144 # Note: this test is not very reliable | |
| 145 @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") | |
| 146 @retry_on_failure() | |
| 147 def test_pids(self): | |
| 148 # Note: this test might fail if the OS is starting/killing | |
| 149 # other processes in the meantime | |
| 150 w = wmi.WMI().Win32_Process() | |
| 151 wmi_pids = set([x.ProcessId for x in w]) | |
| 152 psutil_pids = set(psutil.pids()) | |
| 153 self.assertEqual(wmi_pids, psutil_pids) | |
| 154 | |
| 155 @retry_on_failure() | |
| 156 def test_disks(self): | |
| 157 ps_parts = psutil.disk_partitions(all=True) | |
| 158 wmi_parts = wmi.WMI().Win32_LogicalDisk() | |
| 159 for ps_part in ps_parts: | |
| 160 for wmi_part in wmi_parts: | |
| 161 if ps_part.device.replace('\\', '') == wmi_part.DeviceID: | |
| 162 if not ps_part.mountpoint: | |
| 163 # this is usually a CD-ROM with no disk inserted | |
| 164 break | |
| 165 if 'cdrom' in ps_part.opts: | |
| 166 break | |
| 167 try: | |
| 168 usage = psutil.disk_usage(ps_part.mountpoint) | |
| 169 except FileNotFoundError: | |
| 170 # usually this is the floppy | |
| 171 break | |
| 172 self.assertEqual(usage.total, int(wmi_part.Size)) | |
| 173 wmi_free = int(wmi_part.FreeSpace) | |
| 174 self.assertEqual(usage.free, wmi_free) | |
| 175 # 10 MB tollerance | |
| 176 if abs(usage.free - wmi_free) > 10 * 1024 * 1024: | |
| 177 self.fail("psutil=%s, wmi=%s" % ( | |
| 178 usage.free, wmi_free)) | |
| 179 break | |
| 180 else: | |
| 181 self.fail("can't find partition %s" % repr(ps_part)) | |
| 182 | |
| 183 def test_disk_usage(self): | |
| 184 for disk in psutil.disk_partitions(): | |
| 185 if 'cdrom' in disk.opts: | |
| 186 continue | |
| 187 sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint) | |
| 188 psutil_value = psutil.disk_usage(disk.mountpoint) | |
| 189 self.assertAlmostEqual(sys_value[0], psutil_value.free, | |
| 190 delta=1024 * 1024) | |
| 191 self.assertAlmostEqual(sys_value[1], psutil_value.total, | |
| 192 delta=1024 * 1024) | |
| 193 self.assertEqual(psutil_value.used, | |
| 194 psutil_value.total - psutil_value.free) | |
| 195 | |
| 196 def test_disk_partitions(self): | |
| 197 sys_value = [ | |
| 198 x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00") | |
| 199 if x and not x.startswith('A:')] | |
| 200 psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)] | |
| 201 self.assertEqual(sys_value, psutil_value) | |
| 202 | |
| 203 def test_net_if_stats(self): | |
| 204 ps_names = set(cext.net_if_stats()) | |
| 205 wmi_adapters = wmi.WMI().Win32_NetworkAdapter() | |
| 206 wmi_names = set() | |
| 207 for wmi_adapter in wmi_adapters: | |
| 208 wmi_names.add(wmi_adapter.Name) | |
| 209 wmi_names.add(wmi_adapter.NetConnectionID) | |
| 210 self.assertTrue(ps_names & wmi_names, | |
| 211 "no common entries in %s, %s" % (ps_names, wmi_names)) | |
| 212 | |
| 213 def test_boot_time(self): | |
| 214 wmi_os = wmi.WMI().Win32_OperatingSystem() | |
| 215 wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0] | |
| 216 wmi_btime_dt = datetime.datetime.strptime( | |
| 217 wmi_btime_str, "%Y%m%d%H%M%S") | |
| 218 psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time()) | |
| 219 diff = abs((wmi_btime_dt - psutil_dt).total_seconds()) | |
| 220 self.assertLessEqual(diff, 3) | |
| 221 | |
| 222 def test_boot_time_fluctuation(self): | |
| 223 # https://github.com/giampaolo/psutil/issues/1007 | |
| 224 with mock.patch('psutil._pswindows.cext.boot_time', return_value=5): | |
| 225 self.assertEqual(psutil.boot_time(), 5) | |
| 226 with mock.patch('psutil._pswindows.cext.boot_time', return_value=4): | |
| 227 self.assertEqual(psutil.boot_time(), 5) | |
| 228 with mock.patch('psutil._pswindows.cext.boot_time', return_value=6): | |
| 229 self.assertEqual(psutil.boot_time(), 5) | |
| 230 with mock.patch('psutil._pswindows.cext.boot_time', return_value=333): | |
| 231 self.assertEqual(psutil.boot_time(), 333) | |
| 232 | |
| 233 | |
| 234 # =================================================================== | |
| 235 # sensors_battery() | |
| 236 # =================================================================== | |
| 237 | |
| 238 | |
| 239 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 240 class TestSensorsBattery(TestCase): | |
| 241 | |
| 242 def test_has_battery(self): | |
| 243 if win32api.GetPwrCapabilities()['SystemBatteriesPresent']: | |
| 244 self.assertIsNotNone(psutil.sensors_battery()) | |
| 245 else: | |
| 246 self.assertIsNone(psutil.sensors_battery()) | |
| 247 | |
| 248 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 249 def test_percent(self): | |
| 250 w = wmi.WMI() | |
| 251 battery_wmi = w.query('select * from Win32_Battery')[0] | |
| 252 battery_psutil = psutil.sensors_battery() | |
| 253 self.assertAlmostEqual( | |
| 254 battery_psutil.percent, battery_wmi.EstimatedChargeRemaining, | |
| 255 delta=1) | |
| 256 | |
| 257 @unittest.skipIf(not HAS_BATTERY, "no battery") | |
| 258 def test_power_plugged(self): | |
| 259 w = wmi.WMI() | |
| 260 battery_wmi = w.query('select * from Win32_Battery')[0] | |
| 261 battery_psutil = psutil.sensors_battery() | |
| 262 # Status codes: | |
| 263 # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx | |
| 264 self.assertEqual(battery_psutil.power_plugged, | |
| 265 battery_wmi.BatteryStatus == 2) | |
| 266 | |
| 267 def test_emulate_no_battery(self): | |
| 268 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 269 return_value=(0, 128, 0, 0)) as m: | |
| 270 self.assertIsNone(psutil.sensors_battery()) | |
| 271 assert m.called | |
| 272 | |
| 273 def test_emulate_power_connected(self): | |
| 274 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 275 return_value=(1, 0, 0, 0)) as m: | |
| 276 self.assertEqual(psutil.sensors_battery().secsleft, | |
| 277 psutil.POWER_TIME_UNLIMITED) | |
| 278 assert m.called | |
| 279 | |
| 280 def test_emulate_power_charging(self): | |
| 281 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 282 return_value=(0, 8, 0, 0)) as m: | |
| 283 self.assertEqual(psutil.sensors_battery().secsleft, | |
| 284 psutil.POWER_TIME_UNLIMITED) | |
| 285 assert m.called | |
| 286 | |
| 287 def test_emulate_secs_left_unknown(self): | |
| 288 with mock.patch("psutil._pswindows.cext.sensors_battery", | |
| 289 return_value=(0, 0, 0, -1)) as m: | |
| 290 self.assertEqual(psutil.sensors_battery().secsleft, | |
| 291 psutil.POWER_TIME_UNKNOWN) | |
| 292 assert m.called | |
| 293 | |
| 294 | |
| 295 # =================================================================== | |
| 296 # Process APIs | |
| 297 # =================================================================== | |
| 298 | |
| 299 | |
| 300 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 301 class TestProcess(TestCase): | |
| 302 | |
| 303 @classmethod | |
| 304 def setUpClass(cls): | |
| 305 cls.pid = get_test_subprocess().pid | |
| 306 | |
| 307 @classmethod | |
| 308 def tearDownClass(cls): | |
| 309 reap_children() | |
| 310 | |
| 311 def test_issue_24(self): | |
| 312 p = psutil.Process(0) | |
| 313 self.assertRaises(psutil.AccessDenied, p.kill) | |
| 314 | |
| 315 def test_special_pid(self): | |
| 316 p = psutil.Process(4) | |
| 317 self.assertEqual(p.name(), 'System') | |
| 318 # use __str__ to access all common Process properties to check | |
| 319 # that nothing strange happens | |
| 320 str(p) | |
| 321 p.username() | |
| 322 self.assertTrue(p.create_time() >= 0.0) | |
| 323 try: | |
| 324 rss, vms = p.memory_info()[:2] | |
| 325 except psutil.AccessDenied: | |
| 326 # expected on Windows Vista and Windows 7 | |
| 327 if not platform.uname()[1] in ('vista', 'win-7', 'win7'): | |
| 328 raise | |
| 329 else: | |
| 330 self.assertTrue(rss > 0) | |
| 331 | |
| 332 def test_send_signal(self): | |
| 333 p = psutil.Process(self.pid) | |
| 334 self.assertRaises(ValueError, p.send_signal, signal.SIGINT) | |
| 335 | |
| 336 def test_num_handles_increment(self): | |
| 337 p = psutil.Process(os.getpid()) | |
| 338 before = p.num_handles() | |
| 339 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 340 win32con.FALSE, os.getpid()) | |
| 341 after = p.num_handles() | |
| 342 self.assertEqual(after, before + 1) | |
| 343 win32api.CloseHandle(handle) | |
| 344 self.assertEqual(p.num_handles(), before) | |
| 345 | |
| 346 def test_handles_leak(self): | |
| 347 # Call all Process methods and make sure no handles are left | |
| 348 # open. This is here mainly to make sure functions using | |
| 349 # OpenProcess() always call CloseHandle(). | |
| 350 def call(p, attr): | |
| 351 attr = getattr(p, name, None) | |
| 352 if attr is not None and callable(attr): | |
| 353 attr() | |
| 354 else: | |
| 355 attr | |
| 356 | |
| 357 p = psutil.Process(self.pid) | |
| 358 failures = [] | |
| 359 for name in dir(psutil.Process): | |
| 360 if name.startswith('_') \ | |
| 361 or name in ('terminate', 'kill', 'suspend', 'resume', | |
| 362 'nice', 'send_signal', 'wait', 'children', | |
| 363 'as_dict', 'memory_info_ex'): | |
| 364 continue | |
| 365 else: | |
| 366 try: | |
| 367 call(p, name) | |
| 368 num1 = p.num_handles() | |
| 369 call(p, name) | |
| 370 num2 = p.num_handles() | |
| 371 except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| 372 pass | |
| 373 else: | |
| 374 if num2 > num1: | |
| 375 fail = \ | |
| 376 "failure while processing Process.%s method " \ | |
| 377 "(before=%s, after=%s)" % (name, num1, num2) | |
| 378 failures.append(fail) | |
| 379 if failures: | |
| 380 self.fail('\n' + '\n'.join(failures)) | |
| 381 | |
| 382 @unittest.skipIf(not sys.version_info >= (2, 7), | |
| 383 "CTRL_* signals not supported") | |
| 384 def test_ctrl_signals(self): | |
| 385 p = psutil.Process(get_test_subprocess().pid) | |
| 386 p.send_signal(signal.CTRL_C_EVENT) | |
| 387 p.send_signal(signal.CTRL_BREAK_EVENT) | |
| 388 p.kill() | |
| 389 p.wait() | |
| 390 self.assertRaises(psutil.NoSuchProcess, | |
| 391 p.send_signal, signal.CTRL_C_EVENT) | |
| 392 self.assertRaises(psutil.NoSuchProcess, | |
| 393 p.send_signal, signal.CTRL_BREAK_EVENT) | |
| 394 | |
| 395 def test_username(self): | |
| 396 self.assertEqual(psutil.Process().username(), | |
| 397 win32api.GetUserNameEx(win32con.NameSamCompatible)) | |
| 398 | |
| 399 def test_cmdline(self): | |
| 400 sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip() | |
| 401 psutil_value = ' '.join(psutil.Process().cmdline()) | |
| 402 self.assertEqual(sys_value, psutil_value) | |
| 403 | |
| 404 # XXX - occasional failures | |
| 405 | |
| 406 # def test_cpu_times(self): | |
| 407 # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 408 # win32con.FALSE, os.getpid()) | |
| 409 # self.addCleanup(win32api.CloseHandle, handle) | |
| 410 # sys_value = win32process.GetProcessTimes(handle) | |
| 411 # psutil_value = psutil.Process().cpu_times() | |
| 412 # self.assertAlmostEqual( | |
| 413 # psutil_value.user, sys_value['UserTime'] / 10000000.0, | |
| 414 # delta=0.2) | |
| 415 # self.assertAlmostEqual( | |
| 416 # psutil_value.user, sys_value['KernelTime'] / 10000000.0, | |
| 417 # delta=0.2) | |
| 418 | |
| 419 def test_nice(self): | |
| 420 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 421 win32con.FALSE, os.getpid()) | |
| 422 self.addCleanup(win32api.CloseHandle, handle) | |
| 423 sys_value = win32process.GetPriorityClass(handle) | |
| 424 psutil_value = psutil.Process().nice() | |
| 425 self.assertEqual(psutil_value, sys_value) | |
| 426 | |
| 427 def test_memory_info(self): | |
| 428 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 429 win32con.FALSE, self.pid) | |
| 430 self.addCleanup(win32api.CloseHandle, handle) | |
| 431 sys_value = win32process.GetProcessMemoryInfo(handle) | |
| 432 psutil_value = psutil.Process(self.pid).memory_info() | |
| 433 self.assertEqual( | |
| 434 sys_value['PeakWorkingSetSize'], psutil_value.peak_wset) | |
| 435 self.assertEqual( | |
| 436 sys_value['WorkingSetSize'], psutil_value.wset) | |
| 437 self.assertEqual( | |
| 438 sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool) | |
| 439 self.assertEqual( | |
| 440 sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool) | |
| 441 self.assertEqual( | |
| 442 sys_value['QuotaPeakNonPagedPoolUsage'], | |
| 443 psutil_value.peak_nonpaged_pool) | |
| 444 self.assertEqual( | |
| 445 sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool) | |
| 446 self.assertEqual( | |
| 447 sys_value['PagefileUsage'], psutil_value.pagefile) | |
| 448 self.assertEqual( | |
| 449 sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile) | |
| 450 | |
| 451 self.assertEqual(psutil_value.rss, psutil_value.wset) | |
| 452 self.assertEqual(psutil_value.vms, psutil_value.pagefile) | |
| 453 | |
| 454 def test_wait(self): | |
| 455 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 456 win32con.FALSE, self.pid) | |
| 457 self.addCleanup(win32api.CloseHandle, handle) | |
| 458 p = psutil.Process(self.pid) | |
| 459 p.terminate() | |
| 460 psutil_value = p.wait() | |
| 461 sys_value = win32process.GetExitCodeProcess(handle) | |
| 462 self.assertEqual(psutil_value, sys_value) | |
| 463 | |
| 464 def test_cpu_affinity(self): | |
| 465 def from_bitmask(x): | |
| 466 return [i for i in range(64) if (1 << i) & x] | |
| 467 | |
| 468 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 469 win32con.FALSE, self.pid) | |
| 470 self.addCleanup(win32api.CloseHandle, handle) | |
| 471 sys_value = from_bitmask( | |
| 472 win32process.GetProcessAffinityMask(handle)[0]) | |
| 473 psutil_value = psutil.Process(self.pid).cpu_affinity() | |
| 474 self.assertEqual(psutil_value, sys_value) | |
| 475 | |
| 476 def test_io_counters(self): | |
| 477 handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, | |
| 478 win32con.FALSE, os.getpid()) | |
| 479 self.addCleanup(win32api.CloseHandle, handle) | |
| 480 sys_value = win32process.GetProcessIoCounters(handle) | |
| 481 psutil_value = psutil.Process().io_counters() | |
| 482 self.assertEqual( | |
| 483 psutil_value.read_count, sys_value['ReadOperationCount']) | |
| 484 self.assertEqual( | |
| 485 psutil_value.write_count, sys_value['WriteOperationCount']) | |
| 486 self.assertEqual( | |
| 487 psutil_value.read_bytes, sys_value['ReadTransferCount']) | |
| 488 self.assertEqual( | |
| 489 psutil_value.write_bytes, sys_value['WriteTransferCount']) | |
| 490 self.assertEqual( | |
| 491 psutil_value.other_count, sys_value['OtherOperationCount']) | |
| 492 self.assertEqual( | |
| 493 psutil_value.other_bytes, sys_value['OtherTransferCount']) | |
| 494 | |
| 495 def test_num_handles(self): | |
| 496 import ctypes | |
| 497 import ctypes.wintypes | |
| 498 PROCESS_QUERY_INFORMATION = 0x400 | |
| 499 handle = ctypes.windll.kernel32.OpenProcess( | |
| 500 PROCESS_QUERY_INFORMATION, 0, self.pid) | |
| 501 self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle) | |
| 502 | |
| 503 hndcnt = ctypes.wintypes.DWORD() | |
| 504 ctypes.windll.kernel32.GetProcessHandleCount( | |
| 505 handle, ctypes.byref(hndcnt)) | |
| 506 sys_value = hndcnt.value | |
| 507 psutil_value = psutil.Process(self.pid).num_handles() | |
| 508 self.assertEqual(psutil_value, sys_value) | |
| 509 | |
| 510 def test_error_partial_copy(self): | |
| 511 # https://github.com/giampaolo/psutil/issues/875 | |
| 512 exc = WindowsError() | |
| 513 exc.winerror = 299 | |
| 514 with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc): | |
| 515 with mock.patch("time.sleep") as m: | |
| 516 p = psutil.Process() | |
| 517 self.assertRaises(psutil.AccessDenied, p.cwd) | |
| 518 self.assertGreaterEqual(m.call_count, 5) | |
| 519 | |
| 520 def test_exe(self): | |
| 521 # NtQuerySystemInformation succeeds if process is gone. Make sure | |
| 522 # it raises NSP for a non existent pid. | |
| 523 pid = psutil.pids()[-1] + 99999 | |
| 524 proc = psutil._psplatform.Process(pid) | |
| 525 self.assertRaises(psutil.NoSuchProcess, proc.exe) | |
| 526 | |
| 527 | |
| 528 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 529 class TestProcessWMI(TestCase): | |
| 530 """Compare Process API results with WMI.""" | |
| 531 | |
| 532 @classmethod | |
| 533 def setUpClass(cls): | |
| 534 cls.pid = get_test_subprocess().pid | |
| 535 | |
| 536 @classmethod | |
| 537 def tearDownClass(cls): | |
| 538 reap_children() | |
| 539 | |
| 540 def test_name(self): | |
| 541 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 542 p = psutil.Process(self.pid) | |
| 543 self.assertEqual(p.name(), w.Caption) | |
| 544 | |
| 545 def test_exe(self): | |
| 546 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 547 p = psutil.Process(self.pid) | |
| 548 # Note: wmi reports the exe as a lower case string. | |
| 549 # Being Windows paths case-insensitive we ignore that. | |
| 550 self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) | |
| 551 | |
| 552 def test_cmdline(self): | |
| 553 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 554 p = psutil.Process(self.pid) | |
| 555 self.assertEqual(' '.join(p.cmdline()), | |
| 556 w.CommandLine.replace('"', '')) | |
| 557 | |
| 558 def test_username(self): | |
| 559 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 560 p = psutil.Process(self.pid) | |
| 561 domain, _, username = w.GetOwner() | |
| 562 username = "%s\\%s" % (domain, username) | |
| 563 self.assertEqual(p.username(), username) | |
| 564 | |
| 565 def test_memory_rss(self): | |
| 566 time.sleep(0.1) | |
| 567 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 568 p = psutil.Process(self.pid) | |
| 569 rss = p.memory_info().rss | |
| 570 self.assertEqual(rss, int(w.WorkingSetSize)) | |
| 571 | |
| 572 def test_memory_vms(self): | |
| 573 time.sleep(0.1) | |
| 574 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 575 p = psutil.Process(self.pid) | |
| 576 vms = p.memory_info().vms | |
| 577 # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx | |
| 578 # ...claims that PageFileUsage is represented in Kilo | |
| 579 # bytes but funnily enough on certain platforms bytes are | |
| 580 # returned instead. | |
| 581 wmi_usage = int(w.PageFileUsage) | |
| 582 if (vms != wmi_usage) and (vms != wmi_usage * 1024): | |
| 583 self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) | |
| 584 | |
| 585 def test_create_time(self): | |
| 586 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 587 p = psutil.Process(self.pid) | |
| 588 wmic_create = str(w.CreationDate.split('.')[0]) | |
| 589 psutil_create = time.strftime("%Y%m%d%H%M%S", | |
| 590 time.localtime(p.create_time())) | |
| 591 self.assertEqual(wmic_create, psutil_create) | |
| 592 | |
| 593 | |
| 594 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 595 class TestDualProcessImplementation(TestCase): | |
| 596 """ | |
| 597 Certain APIs on Windows have 2 internal implementations, one | |
| 598 based on documented Windows APIs, another one based | |
| 599 NtQuerySystemInformation() which gets called as fallback in | |
| 600 case the first fails because of limited permission error. | |
| 601 Here we test that the two methods return the exact same value, | |
| 602 see: | |
| 603 https://github.com/giampaolo/psutil/issues/304 | |
| 604 """ | |
| 605 | |
| 606 @classmethod | |
| 607 def setUpClass(cls): | |
| 608 cls.pid = get_test_subprocess().pid | |
| 609 | |
| 610 @classmethod | |
| 611 def tearDownClass(cls): | |
| 612 reap_children() | |
| 613 | |
| 614 def test_memory_info(self): | |
| 615 mem_1 = psutil.Process(self.pid).memory_info() | |
| 616 with mock.patch("psutil._psplatform.cext.proc_memory_info", | |
| 617 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 618 mem_2 = psutil.Process(self.pid).memory_info() | |
| 619 self.assertEqual(len(mem_1), len(mem_2)) | |
| 620 for i in range(len(mem_1)): | |
| 621 self.assertGreaterEqual(mem_1[i], 0) | |
| 622 self.assertGreaterEqual(mem_2[i], 0) | |
| 623 self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512) | |
| 624 assert fun.called | |
| 625 | |
| 626 def test_create_time(self): | |
| 627 ctime = psutil.Process(self.pid).create_time() | |
| 628 with mock.patch("psutil._psplatform.cext.proc_times", | |
| 629 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 630 self.assertEqual(psutil.Process(self.pid).create_time(), ctime) | |
| 631 assert fun.called | |
| 632 | |
| 633 def test_cpu_times(self): | |
| 634 cpu_times_1 = psutil.Process(self.pid).cpu_times() | |
| 635 with mock.patch("psutil._psplatform.cext.proc_times", | |
| 636 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 637 cpu_times_2 = psutil.Process(self.pid).cpu_times() | |
| 638 assert fun.called | |
| 639 self.assertAlmostEqual( | |
| 640 cpu_times_1.user, cpu_times_2.user, delta=0.01) | |
| 641 self.assertAlmostEqual( | |
| 642 cpu_times_1.system, cpu_times_2.system, delta=0.01) | |
| 643 | |
| 644 def test_io_counters(self): | |
| 645 io_counters_1 = psutil.Process(self.pid).io_counters() | |
| 646 with mock.patch("psutil._psplatform.cext.proc_io_counters", | |
| 647 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 648 io_counters_2 = psutil.Process(self.pid).io_counters() | |
| 649 for i in range(len(io_counters_1)): | |
| 650 self.assertAlmostEqual( | |
| 651 io_counters_1[i], io_counters_2[i], delta=5) | |
| 652 assert fun.called | |
| 653 | |
| 654 def test_num_handles(self): | |
| 655 num_handles = psutil.Process(self.pid).num_handles() | |
| 656 with mock.patch("psutil._psplatform.cext.proc_num_handles", | |
| 657 side_effect=OSError(errno.EPERM, "msg")) as fun: | |
| 658 self.assertEqual(psutil.Process(self.pid).num_handles(), | |
| 659 num_handles) | |
| 660 assert fun.called | |
| 661 | |
| 662 def test_cmdline(self): | |
| 663 from psutil._pswindows import convert_oserror | |
| 664 for pid in psutil.pids(): | |
| 665 try: | |
| 666 a = cext.proc_cmdline(pid, use_peb=True) | |
| 667 b = cext.proc_cmdline(pid, use_peb=False) | |
| 668 except OSError as err: | |
| 669 err = convert_oserror(err) | |
| 670 if not isinstance(err, (psutil.AccessDenied, | |
| 671 psutil.NoSuchProcess)): | |
| 672 raise | |
| 673 else: | |
| 674 self.assertEqual(a, b) | |
| 675 | |
| 676 | |
| 677 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 678 class RemoteProcessTestCase(TestCase): | |
| 679 """Certain functions require calling ReadProcessMemory. | |
| 680 This trivially works when called on the current process. | |
| 681 Check that this works on other processes, especially when they | |
| 682 have a different bitness. | |
| 683 """ | |
| 684 | |
| 685 @staticmethod | |
| 686 def find_other_interpreter(): | |
| 687 # find a python interpreter that is of the opposite bitness from us | |
| 688 code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))" | |
| 689 | |
| 690 # XXX: a different and probably more stable approach might be to access | |
| 691 # the registry but accessing 64 bit paths from a 32 bit process | |
| 692 for filename in glob.glob(r"C:\Python*\python.exe"): | |
| 693 proc = subprocess.Popen(args=[filename, "-c", code], | |
| 694 stdout=subprocess.PIPE, | |
| 695 stderr=subprocess.STDOUT) | |
| 696 output, _ = proc.communicate() | |
| 697 if output == str(not IS_64_BIT): | |
| 698 return filename | |
| 699 | |
| 700 @classmethod | |
| 701 def setUpClass(cls): | |
| 702 other_python = cls.find_other_interpreter() | |
| 703 | |
| 704 if other_python is None: | |
| 705 raise unittest.SkipTest( | |
| 706 "could not find interpreter with opposite bitness") | |
| 707 | |
| 708 if IS_64_BIT: | |
| 709 cls.python64 = sys.executable | |
| 710 cls.python32 = other_python | |
| 711 else: | |
| 712 cls.python64 = other_python | |
| 713 cls.python32 = sys.executable | |
| 714 | |
| 715 test_args = ["-c", "import sys; sys.stdin.read()"] | |
| 716 | |
| 717 def setUp(self): | |
| 718 env = os.environ.copy() | |
| 719 env["THINK_OF_A_NUMBER"] = str(os.getpid()) | |
| 720 self.proc32 = get_test_subprocess([self.python32] + self.test_args, | |
| 721 env=env, | |
| 722 stdin=subprocess.PIPE) | |
| 723 self.proc64 = get_test_subprocess([self.python64] + self.test_args, | |
| 724 env=env, | |
| 725 stdin=subprocess.PIPE) | |
| 726 | |
| 727 def tearDown(self): | |
| 728 self.proc32.communicate() | |
| 729 self.proc64.communicate() | |
| 730 reap_children() | |
| 731 | |
| 732 @classmethod | |
| 733 def tearDownClass(cls): | |
| 734 reap_children() | |
| 735 | |
| 736 def test_cmdline_32(self): | |
| 737 p = psutil.Process(self.proc32.pid) | |
| 738 self.assertEqual(len(p.cmdline()), 3) | |
| 739 self.assertEqual(p.cmdline()[1:], self.test_args) | |
| 740 | |
| 741 def test_cmdline_64(self): | |
| 742 p = psutil.Process(self.proc64.pid) | |
| 743 self.assertEqual(len(p.cmdline()), 3) | |
| 744 self.assertEqual(p.cmdline()[1:], self.test_args) | |
| 745 | |
| 746 def test_cwd_32(self): | |
| 747 p = psutil.Process(self.proc32.pid) | |
| 748 self.assertEqual(p.cwd(), os.getcwd()) | |
| 749 | |
| 750 def test_cwd_64(self): | |
| 751 p = psutil.Process(self.proc64.pid) | |
| 752 self.assertEqual(p.cwd(), os.getcwd()) | |
| 753 | |
| 754 def test_environ_32(self): | |
| 755 p = psutil.Process(self.proc32.pid) | |
| 756 e = p.environ() | |
| 757 self.assertIn("THINK_OF_A_NUMBER", e) | |
| 758 self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid())) | |
| 759 | |
| 760 def test_environ_64(self): | |
| 761 p = psutil.Process(self.proc64.pid) | |
| 762 try: | |
| 763 p.environ() | |
| 764 except psutil.AccessDenied: | |
| 765 pass | |
| 766 | |
| 767 | |
| 768 # =================================================================== | |
| 769 # Windows services | |
| 770 # =================================================================== | |
| 771 | |
| 772 | |
| 773 @unittest.skipIf(not WINDOWS, "WINDOWS only") | |
| 774 class TestServices(TestCase): | |
| 775 | |
| 776 def test_win_service_iter(self): | |
| 777 valid_statuses = set([ | |
| 778 "running", | |
| 779 "paused", | |
| 780 "start", | |
| 781 "pause", | |
| 782 "continue", | |
| 783 "stop", | |
| 784 "stopped", | |
| 785 ]) | |
| 786 valid_start_types = set([ | |
| 787 "automatic", | |
| 788 "manual", | |
| 789 "disabled", | |
| 790 ]) | |
| 791 valid_statuses = set([ | |
| 792 "running", | |
| 793 "paused", | |
| 794 "start_pending", | |
| 795 "pause_pending", | |
| 796 "continue_pending", | |
| 797 "stop_pending", | |
| 798 "stopped" | |
| 799 ]) | |
| 800 for serv in psutil.win_service_iter(): | |
| 801 data = serv.as_dict() | |
| 802 self.assertIsInstance(data['name'], str) | |
| 803 self.assertNotEqual(data['name'].strip(), "") | |
| 804 self.assertIsInstance(data['display_name'], str) | |
| 805 self.assertIsInstance(data['username'], str) | |
| 806 self.assertIn(data['status'], valid_statuses) | |
| 807 if data['pid'] is not None: | |
| 808 psutil.Process(data['pid']) | |
| 809 self.assertIsInstance(data['binpath'], str) | |
| 810 self.assertIsInstance(data['username'], str) | |
| 811 self.assertIsInstance(data['start_type'], str) | |
| 812 self.assertIn(data['start_type'], valid_start_types) | |
| 813 self.assertIn(data['status'], valid_statuses) | |
| 814 self.assertIsInstance(data['description'], str) | |
| 815 pid = serv.pid() | |
| 816 if pid is not None: | |
| 817 p = psutil.Process(pid) | |
| 818 self.assertTrue(p.is_running()) | |
| 819 # win_service_get | |
| 820 s = psutil.win_service_get(serv.name()) | |
| 821 # test __eq__ | |
| 822 self.assertEqual(serv, s) | |
| 823 | |
| 824 def test_win_service_get(self): | |
| 825 ERROR_SERVICE_DOES_NOT_EXIST = \ | |
| 826 psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST | |
| 827 ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED | |
| 828 | |
| 829 name = next(psutil.win_service_iter()).name() | |
| 830 with self.assertRaises(psutil.NoSuchProcess) as cm: | |
| 831 psutil.win_service_get(name + '???') | |
| 832 self.assertEqual(cm.exception.name, name + '???') | |
| 833 | |
| 834 # test NoSuchProcess | |
| 835 service = psutil.win_service_get(name) | |
| 836 if PY3: | |
| 837 args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST) | |
| 838 else: | |
| 839 args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg") | |
| 840 exc = WindowsError(*args) | |
| 841 with mock.patch("psutil._psplatform.cext.winservice_query_status", | |
| 842 side_effect=exc): | |
| 843 self.assertRaises(psutil.NoSuchProcess, service.status) | |
| 844 with mock.patch("psutil._psplatform.cext.winservice_query_config", | |
| 845 side_effect=exc): | |
| 846 self.assertRaises(psutil.NoSuchProcess, service.username) | |
| 847 | |
| 848 # test AccessDenied | |
| 849 if PY3: | |
| 850 args = (0, "msg", 0, ERROR_ACCESS_DENIED) | |
| 851 else: | |
| 852 args = (ERROR_ACCESS_DENIED, "msg") | |
| 853 exc = WindowsError(*args) | |
| 854 with mock.patch("psutil._psplatform.cext.winservice_query_status", | |
| 855 side_effect=exc): | |
| 856 self.assertRaises(psutil.AccessDenied, service.status) | |
| 857 with mock.patch("psutil._psplatform.cext.winservice_query_config", | |
| 858 side_effect=exc): | |
| 859 self.assertRaises(psutil.AccessDenied, service.username) | |
| 860 | |
| 861 # test __str__ and __repr__ | |
| 862 self.assertIn(service.name(), str(service)) | |
| 863 self.assertIn(service.display_name(), str(service)) | |
| 864 self.assertIn(service.name(), repr(service)) | |
| 865 self.assertIn(service.display_name(), repr(service)) | |
| 866 | |
| 867 | |
| 868 if __name__ == '__main__': | |
| 869 from psutil.tests.runner import run | |
| 870 run(__file__) |
