Mercurial > repos > muon-spectroscopy-computational-project > larch_select_paths
view larch_select_paths.py @ 0:2e827836f0ad draft
planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 5be486890442dedfb327289d597e1c8110240735
author | muon-spectroscopy-computational-project |
---|---|
date | Tue, 14 Nov 2023 15:35:52 +0000 |
parents | |
children | 7fdca938d90c |
line wrap: on
line source
"""Select FEFF paths and define the GDS variables used by them.

Writes two CSV files in the working directory: ``gds.csv`` (variable
definitions) and ``sp.csv`` (selected paths). When several FEFF outputs are
supplied, their path files are additionally merged into ``merged.zip``.
"""
import csv
import json
import os
import re
import sys
from zipfile import ZIP_DEFLATED, ZipFile


class GDSWriter:
    """Accumulate formatted GDS variable rows and write them to ``gds.csv``."""

    def __init__(self, default_variables: "dict[str, dict]"):
        """Set up per-property defaults and register the common variables.

        Args:
            default_variables (dict[str, dict]): For each of ``s02``, ``e0``,
                ``deltar`` and ``sigma2``, a dictionary with the keys
                ``value``, ``vary`` and ``is_common``.
        """
        self.default_properties = {
            "s02": {"name": "s02"},
            "e0": {"name": "e0"},
            "deltar": {"name": "alpha*reff"},
            "sigma2": {"name": "sigma2"},
        }
        # The first row is the CSV header; data rows are numbered from 1.
        self.rows = [
            f"{'id':>4s}, {'name':>24s}, {'value':>5s}, {'expr':>4s}, "
            f"{'vary':>4s}\n"
        ]
        self.names = set()

        for property_name, defaults in self.default_properties.items():
            settings = default_variables[property_name]
            value = settings["value"]
            vary = settings["vary"]
            is_common = settings["is_common"]

            defaults["value"] = value
            defaults["vary"] = vary
            defaults["is_common"] = is_common

            # A common variable is shared by every path, so it is defined
            # exactly once, up front.
            if is_common:
                self.append_gds(name=defaults["name"], value=value, vary=vary)

    def append_gds(
        self,
        name: str,
        value: float = 0.,
        expr: "str | None" = None,
        vary: bool = True,
        label: str = "",
    ):
        """Append a single GDS variable to the list of rows, later to be
        written to file.

        Args:
            name (str): Name of the GDS variable.
            value (float, optional): Starting value for variable.
                Defaults to 0.
            expr (str, optional): Expression for setting the variable.
                Defaults to None.
            vary (bool, optional): Whether the variable is optimised during
                the fit. Defaults to True.
            label (str, optional): Label to keep variables for different FEFF
                directories distinct. Defaults to "".

        Raises:
            ValueError: If the formatted name has already been used.
        """
        formatted_name = name if (label is None) else label + name
        # "alpha*reff" is the path expression; the variable itself is "alpha".
        formatted_name = formatted_name.replace("*reff", "")
        if not expr:
            expr = "    "

        if formatted_name in self.names:
            raise ValueError(f"{formatted_name} already used as variable name")
        self.names.add(formatted_name)

        self.rows.append(
            f"{len(self.rows):4d}, {formatted_name:>24s}, {str(value):>5s}, "
            f"{expr:>4s}, {str(vary):>4s}\n"
        )

    def parse_gds(
        self,
        property_name: str,
        variable_name: "str | None" = None,
        path_variable: "dict | None" = None,
        directory_label: "str | None" = None,
        path_label: "str | None" = None,
    ) -> str:
        """Parse and append a row defining a GDS variable for a particular
        path.

        Args:
            property_name (str): The property to which the variable
                corresponds. Should be a key in `self.default_properties`.
            variable_name (str, optional): Custom name for this variable.
                Defaults to None.
            path_variable (dict, optional): Dictionary defining the GDS
                settings for this path's variable. Defaults to None.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to None.
            path_label (str, optional): Label indicating the atoms involved
                in this path. Defaults to None.

        Returns:
            str: Either `variable_name`, the name used as a default globally
                for this `property_name`, or an automatically generated unique
                name.
        """
        if variable_name:
            self.append_gds(
                name=variable_name,
                value=path_variable["value"],
                expr=path_variable["expr"],
                vary=path_variable["vary"],
            )
            return variable_name

        defaults = self.default_properties[property_name]
        if defaults["is_common"]:
            # Already registered in __init__; just refer to it by name.
            return defaults["name"]

        auto_name = defaults["name"]
        if directory_label:
            auto_name += f"_{directory_label}"
        if path_label:
            auto_name += f"_{path_label.lower().replace('.', '')}"

        self.append_gds(
            name=auto_name,
            value=defaults["value"],
            vary=defaults["vary"],
        )
        return auto_name

    def write(self):
        """Write GDS rows to file.
        """
        with open("gds.csv", "w") as out:
            out.writelines(self.rows)


