diff env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow_large_graph.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow_large_graph.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,146 @@
+"""Maximum flow algorithms test suite on large graphs.
+"""
+
+import os
+import pytest
+
+import networkx as nx
+from networkx.algorithms.flow import build_flow_dict, build_residual_network
+from networkx.algorithms.flow import boykov_kolmogorov
+from networkx.algorithms.flow import dinitz
+from networkx.algorithms.flow import edmonds_karp
+from networkx.algorithms.flow import preflow_push
+from networkx.algorithms.flow import shortest_augmenting_path
+from networkx.testing import almost_equal
+
+flow_funcs = [
+    boykov_kolmogorov,
+    dinitz,
+    edmonds_karp,
+    preflow_push,
+    shortest_augmenting_path,
+]
+
+
+def gen_pyramid(N):
+    # This graph admits a flow of value 1 for which every arc is at
+    # capacity (except the arcs incident to the sink which have
+    # infinite capacity).
+    G = nx.DiGraph()
+
+    for i in range(N - 1):
+        cap = 1.0 / (i + 2)
+        for j in range(i + 1):
+            G.add_edge((i, j), (i + 1, j), capacity=cap)
+            cap = 1.0 / (i + 1) - cap
+            G.add_edge((i, j), (i + 1, j + 1), capacity=cap)
+            cap = 1.0 / (i + 2) - cap
+
+    for j in range(N):
+        G.add_edge((N - 1, j), "t")
+
+    return G
+
+
+def read_graph(name):
+    dirname = os.path.dirname(__file__)
+    path = os.path.join(dirname, name + ".gpickle.bz2")
+    return nx.read_gpickle(path)
+
+
+def validate_flows(G, s, t, soln_value, R, flow_func):
+    flow_value = R.graph["flow_value"]
+    flow_dict = build_flow_dict(G, R)
+    errmsg = f"Assertion failed in function: {flow_func.__name__}"
+    assert soln_value == flow_value, errmsg
+    assert set(G) == set(flow_dict), errmsg
+    for u in G:
+        assert set(G[u]) == set(flow_dict[u]), errmsg
+    excess = {u: 0 for u in flow_dict}
+    for u in flow_dict:
+        for v, flow in flow_dict[u].items():
+            assert flow <= G[u][v].get("capacity", float("inf")), errmsg
+            assert flow >= 0, errmsg
+            excess[u] -= flow
+            excess[v] += flow
+    for u, exc in excess.items():
+        if u == s:
+            assert exc == -soln_value, errmsg
+        elif u == t:
+            assert exc == soln_value, errmsg
+        else:
+            assert exc == 0, errmsg
+
+
+class TestMaxflowLargeGraph:
+    def test_complete_graph(self):
+        N = 50
+        G = nx.complete_graph(N)
+        nx.set_edge_attributes(G, 5, "capacity")
+        R = build_residual_network(G, "capacity")
+        kwargs = dict(residual=R)
+
+        for flow_func in flow_funcs:
+            kwargs["flow_func"] = flow_func
+            errmsg = f"Assertion failed in function: {flow_func.__name__}"
+            flow_value = nx.maximum_flow_value(G, 1, 2, **kwargs)
+            assert flow_value == 5 * (N - 1), errmsg
+
+    def test_pyramid(self):
+        N = 10
+        # N = 100 # this gives a graph with 5051 nodes
+        G = gen_pyramid(N)
+        R = build_residual_network(G, "capacity")
+        kwargs = dict(residual=R)
+
+        for flow_func in flow_funcs:
+            kwargs["flow_func"] = flow_func
+            errmsg = f"Assertion failed in function: {flow_func.__name__}"
+            flow_value = nx.maximum_flow_value(G, (0, 0), "t", **kwargs)
+            assert almost_equal(flow_value, 1.0), errmsg
+
+    def test_gl1(self):
+        G = read_graph("gl1")
+        s = 1
+        t = len(G)
+        R = build_residual_network(G, "capacity")
+        kwargs = dict(residual=R)
+
+        # do one flow_func to save time
+        flow_func = flow_funcs[0]
+        validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs), flow_func)
+
+    #        for flow_func in flow_funcs:
+    #            validate_flows(G, s, t, 156545, flow_func(G, s, t, **kwargs),
+    #                           flow_func)
+
+    @pytest.mark.slow
+    def test_gw1(self):
+        G = read_graph("gw1")
+        s = 1
+        t = len(G)
+        R = build_residual_network(G, "capacity")
+        kwargs = dict(residual=R)
+
+        for flow_func in flow_funcs:
+            validate_flows(G, s, t, 1202018, flow_func(G, s, t, **kwargs), flow_func)
+
+    def test_wlm3(self):
+        G = read_graph("wlm3")
+        s = 1
+        t = len(G)
+        R = build_residual_network(G, "capacity")
+        kwargs = dict(residual=R)
+
+        # do one flow_func to save time
+        flow_func = flow_funcs[0]
+        validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs), flow_func)
+
+    #        for flow_func in flow_funcs:
+    #            validate_flows(G, s, t, 11875108, flow_func(G, s, t, **kwargs),
+    #                           flow_func)
+
+    def test_preflow_push_global_relabel(self):
+        G = read_graph("gw1")
+        R = preflow_push(G, 1, len(G), global_relabel_freq=50)
+        assert R.graph["flow_value"] == 1202018