comparison ml_tool/ml_tool.py @ 0:76a728a52df6 draft default tip

planemo upload for repository https://github.com/jaidevjoshi83/MicroBiomML commit 5ef78d4decc95ac107c468499328e7f086289ff9-dirty
author jay
date Tue, 17 Feb 2026 10:52:45 +0000
from pycaret.classification import setup, create_model, tune_model, pull
import subprocess
import itertools
import sys
import argparse
import pandas as pd
import json


def retrieve_results_from_hdc_folds(n_folds, text):
    """Parse per-fold metrics from chopin2.py stdout into a DataFrame with Mean/Std rows."""
    split_text = text.splitlines()
    df_list = []
    for i in range(n_folds):
        for n, line in enumerate(split_text):
            if f"Fold {i}" in line:
                # chopin2.py appears to print one metric per line after each fold
                # header; AUC and Kappa are not reported, so they are filled with 'NaN'.
                df_list.append([
                    float(split_text[n + 2].split(":")[1]),  # Accuracy
                    'NaN',                                   # AUC (not reported)
                    float(split_text[n + 5].split(":")[1]),  # Recall
                    float(split_text[n + 4].split(":")[1]),  # Precision
                    float(split_text[n + 3].split(":")[1]),  # F1
                    'NaN',                                   # Kappa (not reported)
                    float(split_text[n + 6].split(":")[1]),  # MCC
                ])

    df = pd.DataFrame(df_list, columns=["Accuracy", "AUC", "Recall", "Prec.", "F1", "Kappa", "MCC"])

    # Append Mean and Std summary rows; non-numeric columns are skipped automatically.
    mean_df = df.mean(numeric_only=True).to_frame().T
    mean_df['Fold'] = 'Mean'
    std_df = df.std(numeric_only=True).to_frame().T
    std_df['Fold'] = 'Std'

    df = df.reset_index().rename(columns={'index': 'Fold'})
    df_with_stats = pd.concat([df, mean_df, std_df], ignore_index=True)

    return df_with_stats

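# Sketch of the stdout layout the parser above assumes, inferred from the
# n+2..n+6 offsets rather than from chopin2 documentation (an assumption):
#
#   Fold 0
#   ...
#   Accuracy: 0.91
#   F1: 0.90
#   Precision: 0.89
#   Recall: 0.92
#   MCC: 0.81
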
def convert_value(val):
    """Convert a string to the appropriate Python type."""
    val = val.strip()
    if val.lower() == 'true':
        return True
    elif val.lower() == 'false':
        return False
    elif val.lower() == 'none':
        return None
    try:
        if '.' in val:
            return float(val)
        else:
            return int(val)
    except ValueError:
        return val

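# Coercion behaviour of convert_value():
#   convert_value(' True ') -> True
#   convert_value('none')   -> None
#   convert_value('0.01')   -> 0.01  (float)
#   convert_value('128')    -> 128   (int)
#   convert_value('rbf')    -> 'rbf' (kept as a plain string)
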
def read_params(filename):
    """Read comma-separated hyperparameter values from a file.

    Each non-empty line has the form: name,value1,value2,...
    """
    print("Reading hyperparameters from:", filename)
    params = {}

    with open(filename, 'r') as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            parts = line.strip().split(',')
            key = parts[0].strip()
            values = [convert_value(val) for val in parts[1:]]
            params[key] = values
    return params

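# A parameter file for the HDC tuner might look like this; the key names must
# match those used in tune_hdc(), the values are illustrative:
#
#   dimensionality,1000,10000
#   levels,50,100
#   retrain,0,1
#
# for which read_params() returns:
#   {'dimensionality': [1000, 10000], 'levels': [50, 100], 'retrain': [0, 1]}
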
def tune_hdc(tune_param, data, output_tabular=None, output_html=None):
    """Grid-search chopin2.py over all parameter combinations; keep the best by mean F1."""
    combinations = list(itertools.product(
        tune_param['dimensionality'], tune_param['levels'], tune_param['retrain']
    ))

    full_score, f1_score = {}, {}

    for n, combination in enumerate(combinations):
        command = [
            "chopin2.py", "--input", data,
            "--dimensionality", str(combination[0]),
            "--kfolds", "5",
            "--levels", str(combination[1]),
            "--retrain", str(combination[2])
        ]
        result = subprocess.run(command, capture_output=True, text=True)

        if result.returncode == 0:
            df_scores = retrieve_results_from_hdc_folds(5, result.stdout)

            # Store the results for the current combination and its mean F1 score.
            full_score[n] = df_scores
            mean_f1 = df_scores[df_scores['Fold'] == 'Mean']['F1'].iloc[0]
            f1_score[n] = mean_f1

            print(f"Combination {n}: {combination} -> Mean F1: {mean_f1}")

            # Writing every run to the same output file would overwrite it,
            # so only the best result is saved at the end.
        else:
            print(f"Command failed for combination {combination}:", result.stderr)

    if not f1_score:
        print("No successful runs, cannot determine best parameters.")
        return None

    max_key = max(f1_score, key=lambda k: f1_score[k])
    print(f"\nBest parameter combination key: {max_key} with F1 score: {f1_score[max_key]}")

    best_results = full_score[max_key]

    if output_tabular:
        best_results.to_csv(output_tabular, sep='\t', index=False)
    if output_html:
        best_results.to_html(output_html, index=False)

    return best_results

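# For the sketch parameter file above, tune_hdc() enumerates
# 2 x 2 x 2 = 8 combinations via itertools.product and runs a separate
# 5-fold chopin2.py job for each, e.g. (file names illustrative):
#
#   tune_hdc(read_params('params.txt'), 'training_data.tsv',
#            output_tabular='best.tsv', output_html='best.html')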

def run_pycaret(algo=None, custom_para=None, tune_para=None, file_path=None,
                setup_param=None, target_label=None, metadata_file=None,
                output_tabular=None, output_html=None, dp_columns=None, param_txt=None):

    df = pd.read_csv(file_path, sep='\t')
    df_metadata = pd.read_csv(metadata_file, sep='\t')

    # dp_columns is a comma-separated list of 1-based indices of columns to drop.
    dp_column_list = [df.columns.tolist()[int(i) - 1] for i in dp_columns.split(',')] if dp_columns else []
    if dp_column_list:
        df = df.drop(columns=dp_column_list)

    setup_dict = json.loads(setup_param)

    # target_label may be a 1-based column index into the metadata file, or a column name.
    try:
        col_idx = int(target_label) - 1
        setup_dict['target'] = df_metadata.columns.tolist()[col_idx]
    except ValueError:
        setup_dict['target'] = target_label
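    # For example, with metadata columns ['SampleID', 'Phenotype'] (names
    # illustrative), --target_label 2 resolves to 'Phenotype', while
    # --target_label Phenotype is used verbatim.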

    combine_df = pd.concat([df, df_metadata[setup_dict['target']]], axis=1)
    combine_df.to_csv("./training_data_with_target_columns.tsv", sep='\t', index=False)

    # Guard against an empty or too-small dataframe before running setup().
    if combine_df.empty or len(combine_df) < 2:
        print("Error: Not enough samples after filtering for PyCaret setup. Please check your input data and parameters.")
        sys.exit(1)

    if algo == 'hdc':

        file_path = "./training_data_with_target_columns.tsv"

        if custom_para and not tune_para:
            custom_params = json.loads(custom_para)
            command = ['chopin2.py', "--input", file_path, "--kfolds", "5"]
            for c, v in custom_params.items():
                command.append("--" + c)
                command.append(str(v))

            result = subprocess.run(command, capture_output=True, text=True)
            print("--- HDC (chopin2.py) STDOUT ---")
            print(result.stdout)
            print("--- HDC (chopin2.py) STDERR ---")
            print(result.stderr)
            print("--- End HDC Output ---")
            if result.returncode == 0:
                # Parse the same number of folds as requested with --kfolds.
                df_scores = retrieve_results_from_hdc_folds(5, result.stdout)
                if output_tabular:
                    df_scores.to_csv(output_tabular, sep='\t', index=False)
                if output_html:
                    df_scores.to_html(output_html, index=False)
            else:
                print("Command failed:", result.stderr)

        elif tune_para:
            params = read_params(param_txt)
            result = tune_hdc(params, file_path, output_tabular=output_tabular, output_html=output_html)
            print("Best Tune Result:\n", result)

        else:
            command = ["chopin2.py", "--input", file_path, "--levels", "100", "--kfolds", "5"]
            result = subprocess.run(command, capture_output=True, text=True)
            if result.returncode == 0:
                df_scores = retrieve_results_from_hdc_folds(5, result.stdout)
                if output_tabular:
                    df_scores.to_csv(output_tabular, sep='\t', index=False)
                if output_html:
                    df_scores.to_html(output_html, index=False)
            else:
                print("Command failed:", result.stderr)

    else:
        clf = setup(data=combine_df, **setup_dict)

        if custom_para:
            custom_params = json.loads(custom_para)
            model = create_model(algo, **custom_params)
            df_result = pull()
            res = df_result.T['Mean']
            print(res)
            with open('logs.log', 'a') as f:
                f.write(str(res) + '\n')
            # Suffix every column except 'Fold' with a three-letter classifier
            # tag (first two letters of the algorithm + 'C').
            algo_abbr = (str(algo).upper()[:2] + 'C') if algo else "ALC"
            df_result.columns = [col if col == 'Fold' else f"{col}_{algo_abbr}" for col in df_result.columns]
            if output_tabular:
                df_result.to_csv(output_tabular, sep='\t')
            if output_html:
                df_result.to_html(output_html)

        elif tune_para:
            params = read_params(param_txt)
            # Expand the parameter file into every hyperparameter combination.
            keys, values = zip(*params.items())
            combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]
            results = []
            f1_scores = []
            for idx, comb in enumerate(combinations):
                print(f"Tuning combination {idx+1}/{len(combinations)}: {comb}")
                try:
                    model = create_model(algo)
                    # A single-point custom_grid pins tune_model to this combination.
                    tuned_model = tune_model(model, custom_grid={k: [v] for k, v in comb.items()})
                    df_result = pull()
                    res = df_result.T['Mean']
                    print(f"Result for combination {comb}:\n{res}")
                    with open('logs.log', 'a') as f:
                        f.write(f"Combination {comb}: {str(res)}\n")
                    # Suffix every column except 'Fold' with the classifier tag.
                    algo_abbr = (str(algo).upper()[:2] + 'C') if algo else "ALC"
                    df_result.columns = [col if col == 'Fold' else f"{col}_{algo_abbr}" for col in df_result.columns]
                    results.append(df_result)
                    # Try to get the F1 score for ranking.
                    try:
                        f1 = res['F1']
                    except Exception:
                        f1 = None
                    f1_scores.append(f1)
                except ValueError as e:
                    print(f"Skipping invalid combination {comb}: {e}")
                    with open('logs.log', 'a') as f:
                        f.write(f"Skipping invalid combination {comb}: {e}\n")
                    results.append(pd.DataFrame())  # empty dataframe keeps indices aligned
                    f1_scores.append(None)

            # Select the best result by F1 score, if any run succeeded.
            if not any(f1 is not None for f1 in f1_scores):
                print("No successful tuning runs. Cannot determine best parameters.")
                if output_tabular:
                    pd.DataFrame().to_csv(output_tabular, sep='\t')
                if output_html:
                    pd.DataFrame().to_html(output_html)
                return

            best_idx = max((i for i, f1 in enumerate(f1_scores) if f1 is not None), key=lambda i: f1_scores[i])
            best_result = results[best_idx]
            best_comb = combinations[best_idx]
            best_f1 = f1_scores[best_idx]

            print(f"\nBest parameter combination: {best_comb} with F1 score: {best_f1}")
            with open('logs.log', 'a') as f:
                f.write(f"Best combination: {best_comb} F1: {best_f1}\n")
            if output_tabular:
                best_result.to_csv(output_tabular, sep='\t')
            if output_html:
                best_result.to_html(output_html)

        else:
            model = create_model(algo)
            df_result = pull()
            res = df_result.T['Mean']

            with open('logs.log', 'a') as f:
                f.write(str(res) + '\n')
            # Suffix every column except 'Fold' with the classifier tag.
            algo_abbr = (str(algo).upper()[:2] + 'C') if algo else "ALC"
            df_result.columns = [col if col == 'Fold' else f"{col}_{algo_abbr}" for col in df_result.columns]
            if output_tabular:
                df_result.to_csv(output_tabular, sep='\t')
            if output_html:
                df_result.to_html(output_html)

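# The --setup JSON is forwarded unchanged to pycaret.classification.setup() as
# keyword arguments; a minimal sketch (keys illustrative but valid setup kwargs):
#
#   --setup '{"session_id": 123, "verbose": false}'
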
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Run PyCaret ML setup.')
    parser.add_argument('--algo', type=str, required=False, help='Algorithm to run')
    parser.add_argument('--data_file', type=str, required=True, help='Path to data file')
    parser.add_argument('--metadata_file', type=str, required=True, help='Path to metadata file')
    parser.add_argument('--custom_para', required=False, default=None, help='Custom hyperparameters (JSON string)')
    parser.add_argument('--tune_para', required=False, default=None, help='Enable hyperparameter tuning (any non-empty value)')
    parser.add_argument('--setup', required=True, type=str, help='Setup parameters as a JSON string')
    parser.add_argument('--target_label', required=False, type=str, help='Name or 1-based index of the target column in the metadata file')
    parser.add_argument('--output_tabular', required=False, type=str, help='Path to output tabular file')
    parser.add_argument('--output_html', required=False, type=str, help='Path to output HTML file')
    parser.add_argument('--dp_columns', required=False, type=str, help='Comma-separated 1-based indices of columns to drop from the training data')
    parser.add_argument('--param_file', type=str, required=False, help='Path to parameter file')

    args = parser.parse_args()

    run_pycaret(
        algo=args.algo,
        file_path=args.data_file,
        custom_para=args.custom_para,
        tune_para=args.tune_para,
        setup_param=args.setup,
        target_label=args.target_label,
        metadata_file=args.metadata_file,
        output_tabular=args.output_tabular,
        output_html=args.output_html,
        dp_columns=args.dp_columns,
        param_txt=args.param_file
    )
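
# Example invocation (file names illustrative, not part of the repository;
# 'rf' is PyCaret's random forest classifier ID):
#
#   python ml_tool.py --algo rf \
#       --data_file features.tsv --metadata_file metadata.tsv \
#       --setup '{"session_id": 123, "verbose": false}' \
#       --target_label 2 --output_tabular scores.tsv --output_html scores.html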