changeset 1:7fdca938d90c draft

planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 1cf6d7160497ba58fe16a51f00d088a20934eba6
author muon-spectroscopy-computational-project
date Wed, 06 Dec 2023 13:04:15 +0000
parents 2e827836f0ad
children 200534149c4d
files larch_select_paths.py larch_select_paths.xml test-data/sp_criteria.csv
diffstat 3 files changed, 271 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/larch_select_paths.py	Tue Nov 14 15:35:52 2023 +0000
+++ b/larch_select_paths.py	Wed Dec 06 13:04:15 2023 +0000
@@ -3,9 +3,58 @@
 import os
 import re
 import sys
+from itertools import combinations
 from zipfile import ZIP_DEFLATED, ZipFile
 
 
+class CriteriaSelector:
+    """Accept paths meeting all set criteria; falsy criteria are ignored."""
+
+    def __init__(self, criteria: "dict[str, int|float]"):
+        self.max_number = criteria["max_number"]
+        self.max_path_length = criteria["max_path_length"]
+        self.min_amp_ratio = criteria["min_amplitude_ratio"]
+        self.max_degeneracy = criteria["max_degeneracy"]
+        self.path_count = 0  # paths accepted so far, capped by max_number
+
+    def evaluate(self, path_id: int, row: "list[str]") -> (bool, None):
+        """Return (accepted, None) for a CSV row; path_id is not used."""
+        if self.max_number and self.path_count >= self.max_number:
+            print(f"Reject path: {self.max_number} paths already reached")
+            return (False, None)
+        r_effective = float(row[5].strip())
+        if self.max_path_length and r_effective > self.max_path_length:
+            print(f"Reject path: {r_effective} > {self.max_path_length}")
+            return (False, None)
+        amplitude_ratio = float(row[2].strip())
+        if self.min_amp_ratio and (amplitude_ratio < self.min_amp_ratio):
+            print(f"Reject path: {amplitude_ratio} < {self.min_amp_ratio}")
+            return (False, None)
+        degeneracy = float(row[3].strip())
+        if self.max_degeneracy and degeneracy > self.max_degeneracy:
+            print(f"Reject path: {degeneracy} > {self.max_degeneracy}")
+            return (False, None)
+
+        self.path_count += 1
+        return (True, None)
+
+
+class ManualSelector:
+    """Select paths listed by id, flagged in the last CSV column, or all."""
+    def __init__(self, selection: dict):
+        self.select_all = selection["selection"] == "all"
+        self.paths = selection["paths"]
+        self.path_values_ids = [path_value["id"] for path_value in self.paths]
+
+    def evaluate(self, path_id: int, row: "list[str]") -> (bool, "None|dict"):
+        """Return (selected, values); values is the listed entry or None."""
+        if path_id in self.path_values_ids:
+            return (True, self.paths[self.path_values_ids.index(path_id)])
+        if self.select_all or int(row[-1]):
+            return (True, None)
+        return (False, None)
+
+
 class GDSWriter:
     def __init__(self, default_variables: "dict[str, dict]"):
         self.default_properties = {
@@ -36,7 +85,7 @@
     def append_gds(
         self,
         name: str,
-        value: float = 0.,
+        value: float = 0.0,
         expr: str = None,
         vary: bool = True,
         label: str = "",
@@ -122,8 +171,7 @@
             return auto_name
 
     def write(self):
-        """Write GDS rows to file.
-        """
+        """Write GDS rows to file."""
         with open("gds.csv", "w") as out:
             out.writelines(self.rows)
 
@@ -135,6 +183,7 @@
             f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
         ]
         self.gds_writer = GDSWriter(default_variables=default_variables)
+        self.all_combinations = [[0]]  # 0 corresponds to the header row
 
     def parse_feff_output(
         self,
@@ -151,49 +200,123 @@
             directory_label (str, optional): Label to indicate paths from a
                 separate directory. Defaults to "".
         """
-        paths = selection["paths"]
-        path_values_ids = [path_value["id"] for path_value in paths]
+        combinations_list = []
+        if selection["selection"] in {"criteria", "combinations"}:
+            selector = CriteriaSelector(selection)
+        else:
+            selector = ManualSelector(selection)
+
+        selected_ids = self.select_rows(paths_file, directory_label, selector)
+
+        if selection["selection"] == "combinations":
+            min_number = selection["min_combination_size"]
+            min_number = min(min_number, len(selected_ids))
+            max_number = selection["max_combination_size"]
+            if not max_number or max_number > len(selected_ids):
+                max_number = len(selected_ids)
+
+            for number_of_paths in range(min_number, max_number + 1):
+                for combination in combinations(selected_ids, number_of_paths):
+                    combinations_list.append(combination)
 
+            new_combinations = len(combinations_list)
+            print(
+                f"{new_combinations} combinations for {directory_label}:\n"
+                f"{combinations_list}"
+            )
+            old_combinations_len = len(self.all_combinations)
+            self.all_combinations *= new_combinations
+            for i, combination in enumerate(self.all_combinations):
+                new_combinations = combinations_list[i // old_combinations_len]
+                self.all_combinations[i] = combination + list(new_combinations)
+        else:
+            for combination in self.all_combinations:
+                combination.extend(selected_ids)
+
+    def select_rows(
+        self,
+        paths_file: str,
+        directory_label: str,
+        selector: "CriteriaSelector|ManualSelector",
+    ) -> "list[int]":
+        """Evaluate each row whose first column contains digits, and parse
+        the rows the selector accepts. Does not account for combinations.
+
+        Args:
+            paths_file (str): CSV summary filename.
+            directory_label (str): Label to indicate paths from a separate
+                directory.
+            selector (CriteriaSelector|ManualSelector): Object to evaluate
+                whether to select each path or not.
+
+        Returns:
+            list[int]: The ids parse_row returned for the selected rows.
+        """
+        row_ids = []
         with open(paths_file) as file:
             reader = csv.reader(file)
             for row in reader:
                 id_match = re.search(r"\d+", row[0])
                 if id_match:
                     path_id = int(id_match.group())
-                    filename = row[0].strip()
-                    path_label = row[-2].strip()
-                    variables = {}
+                    selected, path_value = selector.evaluate(
+                        path_id=path_id,
+                        row=row,
+                    )
+                    if selected:
+                        filename = row[0].strip()
+                        path_label = row[-2].strip()
+                        row_id = self.parse_row(
+                            directory_label, filename, path_label, path_value
+                        )
+                        row_ids.append(row_id)
+
+        return row_ids
+
+    def parse_row(
+        self,
+        directory_label: str,
+        filename: str,
+        path_label: str,
+        path_value: "None|dict",
+    ) -> int:
+        """Collect GDS variables for a selected path and append its row.
 
-                    if path_id in path_values_ids:
-                        path_value = paths[path_values_ids.index(path_id)]
-                        for property in self.gds_writer.default_properties:
-                            variables[property] = self.gds_writer.parse_gds(
-                                property_name=property,
-                                variable_name=path_value[property]["name"],
-                                path_variable=path_value[property],
-                                directory_label=directory_label,
-                                path_label=path_label,
-                            )
-                        self.parse_selected_path(
-                            filename=filename,
-                            path_label=path_label,
-                            directory_label=directory_label,
-                            **variables,
-                        )
-                    elif selection["selection"] == "all" or int(row[-1]):
-                        path_value = None
-                        for property in self.gds_writer.default_properties:
-                            variables[property] = self.gds_writer.parse_gds(
-                                property_name=property,
-                                directory_label=directory_label,
-                                path_label=path_label,
-                            )
-                        self.parse_selected_path(
-                            filename=filename,
-                            path_label=path_label,
-                            directory_label=directory_label,
-                            **variables,
-                        )
+        Args:
+            directory_label (str): Label to indicate paths from a separate
+                directory.
+            filename (str): Filename for the FEFF path, extracted from row.
+            path_label (str): Label for the FEFF path, extracted from row.
+            path_value (None|dict): The values associated with the selected
+                FEFF path. May be None in which case defaults are used.
+
+        Returns:
+            int: The id of the added row.
+        """
+        variables = {}
+        if path_value is not None:  # use the values given for this path
+            for property in self.gds_writer.default_properties:
+                variables[property] = self.gds_writer.parse_gds(
+                    property_name=property,
+                    variable_name=path_value[property]["name"],
+                    path_variable=path_value[property],
+                    directory_label=directory_label,
+                    path_label=path_label,
+                )
+        else:
+            for property in self.gds_writer.default_properties:
+                variables[property] = self.gds_writer.parse_gds(
+                    property_name=property,
+                    directory_label=directory_label,
+                    path_label=path_label,
+                )
+
+        return self.parse_selected_path(
+            filename=filename,
+            path_label=path_label,
+            directory_label=directory_label,
+            **variables,
+        )
 
     def parse_selected_path(
         self,
@@ -204,7 +327,7 @@
         e0: str = "e0",
         sigma2: str = "sigma2",
         deltar: str = "alpha*reff",
-    ):
+    ) -> int:
         """Format and append row representing a selected FEFF path.
 
         Args:
@@ -220,6 +343,9 @@
                 Defaults to "sigma2".
             deltar (str, optional): Change in path length variable.
                 Defaults to "alpha*reff".
+
+        Returns:
+            int: The id of the added row.
         """
         if directory_label:
             filename = os.path.join(directory_label, filename)
@@ -228,17 +354,29 @@
             filename = os.path.join("feff", filename)
             label = path_label
 
+        row_id = len(self.rows)
         self.rows.append(
-            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
+            f"{row_id:>4d}, {filename:>24s}, {label:>24s}, "
             f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
         )
 
+        return row_id
+
     def write(self):
-        """Write selected path and GDS rows to file.
-        """
+        """Write GDS rows, then sp.csv or one sp/<ids>.csv per combination."""
         self.gds_writer.write()
-        with open("sp.csv", "w") as out:
-            out.writelines(self.rows)
+
+        if len(self.all_combinations) == 1:
+            with open("sp.csv", "w") as out:
+                out.writelines(self.rows)
+        else:
+            for combination in self.all_combinations:
+                filename = "_".join([str(c) for c in combination[1:]])
+                print(f"Writing combination {filename}")
+                with open(f"sp/{filename}.csv", "w") as out:
+                    for row_id, row in enumerate(self.rows):
+                        if row_id in combination:  # id 0 keeps the header row
+                            out.write(row)
 
 
 def main(input_values: dict):
@@ -265,9 +403,9 @@
         labels = set()
         with ZipFile("merged.zip", "x", ZIP_DEFLATED) as zipfile_out:
             for i, feff_output in enumerate(input_values["feff_outputs"]):
-                label = feff_output.pop("label") or str(i + 1).zfill(
-                    zfill_length
-                )
+                label = feff_output["label"]
+                if not label:
+                    label = str(i + 1).zfill(zfill_length)
                 if label in labels:
                     raise ValueError(f"Label '{label}' is not unique")
                 labels.add(label)
@@ -283,9 +421,8 @@
                         if zipinfo.filename != "feff/":
                             zipinfo.filename = zipinfo.filename[5:]
                             z.extract(member=zipinfo, path=label)
-                            zipfile_out.write(
-                                os.path.join(label, zipinfo.filename)
-                            )
+                            filename = os.path.join(label, zipinfo.filename)
+                            zipfile_out.write(filename)
 
     writer.write()
 
--- a/larch_select_paths.xml	Tue Nov 14 15:35:52 2023 +0000
+++ b/larch_select_paths.xml	Wed Dec 06 13:04:15 2023 +0000
@@ -4,7 +4,7 @@
         <!-- version of underlying tool (PEP 440) -->
         <token name="@TOOL_VERSION@">0.9.71</token>
         <!-- version of this tool wrapper (integer) -->
-        <token name="@WRAPPER_VERSION@">0</token>
+        <token name="@WRAPPER_VERSION@">1</token>
         <!-- citation should be updated with every underlying tool version -->
         <!-- typical fields to update are version, month, year, and doi -->
         <token name="@TOOL_CITATION@">10.1088/1742-6596/430/1/012007</token>
@@ -17,18 +17,27 @@
         <xml name="vary">
             <param name="vary" type="boolean" checked="true" label="Vary" help="If True, the initial 'Guess' will be optimised in the fitting. If False, the value will be 'Set' instead and not optimised."/>
         </xml>
+        <xml name="max_number">
+            <param name="max_number" type="integer" min="1" optional="true" label="Maximum number of paths" help="Will select (up to) this many paths, ordered by ascending path length, subject to criteria below. If unset, will select all that meet the criteria."/>
+        </xml>
+        <xml name="max_path_length">
+            <param name="max_path_length" type="float" min="0" optional="true" label="Maximum path length" help="Exclude paths with lengths greater than this (expressed in Angstrom). If unset, will not restrict based on path length."/>
+        </xml>
+        <xml name="min_amplitude_ratio">
+            <param name="min_amplitude_ratio" type="float" min="0" max="100" optional="true" label="Minimum amplitude ratio (%)" help="Exclude paths with an amplitude ratio below this value (expressed as a percentage). If unset, will not restrict based on amplitude ratio."/>
+        </xml>
+        <xml name="max_degeneracy">
+            <param name="max_degeneracy" type="integer" min="1" optional="true" label="Maximum degeneracy" help="Exclude paths with degeneracy above this value. If unset, will not restrict based on degeneracy."/>
+        </xml>
     </macros>
     <creator>
         <person givenName="Patrick" familyName="Austin" url="https://github.com/patrick-austin" identifier="https://orcid.org/0000-0002-6279-7823"/>
     </creator>
-    <requirements>
-        <requirement type="package" version="@TOOL_VERSION@">xraylarch</requirement>
-        <requirement type="package" version="3.5.2">matplotlib</requirement>
-    </requirements>
     <required_files>
         <include type="literal" path="larch_select_paths.py"/>
     </required_files>
     <command detect_errors="exit_code"><![CDATA[
+        mkdir sp &&
         python '${__tool_directory__}/larch_select_paths.py' '$inputs'
     ]]></command>
     <configfiles>
@@ -64,8 +73,8 @@
             <conditional name="selection">
                 <param name="selection" type="select" label="Selection method">
                     <option value="all" selected="true">All paths</option>
-                    <!-- <option value="number">Fixed number</option>
-                    <option value="combinations">Combinations</option> -->
+                    <option value="criteria">Criteria</option>
+                    <option value="combinations">Combinations</option>
                     <option value="manual">Manual</option>
                 </param>
                 <when value="all">
@@ -97,10 +106,20 @@
                         </section>
                     </repeat>
                 </when>
-                <!-- <when value="number">
+                <when value="criteria">
+                    <expand macro="max_number"/>
+                    <expand macro="max_path_length"/>
+                    <expand macro="min_amplitude_ratio"/>
+                    <expand macro="max_degeneracy"/>
                 </when>
                 <when value="combinations">
-                </when> -->
+                    <param name="min_combination_size" type="integer" value="1" min="1" label="Minimum combination size" help="Each combination will have at least this many paths. If fewer than this many paths match the criteria below, a single combination with all paths will be generated."/>
+                    <param name="max_combination_size" type="integer" optional="true" label="Maximum combination size" help="Each combination will have at most this many paths. Note this is distinct from the number of paths to consider, below. One might want to consider combinations of the 5 shortest paths (set using 'Maximum number of paths'), but have no more than 3 paths in each combination (set using this)."/>
+                    <expand macro="max_number"/>
+                    <expand macro="max_path_length"/>
+                    <expand macro="min_amplitude_ratio"/>
+                    <expand macro="max_degeneracy"/>
+                </when>
                 <when value="manual">
                     <repeat name="paths" title="Select paths" help="Identify paths to use in the fitting by their id, and optionally define their variables. This will overwrite and defaults set above.">
                         <param name="id" type="integer" value="1" min="1" label="Path ID" help="Numerical id of a path to select, this appears at the end of the label and filename in the path summary CSV."/>
@@ -138,7 +157,13 @@
             <filter>len(feff_outputs) > 1</filter>
         </data>
         <data name="gds_csv" format="gds" from_work_dir="gds.csv" label="GDS values for ${on_string}"/>
-        <data name="sp_csv" format="sp" from_work_dir="sp.csv" label="Selected paths for ${on_string}"/>
+        <data name="sp_csv" format="sp" from_work_dir="sp.csv" label="Selected paths for ${on_string}">
+            <filter>not any([f["selection"]["selection"] == "combinations" for f in feff_outputs])</filter>
+        </data>
+        <collection name="sp_collection" format="sp" type="list" label="Selected path combinations for ${on_string}">
+            <discover_datasets pattern="__name_and_ext__" directory="sp"/>
+            <filter>any([f["selection"]["selection"] == "combinations" for f in feff_outputs])</filter>
+        </collection>
     </outputs>
     <tests>
         <!-- Test defaults for CSV with select_all -->
@@ -272,6 +297,51 @@
             <output name="gds_csv" file="gds_merge_custom.csv"/>
             <output name="sp_csv" file="sp_merge_custom.csv"/>
         </test>
+        <!-- Test for criteria based selection -->
+        <test expect_num_outputs="2">
+            <repeat name="feff_outputs">
+                <param name="paths_zip" value="FEFF_paths.zip"/>
+                <param name="paths_file" value="[CSV_summary_of_1564889.cif].csv"/>
+                <conditional name="selection">
+                    <param name="selection" value="criteria"/>
+                    <param name="min_amplitude_ratio" value="20"/>
+                    <param name="max_degeneracy" value="4"/>
+                </conditional>
+            </repeat>
+            <output name="gds_csv" file="gds_default.csv"/>
+            <output name="sp_csv" file="sp_criteria.csv"/>
+        </test>
+        <!-- Test for combinations based selection -->
+        <test expect_num_outputs="3">
+            <!-- Should result in 4 + 6 + 4 + 1 = 15 combinations -->
+            <repeat name="feff_outputs">
+                <param name="paths_zip" value="FEFF_paths.zip"/>
+                <param name="paths_file" value="[CSV_summary_of_1564889.cif].csv"/>
+                <conditional name="selection">
+                    <param name="selection" value="combinations"/>
+                    <param name="min_amplitude_ratio" value="20"/>
+                </conditional>
+            </repeat>
+            <!-- Should result in 3 combinations -->
+            <repeat name="feff_outputs">
+                <param name="paths_zip" value="FEFF_paths.zip"/>
+                <param name="paths_file" value="[CSV_summary_of_1564889.cif].csv"/>
+                <conditional name="selection">
+                    <param name="selection" value="combinations"/>
+                    <param name="min_combination_size" value="2"/>
+                    <param name="max_combination_size" value="2"/>
+                    <param name="max_number" value="3"/>
+                </conditional>
+            </repeat>
+            <output name="merged_directories">
+                <assert_contents>
+                    <has_size value="206000" delta="1000"/>
+                </assert_contents>
+            </output>
+            <output name="gds_csv" file="gds_default.csv"/>
+            <!-- Should get 15 * 3 = 45 combinations in total -->
+            <output_collection name="sp_collection" type="list" count="45"/>
+        </test>
     </tests>
     <help><![CDATA[
         Select FEFF scattering paths to use in the fitting process.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-data/sp_criteria.csv	Wed Dec 06 13:04:15 2023 +0000
@@ -0,0 +1,4 @@
+  id,                 filename,                    label, s02,   e0,                   sigma2,     deltar
+   1,        feff/feff0001.dat,                   S.Fe.1, s02,   e0,                   sigma2, alpha*reff
+   2,        feff/feff0002.dat,                   S.Fe.2, s02,   e0,                   sigma2, alpha*reff
+   3,        feff/feff0005.dat,                   S.Fe.5, s02,   e0,                   sigma2, alpha*reff