view env/lib/python3.9/site-packages/networkx/algorithms/connectivity/tests/test_edge_kcomponents.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import networkx as nx
import itertools as it
import pytest
from networkx.utils import pairwise
from networkx.algorithms.connectivity import bridge_components, EdgeComponentAuxGraph
from networkx.algorithms.connectivity.edge_kcomponents import general_k_edge_subgraphs


# ----------------
# Helper functions
# ----------------


def fset(list_of_sets):
    """Return a set of frozensets so that collections of sets compare with ==."""
    return {frozenset(members) for members in list_of_sets}


def _assert_subgraph_edge_connectivity(G, ccs_subgraph, k):
    """
    tests properties of k-edge-connected subgraphs

    the actual edge connectivity should be no less than k unless the cc is a
    single node.
    """
    for cc in ccs_subgraph:
        C = G.subgraph(cc)
        if len(cc) > 1:
            connectivity = nx.edge_connectivity(C)
            assert connectivity >= k


def _memo_connectivity(G, u, v, memo):
    edge = (u, v)
    if edge in memo:
        return memo[edge]
    if not G.is_directed():
        redge = (v, u)
        if redge in memo:
            return memo[redge]
    memo[edge] = nx.edge_connectivity(G, *edge)
    return memo[edge]


def _all_pairs_connectivity(G, cc, k, memo):
    # Brute force check
    for u, v in it.combinations(cc, 2):
        # Use a memoization dict to save on computation
        connectivity = _memo_connectivity(G, u, v, memo)
        if G.is_directed():
            connectivity = min(connectivity, _memo_connectivity(G, v, u, memo))
        assert connectivity >= k


def _assert_local_cc_edge_connectivity(G, ccs_local, k, memo):
    """
    tests properties of k-edge-connected components

    the local edge connectivity between each pair of nodes in the the original
    graph should be no less than k unless the cc is a single node.
    """
    for cc in ccs_local:
        if len(cc) > 1:
            # Strategy for testing a bit faster: If the subgraph has high edge
            # connectivity then it must have local connectivity
            C = G.subgraph(cc)
            connectivity = nx.edge_connectivity(C)
            if connectivity < k:
                # Otherwise do the brute force (with memoization) check
                _all_pairs_connectivity(G, cc, k, memo)


# Helper function
def _check_edge_connectivity(G):
    """
    Exhaustively validate the k-edge-components of ``G`` for increasing k.

    For k = 1, 2, 3, ... the auxiliary graph produces both the local
    k-edge-components and the k-edge-subgraphs.  Each result is checked for
    the connectivity property it promises and compared against the alternate
    ways of computing the same answer.  The loop ends once k exceeds every
    special-case method and all components have shrunk to single nodes.
    """
    # One auxiliary graph serves every value of k
    aux_graph = EdgeComponentAuxGraph.construct(G)

    # cache of local connectivities, shared across all k
    memo = {}

    k = 1
    while True:
        local_sets = fset(aux_graph.k_edge_components(k))
        subgraph_sets = fset(aux_graph.k_edge_subgraphs(k))

        # Connectivity properties that the algorithms should guarantee.
        _assert_local_cc_edge_connectivity(G, local_sets, k, memo)
        _assert_subgraph_edge_connectivity(G, subgraph_sets, k)

        is_directed = G.is_directed()

        if k == 1 or (k == 2 and not is_directed):
            assert (
                local_sets == subgraph_sets
            ), "Subgraphs and components should be the same when k == 1 or (k == 2 and not G.directed())"

        # Special-case methods must agree with the aux graph.
        if k == 1:
            if is_directed:
                reference = fset(nx.strongly_connected_components(G))
            else:
                reference = fset(nx.connected_components(G))
            assert reference == local_sets, "k=1 failed alt"
            assert reference == subgraph_sets, "k=1 failed alt"
        elif k == 2 and not is_directed:
            reference = fset(bridge_components(G))
            assert reference == local_sets, "k=2 failed alt"
            assert reference == subgraph_sets, "k=2 failed alt"
        # if new methods for k == 3 or k == 4 are implemented add them here

        # The general subgraph method must work on its own as well.
        general_sets = fset(
            [set(H.nodes()) for H in general_k_edge_subgraphs(G, k=k)]
        )
        assert general_sets == subgraph_sets, "alt subgraph method failed"

        # Stop once k is past all special-case methods and nothing is left
        # to break apart.
        if k > 2 and all(len(nodes) == 1 for nodes in local_sets):
            break
        k += 1


# ----------------
# Misc tests
# ----------------


