Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/psutil/tests/test_osx.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
comparison
equal
deleted
inserted
replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """MACOS specific tests.""" | |
| 8 | |
| 9 import os | |
| 10 import re | |
| 11 import time | |
| 12 | |
| 13 import psutil | |
| 14 from psutil import MACOS | |
| 15 from psutil.tests import create_zombie_proc | |
| 16 from psutil.tests import get_test_subprocess | |
| 17 from psutil.tests import HAS_BATTERY | |
| 18 from psutil.tests import MEMORY_TOLERANCE | |
| 19 from psutil.tests import reap_children | |
| 20 from psutil.tests import retry_on_failure | |
| 21 from psutil.tests import sh | |
| 22 from psutil.tests import unittest | |
| 23 | |
| 24 | |
# Memory page size in bytes as reported by sysconf(3); vm_stat() below
# multiplies page counts by it. Left as None on non-macOS platforms, where
# every test class in this module is skipped.
PAGESIZE = os.sysconf("SC_PAGE_SIZE") if MACOS else None
| 26 | |
| 27 | |
def sysctl(cmdline):
    """Run a sysctl command line and return the value it prints.

    The second whitespace-separated token of the output is taken as the
    value. It is returned as an int when it parses as one, otherwise as
    the raw string.
    """
    tokens = sh(cmdline).split()
    value = tokens[1]
    try:
        value = int(value)
    except ValueError:
        # Not numeric: hand back the string untouched.
        pass
    return value
| 38 | |
| 39 | |
def vm_stat(field):
    """Return the 'vm_stat' counter whose line mentions *field*.

    The first output line containing *field* is used; the first run of
    digits on that line is multiplied by PAGESIZE. Raises ValueError if
    no line contains *field*.
    """
    rows = sh('vm_stat').split('\n')
    # First matching row wins, exactly like a linear scan with break.
    matched = next((row for row in rows if field in row), None)
    if matched is None:
        raise ValueError("line not found")
    pages = re.search(r'\d+', matched).group(0)
    return int(pages) * PAGESIZE
