comparison env/lib/python3.9/site-packages/networkx/readwrite/tests/test_shp.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Unit tests for shp.
2 """
3
4 import os
5 import tempfile
6 import pytest
7
8 ogr = pytest.importorskip("osgeo.ogr")
9
10 import networkx as nx
11
12
class TestShp:
    """Round-trip tests for ``nx.read_shp`` and ``nx.write_shp``.

    Every test starts from freshly created fixtures in the system temp
    directory: a line shapefile with one feature per path, a multi-line
    shapefile whose single feature holds every path, and an empty
    directory that ``nx.write_shp`` writes into.
    """

    def setup_method(self):
        """Build the fixture shapefiles and the empty output directory."""

        def createlayer(driver, layerType=ogr.wkbLineString):
            # Every fixture layer is named "edges" and carries one
            # 32-character string field called "Name".
            lyr = driver.CreateLayer("edges", None, layerType)
            namedef = ogr.FieldDefn("Name", ogr.OFTString)
            namedef.SetWidth(32)
            lyr.CreateField(namedef)
            return lyr

        drv = ogr.GetDriverByName("ESRI Shapefile")

        testdir = os.path.join(tempfile.gettempdir(), "shpdir")
        shppath = os.path.join(tempfile.gettempdir(), "tmpshp.shp")
        multi_shppath = os.path.join(tempfile.gettempdir(), "tmp_mshp.shp")

        # Remove leftovers from an earlier (possibly aborted) run before
        # recreating the fixtures.
        self.deletetmp(drv, testdir, shppath, multi_shppath)
        os.mkdir(testdir)

        # Expected edge names of the unsimplified graph.  Only the first
        # three are written to the shapefile because ``zip`` below stops at
        # the shorter sequence (three paths).
        self.names = ["a", "b", "c", "c"]  # edgenames
        self.paths = (
            [(1.0, 1.0), (2.0, 2.0)],
            [(2.0, 2.0), (3.0, 3.0)],
            [(0.9, 0.9), (4.0, 0.9), (4.0, 2.0)],
        )

        # Expected result of the default (simplified) read: each path is
        # reduced to its first and last point.
        self.simplified_names = ["a", "b", "c"]  # edgenames
        self.simplified_paths = (
            [(1.0, 1.0), (2.0, 2.0)],
            [(2.0, 2.0), (3.0, 3.0)],
            [(0.9, 0.9), (4.0, 2.0)],
        )

        # The multi-line fixture has a single feature named "a".
        self.multi_names = ["a", "a", "a", "a"]  # edgenames

        shp = drv.CreateDataSource(shppath)
        lyr = createlayer(shp)

        for path, name in zip(self.paths, self.names):
            feat = ogr.Feature(lyr.GetLayerDefn())
            g = ogr.Geometry(ogr.wkbLineString)
            for p in path:
                g.AddPoint_2D(*p)
            feat.SetGeometry(g)
            feat.SetField("Name", name)
            lyr.CreateFeature(feat)

        # create single record multiline shapefile for testing
        multi_shp = drv.CreateDataSource(multi_shppath)
        multi_lyr = createlayer(multi_shp, ogr.wkbMultiLineString)

        multi_g = ogr.Geometry(ogr.wkbMultiLineString)
        for path in self.paths:
            g = ogr.Geometry(ogr.wkbLineString)
            for p in path:
                g.AddPoint_2D(*p)
            multi_g.AddGeometry(g)

        multi_feat = ogr.Feature(multi_lyr.GetLayerDefn())
        multi_feat.SetGeometry(multi_g)
        multi_feat.SetField("Name", "a")
        multi_lyr.CreateFeature(multi_feat)

        self.shppath = shppath
        self.multi_shppath = multi_shppath
        self.testdir = testdir
        self.drv = drv

    def deletetmp(self, drv, *paths):
        """Delete each existing data source in *paths* using driver *drv*.

        Paths that do not exist are skipped silently.
        """
        for p in paths:
            if os.path.exists(p):
                drv.DeleteDataSource(p)

    def testload(self):
        """Check nodes, edges and edge names produced by ``nx.read_shp``."""

        def compare_graph_paths_names(g, paths, names):
            # Build the expected directed graph from the raw coordinate
            # paths, then compare it with what was read.
            expected = nx.DiGraph()
            for p in paths:
                nx.add_path(expected, p)
            assert sorted(expected.nodes) == sorted(g.nodes)
            assert sorted(expected.edges()) == sorted(g.edges())
            g_names = [g.get_edge_data(s, e)["Name"] for s, e in g.edges()]
            # ``names`` is expected to be given in sorted order already.
            assert names == sorted(g_names)

        # simplified
        G = nx.read_shp(self.shppath)
        compare_graph_paths_names(G, self.simplified_paths, self.simplified_names)

        # unsimplified
        G = nx.read_shp(self.shppath, simplify=False)
        compare_graph_paths_names(G, self.paths, self.names)

        # multiline unsimplified
        G = nx.read_shp(self.multi_shppath, simplify=False)
        compare_graph_paths_names(G, self.paths, self.multi_names)

    def checkgeom(self, lyr, expected):
        """Assert that the WKT of every feature in *lyr* matches *expected*.

        The comparison ignores order: both sides are sorted first.
        """
        feature = lyr.GetNextFeature()
        actualwkt = []
        while feature:
            actualwkt.append(feature.GetGeometryRef().ExportToWkt())
            feature = lyr.GetNextFeature()
        assert sorted(expected) == sorted(actualwkt)

    def test_geometryexport(self):
        """Written node and edge geometries match the graph that was read."""
        expectedpoints_simple = (
            "POINT (1 1)",
            "POINT (2 2)",
            "POINT (3 3)",
            "POINT (0.9 0.9)",
            "POINT (4 2)",
        )
        expectedlines_simple = (
            "LINESTRING (1 1,2 2)",
            "LINESTRING (2 2,3 3)",
            "LINESTRING (0.9 0.9,4.0 0.9,4 2)",
        )
        expectedpoints = (
            "POINT (1 1)",
            "POINT (2 2)",
            "POINT (3 3)",
            "POINT (0.9 0.9)",
            "POINT (4.0 0.9)",
            "POINT (4 2)",
        )
        expectedlines = (
            "LINESTRING (1 1,2 2)",
            "LINESTRING (2 2,3 3)",
            "LINESTRING (0.9 0.9,4.0 0.9)",
            "LINESTRING (4.0 0.9,4 2)",
        )

        # ``self.testdir`` is the empty output directory made in setup.
        tpath = self.testdir
        G = nx.read_shp(self.shppath)
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints_simple)
        self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines_simple)

        # Test unsimplified
        # Nodes should have additional point,
        # edges should be 'flattened'
        G = nx.read_shp(self.shppath, simplify=False)
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints)
        self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines)

    def test_attributeexport(self):
        """Edge ``Name`` attributes survive ``nx.write_shp``."""

        def testattributes(lyr, graph):
            # For each written edge feature, look up the graph edge between
            # the feature's first and last point and compare the names.
            feature = lyr.GetNextFeature()
            while feature:
                ref = feature.GetGeometryRef()
                last = ref.GetPointCount() - 1
                edge_nodes = (ref.GetPoint_2D(0), ref.GetPoint_2D(last))
                name = feature.GetFieldAsString("Name")
                assert graph.get_edge_data(*edge_nodes)["Name"] == name
                feature = lyr.GetNextFeature()

        tpath = self.testdir

        G = nx.read_shp(self.shppath)
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        edges = shpdir.GetLayerByName("edges")
        testattributes(edges, G)

    def test_nodeattributeexport(self):
        """Node attributes survive a write/read round trip (#2778)."""
        tpath = self.testdir

        G = nx.DiGraph()
        A = (0, 0)
        B = (1, 1)
        C = (2, 2)
        G.add_edge(A, B)
        G.add_edge(A, C)
        label = "node_label"
        for _node, attrs in G.nodes(data=True):
            attrs["label"] = label
        nx.write_shp(G, tpath)

        H = nx.read_shp(tpath)
        for _node, attrs in H.nodes(data=True):
            assert attrs["label"] == label

    def test_wkt_export(self):
        """Geometries given through ``Wkt`` attributes are written as-is."""
        G = nx.DiGraph()
        tpath = self.testdir
        points = ("POINT (0.9 0.9)", "POINT (4 2)")
        line = ("LINESTRING (0.9 0.9,4 2)",)
        G.add_node(1, Wkt=points[0])
        G.add_node(2, Wkt=points[1])
        G.add_edge(1, 2, Wkt=line[0])
        # Let any exception propagate: pytest reports it as a failure and
        # keeps the original traceback.
        nx.write_shp(G, tpath)
        shpdir = ogr.Open(tpath)
        self.checkgeom(shpdir.GetLayerByName("nodes"), points)
        self.checkgeom(shpdir.GetLayerByName("edges"), line)

    def teardown_method(self):
        """Remove every fixture created by ``setup_method``."""
        # Include the multi-line shapefile so it is not left behind in the
        # temp directory after the test run.
        self.deletetmp(self.drv, self.testdir, self.shppath, self.multi_shppath)
218
219
def test_read_shp_nofile():
    """``nx.read_shp`` raises ``RuntimeError`` for a path that does not exist."""
    with pytest.raises(RuntimeError):
        # The return value is irrelevant; only the exception matters.
        nx.read_shp("hopefully_this_file_will_not_be_available")
223
224
class TestMissingGeometry:
    """``nx.read_shp`` must reject a feature that has no geometry."""

    def setup_method(self):
        """Start each test from a freshly created shapefile directory."""
        self.setup_path()
        self.delete_shapedir()
        self.create_shapedir()

    def teardown_method(self):
        """Remove the shapefile directory created for the test."""
        self.delete_shapedir()

    def setup_path(self):
        """Store the fixture location (inside the system temp directory)."""
        self.path = os.path.join(tempfile.gettempdir(), "missing_geometry")

    def create_shapedir(self):
        """Create a "nodes" point layer holding one geometry-less feature."""
        drv = ogr.GetDriverByName("ESRI Shapefile")
        shp = drv.CreateDataSource(self.path)
        lyr = shp.CreateLayer("nodes", None, ogr.wkbPoint)
        feature = ogr.Feature(lyr.GetLayerDefn())
        # The missing geometry is the condition under test.
        feature.SetGeometry(None)
        lyr.CreateFeature(feature)
        feature.Destroy()

    def delete_shapedir(self):
        """Delete the fixture data source if it exists."""
        drv = ogr.GetDriverByName("ESRI Shapefile")
        if os.path.exists(self.path):
            drv.DeleteDataSource(self.path)

    def test_missing_geometry(self):
        """Reading the geometry-less fixture raises ``nx.NetworkXError``."""
        with pytest.raises(nx.NetworkXError):
            # The return value is irrelevant; only the exception matters.
            nx.read_shp(self.path)
254
255
class TestMissingAttrWrite:
    """Round trip of a graph in which only some edges carry an attribute."""

    def setup_method(self):
        """Resolve the output path and clear anything left from earlier runs."""
        self.setup_path()
        self.delete_shapedir()

    def teardown_method(self):
        """Remove whatever the test wrote."""
        self.delete_shapedir()

    def setup_path(self):
        """Store the output location (inside the system temp directory)."""
        tmp_root = tempfile.gettempdir()
        self.path = os.path.join(tmp_root, "missing_attributes")

    def delete_shapedir(self):
        """Delete the output data source if it exists."""
        shapefile_driver = ogr.GetDriverByName("ESRI Shapefile")
        if not os.path.exists(self.path):
            return
        shapefile_driver.DeleteDataSource(self.path)

    def test_missing_attributes(self):
        """An edge without ``foo`` reads back with ``foo`` set to ``None``."""
        origin = (0, 0)
        with_foo = (1, 1)
        without_foo = (2, 2)

        graph = nx.DiGraph()
        graph.add_edge(origin, with_foo, foo=100)
        graph.add_edge(origin, without_foo)

        nx.write_shp(graph, self.path)
        reloaded = nx.read_shp(self.path)

        for src, dst, attrs in reloaded.edges(data=True):
            # Only edges leaving the shared origin node are checked.
            if src != origin:
                continue
            if dst == with_foo:
                assert attrs["foo"] == 100
            if dst == without_foo:
                assert attrs["foo"] is None