def test_zero_k_exception():
    """k=0 is rejected by every entry point with a ValueError."""
    G = nx.Graph()

    # functions that return generators error immediately
    with pytest.raises(ValueError):
        nx.k_edge_components(G, k=0)
    with pytest.raises(ValueError):
        nx.k_edge_subgraphs(G, k=0)

    # actual generators only error when you get the first item, so each one
    # is created outside the raises-context and consumed inside it
    aux_graph = EdgeComponentAuxGraph.construct(G)

    pending = aux_graph.k_edge_components(k=0)
    with pytest.raises(ValueError):
        list(pending)

    pending = aux_graph.k_edge_subgraphs(k=0)
    with pytest.raises(ValueError):
        list(pending)

    pending = general_k_edge_subgraphs(G, k=0)
    with pytest.raises(ValueError):
        list(pending)


def test_empty_input():
    """Empty graphs, undirected or directed, yield no components."""
    for graph_type in (nx.Graph, nx.DiGraph):
        G = graph_type()
        assert [] == list(nx.k_edge_components(G, k=5))
        assert [] == list(nx.k_edge_subgraphs(G, k=5))


def test_not_implemented():
    """Multigraphs, and directed input to bridge_components, are rejected."""
    G = nx.MultiGraph()
    with pytest.raises(nx.NetworkXNotImplemented):
        EdgeComponentAuxGraph.construct(G)
    with pytest.raises(nx.NetworkXNotImplemented):
        nx.k_edge_components(G, k=2)
    with pytest.raises(nx.NetworkXNotImplemented):
        nx.k_edge_subgraphs(G, k=2)
    with pytest.raises(nx.NetworkXNotImplemented):
        bridge_components(G)

    directed = nx.DiGraph()
    with pytest.raises(nx.NetworkXNotImplemented):
        bridge_components(directed)


def test_general_k_edge_subgraph_quick_return():
    """Isolated nodes each come back as their own single-node subgraph."""
    # tests quick return optimization
    G = nx.Graph()
    for expected_count, node in enumerate((0, 1), start=1):
        G.add_node(node)
        found = list(general_k_edge_subgraphs(G, k=1))
        assert len(found) == expected_count
        for H in found:
            assert H.number_of_nodes() == 1


# ----------------
# Undirected tests
# ----------------


def test_random_gnp():
    """Run the full connectivity check on small undirected G(n, p) graphs."""
    # alternative seeds:
    # [1550709854, 1309423156, 4208992358, 2785630813, 1915069929]
    for seed in (12, 13):
        _check_edge_connectivity(nx.gnp_random_graph(20, 0.2, seed=seed))


def test_configuration():
    """Run the full connectivity check on undirected configuration models."""
    # alternative seeds:
    # [2718183590, 2470619828, 1694705158, 3001036531, 2401251497]
    for seed in (14, 15):
        degrees = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
        multigraph = nx.configuration_model(degrees, seed=seed)
        # collapse parallel edges, then drop self-loops
        G = nx.Graph(multigraph)
        G.remove_edges_from(nx.selfloop_edges(G))
        _check_edge_connectivity(G)


def test_shell():
    """Run the full connectivity check on an undirected random shell graph."""
    # alternative seeds:
    # [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
    for seed in (20,):
        shells = [(12, 70, 0.8), (15, 40, 0.6)]
        _check_edge_connectivity(nx.random_shell_graph(shells, seed=seed))


def test_karate():
    """Run the full connectivity check on the karate club graph."""
    _check_edge_connectivity(nx.karate_club_graph())


def test_tarjan_bridge():
    """Run the full connectivity check on the graph from Tarjan's bridge paper."""
    # graph from tarjan paper
    # RE Tarjan - "A note on finding the bridges of a graph"
    # Information Processing Letters, 1974 - Elsevier
    # doi:10.1016/0020-0190(74)90003-9.
    # define 2-connected components and bridges
    ccs = [
        (1, 2, 4, 3, 1, 4),
        (5, 6, 7, 5),
        (8, 9, 10, 8),
        (17, 18, 16, 15, 17),
        (11, 12, 14, 13, 11, 14),
    ]
    bridges = [(4, 8), (3, 5), (3, 17)]
    # each path contributes the edges between its consecutive nodes
    edges = it.chain.from_iterable(pairwise(path) for path in ccs + bridges)
    G = nx.Graph(edges)
    _check_edge_connectivity(G)


def test_bridge_cc():
    """bridge_components splits a graph exactly at its bridges."""
    # define 2-connected components and bridges
    cc2 = [(1, 2, 4, 3, 1, 4), (8, 9, 10, 8), (11, 12, 13, 11)]
    bridges = [(4, 8), (3, 5), (20, 21), (22, 23, 24)]
    edges = it.chain.from_iterable(pairwise(path) for path in cc2 + bridges)
    G = nx.Graph(edges)

    expected = fset(
        [{1, 2, 3, 4}, {5}, {8, 9, 10}, {11, 12, 13}, {20}, {21}, {22}, {23}, {24}]
    )
    found = fset(bridge_components(G))
    assert found == expected
    _check_edge_connectivity(G)


