Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/networkx/utils/decorators.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 from collections import defaultdict | |
2 from os.path import splitext | |
3 from contextlib import contextmanager | |
4 from pathlib import Path | |
5 | |
6 import networkx as nx | |
7 from decorator import decorator | |
8 from networkx.utils import create_random_state, create_py_random_state | |
9 | |
# Names exported by a star-import of this module. ``np_random_state`` is
# listed alongside ``random_state`` because the module binds it as an alias
# of that same decorator.
__all__ = [
    "not_implemented_for",
    "open_file",
    "nodes_or_number",
    "preserve_random_state",
    "random_state",
    "np_random_state",
    "py_random_state",
]
19 | |
20 | |
def not_implemented_for(*graph_types):
    """Decorator to mark algorithms as not implemented for some graph types.

    Parameters
    ----------
    graph_types : container of strings
        Entries must be one of 'directed', 'undirected', 'multigraph', 'graph'.

    Returns
    -------
    _require : function
        The decorated function.

    Raises
    ------
    NetworkXNotImplemented
        When the decorated function is called and the graph passed as its
        first positional argument matches every entry of `graph_types`.
    KeyError
        When the decorated function is called and `graph_types` contains a
        name other than the four listed above.

    Notes
    -----
    Multiple types are joined logically with "and".
    For "or" use multiple @not_implemented_for() lines.

    Examples
    --------
    Decorate functions like this::

       @not_implemented_for('directed')
       def sp_function(G):
           pass

       @not_implemented_for('directed','multigraph')
       def sp_np_function(G):
           pass
    """

    @decorator
    def _not_implemented_for(not_implement_for_func, *args, **kwargs):
        # The graph is always taken from the first positional argument.
        graph = args[0]
        terms = {
            "directed": graph.is_directed(),
            "undirected": not graph.is_directed(),
            "multigraph": graph.is_multigraph(),
            "graph": not graph.is_multigraph(),
        }
        # Look up every requested type before combining the results, so an
        # unknown name is reported regardless of which graph was passed in
        # (a short-circuiting "and" would skip the lookup, and hence the
        # check, as soon as one earlier term was false).
        try:
            flags = [terms[t] for t in graph_types]
        except KeyError as e:
            raise KeyError(
                "use one or more of directed, undirected, multigraph, graph"
            ) from e
        if all(flags):
            msg = f"not implemented for {' '.join(graph_types)} type"
            raise nx.NetworkXNotImplemented(msg)
        return not_implement_for_func(*args, **kwargs)

    return _not_implemented_for
81 | |
82 | |
def _open_gz(path, mode):
    """Open the gzip file at `path` with the given `mode` and return it."""
    # Imported lazily so gzip is only loaded when a gzip file is requested.
    from gzip import open as gzip_open

    return gzip_open(path, mode=mode)
87 | |
88 | |
def _open_bz2(path, mode):
    """Open the bzip2 file at `path` with the given `mode` and return it."""
    # Imported lazily so bz2 is only loaded when a bzip2 file is requested.
    from bz2 import BZ2File

    return BZ2File(path, mode=mode)
93 | |
94 | |
# Maps a filename extension to the callable used to open such a file; any
# extension without an entry falls back to the builtin `open`.
# To handle new extensions, define a function accepting a `path` and `mode`,
# then add the extension to this mapping.
_dispatch_dict = defaultdict(
    lambda: open,
    {
        ".gz": _open_gz,
        ".bz2": _open_bz2,
        ".gzip": _open_gz,
    },
)
101 | |
102 | |
def open_file(path_arg, mode="r"):
    """Decorator to ensure clean opening and closing of files.

    Parameters
    ----------
    path_arg : int or str
        Where to find the path argument of the decorated function. An int is
        used as an index into the positional arguments; even if the argument
        is a named positional argument (with a default value), you must
        specify its index. A str is used as a key into the keyword arguments.
    mode : str
        String for opening mode.

    Returns
    -------
    _open_file : function
        Function which cleanly executes the io.

    Raises
    ------
    NetworkXError
        At call time, if `path_arg` is an index beyond the positional
        arguments, or a keyword name absent from the keyword arguments.

    Examples
    --------
    Decorate functions like this::

       @open_file(0,'r')
       def read_function(pathname):
           pass

       @open_file(1,'w')
       def write_function(G,pathname):
           pass

       @open_file(1,'w')
       def write_function(G, pathname='graph.dot'):
           pass

       @open_file('path', 'w+')
       def another_function(arg, **kwargs):
           path = kwargs['path']
           pass
    """
    # This decorator opens (and later closes) a path given as a string or a
    # pathlib.Path. Anything else -- an already open file object, or None --
    # is handed to the function untouched and is never closed here.
    #
    # A function that accepts a default of None and creates its own file
    # must therefore close that file itself, for example:
    #
    # @open_file('path')
    # def some_function(arg1, arg2, path=None):
    #    if path is None:
    #        fobj = tempfile.NamedTemporaryFile(delete=False)
    #        close_fobj = True
    #    else:
    #        # The decorator has already turned `path` into a file object
    #        # and will close it, if it should.
    #        fobj = path
    #        close_fobj = False
    #
    #    try:
    #        fobj.write('blah')
    #    finally:
    #        if close_fobj:
    #            fobj.close()
    #
    # A "with" block is deliberately avoided in that pattern: it would also
    # close a file object that the decorator (or the caller) still owns.

    @decorator
    def _open_file(func_to_be_decorated, *args, **kwargs):
        # Because @decorator is used, *args and **kwargs have already been
        # resolved against the signature of the decorated function, with
        # default values filled in. For example func(x, y, a=1, b=2, **kwargs)
        # called as func(0,1,b=5,c=10) arrives here with args=(0,1,1,5) and
        # kwargs={'c':10}.

        # Step 1: locate the path argument, positionally or by keyword.
        from_kwargs = False
        try:
            target = args[path_arg]
        except IndexError as err:
            # The index points past the positional arguments. This means the
            # decorator was specified incorrectly: a developer error rather
            # than a user error.
            raise nx.NetworkXError(
                "path_arg of open_file decorator is incorrect"
            ) from err
        except TypeError:
            # path_arg is not usable as a tuple index, so treat it as the
            # name of a keyword argument, which must then be present.
            from_kwargs = True
            try:
                target = kwargs[path_arg]
            except KeyError as err:
                raise nx.NetworkXError(
                    f"Missing required keyword argument: {path_arg}"
                ) from err

        # Step 2: turn the argument into a file object. Strings, and Path
        # objects that are not themselves readable, are opened here and
        # closed afterwards; everything else is passed through as is.
        if isinstance(target, str):
            opener = _dispatch_dict[splitext(target)[1]]
            handle = opener(target, mode=mode)
            owns_handle = True
        elif not hasattr(target, "read") and isinstance(target, Path):
            opener = _dispatch_dict[target.suffix]
            handle = opener(str(target), mode=mode)
            owns_handle = True
        else:
            # Already file-like, or something (such as None) that the
            # decorated function will deal with itself.
            handle = target
            owns_handle = False

        # Step 3: put the file object where the path argument was.
        if from_kwargs:
            call_args = args
            kwargs[path_arg] = handle
        else:
            # args is a tuple, so copy it into a list before modifying it.
            call_args = list(args)
            call_args[path_arg] = handle

        # Step 4: call the function, closing only what was opened here.
        try:
            return func_to_be_decorated(*call_args, **kwargs)
        finally:
            if owns_handle:
                handle.close()

    return _open_file
247 | |
248 | |
def nodes_or_number(which_args):
    """Decorator to allow number of nodes or container of nodes.

    Each selected argument is replaced by a 2-tuple ``(n, nodes)`` where
    ``n`` is the value originally passed and ``nodes`` is ``list(range(n))``
    when ``n`` is accepted by ``range``, or ``tuple(n)`` otherwise.

    Parameters
    ----------
    which_args : int or sequence of ints
        Location of the node arguments in args. Even if the argument is a
        named positional argument (with a default value), you must specify its
        index as a positional argument.
        If more than one node argument is allowed, can be a list of locations.

    Returns
    -------
    _nodes_or_numbers : function
        Function which replaces int args with ranges.

    Raises
    ------
    NetworkXError
        At call time, if a selected argument is a negative number.

    Examples
    --------
    Decorate functions like this::

       @nodes_or_number(0)
       def empty_graph(nodes):
           pass

       @nodes_or_number([0,1])
       def grid_2d_graph(m1, m2, periodic=False):
           pass

       @nodes_or_number(1)
       def full_rary_tree(r, n):
           # r is a number. n can be a number or a list of nodes
           pass
    """

    @decorator
    def _nodes_or_number(func_to_be_decorated, *args, **kw):
        # form tuple of arg positions to be converted.
        try:
            iter_wa = iter(which_args)
        except TypeError:
            # A single index was given rather than an iterable of indices.
            iter_wa = (which_args,)
        # change each argument in turn
        new_args = list(args)
        for i in iter_wa:
            n = args[i]
            try:
                nodes = list(range(n))
            except TypeError:
                # Not usable as a count: treat it as a container of nodes.
                nodes = tuple(n)
            else:
                # range() silently yields nothing for a negative count, so
                # reject that case explicitly.
                if n < 0:
                    msg = f"Negative number of nodes not valid: {n}"
                    raise nx.NetworkXError(msg)
            new_args[i] = (n, nodes)
        return func_to_be_decorated(*new_args, **kw)

    return _nodes_or_number
