comparison env/lib/python3.7/site-packages/psutil/tests/test_osx.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
6
7 """MACOS specific tests."""
8
9 import os
10 import re
11 import time
12
13 import psutil
14 from psutil import MACOS
15 from psutil.tests import create_zombie_proc
16 from psutil.tests import get_test_subprocess
17 from psutil.tests import HAS_BATTERY
18 from psutil.tests import MEMORY_TOLERANCE
19 from psutil.tests import reap_children
20 from psutil.tests import retry_on_failure
21 from psutil.tests import sh
22 from psutil.tests import unittest
23
24
# System page size as reported by os.sysconf("SC_PAGE_SIZE"); only looked up
# when running on macOS, otherwise left as None.
if MACOS:
    PAGESIZE = os.sysconf("SC_PAGE_SIZE")
else:
    PAGESIZE = None
26
27
def sysctl(cmdline):
    """Run a sysctl command line and return the value it prints.

    The second whitespace-separated token of the command output is taken
    as the value. It is returned as an int when it parses as one,
    otherwise as the raw string.
    """
    value = sh(cmdline).split()[1]
    try:
        number = int(value)
    except ValueError:
        # Not an integer: hand back the token unchanged.
        return value
    return number
38
39
def vm_stat(field):
    """Return the number 'vm_stat' prints for *field*, times PAGESIZE.

    The first output line containing *field* is selected and the first
    run of digits on that line is multiplied by PAGESIZE.
    Raises ValueError if no output line contains *field*.
    """
    matching = [ln for ln in sh('vm_stat').split('\n') if field in ln]
    if not matching:
        raise ValueError("line not found")
    count = re.search(r'\d+', matching[0]).group(0)
    return int(count) * PAGESIZE