def test_undirected_aux_graph():
    """Aux-graph k-edge-subgraphs of a small undirected graph, for k = 1..4."""
    # Graph similar to the one in
    # http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
    a, b, c, d, e, f, g, h, i = "abcdefghi"
    paths = [
        (a, d, b, f, c),
        (a, e, b),
        (a, e, b, c, g, b, a),
        (c, b),
        (f, g, f),
        (h, i),
    ]
    G = nx.Graph(it.chain.from_iterable(pairwise(path) for path in paths))
    aux_graph = EdgeComponentAuxGraph.construct(G)

    # (k, expected node sets) in increasing order of k
    expectations = [
        (1, [{a, b, c, d, e, f, g}, {h, i}]),
        (2, [{a, b, c, d, e, f, g}, {h}, {i}]),
        (3, [{a}, {b, c, f, g}, {d}, {e}, {h}, {i}]),
        (4, [{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}]),
    ]
    for k, target in expectations:
        found = fset(aux_graph.k_edge_subgraphs(k=k))
        assert fset(target) == found

        if k <= 2:
            # For k=1 (connected components) and k=2 (bridge components) the
            # public function must agree with the aux graph.
            assert fset(nx.k_edge_subgraphs(G, k=k)) == found

    _check_edge_connectivity(G)


def test_local_subgraph_difference():
    """Two 4-cliques joined by four 2-edge paths: components and subgraphs differ."""
    paths = [
        (11, 12, 13, 14, 11, 13, 14, 12),  # first 4-clique
        (21, 22, 23, 24, 21, 23, 24, 22),  # second 4-clique
        # paths connecting each node of the 4 cliques
        (11, 101, 21),
        (12, 102, 22),
        (13, 103, 23),
        (14, 104, 24),
    ]
    G = nx.Graph(it.chain.from_iterable(pairwise(path) for path in paths))
    aux_graph = EdgeComponentAuxGraph.construct(G)
    connectors = [{101}, {102}, {103}, {104}]

    # Each clique is returned separately in k-edge-subgraphs
    expected_subgraphs = fset(connectors + [{21, 22, 23, 24}, {11, 12, 13, 14}])
    assert fset(aux_graph.k_edge_subgraphs(3)) == expected_subgraphs

    # But in k-edge-ccs they are returned together
    # because they are locally 3-edge-connected
    expected_local = fset(connectors + [{11, 12, 13, 14, 21, 22, 23, 24}])
    assert fset(aux_graph.k_edge_components(3)) == expected_local


def test_local_subgraph_difference_directed():
    """In a directed graph, components and subgraphs can differ at k=2."""
    dipaths = [(1, 2, 3, 4, 1), (1, 3, 1)]
    G = nx.DiGraph(it.chain.from_iterable(pairwise(path) for path in dipaths))

    def components_and_subgraphs(k):
        # both decompositions of G for the same k, as comparable sets
        return (
            fset(nx.k_edge_components(G, k=k)),
            fset(nx.k_edge_subgraphs(G, k=k)),
        )

    local_ccs, subgraphs = components_and_subgraphs(1)
    assert local_ccs == subgraphs

    # Unlike undirected graphs, when k=2, for directed graphs there is a case
    # where the k-edge-ccs are not the same as the k-edge-subgraphs.
    # (in undirected graphs ccs and subgraphs are always the same when k=2)
    local_ccs, subgraphs = components_and_subgraphs(2)
    assert local_ccs != subgraphs

    local_ccs, subgraphs = components_and_subgraphs(3)
    assert local_ccs == subgraphs

    _check_edge_connectivity(G)


def test_triangles():
    """Two triangles joined by one edge: components equal subgraphs for k = 1..3."""
    paths = [
        (11, 12, 13, 11),  # first 3-clique
        (21, 22, 23, 21),  # second 3-clique
        (11, 21),  # connected by an edge
    ]
    G = nx.Graph(it.chain.from_iterable(pairwise(path) for path in paths))

    # subgraph and ccs are the same in all cases here
    for k in (1, 2, 3):
        local_ccs = fset(nx.k_edge_components(G, k=k))
        subgraphs = fset(nx.k_edge_subgraphs(G, k=k))
        assert local_ccs == subgraphs

    _check_edge_connectivity(G)


