Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/psutil/tests/test_connections.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| 4 # Use of this source code is governed by a BSD-style license that can be | |
| 5 # found in the LICENSE file. | |
| 6 | |
| 7 """Tests for net_connections() and Process.connections() APIs.""" | |
| 8 | |
| 9 import contextlib | |
| 10 import errno | |
| 11 import os | |
| 12 import socket | |
| 13 import textwrap | |
| 14 from contextlib import closing | |
| 15 from socket import AF_INET | |
| 16 from socket import AF_INET6 | |
| 17 from socket import SOCK_DGRAM | |
| 18 from socket import SOCK_STREAM | |
| 19 | |
| 20 import psutil | |
| 21 from psutil import FREEBSD | |
| 22 from psutil import LINUX | |
| 23 from psutil import MACOS | |
| 24 from psutil import NETBSD | |
| 25 from psutil import OPENBSD | |
| 26 from psutil import POSIX | |
| 27 from psutil import SUNOS | |
| 28 from psutil import WINDOWS | |
| 29 from psutil._common import supports_ipv6 | |
| 30 from psutil._compat import PY3 | |
| 31 from psutil.tests import AF_UNIX | |
| 32 from psutil.tests import bind_socket | |
| 33 from psutil.tests import bind_unix_socket | |
| 34 from psutil.tests import check_net_address | |
| 35 from psutil.tests import CIRRUS | |
| 36 from psutil.tests import create_sockets | |
| 37 from psutil.tests import enum | |
| 38 from psutil.tests import get_free_port | |
| 39 from psutil.tests import HAS_CONNECTIONS_UNIX | |
| 40 from psutil.tests import PsutilTestCase | |
| 41 from psutil.tests import reap_children | |
| 42 from psutil.tests import retry_on_failure | |
| 43 from psutil.tests import serialrun | |
| 44 from psutil.tests import skip_on_access_denied | |
| 45 from psutil.tests import SKIP_SYSCONS | |
| 46 from psutil.tests import tcp_socketpair | |
| 47 from psutil.tests import TRAVIS | |
| 48 from psutil.tests import unittest | |
| 49 from psutil.tests import unix_socketpair | |
| 50 from psutil.tests import wait_for_file | |
| 51 | |
| 52 | |
| 53 thisproc = psutil.Process() | |
| 54 SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) | |
| 55 | |
| 56 | |
| 57 @serialrun | |
| 58 class _ConnTestCase(PsutilTestCase): | |
| 59 | |
| 60 def setUp(self): | |
| 61 if not (NETBSD or FREEBSD): | |
| 62 # process opens a UNIX socket to /var/log/run. | |
| 63 cons = thisproc.connections(kind='all') | |
| 64 assert not cons, cons | |
| 65 | |
| 66 def tearDown(self): | |
| 67 if not (FREEBSD or NETBSD): | |
| 68 # Make sure we closed all resources. | |
| 69 # NetBSD opens a UNIX socket to /var/log/run. | |
| 70 cons = thisproc.connections(kind='all') | |
| 71 assert not cons, cons | |
| 72 | |
| 73 def compare_procsys_connections(self, pid, proc_cons, kind='all'): | |
| 74 """Given a process PID and its list of connections compare | |
| 75 those against system-wide connections retrieved via | |
| 76 psutil.net_connections. | |
| 77 """ | |
| 78 try: | |
| 79 sys_cons = psutil.net_connections(kind=kind) | |
| 80 except psutil.AccessDenied: | |
| 81 # On MACOS, system-wide connections are retrieved by iterating | |
| 82 # over all processes | |
| 83 if MACOS: | |
| 84 return | |
| 85 else: | |
| 86 raise | |
| 87 # Filter for this proc PID and exlucde PIDs from the tuple. | |
| 88 sys_cons = [c[:-1] for c in sys_cons if c.pid == pid] | |
| 89 sys_cons.sort() | |
| 90 proc_cons.sort() | |
| 91 self.assertEqual(proc_cons, sys_cons) | |
| 92 | |
| 93 def check_connection_ntuple(self, conn): | |
| 94 """Check validity of a connection namedtuple.""" | |
| 95 def check_ntuple(conn): | |
| 96 has_pid = len(conn) == 7 | |
| 97 self.assertIn(len(conn), (6, 7)) | |
| 98 self.assertEqual(conn[0], conn.fd) | |
| 99 self.assertEqual(conn[1], conn.family) | |
| 100 self.assertEqual(conn[2], conn.type) | |
| 101 self.assertEqual(conn[3], conn.laddr) | |
| 102 self.assertEqual(conn[4], conn.raddr) | |
| 103 self.assertEqual(conn[5], conn.status) | |
| 104 if has_pid: | |
| 105 self.assertEqual(conn[6], conn.pid) | |
| 106 | |
| 107 def check_family(conn): | |
| 108 self.assertIn(conn.family, (AF_INET, AF_INET6, AF_UNIX)) | |
| 109 if enum is not None: | |
| 110 assert isinstance(conn.family, enum.IntEnum), conn | |
| 111 else: | |
| 112 assert isinstance(conn.family, int), conn | |
| 113 if conn.family == AF_INET: | |
| 114 # actually try to bind the local socket; ignore IPv6 | |
| 115 # sockets as their address might be represented as | |
| 116 # an IPv4-mapped-address (e.g. "::127.0.0.1") | |
| 117 # and that's rejected by bind() | |
| 118 s = socket.socket(conn.family, conn.type) | |
| 119 with contextlib.closing(s): | |
| 120 try: | |
| 121 s.bind((conn.laddr[0], 0)) | |
| 122 except socket.error as err: | |
| 123 if err.errno != errno.EADDRNOTAVAIL: | |
| 124 raise | |
| 125 elif conn.family == AF_UNIX: | |
| 126 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 127 | |
| 128 def check_type(conn): | |
| 129 # SOCK_SEQPACKET may happen in case of AF_UNIX socks | |
| 130 self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET)) | |
| 131 if enum is not None: | |
| 132 assert isinstance(conn.type, enum.IntEnum), conn | |
| 133 else: | |
| 134 assert isinstance(conn.type, int), conn | |
| 135 if conn.type == SOCK_DGRAM: | |
| 136 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 137 | |
| 138 def check_addrs(conn): | |
| 139 # check IP address and port sanity | |
| 140 for addr in (conn.laddr, conn.raddr): | |
| 141 if conn.family in (AF_INET, AF_INET6): | |
| 142 self.assertIsInstance(addr, tuple) | |
| 143 if not addr: | |
| 144 continue | |
| 145 self.assertIsInstance(addr.port, int) | |
| 146 assert 0 <= addr.port <= 65535, addr.port | |
| 147 check_net_address(addr.ip, conn.family) | |
| 148 elif conn.family == AF_UNIX: | |
| 149 self.assertIsInstance(addr, str) | |
| 150 | |
| 151 def check_status(conn): | |
| 152 self.assertIsInstance(conn.status, str) | |
| 153 valids = [getattr(psutil, x) for x in dir(psutil) | |
| 154 if x.startswith('CONN_')] | |
| 155 self.assertIn(conn.status, valids) | |
| 156 if conn.family in (AF_INET, AF_INET6) and conn.type == SOCK_STREAM: | |
| 157 self.assertNotEqual(conn.status, psutil.CONN_NONE) | |
| 158 else: | |
| 159 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 160 | |
| 161 check_ntuple(conn) | |
| 162 check_family(conn) | |
| 163 check_type(conn) | |
| 164 check_addrs(conn) | |
| 165 check_status(conn) | |
| 166 | |
| 167 | |
| 168 class TestBasicOperations(_ConnTestCase): | |
| 169 | |
| 170 @unittest.skipIf(SKIP_SYSCONS, "requires root") | |
| 171 def test_system(self): | |
| 172 with create_sockets(): | |
| 173 for conn in psutil.net_connections(kind='all'): | |
| 174 self.check_connection_ntuple(conn) | |
| 175 | |
| 176 def test_process(self): | |
| 177 with create_sockets(): | |
| 178 for conn in psutil.Process().connections(kind='all'): | |
| 179 self.check_connection_ntuple(conn) | |
| 180 | |
| 181 def test_invalid_kind(self): | |
| 182 self.assertRaises(ValueError, thisproc.connections, kind='???') | |
| 183 self.assertRaises(ValueError, psutil.net_connections, kind='???') | |
| 184 | |
| 185 | |
| 186 @serialrun | |
| 187 class TestUnconnectedSockets(_ConnTestCase): | |
| 188 """Tests sockets which are open but not connected to anything.""" | |
| 189 | |
| 190 def get_conn_from_sock(self, sock): | |
| 191 cons = thisproc.connections(kind='all') | |
| 192 smap = dict([(c.fd, c) for c in cons]) | |
| 193 if NETBSD or FREEBSD: | |
| 194 # NetBSD opens a UNIX socket to /var/log/run | |
| 195 # so there may be more connections. | |
| 196 return smap[sock.fileno()] | |
| 197 else: | |
| 198 self.assertEqual(len(cons), 1) | |
| 199 if cons[0].fd != -1: | |
| 200 self.assertEqual(smap[sock.fileno()].fd, sock.fileno()) | |
| 201 return cons[0] | |
| 202 | |
| 203 def check_socket(self, sock): | |
| 204 """Given a socket, makes sure it matches the one obtained | |
| 205 via psutil. It assumes this process created one connection | |
| 206 only (the one supposed to be checked). | |
| 207 """ | |
| 208 conn = self.get_conn_from_sock(sock) | |
| 209 self.check_connection_ntuple(conn) | |
| 210 | |
| 211 # fd, family, type | |
| 212 if conn.fd != -1: | |
| 213 self.assertEqual(conn.fd, sock.fileno()) | |
| 214 self.assertEqual(conn.family, sock.family) | |
| 215 # see: http://bugs.python.org/issue30204 | |
| 216 self.assertEqual( | |
| 217 conn.type, sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)) | |
| 218 | |
| 219 # local address | |
| 220 laddr = sock.getsockname() | |
| 221 if not laddr and PY3 and isinstance(laddr, bytes): | |
| 222 # See: http://bugs.python.org/issue30205 | |
| 223 laddr = laddr.decode() | |
| 224 if sock.family == AF_INET6: | |
| 225 laddr = laddr[:2] | |
| 226 if sock.family == AF_UNIX and OPENBSD: | |
| 227 # No addresses are set for UNIX sockets on OpenBSD. | |
| 228 pass | |
| 229 else: | |
| 230 self.assertEqual(conn.laddr, laddr) | |
| 231 | |
| 232 # XXX Solaris can't retrieve system-wide UNIX sockets | |
| 233 if sock.family == AF_UNIX and HAS_CONNECTIONS_UNIX: | |
| 234 cons = thisproc.connections(kind='all') | |
| 235 self.compare_procsys_connections(os.getpid(), cons, kind='all') | |
| 236 return conn | |
| 237 | |
| 238 def test_tcp_v4(self): | |
| 239 addr = ("127.0.0.1", get_free_port()) | |
| 240 with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock: | |
| 241 conn = self.check_socket(sock) | |
| 242 assert not conn.raddr | |
| 243 self.assertEqual(conn.status, psutil.CONN_LISTEN) | |
| 244 | |
| 245 @unittest.skipIf(not supports_ipv6(), "IPv6 not supported") | |
| 246 def test_tcp_v6(self): | |
| 247 addr = ("::1", get_free_port()) | |
| 248 with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock: | |
| 249 conn = self.check_socket(sock) | |
| 250 assert not conn.raddr | |
| 251 self.assertEqual(conn.status, psutil.CONN_LISTEN) | |
| 252 | |
| 253 def test_udp_v4(self): | |
| 254 addr = ("127.0.0.1", get_free_port()) | |
| 255 with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock: | |
| 256 conn = self.check_socket(sock) | |
| 257 assert not conn.raddr | |
| 258 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 259 | |
| 260 @unittest.skipIf(not supports_ipv6(), "IPv6 not supported") | |
| 261 def test_udp_v6(self): | |
| 262 addr = ("::1", get_free_port()) | |
| 263 with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock: | |
| 264 conn = self.check_socket(sock) | |
| 265 assert not conn.raddr | |
| 266 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 267 | |
| 268 @unittest.skipIf(not POSIX, 'POSIX only') | |
| 269 def test_unix_tcp(self): | |
| 270 testfn = self.get_testfn() | |
| 271 with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock: | |
| 272 conn = self.check_socket(sock) | |
| 273 assert not conn.raddr | |
| 274 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 275 | |
| 276 @unittest.skipIf(not POSIX, 'POSIX only') | |
| 277 def test_unix_udp(self): | |
| 278 testfn = self.get_testfn() | |
| 279 with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock: | |
| 280 conn = self.check_socket(sock) | |
| 281 assert not conn.raddr | |
| 282 self.assertEqual(conn.status, psutil.CONN_NONE) | |
| 283 | |
| 284 | |
| 285 @serialrun | |
| 286 class TestConnectedSocket(_ConnTestCase): | |
| 287 """Test socket pairs which are are actually connected to | |
| 288 each other. | |
| 289 """ | |
| 290 | |
| 291 # On SunOS, even after we close() it, the server socket stays around | |
| 292 # in TIME_WAIT state. | |
| 293 @unittest.skipIf(SUNOS, "unreliable on SUONS") | |
| 294 def test_tcp(self): | |
| 295 addr = ("127.0.0.1", get_free_port()) | |
| 296 assert not thisproc.connections(kind='tcp4') | |
| 297 server, client = tcp_socketpair(AF_INET, addr=addr) | |
| 298 try: | |
| 299 cons = thisproc.connections(kind='tcp4') | |
| 300 self.assertEqual(len(cons), 2) | |
| 301 self.assertEqual(cons[0].status, psutil.CONN_ESTABLISHED) | |
| 302 self.assertEqual(cons[1].status, psutil.CONN_ESTABLISHED) | |
| 303 # May not be fast enough to change state so it stays | |
| 304 # commenteed. | |
| 305 # client.close() | |
| 306 # cons = thisproc.connections(kind='all') | |
| 307 # self.assertEqual(len(cons), 1) | |
| 308 # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT) | |
| 309 finally: | |
| 310 server.close() | |
| 311 client.close() | |
| 312 | |
| 313 @unittest.skipIf(not POSIX, 'POSIX only') | |
| 314 def test_unix(self): | |
| 315 testfn = self.get_testfn() | |
| 316 server, client = unix_socketpair(testfn) | |
| 317 try: | |
| 318 cons = thisproc.connections(kind='unix') | |
| 319 assert not (cons[0].laddr and cons[0].raddr) | |
| 320 assert not (cons[1].laddr and cons[1].raddr) | |
| 321 if NETBSD or FREEBSD: | |
| 322 # On NetBSD creating a UNIX socket will cause | |
| 323 # a UNIX connection to /var/run/log. | |
| 324 cons = [c for c in cons if c.raddr != '/var/run/log'] | |
| 325 if CIRRUS: | |
| 326 cons = [c for c in cons if c.fd in | |
| 327 (server.fileno(), client.fileno())] | |
| 328 self.assertEqual(len(cons), 2, msg=cons) | |
| 329 if LINUX or FREEBSD or SUNOS: | |
| 330 # remote path is never set | |
| 331 self.assertEqual(cons[0].raddr, "") | |
| 332 self.assertEqual(cons[1].raddr, "") | |
| 333 # one local address should though | |
| 334 self.assertEqual(testfn, cons[0].laddr or cons[1].laddr) | |
| 335 elif OPENBSD: | |
| 336 # No addresses whatsoever here. | |
| 337 for addr in (cons[0].laddr, cons[0].raddr, | |
| 338 cons[1].laddr, cons[1].raddr): | |
| 339 self.assertEqual(addr, "") | |
| 340 else: | |
| 341 # On other systems either the laddr or raddr | |
| 342 # of both peers are set. | |
| 343 self.assertEqual(cons[0].laddr or cons[1].laddr, testfn) | |
| 344 self.assertEqual(cons[0].raddr or cons[1].raddr, testfn) | |
| 345 finally: | |
| 346 server.close() | |
| 347 client.close() | |
| 348 | |
| 349 | |
| 350 class TestFilters(_ConnTestCase): | |
| 351 | |
| 352 def test_filters(self): | |
| 353 def check(kind, families, types): | |
| 354 for conn in thisproc.connections(kind=kind): | |
| 355 self.assertIn(conn.family, families) | |
| 356 self.assertIn(conn.type, types) | |
| 357 if not SKIP_SYSCONS: | |
| 358 for conn in psutil.net_connections(kind=kind): | |
| 359 self.assertIn(conn.family, families) | |
| 360 self.assertIn(conn.type, types) | |
| 361 | |
| 362 with create_sockets(): | |
| 363 check('all', | |
| 364 [AF_INET, AF_INET6, AF_UNIX], | |
| 365 [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET]) | |
| 366 check('inet', | |
| 367 [AF_INET, AF_INET6], | |
| 368 [SOCK_STREAM, SOCK_DGRAM]) | |
| 369 check('inet4', | |
| 370 [AF_INET], | |
| 371 [SOCK_STREAM, SOCK_DGRAM]) | |
| 372 check('tcp', | |
| 373 [AF_INET, AF_INET6], | |
| 374 [SOCK_STREAM]) | |
| 375 check('tcp4', | |
| 376 [AF_INET], | |
| 377 [SOCK_STREAM]) | |
| 378 check('tcp6', | |
| 379 [AF_INET6], | |
| 380 [SOCK_STREAM]) | |
| 381 check('udp', | |
| 382 [AF_INET, AF_INET6], | |
| 383 [SOCK_DGRAM]) | |
| 384 check('udp4', | |
| 385 [AF_INET], | |
| 386 [SOCK_DGRAM]) | |
| 387 check('udp6', | |
| 388 [AF_INET6], | |
| 389 [SOCK_DGRAM]) | |
| 390 if HAS_CONNECTIONS_UNIX: | |
| 391 check('unix', | |
| 392 [AF_UNIX], | |
| 393 [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET]) | |
| 394 | |
| 395 @skip_on_access_denied(only_if=MACOS) | |
| 396 def test_combos(self): | |
| 397 reap_children() | |
| 398 | |
| 399 def check_conn(proc, conn, family, type, laddr, raddr, status, kinds): | |
| 400 all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", | |
| 401 "tcp6", "udp", "udp4", "udp6") | |
| 402 self.check_connection_ntuple(conn) | |
| 403 self.assertEqual(conn.family, family) | |
| 404 self.assertEqual(conn.type, type) | |
| 405 self.assertEqual(conn.laddr, laddr) | |
| 406 self.assertEqual(conn.raddr, raddr) | |
| 407 self.assertEqual(conn.status, status) | |
| 408 for kind in all_kinds: | |
| 409 cons = proc.connections(kind=kind) | |
| 410 if kind in kinds: | |
| 411 assert cons | |
| 412 else: | |
| 413 assert not cons, cons | |
| 414 # compare against system-wide connections | |
| 415 # XXX Solaris can't retrieve system-wide UNIX | |
| 416 # sockets. | |
| 417 if HAS_CONNECTIONS_UNIX: | |
| 418 self.compare_procsys_connections(proc.pid, [conn]) | |
| 419 | |
| 420 tcp_template = textwrap.dedent(""" | |
| 421 import socket, time | |
| 422 s = socket.socket({family}, socket.SOCK_STREAM) | |
| 423 s.bind(('{addr}', 0)) | |
| 424 s.listen(5) | |
| 425 with open('{testfn}', 'w') as f: | |
| 426 f.write(str(s.getsockname()[:2])) | |
| 427 time.sleep(60) | |
| 428 """) | |
| 429 | |
| 430 udp_template = textwrap.dedent(""" | |
| 431 import socket, time | |
| 432 s = socket.socket({family}, socket.SOCK_DGRAM) | |
| 433 s.bind(('{addr}', 0)) | |
| 434 with open('{testfn}', 'w') as f: | |
| 435 f.write(str(s.getsockname()[:2])) | |
| 436 time.sleep(60) | |
| 437 """) | |
| 438 | |
| 439 # must be relative on Windows | |
| 440 testfile = os.path.basename(self.get_testfn(dir=os.getcwd())) | |
| 441 tcp4_template = tcp_template.format( | |
| 442 family=int(AF_INET), addr="127.0.0.1", testfn=testfile) | |
| 443 udp4_template = udp_template.format( | |
| 444 family=int(AF_INET), addr="127.0.0.1", testfn=testfile) | |
| 445 tcp6_template = tcp_template.format( | |
| 446 family=int(AF_INET6), addr="::1", testfn=testfile) | |
| 447 udp6_template = udp_template.format( | |
| 448 family=int(AF_INET6), addr="::1", testfn=testfile) | |
| 449 | |
| 450 # launch various subprocess instantiating a socket of various | |
| 451 # families and types to enrich psutil results | |
| 452 tcp4_proc = self.pyrun(tcp4_template) | |
| 453 tcp4_addr = eval(wait_for_file(testfile, delete=True)) | |
| 454 udp4_proc = self.pyrun(udp4_template) | |
| 455 udp4_addr = eval(wait_for_file(testfile, delete=True)) | |
| 456 if supports_ipv6(): | |
| 457 tcp6_proc = self.pyrun(tcp6_template) | |
| 458 tcp6_addr = eval(wait_for_file(testfile, delete=True)) | |
| 459 udp6_proc = self.pyrun(udp6_template) | |
| 460 udp6_addr = eval(wait_for_file(testfile, delete=True)) | |
| 461 else: | |
| 462 tcp6_proc = None | |
| 463 udp6_proc = None | |
| 464 tcp6_addr = None | |
| 465 udp6_addr = None | |
| 466 | |
| 467 for p in thisproc.children(): | |
| 468 cons = p.connections() | |
| 469 self.assertEqual(len(cons), 1) | |
| 470 for conn in cons: | |
| 471 # TCP v4 | |
| 472 if p.pid == tcp4_proc.pid: | |
| 473 check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (), | |
| 474 psutil.CONN_LISTEN, | |
| 475 ("all", "inet", "inet4", "tcp", "tcp4")) | |
| 476 # UDP v4 | |
| 477 elif p.pid == udp4_proc.pid: | |
| 478 check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (), | |
| 479 psutil.CONN_NONE, | |
| 480 ("all", "inet", "inet4", "udp", "udp4")) | |
| 481 # TCP v6 | |
| 482 elif p.pid == getattr(tcp6_proc, "pid", None): | |
| 483 check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (), | |
| 484 psutil.CONN_LISTEN, | |
| 485 ("all", "inet", "inet6", "tcp", "tcp6")) | |
| 486 # UDP v6 | |
| 487 elif p.pid == getattr(udp6_proc, "pid", None): | |
| 488 check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (), | |
| 489 psutil.CONN_NONE, | |
| 490 ("all", "inet", "inet6", "udp", "udp6")) | |
| 491 | |
| 492 def test_count(self): | |
| 493 with create_sockets(): | |
| 494 # tcp | |
| 495 cons = thisproc.connections(kind='tcp') | |
| 496 self.assertEqual(len(cons), 2 if supports_ipv6() else 1) | |
| 497 for conn in cons: | |
| 498 self.assertIn(conn.family, (AF_INET, AF_INET6)) | |
| 499 self.assertEqual(conn.type, SOCK_STREAM) | |
| 500 # tcp4 | |
| 501 cons = thisproc.connections(kind='tcp4') | |
| 502 self.assertEqual(len(cons), 1) | |
| 503 self.assertEqual(cons[0].family, AF_INET) | |
| 504 self.assertEqual(cons[0].type, SOCK_STREAM) | |
| 505 # tcp6 | |
| 506 if supports_ipv6(): | |
| 507 cons = thisproc.connections(kind='tcp6') | |
| 508 self.assertEqual(len(cons), 1) | |
| 509 self.assertEqual(cons[0].family, AF_INET6) | |
| 510 self.assertEqual(cons[0].type, SOCK_STREAM) | |
| 511 # udp | |
| 512 cons = thisproc.connections(kind='udp') | |
| 513 self.assertEqual(len(cons), 2 if supports_ipv6() else 1) | |
| 514 for conn in cons: | |
| 515 self.assertIn(conn.family, (AF_INET, AF_INET6)) | |
| 516 self.assertEqual(conn.type, SOCK_DGRAM) | |
| 517 # udp4 | |
| 518 cons = thisproc.connections(kind='udp4') | |
| 519 self.assertEqual(len(cons), 1) | |
| 520 self.assertEqual(cons[0].family, AF_INET) | |
| 521 self.assertEqual(cons[0].type, SOCK_DGRAM) | |
| 522 # udp6 | |
| 523 if supports_ipv6(): | |
| 524 cons = thisproc.connections(kind='udp6') | |
| 525 self.assertEqual(len(cons), 1) | |
| 526 self.assertEqual(cons[0].family, AF_INET6) | |
| 527 self.assertEqual(cons[0].type, SOCK_DGRAM) | |
| 528 # inet | |
| 529 cons = thisproc.connections(kind='inet') | |
| 530 self.assertEqual(len(cons), 4 if supports_ipv6() else 2) | |
| 531 for conn in cons: | |
| 532 self.assertIn(conn.family, (AF_INET, AF_INET6)) | |
| 533 self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM)) | |
| 534 # inet6 | |
| 535 if supports_ipv6(): | |
| 536 cons = thisproc.connections(kind='inet6') | |
| 537 self.assertEqual(len(cons), 2) | |
| 538 for conn in cons: | |
| 539 self.assertEqual(conn.family, AF_INET6) | |
| 540 self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM)) | |
| 541 # Skipped on BSD becayse by default the Python process | |
| 542 # creates a UNIX socket to '/var/run/log'. | |
| 543 if HAS_CONNECTIONS_UNIX and not (FREEBSD or NETBSD): | |
| 544 cons = thisproc.connections(kind='unix') | |
| 545 self.assertEqual(len(cons), 3) | |
| 546 for conn in cons: | |
| 547 self.assertEqual(conn.family, AF_UNIX) | |
| 548 self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM)) | |
| 549 | |
| 550 | |
| 551 @unittest.skipIf(SKIP_SYSCONS, "requires root") | |
| 552 class TestSystemWideConnections(_ConnTestCase): | |
| 553 """Tests for net_connections().""" | |
| 554 | |
| 555 def test_it(self): | |
| 556 def check(cons, families, types_): | |
| 557 for conn in cons: | |
| 558 self.assertIn(conn.family, families, msg=conn) | |
| 559 if conn.family != AF_UNIX: | |
| 560 self.assertIn(conn.type, types_, msg=conn) | |
| 561 self.check_connection_ntuple(conn) | |
| 562 | |
| 563 with create_sockets(): | |
| 564 from psutil._common import conn_tmap | |
| 565 for kind, groups in conn_tmap.items(): | |
| 566 # XXX: SunOS does not retrieve UNIX sockets. | |
| 567 if kind == 'unix' and not HAS_CONNECTIONS_UNIX: | |
| 568 continue | |
| 569 families, types_ = groups | |
| 570 cons = psutil.net_connections(kind) | |
| 571 self.assertEqual(len(cons), len(set(cons))) | |
| 572 check(cons, families, types_) | |
| 573 | |
| 574 # See: https://travis-ci.org/giampaolo/psutil/jobs/237566297 | |
| 575 @unittest.skipIf(MACOS and TRAVIS, "unreliable on MACOS + TRAVIS") | |
| 576 @retry_on_failure() | |
| 577 def test_multi_sockets_procs(self): | |
| 578 # Creates multiple sub processes, each creating different | |
| 579 # sockets. For each process check that proc.connections() | |
| 580 # and net_connections() return the same results. | |
| 581 # This is done mainly to check whether net_connections()'s | |
| 582 # pid is properly set, see: | |
| 583 # https://github.com/giampaolo/psutil/issues/1013 | |
| 584 with create_sockets() as socks: | |
| 585 expected = len(socks) | |
| 586 pids = [] | |
| 587 times = 10 | |
| 588 fnames = [] | |
| 589 for i in range(times): | |
| 590 fname = self.get_testfn() | |
| 591 fnames.append(fname) | |
| 592 src = textwrap.dedent("""\ | |
| 593 import time, os | |
| 594 from psutil.tests import create_sockets | |
| 595 with create_sockets(): | |
| 596 with open(r'%s', 'w') as f: | |
| 597 f.write("hello") | |
| 598 time.sleep(60) | |
| 599 """ % fname) | |
| 600 sproc = self.pyrun(src) | |
| 601 pids.append(sproc.pid) | |
| 602 | |
| 603 # sync | |
| 604 for fname in fnames: | |
| 605 wait_for_file(fname) | |
| 606 | |
| 607 syscons = [x for x in psutil.net_connections(kind='all') if x.pid | |
| 608 in pids] | |
| 609 for pid in pids: | |
| 610 self.assertEqual(len([x for x in syscons if x.pid == pid]), | |
| 611 expected) | |
| 612 p = psutil.Process(pid) | |
| 613 self.assertEqual(len(p.connections('all')), expected) | |
| 614 | |
| 615 | |
| 616 class TestMisc(PsutilTestCase): | |
| 617 | |
| 618 def test_connection_constants(self): | |
| 619 ints = [] | |
| 620 strs = [] | |
| 621 for name in dir(psutil): | |
| 622 if name.startswith('CONN_'): | |
| 623 num = getattr(psutil, name) | |
| 624 str_ = str(num) | |
| 625 assert str_.isupper(), str_ | |
| 626 self.assertNotIn(str, strs) | |
| 627 self.assertNotIn(num, ints) | |
| 628 ints.append(num) | |
| 629 strs.append(str_) | |
| 630 if SUNOS: | |
| 631 psutil.CONN_IDLE | |
| 632 psutil.CONN_BOUND | |
| 633 if WINDOWS: | |
| 634 psutil.CONN_DELETE_TCB | |
| 635 | |
| 636 | |
| 637 if __name__ == '__main__': | |
| 638 from psutil.tests.runner import run_from_name | |
| 639 run_from_name(__file__) |
