comparison larch_select_paths.py @ 0:2e827836f0ad draft

planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 5be486890442dedfb327289d597e1c8110240735
author muon-spectroscopy-computational-project
date Tue, 14 Nov 2023 15:35:52 +0000 (16 months ago)
parents
children 7fdca938d90c
comparison
equal deleted inserted replaced
-1:000000000000 0:2e827836f0ad
1 import csv
2 import json
3 import os
4 import re
5 import sys
6 from zipfile import ZIP_DEFLATED, ZipFile
7
8
class GDSWriter:
    """Accumulate the rows of the GDS variable table and write `gds.csv`.

    Rows are kept as pre-formatted strings in `self.rows`, the first one
    being the column header. `self.names` holds every variable name that has
    already been emitted so that duplicates can be rejected.
    """

    def __init__(self, default_variables: "dict[str, dict]"):
        """Create the header row and register the common default variables.

        Args:
            default_variables (dict[str, dict]): For each of "s02", "e0",
                "deltar" and "sigma2", a dictionary providing the keys
                "value", "vary" and "is_common".
        """
        self.default_properties = {
            "s02": {"name": "s02"},
            "e0": {"name": "e0"},
            "deltar": {"name": "alpha*reff"},
            "sigma2": {"name": "sigma2"},
        }
        header = (
            f"{'id':>4s}, {'name':>24s}, {'value':>5s}, {'expr':>4s}, "
            f"{'vary':>4s}\n"
        )
        self.rows = [header]
        self.names = set()

        for key, settings in self.default_properties.items():
            gds_name = settings["name"]
            supplied = default_variables[key]
            start_value = supplied["value"]
            is_varied = supplied["vary"]
            shared = supplied["is_common"]

            settings["value"] = start_value
            settings["vary"] = is_varied
            settings["is_common"] = shared

            # Common variables are shared by every path, so they are
            # declared exactly once, up front.
            if shared:
                self.append_gds(
                    name=gds_name, value=start_value, vary=is_varied
                )

    def append_gds(
        self,
        name: str,
        value: float = 0.,
        expr: str = None,
        vary: bool = True,
        label: str = "",
    ):
        """Format one GDS variable as a row and queue it for writing.

        Args:
            name (str): Name of the GDS variable.
            value (float, optional): Starting value for variable.
                Defaults to 0.
            expr (str, optional): Expression for setting the variable.
                Defaults to None.
            vary (bool, optional): Whether the variable is optimised during
                the fit. Defaults to True.
            label (str, optional): Prefix keeping variables for different
                FEFF directories distinct. Defaults to "".

        Raises:
            ValueError: If the resulting name has already been appended.
        """
        if label is None:
            formatted_name = name
        else:
            formatted_name = label + name
        # "alpha*reff" is declared as the variable "alpha".
        formatted_name = formatted_name.replace("*reff", "")
        expr = expr or " "

        if formatted_name in self.names:
            raise ValueError(f"{formatted_name} already used as variable name")
        self.names.add(formatted_name)

        # The header occupies index 0, so the current length is the next id.
        row_id = len(self.rows)
        self.rows.append(
            f"{row_id:4d}, {formatted_name:>24s}, {str(value):>5s}, "
            f"{expr:>4s}, {str(vary):>4s}\n"
        )

    def parse_gds(
        self,
        property_name: str,
        variable_name: str = None,
        path_variable: dict = None,
        directory_label: str = None,
        path_label: str = None,
    ) -> str:
        """Resolve, and if needed declare, the GDS variable for one path.

        Args:
            property_name (str): The property to which the variable
                corresponds. Should be a key in `self.default_properties`.
            variable_name (str, optional): Custom name for this variable.
                Defaults to None.
            path_variable (dict, optional): Dictionary with the "value",
                "expr" and "vary" settings used when `variable_name` is
                given. Defaults to None.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to None.
            path_label (str, optional): Label indicating the atoms involved
                in this path. Defaults to None.

        Returns:
            str: Either `variable_name`, the name used as a default globally
                for this `property_name`, or an automatically generated
                name built from the directory and path labels.
        """
        # A custom name always wins and is declared with its own settings.
        if variable_name:
            self.append_gds(
                name=variable_name,
                value=path_variable["value"],
                expr=path_variable["expr"],
                vary=path_variable["vary"],
            )
            return variable_name

        defaults = self.default_properties[property_name]
        # Common variables were already declared in __init__.
        if defaults["is_common"]:
            return defaults["name"]

        name_parts = [defaults["name"]]
        if directory_label:
            name_parts.append(f"_{directory_label}")
        if path_label:
            name_parts.append(f"_{path_label.lower().replace('.', '')}")
        auto_name = "".join(name_parts)

        self.append_gds(
            name=auto_name,
            value=defaults["value"],
            vary=defaults["vary"],
        )
        return auto_name

    def write(self):
        """Write GDS rows to `gds.csv` in the working directory.
        """
        with open("gds.csv", "w") as gds_file:
            gds_file.writelines(self.rows)
