Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow_large_graph.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow_large_graph.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,146 @@ +"""Maximum flow algorithms test suite on large graphs. +""" + +import os +import pytest + +import networkx as nx +from networkx.algorithms.flow import build_flow_dict, build_residual_network +from networkx.algorithms.flow import boykov_kolmogorov +from networkx.algorithms.flow import dinitz +from networkx.algorithms.flow import edmonds_karp +from networkx.algorithms.flow import preflow_push +from networkx.algorithms.flow import shortest_augmenting_path +from networkx.testing import almost_equal + +flow_funcs = [ + boykov_kolmogorov, + dinitz, + edmonds_karp, + preflow_push, + shortest_augmenting_path, +] + + +def gen_pyramid(N): + # This graph admits a flow of value 1 for which every arc is at + # capacity (except the arcs incident to the sink which have + # infinite capacity). + G = nx.DiGraph() + + for i in range(N - 1): + cap = 1.0 / (i + 2) + for j in range(i + 1): + G.add_edge((i, j), (i + 1, j), capacity=cap) + cap = 1.0 / (i + 1) - cap + G.add_edge((i, j), (i + 1, j + 1), capacity=cap) + cap = 1.0 / (i + 2) - cap + + for j in range(N): + G.add_edge((N - 1, j), "t") + + return G + + +def read_graph(name): + dirname = os.path.dirname(__file__) + path = os.path.join(dirname, name + ".gpickle.bz2") + return nx.read_gpickle(path) + + +def validate_flows(G, s, t, soln_value, R, flow_func): + flow_value = R.graph["flow_value"] + flow_dict = build_flow_dict(G, R) + errmsg = f"Assertion failed in function: {flow_func.__name__}" + assert soln_value == flow_value, errmsg + assert set(G) == set(flow_dict), errmsg + for u in G: + assert set(G[u]) == set(flow_dict[u]), errmsg + excess = {u: 0 for u in flow_dict} + for u in flow_dict: + for v, flow in flow_dict[u].items(): + assert flow <= G[u][v].get("capacity", float("inf")), errmsg + assert flow >= 0, errmsg + excess[u] -= flow + excess[v] += flow + for u, exc in excess.items(): + if u == s: + assert exc == -soln_value, errmsg + elif u == t: + assert exc == soln_value, errmsg + else: + assert exc == 0, errmsg + + +class TestMaxflowLargeGraph: + def test_complete_graph(self): + N = 50 + G = nx.complete_graph(N) + nx.set_edge_attributes(G, 5, "capacity") + R = build_residual_network(G, "capacity") + kwargs = dict(residual=R) + + for flow_func in flow_funcs: + kwargs["flow_func"] = flow_func + errmsg = f"Assertion failed in function: {flow_func.__name__}" + flow_value = nx.maximum_flow_value(G, 1, 2, **kwargs) + assert flow_value == 5 * (N - 1), errmsg + + def test_pyramid(self): + N = 10 + # N = 100 # this gives a graph with 5051 nodes + G = gen_pyramid(N) + R = build_residual_network(G, "capacity") + kwargs = dict(residual=R) + + for flow_func in flow_funcs: + kwargs["flow_func"] = flow_func + errmsg = f"Assertion failed in function: {flow_func.__name__}" + flow_value = nx.maximum_flow_value(G, (0, 0), "t", **kwargs) + assert almost_equal(flow_value, 1.0), errmsg + + def test_gl1(self): + G = read_graph("gl1") + s = 1 + t = len(G) + R = build_residual_network(G, "capacity") + kwargs = dict(residual=R) + + # do one flow_func to save time + flow_func = flow_funcs[0] + validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs), flow_func) + + # for flow_func in flow_funcs: + # validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs), + # flow_func) + + @pytest.mark.slow + def test_gw1(self): + G = read_graph("gw1") + s = 1 + t = len(G) + R = build_residual_network(G, "capacity") + kwargs = dict(residual=R) + + for flow_func in flow_funcs: + validate_flows(G, s, t, 1202018, flow_func(G, s, t, **kwargs), flow_func) + + def test_wlm3(self): + G = read_graph("wlm3") + s = 1 + t = len(G) + R = build_residual_network(G, "capacity") + kwargs = dict(residual=R) + + # do one flow_func to save time + flow_func = flow_funcs[0] + validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs), flow_func) + + # for flow_func in flow_funcs: + # validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs), + # flow_func) + + def test_preflow_push_global_relabel(self): + G = read_graph("gw1") + R = preflow_push(G, 1, len(G), global_relabel_freq=50) + assert R.graph["flow_value"] == 1202018