diff env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,548 @@
+"""Maximum flow algorithms test suite.
+"""
+import pytest
+
+import networkx as nx
+from networkx.algorithms.flow import build_flow_dict, build_residual_network
+from networkx.algorithms.flow import boykov_kolmogorov
+from networkx.algorithms.flow import edmonds_karp
+from networkx.algorithms.flow import preflow_push
+from networkx.algorithms.flow import shortest_augmenting_path
+from networkx.algorithms.flow import dinitz
+
+flow_funcs = [
+    boykov_kolmogorov,
+    dinitz,
+    edmonds_karp,
+    preflow_push,
+    shortest_augmenting_path,
+]
+max_min_funcs = [nx.maximum_flow, nx.minimum_cut]
+flow_value_funcs = [nx.maximum_flow_value, nx.minimum_cut_value]
+interface_funcs = sum([max_min_funcs, flow_value_funcs], [])
+all_funcs = sum([flow_funcs, interface_funcs], [])
+
+
+def compute_cutset(G, partition):
+    reachable, non_reachable = partition
+    cutset = set()
+    for u, nbrs in ((n, G[n]) for n in reachable):
+        cutset.update((u, v) for v in nbrs if v in non_reachable)
+    return cutset
+
+
+def validate_flows(G, s, t, flowDict, solnValue, capacity, flow_func):
+    errmsg = f"Assertion failed in function: {flow_func.__name__}"
+    assert set(G) == set(flowDict), errmsg
+    for u in G:
+        assert set(G[u]) == set(flowDict[u]), errmsg
+    excess = {u: 0 for u in flowDict}
+    for u in flowDict:
+        for v, flow in flowDict[u].items():
+            if capacity in G[u][v]:
+                assert flow <= G[u][v][capacity]
+            assert flow >= 0, errmsg
+            excess[u] -= flow
+            excess[v] += flow
+    for u, exc in excess.items():
+        if u == s:
+            assert exc == -solnValue, errmsg
+        elif u == t:
+            assert exc == solnValue, errmsg
+        else:
+            assert exc == 0, errmsg
+
+
+def validate_cuts(G, s, t, solnValue, partition, capacity, flow_func):
+    errmsg = f"Assertion failed in function: {flow_func.__name__}"
+    assert all(n in G for n in partition[0]), errmsg
+    assert all(n in G for n in partition[1]), errmsg
+    cutset = compute_cutset(G, partition)
+    assert all(G.has_edge(u, v) for (u, v) in cutset), errmsg
+    assert solnValue == sum(G[u][v][capacity] for (u, v) in cutset), errmsg
+    H = G.copy()
+    H.remove_edges_from(cutset)
+    if not G.is_directed():
+        assert not nx.is_connected(H), errmsg
+    else:
+        assert not nx.is_strongly_connected(H), errmsg
+
+
+def compare_flows_and_cuts(G, s, t, solnFlows, solnValue, capacity="capacity"):
+    for flow_func in flow_funcs:
+        errmsg = f"Assertion failed in function: {flow_func.__name__}"
+        R = flow_func(G, s, t, capacity)
+        # Test both legacy and new implementations.
+        flow_value = R.graph["flow_value"]
+        flow_dict = build_flow_dict(G, R)
+        assert flow_value == solnValue, errmsg
+        validate_flows(G, s, t, flow_dict, solnValue, capacity, flow_func)
+        # Minimum cut
+        cut_value, partition = nx.minimum_cut(
+            G, s, t, capacity=capacity, flow_func=flow_func
+        )
+        validate_cuts(G, s, t, solnValue, partition, capacity, flow_func)
+
+
+class TestMaxflowMinCutCommon:
+    def test_graph1(self):
+        # Trivial undirected graph
+        G = nx.Graph()
+        G.add_edge(1, 2, capacity=1.0)
+
+        solnFlows = {1: {2: 1.0}, 2: {1: 1.0}}
+
+        compare_flows_and_cuts(G, 1, 2, solnFlows, 1.0)
+
+    def test_graph2(self):
+        # A more complex undirected graph
+        # adapted from www.topcoder.com/tc?module=Statc&d1=tutorials&d2=maxFlow
+        G = nx.Graph()
+        G.add_edge("x", "a", capacity=3.0)
+        G.add_edge("x", "b", capacity=1.0)
+        G.add_edge("a", "c", capacity=3.0)
+        G.add_edge("b", "c", capacity=5.0)
+        G.add_edge("b", "d", capacity=4.0)
+        G.add_edge("d", "e", capacity=2.0)
+        G.add_edge("c", "y", capacity=2.0)
+        G.add_edge("e", "y", capacity=3.0)
+
+        H = {
+            "x": {"a": 3, "b": 1},
+            "a": {"c": 3, "x": 3},
+            "b": {"c": 1, "d": 2, "x": 1},
+            "c": {"a": 3, "b": 1, "y": 2},
+            "d": {"b": 2, "e": 2},
+            "e": {"d": 2, "y": 2},
+            "y": {"c": 2, "e": 2},
+        }
+
+        compare_flows_and_cuts(G, "x", "y", H, 4.0)
+
+    def test_digraph1(self):
+        # The classic directed graph example
+        G = nx.DiGraph()
+        G.add_edge("a", "b", capacity=1000.0)
+        G.add_edge("a", "c", capacity=1000.0)
+        G.add_edge("b", "c", capacity=1.0)
+        G.add_edge("b", "d", capacity=1000.0)
+        G.add_edge("c", "d", capacity=1000.0)
+
+        H = {
+            "a": {"b": 1000.0, "c": 1000.0},
+            "b": {"c": 0, "d": 1000.0},
+            "c": {"d": 1000.0},
+            "d": {},
+        }
+
+        compare_flows_and_cuts(G, "a", "d", H, 2000.0)
+
+    def test_digraph2(self):
+        # An example in which some edges end up with zero flow.
+        G = nx.DiGraph()
+        G.add_edge("s", "b", capacity=2)
+        G.add_edge("s", "c", capacity=1)
+        G.add_edge("c", "d", capacity=1)
+        G.add_edge("d", "a", capacity=1)
+        G.add_edge("b", "a", capacity=2)
+        G.add_edge("a", "t", capacity=2)
+
+        H = {
+            "s": {"b": 2, "c": 0},
+            "c": {"d": 0},
+            "d": {"a": 0},
+            "b": {"a": 2},
+            "a": {"t": 2},
+            "t": {},
+        }
+
+        compare_flows_and_cuts(G, "s", "t", H, 2)
+
+    def test_digraph3(self):
+        # A directed graph example from Cormen et al.
+        G = nx.DiGraph()
+        G.add_edge("s", "v1", capacity=16.0)
+        G.add_edge("s", "v2", capacity=13.0)
+        G.add_edge("v1", "v2", capacity=10.0)
+        G.add_edge("v2", "v1", capacity=4.0)
+        G.add_edge("v1", "v3", capacity=12.0)
+        G.add_edge("v3", "v2", capacity=9.0)
+        G.add_edge("v2", "v4", capacity=14.0)
+        G.add_edge("v4", "v3", capacity=7.0)
+        G.add_edge("v3", "t", capacity=20.0)
+        G.add_edge("v4", "t", capacity=4.0)
+
+        H = {
+            "s": {"v1": 12.0, "v2": 11.0},
+            "v2": {"v1": 0, "v4": 11.0},
+            "v1": {"v2": 0, "v3": 12.0},
+            "v3": {"v2": 0, "t": 19.0},
+            "v4": {"v3": 7.0, "t": 4.0},
+            "t": {},
+        }
+
+        compare_flows_and_cuts(G, "s", "t", H, 23.0)
+
+    def test_digraph4(self):
+        # A more complex directed graph
+        # from www.topcoder.com/tc?module=Statc&d1=tutorials&d2=maxFlow
+        G = nx.DiGraph()
+        G.add_edge("x", "a", capacity=3.0)
+        G.add_edge("x", "b", capacity=1.0)
+        G.add_edge("a", "c", capacity=3.0)
+        G.add_edge("b", "c", capacity=5.0)
+        G.add_edge("b", "d", capacity=4.0)
+        G.add_edge("d", "e", capacity=2.0)
+        G.add_edge("c", "y", capacity=2.0)
+        G.add_edge("e", "y", capacity=3.0)
+
+        H = {
+            "x": {"a": 2.0, "b": 1.0},
+            "a": {"c": 2.0},
+            "b": {"c": 0, "d": 1.0},
+            "c": {"y": 2.0},
+            "d": {"e": 1.0},
+            "e": {"y": 1.0},
+            "y": {},
+        }
+
+        compare_flows_and_cuts(G, "x", "y", H, 3.0)
+
+    def test_wikipedia_dinitz_example(self):
+        # Nice example from https://en.wikipedia.org/wiki/Dinic's_algorithm
+        G = nx.DiGraph()
+        G.add_edge("s", 1, capacity=10)
+        G.add_edge("s", 2, capacity=10)
+        G.add_edge(1, 3, capacity=4)
+        G.add_edge(1, 4, capacity=8)
+        G.add_edge(1, 2, capacity=2)
+        G.add_edge(2, 4, capacity=9)
+        G.add_edge(3, "t", capacity=10)
+        G.add_edge(4, 3, capacity=6)
+        G.add_edge(4, "t", capacity=10)
+
+        solnFlows = {
+            1: {2: 0, 3: 4, 4: 6},
+            2: {4: 9},
+            3: {"t": 9},
+            4: {3: 5, "t": 10},
+            "s": {1: 10, 2: 9},
+            "t": {},
+        }
+
+        compare_flows_and_cuts(G, "s", "t", solnFlows, 19)
+
+    def test_optional_capacity(self):
+        # Test optional capacity parameter.
+        G = nx.DiGraph()
+        G.add_edge("x", "a", spam=3.0)
+        G.add_edge("x", "b", spam=1.0)
+        G.add_edge("a", "c", spam=3.0)
+        G.add_edge("b", "c", spam=5.0)
+        G.add_edge("b", "d", spam=4.0)
+        G.add_edge("d", "e", spam=2.0)
+        G.add_edge("c", "y", spam=2.0)
+        G.add_edge("e", "y", spam=3.0)
+
+        solnFlows = {
+            "x": {"a": 2.0, "b": 1.0},
+            "a": {"c": 2.0},
+            "b": {"c": 0, "d": 1.0},
+            "c": {"y": 2.0},
+            "d": {"e": 1.0},
+            "e": {"y": 1.0},
+            "y": {},
+        }
+        solnValue = 3.0
+        s = "x"
+        t = "y"
+
+        compare_flows_and_cuts(G, s, t, solnFlows, solnValue, capacity="spam")
+
+    def test_digraph_infcap_edges(self):
+        # DiGraph with infinite capacity edges
+        G = nx.DiGraph()
+        G.add_edge("s", "a")
+        G.add_edge("s", "b", capacity=30)
+        G.add_edge("a", "c", capacity=25)
+        G.add_edge("b", "c", capacity=12)
+        G.add_edge("a", "t", capacity=60)
+        G.add_edge("c", "t")
+
+        H = {
+            "s": {"a": 85, "b": 12},
+            "a": {"c": 25, "t": 60},
+            "b": {"c": 12},
+            "c": {"t": 37},
+            "t": {},
+        }
+
+        compare_flows_and_cuts(G, "s", "t", H, 97)
+
+        # DiGraph with infinite capacity digon
+        G = nx.DiGraph()
+        G.add_edge("s", "a", capacity=85)
+        G.add_edge("s", "b", capacity=30)
+        G.add_edge("a", "c")
+        G.add_edge("c", "a")
+        G.add_edge("b", "c", capacity=12)
+        G.add_edge("a", "t", capacity=60)
+        G.add_edge("c", "t", capacity=37)
+
+        H = {
+            "s": {"a": 85, "b": 12},
+            "a": {"c": 25, "t": 60},
+            "c": {"a": 0, "t": 37},
+            "b": {"c": 12},
+            "t": {},
+        }
+
+        compare_flows_and_cuts(G, "s", "t", H, 97)
+
+    def test_digraph_infcap_path(self):
+        # Graph with infinite capacity (s, t)-path
+        G = nx.DiGraph()
+        G.add_edge("s", "a")
+        G.add_edge("s", "b", capacity=30)
+        G.add_edge("a", "c")
+        G.add_edge("b", "c", capacity=12)
+        G.add_edge("a", "t", capacity=60)
+        G.add_edge("c", "t")
+
+        for flow_func in all_funcs:
+            pytest.raises(nx.NetworkXUnbounded, flow_func, G, "s", "t")
+
+    def test_graph_infcap_edges(self):
+        # Undirected graph with infinite capacity edges
+        G = nx.Graph()
+        G.add_edge("s", "a")
+        G.add_edge("s", "b", capacity=30)
+        G.add_edge("a", "c", capacity=25)
+        G.add_edge("b", "c", capacity=12)
+        G.add_edge("a", "t", capacity=60)
+        G.add_edge("c", "t")
+
+        H = {
+            "s": {"a": 85, "b": 12},
+            "a": {"c": 25, "s": 85, "t": 60},
+            "b": {"c": 12, "s": 12},
+            "c": {"a": 25, "b": 12, "t": 37},
+            "t": {"a": 60, "c": 37},
+        }
+
+        compare_flows_and_cuts(G, "s", "t", H, 97)
+
+    def test_digraph5(self):
+        # From ticket #429 by mfrasca.
+        G = nx.DiGraph()
+        G.add_edge("s", "a", capacity=2)
+        G.add_edge("s", "b", capacity=2)
+        G.add_edge("a", "b", capacity=5)
+        G.add_edge("a", "t", capacity=1)
+        G.add_edge("b", "a", capacity=1)
+        G.add_edge("b", "t", capacity=3)
+        flowSoln = {
+            "a": {"b": 1, "t": 1},
+            "b": {"a": 0, "t": 3},
+            "s": {"a": 2, "b": 2},
+            "t": {},
+        }
+        compare_flows_and_cuts(G, "s", "t", flowSoln, 4)
+
+    def test_disconnected(self):
+        G = nx.Graph()
+        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
+        G.remove_node(1)
+        assert nx.maximum_flow_value(G, 0, 3) == 0
+        flowSoln = {0: {}, 2: {3: 0}, 3: {2: 0}}
+        compare_flows_and_cuts(G, 0, 3, flowSoln, 0)
+
+    def test_source_target_not_in_graph(self):
+        G = nx.Graph()
+        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
+        G.remove_node(0)
+        for flow_func in all_funcs:
+            pytest.raises(nx.NetworkXError, flow_func, G, 0, 3)
+        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
+        G.remove_node(3)
+        for flow_func in all_funcs:
+            pytest.raises(nx.NetworkXError, flow_func, G, 0, 3)
+
+    def test_source_target_coincide(self):
+        G = nx.Graph()
+        G.add_node(0)
+        for flow_func in all_funcs:
+            pytest.raises(nx.NetworkXError, flow_func, G, 0, 0)
+
+    def test_multigraphs_raise(self):
+        G = nx.MultiGraph()
+        M = nx.MultiDiGraph()
+        G.add_edges_from([(0, 1), (1, 0)], capacity=True)
+        for flow_func in all_funcs:
+            pytest.raises(nx.NetworkXError, flow_func, G, 0, 0)
+
+
+class TestMaxFlowMinCutInterface:
+    def setup(self):
+        G = nx.DiGraph()
+        G.add_edge("x", "a", capacity=3.0)
+        G.add_edge("x", "b", capacity=1.0)
+        G.add_edge("a", "c", capacity=3.0)
+        G.add_edge("b", "c", capacity=5.0)
+        G.add_edge("b", "d", capacity=4.0)
+        G.add_edge("d", "e", capacity=2.0)
+        G.add_edge("c", "y", capacity=2.0)
+        G.add_edge("e", "y", capacity=3.0)
+        self.G = G
+        H = nx.DiGraph()
+        H.add_edge(0, 1, capacity=1.0)
+        H.add_edge(1, 2, capacity=1.0)
+        self.H = H
+
+    def test_flow_func_not_callable(self):
+        elements = ["this_should_be_callable", 10, {1, 2, 3}]
+        G = nx.Graph()
+        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
+        for flow_func in interface_funcs:
+            for element in elements:
+                pytest.raises(nx.NetworkXError, flow_func, G, 0, 1, flow_func=element)
+                pytest.raises(nx.NetworkXError, flow_func, G, 0, 1, flow_func=element)
+
+    def test_flow_func_parameters(self):
+        G = self.G
+        fv = 3.0
+        for interface_func in interface_funcs:
+            for flow_func in flow_funcs:
+                errmsg = (
+                    f"Assertion failed in function: {flow_func.__name__} "
+                    f"in interface {interface_func.__name__}"
+                )
+                result = interface_func(G, "x", "y", flow_func=flow_func)
+                if interface_func in max_min_funcs:
+                    result = result[0]
+                assert fv == result, errmsg
+
+    def test_minimum_cut_no_cutoff(self):
+        G = self.G
+        for flow_func in flow_funcs:
+            pytest.raises(
+                nx.NetworkXError,
+                nx.minimum_cut,
+                G,
+                "x",
+                "y",
+                flow_func=flow_func,
+                cutoff=1.0,
+            )
+            pytest.raises(
+                nx.NetworkXError,
+                nx.minimum_cut_value,
+                G,
+                "x",
+                "y",
+                flow_func=flow_func,
+                cutoff=1.0,
+            )
+
+    def test_kwargs(self):
+        G = self.H
+        fv = 1.0
+        to_test = (
+            (shortest_augmenting_path, dict(two_phase=True)),
+            (preflow_push, dict(global_relabel_freq=5)),
+        )
+        for interface_func in interface_funcs:
+            for flow_func, kwargs in to_test:
+                errmsg = (
+                    f"Assertion failed in function: {flow_func.__name__} "
+                    f"in interface {interface_func.__name__}"
+                )
+                result = interface_func(G, 0, 2, flow_func=flow_func, **kwargs)
+                if interface_func in max_min_funcs:
+                    result = result[0]
+                assert fv == result, errmsg
+
+    def test_kwargs_default_flow_func(self):
+        G = self.H
+        for interface_func in interface_funcs:
+            pytest.raises(
+                nx.NetworkXError, interface_func, G, 0, 1, global_relabel_freq=2
+            )
+
+    def test_reusing_residual(self):
+        G = self.G
+        fv = 3.0
+        s, t = "x", "y"
+        R = build_residual_network(G, "capacity")
+        for interface_func in interface_funcs:
+            for flow_func in flow_funcs:
+                errmsg = (
+                    f"Assertion failed in function: {flow_func.__name__} "
+                    f"in interface {interface_func.__name__}"
+                )
+                for i in range(3):
+                    result = interface_func(
+                        G, "x", "y", flow_func=flow_func, residual=R
+                    )
+                    if interface_func in max_min_funcs:
+                        result = result[0]
+                    assert fv == result, errmsg
+
+
+# Tests specific to one algorithm
+def test_preflow_push_global_relabel_freq():
+    G = nx.DiGraph()
+    G.add_edge(1, 2, capacity=1)
+    R = preflow_push(G, 1, 2, global_relabel_freq=None)
+    assert R.graph["flow_value"] == 1
+    pytest.raises(nx.NetworkXError, preflow_push, G, 1, 2, global_relabel_freq=-1)
+
+
+def test_preflow_push_makes_enough_space():
+    # From ticket #1542
+    G = nx.DiGraph()
+    nx.add_path(G, [0, 1, 3], capacity=1)
+    nx.add_path(G, [1, 2, 3], capacity=1)
+    R = preflow_push(G, 0, 3, value_only=False)
+    assert R.graph["flow_value"] == 1
+
+
+def test_shortest_augmenting_path_two_phase():
+    k = 5
+    p = 1000
+    G = nx.DiGraph()
+    for i in range(k):
+        G.add_edge("s", (i, 0), capacity=1)
+        nx.add_path(G, ((i, j) for j in range(p)), capacity=1)
+        G.add_edge((i, p - 1), "t", capacity=1)
+    R = shortest_augmenting_path(G, "s", "t", two_phase=True)
+    assert R.graph["flow_value"] == k
+    R = shortest_augmenting_path(G, "s", "t", two_phase=False)
+    assert R.graph["flow_value"] == k
+
+
+class TestCutoff:
+    def test_cutoff(self):
+        k = 5
+        p = 1000
+        G = nx.DiGraph()
+        for i in range(k):
+            G.add_edge("s", (i, 0), capacity=2)
+            nx.add_path(G, ((i, j) for j in range(p)), capacity=2)
+            G.add_edge((i, p - 1), "t", capacity=2)
+        R = shortest_augmenting_path(G, "s", "t", two_phase=True, cutoff=k)
+        assert k <= R.graph["flow_value"] <= (2 * k)
+        R = shortest_augmenting_path(G, "s", "t", two_phase=False, cutoff=k)
+        assert k <= R.graph["flow_value"] <= (2 * k)
+        R = edmonds_karp(G, "s", "t", cutoff=k)
+        assert k <= R.graph["flow_value"] <= (2 * k)
+
+    def test_complete_graph_cutoff(self):
+        G = nx.complete_graph(5)
+        nx.set_edge_attributes(G, {(u, v): 1 for u, v in G.edges()}, "capacity")
+        for flow_func in [shortest_augmenting_path, edmonds_karp]:
+            for cutoff in [3, 2, 1]:
+                result = nx.maximum_flow_value(
+                    G, 0, 4, flow_func=flow_func, cutoff=cutoff
+                )
+                assert cutoff == result, f"cutoff error in {flow_func.__name__}"