comparison env/lib/python3.9/site-packages/networkx/algorithms/flow/tests/test_maxflow.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Maximum flow algorithms test suite.
2 """
3 import pytest
4
5 import networkx as nx
6 from networkx.algorithms.flow import build_flow_dict, build_residual_network
7 from networkx.algorithms.flow import boykov_kolmogorov
8 from networkx.algorithms.flow import edmonds_karp
9 from networkx.algorithms.flow import preflow_push
10 from networkx.algorithms.flow import shortest_augmenting_path
11 from networkx.algorithms.flow import dinitz
12
# Algorithms that the tests call directly as ``flow_func(G, s, t, ...)``.
flow_funcs = [
    boykov_kolmogorov,
    dinitz,
    edmonds_karp,
    preflow_push,
    shortest_augmenting_path,
]
# Interface functions whose result is indexed with ``[0]`` to get the value.
max_min_funcs = [nx.maximum_flow, nx.minimum_cut]
# Interface functions whose result is compared to the value directly.
flow_value_funcs = [nx.maximum_flow_value, nx.minimum_cut_value]
interface_funcs = max_min_funcs + flow_value_funcs
all_funcs = flow_funcs + interface_funcs
24
25
def compute_cutset(G, partition):
    """Return the edges of ``G`` leading from the first part to the second.

    ``partition`` is a pair ``(reachable, non_reachable)`` of node
    containers.  The result is the set of all ``(u, v)`` with ``u`` in
    ``reachable``, ``v`` a neighbor of ``u`` in ``G``, and ``v`` in
    ``non_reachable``.
    """
    source_side, sink_side = partition
    return {
        (u, v)
        for u in source_side
        for v in G[u]
        if v in sink_side
    }
32
33
def validate_flows(G, s, t, flowDict, solnValue, capacity, flow_func):
    """Assert that ``flowDict`` is a feasible s-t flow of value ``solnValue``.

    Parameters
    ----------
    G : graph-like
        Indexed as ``G[u]`` (neighbors of ``u``) and ``G[u][v]`` (edge data).
    s, t : nodes
        Source and sink of the flow.
    flowDict : dict of dicts
        ``flowDict[u][v]`` is the flow sent along edge ``(u, v)``.
    solnValue : number
        Expected net flow leaving ``s`` and entering ``t``.
    capacity : hashable
        Key under which an edge's capacity is stored in its edge data.
        Edges without that key are not checked against an upper bound.
    flow_func : callable
        Algorithm under test; only its ``__name__`` is used, to label
        assertion failures.

    Raises
    ------
    AssertionError
        If the node or neighbor sets of ``G`` and ``flowDict`` differ, a flow
        is negative or exceeds its capacity, or flow conservation fails.
    """
    errmsg = f"Assertion failed in function: {flow_func.__name__}"
    assert set(G) == set(flowDict), errmsg
    for u in G:
        assert set(G[u]) == set(flowDict[u]), errmsg
    # excess[u] = (flow into u) - (flow out of u)
    excess = dict.fromkeys(flowDict, 0)
    for u, out_flows in flowDict.items():
        for v, flow in out_flows.items():
            edge_data = G[u][v]
            if capacity in edge_data:
                # Carry errmsg like every other assertion here, so that a
                # capacity violation also names the failing algorithm.
                assert flow <= edge_data[capacity], errmsg
            assert flow >= 0, errmsg
            excess[u] -= flow
            excess[v] += flow
    for u, exc in excess.items():
        if u == s:
            assert exc == -solnValue, errmsg
        elif u == t:
            assert exc == solnValue, errmsg
        else:
            assert exc == 0, errmsg
54
55
def validate_cuts(G, s, t, solnValue, partition, capacity, flow_func):
    """Assert that ``partition`` describes a cut of ``G`` with value ``solnValue``.

    Every node of both parts must belong to ``G``; the capacities (edge
    attribute ``capacity``) of the edges found by ``compute_cutset`` must sum
    to ``solnValue``; and a copy of ``G`` with those edges removed must not
    be connected (strongly connected when ``G`` is directed).  ``s`` and
    ``t`` are accepted but not inspected.  ``flow_func`` is only used to
    label assertion failures with its ``__name__``.
    """
    errmsg = f"Assertion failed in function: {flow_func.__name__}"
    source_side, sink_side = partition[0], partition[1]
    assert all(node in G for node in source_side), errmsg
    assert all(node in G for node in sink_side), errmsg
    cutset = compute_cutset(G, partition)
    assert all(G.has_edge(*edge) for edge in cutset), errmsg
    assert solnValue == sum(G[u][v][capacity] for u, v in cutset), errmsg
    # Removing the cut edges from a copy must disconnect the graph.
    H = G.copy()
    H.remove_edges_from(cutset)
    if G.is_directed():
        assert not nx.is_strongly_connected(H), errmsg
    else:
        assert not nx.is_connected(H), errmsg
69
70
def compare_flows_and_cuts(G, s, t, solnFlows, solnValue, capacity="capacity"):
    """Run every algorithm in ``flow_funcs`` on ``G`` and validate the results.

    For each algorithm the residual network's ``flow_value`` must equal
    ``solnValue``, the flow dict built from the residual network must pass
    ``validate_flows``, and the value and partition returned by
    ``nx.minimum_cut`` must equal ``solnValue`` and pass ``validate_cuts``.

    Parameters
    ----------
    G : NetworkX graph
    s, t : nodes
        Source and sink.
    solnFlows : dict
        Accepted for backward compatibility with existing callers; it is not
        inspected, because only the flow value is compared.
    solnValue : number
        Expected maximum flow / minimum cut value.
    capacity : string
        Name of the edge attribute holding capacities.
    """
    for flow_func in flow_funcs:
        errmsg = f"Assertion failed in function: {flow_func.__name__}"
        R = flow_func(G, s, t, capacity)
        # Test both legacy and new implementations.
        flow_value = R.graph["flow_value"]
        flow_dict = build_flow_dict(G, R)
        assert flow_value == solnValue, errmsg
        validate_flows(G, s, t, flow_dict, solnValue, capacity, flow_func)
        # Minimum cut
        cut_value, partition = nx.minimum_cut(
            G, s, t, capacity=capacity, flow_func=flow_func
        )
        # The reported cut value was previously computed but never checked.
        assert cut_value == solnValue, errmsg
        validate_cuts(G, s, t, solnValue, partition, capacity, flow_func)
85
86
class TestMaxflowMinCutCommon:
    """Checks applied to every flow algorithm and interface function.

    Most tests build a small graph and pass it, together with the expected
    flow value, to ``compare_flows_and_cuts``.  The remaining tests check
    that invalid inputs raise the expected exception for every function in
    ``all_funcs``.
    """

    def test_graph1(self):
        # Trivial undirected graph
        G = nx.Graph()
        G.add_edge(1, 2, capacity=1.0)

        solnFlows = {1: {2: 1.0}, 2: {1: 1.0}}

        compare_flows_and_cuts(G, 1, 2, solnFlows, 1.0)

    def test_graph2(self):
        # A more complex undirected graph
        # adapted from www.topcoder.com/tc?module=Static&d1=tutorials&d2=maxFlow
        G = nx.Graph()
        G.add_edge("x", "a", capacity=3.0)
        G.add_edge("x", "b", capacity=1.0)
        G.add_edge("a", "c", capacity=3.0)
        G.add_edge("b", "c", capacity=5.0)
        G.add_edge("b", "d", capacity=4.0)
        G.add_edge("d", "e", capacity=2.0)
        G.add_edge("c", "y", capacity=2.0)
        G.add_edge("e", "y", capacity=3.0)

        H = {
            "x": {"a": 3, "b": 1},
            "a": {"c": 3, "x": 3},
            "b": {"c": 1, "d": 2, "x": 1},
            "c": {"a": 3, "b": 1, "y": 2},
            "d": {"b": 2, "e": 2},
            "e": {"d": 2, "y": 2},
            "y": {"c": 2, "e": 2},
        }

        compare_flows_and_cuts(G, "x", "y", H, 4.0)

    def test_digraph1(self):
        # The classic directed graph example
        G = nx.DiGraph()
        G.add_edge("a", "b", capacity=1000.0)
        G.add_edge("a", "c", capacity=1000.0)
        G.add_edge("b", "c", capacity=1.0)
        G.add_edge("b", "d", capacity=1000.0)
        G.add_edge("c", "d", capacity=1000.0)

        H = {
            "a": {"b": 1000.0, "c": 1000.0},
            "b": {"c": 0, "d": 1000.0},
            "c": {"d": 1000.0},
            "d": {},
        }

        compare_flows_and_cuts(G, "a", "d", H, 2000.0)

    def test_digraph2(self):
        # An example in which some edges end up with zero flow.
        G = nx.DiGraph()
        G.add_edge("s", "b", capacity=2)
        G.add_edge("s", "c", capacity=1)
        G.add_edge("c", "d", capacity=1)
        G.add_edge("d", "a", capacity=1)
        G.add_edge("b", "a", capacity=2)
        G.add_edge("a", "t", capacity=2)

        H = {
            "s": {"b": 2, "c": 0},
            "c": {"d": 0},
            "d": {"a": 0},
            "b": {"a": 2},
            "a": {"t": 2},
            "t": {},
        }

        compare_flows_and_cuts(G, "s", "t", H, 2)

    def test_digraph3(self):
        # A directed graph example from Cormen et al.
        G = nx.DiGraph()
        G.add_edge("s", "v1", capacity=16.0)
        G.add_edge("s", "v2", capacity=13.0)
        G.add_edge("v1", "v2", capacity=10.0)
        G.add_edge("v2", "v1", capacity=4.0)
        G.add_edge("v1", "v3", capacity=12.0)
        G.add_edge("v3", "v2", capacity=9.0)
        G.add_edge("v2", "v4", capacity=14.0)
        G.add_edge("v4", "v3", capacity=7.0)
        G.add_edge("v3", "t", capacity=20.0)
        G.add_edge("v4", "t", capacity=4.0)

        H = {
            "s": {"v1": 12.0, "v2": 11.0},
            "v2": {"v1": 0, "v4": 11.0},
            "v1": {"v2": 0, "v3": 12.0},
            "v3": {"v2": 0, "t": 19.0},
            "v4": {"v3": 7.0, "t": 4.0},
            "t": {},
        }

        compare_flows_and_cuts(G, "s", "t", H, 23.0)

    def test_digraph4(self):
        # A more complex directed graph
        # from www.topcoder.com/tc?module=Static&d1=tutorials&d2=maxFlow
        G = nx.DiGraph()
        G.add_edge("x", "a", capacity=3.0)
        G.add_edge("x", "b", capacity=1.0)
        G.add_edge("a", "c", capacity=3.0)
        G.add_edge("b", "c", capacity=5.0)
        G.add_edge("b", "d", capacity=4.0)
        G.add_edge("d", "e", capacity=2.0)
        G.add_edge("c", "y", capacity=2.0)
        G.add_edge("e", "y", capacity=3.0)

        H = {
            "x": {"a": 2.0, "b": 1.0},
            "a": {"c": 2.0},
            "b": {"c": 0, "d": 1.0},
            "c": {"y": 2.0},
            "d": {"e": 1.0},
            "e": {"y": 1.0},
            "y": {},
        }

        compare_flows_and_cuts(G, "x", "y", H, 3.0)

    def test_wikipedia_dinitz_example(self):
        # Nice example from https://en.wikipedia.org/wiki/Dinic's_algorithm
        G = nx.DiGraph()
        G.add_edge("s", 1, capacity=10)
        G.add_edge("s", 2, capacity=10)
        G.add_edge(1, 3, capacity=4)
        G.add_edge(1, 4, capacity=8)
        G.add_edge(1, 2, capacity=2)
        G.add_edge(2, 4, capacity=9)
        G.add_edge(3, "t", capacity=10)
        G.add_edge(4, 3, capacity=6)
        G.add_edge(4, "t", capacity=10)

        solnFlows = {
            1: {2: 0, 3: 4, 4: 6},
            2: {4: 9},
            3: {"t": 9},
            4: {3: 5, "t": 10},
            "s": {1: 10, 2: 9},
            "t": {},
        }

        compare_flows_and_cuts(G, "s", "t", solnFlows, 19)

    def test_optional_capacity(self):
        # Test optional capacity parameter.
        G = nx.DiGraph()
        G.add_edge("x", "a", spam=3.0)
        G.add_edge("x", "b", spam=1.0)
        G.add_edge("a", "c", spam=3.0)
        G.add_edge("b", "c", spam=5.0)
        G.add_edge("b", "d", spam=4.0)
        G.add_edge("d", "e", spam=2.0)
        G.add_edge("c", "y", spam=2.0)
        G.add_edge("e", "y", spam=3.0)

        solnFlows = {
            "x": {"a": 2.0, "b": 1.0},
            "a": {"c": 2.0},
            "b": {"c": 0, "d": 1.0},
            "c": {"y": 2.0},
            "d": {"e": 1.0},
            "e": {"y": 1.0},
            "y": {},
        }
        solnValue = 3.0
        s = "x"
        t = "y"

        compare_flows_and_cuts(G, s, t, solnFlows, solnValue, capacity="spam")

    def test_digraph_infcap_edges(self):
        # DiGraph with infinite capacity edges
        G = nx.DiGraph()
        G.add_edge("s", "a")
        G.add_edge("s", "b", capacity=30)
        G.add_edge("a", "c", capacity=25)
        G.add_edge("b", "c", capacity=12)
        G.add_edge("a", "t", capacity=60)
        G.add_edge("c", "t")

        H = {
            "s": {"a": 85, "b": 12},
            "a": {"c": 25, "t": 60},
            "b": {"c": 12},
            "c": {"t": 37},
            "t": {},
        }

        compare_flows_and_cuts(G, "s", "t", H, 97)

        # DiGraph with infinite capacity digon
        G = nx.DiGraph()
        G.add_edge("s", "a", capacity=85)
        G.add_edge("s", "b", capacity=30)
        G.add_edge("a", "c")
        G.add_edge("c", "a")
        G.add_edge("b", "c", capacity=12)
        G.add_edge("a", "t", capacity=60)
        G.add_edge("c", "t", capacity=37)

        H = {
            "s": {"a": 85, "b": 12},
            "a": {"c": 25, "t": 60},
            "c": {"a": 0, "t": 37},
            "b": {"c": 12},
            "t": {},
        }

        compare_flows_and_cuts(G, "s", "t", H, 97)

    def test_digraph_infcap_path(self):
        # Graph with infinite capacity (s, t)-path
        G = nx.DiGraph()
        G.add_edge("s", "a")
        G.add_edge("s", "b", capacity=30)
        G.add_edge("a", "c")
        G.add_edge("b", "c", capacity=12)
        G.add_edge("a", "t", capacity=60)
        G.add_edge("c", "t")

        for flow_func in all_funcs:
            pytest.raises(nx.NetworkXUnbounded, flow_func, G, "s", "t")

    def test_graph_infcap_edges(self):
        # Undirected graph with infinite capacity edges
        G = nx.Graph()
        G.add_edge("s", "a")
        G.add_edge("s", "b", capacity=30)
        G.add_edge("a", "c", capacity=25)
        G.add_edge("b", "c", capacity=12)
        G.add_edge("a", "t", capacity=60)
        G.add_edge("c", "t")

        H = {
            "s": {"a": 85, "b": 12},
            "a": {"c": 25, "s": 85, "t": 60},
            "b": {"c": 12, "s": 12},
            "c": {"a": 25, "b": 12, "t": 37},
            "t": {"a": 60, "c": 37},
        }

        compare_flows_and_cuts(G, "s", "t", H, 97)

    def test_digraph5(self):
        # From ticket #429 by mfrasca.
        G = nx.DiGraph()
        G.add_edge("s", "a", capacity=2)
        G.add_edge("s", "b", capacity=2)
        G.add_edge("a", "b", capacity=5)
        G.add_edge("a", "t", capacity=1)
        G.add_edge("b", "a", capacity=1)
        G.add_edge("b", "t", capacity=3)
        flowSoln = {
            "a": {"b": 1, "t": 1},
            "b": {"a": 0, "t": 3},
            "s": {"a": 2, "b": 2},
            "t": {},
        }
        compare_flows_and_cuts(G, "s", "t", flowSoln, 4)

    def test_disconnected(self):
        # Removing node 1 from the path 0-1-2-3 leaves 0 and 3 disconnected.
        G = nx.Graph()
        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
        G.remove_node(1)
        assert nx.maximum_flow_value(G, 0, 3) == 0
        flowSoln = {0: {}, 2: {3: 0}, 3: {2: 0}}
        compare_flows_and_cuts(G, 0, 3, flowSoln, 0)

    def test_source_target_not_in_graph(self):
        G = nx.Graph()
        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
        # Missing source.
        G.remove_node(0)
        for flow_func in all_funcs:
            pytest.raises(nx.NetworkXError, flow_func, G, 0, 3)
        # Re-adding the edges restores node 0; now drop the target instead.
        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
        G.remove_node(3)
        for flow_func in all_funcs:
            pytest.raises(nx.NetworkXError, flow_func, G, 0, 3)

    def test_source_target_coincide(self):
        G = nx.Graph()
        G.add_node(0)
        for flow_func in all_funcs:
            pytest.raises(nx.NetworkXError, flow_func, G, 0, 0)

    def test_multigraphs_raise(self):
        G = nx.MultiGraph()
        M = nx.MultiDiGraph()
        for multigraph in (G, M):
            multigraph.add_edges_from([(0, 1), (1, 0)], capacity=True)
        # Use distinct source and target: identical endpoints are already
        # expected to raise NetworkXError (see test_source_target_coincide),
        # which would not isolate the multigraph rejection checked here.
        for flow_func in all_funcs:
            for multigraph in (G, M):
                pytest.raises(nx.NetworkXError, flow_func, multigraph, 0, 1)
383
384
class TestMaxFlowMinCutInterface:
    """Tests for how the interface functions handle ``flow_func`` and kwargs.

    ``self.G`` is an eight-edge digraph from "x" to "y" whose expected flow
    value in these tests is 3.0; ``self.H`` is the two-edge path 0 -> 1 -> 2
    with unit capacities.
    """

    def setup_method(self):
        # pytest's xunit-style per-test hook.  The nose-style name ``setup``
        # is deprecated in pytest and is not run by current releases, which
        # would leave ``self.G`` / ``self.H`` undefined.
        G = nx.DiGraph()
        G.add_edge("x", "a", capacity=3.0)
        G.add_edge("x", "b", capacity=1.0)
        G.add_edge("a", "c", capacity=3.0)
        G.add_edge("b", "c", capacity=5.0)
        G.add_edge("b", "d", capacity=4.0)
        G.add_edge("d", "e", capacity=2.0)
        G.add_edge("c", "y", capacity=2.0)
        G.add_edge("e", "y", capacity=3.0)
        self.G = G
        H = nx.DiGraph()
        H.add_edge(0, 1, capacity=1.0)
        H.add_edge(1, 2, capacity=1.0)
        self.H = H

    def test_flow_func_not_callable(self):
        # A non-callable ``flow_func`` must be rejected by every interface.
        elements = ["this_should_be_callable", 10, {1, 2, 3}]
        G = nx.Graph()
        G.add_weighted_edges_from([(0, 1, 1), (1, 2, 1), (2, 3, 1)], weight="capacity")
        for flow_func in interface_funcs:
            for element in elements:
                pytest.raises(nx.NetworkXError, flow_func, G, 0, 1, flow_func=element)

    def test_flow_func_parameters(self):
        # Every interface must accept every algorithm as ``flow_func``.
        G = self.G
        fv = 3.0
        for interface_func in interface_funcs:
            for flow_func in flow_funcs:
                errmsg = (
                    f"Assertion failed in function: {flow_func.__name__} "
                    f"in interface {interface_func.__name__}"
                )
                result = interface_func(G, "x", "y", flow_func=flow_func)
                if interface_func in max_min_funcs:
                    # These return a pair whose first item is the value.
                    result = result[0]
                assert fv == result, errmsg

    def test_minimum_cut_no_cutoff(self):
        # ``cutoff`` must be rejected by both minimum-cut interfaces.
        G = self.G
        for flow_func in flow_funcs:
            pytest.raises(
                nx.NetworkXError,
                nx.minimum_cut,
                G,
                "x",
                "y",
                flow_func=flow_func,
                cutoff=1.0,
            )
            pytest.raises(
                nx.NetworkXError,
                nx.minimum_cut_value,
                G,
                "x",
                "y",
                flow_func=flow_func,
                cutoff=1.0,
            )

    def test_kwargs(self):
        # Algorithm-specific keyword arguments are forwarded by the interfaces.
        G = self.H
        fv = 1.0
        to_test = (
            (shortest_augmenting_path, dict(two_phase=True)),
            (preflow_push, dict(global_relabel_freq=5)),
        )
        for interface_func in interface_funcs:
            for flow_func, kwargs in to_test:
                errmsg = (
                    f"Assertion failed in function: {flow_func.__name__} "
                    f"in interface {interface_func.__name__}"
                )
                result = interface_func(G, 0, 2, flow_func=flow_func, **kwargs)
                if interface_func in max_min_funcs:
                    result = result[0]
                assert fv == result, errmsg

    def test_kwargs_default_flow_func(self):
        # Extra keyword arguments without an explicit ``flow_func`` must raise.
        G = self.H
        for interface_func in interface_funcs:
            pytest.raises(
                nx.NetworkXError, interface_func, G, 0, 1, global_relabel_freq=2
            )

    def test_reusing_residual(self):
        # The same residual network is passed to repeated calls and must
        # keep producing the expected value.
        G = self.G
        fv = 3.0
        s, t = "x", "y"
        R = build_residual_network(G, "capacity")
        for interface_func in interface_funcs:
            for flow_func in flow_funcs:
                errmsg = (
                    f"Assertion failed in function: {flow_func.__name__} "
                    f"in interface {interface_func.__name__}"
                )
                for _ in range(3):
                    result = interface_func(G, s, t, flow_func=flow_func, residual=R)
                    if interface_func in max_min_funcs:
                        result = result[0]
                    assert fv == result, errmsg
490
491
492 # Tests specific to one algorithm
def test_preflow_push_global_relabel_freq():
    """``global_relabel_freq=None`` is accepted; a negative value raises."""
    digraph = nx.DiGraph()
    digraph.add_edge(1, 2, capacity=1)
    residual = preflow_push(digraph, 1, 2, global_relabel_freq=None)
    assert residual.graph["flow_value"] == 1
    with pytest.raises(nx.NetworkXError):
        preflow_push(digraph, 1, 2, global_relabel_freq=-1)
499
500
def test_preflow_push_makes_enough_space():
    """Regression test from ticket #1542: two unit-capacity paths sharing 0->1."""
    digraph = nx.DiGraph()
    for path in ([0, 1, 3], [1, 2, 3]):
        nx.add_path(digraph, path, capacity=1)
    residual = preflow_push(digraph, 0, 3, value_only=False)
    assert residual.graph["flow_value"] == 1
