utils.py @ 0:7f84a8a5edde draft default tip

planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/ipapy2 commit 64b61ff2823b4f54868c0ab7a4c0dc49eaf2979a

author   recetox
date     Fri, 16 May 2025 08:00:41 +0000

import argparse
from typing import Tuple

import pandas as pd


class LoadDataAction(argparse.Action):
    """
    Custom argparse action to load data from a file.
    Supported file formats: CSV, TSV, Tabular, and Parquet.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Load data from a file and store it in the namespace.
        :param parser: ArgumentParser instance
        :param namespace: Namespace object
        :param values: Tuple containing the file path and file extension
        :param option_string: Option string
        :return: None
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()
        if file_extension == "csv":
            df = pd.read_csv(file_path, keep_default_na=False).replace("", None)
        elif file_extension in ["tsv", "tabular"]:
            df = pd.read_csv(file_path, sep="\t", keep_default_na=False).replace(
                "", None
            )
        elif file_extension == "parquet":
            df = pd.read_parquet(file_path).replace("", None)
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, df)


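# --- Illustrative sketch, not part of the original module ---
# Shows how LoadDataAction is typically wired to an option: the option takes
# two tokens (file path, file extension) and the parsed attribute ends up as a
# pandas DataFrame. The option and file names below are hypothetical.
def _example_load_data_usage():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_dataset", nargs=2, action=LoadDataAction)
    # Assumes a file "peaks.csv" exists in the working directory.
    args = parser.parse_args(["--input_dataset", "peaks.csv", "csv"])
    return args.input_dataset  # DataFrame with empty strings replaced by None

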
class LoadTextAction(argparse.Action):
    """
    Custom argparse action to load data from a text file.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Load data from a text file and store it in the namespace.
        :param namespace: Namespace object
        :param values: Tuple containing the file path and file extension
        :param option_string: Option string
        :return: None
        """
        file_path, _ = values
        data = []
        if file_path:
            with open(file_path, "r") as f:
                for line in f:
                    data.append(int(line.strip()))
        setattr(namespace, self.dest, data)


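# --- Illustrative sketch, not part of the original module ---
# LoadTextAction follows the same two-token (path, extension) convention and
# produces a list of integers, one per line of the file. File name hypothetical.
def _example_load_text_usage():
    parser = argparse.ArgumentParser()
    parser.add_argument("--zs", nargs=2, action=LoadTextAction)
    # Assumes "assignments.txt" contains one integer per line.
    args = parser.parse_args(["--zs", "assignments.txt", "txt"])
    return args.zs  # e.g. [0, 3, 1, ...]

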
def write_csv(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a CSV file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output CSV file.
    """
    df.to_csv(file_path, index=False)


def write_tsv(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a TSV file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output TSV file.
    """
    df.to_csv(file_path, sep="\t", index=False)


def write_parquet(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a Parquet file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output Parquet file.
    """
    df.to_parquet(file_path, index=False)


def write_text(data: list, file_path: str) -> None:
    """
    Write the data to a text file.

    Parameters:
    data (list): The data to write.
    file_path (str): The path to the output text file.
    """
    if file_path:
        with open(file_path, "w") as f:
            for s in data:
                f.write(str(s) + "\n")


class StoreOutputAction(argparse.Action):
    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Tuple[str, str],
        option_string: str = None,
    ) -> None:
        """
        Custom argparse action to store the output function and file path based on file extension.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (Tuple[str, str]): The file path and file extension.
        option_string (str): The option string.
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()
        if file_extension == "csv":
            write_func = write_csv
        elif file_extension in ["tsv", "tabular"]:
            write_func = write_tsv
        elif file_extension == "parquet":
            write_func = write_parquet
        elif file_extension == "txt":
            write_func = write_text
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, (write_func, file_path))


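# --- Illustrative sketch, not part of the original module ---
# StoreOutputAction writes nothing at parse time; it stores a
# (writer_function, file_path) tuple so the caller decides when to write.
# The DataFrame content and file name below are hypothetical.
def _example_store_output_usage():
    parser = argparse.ArgumentParser()
    parser.add_argument("--output_dataset", nargs=2, action=StoreOutputAction)
    args = parser.parse_args(["--output_dataset", "result.tsv", "tsv"])
    write_func, file_path = args.output_dataset  # (write_tsv, "result.tsv")
    write_func(pd.DataFrame({"peak_id": [1, 2]}), file_path)

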
def flattern_annotations(annotations: dict) -> pd.DataFrame:
    """
    Flatten the annotations dictionary and convert it to a dataframe.

    Parameters:
    annotations (dict): The annotations dictionary.

    Returns:
    pd.DataFrame: The flattened annotations dataframe.
    """
    annotations_flat = pd.DataFrame()
    for peak_id in annotations:
        annotation = annotations[peak_id]
        annotation["peak_id"] = peak_id
        annotations_flat = pd.concat([annotations_flat, annotation])
    return annotations_flat


def group_by_peak_id(df: pd.DataFrame) -> dict:
    """
    Convert a pandas dataframe to a dictionary where each key is a unique 'peak_id'
    and each value is a dataframe subset corresponding to that 'peak_id'.

    Parameters:
    df (pd.DataFrame): The input dataframe.

    Returns:
    dict: The dictionary representation of the dataframe.
    """
    annotations = {}
    keys = set(df["peak_id"])
    for i in keys:
        annotations[i] = df[df["peak_id"] == i].drop("peak_id", axis=1)
    return annotations


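# --- Illustrative sketch, not part of the original module ---
# group_by_peak_id and flattern_annotations are near inverses: the first splits
# a flat table into one DataFrame per peak, the second stitches such a
# dictionary back into a single table with a "peak_id" column. Data is made up.
def _example_annotation_round_trip():
    flat = pd.DataFrame(
        {"peak_id": [1, 1, 2], "formula": ["C6H12O6", "C6H12O6", "C5H10O5"]}
    )
    per_peak = group_by_peak_id(flat)      # {1: DataFrame, 2: DataFrame}
    back = flattern_annotations(per_peak)  # flat table again, "peak_id" restored
    return per_peak, back

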
class CustomArgumentParser(argparse.ArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.register("action", "load_data", LoadDataAction)
        self.register("action", "store_output", StoreOutputAction)
        self.register("action", "load_text", LoadTextAction)
        self.add_argument(
            "--output_dataset",
            nargs=2,
            action="store_output",
            required=True,
            help="A file path for the output results.",
        )


class MSArgumentParser(CustomArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument(
            "--ncores",
            type=int,
            default=1,
            help="The number of cores to use for parallel processing.",
        )
        self.add_argument(
            "--pRTout",
            type=float,
            default=0.4,
            help=(
                "multiplicative factor for the RT if the measured RT is outside the"
                " RTrange present in the database."
            ),
        )
        self.add_argument(
            "--pRTNone",
            type=float,
            default=0.8,
            help=(
                "multiplicative factor for the RT if no RTrange is present in the"
                " database."
            ),
        )
        self.add_argument(
            "--ppmthr",
            type=float,
            help=(
                "maximum ppm possible for the annotations. If not provided, set to"
                " 2*ppm."
            ),
        )
        self.add_argument(
            "--ppm",
            type=float,
            required=True,
            default=100,
            help="accuracy of the MS instrument used.",
        )
        self.add_argument(
            "--ratiosd",
            type=float,
            default=0.9,
            help=(
                "acceptable ratio between predicted intensity and observed intensity of"
                " isotopes."
            ),
        )
        self.add_argument(
            "--ppmunk",
            type=float,
            help=(
                "ppm associated with the 'unknown' annotation. If not provided, set to"
                " ppm."
            ),
        )
        self.add_argument(
            "--ratiounk",
            type=float,
            default=0.5,
            help="isotope ratio associated with the 'unknown' annotation.",
        )


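# --- Illustrative sketch, not part of the original module ---
# A minimal MSArgumentParser invocation: only --ppm and the inherited, required
# --output_dataset must be given; the remaining options use their defaults.
# The argument values and file name are hypothetical.
def _example_ms_parser_usage():
    parser = MSArgumentParser()
    args = parser.parse_args(
        ["--ppm", "10", "--output_dataset", "annotations.parquet", "parquet"]
    )
    # Returns 10.0 and the (write_parquet, "annotations.parquet") writer tuple.
    return args.ppm, args.output_dataset

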
class GibbsArgumentParser(CustomArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument(
            "--noits",
            type=int,
            help="number of iterations of the Gibbs sampler to be run.",
        )
        self.add_argument(
            "--burn",
            type=int,
            help="""number of iterations to be ignored when computing posterior
            probabilities. If None, it is set to 10%% of the total iterations.""",
        )
        self.add_argument(
            "--delta_add",
            type=float,
            default=1,
            help="""parameter used when computing the conditional priors. The
            parameter must be positive. The smaller the parameter, the more
            weight the adduct connections have on the posterior
            probabilities. Default 1.""",
        )
        self.add_argument(
            "--all_out",
            # Note: argparse's type=bool treats any non-empty string (including
            # "False") as True; only an empty string yields False.
            type=bool,
            help="Output all the Gibbs sampler results.",
        )
        self.add_argument(
            "--zs_out",
            nargs=2,
            action="store_output",
            help="A file path for the output results of the Gibbs sampler.",
        )
        self.add_argument(
            "--zs",
            nargs=2,
            action="load_text",
            help="""a txt file containing the list of assignments computed in a previous run of the Gibbs sampler.
            Optional, default None.""",
        )
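

# --- Illustrative sketch, not part of the original module ---
# End-to-end use of GibbsArgumentParser: parse the options, run the sampler
# (not shown), then write the results through the stored writer tuple. The
# argument values and the results DataFrame are hypothetical.
def _example_gibbs_parser_usage():
    parser = GibbsArgumentParser()
    args = parser.parse_args(
        ["--noits", "1000", "--output_dataset", "posterior.csv", "csv"]
    )
    results = pd.DataFrame({"peak_id": [1], "post_prob": [0.87]})
    write_func, file_path = args.output_dataset  # (write_csv, "posterior.csv")
    write_func(results, file_path)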