def test_four_clique():
    """Two 4-cliques that are 3-edge-connected in G but not as one subgraph."""
    first_clique = (11, 12, 13, 14, 11, 13, 14, 12)
    second_clique = (21, 22, 23, 24, 21, 23, 24, 22)
    paths = [
        first_clique,
        second_clique,
        # paths connecting the 4 cliques such that they are
        # 3-connected in G, but not in the subgraph.
        # Case where the nodes bridging them do not have degree less than 3.
        (100, 13),
        (12, 100, 22),
        (13, 200, 23),
        (14, 300, 24),
    ]
    G = nx.Graph(it.chain.from_iterable(pairwise(path) for path in paths))

    # The subgraphs and ccs are different for k=3
    local_ccs = fset(nx.k_edge_components(G, k=3))
    subgraphs = fset(nx.k_edge_subgraphs(G, k=3))
    assert local_ccs != subgraphs

    # The cliques are in the same cc, together with the bridging node 100
    clique1 = frozenset(first_clique)
    clique2 = frozenset(second_clique)
    assert (clique1 | clique2 | {100}) in local_ccs

    # but different subgraphs
    assert clique1 in subgraphs
    assert clique2 in subgraphs

    assert G.degree(100) == 3

    _check_edge_connectivity(G)


def test_five_clique():
    """Two 5-cliques with aux nodes: components and subgraphs differ for k=4, 5."""
    # Make a graph that can be disconnected by fewer than 4 edges, but in
    # which no node has degree less than 4.
    G = nx.disjoint_union(nx.complete_graph(5), nx.complete_graph(5))
    paths = [
        # add aux-connections
        (1, 100, 6),
        (2, 100, 7),
        (3, 200, 8),
        (4, 200, 100),
    ]
    G.add_edges_from(it.chain.from_iterable(pairwise(path) for path in paths))
    assert min(degree for _node, degree in nx.degree(G)) == 4

    # (k, whether components and subgraphs are expected to coincide)
    # For k=4 the aux nodes are in the same CC as clique 1 but not in the
    # same subgraph.
    expectations = [(3, True), (4, False), (5, False), (6, True)]
    for k, should_match in expectations:
        local_ccs = fset(nx.k_edge_components(G, k=k))
        subgraphs = fset(nx.k_edge_subgraphs(G, k=k))
        if should_match:
            assert local_ccs == subgraphs
        else:
            assert local_ccs != subgraphs

    _check_edge_connectivity(G)


# ----------------
# Directed tests
# ----------------


def test_directed_aux_graph():
    """Aux-graph k-edge-subgraphs of a small directed graph, for k = 1..3."""
    # Graph similar to the one in
    # http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
    a, b, c, d, e, f, g, h, i = "abcdefghi"
    dipaths = [
        (a, d, b, f, c),
        (a, e, b),
        (a, e, b, c, g, b, a),
        (c, b),
        (f, g, f),
        (h, i),
    ]
    G = nx.DiGraph(it.chain.from_iterable(pairwise(path) for path in dipaths))
    aux_graph = EdgeComponentAuxGraph.construct(G)

    # (k, expected node sets) in increasing order of k
    expectations = [
        (1, [{a, b, c, d, e, f, g}, {h}, {i}]),
        (2, [{i}, {e}, {d}, {b, c, f, g}, {h}, {a}]),
        (3, [{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}]),
    ]
    for k, target in expectations:
        found = fset(aux_graph.k_edge_subgraphs(k=k))
        assert fset(target) == found

        if k == 1:
            # Check that the directed case for k=1 agrees with SCCs
            assert fset(nx.strongly_connected_components(G)) == found


def test_random_gnp_directed():
    """Run the full connectivity check on a small directed G(n, p) graph."""
    # alternative seeds:
    # [3894723670, 500186844, 267231174, 2181982262, 1116750056]
    for seed in (21,):
        _check_edge_connectivity(
            nx.gnp_random_graph(20, 0.2, directed=True, seed=seed)
        )


def test_configuration_directed():
    """Run the full connectivity check on a directed configuration model."""
    # alternative seeds:
    # [671221681, 2403749451, 124433910, 672335939, 1193127215]
    for seed in (67,):
        degrees = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
        multigraph = nx.configuration_model(degrees, seed=seed)
        # collapse parallel edges into a DiGraph, then drop self-loops
        G = nx.DiGraph(multigraph)
        G.remove_edges_from(nx.selfloop_edges(G))
        _check_edge_connectivity(G)


def test_shell_directed():
    """Run the full connectivity check on a directed random shell graph."""
    # alternative seeds:
    # [3134027055, 4079264063, 1350769518, 1405643020, 530038094]
    for seed in (31,):
        shells = [(12, 70, 0.8), (15, 40, 0.6)]
        undirected = nx.random_shell_graph(shells, seed=seed)
        _check_edge_connectivity(undirected.to_directed())


def test_karate_directed():
    """Run the full connectivity check on the directed karate club graph."""
    _check_edge_connectivity(nx.karate_club_graph().to_directed())