508
509
def test_shortest_augmenting_path_two_phase():
    """Both ``two_phase`` settings find flow ``k`` on ``k`` disjoint unit paths."""
    num_paths = 5
    path_len = 1000
    digraph = nx.DiGraph()
    for row in range(num_paths):
        # s -> (row, 0) -> ... -> (row, path_len - 1) -> t, all capacity 1.
        digraph.add_edge("s", (row, 0), capacity=1)
        nx.add_path(digraph, ((row, col) for col in range(path_len)), capacity=1)
        digraph.add_edge((row, path_len - 1), "t", capacity=1)
    for two_phase in (True, False):
        residual = shortest_augmenting_path(digraph, "s", "t", two_phase=two_phase)
        assert residual.graph["flow_value"] == num_paths
522
523
class TestCutoff:
    """Tests for the ``cutoff`` argument of the augmenting-path algorithms."""

    def test_cutoff(self):
        # k disjoint s-t paths, every edge with capacity 2.  With cutoff=k
        # the reported flow value must lie between k and 2 * k.
        k = 5
        p = 1000
        G = nx.DiGraph()
        for row in range(k):
            G.add_edge("s", (row, 0), capacity=2)
            nx.add_path(G, ((row, col) for col in range(p)), capacity=2)
            G.add_edge((row, p - 1), "t", capacity=2)
        runs = (
            (shortest_augmenting_path, {"two_phase": True}),
            (shortest_augmenting_path, {"two_phase": False}),
            (edmonds_karp, {}),
        )
        for flow_func, extra in runs:
            R = flow_func(G, "s", "t", cutoff=k, **extra)
            assert k <= R.graph["flow_value"] <= (2 * k)

    def test_complete_graph_cutoff(self):
        # On K5 with unit capacities the returned value must equal the cutoff.
        G = nx.complete_graph(5)
        unit_capacities = {(u, v): 1 for u, v in G.edges()}
        nx.set_edge_attributes(G, unit_capacities, "capacity")
        for flow_func in (shortest_augmenting_path, edmonds_karp):
            for cutoff in (3, 2, 1):
                value = nx.maximum_flow_value(
                    G, 0, 4, flow_func=flow_func, cutoff=cutoff
                )
                assert cutoff == value, f"cutoff error in {flow_func.__name__}"