diff env/lib/python3.9/site-packages/networkx/readwrite/tests/test_shp.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/networkx/readwrite/tests/test_shp.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,287 @@
+"""Unit tests for shp.
+"""
+
+import os
+import tempfile
+import pytest
+
+ogr = pytest.importorskip("osgeo.ogr")
+
+import networkx as nx
+
+
+class TestShp:
+    def setup_method(self):
+        def createlayer(driver, layerType=ogr.wkbLineString):
+            lyr = driver.CreateLayer("edges", None, layerType)
+            namedef = ogr.FieldDefn("Name", ogr.OFTString)
+            namedef.SetWidth(32)
+            lyr.CreateField(namedef)
+            return lyr
+
+        drv = ogr.GetDriverByName("ESRI Shapefile")
+
+        testdir = os.path.join(tempfile.gettempdir(), "shpdir")
+        shppath = os.path.join(tempfile.gettempdir(), "tmpshp.shp")
+        multi_shppath = os.path.join(tempfile.gettempdir(), "tmp_mshp.shp")
+
+        self.deletetmp(drv, testdir, shppath, multi_shppath)
+        os.mkdir(testdir)
+
+        self.names = ["a", "b", "c", "c"]  # edgenames
+        self.paths = (
+            [(1.0, 1.0), (2.0, 2.0)],
+            [(2.0, 2.0), (3.0, 3.0)],
+            [(0.9, 0.9), (4.0, 0.9), (4.0, 2.0)],
+        )
+
+        self.simplified_names = ["a", "b", "c"]  # edgenames
+        self.simplified_paths = (
+            [(1.0, 1.0), (2.0, 2.0)],
+            [(2.0, 2.0), (3.0, 3.0)],
+            [(0.9, 0.9), (4.0, 2.0)],
+        )
+
+        self.multi_names = ["a", "a", "a", "a"]  # edgenames
+
+        shp = drv.CreateDataSource(shppath)
+        lyr = createlayer(shp)
+
+        for path, name in zip(self.paths, self.names):
+            feat = ogr.Feature(lyr.GetLayerDefn())
+            g = ogr.Geometry(ogr.wkbLineString)
+            for p in path:
+                g.AddPoint_2D(*p)
+            feat.SetGeometry(g)
+            feat.SetField("Name", name)
+            lyr.CreateFeature(feat)
+
+        # create single record multiline shapefile for testing
+        multi_shp = drv.CreateDataSource(multi_shppath)
+        multi_lyr = createlayer(multi_shp, ogr.wkbMultiLineString)
+
+        multi_g = ogr.Geometry(ogr.wkbMultiLineString)
+        for path in self.paths:
+
+            g = ogr.Geometry(ogr.wkbLineString)
+            for p in path:
+                g.AddPoint_2D(*p)
+
+            multi_g.AddGeometry(g)
+
+        multi_feat = ogr.Feature(multi_lyr.GetLayerDefn())
+        multi_feat.SetGeometry(multi_g)
+        multi_feat.SetField("Name", "a")
+        multi_lyr.CreateFeature(multi_feat)
+
+        self.shppath = shppath
+        self.multi_shppath = multi_shppath
+        self.testdir = testdir
+        self.drv = drv
+
+    def deletetmp(self, drv, *paths):
+        for p in paths:
+            if os.path.exists(p):
+                drv.DeleteDataSource(p)
+
+    def testload(self):
+        def compare_graph_paths_names(g, paths, names):
+            expected = nx.DiGraph()
+            for p in paths:
+                nx.add_path(expected, p)
+            assert sorted(expected.nodes) == sorted(g.nodes)
+            assert sorted(expected.edges()) == sorted(g.edges())
+            g_names = [g.get_edge_data(s, e)["Name"] for s, e in g.edges()]
+            assert names == sorted(g_names)
+
+        # simplified
+        G = nx.read_shp(self.shppath)
+        compare_graph_paths_names(G, self.simplified_paths, self.simplified_names)
+
+        # unsimplified
+        G = nx.read_shp(self.shppath, simplify=False)
+        compare_graph_paths_names(G, self.paths, self.names)
+
+        # multiline unsimplified
+        G = nx.read_shp(self.multi_shppath, simplify=False)
+        compare_graph_paths_names(G, self.paths, self.multi_names)
+
+    def checkgeom(self, lyr, expected):
+        feature = lyr.GetNextFeature()
+        actualwkt = []
+        while feature:
+            actualwkt.append(feature.GetGeometryRef().ExportToWkt())
+            feature = lyr.GetNextFeature()
+        assert sorted(expected) == sorted(actualwkt)
+
+    def test_geometryexport(self):
+        expectedpoints_simple = (
+            "POINT (1 1)",
+            "POINT (2 2)",
+            "POINT (3 3)",
+            "POINT (0.9 0.9)",
+            "POINT (4 2)",
+        )
+        expectedlines_simple = (
+            "LINESTRING (1 1,2 2)",
+            "LINESTRING (2 2,3 3)",
+            "LINESTRING (0.9 0.9,4.0 0.9,4 2)",
+        )
+        expectedpoints = (
+            "POINT (1 1)",
+            "POINT (2 2)",
+            "POINT (3 3)",
+            "POINT (0.9 0.9)",
+            "POINT (4.0 0.9)",
+            "POINT (4 2)",
+        )
+        expectedlines = (
+            "LINESTRING (1 1,2 2)",
+            "LINESTRING (2 2,3 3)",
+            "LINESTRING (0.9 0.9,4.0 0.9)",
+            "LINESTRING (4.0 0.9,4 2)",
+        )
+
+        tpath = os.path.join(tempfile.gettempdir(), "shpdir")
+        G = nx.read_shp(self.shppath)
+        nx.write_shp(G, tpath)
+        shpdir = ogr.Open(tpath)
+        self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints_simple)
+        self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines_simple)
+
+        # Test unsimplified
+        # Nodes should have additional point,
+        # edges should be 'flattened'
+        G = nx.read_shp(self.shppath, simplify=False)
+        nx.write_shp(G, tpath)
+        shpdir = ogr.Open(tpath)
+        self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints)
+        self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines)
+
+    def test_attributeexport(self):
+        def testattributes(lyr, graph):
+            feature = lyr.GetNextFeature()
+            while feature:
+                coords = []
+                ref = feature.GetGeometryRef()
+                last = ref.GetPointCount() - 1
+                edge_nodes = (ref.GetPoint_2D(0), ref.GetPoint_2D(last))
+                name = feature.GetFieldAsString("Name")
+                assert graph.get_edge_data(*edge_nodes)["Name"] == name
+                feature = lyr.GetNextFeature()
+
+        tpath = os.path.join(tempfile.gettempdir(), "shpdir")
+
+        G = nx.read_shp(self.shppath)
+        nx.write_shp(G, tpath)
+        shpdir = ogr.Open(tpath)
+        edges = shpdir.GetLayerByName("edges")
+        testattributes(edges, G)
+
+    # Test export of node attributes in nx.write_shp (#2778)
+    def test_nodeattributeexport(self):
+        tpath = os.path.join(tempfile.gettempdir(), "shpdir")
+
+        G = nx.DiGraph()
+        A = (0, 0)
+        B = (1, 1)
+        C = (2, 2)
+        G.add_edge(A, B)
+        G.add_edge(A, C)
+        label = "node_label"
+        for n, d in G.nodes(data=True):
+            d["label"] = label
+        nx.write_shp(G, tpath)
+
+        H = nx.read_shp(tpath)
+        for n, d in H.nodes(data=True):
+            assert d["label"] == label
+
+    def test_wkt_export(self):
+        G = nx.DiGraph()
+        tpath = os.path.join(tempfile.gettempdir(), "shpdir")
+        points = ("POINT (0.9 0.9)", "POINT (4 2)")
+        line = ("LINESTRING (0.9 0.9,4 2)",)
+        G.add_node(1, Wkt=points[0])
+        G.add_node(2, Wkt=points[1])
+        G.add_edge(1, 2, Wkt=line[0])
+        try:
+            nx.write_shp(G, tpath)
+        except Exception as e:
+            assert False, e
+        shpdir = ogr.Open(tpath)
+        self.checkgeom(shpdir.GetLayerByName("nodes"), points)
+        self.checkgeom(shpdir.GetLayerByName("edges"), line)
+
+    def teardown_method(self):
+        self.deletetmp(self.drv, self.testdir, self.shppath)
+
+
+def test_read_shp_nofile():
+    with pytest.raises(RuntimeError):
+        G = nx.read_shp("hopefully_this_file_will_not_be_available")
+
+
+class TestMissingGeometry:
+    def setup_method(self):
+        self.setup_path()
+        self.delete_shapedir()
+        self.create_shapedir()
+
+    def teardown_method(self):
+        self.delete_shapedir()
+
+    def setup_path(self):
+        self.path = os.path.join(tempfile.gettempdir(), "missing_geometry")
+
+    def create_shapedir(self):
+        drv = ogr.GetDriverByName("ESRI Shapefile")
+        shp = drv.CreateDataSource(self.path)
+        lyr = shp.CreateLayer("nodes", None, ogr.wkbPoint)
+        feature = ogr.Feature(lyr.GetLayerDefn())
+        feature.SetGeometry(None)
+        lyr.CreateFeature(feature)
+        feature.Destroy()
+
+    def delete_shapedir(self):
+        drv = ogr.GetDriverByName("ESRI Shapefile")
+        if os.path.exists(self.path):
+            drv.DeleteDataSource(self.path)
+
+    def test_missing_geometry(self):
+        with pytest.raises(nx.NetworkXError):
+            G = nx.read_shp(self.path)
+
+
+class TestMissingAttrWrite:
+    def setup_method(self):
+        self.setup_path()
+        self.delete_shapedir()
+
+    def teardown_method(self):
+        self.delete_shapedir()
+
+    def setup_path(self):
+        self.path = os.path.join(tempfile.gettempdir(), "missing_attributes")
+
+    def delete_shapedir(self):
+        drv = ogr.GetDriverByName("ESRI Shapefile")
+        if os.path.exists(self.path):
+            drv.DeleteDataSource(self.path)
+
+    def test_missing_attributes(self):
+        G = nx.DiGraph()
+        A = (0, 0)
+        B = (1, 1)
+        C = (2, 2)
+        G.add_edge(A, B, foo=100)
+        G.add_edge(A, C)
+
+        nx.write_shp(G, self.path)
+        H = nx.read_shp(self.path)
+
+        for u, v, d in H.edges(data=True):
+            if u == A and v == B:
+                assert d["foo"] == 100
+            if u == A and v == C:
+                assert d["foo"] is None