Mercurial > repos > muon-spectroscopy-computational-project > larch_select_paths
diff larch_select_paths.py @ 0:2e827836f0ad draft
planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 5be486890442dedfb327289d597e1c8110240735
author | muon-spectroscopy-computational-project |
---|---|
date | Tue, 14 Nov 2023 15:35:52 +0000 |
parents | |
children | 7fdca938d90c |
line wrap: on
line diff
"""Select FEFF paths and define the GDS variables attached to them.

Run as a script, the module loads a JSON file (first command line argument)
holding the input values, then writes ``sp.csv`` (selected paths) and
``gds.csv`` (GDS variables) to the working directory. When more than one FEFF
output is supplied, the path files are also merged into ``merged.zip``.
"""
import csv
import json
import os
import re
import sys
from zipfile import ZIP_DEFLATED, ZipFile


class GDSWriter:
    """Accumulate the rows of the GDS variable table written to ``gds.csv``."""

    def __init__(self, default_variables: "dict[str, dict]"):
        """Record the default settings and register the common variables.

        Args:
            default_variables (dict[str, dict]): For each of ``s02``, ``e0``,
                ``deltar`` and ``sigma2``, a dictionary with the keys
                ``value``, ``vary`` and ``is_common``.
        """
        self.default_properties = {
            "s02": {"name": "s02"},
            "e0": {"name": "e0"},
            "deltar": {"name": "alpha*reff"},
            "sigma2": {"name": "sigma2"},
        }
        self.rows = [
            f"{'id':>4s}, {'name':>24s}, {'value':>5s}, {'expr':>4s}, "
            f"{'vary':>4s}\n"
        ]
        self.names = set()

        for property_name, settings in self.default_properties.items():
            defaults = default_variables[property_name]
            value = defaults["value"]
            vary = defaults["vary"]
            is_common = defaults["is_common"]

            settings["value"] = value
            settings["vary"] = vary
            settings["is_common"] = is_common

            # A common variable is shared by every path, so it is declared
            # exactly once, up front.
            if is_common:
                self.append_gds(name=settings["name"], value=value, vary=vary)

    def append_gds(
        self,
        name: str,
        value: float = 0.,
        expr: str = None,
        vary: bool = True,
        label: str = "",
    ):
        """Append a single GDS variable to the list of rows, later to be
        written to file.

        Args:
            name (str): Name of the GDS variable.
            value (float, optional): Starting value for variable.
                Defaults to 0.
            expr (str, optional): Expression for setting the variable.
                Defaults to None.
            vary (bool, optional): Whether the variable is optimised during the
                fit. Defaults to True.
            label (str, optional): Label to keep variables for different FEFF
                directories distinct. Defaults to "".

        Raises:
            ValueError: If the resulting variable name was already appended.
        """
        formatted_name = name if (label is None) else label + name
        # "alpha*reff" is an expression; the variable declared is "alpha".
        formatted_name = formatted_name.replace("*reff", "")
        if not expr:
            # The format spec below pads this to the column width.
            expr = " "

        if formatted_name in self.names:
            raise ValueError(f"{formatted_name} already used as variable name")
        self.names.add(formatted_name)

        # The header occupies index 0, so len(self.rows) is the 1-based id.
        self.rows.append(
            f"{len(self.rows):4d}, {formatted_name:>24s}, {str(value):>5s}, "
            f"{expr:>4s}, {str(vary):>4s}\n"
        )

    def parse_gds(
        self,
        property_name: str,
        variable_name: str = None,
        path_variable: dict = None,
        directory_label: str = None,
        path_label: str = None,
    ) -> str:
        """Parse and append a row defining a GDS variable for a particular
        path.

        Args:
            property_name (str): The property to which the variable
                corresponds. Should be a key in `self.default_properties`.
            variable_name (str, optional): Custom name for this variable.
                Defaults to None.
            path_variable (dict, optional): Dictionary defining the GDS
                settings for this path's variable (keys ``value``, ``expr``
                and ``vary``); read only when `variable_name` is given.
                Defaults to None.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to None.
            path_label (str, optional): Label indicating the atoms involved in
                this path. Defaults to None.

        Returns:
            str: Either `variable_name`, the name used as a default globally
                for this `property_name`, or an automatically generated unique
                name.
        """
        if variable_name:
            self.append_gds(
                name=variable_name,
                value=path_variable["value"],
                expr=path_variable["expr"],
                vary=path_variable["vary"],
            )
            return variable_name

        defaults = self.default_properties[property_name]
        if defaults["is_common"]:
            # Already declared in __init__; just refer to it by name.
            return defaults["name"]

        auto_name = defaults["name"]
        if directory_label:
            auto_name += f"_{directory_label}"
        if path_label:
            auto_name += f"_{path_label.lower().replace('.', '')}"

        self.append_gds(
            name=auto_name,
            value=defaults["value"],
            vary=defaults["vary"],
        )
        return auto_name

    def write(self):
        """Write GDS rows to ``gds.csv`` in the working directory."""
        with open("gds.csv", "w") as out:
            out.writelines(self.rows)


