Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/networkx/utils/tests/test_decorators.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/networkx/utils/tests/test_decorators.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,300 @@ +import tempfile +import os +import pathlib +import random + +import pytest + +import networkx as nx +from networkx.utils.decorators import open_file, not_implemented_for +from networkx.utils.decorators import ( + preserve_random_state, + py_random_state, + np_random_state, + random_state, +) +from networkx.utils.misc import PythonRandomInterface + + +def test_not_implemented_decorator(): + @not_implemented_for("directed") + def test1(G): + pass + + test1(nx.Graph()) + + +def test_not_implemented_decorator_key(): + with pytest.raises(KeyError): + + @not_implemented_for("foo") + def test1(G): + pass + + test1(nx.Graph()) + + +def test_not_implemented_decorator_raise(): + with pytest.raises(nx.NetworkXNotImplemented): + + @not_implemented_for("graph") + def test1(G): + pass + + test1(nx.Graph()) + + +class TestOpenFileDecorator: + def setup_method(self): + self.text = ["Blah... ", "BLAH ", "BLAH!!!!"] + self.fobj = tempfile.NamedTemporaryFile("wb+", delete=False) + self.name = self.fobj.name + + def teardown_method(self): + self.fobj.close() + os.unlink(self.name) + + def write(self, path): + for text in self.text: + path.write(text.encode("ascii")) + + @open_file(1, "r") + def read(self, path): + return path.readlines()[0] + + @staticmethod + @open_file(0, "wb") + def writer_arg0(path): + path.write(b"demo") + + @open_file(1, "wb+") + def writer_arg1(self, path): + self.write(path) + + @open_file(2, "wb") + def writer_arg2default(self, x, path=None): + if path is None: + with tempfile.NamedTemporaryFile("wb+") as fh: + self.write(fh) + else: + self.write(path) + + @open_file(4, "wb") + def writer_arg4default(self, x, y, other="hello", path=None, **kwargs): + if path is None: + with tempfile.NamedTemporaryFile("wb+") as fh: + self.write(fh) + else: + self.write(path) + + @open_file("path", "wb") + def writer_kwarg(self, **kwargs): + path = kwargs.get("path", None) + if path is None: + with tempfile.NamedTemporaryFile("wb+") as fh: + self.write(fh) + else: + self.write(path) + + def test_writer_arg0_str(self): + self.writer_arg0(self.name) + + def test_writer_arg0_fobj(self): + self.writer_arg0(self.fobj) + + def test_writer_arg0_pathlib(self): + self.writer_arg0(pathlib.Path(self.name)) + + def test_writer_arg1_str(self): + self.writer_arg1(self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg1_fobj(self): + self.writer_arg1(self.fobj) + assert not self.fobj.closed + self.fobj.close() + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg2default_str(self): + self.writer_arg2default(0, path=None) + self.writer_arg2default(0, path=self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg2default_fobj(self): + self.writer_arg2default(0, path=self.fobj) + assert not self.fobj.closed + self.fobj.close() + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg2default_fobj_path_none(self): + self.writer_arg2default(0, path=None) + + def test_writer_arg4default_fobj(self): + self.writer_arg4default(0, 1, dog="dog", other="other") + self.writer_arg4default(0, 1, dog="dog", other="other", path=self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_kwarg_str(self): + self.writer_kwarg(path=self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_kwarg_fobj(self): + self.writer_kwarg(path=self.fobj) + self.fobj.close() + assert self.read(self.name) == "".join(self.text) + + def test_writer_kwarg_path_none(self): + self.writer_kwarg(path=None) + + +@preserve_random_state +def test_preserve_random_state(): + try: + import numpy.random + + r = numpy.random.random() + except ImportError: + return + assert abs(r - 0.61879477158568) < 1e-16 + + +class TestRandomState: + @classmethod + def setup_class(cls): + global np + np = pytest.importorskip("numpy") + + @random_state(1) + def instantiate_random_state(self, random_state): + assert isinstance(random_state, np.random.RandomState) + return random_state.random_sample() + + @np_random_state(1) + def instantiate_np_random_state(self, random_state): + assert isinstance(random_state, np.random.RandomState) + return random_state.random_sample() + + @py_random_state(1) + def instantiate_py_random_state(self, random_state): + assert isinstance(random_state, random.Random) or isinstance( + random_state, PythonRandomInterface + ) + return random_state.random() + + def test_random_state_None(self): + np.random.seed(42) + rv = np.random.random_sample() + np.random.seed(42) + assert rv == self.instantiate_random_state(None) + np.random.seed(42) + assert rv == self.instantiate_np_random_state(None) + + random.seed(42) + rv = random.random() + random.seed(42) + assert rv == self.instantiate_py_random_state(None) + + def test_random_state_np_random(self): + np.random.seed(42) + rv = np.random.random_sample() + np.random.seed(42) + assert rv == self.instantiate_random_state(np.random) + np.random.seed(42) + assert rv == self.instantiate_np_random_state(np.random) + np.random.seed(42) + assert rv == self.instantiate_py_random_state(np.random) + + def test_random_state_int(self): + np.random.seed(42) + np_rv = np.random.random_sample() + random.seed(42) + py_rv = random.random() + + np.random.seed(42) + seed = 1 + rval = self.instantiate_random_state(seed) + rval_expected = np.random.RandomState(seed).rand() + assert rval, rval_expected + + rval = self.instantiate_np_random_state(seed) + rval_expected = np.random.RandomState(seed).rand() + assert rval, rval_expected + # test that global seed wasn't changed in function + assert np_rv == np.random.random_sample() + + random.seed(42) + rval = self.instantiate_py_random_state(seed) + rval_expected = random.Random(seed).random() + assert rval, rval_expected + # test that global seed wasn't changed in function + assert py_rv == random.random() + + def test_random_state_np_random_RandomState(self): + np.random.seed(42) + np_rv = np.random.random_sample() + + np.random.seed(42) + seed = 1 + rng = np.random.RandomState(seed) + rval = self.instantiate_random_state(rng) + rval_expected = np.random.RandomState(seed).rand() + assert rval, rval_expected + + rval = self.instantiate_np_random_state(seed) + rval_expected = np.random.RandomState(seed).rand() + assert rval, rval_expected + + rval = self.instantiate_py_random_state(seed) + rval_expected = np.random.RandomState(seed).rand() + assert rval, rval_expected + # test that global seed wasn't changed in function + assert np_rv == np.random.random_sample() + + def test_random_state_py_random(self): + seed = 1 + rng = random.Random(seed) + rv = self.instantiate_py_random_state(rng) + assert rv, random.Random(seed).random() + + pytest.raises(ValueError, self.instantiate_random_state, rng) + pytest.raises(ValueError, self.instantiate_np_random_state, rng) + + +def test_random_state_string_arg_index(): + with pytest.raises(nx.NetworkXError): + + @random_state("a") + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +def test_py_random_state_string_arg_index(): + with pytest.raises(nx.NetworkXError): + + @py_random_state("a") + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +def test_random_state_invalid_arg_index(): + with pytest.raises(nx.NetworkXError): + + @random_state(2) + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +def test_py_random_state_invalid_arg_index(): + with pytest.raises(nx.NetworkXError): + + @py_random_state(2) + def make_random_state(rs): + pass + + rstate = make_random_state(1)