view env/lib/python3.9/site-packages/networkx/readwrite/tests/test_shp.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Unit tests for shp.
"""

import os
import tempfile
import pytest

ogr = pytest.importorskip("osgeo.ogr")

import networkx as nx


class TestShp:
    def setup_method(self):
        def createlayer(driver, layerType=ogr.wkbLineString):
            lyr = driver.CreateLayer("edges", None, layerType)
            namedef = ogr.FieldDefn("Name", ogr.OFTString)
            namedef.SetWidth(32)
            lyr.CreateField(namedef)
            return lyr

        drv = ogr.GetDriverByName("ESRI Shapefile")

        testdir = os.path.join(tempfile.gettempdir(), "shpdir")
        shppath = os.path.join(tempfile.gettempdir(), "tmpshp.shp")
        multi_shppath = os.path.join(tempfile.gettempdir(), "tmp_mshp.shp")

        self.deletetmp(drv, testdir, shppath, multi_shppath)
        os.mkdir(testdir)

        self.names = ["a", "b", "c", "c"]  # edgenames
        self.paths = (
            [(1.0, 1.0), (2.0, 2.0)],
            [(2.0, 2.0), (3.0, 3.0)],
            [(0.9, 0.9), (4.0, 0.9), (4.0, 2.0)],
        )

        self.simplified_names = ["a", "b", "c"]  # edgenames
        self.simplified_paths = (
            [(1.0, 1.0), (2.0, 2.0)],
            [(2.0, 2.0), (3.0, 3.0)],
            [(0.9, 0.9), (4.0, 2.0)],
        )

        self.multi_names = ["a", "a", "a", "a"]  # edgenames

        shp = drv.CreateDataSource(shppath)
        lyr = createlayer(shp)

        for path, name in zip(self.paths, self.names):
            feat = ogr.Feature(lyr.GetLayerDefn())
            g = ogr.Geometry(ogr.wkbLineString)
            for p in path:
                g.AddPoint_2D(*p)
            feat.SetGeometry(g)
            feat.SetField("Name", name)
            lyr.CreateFeature(feat)

        # create single record multiline shapefile for testing
        multi_shp = drv.CreateDataSource(multi_shppath)
        multi_lyr = createlayer(multi_shp, ogr.wkbMultiLineString)

        multi_g = ogr.Geometry(ogr.wkbMultiLineString)
        for path in self.paths:

            g = ogr.Geometry(ogr.wkbLineString)
            for p in path:
                g.AddPoint_2D(*p)

            multi_g.AddGeometry(g)

        multi_feat = ogr.Feature(multi_lyr.GetLayerDefn())
        multi_feat.SetGeometry(multi_g)
        multi_feat.SetField("Name", "a")
        multi_lyr.CreateFeature(multi_feat)

        self.shppath = shppath
        self.multi_shppath = multi_shppath
        self.testdir = testdir
        self.drv = drv

    def deletetmp(self, drv, *paths):
        for p in paths:
            if os.path.exists(p):
                drv.DeleteDataSource(p)

    def testload(self):
        def compare_graph_paths_names(g, paths, names):
            expected = nx.DiGraph()
            for p in paths:
                nx.add_path(expected, p)
            assert sorted(expected.nodes) == sorted(g.nodes)
            assert sorted(expected.edges()) == sorted(g.edges())
            g_names = [g.get_edge_data(s, e)["Name"] for s, e in g.edges()]
            assert names == sorted(g_names)

        # simplified
        G = nx.read_shp(self.shppath)
        compare_graph_paths_names(G, self.simplified_paths, self.simplified_names)

        # unsimplified
        G = nx.read_shp(self.shppath, simplify=False)
        compare_graph_paths_names(G, self.paths, self.names)

        # multiline unsimplified
        G = nx.read_shp(self.multi_shppath, simplify=False)
        compare_graph_paths_names(G, self.paths, self.multi_names)

    def checkgeom(self, lyr, expected):
        feature = lyr.GetNextFeature()
        actualwkt = []
        while feature:
            actualwkt.append(feature.GetGeometryRef().ExportToWkt())
            feature = lyr.GetNextFeature()
        assert sorted(expected) == sorted(actualwkt)

    def test_geometryexport(self):
        expectedpoints_simple = (
            "POINT (1 1)",
            "POINT (2 2)",
            "POINT (3 3)",
            "POINT (0.9 0.9)",
            "POINT (4 2)",
        )
        expectedlines_simple = (
            "LINESTRING (1 1,2 2)",
            "LINESTRING (2 2,3 3)",
            "LINESTRING (0.9 0.9,4.0 0.9,4 2)",
        )
        expectedpoints = (
            "POINT (1 1)",
            "POINT (2 2)",
            "POINT (3 3)",
            "POINT (0.9 0.9)",
            "POINT (4.0 0.9)",
            "POINT (4 2)",
        )
        expectedlines = (
            "LINESTRING (1 1,2 2)",
            "LINESTRING (2 2,3 3)",
            "LINESTRING (0.9 0.9,4.0 0.9)",
            "LINESTRING (4.0 0.9,4 2)",
        )

        tpath = os.path.join(tempfile.gettempdir(), "shpdir")
        G = nx.read_shp(self.shppath)
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints_simple)
        self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines_simple)

        # Test unsimplified
        # Nodes should have additional point,
        # edges should be 'flattened'
        G = nx.read_shp(self.shppath, simplify=False)
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints)
        self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines)

    def test_attributeexport(self):
        def testattributes(lyr, graph):
            feature = lyr.GetNextFeature()
            while feature:
                coords = []
                ref = feature.GetGeometryRef()
                last = ref.GetPointCount() - 1
                edge_nodes = (ref.GetPoint_2D(0), ref.GetPoint_2D(last))
                name = feature.GetFieldAsString("Name")
                assert graph.get_edge_data(*edge_nodes)["Name"] == name
                feature = lyr.GetNextFeature()

        tpath = os.path.join(tempfile.gettempdir(), "shpdir")

        G = nx.read_shp(self.shppath)
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        edges = shpdir.GetLayerByName("edges")
        testattributes(edges, G)

    # Test export of node attributes in nx.write_shp (#2778)
    def test_nodeattributeexport(self):
        tpath = os.path.join(tempfile.gettempdir(), "shpdir")

        G = nx.DiGraph()
        A = (0, 0)
        B = (1, 1)
        C = (2, 2)
        G.add_edge(A, B)
        G.add_edge(A, C)
        label = "node_label"
        for n, d in G.nodes(data=True):
            d["label"] = label
        nx.write_shp(G, tpath)

        H = nx.read_shp(tpath)
        for n, d in H.nodes(data=True):
            assert d["label"] == label

    def test_wkt_export(self):
        G = nx.DiGraph()
        tpath = os.path.join(tempfile.gettempdir(), "shpdir")
        points = ("POINT (0.9 0.9)", "POINT (4 2)")
        line = ("LINESTRING (0.9 0.9,4 2)",)
        G.add_node(1, Wkt=points[0])
        G.add_node(2, Wkt=points[1])
        G.add_edge(1, 2, Wkt=line[0])
        try:
            nx.write_shp(G, tpath)
        except Exception as e:
            assert False, e
        shpdir = ogr.Open(tpath)
        self.checkgeom(shpdir.GetLayerByName("nodes"), points)
        self.checkgeom(shpdir.GetLayerByName("edges"), line)

    def teardown_method(self):
        self.deletetmp(self.drv, self.testdir, self.shppath)


def test_read_shp_nofile():
    with pytest.raises(RuntimeError):
        G = nx.read_shp("hopefully_this_file_will_not_be_available")


class TestMissingGeometry:
    def setup_method(self):
        self.setup_path()
        self.delete_shapedir()
        self.create_shapedir()

    def teardown_method(self):
        self.delete_shapedir()

    def setup_path(self):
        self.path = os.path.join(tempfile.gettempdir(), "missing_geometry")

    def create_shapedir(self):
        drv = ogr.GetDriverByName("ESRI Shapefile")
        shp = drv.CreateDataSource(self.path)
        lyr = shp.CreateLayer("nodes", None, ogr.wkbPoint)
        feature = ogr.Feature(lyr.GetLayerDefn())
        feature.SetGeometry(None)
        lyr.CreateFeature(feature)
        feature.Destroy()

    def delete_shapedir(self):
        drv = ogr.GetDriverByName("ESRI Shapefile")
        if os.path.exists(self.path):
            drv.DeleteDataSource(self.path)

    def test_missing_geometry(self):
        with pytest.raises(nx.NetworkXError):
            G = nx.read_shp(self.path)


class TestMissingAttrWrite:
    def setup_method(self):
        self.setup_path()
        self.delete_shapedir()

    def teardown_method(self):
        self.delete_shapedir()

    def setup_path(self):
        self.path = os.path.join(tempfile.gettempdir(), "missing_attributes")

    def delete_shapedir(self):
        drv = ogr.GetDriverByName("ESRI Shapefile")
        if os.path.exists(self.path):
            drv.DeleteDataSource(self.path)

    def test_missing_attributes(self):
        G = nx.DiGraph()
        A = (0, 0)
        B = (1, 1)
        C = (2, 2)
        G.add_edge(A, B, foo=100)
        G.add_edge(A, C)

        nx.write_shp(G, self.path)
        H = nx.read_shp(self.path)

        for u, v, d in H.edges(data=True):
            if u == A and v == B:
                assert d["foo"] == 100
            if u == A and v == C:
                assert d["foo"] is None