Mercurial > repos > jay > feature_selector
view feature_selection/featureSelection.py @ 0:76a728a52df6 draft default tip
planemo upload for repository https://github.com/jaidevjoshi83/MicroBiomML commit 5ef78d4decc95ac107c468499328e7f086289ff9-dirty
| author | jay |
|---|---|
| date | Tue, 17 Feb 2026 10:52:45 +0000 |
| parents | |
| children |
line wrap: on
line source
import pandas as pd from sklearn.feature_selection import SequentialFeatureSelector from sklearn.linear_model import LogisticRegression from sklearn.tree import DecisionTreeClassifier from sklearn.ensemble import RandomForestClassifier from sklearn.svm import SVC import os import time import argparse import json import subprocess def retrieve_results_from_hdc_folds(n_folds, text): split_text = text.splitlines() n_folds = n_folds df_list = [] for i in range(n_folds): for n, line in enumerate(split_text): if f"Fold {i}" in split_text[n]: df_list.append([float(split_text[n+2].split(":")[1]), 'NaN', float(split_text[n+5].split(":")[1]), float(split_text[n+4].split(":")[1]), float(split_text[n+3].split(":")[1]), "NaN", float(split_text[n+6].split(":")[1])]) df = pd.DataFrame(df_list, columns=["Accuracy", "AUC", "Recall", "Prec.", "F1", "Kappa", "MCC"]) mean_row = df.mean(numeric_only=True) std_row = df.std(numeric_only=True) mean_df = mean_row.to_frame().T mean_df['Fold'] = 'Mean' std_df = std_row.to_frame().T std_df['Fold'] = 'Std' df = df.reset_index().rename(columns={'index': 'Fold'}) df_with_stats = pd.concat([df, mean_df, std_df], ignore_index=True) return df_with_stats def parse_arguments(): """Parse command-line arguments.""" parser = argparse.ArgumentParser(description="Feature selection using SequentialFeatureSelector on a single TSV file") parser.add_argument('--input', required=True, help="Path to count matrix file") parser.add_argument('--metadata', required=True, help="Path to metadata file") parser.add_argument('--threads', type=int, required=True, help="Number of threads") parser.add_argument('--classifier', required=True, choices=['lr', 'dt', 'sv', 'rf', 'hdc'], help="Classifier choice") parser.add_argument('--label', required=True, help="Name of the class label column in the dataset") parser.add_argument('--tol', type=float, default=1e-5, help="Tolerance for SequentialFeatureSelector convergence (default: 1e-5)") parser.add_argument('--index_clm', type=str, default='sample_id', help="Index Column") parser.add_argument('--feature_out', type=str, default='out.tsv', help="Output file for selected features") parser.add_argument('--log', type=str, default='out.log', help="Log file") parser.add_argument('--feature_selection', type=str, default=None, help="Path to feature selection file") parser.add_argument('--dp_columns', type=str, help='Columns to drop from training data') return parser.parse_args() def load_and_preprocess_data(args): """Load and preprocess the input data.""" df_counts = pd.read_csv(args.input, sep="\t") if args.dp_columns: dp_column_list = [df_counts.columns.tolist()[int(i) - 1] for i in args.dp_columns.split(',')] df_counts.drop(columns=dp_column_list, inplace=True) df_metadata = pd.read_csv(args.metadata, sep="\t") target_column = df_metadata.columns.to_list()[int(args.label)-1] if args.index_clm and args.index_clm in df_counts.columns and args.index_clm in df_metadata.columns: df_counts.set_index(args.index_clm, inplace=True) df_metadata.set_index(args.index_clm, inplace=True) df = pd.concat([df_counts, df_metadata[target_column]], axis=1) df.to_csv(f"temp_input.tsv", sep='\t', index=False) if args.feature_selection: with open(args.feature_selection) as f: features = [line.strip() for line in f] df = df[features] return df, target_column def run_hdc_classification(args, df, dir_name, start_time): """Run HDC classification and extract features.""" temp_input_file = f"./{dir_name}/temp_hdc_input.tsv" df.to_csv(temp_input_file, sep='\t', index=False) command = ["chopin2.py", "--input", temp_input_file, "--kfolds", "5", "--levels", "100", "--feature-selection", "backward"] result = subprocess.run(command, capture_output=True, text=True) if result.returncode == 0: text = result.stdout # Extract and save features try: features_start = text.index("Features\n----------") + len("Features\n----------\n") features_end = text.index("\n\nTotal ML models:") selected_features_str = text[features_start:features_end] selected_features = selected_features_str.strip().split('\n') print("Selected Features:", selected_features) out_df = pd.DataFrame(selected_features, columns=['feature_name']) out_df.to_csv("selected_features.tsv", sep="\t", index=False) except ValueError: print("Could not find the feature list in the HDC output.") # Parse and write scores to the log file try: df_scores = retrieve_results_from_hdc_folds(5, text) with open("hdc_performance.log", 'a') as log_file: log_file.write("\n--- HDC Performance Scores ---\n") df_scores.to_string(log_file) log_file.write("\n") except Exception as e: print(f"\nCould not parse performance scores: {e}") else: print("Command failed:", result.stderr) elapsed_time = round(time.time() - start_time, 3) with open(args.log, 'w') as log_file: log_file.write(f"{'time in seconds'}\t{'algorithm'}\n") log_file.write(f"{elapsed_time}\t{args.classifier}\n") def run_sequential_feature_selection(args, df, start_time, target_label): """Run sequential feature selection for non-HDC classifiers.""" classifiers = { 'lr': LogisticRegression(), 'dt': DecisionTreeClassifier(), 'sv': SVC(), 'rf': RandomForestClassifier() } classifier = classifiers[args.classifier] if target_label not in df.columns: raise ValueError(f"Label column '{target_label}' not found in the data.") X = df.drop(columns=[target_label]) labels = list(set(df[target_label].to_list() )) if len(labels) != 2: raise ValueError(f"Expected exactly 2 class labels, found {len(labels)}: {labels}") label_mapping = {labels[0]: 0, labels[1]: 1} y = df[target_label].map(label_mapping).tolist() sfs = SequentialFeatureSelector( classifier, n_features_to_select='auto', direction='backward', tol=args.tol, n_jobs=args.threads ) sfs.fit(X, y) selected_feature_names = X.columns[sfs.get_support()] out_df = pd.DataFrame(selected_feature_names, columns=['feature_name']) out_df.to_csv(args.feature_out, sep="\t", index=False) elapsed_time = round(time.time() - start_time, 3) with open(args.log, 'w') as log_file: log_file.write(f"{'time in seconds'}\t{'algorithm'}\n") log_file.write(f"{elapsed_time}\t{args.classifier}\n") def main(): """Main function to orchestrate the feature selection process.""" args = parse_arguments() dir_name = os.path.splitext(os.path.basename(args.input))[0] + "_" + args.classifier os.makedirs(f"./{dir_name}", exist_ok=True) start_time = time.time() df, target_label = load_and_preprocess_data(args) if args.classifier == 'hdc': run_hdc_classification(args, df, dir_name, start_time) else: run_sequential_feature_selection(args, df, start_time, target_label) if __name__ == "__main__": main()
