diff COBRAxy/marea_cluster.py @ 539:2fb97466e404 draft default tip
Uploaded
| author | francesco_lapi |
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 |
| parents | fd53d42348bd |
| children | |
```diff
--- a/COBRAxy/marea_cluster.py	Sat Oct 25 11:39:03 2025 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,557 +0,0 @@
```

This changeset removes the whole file:

```python
# -*- coding: utf-8 -*-
"""
Created on Mon Jun 3 19:51:00 2019
@author: Narger
"""

import sys
import argparse
import os
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_samples, silhouette_score
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import scipy.cluster.hierarchy as shc
import matplotlib.cm as cm
from typing import Optional, Dict, List

################################# process args ###############################
def process_args(args_in: List[str] = None) -> argparse.Namespace:
    """
    Process command-line arguments.

    Args:
        args_in (list): List of command-line arguments.

    Returns:
        Namespace: An object containing the parsed arguments.
    """
    parser = argparse.ArgumentParser(usage='%(prog)s [options]',
                                     description='Cluster the samples of a dataset to create classes.')

    parser.add_argument('-ol', '--out_log',
                        help='output log')

    parser.add_argument('-in', '--input',
                        type=str,
                        help='input dataset')

    parser.add_argument('-cy', '--cluster_type',
                        type=str,
                        choices=['kmeans', 'dbscan', 'hierarchy'],
                        default='kmeans',
                        help='choose the clustering algorithm')

    parser.add_argument('-sc', '--scaling',
                        type=str,
                        choices=['true', 'false'],
                        default='true',
                        help='choose whether to scale the data')

    parser.add_argument('-k1', '--k_min',
                        type=int,
                        default=2,
                        help='minimum number of clusters to generate')

    parser.add_argument('-k2', '--k_max',
                        type=int,
                        default=7,
                        help='maximum number of clusters to generate')

    parser.add_argument('-el', '--elbow',
                        type=str,
                        default='false',
                        choices=['true', 'false'],
                        help='choose whether to generate an elbow plot for k-means')

    parser.add_argument('-si', '--silhouette',
                        type=str,
                        default='false',
                        choices=['true', 'false'],
                        help='choose whether to generate silhouette plots')

    parser.add_argument('-td', '--tool_dir',
                        type=str,
                        required=True,
                        help='your tool directory')

    parser.add_argument('-ms', '--min_samples',
                        type=int,
                        help='min_samples for DBSCAN (optional)')

    parser.add_argument('-ep', '--eps',
                        type=float,
                        help='eps for DBSCAN (optional)')

    parser.add_argument('-bc', '--best_cluster',
                        type=str,
                        help='output path of the best cluster TSV')

    parser.add_argument('-idop', '--output_path',
                        type=str,
                        default='clustering/',
                        help='output path for maps')

    args_in = parser.parse_args(args_in)
    return args_in

########################### warning ##########################################
def warning(s: str) -> None:
    """
    Log a warning message to the output log file and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.

    Returns:
        None
    """
    with open(args.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)

########################## read dataset ######################################
def read_dataset(dataset: str) -> pd.DataFrame:
    """
    Read a dataset from a TSV file and return it as a pandas DataFrame.

    Args:
        dataset (str): the path of the dataset file to convert into a DataFrame

    Returns:
        pandas.DataFrame: The dataset loaded as a pandas DataFrame.
    Raises:
        pandas.errors.EmptyDataError: If the dataset file is empty.
        sys.exit: If the dataset file has the wrong format (e.g., fewer than 2 columns).
    """
    try:
        dataset = pd.read_csv(dataset, sep='\t', header=0)
    except pd.errors.EmptyDataError:
        sys.exit('Execution aborted: wrong format of dataset\n')
    if len(dataset.columns) < 2:
        sys.exit('Execution aborted: wrong format of dataset\n')
    return dataset

############################ rewrite_input ###################################
def rewrite_input(dataset: Dict) -> Dict[str, List[Optional[float]]]:
    """
    Rewrite the dataset as a dictionary of lists instead of a dictionary of
    dictionaries, converting each value to float and mapping the string 'None' to None.

    Args:
        dataset (dict): The dataset to be rewritten, keyed by column name.

    Returns:
        dict: The rewritten dataset as a dictionary of lists.
    """
    for key, val in dataset.items():
        l = []
        for i in val:
            if i == 'None':
                l.append(None)
            else:
                l.append(float(i))
        dataset[key] = l

    return dataset

############################## write to csv ##################################
def write_to_csv(dataset: pd.DataFrame, labels: List[int], name: str) -> None:
    """
    Write the dataset index and the predicted labels to a TSV file.

    Args:
        dataset (pandas.DataFrame): The dataset that was clustered.
        labels (list): The predicted label for each data point.
        name (str): The path of the output file.

    Returns:
        None
    """
    predict = [x + 1 for x in labels]
    classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str)
    classe.to_csv(name, sep='\t', index=False,
                  header=['Patient_ID', 'Class'])

########################### find the maximum in a list #######################
def max_index(lista: List[float]) -> int:
    """
    Find the index of the maximum value in a list.

    Args:
        lista (list): The list in which to search for the index of the maximum value.

    Returns:
        int: The index of the maximum value in the list.
    """
    best = -1
    best_index = 0
    for i in range(len(lista)):
        if lista[i] > best:
            best = lista[i]
            best_index = i

    return best_index

################################ kmeans #######################################
def kmeans(k_min: int, k_max: int, dataset: pd.DataFrame, elbow: str, silhouette: str, best_cluster: str) -> None:
    """
    Perform k-means clustering on the given dataset, an algorithm that partitions a
    dataset into groups (clusters) based on their characteristics. The goal is to divide
    the data into homogeneous groups, where the elements within each group are similar
    to each other and different from the elements in other groups.

    Args:
        k_min (int): The minimum number of clusters to consider.
        k_max (int): The maximum number of clusters to consider.
        dataset (pandas.DataFrame): The dataset to perform clustering on.
        elbow (str): Whether to generate an elbow plot ('true' or 'false').
        silhouette (str): Whether to generate silhouette plots ('true' or 'false').
        best_cluster (str): The file path to save the best clustering to.
    Returns:
        None
    """
    if not os.path.exists(args.output_path):
        os.makedirs(args.output_path)

    elbow = (elbow == 'true')
    silhouette = (silhouette == 'true')

    range_n_clusters = list(range(k_min, k_max + 1))
    distortions = []
    scores = []
    all_labels = []

    # k = 1 is fitted only for the elbow plot; its silhouette is undefined.
    clusterer = KMeans(n_clusters=1, random_state=10)
    distortions.append(clusterer.fit(dataset).inertia_)

    for n_clusters in range_n_clusters:
        clusterer = KMeans(n_clusters=n_clusters, random_state=10)
        cluster_labels = clusterer.fit_predict(dataset)

        all_labels.append(cluster_labels)
        if n_clusters == 1:
            silhouette_avg = 0
        else:
            silhouette_avg = silhouette_score(dataset, cluster_labels)
        scores.append(silhouette_avg)
        distortions.append(clusterer.inertia_)

    best = max_index(scores) + k_min

    for i in range(len(all_labels)):
        prefix = ''
        if i + k_min == best:
            prefix = '_BEST'

        write_to_csv(dataset, all_labels[i],
                     f'{args.output_path}/kmeans_with_{i + k_min}{prefix}_clusters.tsv')

        if prefix == '_BEST':
            labels = all_labels[i]
            predict = [x + 1 for x in labels]
            classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str)
            classe.to_csv(best_cluster, sep='\t', index=False, header=['Patient_ID', 'Class'])

        if silhouette:
            silhouette_draw(dataset, all_labels[i], i + k_min,
                            f'{args.output_path}/silhouette_with_{i + k_min}{prefix}_clusters.png')

    if elbow:
        elbow_plot(distortions, k_min, k_max)

############################## elbow_plot ####################################
def elbow_plot(distortions: List[float], k_min: int, k_max: int) -> None:
    """
    Generate an elbow plot to visualize the distortion for different numbers of clusters.
    The elbow plot is a graphical tool used in clustering analysis to help identify the
    appropriate number of clusters: look for the point where the rate of decrease in
    distortion drops sharply, indicating the best balance between model complexity and
    clustering quality.

    Args:
        distortions (list): List of distortion values for different numbers of clusters.
        k_min (int): The minimum number of clusters considered.
        k_max (int): The maximum number of clusters considered.

    Returns:
        None
    """
    plt.figure(0)
    x = list(range(k_min, k_max + 1))
    x.insert(0, 1)
    plt.plot(x, distortions, marker='o')
    plt.xlabel('Number of clusters (k)')
    plt.ylabel('Distortion')
    s = f'{args.output_path}/elbow_plot.png'
    fig = plt.gcf()
    fig.set_size_inches(18.5, 10.5, forward=True)
    fig.savefig(s, dpi=100)

############################## silhouette plot ###############################
def silhouette_draw(dataset: pd.DataFrame, labels: List[int], n_clusters: int, path: str) -> None:
    """
    Generate a silhouette plot for the clustering results.
    The silhouette coefficient is a measure used to evaluate the quality of clusters
    obtained from a clustering algorithm; it quantifies how similar an object is to its
    own cluster compared to other clusters. The silhouette coefficient ranges from -1 to 1, where:
    - A value close to +1 indicates that the object is well matched to its own cluster and
      poorly matched to neighboring clusters, i.e. it lies in a dense, well-separated cluster.
    - A value close to 0 indicates that the object is close to the decision boundary
      between two neighboring clusters.
    - A value close to -1 indicates that the object may have been assigned to the wrong cluster.

    Args:
        dataset (pandas.DataFrame): The dataset used for clustering.
        labels (list): The cluster label assigned to each data point.
        n_clusters (int): The number of clusters.
        path (str): The path to save the silhouette plot image to.

    Returns:
        None
    """
    if n_clusters == 1:
        return None

    silhouette_avg = silhouette_score(dataset, labels)
    warning("For n_clusters = " + str(n_clusters) +
            " the average silhouette_score is: " + str(silhouette_avg))

    plt.close('all')
    fig, (ax1) = plt.subplots(1, 1)
    fig.set_size_inches(18, 7)

    # The silhouette coefficient ranges over [-1, 1].
    ax1.set_xlim([-1, 1])
    # The (n_clusters + 1) * 10 inserts blank space between the silhouette
    # plots of individual clusters, to demarcate them clearly.
    ax1.set_ylim([0, len(dataset) + (n_clusters + 1) * 10])

    # Compute the silhouette score for each sample
    sample_silhouette_values = silhouette_samples(dataset, labels)

    y_lower = 10
    for i in range(n_clusters):
        # Aggregate the silhouette scores of the samples belonging to
        # cluster i, and sort them
        ith_cluster_silhouette_values = sample_silhouette_values[labels == i]
        ith_cluster_silhouette_values.sort()

        size_cluster_i = ith_cluster_silhouette_values.shape[0]
        y_upper = y_lower + size_cluster_i

        color = cm.nipy_spectral(float(i) / n_clusters)
        ax1.fill_betweenx(np.arange(y_lower, y_upper),
                          0, ith_cluster_silhouette_values,
                          facecolor=color, edgecolor=color, alpha=0.7)

        # Label the silhouette plots with their cluster numbers at the middle
        ax1.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))

        # Compute the new y_lower for the next plot
        y_lower = y_upper + 10  # 10 for the 0 samples

    ax1.set_title("The silhouette plot for the various clusters.")
    ax1.set_xlabel("The silhouette coefficient values")
    ax1.set_ylabel("Cluster label")

    # The vertical line marks the average silhouette score of all the values
    ax1.axvline(x=silhouette_avg, color="red", linestyle="--")

    ax1.set_yticks([])  # Clear the y-axis labels / ticks
    ax1.set_xticks([-0.1, 0, 0.2, 0.4, 0.6, 0.8, 1])

    plt.suptitle(("Silhouette analysis for clustering on sample data "
                  "with n_clusters = " + str(n_clusters) +
                  "\nAverage silhouette_score = " + str(silhouette_avg)),
                 fontsize=12, fontweight='bold')

    plt.savefig(path, bbox_inches='tight')

######################## dbscan ##############################################
def dbscan(dataset: pd.DataFrame, eps: float, min_samples: int, best_cluster: str) -> None:
    """
    Perform DBSCAN clustering on the given dataset, a clustering algorithm that groups
    closely packed points together based on the notion of density.

    Args:
        dataset (pandas.DataFrame): The dataset to be clustered.
        eps (float): The maximum distance between two samples for one to be considered
            as in the neighborhood of the other.
        min_samples (int): The number of samples in a neighborhood for a point to be
            considered a core point.
        best_cluster (str): The file path to save the clustering to.
    Returns:
        None
    """
    if not os.path.exists(args.output_path):
        os.makedirs(args.output_path)

    # Fall back to scikit-learn's defaults for any parameter not provided.
    params = {}
    if eps is not None:
        params['eps'] = eps
    if min_samples is not None:
        params['min_samples'] = min_samples
    clusterer = DBSCAN(**params)

    clustering = clusterer.fit(dataset)

    core_samples_mask = np.zeros_like(clustering.labels_, dtype=bool)
    core_samples_mask[clustering.core_sample_indices_] = True
    labels = clustering.labels_

    # Number of clusters in labels, ignoring noise if present.
    n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)

    # Noise points are labeled -1, so they end up in class 0 below.
    predict = [x + 1 for x in labels]
    classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str)
    classe.to_csv(best_cluster, sep='\t', index=False, header=['Patient_ID', 'Class'])

########################## hierarchical ######################################
def hierachical_agglomerative(dataset: pd.DataFrame, k_min: int, k_max: int, best_cluster: str, silhouette: str) -> None:
    """
    Perform hierarchical agglomerative clustering on the given dataset.

    Args:
        dataset (pandas.DataFrame): The dataset to be clustered.
        k_min (int): The minimum number of clusters to consider.
        k_max (int): The maximum number of clusters to consider.
        best_cluster (str): The file path to save the best clustering to.
        silhouette (str): Whether to generate silhouette plots ('true' or 'false').

    Returns:
        None
    """
    if not os.path.exists(args.output_path):
        os.makedirs(args.output_path)

    plt.figure(figsize=(10, 7))
    plt.title("Dendrogram")
    shc.dendrogram(shc.linkage(dataset, method='ward'), labels=dataset.index.values.tolist())
    fig = plt.gcf()
    fig.savefig(f'{args.output_path}/dendrogram.png', dpi=200)

    range_n_clusters = list(range(k_min, k_max + 1))

    scores = []
    labels = []

    for n_clusters in range_n_clusters:
        # 'ward' linkage implies Euclidean distances, scikit-learn's default.
        cluster = AgglomerativeClustering(n_clusters=n_clusters, linkage='ward')
        cluster.fit_predict(dataset)
        cluster_labels = cluster.labels_
        labels.append(cluster_labels)
        # Score each clustering so the best k can be selected below.
        scores.append(silhouette_score(dataset, cluster_labels))
        write_to_csv(dataset, cluster_labels,
                     f'{args.output_path}/hierarchical_with_{n_clusters}_clusters.tsv')

    best = max_index(scores) + k_min

    for i in range(len(labels)):
        prefix = ''
        if i + k_min == best:
            prefix = '_BEST'
        if silhouette == 'true':
            silhouette_draw(dataset, labels[i], i + k_min,
                            f'{args.output_path}/silhouette_with_{i + k_min}{prefix}_clusters.png')

    best_labels = labels[best - k_min]
    predict = [x + 1 for x in best_labels]
    classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str)
    classe.to_csv(best_cluster, sep='\t', index=False, header=['Patient_ID', 'Class'])

############################# main ###########################################
def main(args_in: List[str] = None) -> None:
    """
    Initialize everything and set the program in motion based on the frontend input arguments.
    Returns:
        None
    """
    global args
    args = process_args(args_in)

    if not os.path.exists(args.output_path):
        os.makedirs(args.output_path)

    # Read the data, drop the first (ID) column, and transpose it so that the
    # original columns become the rows to be clustered.
    X = read_dataset(args.input)
    X = X.iloc[:, 1:]
    X = pd.DataFrame.to_dict(X, orient='list')
    X = rewrite_input(X)
    X = pd.DataFrame.from_dict(X, orient='index')

    for i in X.columns:
        if any(val is None or np.isnan(val) for val in X[i]):
            X = X.drop(columns=[i])

    if args.scaling == "true":
        list_to_remove = []
        toll_std = 1e-8
        for i in X.columns:
            mean_i = X[i].mean()
            std_i = X[i].std()
            if std_i > toll_std:
                # scale to mean 0 and std 1
                X[i] = (X[i] - mean_i) / std_i
            else:
                # remove the feature because its std is ~0
                list_to_remove.append(i)
        if len(list_to_remove) > 0:
            X = X.drop(columns=list_to_remove)

    if args.k_max is not None:
        numero_classi = X.shape[0]
        while args.k_max >= numero_classi:
            err = 'Skipping k = ' + str(args.k_max) + ' since it is >= the number of samples in the dataset'
            warning(err)
            args.k_max = args.k_max - 1

    if args.cluster_type == 'kmeans':
        kmeans(args.k_min, args.k_max, X, args.elbow, args.silhouette, args.best_cluster)

    if args.cluster_type == 'dbscan':
        dbscan(X, args.eps, args.min_samples, args.best_cluster)

    if args.cluster_type == 'hierarchy':
        hierachical_agglomerative(X, args.k_min, args.k_max, args.best_cluster, args.silhouette)

##############################################################################
if __name__ == "__main__":
    main()
```
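Since `main()` accepts an explicit argument list, the tool can also be driven programmatically. A minimal sketch follows; all file and directory names in it are hypothetical placeholders, not paths shipped with COBRAxy:

```python
# Minimal sketch: run the clusterer as a library instead of via Galaxy.
# Every path below is a placeholder chosen for illustration.
from marea_cluster import main

main([
    "-td", "/path/to/tool_dir",   # required flag
    "-in", "scores.tsv",          # TSV input; the first column is dropped,
                                  # the remaining columns become the samples
    "-ol", "clustering.log",      # log file written by warning()
    "-cy", "kmeans",              # or 'dbscan' / 'hierarchy'
    "-k1", "3",
    "-k2", "6",
    "-el", "true",                # write elbow_plot.png
    "-si", "true",                # write one silhouette PNG per k
    "-bc", "best_cluster.tsv",    # best clustering as Patient_ID / Class
    "-idop", "clustering/",
])
```

With these options the outputs land in `clustering/`: one `kmeans_with_<k>[_BEST]_clusters.tsv` per k, the silhouette and elbow plots, and the best clustering copied to `best_cluster.tsv`.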
