view env/lib/python3.9/site-packages/networkx/utils/tests/test_decorators.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import tempfile
import os
import pathlib
import random

import pytest

import networkx as nx
from networkx.utils.decorators import open_file, not_implemented_for
from networkx.utils.decorators import (
    preserve_random_state,
    py_random_state,
    np_random_state,
    random_state,
)
from networkx.utils.misc import PythonRandomInterface


def test_not_implemented_decorator():
    @not_implemented_for("directed")
    def test1(G):
        pass

    test1(nx.Graph())


def test_not_implemented_decorator_key():
    with pytest.raises(KeyError):

        @not_implemented_for("foo")
        def test1(G):
            pass

        test1(nx.Graph())


def test_not_implemented_decorator_raise():
    with pytest.raises(nx.NetworkXNotImplemented):

        @not_implemented_for("graph")
        def test1(G):
            pass

        test1(nx.Graph())


class TestOpenFileDecorator:
    def setup_method(self):
        self.text = ["Blah... ", "BLAH ", "BLAH!!!!"]
        self.fobj = tempfile.NamedTemporaryFile("wb+", delete=False)
        self.name = self.fobj.name

    def teardown_method(self):
        self.fobj.close()
        os.unlink(self.name)

    def write(self, path):
        for text in self.text:
            path.write(text.encode("ascii"))

    @open_file(1, "r")
    def read(self, path):
        return path.readlines()[0]

    @staticmethod
    @open_file(0, "wb")
    def writer_arg0(path):
        path.write(b"demo")

    @open_file(1, "wb+")
    def writer_arg1(self, path):
        self.write(path)

    @open_file(2, "wb")
    def writer_arg2default(self, x, path=None):
        if path is None:
            with tempfile.NamedTemporaryFile("wb+") as fh:
                self.write(fh)
        else:
            self.write(path)

    @open_file(4, "wb")
    def writer_arg4default(self, x, y, other="hello", path=None, **kwargs):
        if path is None:
            with tempfile.NamedTemporaryFile("wb+") as fh:
                self.write(fh)
        else:
            self.write(path)

    @open_file("path", "wb")
    def writer_kwarg(self, **kwargs):
        path = kwargs.get("path", None)
        if path is None:
            with tempfile.NamedTemporaryFile("wb+") as fh:
                self.write(fh)
        else:
            self.write(path)

    def test_writer_arg0_str(self):
        self.writer_arg0(self.name)

    def test_writer_arg0_fobj(self):
        self.writer_arg0(self.fobj)

    def test_writer_arg0_pathlib(self):
        self.writer_arg0(pathlib.Path(self.name))

    def test_writer_arg1_str(self):
        self.writer_arg1(self.name)
        assert self.read(self.name) == "".join(self.text)

    def test_writer_arg1_fobj(self):
        self.writer_arg1(self.fobj)
        assert not self.fobj.closed
        self.fobj.close()
        assert self.read(self.name) == "".join(self.text)

    def test_writer_arg2default_str(self):
        self.writer_arg2default(0, path=None)
        self.writer_arg2default(0, path=self.name)
        assert self.read(self.name) == "".join(self.text)

    def test_writer_arg2default_fobj(self):
        self.writer_arg2default(0, path=self.fobj)
        assert not self.fobj.closed
        self.fobj.close()
        assert self.read(self.name) == "".join(self.text)

    def test_writer_arg2default_fobj_path_none(self):
        self.writer_arg2default(0, path=None)

    def test_writer_arg4default_fobj(self):
        self.writer_arg4default(0, 1, dog="dog", other="other")
        self.writer_arg4default(0, 1, dog="dog", other="other", path=self.name)
        assert self.read(self.name) == "".join(self.text)

    def test_writer_kwarg_str(self):
        self.writer_kwarg(path=self.name)
        assert self.read(self.name) == "".join(self.text)

    def test_writer_kwarg_fobj(self):
        self.writer_kwarg(path=self.fobj)
        self.fobj.close()
        assert self.read(self.name) == "".join(self.text)

    def test_writer_kwarg_path_none(self):
        self.writer_kwarg(path=None)