49
50
51 # http://code.activestate.com/recipes/578019/
def human2bytes(s):
    """Convert a human readable size such as '1.5 K' into an int.

    The string must start with a number (digits and dots), optionally
    followed by whitespace, and end with one of the symbols
    B, K, M, G, T, P, E, Z, Y (a lowercase 'k' is accepted as 'K').
    Each symbol is worth 1024 times the previous one, 'B' being 1.
    Raises ValueError if the number or the symbol cannot be interpreted.
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')

    # Find where the leading numeric part (digits and dots) stops.
    end = 0
    while end < len(s) and (s[end].isdigit() or s[end] == '.'):
        end += 1
    number = float(s[:end])

    unit = s[end:].strip()
    if unit not in symbols:
        if unit != 'k':
            raise ValueError("can't interpret %r" % s)
        unit = unit.upper()

    # The symbol's position in the sequence is its power of 1024.
    return int(number * (1 << symbols.index(unit) * 10))
76
77
@unittest.skipIf(not MACOS, "MACOS only")
class TestProcess(unittest.TestCase):
    """Check psutil.Process results against the 'ps' command line tool."""

    @classmethod
    def setUpClass(cls):
        # Every test in the class targets the pid of the process object
        # returned by get_test_subprocess().
        subproc = get_test_subprocess()
        cls.pid = subproc.pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_process_create_time(self):
        # The last two space-separated fields of "ps -o lstart" (with the
        # 'STARTED' header removed) are compared with create_time()
        # formatted as HH:MM:SS and as the year, in local time.
        ps_output = sh("ps -o lstart -p %s" % self.pid)
        fields = ps_output.replace('STARTED', '').strip().split(' ')
        ps_clock, ps_year = fields[-2], fields[-1]
        started = time.localtime(psutil.Process(self.pid).create_time())
        self.assertEqual(ps_clock, time.strftime("%H:%M:%S", started))
        self.assertEqual(ps_year, time.strftime("%Y", started))
101
102
@unittest.skipIf(not MACOS, "MACOS only")
class TestZombieProcessAPIs(unittest.TestCase):
    """Exercise psutil.Process methods on the pid given by
    create_zombie_proc().

    Most methods are expected to raise psutil.ZombieProcess; the ones in
    test_pidtask_info are simply called and must not raise.
    """

    @classmethod
    def setUpClass(cls):
        cls.p = psutil.Process(create_zombie_proc())

    @classmethod
    def tearDownClass(cls):
        reap_children(recursive=True)

    def test_pidtask_info(self):
        self.assertEqual(self.p.status(), psutil.STATUS_ZOMBIE)
        # These calls only need to complete without raising.
        self.p.ppid()
        self.p.uids()
        self.p.gids()
        self.p.terminal()
        self.p.create_time()

    def test_exe(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.exe()

    def test_cmdline(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.cmdline()

    def test_environ(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.environ()

    def test_cwd(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.cwd()

    def test_memory_full_info(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.memory_full_info()

    def test_cpu_times(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.cpu_times()

    def test_num_ctx_switches(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.num_ctx_switches()

    def test_num_threads(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.num_threads()

    def test_open_files(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.open_files()

    def test_connections(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.connections()

    def test_num_fds(self):
        with self.assertRaises(psutil.ZombieProcess):
            self.p.num_fds()

    def test_threads(self):
        # Either exception is accepted here.
        accepted = (psutil.ZombieProcess, psutil.AccessDenied)
        with self.assertRaises(accepted):
            self.p.threads()
159
160
@unittest.skipIf(not MACOS, "MACOS only")
class TestSystemAPIs(unittest.TestCase):
    """Cross-check psutil system-wide APIs against the output of macOS
    command line tools (df, sysctl, vm_stat, ifconfig, pmset).
    """

    # --- disk

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            """Return (device, total, used, free) for path, sizes in bytes."""
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)  # drop the header row
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # "df -k" reports sizes in 1024-byte blocks.
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        # 10 MB tolerance
        tolerance = 10 * 1024 * 1024
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # Both values must be packed in a tuple: the format string has
            # two placeholders and fail() accepts a single message.
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    # --- cpu

    def test_cpu_count_logical(self):
        num = sysctl("sysctl hw.logicalcpu")
        self.assertEqual(num, psutil.cpu_count(logical=True))

    def test_cpu_count_physical(self):
        num = sysctl("sysctl hw.physicalcpu")
        self.assertEqual(num, psutil.cpu_count(logical=False))

    def test_cpu_freq(self):
        # psutil values are scaled by 1000 * 1000 before being compared
        # with the raw sysctl numbers.
        freq = psutil.cpu_freq()
        self.assertEqual(
            freq.current * 1000 * 1000, sysctl("sysctl hw.cpufrequency"))
        self.assertEqual(
            freq.min * 1000 * 1000, sysctl("sysctl hw.cpufrequency_min"))
        self.assertEqual(
            freq.max * 1000 * 1000, sysctl("sysctl hw.cpufrequency_max"))

    # --- virtual mem

    def test_vmem_total(self):
        sysctl_hwphymem = sysctl('sysctl hw.memsize')
        self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)

    @retry_on_failure()
    def test_vmem_free(self):
        vmstat_val = vm_stat("free")
        psutil_val = psutil.virtual_memory().free
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_active(self):
        vmstat_val = vm_stat("active")
        psutil_val = psutil.virtual_memory().active
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_inactive(self):
        vmstat_val = vm_stat("inactive")
        psutil_val = psutil.virtual_memory().inactive
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_wired(self):
        vmstat_val = vm_stat("wired")
        psutil_val = psutil.virtual_memory().wired
        self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)

    # --- swap mem

    @retry_on_failure()
    def test_swapmem_sin(self):
        vmstat_val = vm_stat("Pageins")
        psutil_val = psutil.swap_memory().sin
        self.assertEqual(psutil_val, vmstat_val)

    @retry_on_failure()
    def test_swapmem_sout(self):
        vmstat_val = vm_stat("Pageout")
        psutil_val = psutil.swap_memory().sout
        self.assertEqual(psutil_val, vmstat_val)

    # Not very reliable.
    # def test_swapmem_total(self):
    #     out = sh('sysctl vm.swapusage')
    #     out = out.replace('vm.swapusage: ', '')
    #     total, used, free = re.findall('\d+.\d+\w', out)
    #     psutil_smem = psutil.swap_memory()
    #     self.assertEqual(psutil_smem.total, human2bytes(total))
    #     self.assertEqual(psutil_smem.used, human2bytes(used))
    #     self.assertEqual(psutil_smem.free, human2bytes(free))

    # --- network

    def test_net_if_stats(self):
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # Deliberate best effort: an interface for which the
                # ifconfig call raises is skipped.
                pass
            else:
                self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
                self.assertEqual(stats.mtu,
                                 int(re.findall(r'mtu (\d+)', out)[0]))

    # --- sensors_battery

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        # Compare against the percentage and the quoted power source
        # printed by "pmset -g batt".
        out = sh("pmset -g batt")
        percent = re.search(r"(\d+)%", out).group(1)
        drawing_from = re.search("Now drawing from '([^']+)'", out).group(1)
        power_plugged = drawing_from == "AC Power"
        psutil_result = psutil.sensors_battery()
        self.assertEqual(psutil_result.power_plugged, power_plugged)
        self.assertEqual(psutil_result.percent, int(percent))
290
291
if __name__ == '__main__':
    # Run directly as a script: pass this file to the run() function of
    # psutil.tests.runner.
    import psutil.tests.runner
    psutil.tests.runner.run(__file__)