cdhit_analysis.py @ 2:706b7acdb230 (draft)
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit c2020ecc91cea0c8cf7439180cf796743c838b4d-dirty
| author | onnodg |
|---|---|
| date | Tue, 21 Oct 2025 07:54:21 +0000 |
| parents | ff68835adb2b |
| children | c6981ea453ae |
""" This script processes cluster output files from cd-hit-est for use in Galaxy. It extracts cluster information, associates taxa and e-values from annotation files, performs statistical calculations, and generates text and plot outputs summarizing similarity and taxonomic distributions. Main steps: 1. Parse cd-hit-est cluster file and (optional) annotation file. 2. Process each cluster to extract similarity, taxa, and e-value information. 3. Aggregate results across clusters. 4. Generate requested outputs: text summaries, plots, and Excel reports. Note: Uses a non-interactive matplotlib backend (Agg) for compatibility with Galaxy. """ import argparse from collections import Counter, defaultdict import os import re import matplotlib.pyplot as plt import pandas as pd from math import sqrt import openpyxl def parse_arguments(args_list=None): """Parse command-line arguments for the script.""" parser = argparse.ArgumentParser( description='Create taxa analysis from cd-hit cluster files') parser.add_argument('--input_cluster', type=str, required=True, help='Input cluster file (.clstr)') parser.add_argument('--input_annotation', type=str, required=False, help='Input annotation file (.out)') # Galaxy output files parser.add_argument('--output_similarity_txt', type=str, help='Similarity text output file') parser.add_argument('--output_similarity_plot', type=str, help='Similarity plot output file') parser.add_argument('--output_evalue_txt', type=str, help='E-value text output file') parser.add_argument('--output_evalue_plot', type=str, help='E-value plot output file') parser.add_argument('--output_count', type=str, help='Count summary output file') parser.add_argument('--output_taxa_clusters', type=str, help='Taxa per cluster output file') parser.add_argument('--output_taxa_processed', type=str, help='Processed taxa output file') # Plot parameters parser.add_argument('--simi_plot_y_min', type=float, default=95.0, help='Minimum value of the y-axis in the similarity plot') parser.add_argument('--simi_plot_y_max', type=float, default=100.0, help='Maximum value of the y-axis in the similarity plot') # Uncertain taxa configuration parser.add_argument('--uncertain_taxa_use_ratio', type=float, default=0.5, help='Ratio at which uncertain taxa count toward the correct taxa') parser.add_argument('--min_to_split', type=float, default=0.45, help='Minimum percentage for taxonomic split') parser.add_argument('--min_count_to_split', type=int, default=10, help='Minimum count for taxonomic split') # Processing options parser.add_argument('--show_unannotated_clusters', action='store_true', default=False, help='Show unannotated clusters in output') parser.add_argument('--make_taxa_in_cluster_split', action='store_true', default=False, help='Split clusters with multiple taxa') parser.add_argument('--print_empty_files', action='store_true', default=False, help='Print messages about empty annotation files') return parser.parse_args(args_list) # Color map for plots COLORMAP = [ # List of RGBA tuples for bar coloring in plots (0.12156862745098039, 0.4666666666666667, 0.7058823529411765, 1.0), (1.0, 0.4980392156862745, 0.054901960784313725, 1.0), (0.17254901960784313, 0.6274509803921569, 0.17254901960784313, 1.0), (0.8392156862745098, 0.15294117647058825, 0.1568627450980392, 1.0), (0.5803921568627451, 0.403921568627451, 0.7411764705882353, 1.0), (0.5490196078431373, 0.33725490196078434, 0.29411764705882354, 1.0), (0.8901960784313725, 0.4666666666666667, 0.7607843137254902, 1.0), (0.4980392156862745, 0.4980392156862745, 


# Color map for plots: list of RGBA tuples for bar coloring
COLORMAP = [
    (0.12156862745098039, 0.4666666666666667, 0.7058823529411765, 1.0),
    (1.0, 0.4980392156862745, 0.054901960784313725, 1.0),
    (0.17254901960784313, 0.6274509803921569, 0.17254901960784313, 1.0),
    (0.8392156862745098, 0.15294117647058825, 0.1568627450980392, 1.0),
    (0.5803921568627451, 0.403921568627451, 0.7411764705882353, 1.0),
    (0.5490196078431373, 0.33725490196078434, 0.29411764705882354, 1.0),
    (0.8901960784313725, 0.4666666666666667, 0.7607843137254902, 1.0),
    (0.4980392156862745, 0.4980392156862745, 0.4980392156862745, 1.0),
    (0.7372549019607844, 0.7411764705882353, 0.13333333333333333, 1.0),
    (0.09019607843137255, 0.7450980392156863, 0.8117647058823529, 1.0),
]


def parse_cluster_file(cluster_file, annotation_file=None, print_empty=False,
                       raise_on_error=False):
    """
    Parse the cd-hit-est cluster file (.clstr) and (optionally) an Excel
    annotation file.

    Extracts cluster information (header, read count, similarity) and
    associates taxonomic information and E-values from the annotation file
    based on the read header.

    :param cluster_file: Path to cd-hit cluster file (.clstr).
    :type cluster_file: str
    :param annotation_file: Path to Excel annotation file with taxa and e-values.
    :type annotation_file: str, optional
    :param print_empty: Print a message if the annotation file is not found or empty.
    :type print_empty: bool
    :param raise_on_error: Raise parsing errors instead of printing warnings.
    :type raise_on_error: bool
    :return: List of clusters, where each cluster is a dict mapping read
        header to a dict of read info.
    :rtype: list[dict[str, dict]]
    :raises ValueError: If similarity cannot be parsed from a cluster line.
    :raises UnboundLocalError: If an error occurs during annotation processing.
    """
    clusters = []
    current_cluster = {}

    # Load annotations if provided
    annotations = {}
    if annotation_file and os.path.exists(annotation_file):
        # Read the Excel file
        df = pd.read_excel(annotation_file, sheet_name='Individual_Reads',
                           engine='openpyxl')
        # Iterate over the rows
        for _, row in df.iterrows():
            header = row['header']    # column name as used in the Excel sheet
            evalue = row['e_value']   # or whichever column name you want to use
            taxa = row['taxa']        # depends on how taxa are stored
            annotations[header] = {'evalue': evalue, 'taxa': taxa}
    elif annotation_file and print_empty:
        print(f"Annotation file {annotation_file} not found or empty")

    with open(cluster_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('>Cluster'):
                # Start of a new cluster; save the previous one if it exists
                if current_cluster:
                    clusters.append(current_cluster)
                    current_cluster = {}
            else:
                # Parse sequence line
                parts = line.split()
                if len(parts) >= 3:  # header info lives in parts[2]
                    # Extract header and count
                    header_part = parts[2].strip('>.')
                    header_parts = header_part.split('(')
                    if len(header_parts) > 1:
                        last_part = header_parts[-1].strip(')')
                        header = header_parts[0]
                        if last_part:
                            count = int(last_part)
                        else:
                            print('no count')
                            count = 0
                            header = ''

                    # Extract similarity
                    similarity_part = parts[-1].strip()
                    if '*' in similarity_part:
                        similarity = 100.0  # Representative sequence
                    else:
                        matches = re.findall(r'[\d.]+', similarity_part)
                        if matches:
                            similarity = float(matches[-1])
                        else:
                            raise ValueError(
                                f"Could not parse similarity from: '{similarity_part}'")

                    # Get annotation info
                    try:
                        if header in annotations:
                            taxa = annotations[header]['taxa']
                            evalue = annotations[header]['evalue']
                        else:
                            taxa = 'Unannotated read'
                            evalue = 'Unannotated read'
                        current_cluster[header] = {
                            'count': count,
                            'similarity': similarity,
                            'taxa': taxa,
                            'evalue': evalue
                        }
                    except UnboundLocalError as e:
                        # 'header'/'count' stay unbound when the header
                        # could not be parsed above
                        if raise_on_error:
                            raise UnboundLocalError(str(e))
                        print(f"Error: {e}, No annotations found")

    # Add the last cluster
    if current_cluster:
        clusters.append(current_cluster)

    return clusters
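

# Illustrative .clstr fragment in the shape the parser above expects
# (hypothetical read names; the trailing '(N)' holds the per-read count):
#
#   >Cluster 0
#   0   141nt, >read_1(12)... *
#   1   138nt, >read_2(3)... at +/98.58%
#
# The representative read (marked '*') is assigned 100.0% similarity; member
# reads get the identity parsed from the final field. The optional annotation
# workbook must contain an 'Individual_Reads' sheet with 'header', 'e_value'
# and 'taxa' columns, as read by parse_cluster_file.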


def process_cluster_data(cluster):
    """
    Process a single cluster to extract E-value, similarity, and taxa data
    for all reads.

    Aggregates information from all reads in the cluster, storing read
    counts, E-values, similarities, and taxa in lists and a dictionary.

    :param cluster: Cluster data mapping read headers to read info.
    :type cluster: dict
    :return: A tuple containing: (list of E-values, list of similarity
        values, dict of taxa -> counts). The first element of the E-value
        list ([0]) stores the count of unannotated reads.
    :rtype: tuple[list[float | int], list[float], dict[str, int]]
    """
    eval_list = [0.0]  # First element holds the unannotated count
    simi_list = []
    taxa_dict = {'Unannotated read': 0}

    for header, info in cluster.items():
        count = info['count']
        similarity = info['similarity']
        taxa = info['taxa']
        evalue = info['evalue']

        if evalue == 'Unannotated read':
            eval_list[0] += count
            taxa_dict['Unannotated read'] += count
        else:
            try:
                eval_val = float(evalue)
                for _ in range(count):
                    eval_list.append(eval_val)
            except ValueError:
                eval_list[0] += count
            if taxa not in taxa_dict:
                taxa_dict[taxa] = 0
            taxa_dict[taxa] += count

        # Add similarity values
        for _ in range(count):
            simi_list.append(similarity)

    return eval_list, simi_list, taxa_dict
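

# Worked example (hypothetical reads, processed in this order): a cluster
# with one annotated read counted 3x (e-value 1e-50, similarity 100.0) and
# one unannotated read counted 2x (similarity 97.5) yields:
#   eval_list -> [2.0, 1e-50, 1e-50, 1e-50]   (index 0 = unannotated count)
#   simi_list -> [100.0, 100.0, 100.0, 97.5, 97.5]
#   taxa_dict -> {'Unannotated read': 2, '<full 7-level taxa string>': 3}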


def calculate_cluster_taxa(taxa_dict, args):
    """
    Calculate the most likely taxa for a cluster based on read counts.

    Applies the 'uncertain taxa use ratio' to unannotated reads and uses a
    recursive approach to potentially split a cluster into sub-clusters when
    taxonomic dominance is not strong enough (based on ``min_to_split`` and
    ``min_count_to_split``).

    :param taxa_dict: Mapping of taxa (full string) -> read counts.
    :type taxa_dict: dict[str, int]
    :param args: Parsed script arguments, including parameters for taxa calculation.
    :type args: argparse.Namespace
    :return: A list of refined taxa assignments (dictionaries), where each
        dictionary represents a potentially split sub-cluster.
    :rtype: list[dict[str, float | int]]
    """
    # Replace 'Unannotated read' with the uncertain-taxa format
    processed_dict = {}
    for taxa, count in taxa_dict.items():
        if taxa == 'Unannotated read':
            uncertain_taxa = ' / '.join(['Uncertain taxa'] * 7)
            processed_dict[uncertain_taxa] = count
        else:
            processed_dict[taxa] = count

    return _recursive_taxa_calculation(processed_dict, args, 0)


def _recursive_taxa_calculation(taxa_dict, args, level):
    """
    Recursive helper to calculate and potentially split taxa at each
    taxonomic level.

    :param taxa_dict: Taxa counts at the current level (or sub-group).
    :type taxa_dict: dict[str, float | int]
    :param args: Parsed script arguments.
    :type args: argparse.Namespace
    :param level: Index of the current taxonomic level (0=kingdom, max 6=species).
    :type level: int
    :return: List of refined taxa dictionaries.
    :rtype: list[dict[str, float | int]]
    """
    if level >= 7:  # Max 7 taxonomic levels
        return [taxa_dict]

    level_dict = defaultdict(float)

    # Group by taxonomic level
    for taxa, count in taxa_dict.items():
        taxa_parts = taxa.split(' / ')
        if level < len(taxa_parts):
            level_taxon = taxa_parts[level]
            if level_taxon == 'Uncertain taxa':
                level_dict[level_taxon] += count * args.uncertain_taxa_use_ratio
            else:
                level_dict[level_taxon] += count

    if len(level_dict) <= 1:
        # Only one taxon at this level; continue to the next level
        return _recursive_taxa_calculation(taxa_dict, args, level + 1)

    # Sort by abundance
    sorted_taxa = sorted(level_dict.items(), key=lambda x: x[1], reverse=True)
    result = []
    dominant_taxon = sorted_taxa[0][0]

    # Check if we should split
    for i in range(1, len(sorted_taxa)):
        secondary_taxon = sorted_taxa[i][0]
        total_count = sorted_taxa[0][1] + sorted_taxa[i][1]
        ratio = sorted_taxa[i][1] / total_count
        if ratio >= args.min_to_split or total_count <= args.min_count_to_split:
            # Split off this taxon (guard against taxa strings shorter
            # than the current level, mirroring the grouping loop above)
            split_dict = {taxa: count for taxa, count in taxa_dict.items()
                          if level < len(taxa.split(' / '))
                          and taxa.split(' / ')[level] == secondary_taxon}
            result.extend(_recursive_taxa_calculation(split_dict, args, level + 1))

    # Process the dominant group
    dominant_dict = {taxa: count for taxa, count in taxa_dict.items()
                     if level < len(taxa.split(' / '))
                     and taxa.split(' / ')[level] == dominant_taxon}
    result.extend(_recursive_taxa_calculation(dominant_dict, args, level + 1))

    return result
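

# Worked example of the split rule (hypothetical counts, default parameters
# min_to_split=0.45, min_count_to_split=10): suppose the genus level
# (level 5) groups to {'GenusA': 60, 'GenusB': 55}. For GenusB,
# ratio = 55 / (60 + 55) ~ 0.478 >= 0.45, so GenusB is split off and refined
# separately while GenusA continues as the dominant group. With
# {'GenusA': 100, 'GenusB': 5}, ratio ~ 0.048 and total 105 > 10, so no
# split occurs and only the dominant group is carried forward (the minority
# reads are dropped from the result).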


def write_similarity_output(all_simi_data, output_file):
    """
    Write the similarity text output, including the mean and standard
    deviation, and a count per similarity percentage.

    :param all_simi_data: List of all similarity percentages from all reads.
    :type all_simi_data: list[float]
    :param output_file: Path to the output file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    if not all_simi_data:
        return

    avg = sum(all_simi_data) / len(all_simi_data)
    variance = sum((s - avg) ** 2 for s in all_simi_data) / len(all_simi_data)
    st_dev = sqrt(variance)

    simi_counter = Counter(all_simi_data)
    simi_sorted = sorted(simi_counter.items(), key=lambda x: -x[0])

    with open(output_file, 'w') as f:
        f.write(f"# Average similarity: {avg:.2f}\n")
        f.write(f"# Standard deviation: {st_dev:.2f}\n")
        f.write("similarity\tcount\n")
        for similarity, count in simi_sorted:
            f.write(f"{similarity}\t{count}\n")


def write_evalue_output(all_eval_data, output_file):
    """
    Write the E-value text output, including the count of unannotated reads
    and a count per E-value.

    :param all_eval_data: List of E-values from all reads. The first element
        ([0]) is the count of unannotated reads.
    :type all_eval_data: list[float | int]
    :param output_file: Path to the output file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    unanno_count = all_eval_data[0]
    eval_counter = Counter(all_eval_data[1:])

    with open(output_file, 'w') as f:
        f.write("evalue\tcount\n")
        if unanno_count > 0:
            f.write(f"unannotated\t{unanno_count}\n")
        eval_sorted = sorted(
            eval_counter.items(),
            key=lambda x: (-x[1],
                           float(x[0]) if isinstance(x[0], (int, float))
                           else float('inf')))
        for value, count in eval_sorted:
            f.write(f"{value}\t{count}\n")


def write_count_output(all_eval_data, cluster_data_list, output_file):
    """
    Write a summary of annotated and unannotated read counts per cluster and
    for the total sample.

    :param all_eval_data: List of E-values from all reads for the total
        count summary.
    :type all_eval_data: list[float | int]
    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
    :type cluster_data_list: list[tuple]
    :param output_file: Path to the output file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    with open(output_file, 'w') as f:
        f.write("cluster\tunannotated\tannotated\ttotal\t"
                "perc_unannotated\tperc_annotated\n")
        for cluster_num, (eval_list, _, _) in enumerate(cluster_data_list):
            unannotated = eval_list[0]
            annotated = len(eval_list[1:])
            total = unannotated + annotated
            if total > 0:
                perc_annotated = (annotated / total) * 100
                perc_unannotated = (unannotated / total) * 100
            else:
                perc_annotated = perc_unannotated = 0
            f.write(f"{cluster_num}\t{unannotated}\t{annotated}\t{total}\t"
                    f"{perc_unannotated:.2f}\t{perc_annotated:.2f}\n")

        # Add full sample summary
        total_unannotated = all_eval_data[0]
        total_annotated = len(all_eval_data[1:])
        grand_total = total_unannotated + total_annotated
        if grand_total > 0:
            perc_annotated = (total_annotated / grand_total) * 100
            perc_unannotated = (total_unannotated / grand_total) * 100
        else:
            perc_annotated = perc_unannotated = 0
        f.write(f"TOTAL\t{total_unannotated}\t{total_annotated}\t{grand_total}\t"
                f"{perc_unannotated:.2f}\t{perc_annotated:.2f}\n")
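

# Example count-summary output (hypothetical numbers; the file itself is
# tab-separated, shown here aligned for readability):
#
#   cluster  unannotated  annotated  total  perc_unannotated  perc_annotated
#   0        2            3          5      40.00             60.00
#   TOTAL    2            3          5      40.00             60.00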


def write_taxa_clusters_output(cluster_data_list, output_file):
    """
    Write raw taxa information per cluster to an Excel file.

    Each row contains the cluster number, read count, the full taxa string,
    and the separate taxonomic levels (kingdom through species).

    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
    :type cluster_data_list: list[tuple]
    :param output_file: Path to the output Excel file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    # Create main dataframe
    data = []
    for cluster_num, (_, _, taxa_dict) in enumerate(cluster_data_list):
        for taxa, count in taxa_dict.items():
            if count > 0:
                # Split taxa into taxonomic levels
                taxa_levels = taxa.split(' / ') if taxa else []
                taxa_levels += ['Unannotated read'] * (7 - len(taxa_levels))
                try:
                    data.append({
                        'cluster': cluster_num,
                        'count': count,
                        'taxa_full': taxa,
                        'kingdom': taxa_levels[0],
                        'phylum': taxa_levels[1],
                        'class': taxa_levels[2],
                        'order': taxa_levels[3],
                        'family': taxa_levels[4],
                        'genus': taxa_levels[5],
                        'species': taxa_levels[6]
                    })
                except IndexError as e:
                    # Skip entries with incomplete taxonomic data
                    print(f"Skipped entry in cluster {cluster_num}: "
                          f"incomplete taxonomic data for '{taxa}', error: {e}")
                    continue

    df = pd.DataFrame(data)

    # Write to a temporary Excel file, then move it into place
    temp_path = output_file + ".xlsx"
    dir_name = os.path.dirname(temp_path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    with pd.ExcelWriter(temp_path, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Raw_Taxa_Clusters', index=False)
    os.replace(temp_path, output_file)


def write_taxa_processed_output(cluster_data_list, args, output_file):
    """
    Write processed (potentially split) taxa information to an Excel file.

    This file contains the resulting sub-clusters from the taxonomic
    dominance analysis and a separate sheet documenting the parameters used.

    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
    :type cluster_data_list: list[tuple]
    :param args: Parsed script arguments, used for taxa calculation and
        settings documentation.
    :type args: argparse.Namespace
    :param output_file: Path to the output Excel file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    # Create main dataframe
    data = []
    for cluster_num, (_, _, taxa_dict) in enumerate(cluster_data_list):
        processed_taxa = calculate_cluster_taxa(taxa_dict, args)
        for taxa_group in processed_taxa:
            for taxa, count in taxa_group.items():
                if 'Uncertain taxa / Uncertain taxa / Uncertain taxa' in taxa:
                    if args.show_unannotated_clusters:
                        data.append({
                            'cluster': cluster_num,
                            'count': count,
                            'taxa_full': 'Unannotated read',
                            'kingdom': 'Unannotated',
                            'phylum': 'Unannotated',
                            'class': 'Unannotated',
                            'order': 'Unannotated',
                            'family': 'Unannotated',
                            'genus': 'Unannotated',
                            'species': 'Unannotated'
                        })
                else:
                    # Split taxa into taxonomic levels
                    taxa_levels = taxa.split(' / ') if taxa else []
                    try:
                        data.append({
                            'cluster': cluster_num,
                            'count': count,
                            'taxa_full': taxa,
                            'kingdom': taxa_levels[0],
                            'phylum': taxa_levels[1],
                            'class': taxa_levels[2],
                            'order': taxa_levels[3],
                            'family': taxa_levels[4],
                            'genus': taxa_levels[5],
                            'species': taxa_levels[6]
                        })
                    except IndexError:
                        # Skip entries with incomplete taxonomic data
                        print(f"Skipped entry in cluster {cluster_num}: "
                              f"incomplete taxonomic data for '{taxa}'")
                        continue

    df = pd.DataFrame(data)

    # Create settings dataframe
    settings_data = [
        ['uncertain_taxa_use_ratio', args.uncertain_taxa_use_ratio],
        ['min_to_split', args.min_to_split],
        ['min_count_to_split', args.min_count_to_split]
    ]
    settings_df = pd.DataFrame(settings_data, columns=['Parameter', 'Value'])

    # Write to Excel with multiple sheets
    temp_path = output_file + ".xlsx"
    dir_name = os.path.dirname(temp_path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    with pd.ExcelWriter(temp_path, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Processed_Taxa_Clusters', index=False)
        settings_df.to_excel(writer, sheet_name='Settings', index=False)

        # Auto-adjust column widths for better readability
        for sheet_name in writer.sheets:
            worksheet = writer.sheets[sheet_name]
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        if len(str(cell.value)) > max_length:
                            max_length = len(str(cell.value))
                    except Exception:
                        pass
                adjusted_width = min(max_length + 2, 50)  # Cap at 50 characters
                worksheet.column_dimensions[column_letter].width = adjusted_width

    os.replace(temp_path, output_file)
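

# Note on the Excel writers above: both write to '<output>.xlsx' first and
# then os.replace() the result onto the requested output path. This makes
# the final move atomic; the intermediate .xlsx suffix appears to be a
# workaround for extension-less Galaxy dataset paths.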


def create_similarity_plot(all_simi_data, cluster_simi_lengths, args, output_file):
    """
    Create a bar plot showing the distribution of intra-cluster similarity values.

    The plot uses different colors to distinguish reads belonging to
    different clusters.

    :param all_simi_data: List of all similarity values, sorted descending.
    :type all_simi_data: list[float]
    :param cluster_simi_lengths: List of lengths of similarity data per
        cluster, used for coloring.
    :type cluster_simi_lengths: list[int]
    :param args: Parsed script arguments, used for plot y-limits.
    :type args: argparse.Namespace
    :param output_file: Path to the output plot file (e.g., .png).
    :type output_file: str
    :return: None
    :rtype: None
    """
    if not all_simi_data:
        return

    sorted_simi_list = sorted(all_simi_data, reverse=True)
    bar_positions = list(range(len(sorted_simi_list)))

    # Create colormap for different clusters
    colormap_full = []
    for i, length in enumerate(cluster_simi_lengths):
        color = COLORMAP[i % len(COLORMAP)]
        colormap_full.extend([color] * length)

    plt.figure(figsize=(12, 6))
    plt.bar(bar_positions, sorted_simi_list, width=1, color=colormap_full)
    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
    plt.ylabel("Similarity (%)")
    plt.xlabel("Reads")
    plt.title("Intra-cluster Similarity Distribution")
    # matplotlib >= 3.0 uses bottom/top instead of ymin/ymax
    plt.ylim(bottom=args.simi_plot_y_min, top=args.simi_plot_y_max)
    plt.tight_layout()
    plt.savefig(output_file, format='png', dpi=300, bbox_inches='tight')
    plt.close()


def create_evalue_plot(all_eval_data, cluster_eval_lengths, output_file):
    """
    Create a bar plot showing the distribution of E-values.

    The y-axis is log-scaled and displays ``1/E-values``. Reads are ordered
    by E-value (ascending). The plot uses different colors to distinguish
    reads belonging to different clusters.

    :param all_eval_data: List of E-values from all reads. Assumes E-values
        start at index 1.
    :type all_eval_data: list[float | int]
    :param cluster_eval_lengths: List of lengths of annotated E-value data
        per cluster, used for coloring.
    :type cluster_eval_lengths: list[int]
    :param output_file: Path to the output plot file (e.g., .png).
    :type output_file: str
    :return: None
    :rtype: None
    """
    if len(all_eval_data) <= 1:  # Only unannotated reads
        return

    sorted_eval_list = sorted(all_eval_data[1:])  # Skip unannotated count
    if not sorted_eval_list:
        return

    bar_positions = list(range(len(sorted_eval_list)))
    bar_heights = [1 / e if e > 0 else 0 for e in sorted_eval_list]

    # Create colormap for different clusters
    colormap_full = []
    for i, length in enumerate(cluster_eval_lengths):
        color = COLORMAP[i % len(COLORMAP)]
        colormap_full.extend([color] * length)

    plt.figure(figsize=(12, 6))
    plt.bar(bar_positions, bar_heights, width=1, color=colormap_full)
    plt.yscale('log')
    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
    plt.ylabel("1/E-values")
    plt.xlabel("Reads")
    plt.title("E-value Distribution")
    plt.tight_layout()
    plt.savefig(output_file, format='png', dpi=300, bbox_inches='tight')
    plt.close()
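

# Design note: create_evalue_plot draws 1/E on a log-scaled axis, so smaller
# (better) E-values appear as taller bars; reads with E = 0 are given height
# 0 to avoid division by zero and simply do not show on the log axis.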
""" data = [ev for ev in evalue_list if isinstance(ev, (int, float)) and ev > 0] if not data: return None, None counts, bins, _ = plt.hist(data, bins=50, range=(0, 1)) plt.close() return counts, bins def create_evalue_plot_test(evalue_list, unannotated_list, output_file): """ Create and save an E-value distribution plot, returning the computed histogram data. This function visualizes the frequency distribution of E-values from BLAST or annotation results. It saves the plot to the specified output file and returns the histogram data (counts and bins) for testing with pytests. :param evalue_list: List of numeric E-values to plot :type evalue_list: list[float | int] :param unannotated_list: Optional list of E-values for unannotated sequences. :type unannotated_list: list :param output_file: Path where the histogram image will be saved. :type output_file: str :return: Tuple containing: - **counts** (*numpy.ndarray*): Frequency counts per histogram bin. - **bins** (*numpy.ndarray*): Histogram bin edges. Returns ``(None, None)`` if no valid data was available for plotting. :rtype: tuple[numpy.ndarray, numpy.ndarray] | tuple[None, None] """ counts, bins = prepare_evalue_histogram(evalue_list, unannotated_list) if counts is None: return None, None plt.hist([ev for ev in evalue_list if isinstance(ev, (int, float)) and ev > 0], bins=50, range=(0, 1)) plt.xlabel("E-value") plt.ylabel("Frequency") plt.title("E-value Distribution") plt.savefig(output_file) plt.close() return counts, bins def main(arg_list=None): """ Main entry point of the script. Parses arguments, processes cd-hit cluster data, aggregates results, and generates requested outputs (text summaries, plots, and Excel reports). :param arg_list: List of arguments for testing purposes. :type arg_list: list, optional :return: None :rtype: None """ args = parse_arguments(arg_list) # Parse cluster file clusters = parse_cluster_file( args.input_cluster, args.input_annotation, args.print_empty_files ) # Process each cluster all_eval_data = [0] # For full sample statistics all_simi_data = [] cluster_eval_lengths = [] cluster_simi_lengths = [] cluster_data_list = [] for cluster in clusters: eval_list, simi_list, taxa_dict = process_cluster_data(cluster) cluster_data_list.append((eval_list, simi_list, taxa_dict)) # Collect data for full sample plots all_eval_data[0] += eval_list[0] if len(eval_list) > 1: all_eval_data.extend(sorted(eval_list[1:])) cluster_eval_lengths.append(len(eval_list[1:])) if simi_list: all_simi_data.extend(sorted(simi_list, reverse=True)) cluster_simi_lengths.append(len(simi_list)) # Generate outputs based on what was requested if args.output_similarity_txt: write_similarity_output(all_simi_data, args.output_similarity_txt) if args.output_similarity_plot and all_simi_data: create_similarity_plot(all_simi_data, cluster_simi_lengths, args, args.output_similarity_plot) if args.output_evalue_txt: write_evalue_output(all_eval_data, args.output_evalue_txt) if args.output_evalue_plot and len(all_eval_data) > 1: create_evalue_plot(all_eval_data, cluster_eval_lengths, args.output_evalue_plot) if args.output_count: write_count_output(all_eval_data, cluster_data_list, args.output_count) if args.output_taxa_clusters: write_taxa_clusters_output(cluster_data_list, args.output_taxa_clusters) if args.output_taxa_processed: write_taxa_processed_output(cluster_data_list, args, args.output_taxa_processed) print(f"Processing complete. Processed {len(clusters)} clusters.") if __name__ == "__main__": main()
