Mercurial > repos > muon-spectroscopy-computational-project > larch_select_paths
comparison larch_select_paths.py @ 0:2e827836f0ad draft
planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 5be486890442dedfb327289d597e1c8110240735
author | muon-spectroscopy-computational-project |
---|---|
date | Tue, 14 Nov 2023 15:35:52 +0000 (16 months ago) |
parents | |
children | 7fdca938d90c |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:2e827836f0ad |
---|---|
1 import csv | |
2 import json | |
3 import os | |
4 import re | |
5 import sys | |
6 from zipfile import ZIP_DEFLATED, ZipFile | |
7 | |
8 | |
class GDSWriter:
    """Accumulate rows describing GDS variables and write them to gds.csv.

    Each row holds an id, a variable name, a starting value, an optional
    expression and a flag saying whether the variable is varied in the fit.
    Variable names must be unique within one writer.
    """

    def __init__(self, default_variables: "dict[str, dict]"):
        """Set up the default properties and register the common variables.

        Args:
            default_variables (dict[str, dict]): For each of "s02", "e0",
                "deltar" and "sigma2", a dictionary with the keys "value",
                "vary" and "is_common".
        """
        self.default_properties = {
            "s02": {"name": "s02"},
            "e0": {"name": "e0"},
            "deltar": {"name": "alpha*reff"},
            "sigma2": {"name": "sigma2"},
        }
        self.rows = [
            f"{'id':>4s}, {'name':>24s}, {'value':>5s}, {'expr':>4s}, "
            f"{'vary':>4s}\n"
        ]
        self.names = set()

        for key, settings in self.default_properties.items():
            defaults = default_variables[key]
            settings["value"] = defaults["value"]
            settings["vary"] = defaults["vary"]
            settings["is_common"] = defaults["is_common"]

            # A common variable is shared by every path, so it is registered
            # exactly once, up front.
            if settings["is_common"]:
                self.append_gds(
                    name=settings["name"],
                    value=settings["value"],
                    vary=settings["vary"],
                )

    def append_gds(
        self,
        name: str,
        value: float = 0.,
        expr: str = None,
        vary: bool = True,
        label: str = "",
    ):
        """Append a single GDS variable to the list of rows, later to be
        written to file.

        Args:
            name (str): Name of the GDS variable. A "*reff" factor in the
                name is removed before the variable is registered.
            value (float, optional): Starting value for variable.
                Defaults to 0.
            expr (str, optional): Expression for setting the variable.
                Defaults to None.
            vary (bool, optional): Whether the variable is optimised during the
                fit. Defaults to True.
            label (str, optional): Label to keep variables for different FEFF
                directories distinct. Defaults to "".

        Raises:
            ValueError: If the resulting variable name was already registered.
        """
        prefix = "" if label is None else label
        formatted_name = (prefix + name).replace("*reff", "")
        if not expr:
            # Keep the column non-empty when there is no expression.
            expr = " "

        if formatted_name in self.names:
            raise ValueError(f"{formatted_name} already used as variable name")
        self.names.add(formatted_name)

        # The header occupies index 0, so len(self.rows) is the 1-based id of
        # the row being added.
        self.rows.append(
            f"{len(self.rows):4d}, {formatted_name:>24s}, {str(value):>5s}, "
            f"{expr:>4s}, {str(vary):>4s}\n"
        )

    def parse_gds(
        self,
        property_name: str,
        variable_name: str = None,
        path_variable: dict = None,
        directory_label: str = None,
        path_label: str = None,
    ) -> str:
        """Parse and append a row defining a GDS variable for a particular
        path.

        Args:
            property_name (str): The property to which the variable
                corresponds. Should be a key in `self.default_properties`.
            variable_name (str, optional): Custom name for this variable.
                Defaults to None.
            path_variable (dict, optional): Dictionary defining the GDS
                settings ("value", "expr", "vary") for this path's variable.
                Missing settings fall back to the defaults of
                `property_name`. Defaults to None.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to None.
            path_label (str, optional): Label indicating the atoms involved in
                this path. Defaults to None.

        Returns:
            str: Either `variable_name`, the name used as a default globally
                for this `property_name`, or an automatically generated unique
                name.

        Raises:
            ValueError: If the variable name to register is already in use.
        """
        if variable_name:
            # path_variable is optional in the signature, so tolerate it (or
            # individual settings) being absent instead of failing on a
            # subscript of None.
            settings = path_variable or {}
            fallback = self.default_properties.get(property_name, {})
            self.append_gds(
                name=variable_name,
                value=settings.get("value", fallback.get("value", 0.)),
                expr=settings.get("expr"),
                vary=settings.get("vary", fallback.get("vary", True)),
            )
            return variable_name

        default = self.default_properties[property_name]
        if default["is_common"]:
            return default["name"]

        suffix = ""
        if directory_label:
            suffix += f"_{directory_label}"
        if path_label:
            suffix += f"_{path_label.lower().replace('.', '')}"

        # The suffix has to be attached to the variable itself, not to the end
        # of the whole expression: "alpha*reff" must become
        # "alpha_<suffix>*reff" so that the returned expression refers to the
        # variable registered below ("alpha_<suffix>").
        variable, operator, factor = default["name"].partition("*")
        auto_name = f"{variable}{suffix}{operator}{factor}"

        self.append_gds(
            name=auto_name,
            value=default["value"],
            vary=default["vary"],
        )
        return auto_name

    def write(self):
        """Write GDS rows to file.
        """
        with open("gds.csv", "w", encoding="utf-8") as out:
            out.writelines(self.rows)
