### view env/lib/python3.9/site-packages/networkx/algorithms/connectivity/tests/test_edge_kcomponents.py @ 0:4f3585e2f14bdraftdefaulttip

author shellac Mon, 22 Mar 2021 18:12:50 +0000
line wrap: on
line source
```
import networkx as nx
import itertools as it
import pytest
from networkx.utils import pairwise
from networkx.algorithms.connectivity import bridge_components, EdgeComponentAuxGraph
from networkx.algorithms.connectivity.edge_kcomponents import general_k_edge_subgraphs

# ----------------
# Helper functions
# ----------------

def fset(list_of_sets):
""" allows == to be used for list of sets """
return set(map(frozenset, list_of_sets))

def _assert_subgraph_edge_connectivity(G, ccs_subgraph, k):
"""
tests properties of k-edge-connected subgraphs

the actual edge connectivity should be no less than k unless the cc is a
single node.
"""
for cc in ccs_subgraph:
C = G.subgraph(cc)
if len(cc) > 1:
connectivity = nx.edge_connectivity(C)
assert connectivity >= k

def _memo_connectivity(G, u, v, memo):
edge = (u, v)
if edge in memo:
return memo[edge]
if not G.is_directed():
redge = (v, u)
if redge in memo:
return memo[redge]
memo[edge] = nx.edge_connectivity(G, *edge)
return memo[edge]

def _all_pairs_connectivity(G, cc, k, memo):
# Brute force check
for u, v in it.combinations(cc, 2):
# Use a memoization dict to save on computation
connectivity = _memo_connectivity(G, u, v, memo)
if G.is_directed():
connectivity = min(connectivity, _memo_connectivity(G, v, u, memo))
assert connectivity >= k

def _assert_local_cc_edge_connectivity(G, ccs_local, k, memo):
"""
tests properties of k-edge-connected components

the local edge connectivity between each pair of nodes in the the original
graph should be no less than k unless the cc is a single node.
"""
for cc in ccs_local:
if len(cc) > 1:
# Strategy for testing a bit faster: If the subgraph has high edge
# connectivity then it must have local connectivity
C = G.subgraph(cc)
connectivity = nx.edge_connectivity(C)
if connectivity < k:
# Otherwise do the brute force (with memoization) check
_all_pairs_connectivity(G, cc, k, memo)

# Helper function
def _check_edge_connectivity(G):
"""
Helper - generates all k-edge-components using the aux graph.  Checks the
both local and subgraph edge connectivity of each cc. Also checks that
alternate methods of computing the k-edge-ccs generate the same result.
"""
# Construct the auxiliary graph that can be used to make each k-cc or k-sub
aux_graph = EdgeComponentAuxGraph.construct(G)

# memoize the local connectivity in this graph
memo = {}

for k in it.count(1):
# Test "local" k-edge-components and k-edge-subgraphs
ccs_local = fset(aux_graph.k_edge_components(k))
ccs_subgraph = fset(aux_graph.k_edge_subgraphs(k))

# Check connectivity properties that should be garuenteed by the
# algorithms.
_assert_local_cc_edge_connectivity(G, ccs_local, k, memo)
_assert_subgraph_edge_connectivity(G, ccs_subgraph, k)

if k == 1 or k == 2 and not G.is_directed():
assert (
ccs_local == ccs_subgraph
), "Subgraphs and components should be the same when k == 1 or (k == 2 and not G.directed())"

if G.is_directed():
# Test special case methods are the same as the aux graph
if k == 1:
alt_sccs = fset(nx.strongly_connected_components(G))
assert alt_sccs == ccs_local, "k=1 failed alt"
assert alt_sccs == ccs_subgraph, "k=1 failed alt"
else:
# Test special case methods are the same as the aux graph
if k == 1:
alt_ccs = fset(nx.connected_components(G))
assert alt_ccs == ccs_local, "k=1 failed alt"
assert alt_ccs == ccs_subgraph, "k=1 failed alt"
elif k == 2:
alt_bridge_ccs = fset(bridge_components(G))
assert alt_bridge_ccs == ccs_local, "k=2 failed alt"
assert alt_bridge_ccs == ccs_subgraph, "k=2 failed alt"
# if new methods for k == 3 or k == 4 are implemented add them here

# Check the general subgraph method works by itself
alt_subgraph_ccs = fset(
[set(C.nodes()) for C in general_k_edge_subgraphs(G, k=k)]
)
assert alt_subgraph_ccs == ccs_subgraph, "alt subgraph method failed"

# Stop once k is larger than all special case methods
# and we cannot break down ccs any further.
if k > 2 and all(len(cc) == 1 for cc in ccs_local):
break

# ----------------
# Misc tests
# ----------------

def test_zero_k_exception():
G = nx.Graph()
# functions that return generators error immediately
pytest.raises(ValueError, nx.k_edge_components, G, k=0)
pytest.raises(ValueError, nx.k_edge_subgraphs, G, k=0)

# actual generators only error when you get the first item
aux_graph = EdgeComponentAuxGraph.construct(G)
pytest.raises(ValueError, list, aux_graph.k_edge_components(k=0))
pytest.raises(ValueError, list, aux_graph.k_edge_subgraphs(k=0))

pytest.raises(ValueError, list, general_k_edge_subgraphs(G, k=0))

def test_empty_input():
G = nx.Graph()
assert [] == list(nx.k_edge_components(G, k=5))
assert [] == list(nx.k_edge_subgraphs(G, k=5))

G = nx.DiGraph()
assert [] == list(nx.k_edge_components(G, k=5))
assert [] == list(nx.k_edge_subgraphs(G, k=5))

def test_not_implemented():
G = nx.MultiGraph()
pytest.raises(nx.NetworkXNotImplemented, EdgeComponentAuxGraph.construct, G)
pytest.raises(nx.NetworkXNotImplemented, nx.k_edge_components, G, k=2)
pytest.raises(nx.NetworkXNotImplemented, nx.k_edge_subgraphs, G, k=2)
pytest.raises(nx.NetworkXNotImplemented, bridge_components, G)
pytest.raises(nx.NetworkXNotImplemented, bridge_components, nx.DiGraph())

def test_general_k_edge_subgraph_quick_return():
# tests quick return optimization
G = nx.Graph()
subgraphs = list(general_k_edge_subgraphs(G, k=1))
assert len(subgraphs) == 1
for subgraph in subgraphs:
assert subgraph.number_of_nodes() == 1

subgraphs = list(general_k_edge_subgraphs(G, k=1))
assert len(subgraphs) == 2
for subgraph in subgraphs:
assert subgraph.number_of_nodes() == 1

# ----------------
# Undirected tests
# ----------------

def test_random_gnp():
# seeds = [1550709854, 1309423156, 4208992358, 2785630813, 1915069929]
seeds = [12, 13]

for seed in seeds:
G = nx.gnp_random_graph(20, 0.2, seed=seed)
_check_edge_connectivity(G)

def test_configuration():
# seeds = [2718183590, 2470619828, 1694705158, 3001036531, 2401251497]
seeds = [14, 15]
for seed in seeds:
deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
G = nx.Graph(nx.configuration_model(deg_seq, seed=seed))
G.remove_edges_from(nx.selfloop_edges(G))
_check_edge_connectivity(G)

def test_shell():
# seeds = [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
seeds = 
for seed in seeds:
constructor = [(12, 70, 0.8), (15, 40, 0.6)]
G = nx.random_shell_graph(constructor, seed=seed)
_check_edge_connectivity(G)

def test_karate():
G = nx.karate_club_graph()
_check_edge_connectivity(G)

def test_tarjan_bridge():
# graph from tarjan paper
# RE Tarjan - "A note on finding the bridges of a graph"
# Information Processing Letters, 1974 - Elsevier
# doi:10.1016/0020-0190(74)90003-9.
# define 2-connected components and bridges
ccs = [
(1, 2, 4, 3, 1, 4),
(5, 6, 7, 5),
(8, 9, 10, 8),
(17, 18, 16, 15, 17),
(11, 12, 14, 13, 11, 14),
]
bridges = [(4, 8), (3, 5), (3, 17)]
G = nx.Graph(it.chain(*(pairwise(path) for path in ccs + bridges)))
_check_edge_connectivity(G)

def test_bridge_cc():
# define 2-connected components and bridges
cc2 = [(1, 2, 4, 3, 1, 4), (8, 9, 10, 8), (11, 12, 13, 11)]
bridges = [(4, 8), (3, 5), (20, 21), (22, 23, 24)]
G = nx.Graph(it.chain(*(pairwise(path) for path in cc2 + bridges)))
bridge_ccs = fset(bridge_components(G))
target_ccs = fset(
[{1, 2, 3, 4}, {5}, {8, 9, 10}, {11, 12, 13}, {20}, {21}, {22}, {23}, {24}]
)
assert bridge_ccs == target_ccs
_check_edge_connectivity(G)

def test_undirected_aux_graph():
# Graph similar to the one in
# http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
a, b, c, d, e, f, g, h, i = "abcdefghi"
paths = [
(a, d, b, f, c),
(a, e, b),
(a, e, b, c, g, b, a),
(c, b),
(f, g, f),
(h, i),
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
aux_graph = EdgeComponentAuxGraph.construct(G)

components_1 = fset(aux_graph.k_edge_subgraphs(k=1))
target_1 = fset([{a, b, c, d, e, f, g}, {h, i}])
assert target_1 == components_1

# Check that the undirected case for k=1 agrees with CCs
alt_1 = fset(nx.k_edge_subgraphs(G, k=1))
assert alt_1 == components_1

components_2 = fset(aux_graph.k_edge_subgraphs(k=2))
target_2 = fset([{a, b, c, d, e, f, g}, {h}, {i}])
assert target_2 == components_2

# Check that the undirected case for k=2 agrees with bridge components
alt_2 = fset(nx.k_edge_subgraphs(G, k=2))
assert alt_2 == components_2

components_3 = fset(aux_graph.k_edge_subgraphs(k=3))
target_3 = fset([{a}, {b, c, f, g}, {d}, {e}, {h}, {i}])
assert target_3 == components_3

components_4 = fset(aux_graph.k_edge_subgraphs(k=4))
target_4 = fset([{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}])
assert target_4 == components_4

_check_edge_connectivity(G)

def test_local_subgraph_difference():
paths = [
(11, 12, 13, 14, 11, 13, 14, 12),  # first 4-clique
(21, 22, 23, 24, 21, 23, 24, 22),  # second 4-clique
# paths connecting each node of the 4 cliques
(11, 101, 21),
(12, 102, 22),
(13, 103, 23),
(14, 104, 24),
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
aux_graph = EdgeComponentAuxGraph.construct(G)

# Each clique is returned separately in k-edge-subgraphs
subgraph_ccs = fset(aux_graph.k_edge_subgraphs(3))
subgraph_target = fset(
[{101}, {102}, {103}, {104}, {21, 22, 23, 24}, {11, 12, 13, 14}]
)
assert subgraph_ccs == subgraph_target

# But in k-edge-ccs they are returned together
# because they are locally 3-edge-connected
local_ccs = fset(aux_graph.k_edge_components(3))
local_target = fset([{101}, {102}, {103}, {104}, {11, 12, 13, 14, 21, 22, 23, 24}])
assert local_ccs == local_target

def test_local_subgraph_difference_directed():
dipaths = [(1, 2, 3, 4, 1), (1, 3, 1)]
G = nx.DiGraph(it.chain(*[pairwise(path) for path in dipaths]))

assert fset(nx.k_edge_components(G, k=1)) == fset(nx.k_edge_subgraphs(G, k=1))

# Unlike undirected graphs, when k=2, for directed graphs there is a case
# where the k-edge-ccs are not the same as the k-edge-subgraphs.
# (in directed graphs ccs and subgraphs are the same when k=2)
assert fset(nx.k_edge_components(G, k=2)) != fset(nx.k_edge_subgraphs(G, k=2))

assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))

_check_edge_connectivity(G)

def test_triangles():
paths = [
(11, 12, 13, 11),  # first 3-clique
(21, 22, 23, 21),  # second 3-clique
(11, 21),  # connected by an edge
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))

# subgraph and ccs are the same in all cases here
assert fset(nx.k_edge_components(G, k=1)) == fset(nx.k_edge_subgraphs(G, k=1))

assert fset(nx.k_edge_components(G, k=2)) == fset(nx.k_edge_subgraphs(G, k=2))

assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))

_check_edge_connectivity(G)

def test_four_clique():
paths = [
(11, 12, 13, 14, 11, 13, 14, 12),  # first 4-clique
(21, 22, 23, 24, 21, 23, 24, 22),  # second 4-clique
# paths connecting the 4 cliques such that they are
# 3-connected in G, but not in the subgraph.
# Case where the nodes bridging them do not have degree less than 3.
(100, 13),
(12, 100, 22),
(13, 200, 23),
(14, 300, 24),
]
G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))

# The subgraphs and ccs are different for k=3
local_ccs = fset(nx.k_edge_components(G, k=3))
subgraphs = fset(nx.k_edge_subgraphs(G, k=3))
assert local_ccs != subgraphs

# The cliques ares in the same cc
clique1 = frozenset(paths)
clique2 = frozenset(paths)
assert clique1.union(clique2).union({100}) in local_ccs

# but different subgraphs
assert clique1 in subgraphs
assert clique2 in subgraphs

assert G.degree(100) == 3

_check_edge_connectivity(G)

def test_five_clique():
# Make a graph that can be disconnected less than 4 edges, but no node has
# degree less than 4.
G = nx.disjoint_union(nx.complete_graph(5), nx.complete_graph(5))
paths = [
(1, 100, 6),
(2, 100, 7),
(3, 200, 8),
(4, 200, 100),
]
G.add_edges_from(it.chain(*[pairwise(path) for path in paths]))
assert min(dict(nx.degree(G)).values()) == 4

# For k=3 they are the same
assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))

# For k=4 they are the different
# the aux nodes are in the same CC as clique 1 but no the same subgraph
assert fset(nx.k_edge_components(G, k=4)) != fset(nx.k_edge_subgraphs(G, k=4))

# For k=5 they are not the same
assert fset(nx.k_edge_components(G, k=5)) != fset(nx.k_edge_subgraphs(G, k=5))

# For k=6 they are the same
assert fset(nx.k_edge_components(G, k=6)) == fset(nx.k_edge_subgraphs(G, k=6))
_check_edge_connectivity(G)

# ----------------
# Undirected tests
# ----------------

def test_directed_aux_graph():
# Graph similar to the one in
# http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
a, b, c, d, e, f, g, h, i = "abcdefghi"
dipaths = [
(a, d, b, f, c),
(a, e, b),
(a, e, b, c, g, b, a),
(c, b),
(f, g, f),
(h, i),
]
G = nx.DiGraph(it.chain(*[pairwise(path) for path in dipaths]))
aux_graph = EdgeComponentAuxGraph.construct(G)

components_1 = fset(aux_graph.k_edge_subgraphs(k=1))
target_1 = fset([{a, b, c, d, e, f, g}, {h}, {i}])
assert target_1 == components_1

# Check that the directed case for k=1 agrees with SCCs
alt_1 = fset(nx.strongly_connected_components(G))
assert alt_1 == components_1

components_2 = fset(aux_graph.k_edge_subgraphs(k=2))
target_2 = fset([{i}, {e}, {d}, {b, c, f, g}, {h}, {a}])
assert target_2 == components_2

components_3 = fset(aux_graph.k_edge_subgraphs(k=3))
target_3 = fset([{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}])
assert target_3 == components_3

def test_random_gnp_directed():
# seeds = [3894723670, 500186844, 267231174, 2181982262, 1116750056]
seeds = 
for seed in seeds:
G = nx.gnp_random_graph(20, 0.2, directed=True, seed=seed)
_check_edge_connectivity(G)

def test_configuration_directed():
# seeds = [671221681, 2403749451, 124433910, 672335939, 1193127215]
seeds = 
for seed in seeds:
deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
G = nx.DiGraph(nx.configuration_model(deg_seq, seed=seed))
G.remove_edges_from(nx.selfloop_edges(G))
_check_edge_connectivity(G)

def test_shell_directed():
# seeds = [3134027055, 4079264063, 1350769518, 1405643020, 530038094]
seeds = 
for seed in seeds:
constructor = [(12, 70, 0.8), (15, 40, 0.6)]
G = nx.random_shell_graph(constructor, seed=seed).to_directed()
_check_edge_connectivity(G)

def test_karate_directed():
G = nx.karate_club_graph().to_directed()
_check_edge_connectivity(G)
```