Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/networkx/readwrite/tests/test_shp.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Unit tests for shp. | |
2 """ | |
3 | |
4 import os | |
5 import tempfile | |
6 import pytest | |
7 | |
8 ogr = pytest.importorskip("osgeo.ogr") | |
9 | |
10 import networkx as nx | |
11 | |
12 | |
13 class TestShp: | |
14 def setup_method(self): | |
15 def createlayer(driver, layerType=ogr.wkbLineString): | |
16 lyr = driver.CreateLayer("edges", None, layerType) | |
17 namedef = ogr.FieldDefn("Name", ogr.OFTString) | |
18 namedef.SetWidth(32) | |
19 lyr.CreateField(namedef) | |
20 return lyr | |
21 | |
22 drv = ogr.GetDriverByName("ESRI Shapefile") | |
23 | |
24 testdir = os.path.join(tempfile.gettempdir(), "shpdir") | |
25 shppath = os.path.join(tempfile.gettempdir(), "tmpshp.shp") | |
26 multi_shppath = os.path.join(tempfile.gettempdir(), "tmp_mshp.shp") | |
27 | |
28 self.deletetmp(drv, testdir, shppath, multi_shppath) | |
29 os.mkdir(testdir) | |
30 | |
31 self.names = ["a", "b", "c", "c"] # edgenames | |
32 self.paths = ( | |
33 [(1.0, 1.0), (2.0, 2.0)], | |
34 [(2.0, 2.0), (3.0, 3.0)], | |
35 [(0.9, 0.9), (4.0, 0.9), (4.0, 2.0)], | |
36 ) | |
37 | |
38 self.simplified_names = ["a", "b", "c"] # edgenames | |
39 self.simplified_paths = ( | |
40 [(1.0, 1.0), (2.0, 2.0)], | |
41 [(2.0, 2.0), (3.0, 3.0)], | |
42 [(0.9, 0.9), (4.0, 2.0)], | |
43 ) | |
44 | |
45 self.multi_names = ["a", "a", "a", "a"] # edgenames | |
46 | |
47 shp = drv.CreateDataSource(shppath) | |
48 lyr = createlayer(shp) | |
49 | |
50 for path, name in zip(self.paths, self.names): | |
51 feat = ogr.Feature(lyr.GetLayerDefn()) | |
52 g = ogr.Geometry(ogr.wkbLineString) | |
53 for p in path: | |
54 g.AddPoint_2D(*p) | |
55 feat.SetGeometry(g) | |
56 feat.SetField("Name", name) | |
57 lyr.CreateFeature(feat) | |
58 | |
59 # create single record multiline shapefile for testing | |
60 multi_shp = drv.CreateDataSource(multi_shppath) | |
61 multi_lyr = createlayer(multi_shp, ogr.wkbMultiLineString) | |
62 | |
63 multi_g = ogr.Geometry(ogr.wkbMultiLineString) | |
64 for path in self.paths: | |
65 | |
66 g = ogr.Geometry(ogr.wkbLineString) | |
67 for p in path: | |
68 g.AddPoint_2D(*p) | |
69 | |
70 multi_g.AddGeometry(g) | |
71 | |
72 multi_feat = ogr.Feature(multi_lyr.GetLayerDefn()) | |
73 multi_feat.SetGeometry(multi_g) | |
74 multi_feat.SetField("Name", "a") | |
75 multi_lyr.CreateFeature(multi_feat) | |
76 | |
77 self.shppath = shppath | |
78 self.multi_shppath = multi_shppath | |
79 self.testdir = testdir | |
80 self.drv = drv | |
81 | |
82 def deletetmp(self, drv, *paths): | |
83 for p in paths: | |
84 if os.path.exists(p): | |
85 drv.DeleteDataSource(p) | |
86 | |
87 def testload(self): | |
88 def compare_graph_paths_names(g, paths, names): | |
89 expected = nx.DiGraph() | |
90 for p in paths: | |
91 nx.add_path(expected, p) | |
92 assert sorted(expected.nodes) == sorted(g.nodes) | |
93 assert sorted(expected.edges()) == sorted(g.edges()) | |
94 g_names = [g.get_edge_data(s, e)["Name"] for s, e in g.edges()] | |
95 assert names == sorted(g_names) | |
96 | |
97 # simplified | |
98 G = nx.read_shp(self.shppath) | |
99 compare_graph_paths_names(G, self.simplified_paths, self.simplified_names) | |
100 | |
101 # unsimplified | |
102 G = nx.read_shp(self.shppath, simplify=False) | |
103 compare_graph_paths_names(G, self.paths, self.names) | |
104 | |
105 # multiline unsimplified | |
106 G = nx.read_shp(self.multi_shppath, simplify=False) | |
107 compare_graph_paths_names(G, self.paths, self.multi_names) | |
108 | |
109 def checkgeom(self, lyr, expected): | |
110 feature = lyr.GetNextFeature() | |
111 actualwkt = [] | |
112 while feature: | |
113 actualwkt.append(feature.GetGeometryRef().ExportToWkt()) | |
114 feature = lyr.GetNextFeature() | |
115 assert sorted(expected) == sorted(actualwkt) | |
116 | |
117 def test_geometryexport(self): | |
118 expectedpoints_simple = ( | |
119 "POINT (1 1)", | |
120 "POINT (2 2)", | |
121 "POINT (3 3)", | |
122 "POINT (0.9 0.9)", | |
123 "POINT (4 2)", | |
124 ) | |
125 expectedlines_simple = ( | |
126 "LINESTRING (1 1,2 2)", | |
127 "LINESTRING (2 2,3 3)", | |
128 "LINESTRING (0.9 0.9,4.0 0.9,4 2)", | |
129 ) | |
130 expectedpoints = ( | |
131 "POINT (1 1)", | |
132 "POINT (2 2)", | |
133 "POINT (3 3)", | |
134 "POINT (0.9 0.9)", | |
135 "POINT (4.0 0.9)", | |
136 "POINT (4 2)", | |
137 ) | |
138 expectedlines = ( | |
139 "LINESTRING (1 1,2 2)", | |
140 "LINESTRING (2 2,3 3)", | |
141 "LINESTRING (0.9 0.9,4.0 0.9)", | |
142 "LINESTRING (4.0 0.9,4 2)", | |
143 ) | |
144 | |
145 tpath = os.path.join(tempfile.gettempdir(), "shpdir") | |
146 G = nx.read_shp(self.shppath) | |
147 nx.write_shp(G, tpath) | |
148 shpdir = ogr.Open(tpath) | |
149 self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints_simple) | |
150 self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines_simple) | |
151 | |
152 # Test unsimplified | |
153 # Nodes should have additional point, | |
154 # edges should be 'flattened' | |
155 G = nx.read_shp(self.shppath, simplify=False) | |
156 nx.write_shp(G, tpath) | |
157 shpdir = ogr.Open(tpath) | |
158 self.checkgeom(shpdir.GetLayerByName("nodes"), expectedpoints) | |
159 self.checkgeom(shpdir.GetLayerByName("edges"), expectedlines) | |
160 | |
161 def test_attributeexport(self): | |
162 def testattributes(lyr, graph): | |
163 feature = lyr.GetNextFeature() | |
164 while feature: | |
165 coords = [] | |
166 ref = feature.GetGeometryRef() | |
167 last = ref.GetPointCount() - 1 | |
168 edge_nodes = (ref.GetPoint_2D(0), ref.GetPoint_2D(last)) | |
169 name = feature.GetFieldAsString("Name") | |
170 assert graph.get_edge_data(*edge_nodes)["Name"] == name | |
171 feature = lyr.GetNextFeature() | |
172 | |
173 tpath = os.path.join(tempfile.gettempdir(), "shpdir") | |
174 | |
175 G = nx.read_shp(self.shppath) | |
176 nx.write_shp(G, tpath) | |
177 shpdir = ogr.Open(tpath) | |
178 edges = shpdir.GetLayerByName("edges") | |
179 testattributes(edges, G) | |
180 | |
181 # Test export of node attributes in nx.write_shp (#2778) | |
182 def test_nodeattributeexport(self): | |
183 tpath = os.path.join(tempfile.gettempdir(), "shpdir") | |
184 | |
185 G = nx.DiGraph() | |
186 A = (0, 0) | |
187 B = (1, 1) | |
188 C = (2, 2) | |
189 G.add_edge(A, B) | |
190 G.add_edge(A, C) | |
191 label = "node_label" | |
192 for n, d in G.nodes(data=True): | |
193 d["label"] = label | |
194 nx.write_shp(G, tpath) | |
195 | |
196 H = nx.read_shp(tpath) | |
197 for n, d in H.nodes(data=True): | |
198 assert d["label"] == label | |
199 | |
200 def test_wkt_export(self): | |
201 G = nx.DiGraph() | |
202 tpath = os.path.join(tempfile.gettempdir(), "shpdir") | |
203 points = ("POINT (0.9 0.9)", "POINT (4 2)") | |
204 line = ("LINESTRING (0.9 0.9,4 2)",) | |
205 G.add_node(1, Wkt=points[0]) | |
206 G.add_node(2, Wkt=points[1]) | |
207 G.add_edge(1, 2, Wkt=line[0]) | |
208 try: | |
209 nx.write_shp(G, tpath) | |
210 except Exception as e: | |
211 assert False, e | |
212 shpdir = ogr.Open(tpath) | |
213 self.checkgeom(shpdir.GetLayerByName("nodes"), points) | |
214 self.checkgeom(shpdir.GetLayerByName("edges"), line) | |
215 | |
216 def teardown_method(self): | |
217 self.deletetmp(self.drv, self.testdir, self.shppath) | |
218 | |
219 | |
220 def test_read_shp_nofile(): | |
221 with pytest.raises(RuntimeError): | |
222 G = nx.read_shp("hopefully_this_file_will_not_be_available") | |
223 | |
224 | |
225 class TestMissingGeometry: | |
226 def setup_method(self): | |
227 self.setup_path() | |
228 self.delete_shapedir() | |
229 self.create_shapedir() | |
230 | |
231 def teardown_method(self): | |
232 self.delete_shapedir() | |
233 | |
234 def setup_path(self): | |
235 self.path = os.path.join(tempfile.gettempdir(), "missing_geometry") | |
236 | |
237 def create_shapedir(self): | |
238 drv = ogr.GetDriverByName("ESRI Shapefile") | |
239 shp = drv.CreateDataSource(self.path) | |
240 lyr = shp.CreateLayer("nodes", None, ogr.wkbPoint) | |
241 feature = ogr.Feature(lyr.GetLayerDefn()) | |
242 feature.SetGeometry(None) | |
243 lyr.CreateFeature(feature) | |
244 feature.Destroy() | |
245 | |
246 def delete_shapedir(self): | |
247 drv = ogr.GetDriverByName("ESRI Shapefile") | |
248 if os.path.exists(self.path): | |
249 drv.DeleteDataSource(self.path) | |
250 | |
251 def test_missing_geometry(self): | |
252 with pytest.raises(nx.NetworkXError): | |
253 G = nx.read_shp(self.path) | |
254 | |
255 | |
256 class TestMissingAttrWrite: | |
257 def setup_method(self): | |
258 self.setup_path() | |
259 self.delete_shapedir() | |
260 | |
261 def teardown_method(self): | |
262 self.delete_shapedir() | |
263 | |
264 def setup_path(self): | |
265 self.path = os.path.join(tempfile.gettempdir(), "missing_attributes") | |
266 | |
267 def delete_shapedir(self): | |
268 drv = ogr.GetDriverByName("ESRI Shapefile") | |
269 if os.path.exists(self.path): | |
270 drv.DeleteDataSource(self.path) | |
271 | |
272 def test_missing_attributes(self): | |
273 G = nx.DiGraph() | |
274 A = (0, 0) | |
275 B = (1, 1) | |
276 C = (2, 2) | |
277 G.add_edge(A, B, foo=100) | |
278 G.add_edge(A, C) | |
279 | |
280 nx.write_shp(G, self.path) | |
281 H = nx.read_shp(self.path) | |
282 | |
283 for u, v, d in H.edges(data=True): | |
284 if u == A and v == B: | |
285 assert d["foo"] == 100 | |
286 if u == A and v == C: | |
287 assert d["foo"] is None |