utils.py in the recetox/ipapy2_clustering Mercurial repository, revision 0:cb18b8fcb441 (draft, default, tip)
Commit message: planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/ipapy2 commit 64b61ff2823b4f54868c0ab7a4c0dc49eaf2979a
| author | recetox |
|---|---|
| date | Fri, 16 May 2025 08:02:15 +0000 |
| parents | |
| children | |
Comparison of -1:000000000000 (the null revision) with 0:cb18b8fcb441; the entire file is introduced in this revision. The content of utils.py follows.
```python
import argparse
from typing import Tuple

import pandas as pd


class LoadDataAction(argparse.Action):
    """
    Custom argparse action to load data from a file.
    Supported file formats: CSV, TSV, Tabular and Parquet.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Load data from a file and store it in the namespace.
        :param namespace: Namespace object
        :param values: Tuple containing the file path and file extension
        :param option_string: Option string
        :return: None
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()
        if file_extension == "csv":
            df = pd.read_csv(file_path, keep_default_na=False).replace("", None)
        elif file_extension in ["tsv", "tabular"]:
            df = pd.read_csv(file_path, sep="\t", keep_default_na=False).replace(
                "", None
            )
        elif file_extension == "parquet":
            df = pd.read_parquet(file_path).replace("", None)
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, df)
```
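For orientation (not part of utils.py): LoadDataAction expects two command-line values, a file path and a datatype token, and stores the loaded DataFrame directly on the namespace. A minimal sketch, assuming a hypothetical --input_dataset flag and an existing peaks.csv:

```python
import argparse

parser = argparse.ArgumentParser()
# The flag name is hypothetical; the action class can be passed directly.
parser.add_argument("--input_dataset", nargs=2, action=LoadDataAction)
args = parser.parse_args(["--input_dataset", "peaks.csv", "csv"])
# args.input_dataset is now a pandas DataFrame with empty strings replaced by None.
```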
```python
class LoadTextAction(argparse.Action):
    """
    Custom argparse action to load data from a text file.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Load data from a text file and store it in the namespace.
        :param namespace: Namespace object
        :param values: Tuple containing the file path and file extension
        :param option_string: Option string
        :return: None
        """
        file_path, _ = values
        data = []
        if file_path:
            with open(file_path, "r") as f:
                for line in f:
                    data.append(int(line.strip()))
        setattr(namespace, self.dest, data)
```
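LoadTextAction is registered further down as the "load_text" action and backs the --zs option of GibbsArgumentParser; it reads one integer per line. A short sketch, assuming a hypothetical zs.txt containing previously computed assignments:

```python
parser = argparse.ArgumentParser()
parser.register("action", "load_text", LoadTextAction)
parser.add_argument("--zs", nargs=2, action="load_text")
# zs.txt is assumed to hold one integer per line, e.g. "0\n3\n1\n".
args = parser.parse_args(["--zs", "zs.txt", "txt"])
# args.zs == [0, 3, 1]
```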
```python
def write_csv(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a CSV file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output CSV file.
    """
    df.to_csv(file_path, index=False)


def write_tsv(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a TSV file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output TSV file.
    """
    df.to_csv(file_path, sep="\t", index=False)


def write_parquet(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a Parquet file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output Parquet file.
    """
    df.to_parquet(file_path, index=False)


def write_text(data: list, file_path: str) -> None:
    """
    Write the data to a text file.

    Parameters:
    data (list): The data to write.
    file_path (str): The path to the output text file.
    """
    if file_path:
        with open(file_path, "w") as f:
            for s in data:
                f.write(str(s) + "\n")


class StoreOutputAction(argparse.Action):
    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Tuple[str, str],
        option_string: str = None,
    ) -> None:
        """
        Custom argparse action to store the output function and file path based on file extension.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (Tuple[str, str]): The file path and file extension.
        option_string (str): The option string.
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()
        if file_extension == "csv":
            write_func = write_csv
        elif file_extension in ["tsv", "tabular"]:
            write_func = write_tsv
        elif file_extension == "parquet":
            write_func = write_parquet
        elif file_extension == "txt":
            write_func = write_text
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, (write_func, file_path))
```
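StoreOutputAction does not write anything at parse time; it stores a (writer, path) pair on the namespace that the tool unpacks after computing its results. A minimal sketch of that pattern (the output file name and result table are made up):

```python
parser = argparse.ArgumentParser()
parser.add_argument(
    "--output_dataset", nargs=2, action=StoreOutputAction, required=True
)
args = parser.parse_args(["--output_dataset", "out.csv", "csv"])
write_func, out_path = args.output_dataset   # (write_csv, "out.csv")
write_func(pd.DataFrame({"peak_id": [1]}), out_path)  # writes out.csv
```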
```python
def flattern_annotations(annotations: dict) -> pd.DataFrame:
    """
    Flatten the annotations dictionary and convert it to a dataframe.

    Parameters:
    annotations (dict): The annotations dictionary.

    Returns:
    pd.DataFrame: The flattened annotations dataframe.
    """
    annotations_flat = pd.DataFrame()
    for peak_id in annotations:
        annotation = annotations[peak_id]
        annotation["peak_id"] = peak_id
        annotations_flat = pd.concat([annotations_flat, annotation])
    return annotations_flat


def group_by_peak_id(df: pd.DataFrame) -> dict:
    """
    Convert a pandas dataframe to a dictionary where each key is a unique 'peak_id'
    and each value is a dataframe subset corresponding to that 'peak_id'.

    Parameters:
    df (pd.DataFrame): The input dataframe.

    Returns:
    dict: The dictionary representation of the dataframe.
    """
    annotations = {}
    keys = set(df["peak_id"])
    for i in keys:
        annotations[i] = df[df["peak_id"] == i].drop("peak_id", axis=1)
    return annotations
```
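The two helpers are near-inverses: flattern_annotations collapses a {peak_id: DataFrame} dictionary into a single table with a peak_id column, and group_by_peak_id splits such a table back into a dictionary. A small round trip with made-up annotation data:

```python
annotations = {
    1: pd.DataFrame({"formula": ["C6H12O6"], "prob": [0.9]}),
    2: pd.DataFrame({"formula": ["C5H10O5"], "prob": [0.7]}),
}
flat = flattern_annotations(annotations)  # one row per annotation, plus 'peak_id'
regrouped = group_by_peak_id(flat)        # {1: DataFrame, 2: DataFrame} again
```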
```python
class CustomArgumentParser(argparse.ArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.register("action", "load_data", LoadDataAction)
        self.register("action", "store_output", StoreOutputAction)
        self.register("action", "load_text", LoadTextAction)
        self.add_argument(
            "--output_dataset",
            nargs=2,
            action="store_output",
            required=True,
            help="A file path for the output results.",
        )


class MSArgumentParser(CustomArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument(
            "--ncores",
            type=int,
            default=1,
            help="The number of cores to use for parallel processing.",
        )
        self.add_argument(
            "--pRTout",
            type=float,
            default=0.4,
            help=(
                "Multiplicative factor for the RT if the measured RT is outside the"
                " RT range present in the database."
            ),
        )
        self.add_argument(
            "--pRTNone",
            type=float,
            default=0.8,
            help=(
                "Multiplicative factor for the RT if no RT range is present in the"
                " database."
            ),
        )
        self.add_argument(
            "--ppmthr",
            type=float,
            help=(
                "Maximum ppm possible for the annotations. If not provided, equal to"
                " 2*ppm."
            ),
        )
        self.add_argument(
            "--ppm",
            type=float,
            required=True,
            default=100,
            help="Accuracy of the MS instrument used.",
        )
        self.add_argument(
            "--ratiosd",
            type=float,
            default=0.9,
            help=(
                "Acceptable ratio between predicted intensity and observed intensity"
                " of isotopes."
            ),
        )
        self.add_argument(
            "--ppmunk",
            type=float,
            help=(
                "ppm associated with the 'unknown' annotation. If not provided, equal"
                " to ppm."
            ),
        )
        self.add_argument(
            "--ratiounk",
            type=float,
            default=0.5,
            help="Isotope ratio associated with the 'unknown' annotation.",
        )


class GibbsArgumentParser(CustomArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument(
            "--noits",
            type=int,
            help="Number of iterations of the Gibbs sampler to be run.",
        )
        self.add_argument(
            "--burn",
            type=int,
            help=(
                "Number of iterations to be ignored when computing posterior"
                " probabilities. If None, it is set to 10 percent of the total"
                " iterations."
            ),
        )
        self.add_argument(
            "--delta_add",
            type=float,
            default=1,
            help=(
                "Parameter used when computing the conditional priors. The parameter"
                " must be positive. The smaller the parameter, the more weight the"
                " adduct connections have on the posterior probabilities. Default 1."
            ),
        )
        self.add_argument(
            "--all_out",
            type=bool,
            help="Output all the Gibbs sampler results.",
        )
        self.add_argument(
            "--zs_out",
            nargs=2,
            action="store_output",
            help="A file path for the output results of the Gibbs sampler.",
        )
        self.add_argument(
            "--zs",
            nargs=2,
            action="load_text",
            help=(
                "A txt file containing the list of assignments computed in a previous"
                " run of the Gibbs sampler. Optional, default None."
            ),
        )
```