129 | |
130 | |
class PathsWriter:
    """Accumulate rows describing selected FEFF paths and write them to
    sp.csv, together with the GDS variables those paths refer to.
    """

    def __init__(self, default_variables: "dict[str, dict]"):
        """Start the table of selected paths and create the GDS writer.

        Args:
            default_variables (dict[str, dict]): Default variable settings,
                passed on unchanged to `GDSWriter`.
        """
        self.rows = [
            f"{'id':>4s}, {'filename':>24s}, {'label':>24s}, {'s02':>3s}, "
            f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
        ]
        self.gds_writer = GDSWriter(default_variables=default_variables)

    def parse_feff_output(
        self,
        paths_file: str,
        selection: "dict[str, str|list]",
        directory_label: str = "",
    ):
        """Parse selected paths from CSV summary and define GDS variables.

        A row is treated as a path when its first column contains a number,
        which is used as the path id. The second to last column is read as
        the path label and the last column as an integer selection flag.
        A path is selected when its id is listed in `selection["paths"]`,
        when `selection["selection"]` is "all", or when its flag is non-zero.

        Args:
            paths_file (str): CSV summary filename.
            selection (dict[str, str|list]): Dictionary indicating which paths
                to select, and how to define their variables.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
        """
        # Index the custom path definitions by id once. setdefault keeps the
        # first definition if an id is listed more than once.
        custom_paths = {}
        for path_value in selection["paths"]:
            custom_paths.setdefault(path_value["id"], path_value)

        # newline="" lets the csv module handle line endings itself.
        with open(paths_file, newline="") as file:
            for row in csv.reader(file):
                if not row:
                    # csv.reader yields an empty list for a blank line.
                    continue
                id_match = re.search(r"\d+", row[0])
                if not id_match:
                    # No path id in the first column, e.g. the header row.
                    continue

                path_id = int(id_match.group())
                filename = row[0].strip()
                path_label = row[-2].strip()

                if path_id in custom_paths:
                    path_value = custom_paths[path_id]
                elif selection["selection"] == "all" or int(row[-1]):
                    path_value = None
                else:
                    continue

                variables = self._path_variables(
                    path_value=path_value,
                    directory_label=directory_label,
                    path_label=path_label,
                )
                self.parse_selected_path(
                    filename=filename,
                    path_label=path_label,
                    directory_label=directory_label,
                    **variables,
                )

    def _path_variables(
        self,
        path_value: dict,
        directory_label: str,
        path_label: str,
    ) -> dict:
        """Return the variable name to use for each property of one path.

        With `path_value` set, the custom name and settings it holds for each
        property are passed to the GDS writer; with None, the GDS writer
        falls back to its defaults.
        """
        variables = {}
        for property_name in self.gds_writer.default_properties:
            extra = {}
            if path_value is not None:
                extra["variable_name"] = path_value[property_name]["name"]
                extra["path_variable"] = path_value[property_name]
            variables[property_name] = self.gds_writer.parse_gds(
                property_name=property_name,
                directory_label=directory_label,
                path_label=path_label,
                **extra,
            )
        return variables

    def parse_selected_path(
        self,
        filename: str,
        path_label: str,
        directory_label: str = "",
        s02: str = "s02",
        e0: str = "e0",
        sigma2: str = "sigma2",
        deltar: str = "alpha*reff",
    ):
        """Format and append row representing a selected FEFF path.

        Args:
            filename (str): Name of the underlying FEFF path file, without
                parent directory.
            path_label (str): Label indicating the atoms involved in this path.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
            s02 (str, optional): Electron screening factor variable name.
                Defaults to "s02".
            e0 (str, optional): Energy shift variable name. Defaults to "e0".
            sigma2 (str, optional): Mean squared displacement variable name.
                Defaults to "sigma2".
            deltar (str, optional): Change in path length variable.
                Defaults to "alpha*reff".
        """
        if directory_label:
            # Paths from a labelled directory live under that directory and
            # carry the label as a prefix.
            filename = os.path.join(directory_label, filename)
            label = f"{directory_label}.{path_label}"
        else:
            filename = os.path.join("feff", filename)
            label = path_label

        # The header occupies index 0, so len(self.rows) is the 1-based id of
        # the row being added.
        self.rows.append(
            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
            f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
        )

    def write(self):
        """Write selected path and GDS rows to file.
        """
        self.gds_writer.write()
        with open("sp.csv", "w") as out:
            out.writelines(self.rows)