| 49 | |
| 50 | |
| 51 # http://code.activestate.com/recipes/578019/ | |
def human2bytes(s):
    """Convert a human readable size such as '1.5M' into a number of bytes.

    *s* is a decimal number (digits and dots) followed by one of the
    customary unit letters B, K, M, G, T, P, E, Z, Y. Lower case 'k' is
    accepted as an alias for 'K'. Whitespace around the unit letter is
    ignored. Each unit is 1024 times the previous one.

    Returns the size as an int (truncated).
    Raises ValueError if the number or the unit cannot be interpreted.
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    init = s
    num = ""
    # Peel the leading numeric part off the string, one character at a time.
    while s and (s[0].isdigit() or s[0] == '.'):
        num += s[0]
        s = s[1:]
    try:
        num = float(num)
    except ValueError:
        # Missing or malformed number (e.g. "", "K", "1.2.3K"): report it
        # with the same message used for an unknown unit.
        raise ValueError("can't interpret %r" % init)
    letter = s.strip()
    if letter == 'k':
        letter = 'K'
    if letter not in symbols:
        raise ValueError("can't interpret %r" % init)
    # B -> 1, K -> 2**10, M -> 2**20, ...
    prefix = dict((sym, 1 << (i * 10)) for i, sym in enumerate(symbols))
    return int(num * prefix[letter])
| 76 | |
| 77 | |
@unittest.skipIf(not MACOS, "MACOS only")
class TestProcess(unittest.TestCase):
    """Compare per-process psutil results with the output of ps(1)."""

    @classmethod
    def setUpClass(cls):
        # A single helper subprocess is shared by the tests of this class.
        subproc = get_test_subprocess()
        cls.pid = subproc.pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_process_create_time(self):
        # Drop the STARTED header from the "ps -o lstart" output; the last
        # two space-separated fields of what remains are compared against
        # psutil's create_time() rendered as HH:MM:SS and as year.
        ps_out = sh("ps -o lstart -p %s" % self.pid)
        fields = ps_out.replace('STARTED', '').strip().split(' ')
        ps_clock, ps_year = fields[-2], fields[-1]
        started = time.localtime(psutil.Process(self.pid).create_time())
        self.assertEqual(ps_clock, time.strftime("%H:%M:%S", started))
        self.assertEqual(ps_year, time.strftime("%Y", started))
| 101 | |
| 102 | |
@unittest.skipIf(not MACOS, "MACOS only")
class TestZombieProcessAPIs(unittest.TestCase):
    """Exercise psutil.Process methods against a zombie process."""

    @classmethod
    def setUpClass(cls):
        # create_zombie_proc() hands back the pid wrapped here.
        cls.p = psutil.Process(create_zombie_proc())

    @classmethod
    def tearDownClass(cls):
        reap_children(recursive=True)

    def test_pidtask_info(self):
        # The status must be reported as zombie; the remaining calls are
        # only required to complete without raising.
        self.assertEqual(self.p.status(), psutil.STATUS_ZOMBIE)
        self.p.ppid()
        self.p.uids()
        self.p.gids()
        self.p.terminal()
        self.p.create_time()

    # Every method below is expected to raise ZombieProcess.

    def test_exe(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.exe()

    def test_cmdline(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.cmdline()

    def test_environ(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.environ()

    def test_cwd(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.cwd()

    def test_memory_full_info(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.memory_full_info()

    def test_cpu_times(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.cpu_times()

    def test_num_ctx_switches(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.num_ctx_switches()

    def test_num_threads(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.num_threads()

    def test_open_files(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.open_files()

    def test_connections(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.connections()

    def test_num_fds(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.num_fds()

    def test_threads(self):
        # Either exception is accepted here.
        expected = (psutil.ZombieProcess, psutil.AccessDenied)
        with self.assertRaises(expected):
            self.p.threads()
| 159 | |
| 160 | |
@unittest.skipIf(not MACOS, "MACOS only")
class TestSystemAPIs(unittest.TestCase):
    """Cross-check psutil system-wide APIs against macOS command line
    tools (df, sysctl, vm_stat, ifconfig, pmset).
    """

    # --- disk

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            # Parse the first data row of "df -k <path>"; sizes are
            # converted from 1K blocks to bytes.
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)  # header row
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tolerance
            # The format arguments must be a tuple: with a bare value the
            # "%" operator raises TypeError instead of reporting the failure.
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    # --- cpu

    def test_cpu_count_logical(self):
        num = sysctl("sysctl hw.logicalcpu")
        self.assertEqual(num, psutil.cpu_count(logical=True))

    def test_cpu_count_physical(self):
        num = sysctl("sysctl hw.physicalcpu")
        self.assertEqual(num, psutil.cpu_count(logical=False))

    def test_cpu_freq(self):
        # psutil values are scaled by 1000 * 1000 before being compared
        # with the raw sysctl values.
        freq = psutil.cpu_freq()
        self.assertEqual(
            freq.current * 1000 * 1000, sysctl("sysctl hw.cpufrequency"))
        self.assertEqual(
            freq.min * 1000 * 1000, sysctl("sysctl hw.cpufrequency_min"))
        self.assertEqual(
            freq.max * 1000 * 1000, sysctl("sysctl hw.cpufrequency_max"))

    # --- virtual mem

    def test_vmem_total(self):
        sysctl_hwphymem = sysctl('sysctl hw.memsize')
        self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)

    @retry_on_failure()
    def test_vmem_free(self):
        vmstat_val = vm_stat("free")
        psutil_val = psutil.virtual_memory().free
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_active(self):
        vmstat_val = vm_stat("active")
        psutil_val = psutil.virtual_memory().active
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_inactive(self):
        vmstat_val = vm_stat("inactive")
        psutil_val = psutil.virtual_memory().inactive
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_wired(self):
        vmstat_val = vm_stat("wired")
        psutil_val = psutil.virtual_memory().wired
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    # --- swap mem

    @retry_on_failure()
    def test_swapmem_sin(self):
        vmstat_val = vm_stat("Pageins")
        psutil_val = psutil.swap_memory().sin
        self.assertEqual(psutil_val, vmstat_val)

    @retry_on_failure()
    def test_swapmem_sout(self):
        vmstat_val = vm_stat("Pageout")
        psutil_val = psutil.swap_memory().sout
        self.assertEqual(psutil_val, vmstat_val)

    # Not very reliable.
    # def test_swapmem_total(self):
    #     out = sh('sysctl vm.swapusage')
    #     out = out.replace('vm.swapusage: ', '')
    #     total, used, free = re.findall('\d+.\d+\w', out)
    #     psutil_smem = psutil.swap_memory()
    #     self.assertEqual(psutil_smem.total, human2bytes(total))
    #     self.assertEqual(psutil_smem.used, human2bytes(used))
    #     self.assertEqual(psutil_smem.free, human2bytes(free))

    # --- network

    def test_net_if_stats(self):
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # Best effort: interfaces for which ifconfig fails are
                # skipped rather than failing the test.
                pass
            else:
                self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
                self.assertEqual(stats.mtu,
                                 int(re.findall(r'mtu (\d+)', out)[0]))

    # --- sensors_battery

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        # Charge percentage and power source are scraped from the
        # "pmset -g batt" output.
        out = sh("pmset -g batt")
        percent = re.search(r"(\d+)%", out).group(1)
        drawing_from = re.search("Now drawing from '([^']+)'", out).group(1)
        power_plugged = drawing_from == "AC Power"
        psutil_result = psutil.sensors_battery()
        self.assertEqual(psutil_result.power_plugged, power_plugged)
        self.assertEqual(psutil_result.percent, int(percent))
| 290 | |
| 291 | |
if __name__ == '__main__':
    # When executed directly as a script, hand this file to the psutil
    # test runner.
    from psutil.tests.runner import run
    run(__file__)
