# HG changeset patch # User muon-spectroscopy-computational-project # Date 1701867855 0 # Node ID 7fdca938d90cece115b2f05f307587202d76238d # Parent 2e827836f0ad3a4b5704048649faca7bcbf15394 planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 1cf6d7160497ba58fe16a51f00d088a20934eba6 diff -r 2e827836f0ad -r 7fdca938d90c larch_select_paths.py --- a/larch_select_paths.py Tue Nov 14 15:35:52 2023 +0000 +++ b/larch_select_paths.py Wed Dec 06 13:04:15 2023 +0000 @@ -3,9 +3,58 @@ import os import re import sys +from itertools import combinations from zipfile import ZIP_DEFLATED, ZipFile +class CriteriaSelector: + def __init__(self, criteria: "dict[str, int|float]"): + self.max_number = criteria["max_number"] + self.max_path_length = criteria["max_path_length"] + self.min_amp_ratio = criteria["min_amplitude_ratio"] + self.max_degeneracy = criteria["max_degeneracy"] + self.path_count = 0 + + def evaluate(self, path_id: int, row: "list[str]") -> (bool, None): + if self.max_number and self.path_count >= self.max_number: + print(f"Reject path: {self.max_number} paths already reached") + return (False, None) + + r_effective = float(row[5].strip()) + if self.max_path_length and r_effective > self.max_path_length: + print(f"Reject path: {r_effective} > {self.max_path_length}") + return (False, None) + + amplitude_ratio = float(row[2].strip()) + if self.min_amp_ratio and (amplitude_ratio < self.min_amp_ratio): + print(f"Reject path: {amplitude_ratio} < {self.min_amp_ratio}") + return (False, None) + + degeneracy = float(row[3].strip()) + if self.max_degeneracy and degeneracy > self.max_degeneracy: + print(f"Reject path: {degeneracy} > {self.max_degeneracy}") + return (False, None) + + self.path_count += 1 + return (True, None) + + +class ManualSelector: + def __init__(self, selection: dict): + self.select_all = selection["selection"] == "all" + self.paths = selection["paths"] + self.path_values_ids = [path_value["id"] for path_value in self.paths] + + def evaluate(self, path_id: int, row: "list[str]") -> (bool, "None|dict"): + if path_id in self.path_values_ids: + return (True, self.paths[self.path_values_ids.index(path_id)]) + + if self.select_all or int(row[-1]): + return (True, None) + + return (False, None) + + class GDSWriter: def __init__(self, default_variables: "dict[str, dict]"): self.default_properties = { @@ -36,7 +85,7 @@ def append_gds( self, name: str, - value: float = 0., + value: float = 0.0, expr: str = None, vary: bool = True, label: str = "", @@ -122,8 +171,7 @@ return auto_name def write(self): - """Write GDS rows to file. - """ + """Write GDS rows to file.""" with open("gds.csv", "w") as out: out.writelines(self.rows) @@ -135,6 +183,7 @@ f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n" ] self.gds_writer = GDSWriter(default_variables=default_variables) + self.all_combinations = [[0]] # 0 corresponds to the header row def parse_feff_output( self, @@ -151,49 +200,123 @@ directory_label (str, optional): Label to indicate paths from a separate directory. Defaults to "". """ - paths = selection["paths"] - path_values_ids = [path_value["id"] for path_value in paths] + combinations_list = [] + if selection["selection"] in {"criteria", "combinations"}: + selector = CriteriaSelector(selection) + else: + selector = ManualSelector(selection) + + selected_ids = self.select_rows(paths_file, directory_label, selector) + + if selection["selection"] == "combinations": + min_number = selection["min_combination_size"] + min_number = min(min_number, len(selected_ids)) + max_number = selection["max_combination_size"] + if not max_number or max_number > len(selected_ids): + max_number = len(selected_ids) + + for number_of_paths in range(min_number, max_number + 1): + for combination in combinations(selected_ids, number_of_paths): + combinations_list.append(combination) + new_combinations = len(combinations_list) + print( + f"{new_combinations} combinations for {directory_label}:\n" + f"{combinations_list}" + ) + old_combinations_len = len(self.all_combinations) + self.all_combinations *= new_combinations + for i, combination in enumerate(self.all_combinations): + new_combinations = combinations_list[i // old_combinations_len] + self.all_combinations[i] = combination + list(new_combinations) + else: + for combination in self.all_combinations: + combination.extend(selected_ids) + + def select_rows( + self, + paths_file: str, + directory_label: str, + selector: "CriteriaSelector|ManualSelector", + ) -> "list[int]": + """Evaluate each row in turn to decide whether or not it should be + included in the final output. Does not account for combinations. + + Args: + paths_file (str): CSV summary filename. + directory_label (str): Label to indicate paths from a separate + directory. + selector (CriteriaSelector|ManualSelector): Object to evaluate + whether to select each path or not. + + Returns: + list[int]: The ids of the selected rows. + """ + row_ids = [] with open(paths_file) as file: reader = csv.reader(file) for row in reader: id_match = re.search(r"\d+", row[0]) if id_match: path_id = int(id_match.group()) - filename = row[0].strip() - path_label = row[-2].strip() - variables = {} + selected, path_value = selector.evaluate( + path_id=path_id, + row=row, + ) + if selected: + filename = row[0].strip() + path_label = row[-2].strip() + row_id = self.parse_row( + directory_label, filename, path_label, path_value + ) + row_ids.append(row_id) + + return row_ids + + def parse_row( + self, + directory_label: str, + filename: str, + path_label: str, + path_value: "None|dict", + ) -> int: + """Parse row for GDS and path information. - if path_id in path_values_ids: - path_value = paths[path_values_ids.index(path_id)] - for property in self.gds_writer.default_properties: - variables[property] = self.gds_writer.parse_gds( - property_name=property, - variable_name=path_value[property]["name"], - path_variable=path_value[property], - directory_label=directory_label, - path_label=path_label, - ) - self.parse_selected_path( - filename=filename, - path_label=path_label, - directory_label=directory_label, - **variables, - ) - elif selection["selection"] == "all" or int(row[-1]): - path_value = None - for property in self.gds_writer.default_properties: - variables[property] = self.gds_writer.parse_gds( - property_name=property, - directory_label=directory_label, - path_label=path_label, - ) - self.parse_selected_path( - filename=filename, - path_label=path_label, - directory_label=directory_label, - **variables, - ) + Args: + directory_label (str): Label to indicate paths from a separate + directory. + filename (str): Filename for the FEFF path, extracted from row. + path_label (str): Label for the FEFF path, extracted from row. + path_value (None|dict): The values associated with the selected + FEFF path. May be None in which case defaults are used. + + Returns: + int: The id of the added row. + """ + variables = {} + if path_value is not None: + for property in self.gds_writer.default_properties: + variables[property] = self.gds_writer.parse_gds( + property_name=property, + variable_name=path_value[property]["name"], + path_variable=path_value[property], + directory_label=directory_label, + path_label=path_label, + ) + else: + for property in self.gds_writer.default_properties: + variables[property] = self.gds_writer.parse_gds( + property_name=property, + directory_label=directory_label, + path_label=path_label, + ) + + return self.parse_selected_path( + filename=filename, + path_label=path_label, + directory_label=directory_label, + **variables, + ) def parse_selected_path( self, @@ -204,7 +327,7 @@ e0: str = "e0", sigma2: str = "sigma2", deltar: str = "alpha*reff", - ): + ) -> int: """Format and append row representing a selected FEFF path. Args: @@ -220,6 +343,9 @@ Defaults to "sigma2". deltar (str, optional): Change in path length variable. Defaults to "alpha*reff". + + Returns: + int: The id of the added row. """ if directory_label: filename = os.path.join(directory_label, filename) @@ -228,17 +354,29 @@ filename = os.path.join("feff", filename) label = path_label + row_id = len(self.rows) self.rows.append( - f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, " + f"{row_id:>4d}, {filename:>24s}, {label:>24s}, " f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n" ) + return row_id + def write(self): - """Write selected path and GDS rows to file. - """ + """Write selected path and GDS rows to file.""" self.gds_writer.write() - with open("sp.csv", "w") as out: - out.writelines(self.rows) + + if len(self.all_combinations) == 1: + with open("sp.csv", "w") as out: + out.writelines(self.rows) + else: + for combination in self.all_combinations: + filename = "_".join([str(c) for c in combination[1:]]) + print(f"Writing combination {filename}") + with open(f"sp/{filename}.csv", "w") as out: + for row_id, row in enumerate(self.rows): + if row_id in combination: + out.write(row) def main(input_values: dict): @@ -265,9 +403,9 @@ labels = set() with ZipFile("merged.zip", "x", ZIP_DEFLATED) as zipfile_out: for i, feff_output in enumerate(input_values["feff_outputs"]): - label = feff_output.pop("label") or str(i + 1).zfill( - zfill_length - ) + label = feff_output["label"] + if not label: + label = str(i + 1).zfill(zfill_length) if label in labels: raise ValueError(f"Label '{label}' is not unique") labels.add(label) @@ -283,9 +421,8 @@ if zipinfo.filename != "feff/": zipinfo.filename = zipinfo.filename[5:] z.extract(member=zipinfo, path=label) - zipfile_out.write( - os.path.join(label, zipinfo.filename) - ) + filename = os.path.join(label, zipinfo.filename) + zipfile_out.write(filename) writer.write() diff -r 2e827836f0ad -r 7fdca938d90c larch_select_paths.xml --- a/larch_select_paths.xml Tue Nov 14 15:35:52 2023 +0000 +++ b/larch_select_paths.xml Wed Dec 06 13:04:15 2023 +0000 @@ -4,7 +4,7 @@ 0.9.71 - 0 + 1 10.1088/1742-6596/430/1/012007 @@ -17,18 +17,27 @@ + + + + + + + + + + + + - - xraylarch - matplotlib - @@ -64,8 +73,8 @@ - + + @@ -97,10 +106,20 @@ - + + + + + + + @@ -138,7 +157,13 @@ len(feff_outputs) > 1 - + + not any([f["selection"]["selection"] == "combinations" for f in feff_outputs]) + + + + any([f["selection"]["selection"] == "combinations" for f in feff_outputs]) + @@ -272,6 +297,51 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +