diff larch_select_paths.py @ 0:2e827836f0ad draft

planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 5be486890442dedfb327289d597e1c8110240735
author muon-spectroscopy-computational-project
date Tue, 14 Nov 2023 15:35:52 +0000
parents
children 7fdca938d90c
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/larch_select_paths.py	Tue Nov 14 15:35:52 2023 +0000
@@ -0,0 +1,295 @@
+import csv
+import json
+import os
+import re
+import sys
+from zipfile import ZIP_DEFLATED, ZipFile
+
+
+class GDSWriter:
+    def __init__(self, default_variables: "dict[str, dict]"):
+        self.default_properties = {
+            "s02": {"name": "s02"},
+            "e0": {"name": "e0"},
+            "deltar": {"name": "alpha*reff"},
+            "sigma2": {"name": "sigma2"},
+        }
+        self.rows = [
+            f"{'id':>4s}, {'name':>24s}, {'value':>5s}, {'expr':>4s}, "
+            f"{'vary':>4s}\n"
+        ]
+        self.names = set()
+
+        for property in self.default_properties:
+            name = self.default_properties[property]["name"]
+            value = default_variables[property]["value"]
+            vary = default_variables[property]["vary"]
+            is_common = default_variables[property]["is_common"]
+
+            self.default_properties[property]["value"] = value
+            self.default_properties[property]["vary"] = vary
+            self.default_properties[property]["is_common"] = is_common
+
+            if is_common:
+                self.append_gds(name=name, value=value, vary=vary)
+
+    def append_gds(
+        self,
+        name: str,
+        value: float = 0.,
+        expr: str = None,
+        vary: bool = True,
+        label: str = "",
+    ):
+        """Append a single GDS variable to the list of rows, later to be
+        written to file.
+
+        Args:
+            name (str): Name of the GDS variable.
+            value (float, optional): Starting value for variable.
+                Defaults to 0.
+            expr (str, optional): Expression for setting the variable.
+                Defaults to None.
+            vary (bool, optional): Whether the variable is optimised during the
+                fit. Defaults to True.
+            label (str, optional): Label to keep variables for different FEFF
+                directories distinct. Defaults to "".
+        """
+        formatted_name = name if (label is None) else label + name
+        formatted_name = formatted_name.replace("*reff", "")
+        if not expr:
+            expr = "    "
+
+        if formatted_name in self.names:
+            raise ValueError(f"{formatted_name} already used as variable name")
+        self.names.add(formatted_name)
+
+        self.rows.append(
+            f"{len(self.rows):4d}, {formatted_name:>24s}, {str(value):>5s}, "
+            f"{expr:>4s}, {str(vary):>4s}\n"
+        )
+
+    def parse_gds(
+        self,
+        property_name: str,
+        variable_name: str = None,
+        path_variable: dict = None,
+        directory_label: str = None,
+        path_label: str = None,
+    ) -> str:
+        """Parse and append a row defining a GDS variable for a particular
+        path.
+
+        Args:
+            property_name (str): The property to which the variable
+                corresponds. Should be a key in `self.default_properties`.
+            variable_name (str, optional): Custom name for this variable.
+                Defaults to None.
+            path_variable (dict, optional): Dictionary defining the GDS
+                settings for this path's variable. Defaults to None.
+            directory_label (str, optional): Label to indicate paths from a
+                separate directory. Defaults to None.
+            path_label (str, optional): Label indicating the atoms involved in
+                this path. Defaults to None.
+
+        Returns:
+            str: Either `variable_name`, the name used as a default globally
+                for this `property_name`, or an automatically generated unique
+                name.
+        """
+        if variable_name:
+            self.append_gds(
+                name=variable_name,
+                value=path_variable["value"],
+                expr=path_variable["expr"],
+                vary=path_variable["vary"],
+            )
+            return variable_name
+        elif self.default_properties[property_name]["is_common"]:
+            return self.default_properties[property_name]["name"]
+        else:
+            auto_name = self.default_properties[property_name]["name"]
+            if directory_label:
+                auto_name += f"_{directory_label}"
+            if path_label:
+                auto_name += f"_{path_label.lower().replace('.', '')}"
+
+            self.append_gds(
+                name=auto_name,
+                value=self.default_properties[property_name]["value"],
+                vary=self.default_properties[property_name]["vary"],
+            )
+            return auto_name
+
+    def write(self):
+        """Write GDS rows to file.
+        """
+        with open("gds.csv", "w") as out:
+            out.writelines(self.rows)
+
+
+class PathsWriter:
+    def __init__(self, default_variables: "dict[str, dict]"):
+        self.rows = [
+            f"{'id':>4s}, {'filename':>24s}, {'label':>24s}, {'s02':>3s}, "
+            f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
+        ]
+        self.gds_writer = GDSWriter(default_variables=default_variables)
+
+    def parse_feff_output(
+        self,
+        paths_file: str,
+        selection: "dict[str, str|list]",
+        directory_label: str = "",
+    ):
+        """Parse selected paths from CSV summary and define GDS variables.
+
+        Args:
+            paths_file (str): CSV summary filename.
+            selection (dict[str, str|list]): Dictionary indicating which paths
+                to select, and how to define their variables.
+            directory_label (str, optional): Label to indicate paths from a
+                separate directory. Defaults to "".
+        """
+        paths = selection["paths"]
+        path_values_ids = [path_value["id"] for path_value in paths]
+
+        with open(paths_file) as file:
+            reader = csv.reader(file)
+            for row in reader:
+                id_match = re.search(r"\d+", row[0])
+                if id_match:
+                    path_id = int(id_match.group())
+                    filename = row[0].strip()
+                    path_label = row[-2].strip()
+                    variables = {}
+
+                    if path_id in path_values_ids:
+                        path_value = paths[path_values_ids.index(path_id)]
+                        for property in self.gds_writer.default_properties:
+                            variables[property] = self.gds_writer.parse_gds(
+                                property_name=property,
+                                variable_name=path_value[property]["name"],
+                                path_variable=path_value[property],
+                                directory_label=directory_label,
+                                path_label=path_label,
+                            )
+                        self.parse_selected_path(
+                            filename=filename,
+                            path_label=path_label,
+                            directory_label=directory_label,
+                            **variables,
+                        )
+                    elif selection["selection"] == "all" or int(row[-1]):
+                        path_value = None
+                        for property in self.gds_writer.default_properties:
+                            variables[property] = self.gds_writer.parse_gds(
+                                property_name=property,
+                                directory_label=directory_label,
+                                path_label=path_label,
+                            )
+                        self.parse_selected_path(
+                            filename=filename,
+                            path_label=path_label,
+                            directory_label=directory_label,
+                            **variables,
+                        )
+
+    def parse_selected_path(
+        self,
+        filename: str,
+        path_label: str,
+        directory_label: str = "",
+        s02: str = "s02",
+        e0: str = "e0",
+        sigma2: str = "sigma2",
+        deltar: str = "alpha*reff",
+    ):
+        """Format and append row representing a selected FEFF path.
+
+        Args:
+            filename (str): Name of the underlying FEFF path file, without
+                parent directory.
+            path_label (str): Label indicating the atoms involved in this path.
+            directory_label (str, optional): Label to indicate paths from a
+                separate directory. Defaults to "".
+            s02 (str, optional): Electron screening factor variable name.
+                Defaults to "s02".
+            e0 (str, optional): Energy shift variable name. Defaults to "e0".
+            sigma2 (str, optional): Mean squared displacement variable name.
+                Defaults to "sigma2".
+            deltar (str, optional): Change in path length variable.
+                Defaults to "alpha*reff".
+        """
+        if directory_label:
+            filename = os.path.join(directory_label, filename)
+            label = f"{directory_label}.{path_label}"
+        else:
+            filename = os.path.join("feff", filename)
+            label = path_label
+
+        self.rows.append(
+            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
+            f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
+        )
+
+    def write(self):
+        """Write selected path and GDS rows to file.
+        """
+        self.gds_writer.write()
+        with open("sp.csv", "w") as out:
+            out.writelines(self.rows)
+
+
+def main(input_values: dict):
+    """Select paths and define GDS parameters.
+
+    Args:
+        input_values (dict): All input values from the Galaxy tool UI.
+
+    Raises:
+        ValueError: If a FEFF label is not unique.
+    """
+    default_variables = input_values["variables"]
+
+    writer = PathsWriter(default_variables=default_variables)
+
+    if len(input_values["feff_outputs"]) == 1:
+        feff_output = input_values["feff_outputs"][0]
+        writer.parse_feff_output(
+            paths_file=feff_output["paths_file"],
+            selection=feff_output["selection"],
+        )
+    else:
+        zfill_length = len(str(len(input_values["feff_outputs"])))
+        labels = set()
+        with ZipFile("merged.zip", "x", ZIP_DEFLATED) as zipfile_out:
+            for i, feff_output in enumerate(input_values["feff_outputs"]):
+                label = feff_output.pop("label") or str(i + 1).zfill(
+                    zfill_length
+                )
+                if label in labels:
+                    raise ValueError(f"Label '{label}' is not unique")
+                labels.add(label)
+
+                writer.parse_feff_output(
+                    directory_label=label,
+                    paths_file=feff_output["paths_file"],
+                    selection=feff_output["selection"],
+                )
+
+                with ZipFile(feff_output["paths_zip"]) as z:
+                    for zipinfo in z.infolist():
+                        if zipinfo.filename != "feff/":
+                            zipinfo.filename = zipinfo.filename[5:]
+                            z.extract(member=zipinfo, path=label)
+                            zipfile_out.write(
+                                os.path.join(label, zipinfo.filename)
+                            )
+
+    writer.write()
+
+
+if __name__ == "__main__":
+    input_values = json.load(open(sys.argv[1], "r", encoding="utf-8"))
+    main(input_values)