Mercurial > repos > recetox > table_scipy_interpolate
comparison table_scipy_interpolate.py @ 0:0112f08c95ed draft default tip
planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/tables commit d0ff40eb2b536fec6c973c3a9ea8e7f31cd9a0d6
author | recetox |
---|---|
date | Wed, 29 Jan 2025 15:36:02 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:0112f08c95ed |
---|---|
1 import argparse | |
2 import logging | |
3 from typing import Callable, Tuple | |
4 | |
5 | |
6 import numpy as np | |
7 import pandas as pd | |
8 from scipy.interpolate import Akima1DInterpolator, CubicSpline, PchipInterpolator | |
9 from utils import LoadDataAction, StoreOutputAction | |
10 | |
11 | |
class InterpolationModelAction(argparse.Action):
    """Argparse action mapping an interpolation method name to its callable.

    On parsing, the raw method name (e.g. ``"linear"``) is replaced on the
    namespace by the corresponding interpolator: ``np.interp`` for linear,
    or a scipy interpolator class for the others.
    """

    # Supported method names -> interpolator callables.
    # Class-level constant: built once, not on every parser invocation.
    INTERPOLATORS = {
        "linear": np.interp,
        "cubic": CubicSpline,
        "pchip": PchipInterpolator,
        "akima": Akima1DInterpolator,
    }

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str,
        option_string: str = None,
    ) -> None:
        """
        Store the interpolator callable matching *values* on the namespace.

        Parameters:
            parser (argparse.ArgumentParser): The argument parser instance.
            namespace (argparse.Namespace): The namespace to hold the parsed values.
            values (str): The interpolation method name.
            option_string (str): The option string that triggered this action.

        Raises:
            ValueError: If *values* is not one of the supported method names.
        """
        try:
            setattr(namespace, self.dest, self.INTERPOLATORS[values])
        except KeyError:
            raise ValueError(f"Unknown interpolation method: {values}") from None
38 | |
39 | |
def interpolate_data(
    reference: pd.DataFrame,
    query: pd.DataFrame,
    x_col: int,
    y_col: int,
    xnew_col: int,
    model: Callable,
    output_dataset: Tuple[Callable[[pd.DataFrame, str], None], str],
) -> None:
    """
    Interpolate data using the specified model and write the result.

    The interpolated values are added to *query* (mutated in place) under the
    reference dataset's y-column name, then the augmented query is written out.

    Parameters:
        reference (pd.DataFrame): The reference dataset.
        query (pd.DataFrame): The query dataset; gains the interpolated column.
        x_col (int): The 1-based index of the x column in the reference dataset.
        y_col (int): The 1-based index of the y column in the reference dataset.
        xnew_col (int): The 1-based index of the x column in the query dataset.
        model (Callable): The interpolation model to use (``np.interp`` or a
            scipy interpolator class such as ``CubicSpline``).
        output_dataset (Tuple[Callable[[pd.DataFrame, str], None], str]): The
            writer function and the output file path.

    Raises:
        ValueError: If the reference y column already exists in *query*.
    """
    try:
        # Convert 1-based CLI indices to actual column labels.
        x_col_name = reference.columns[x_col - 1]
        y_col_name = reference.columns[y_col - 1]
        xnew_col_name = query.columns[xnew_col - 1]

        # Refuse to clobber an existing column in the query dataset.
        if y_col_name in query.columns:
            raise ValueError(
                f"Column '{y_col_name}' already exists in the query dataset."
            )

        # np.interp is a plain function taking (xnew, xp, fp); the scipy
        # interpolators are classes constructed from (x, y) then evaluated.
        # Identity comparison (`is`) is the correct test for "this specific
        # function object", not value equality.
        if model is np.interp:
            query[y_col_name] = model(
                query[xnew_col_name], reference[x_col_name], reference[y_col_name]
            )
        else:
            model_instance = model(reference[x_col_name], reference[y_col_name])
            query[y_col_name] = model_instance(query[xnew_col_name]).astype(float)

        write_func, file_path = output_dataset
        write_func(query, file_path)
    except Exception as e:
        # Lazy %-formatting defers rendering until the record is emitted.
        logging.error("Error in interpolate_data function: %s", e)
        raise
86 | |
87 | |
def main(
    reference_dataset: pd.DataFrame,
    query_dataset: pd.DataFrame,
    x_col: int,
    y_col: int,
    xnew_col: int,
    model: Callable,
    output_dataset: Tuple[Callable[[pd.DataFrame, str], None], str],
) -> None:
    """
    Perform interpolation on the already-loaded datasets and save the result.

    Parameters:
        reference_dataset (pd.DataFrame): The reference dataset (loaded by the
            ``--reference_dataset`` argparse action).
        query_dataset (pd.DataFrame): The query dataset (loaded by the
            ``--query_dataset`` argparse action).
        x_col (int): The 1-based index of the x column in the reference dataset.
        y_col (int): The 1-based index of the y column in the reference dataset.
        xnew_col (int): The 1-based index of the x column in the query dataset.
        model (Callable): The interpolation model to use.
        output_dataset (Tuple[Callable[[pd.DataFrame, str], None], str]): The
            writer function and the output file path.
    """
    try:
        interpolate_data(
            reference_dataset,
            query_dataset,
            x_col,
            y_col,
            xnew_col,
            model,
            output_dataset,
        )
    except Exception as e:
        # Lazy %-formatting defers rendering until the record is emitted.
        logging.error("Error in main function: %s", e)
        raise
114 | |
115 | |
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    arg_parser = argparse.ArgumentParser(
        description="Interpolate data using various methods."
    )

    # Dataset inputs: each takes a path plus a file-extension token and is
    # loaded eagerly by the custom argparse action.
    arg_parser.add_argument(
        "--reference_dataset",
        nargs=2,
        action=LoadDataAction,
        required=True,
        help="Path to the reference dataset and its file extension (csv, tsv, parquet)",
    )
    arg_parser.add_argument(
        "--query_dataset",
        nargs=2,
        action=LoadDataAction,
        required=True,
        help="Path to the query dataset and its file extension (csv, tsv, parquet)",
    )

    # Column selectors share everything except the flag and help text.
    for flag, help_text in (
        ("--x_col", "1-based index of the x column in the reference dataset"),
        ("--y_col", "1-based index of the y column in the reference dataset"),
        ("--xnew_col", "1-based index of the x column in the query dataset"),
    ):
        arg_parser.add_argument(flag, type=int, required=True, help=help_text)

    arg_parser.add_argument(
        "--model",
        type=str,
        action=InterpolationModelAction,
        required=True,
        help="Interpolation model to use (linear, cubic, pchip, akima)",
    )
    arg_parser.add_argument(
        "--output_dataset",
        nargs=2,
        action=StoreOutputAction,
        required=True,
        help="Path to the output dataset and its file extension (csv, tsv, parquet)",
    )

    args = arg_parser.parse_args()
    main(
        reference_dataset=args.reference_dataset,
        query_dataset=args.query_dataset,
        x_col=args.x_col,
        y_col=args.y_col,
        xnew_col=args.xnew_col,
        model=args.model,
        output_dataset=args.output_dataset,
    )