242 | |
243 | |
def main(input_values: dict):
    """Select paths and define GDS parameters.

    With a single FEFF output, its paths are parsed without a directory
    label. With several, each output gets a label (its own "label" entry, or
    its zero-padded 1-based position when that is empty), its path archive is
    extracted into a directory named after the label, and the extracted files
    are added to a new "merged.zip". The selected paths and GDS variables are
    written to file at the end in both cases.

    Args:
        input_values (dict): All input values from the Galaxy tool UI.

    Raises:
        ValueError: If a FEFF label is not unique.
    """
    default_variables = input_values["variables"]
    feff_outputs = input_values["feff_outputs"]

    writer = PathsWriter(default_variables=default_variables)

    if len(feff_outputs) == 1:
        feff_output = feff_outputs[0]
        writer.parse_feff_output(
            paths_file=feff_output["paths_file"],
            selection=feff_output["selection"],
        )
    else:
        prefix = "feff/"
        zfill_length = len(str(len(feff_outputs)))
        labels = set()
        # Mode "x" refuses to overwrite an existing merged.zip.
        with ZipFile("merged.zip", "x", ZIP_DEFLATED) as zipfile_out:
            for position, feff_output in enumerate(feff_outputs, start=1):
                # Read the label without removing it from the caller's data.
                label = feff_output.get("label") or str(position).zfill(
                    zfill_length
                )
                if label in labels:
                    raise ValueError(f"Label '{label}' is not unique")
                labels.add(label)

                writer.parse_feff_output(
                    directory_label=label,
                    paths_file=feff_output["paths_file"],
                    selection=feff_output["selection"],
                )

                with ZipFile(feff_output["paths_zip"]) as z:
                    for zipinfo in z.infolist():
                        if zipinfo.filename == prefix:
                            # The top-level directory entry is replaced by
                            # the label directory.
                            continue
                        # Re-root the member under the label directory, but
                        # only strip the prefix when it is really there.
                        if zipinfo.filename.startswith(prefix):
                            zipinfo.filename = zipinfo.filename[len(prefix):]
                        # extract returns the path it actually created, which
                        # is the file to add to the merged archive.
                        extracted = z.extract(member=zipinfo, path=label)
                        zipfile_out.write(extracted)

    writer.write()
291 | |
292 | |
if __name__ == "__main__":
    # The first command line argument is the path of a JSON file holding the
    # input values; the context manager closes it once parsed.
    with open(sys.argv[1], "r", encoding="utf-8") as input_file:
        input_values = json.load(input_file)
    main(input_values)