306 | |
307 | |
def preserve_random_state(func):
    """Decorator to preserve the numpy.random state during a function.

    Parameters
    ----------
    func : function
        function around which to preserve the random state.

    Returns
    -------
    wrapper : function
        Function which wraps the input function by saving the state before
        calling the function and restoring the state afterward. The wrapper
        carries over the metadata of `func` (name, docstring, ...).

    Examples
    --------
    Decorate functions like this::

        @preserve_random_state
        def do_random_stuff(x, y):
            return x + y * numpy.random.random()

    Notes
    -----
    If numpy.random is not importable, the state is not saved or restored
    and `func` itself is returned.

    The global numpy.random generator is re-seeded with a fixed value before
    every call, so numpy.random draws made inside `func` are the same on
    each call.
    """
    # Local import: keeps this function self-contained without touching the
    # module-level import block.
    from functools import wraps

    try:
        from numpy.random import get_state, seed, set_state
    except ImportError:
        return func

    # wraps() copies the metadata of func and, unlike a plain assignment to
    # __name__, tolerates callables lacking some attributes (for instance
    # functools.partial objects, which have no __name__).
    @wraps(func)
    def wrapper(*args, **kwargs):
        saved_state = get_state()
        try:
            seed(1234567890)
            return func(*args, **kwargs)
        finally:
            # Restore the caller's state even if func raised.
            set_state(saved_state)

    return wrapper
354 | |
355 | |
def random_state(random_state_index):
    """Decorator to generate a numpy.random.RandomState instance.

    The argument found at position `random_state_index` is passed through
    create_random_state, and the decorated function receives the result in
    place of the original argument.

    Parameters
    ----------
    random_state_index : int
        Location of the random_state argument in args that is to be used to
        generate the numpy.random.RandomState instance. Even if the argument is
        a named positional argument (with a default value), you must specify
        its index as a positional argument.

    Returns
    -------
    _random_state : function
        Function whose random_state keyword argument is a RandomState instance.

    Raises
    ------
    NetworkXError
        At call time, if `random_state_index` cannot be used as an index or
        lies outside the positional arguments.

    Examples
    --------
    Decorate functions like this::

       @np_random_state(0)
       def random_float(random_state=None):
           return random_state.rand()

       @np_random_state(1)
       def random_array(dims, random_state=1):
           return random_state.rand(*dims)

    See Also
    --------
    py_random_state
    """

    @decorator
    def _random_state(func, *args, **kwargs):
        # Fetch the raw argument the caller supplied at the given position.
        try:
            seed_like = args[random_state_index]
        except TypeError as e:
            raise nx.NetworkXError("random_state_index must be an integer") from e
        except IndexError as e:
            raise nx.NetworkXError("random_state_index is incorrect") from e

        # args is a tuple, so work on a list copy and swap in the generator
        # built from the raw argument.
        patched_args = list(args)
        patched_args[random_state_index] = create_random_state(seed_like)
        return func(*patched_args, **kwargs)

    return _random_state


# Alias: the same decorator under a name that makes the numpy flavour explicit.
np_random_state = random_state
414 | |
415 | |
def py_random_state(random_state_index):
    """Decorator to generate a random.Random instance (or equiv).

    The argument found at position `random_state_index` is passed through
    create_py_random_state, and the decorated function receives the result in
    place of the original argument. The result is either a random.Random
    instance, or numpy.random.RandomState instance with additional attributes
    to mimic basic methods of Random.

    Parameters
    ----------
    random_state_index : int
        Location of the random_state argument in args that is to be used to
        generate the random number generator. Even if the argument is
        a named positional argument (with a default value), you must specify
        its index as a positional argument.

    Returns
    -------
    _random_state : function
        Function whose random_state argument is the generator produced by
        create_py_random_state.

    Raises
    ------
    NetworkXError
        At call time, if `random_state_index` cannot be used as an index or
        lies outside the positional arguments.

    Examples
    --------
    Decorate functions like this::

       @py_random_state(0)
       def random_float(random_state=None):
           return random_state.random()

       @py_random_state(1)
       def random_floats(count, random_state=1):
           return [random_state.random() for _ in range(count)]

    See Also
    --------
    np_random_state
    """

    @decorator
    def _random_state(func, *args, **kwargs):
        # Fetch the raw argument the caller supplied at the given position.
        try:
            seed_like = args[random_state_index]
        except TypeError as e:
            raise nx.NetworkXError("random_state_index must be an integer") from e
        except IndexError as e:
            raise nx.NetworkXError("random_state_index is incorrect") from e

        # args is a tuple, so work on a list copy and swap in the generator
        # built from the raw argument.
        patched_args = list(args)
        patched_args[random_state_index] = create_py_random_state(seed_like)
        return func(*patched_args, **kwargs)

    return _random_state