class PathsWriter:
    """Accumulate selected-path rows and write them to ``sp.csv``."""

    def __init__(self, default_variables: "dict[str, dict]"):
        """Create the header row and the associated `GDSWriter`.

        Args:
            default_variables (dict[str, dict]): Default GDS settings, passed
                through unchanged to `GDSWriter`.
        """
        self.rows = [
            f"{'id':>4s}, {'filename':>24s}, {'label':>24s}, {'s02':>3s}, "
            f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
        ]
        self.gds_writer = GDSWriter(default_variables=default_variables)

    def parse_feff_output(
        self,
        paths_file: str,
        selection: "dict[str, str|list]",
        directory_label: str = "",
    ):
        """Parse selected paths from CSV summary and define GDS variables.

        A row is used when its path id appears in ``selection["paths"]``,
        when ``selection["selection"]`` is ``"all"``, or when the integer in
        its last column is non-zero. Rows whose first column contains no
        digits (such as a header) and blank rows are ignored.

        Args:
            paths_file (str): CSV summary filename.
            selection (dict[str, str|list]): Dictionary indicating which paths
                to select, and how to define their variables.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
        """
        paths = selection["paths"]
        path_values_ids = [path_value["id"] for path_value in paths]

        with open(paths_file, newline="") as file:
            for row in csv.reader(file):
                # csv.reader yields [] for a blank line; indexing it would
                # raise IndexError.
                if not row:
                    continue

                id_match = re.search(r"\d+", row[0])
                if not id_match:
                    continue

                path_id = int(id_match.group())
                filename = row[0].strip()
                path_label = row[-2].strip()

                if path_id in path_values_ids:
                    # Explicitly configured path: use its custom settings.
                    path_value = paths[path_values_ids.index(path_id)]
                elif selection["selection"] == "all" or int(row[-1]):
                    # Selected without custom settings: defaults apply.
                    path_value = None
                else:
                    continue

                variables = {}
                for property_name in self.gds_writer.default_properties:
                    path_variable = (
                        None if path_value is None
                        else path_value[property_name]
                    )
                    variable_name = (
                        None if path_variable is None
                        else path_variable["name"]
                    )
                    variables[property_name] = self.gds_writer.parse_gds(
                        property_name=property_name,
                        variable_name=variable_name,
                        path_variable=path_variable,
                        directory_label=directory_label,
                        path_label=path_label,
                    )

                self.parse_selected_path(
                    filename=filename,
                    path_label=path_label,
                    directory_label=directory_label,
                    **variables,
                )

    def parse_selected_path(
        self,
        filename: str,
        path_label: str,
        directory_label: str = "",
        s02: str = "s02",
        e0: str = "e0",
        sigma2: str = "sigma2",
        deltar: str = "alpha*reff",
    ):
        """Format and append row representing a selected FEFF path.

        Args:
            filename (str): Name of the underlying FEFF path file, without
                parent directory.
            path_label (str): Label indicating the atoms involved in this
                path.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
            s02 (str, optional): Electron screening factor variable name.
                Defaults to "s02".
            e0 (str, optional): Energy shift variable name. Defaults to "e0".
            sigma2 (str, optional): Mean squared displacement variable name.
                Defaults to "sigma2".
            deltar (str, optional): Change in path length variable.
                Defaults to "alpha*reff".
        """
        if directory_label:
            filename = os.path.join(directory_label, filename)
            label = f"{directory_label}.{path_label}"
        else:
            filename = os.path.join("feff", filename)
            label = path_label

        self.rows.append(
            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
            f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
        )

    def write(self):
        """Write selected path and GDS rows to file.
        """
        self.gds_writer.write()
        with open("sp.csv", "w") as out:
            out.writelines(self.rows)


def main(input_values: dict):
    """Select paths and define GDS parameters.

    With a single FEFF output, paths are referenced under ``feff/``. With
    several, each output gets a label (its own, or its 1-based zero-padded
    position), its path files are extracted into a directory of that name,
    and they are added to ``merged.zip``.

    Args:
        input_values (dict): All input values from the Galaxy tool UI.

    Raises:
        ValueError: If a FEFF label is not unique.
    """
    default_variables = input_values["variables"]
    feff_outputs = input_values["feff_outputs"]

    writer = PathsWriter(default_variables=default_variables)

    if len(feff_outputs) == 1:
        feff_output = feff_outputs[0]
        writer.parse_feff_output(
            paths_file=feff_output["paths_file"],
            selection=feff_output["selection"],
        )
    else:
        zfill_length = len(str(len(feff_outputs)))
        labels = set()
        # Mode "x" fails rather than overwriting an existing merged.zip.
        with ZipFile("merged.zip", "x", ZIP_DEFLATED) as zipfile_out:
            for i, feff_output in enumerate(feff_outputs):
                # Read the label without removing it from the caller's dict.
                label = feff_output["label"] or str(i + 1).zfill(
                    zfill_length
                )
                if label in labels:
                    raise ValueError(f"Label '{label}' is not unique")
                labels.add(label)

                writer.parse_feff_output(
                    directory_label=label,
                    paths_file=feff_output["paths_file"],
                    selection=feff_output["selection"],
                )

                with ZipFile(feff_output["paths_zip"]) as z:
                    for zipinfo in z.infolist():
                        if zipinfo.filename != "feff/":
                            # Drop the first five characters (the "feff/"
                            # prefix) so files land directly under `label`.
                            zipinfo.filename = zipinfo.filename[5:]
                            z.extract(member=zipinfo, path=label)
                            zipfile_out.write(
                                os.path.join(label, zipinfo.filename)
                            )

    writer.write()


if __name__ == "__main__":
    # Use a context manager so the input file is closed promptly.
    with open(sys.argv[1], "r", encoding="utf-8") as input_file:
        input_values = json.load(input_file)
    main(input_values)