class PathsWriter:
    """Accumulate the rows of the selected paths table (``sp.csv``)."""

    def __init__(self, default_variables: "dict[str, dict]"):
        """Create the header row and the GDS writer used for path variables.

        Args:
            default_variables (dict[str, dict]): Default variable settings,
                passed on unchanged to `GDSWriter`.
        """
        self.rows = [
            f"{'id':>4s}, {'filename':>24s}, {'label':>24s}, {'s02':>3s}, "
            f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
        ]
        self.gds_writer = GDSWriter(default_variables=default_variables)

    def parse_feff_output(
        self,
        paths_file: str,
        selection: "dict[str, str|list]",
        directory_label: str = "",
    ):
        """Parse selected paths from CSV summary and define GDS variables.

        A row is used when its first column contains a number (the path id)
        and either that id is listed in ``selection["paths"]``,
        ``selection["selection"]`` is ``"all"``, or the row's last column is
        a non-zero integer. Blank rows and rows without an id are skipped.

        Args:
            paths_file (str): CSV summary filename.
            selection (dict[str, str|list]): Dictionary indicating which paths
                to select, and how to define their variables.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
        """
        # Index the explicit selections once; the first entry for an id wins.
        selected_by_id = {}
        for selected_path in selection["paths"]:
            selected_by_id.setdefault(selected_path["id"], selected_path)

        # newline="" lets the csv module do its own newline handling.
        with open(paths_file, newline="") as file:
            for row in csv.reader(file):
                if not row:
                    # csv.reader yields [] for a blank line.
                    continue
                id_match = re.search(r"\d+", row[0])
                if not id_match:
                    # No path id in the first column, e.g. the header row.
                    continue

                path_id = int(id_match.group())
                path_value = selected_by_id.get(path_id)
                if path_value is None and not (
                    selection["selection"] == "all" or int(row[-1])
                ):
                    continue

                filename = row[0].strip()
                path_label = row[-2].strip()
                variables = {}
                for property_name in self.gds_writer.default_properties:
                    if path_value is None:
                        # Not explicitly listed: use the default settings.
                        variables[property_name] = self.gds_writer.parse_gds(
                            property_name=property_name,
                            directory_label=directory_label,
                            path_label=path_label,
                        )
                    else:
                        path_variable = path_value[property_name]
                        variables[property_name] = self.gds_writer.parse_gds(
                            property_name=property_name,
                            variable_name=path_variable["name"],
                            path_variable=path_variable,
                            directory_label=directory_label,
                            path_label=path_label,
                        )

                self.parse_selected_path(
                    filename=filename,
                    path_label=path_label,
                    directory_label=directory_label,
                    **variables,
                )

    def parse_selected_path(
        self,
        filename: str,
        path_label: str,
        directory_label: str = "",
        s02: str = "s02",
        e0: str = "e0",
        sigma2: str = "sigma2",
        deltar: str = "alpha*reff",
    ):
        """Format and append row representing a selected FEFF path.

        Args:
            filename (str): Name of the underlying FEFF path file, without
                parent directory.
            path_label (str): Label indicating the atoms involved in this path.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
            s02 (str, optional): Electron screening factor variable name.
                Defaults to "s02".
            e0 (str, optional): Energy shift variable name. Defaults to "e0".
            sigma2 (str, optional): Mean squared displacement variable name.
                Defaults to "sigma2".
            deltar (str, optional): Change in path length variable.
                Defaults to "alpha*reff".
        """
        if directory_label:
            filename = os.path.join(directory_label, filename)
            label = f"{directory_label}.{path_label}"
        else:
            filename = os.path.join("feff", filename)
            label = path_label

        # The header occupies index 0, so len(self.rows) is the 1-based id.
        self.rows.append(
            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
            f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
        )

    def write(self):
        """Write the GDS rows to ``gds.csv`` and the path rows to ``sp.csv``.
        """
        self.gds_writer.write()
        with open("sp.csv", "w") as out:
            out.writelines(self.rows)


def main(input_values: dict):
    """Select paths and define GDS parameters.

    With a single FEFF output the paths are parsed without a directory label.
    Otherwise each output gets a label (its own ``label`` value, or its
    zero-padded 1-based position when that is missing or empty), its path
    archive is extracted into a directory of that name, and the extracted
    files are added to a newly created ``merged.zip``.

    Args:
        input_values (dict): All input values from the Galaxy tool UI. The
            dictionary is not modified.

    Raises:
        ValueError: If a FEFF label is not unique.
    """
    writer = PathsWriter(default_variables=input_values["variables"])
    feff_outputs = input_values["feff_outputs"]

    if len(feff_outputs) == 1:
        feff_output = feff_outputs[0]
        writer.parse_feff_output(
            paths_file=feff_output["paths_file"],
            selection=feff_output["selection"],
        )
    else:
        zfill_length = len(str(len(feff_outputs)))
        labels = set()
        # Mode "x" fails rather than overwriting an existing archive.
        with ZipFile("merged.zip", "x", ZIP_DEFLATED) as zipfile_out:
            for i, feff_output in enumerate(feff_outputs):
                # .get() leaves the caller's dictionary untouched.
                label = feff_output.get("label") or str(i + 1).zfill(
                    zfill_length
                )
                if label in labels:
                    raise ValueError(f"Label '{label}' is not unique")
                labels.add(label)

                writer.parse_feff_output(
                    directory_label=label,
                    paths_file=feff_output["paths_file"],
                    selection=feff_output["selection"],
                )

                with ZipFile(feff_output["paths_zip"]) as z:
                    for zipinfo in z.infolist():
                        if zipinfo.filename != "feff/":
                            # NOTE(review): assumes every entry sits under a
                            # top-level "feff/" directory; the slice drops
                            # that 5-character prefix so files land directly
                            # under the label directory.
                            zipinfo.filename = zipinfo.filename[5:]
                            z.extract(member=zipinfo, path=label)
                            zipfile_out.write(
                                os.path.join(label, zipinfo.filename)
                            )

    writer.write()


if __name__ == "__main__":
    # Close the input file deterministically instead of leaking the handle.
    with open(sys.argv[1], "r", encoding="utf-8") as input_file:
        input_values = json.load(input_file)
    main(input_values)