129
130
class PathsWriter:
    """Collect the selected FEFF paths and write them to `sp.csv`.

    Rows are kept as pre-formatted strings in `self.rows`, the first one
    being the column header. The GDS variables referenced by the paths are
    delegated to a `GDSWriter`, which is written out alongside the paths.
    """

    def __init__(self, default_variables: "dict[str, dict]"):
        """Create the header row and the companion `GDSWriter`.

        Args:
            default_variables (dict[str, dict]): Default settings for each
                path property, passed through unchanged to `GDSWriter`.
        """
        self.rows = [
            f"{'id':>4s}, {'filename':>24s}, {'label':>24s}, {'s02':>3s}, "
            f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
        ]
        self.gds_writer = GDSWriter(default_variables=default_variables)

    def parse_feff_output(
        self,
        paths_file: str,
        selection: "dict[str, str|list]",
        directory_label: str = "",
    ):
        """Parse selected paths from CSV summary and define GDS variables.

        A row is selected when its path id appears in `selection["paths"]`,
        when `selection["selection"]` is "all", or when the integer in its
        last column is non-zero. Rows whose first column contains no digits
        (such as a header) and blank lines are skipped.

        Args:
            paths_file (str): CSV summary filename.
            selection (dict[str, str|list]): Dictionary indicating which
                paths to select, and how to define their variables.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
        """
        # Index the custom path settings by id for constant-time lookup;
        # setdefault keeps the first entry if an id is repeated.
        custom_paths = {}
        for path_value in selection["paths"]:
            custom_paths.setdefault(path_value["id"], path_value)

        # newline="" lets the csv module do its own line-ending handling.
        with open(paths_file, newline="") as file:
            for row in csv.reader(file):
                if not row:
                    # csv.reader yields [] for a blank line.
                    continue
                id_match = re.search(r"\d+", row[0])
                if not id_match:
                    continue

                path_id = int(id_match.group())
                filename = row[0].strip()
                path_label = row[-2].strip()

                if path_id in custom_paths:
                    path_value = custom_paths[path_id]
                elif selection["selection"] == "all" or int(row[-1]):
                    path_value = None
                else:
                    continue

                self._append_path(
                    filename=filename,
                    path_label=path_label,
                    directory_label=directory_label,
                    path_value=path_value,
                )

    def _append_path(
        self,
        filename: str,
        path_label: str,
        directory_label: str,
        path_value: dict = None,
    ):
        """Resolve the GDS variable of every property and append the path.

        Args:
            filename (str): Name of the underlying FEFF path file.
            path_label (str): Label indicating the atoms involved.
            directory_label (str): Label to indicate paths from a separate
                directory.
            path_value (dict, optional): Custom per-property settings for
                this path; when None the defaults are used.
        """
        variables = {}
        for property_name in self.gds_writer.default_properties:
            if path_value is None:
                variables[property_name] = self.gds_writer.parse_gds(
                    property_name=property_name,
                    directory_label=directory_label,
                    path_label=path_label,
                )
            else:
                path_variable = path_value[property_name]
                variables[property_name] = self.gds_writer.parse_gds(
                    property_name=property_name,
                    variable_name=path_variable["name"],
                    path_variable=path_variable,
                    directory_label=directory_label,
                    path_label=path_label,
                )

        self.parse_selected_path(
            filename=filename,
            path_label=path_label,
            directory_label=directory_label,
            **variables,
        )

    def parse_selected_path(
        self,
        filename: str,
        path_label: str,
        directory_label: str = "",
        s02: str = "s02",
        e0: str = "e0",
        sigma2: str = "sigma2",
        deltar: str = "alpha*reff",
    ):
        """Format and append row representing a selected FEFF path.

        Args:
            filename (str): Name of the underlying FEFF path file, without
                parent directory.
            path_label (str): Label indicating the atoms involved in this
                path.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
            s02 (str, optional): Electron screening factor variable name.
                Defaults to "s02".
            e0 (str, optional): Energy shift variable name.
                Defaults to "e0".
            sigma2 (str, optional): Mean squared displacement variable name.
                Defaults to "sigma2".
            deltar (str, optional): Change in path length variable.
                Defaults to "alpha*reff".
        """
        # Labelled paths are placed under a directory named after the label;
        # unlabelled ones under "feff".
        if directory_label:
            filename = os.path.join(directory_label, filename)
            label = f"{directory_label}.{path_label}"
        else:
            filename = os.path.join("feff", filename)
            label = path_label

        # The header occupies index 0, so the current length is the next id.
        self.rows.append(
            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
            f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
        )

    def write(self):
        """Write selected path rows to `sp.csv` and GDS rows via the
        `GDSWriter`.
        """
        self.gds_writer.write()
        with open("sp.csv", "w") as out:
            out.writelines(self.rows)
242
243
def main(input_values: dict):
    """Select paths and define GDS parameters.

    With a single FEFF output the paths are parsed without a directory
    label. With any other number, each output is parsed under its own label
    (the user-supplied one, or its zero-padded 1-based position) and the
    members of its paths zip are extracted into a directory named after the
    label and added to `merged.zip`.

    Args:
        input_values (dict): All input values from the Galaxy tool UI.

    Raises:
        ValueError: If a FEFF label is not unique.
    """
    writer = PathsWriter(default_variables=input_values["variables"])
    feff_outputs = input_values["feff_outputs"]

    if len(feff_outputs) != 1:
        pad_width = len(str(len(feff_outputs)))
        seen_labels = set()
        # Mode "x" refuses to overwrite an existing merged.zip.
        with ZipFile("merged.zip", "x", ZIP_DEFLATED) as merged:
            for position, feff_output in enumerate(feff_outputs, start=1):
                label = feff_output.pop("label")
                if not label:
                    label = str(position).zfill(pad_width)
                if label in seen_labels:
                    raise ValueError(f"Label '{label}' is not unique")
                seen_labels.add(label)

                writer.parse_feff_output(
                    directory_label=label,
                    paths_file=feff_output["paths_file"],
                    selection=feff_output["selection"],
                )

                with ZipFile(feff_output["paths_zip"]) as archive:
                    for member in archive.infolist():
                        if member.filename == "feff/":
                            continue
                        # Drop the first five characters (the length of
                        # "feff/") so the member lands directly under the
                        # label directory.
                        member.filename = member.filename[5:]
                        archive.extract(member=member, path=label)
                        merged.write(os.path.join(label, member.filename))
    else:
        only_output = feff_outputs[0]
        writer.parse_feff_output(
            paths_file=only_output["paths_file"],
            selection=only_output["selection"],
        )

    writer.write()
291
292
if __name__ == "__main__":
    # The first command-line argument is the path of the JSON file holding
    # the tool inputs; the context manager ensures the file is closed once
    # it has been parsed.
    with open(sys.argv[1], "r", encoding="utf-8") as input_file:
        input_values = json.load(input_file)
    main(input_values)