Mercurial > repos > bimib > cobraxy
view COBRAxy/marea_cluster.py @ 139:74b383211ab5 draft
Uploaded
| author | luca_milaz | 
|---|---|
| date | Thu, 31 Oct 2024 18:52:16 +0000 | 
| parents | 41f35c2f0c7b | 
| children | 3fca9b568faf | 
line wrap: on
 line source
# -*- coding: utf-8 -*- """ Created on Mon Jun 3 19:51:00 2019 @author: Narger """ import sys import argparse import os import numpy as np import pandas as pd from sklearn.datasets import make_blobs from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering from sklearn.metrics import silhouette_samples, silhouette_score, cluster import matplotlib matplotlib.use('agg') import matplotlib.pyplot as plt import scipy.cluster.hierarchy as shc import matplotlib.cm as cm from typing import Optional, Dict, List ################################# process args ############################### def process_args(args :List[str]) -> argparse.Namespace: """ Processes command-line arguments. Args: args (list): List of command-line arguments. Returns: Namespace: An object containing parsed arguments. """ parser = argparse.ArgumentParser(usage = '%(prog)s [options]', description = 'process some value\'s' + ' genes to create class.') parser.add_argument('-ol', '--out_log', help = "Output log") parser.add_argument('-in', '--input', type = str, help = 'input dataset') parser.add_argument('-cy', '--cluster_type', type = str, choices = ['kmeans', 'dbscan', 'hierarchy'], default = 'kmeans', help = 'choose clustering algorythm') parser.add_argument('-k1', '--k_min', type = int, default = 2, help = 'choose minimun cluster number to be generated') parser.add_argument('-k2', '--k_max', type = int, default = 7, help = 'choose maximum cluster number to be generated') parser.add_argument('-el', '--elbow', type = str, default = 'false', choices = ['true', 'false'], help = 'choose if you want to generate an elbow plot for kmeans') parser.add_argument('-si', '--silhouette', type = str, default = 'false', choices = ['true', 'false'], help = 'choose if you want silhouette plots') parser.add_argument('-td', '--tool_dir', type = str, required = True, help = 'your tool directory') parser.add_argument('-ms', '--min_samples', type = float, help = 'min samples for dbscan (optional)') parser.add_argument('-ep', '--eps', type = float, help = 'eps for dbscan (optional)') parser.add_argument('-bc', '--best_cluster', type = str, help = 'output of best cluster tsv') args = parser.parse_args() return args ########################### warning ########################################### def warning(s :str) -> None: """ Log a warning message to an output log file and print it to the console. Args: s (str): The warning message to be logged and printed. Returns: None """ args = process_args(sys.argv) with open(args.out_log, 'a') as log: log.write(s + "\n\n") print(s) ########################## read dataset ###################################### def read_dataset(dataset :str) -> pd.DataFrame: """ Read dataset from a CSV file and return it as a Pandas DataFrame. Args: dataset (str): the path to the dataset to convert into a DataFrame Returns: pandas.DataFrame: The dataset loaded as a Pandas DataFrame. Raises: pandas.errors.EmptyDataError: If the dataset file is empty. sys.exit: If the dataset file has the wrong format (e.g., fewer than 2 columns) """ try: dataset = pd.read_csv(dataset, sep = '\t', header = 0) except pd.errors.EmptyDataError: sys.exit('Execution aborted: wrong format of dataset\n') if len(dataset.columns) < 2: sys.exit('Execution aborted: wrong format of dataset\n') return dataset ############################ rewrite_input ################################### def rewrite_input(dataset :pd.DataFrame) -> Dict[str, List[Optional[float]]]: """ Rewrite the dataset as a dictionary of lists instead of as a dictionary of dictionaries. Args: dataset (pandas.DataFrame): The dataset to be rewritten. Returns: dict: The rewritten dataset as a dictionary of lists. """ #Riscrivo il dataset come dizionario di liste, #non come dizionario di dizionari dataset.pop('Reactions', None) for key, val in dataset.items(): l = [] for i in val: if i == 'None': l.append(None) else: l.append(float(i)) dataset[key] = l return dataset ############################## write to csv ################################## def write_to_csv (dataset :pd.DataFrame, labels :List[str], name :str) -> None: """ Write dataset and predicted labels to a CSV file. Args: dataset (pandas.DataFrame): The dataset to be written. labels (list): The predicted labels for each data point. name (str): The name of the output CSV file. Returns: None """ #labels = predict predict = [x+1 for x in labels] classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str) dest = name classe.to_csv(dest, sep = '\t', index = False, header = ['Patient_ID', 'Class']) ########################### trova il massimo in lista ######################## def max_index (lista :List[int]) -> int: """ Find the index of the maximum value in a list. Args: lista (list): The list in which we search for the index of the maximum value. Returns: int: The index of the maximum value in the list. """ best = -1 best_index = 0 for i in range(len(lista)): if lista[i] > best: best = lista [i] best_index = i return best_index ################################ kmeans ##################################### def kmeans (k_min: int, k_max: int, dataset: pd.DataFrame, elbow: str, silhouette: str, best_cluster: str) -> None: """ Perform k-means clustering on the given dataset, which is an algorithm used to partition a dataset into groups (clusters) based on their characteristics. The goal is to divide the data into homogeneous groups, where the elements within each group are similar to each other and different from the elements in other groups. Args: k_min (int): The minimum number of clusters to consider. k_max (int): The maximum number of clusters to consider. dataset (pandas.DataFrame): The dataset to perform clustering on. elbow (str): Whether to generate an elbow plot for kmeans ('true' or 'false'). silhouette (str): Whether to generate silhouette plots ('true' or 'false'). best_cluster (str): The file path to save the output of the best cluster. Returns: None """ if not os.path.exists('clustering'): os.makedirs('clustering') if elbow == 'true': elbow = True else: elbow = False if silhouette == 'true': silhouette = True else: silhouette = False range_n_clusters = [i for i in range(k_min, k_max+1)] distortions = [] scores = [] all_labels = [] clusterer = KMeans(n_clusters=1, random_state=10) distortions.append(clusterer.fit(dataset).inertia_) for n_clusters in range_n_clusters: clusterer = KMeans(n_clusters=n_clusters, random_state=10) cluster_labels = clusterer.fit_predict(dataset) all_labels.append(cluster_labels) if n_clusters == 1: silhouette_avg = 0 else: silhouette_avg = silhouette_score(dataset, cluster_labels) scores.append(silhouette_avg) distortions.append(clusterer.fit(dataset).inertia_) best = max_index(scores) + k_min for i in range(len(all_labels)): prefix = '' if (i + k_min == best): prefix = '_BEST' write_to_csv(dataset, all_labels[i], 'clustering/kmeans_with_' + str(i + k_min) + prefix + '_clusters.tsv') if (prefix == '_BEST'): labels = all_labels[i] predict = [x+1 for x in labels] classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str) classe.to_csv(best_cluster, sep = '\t', index = False, header = ['Patient_ID', 'Class']) if silhouette: silhouette_draw(dataset, all_labels[i], i + k_min, 'clustering/silhouette_with_' + str(i + k_min) + prefix + '_clusters.png') if elbow: elbow_plot(distortions, k_min,k_max) ############################## elbow_plot #################################### def elbow_plot (distortions: List[float], k_min: int, k_max: int) -> None: """ Generate an elbow plot to visualize the distortion for different numbers of clusters. The elbow plot is a graphical tool used in clustering analysis to help identifying the appropriate number of clusters by looking for the point where the rate of decrease in distortion sharply decreases, indicating the optimal balance between model complexity and clustering quality. Args: distortions (list): List of distortion values for different numbers of clusters. k_min (int): The minimum number of clusters considered. k_max (int): The maximum number of clusters considered. Returns: None """ plt.figure(0) x = list(range(k_min, k_max + 1)) x.insert(0, 1) plt.plot(x, distortions, marker = 'o') plt.xlabel('Number of clusters (k)') plt.ylabel('Distortion') s = 'clustering/elbow_plot.png' fig = plt.gcf() fig.set_size_inches(18.5, 10.5, forward = True) fig.savefig(s, dpi=100) ############################## silhouette plot ############################### def silhouette_draw(dataset: pd.DataFrame, labels: List[str], n_clusters: int, path:str) -> None: """ Generate a silhouette plot for the clustering results. The silhouette coefficient is a measure used to evaluate the quality of clusters obtained from a clustering algorithmand it quantifies how similar an object is to its own cluster compared to other clusters. The silhouette coefficient ranges from -1 to 1, where: - A value close to +1 indicates that the object is well matched to its own cluster and poorly matched to neighboring clusters. This implies that the object is in a dense, well-separated cluster. - A value close to 0 indicates that the object is close to the decision boundary between two neighboring clusters. - A value close to -1 indicates that the object may have been assigned to the wrong cluster. Args: dataset (pandas.DataFrame): The dataset used for clustering. labels (list): The cluster labels assigned to each data point. n_clusters (int): The number of clusters. path (str): The path to save the silhouette plot image. Returns: None """ if n_clusters == 1: return None silhouette_avg = silhouette_score(dataset, labels) warning("For n_clusters = " + str(n_clusters) + " The average silhouette_score is: " + str(silhouette_avg)) plt.close('all') # Create a subplot with 1 row and 2 columns fig, (ax1) = plt.subplots(1, 1) fig.set_size_inches(18, 7) # The 1st subplot is the silhouette plot # The silhouette coefficient can range from -1, 1 but in this example all # lie within [-0.1, 1] ax1.set_xlim([-1, 1]) # The (n_clusters+1)*10 is for inserting blank space between silhouette # plots of individual clusters, to demarcate them clearly. ax1.set_ylim([0, len(dataset) + (n_clusters + 1) * 10]) # Compute the silhouette scores for each sample sample_silhouette_values = silhouette_samples(dataset, labels) y_lower = 10 for i in range(n_clusters): # Aggregate the silhouette scores for samples belonging to # cluster i, and sort them ith_cluster_silhouette_values = \ sample_silhouette_values[labels == i] ith_cluster_silhouette_values.sort() size_cluster_i = ith_cluster_silhouette_values.shape[0] y_upper = y_lower + size_cluster_i color = cm.nipy_spectral(float(i) / n_clusters) ax1.fill_betweenx(np.arange(y_lower, y_upper), 0, ith_cluster_silhouette_values, facecolor=color, edgecolor=color, alpha=0.7) # Label the silhouette plots with their cluster numbers at the middle ax1.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i)) # Compute the new y_lower for next plot y_lower = y_upper + 10 # 10 for the 0 samples ax1.set_title("The silhouette plot for the various clusters.") ax1.set_xlabel("The silhouette coefficient values") ax1.set_ylabel("Cluster label") # The vertical line for average silhouette score of all the values ax1.axvline(x=silhouette_avg, color="red", linestyle="--") ax1.set_yticks([]) # Clear the yaxis labels / ticks ax1.set_xticks([-0.1, 0, 0.2, 0.4, 0.6, 0.8, 1]) plt.suptitle(("Silhouette analysis for clustering on sample data " "with n_clusters = " + str(n_clusters) + "\nAverage silhouette_score = " + str(silhouette_avg)), fontsize=12, fontweight='bold') plt.savefig(path, bbox_inches='tight') ######################## dbscan ############################################## def dbscan(dataset: pd.DataFrame, eps: float, min_samples: float, best_cluster: str) -> None: """ Perform DBSCAN clustering on the given dataset, which is a clustering algorithm that groups together closely packed points based on the notion of density. Args: dataset (pandas.DataFrame): The dataset to be clustered. eps (float): The maximum distance between two samples for one to be considered as in the neighborhood of the other. min_samples (float): The number of samples in a neighborhood for a point to be considered as a core point. best_cluster (str): The file path to save the output of the best cluster. Returns: None """ if not os.path.exists('clustering'): os.makedirs('clustering') if eps is not None: clusterer = DBSCAN(eps = eps, min_samples = min_samples) else: clusterer = DBSCAN() clustering = clusterer.fit(dataset) core_samples_mask = np.zeros_like(clustering.labels_, dtype=bool) core_samples_mask[clustering.core_sample_indices_] = True labels = clustering.labels_ # Number of clusters in labels, ignoring noise if present. n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0) labels = labels predict = [x+1 for x in labels] classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str) classe.to_csv(best_cluster, sep = '\t', index = False, header = ['Patient_ID', 'Class']) ########################## hierachical ####################################### def hierachical_agglomerative(dataset: pd.DataFrame, k_min: int, k_max: int, best_cluster: str, silhouette: str) -> None: """ Perform hierarchical agglomerative clustering on the given dataset. Args: dataset (pandas.DataFrame): The dataset to be clustered. k_min (int): The minimum number of clusters to consider. k_max (int): The maximum number of clusters to consider. best_cluster (str): The file path to save the output of the best cluster. silhouette (str): Whether to generate silhouette plots ('true' or 'false'). Returns: None """ if not os.path.exists('clustering'): os.makedirs('clustering') plt.figure(figsize=(10, 7)) plt.title("Customer Dendograms") shc.dendrogram(shc.linkage(dataset, method='ward'), labels=dataset.index.values.tolist()) fig = plt.gcf() fig.savefig('clustering/dendogram.png', dpi=200) range_n_clusters = [i for i in range(k_min, k_max+1)] scores = [] labels = [] n_classi = dataset.shape[0] for n_clusters in range_n_clusters: cluster = AgglomerativeClustering(n_clusters=n_clusters, affinity='euclidean', linkage='ward') cluster.fit_predict(dataset) cluster_labels = cluster.labels_ labels.append(cluster_labels) write_to_csv(dataset, cluster_labels, 'clustering/hierarchical_with_' + str(n_clusters) + '_clusters.tsv') best = max_index(scores) + k_min for i in range(len(labels)): prefix = '' if (i + k_min == best): prefix = '_BEST' if silhouette == 'true': silhouette_draw(dataset, labels[i], i + k_min, 'clustering/silhouette_with_' + str(i + k_min) + prefix + '_clusters.png') for i in range(len(labels)): if (i + k_min == best): labels = labels[i] predict = [x+1 for x in labels] classe = (pd.DataFrame(list(zip(dataset.index, predict)))).astype(str) classe.to_csv(best_cluster, sep = '\t', index = False, header = ['Patient_ID', 'Class']) ############################# main ########################################### def main() -> None: """ Initializes everything and sets the program in motion based on the fronted input arguments. Returns: None """ if not os.path.exists('clustering'): os.makedirs('clustering') args = process_args(sys.argv) #Data read X = read_dataset(args.input) X = pd.DataFrame.to_dict(X, orient='list') X = rewrite_input(X) X = pd.DataFrame.from_dict(X, orient = 'index') for i in X.columns: tmp = X[i][0] if tmp == None: X = X.drop(columns=[i]) ## NAN TO HANLDE if args.k_max != None: numero_classi = X.shape[0] while args.k_max >= numero_classi: err = 'Skipping k = ' + str(args.k_max) + ' since it is >= number of classes of dataset' warning(err) args.k_max = args.k_max - 1 if args.cluster_type == 'kmeans': kmeans(args.k_min, args.k_max, X, args.elbow, args.silhouette, args.best_cluster) if args.cluster_type == 'dbscan': dbscan(X, args.eps, args.min_samples, args.best_cluster) if args.cluster_type == 'hierarchy': hierachical_agglomerative(X, args.k_min, args.k_max, args.best_cluster, args.silhouette) ############################################################################## if __name__ == "__main__": main()