@preserve_random_state
def test_preserve_random_state():
    try:
        import numpy.random

        r = numpy.random.random()
    except ImportError:
        return
    assert abs(r - 0.61879477158568) < 1e-16


class TestRandomState:
    @classmethod
    def setup_class(cls):
        global np
        np = pytest.importorskip("numpy")

    @random_state(1)
    def instantiate_random_state(self, random_state):
        assert isinstance(random_state, np.random.RandomState)
        return random_state.random_sample()

    @np_random_state(1)
    def instantiate_np_random_state(self, random_state):
        assert isinstance(random_state, np.random.RandomState)
        return random_state.random_sample()

    @py_random_state(1)
    def instantiate_py_random_state(self, random_state):
        assert isinstance(random_state, random.Random) or isinstance(
            random_state, PythonRandomInterface
        )
        return random_state.random()

    def test_random_state_None(self):
        np.random.seed(42)
        rv = np.random.random_sample()
        np.random.seed(42)
        assert rv == self.instantiate_random_state(None)
        np.random.seed(42)
        assert rv == self.instantiate_np_random_state(None)

        random.seed(42)
        rv = random.random()
        random.seed(42)
        assert rv == self.instantiate_py_random_state(None)

    def test_random_state_np_random(self):
        np.random.seed(42)
        rv = np.random.random_sample()
        np.random.seed(42)
        assert rv == self.instantiate_random_state(np.random)
        np.random.seed(42)
        assert rv == self.instantiate_np_random_state(np.random)
        np.random.seed(42)
        assert rv == self.instantiate_py_random_state(np.random)

    def test_random_state_int(self):
        np.random.seed(42)
        np_rv = np.random.random_sample()
        random.seed(42)
        py_rv = random.random()

        np.random.seed(42)
        seed = 1
        rval = self.instantiate_random_state(seed)
        rval_expected = np.random.RandomState(seed).rand()
        assert rval, rval_expected

        rval = self.instantiate_np_random_state(seed)
        rval_expected = np.random.RandomState(seed).rand()
        assert rval, rval_expected
        # test that global seed wasn't changed in function
        assert np_rv == np.random.random_sample()

        random.seed(42)
        rval = self.instantiate_py_random_state(seed)
        rval_expected = random.Random(seed).random()
        assert rval, rval_expected
        # test that global seed wasn't changed in function
        assert py_rv == random.random()

    def test_random_state_np_random_RandomState(self):
        np.random.seed(42)
        np_rv = np.random.random_sample()

        np.random.seed(42)
        seed = 1
        rng = np.random.RandomState(seed)
        rval = self.instantiate_random_state(rng)
        rval_expected = np.random.RandomState(seed).rand()
        assert rval, rval_expected

        rval = self.instantiate_np_random_state(seed)
        rval_expected = np.random.RandomState(seed).rand()
        assert rval, rval_expected

        rval = self.instantiate_py_random_state(seed)
        rval_expected = np.random.RandomState(seed).rand()
        assert rval, rval_expected
        # test that global seed wasn't changed in function
        assert np_rv == np.random.random_sample()

    def test_random_state_py_random(self):
        seed = 1
        rng = random.Random(seed)
        rv = self.instantiate_py_random_state(rng)
        assert rv, random.Random(seed).random()

        pytest.raises(ValueError, self.instantiate_random_state, rng)
        pytest.raises(ValueError, self.instantiate_np_random_state, rng)


def test_random_state_string_arg_index():
    with pytest.raises(nx.NetworkXError):

        @random_state("a")
        def make_random_state(rs):
            pass

        rstate = make_random_state(1)


def test_py_random_state_string_arg_index():
    with pytest.raises(nx.NetworkXError):

        @py_random_state("a")
        def make_random_state(rs):
            pass

        rstate = make_random_state(1)


def test_random_state_invalid_arg_index():
    with pytest.raises(nx.NetworkXError):

        @random_state(2)
        def make_random_state(rs):
            pass

        rstate = make_random_state(1)


def test_py_random_state_invalid_arg_index():
    with pytest.raises(nx.NetworkXError):

        @py_random_state(2)
        def make_random_state(rs):
            pass

        rstate = make_random_state(1)