diff cdhit_analysis.py @ 0:00d56396b32a draft

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit c944fd5685f295acba06679e85b67973c173b137
author onnodg
date Tue, 14 Oct 2025 09:09:46 +0000
parents
children ff68835adb2b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cdhit_analysis.py	Tue Oct 14 09:09:46 2025 +0000
@@ -0,0 +1,787 @@
+#!/usr/bin/env python3
+
+import argparse
+import os
+import re
+from collections import Counter, defaultdict
+from math import sqrt
+import pandas as pd
+import matplotlib
+
+matplotlib.use('Agg')  # Non-interactive backend for Galaxy
+import matplotlib.pyplot as plt
+
+"""
+This script processes cluster output files from cd-hit-est for use in Galaxy.
+It extracts cluster information, associates taxa and e-values from annotation files,
+performs statistical calculations, and generates text and plot outputs
+summarizing similarity and taxonomic distributions.
+
+Main steps:
+1. Parse cd-hit-est cluster file and (optional) annotation file.
+2. Process each cluster to extract similarity, taxa, and e-value information.
+3. Aggregate results across clusters.
+4. Generate requested outputs: text summaries, plots, and Excel reports.
+
+Note: Uses a non-interactive matplotlib backend (Agg) for compatibility with Galaxy.
+"""
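+
+# Example invocation (a minimal sketch; file names are hypothetical):
+#   python cdhit_analysis.py \
+#       --input_cluster clusters.clstr \
+#       --input_annotation annotations.xlsx \
+#       --output_similarity_txt similarity.txt \
+#       --output_count counts.tsv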
+
+
+def parse_arguments(args_list=None):
+    """Parse command-line arguments for the script."""
+    parser = argparse.ArgumentParser(
+        description='Create taxa analysis from cd-hit cluster files')
+    parser.add_argument('--input_cluster', type=str, required=True,
+                        help='Input cluster file (.clstr)')
+    parser.add_argument('--input_annotation', type=str, required=False,
+                        help='Input annotation file (.out)')
+
+    # Galaxy output files
+    parser.add_argument('--output_similarity_txt', type=str,
+                        help='Similarity text output file')
+    parser.add_argument('--output_similarity_plot', type=str,
+                        help='Similarity plot output file')
+    parser.add_argument('--output_evalue_txt', type=str,
+                        help='E-value text output file')
+    parser.add_argument('--output_evalue_plot', type=str,
+                        help='E-value plot output file')
+    parser.add_argument('--output_count', type=str,
+                        help='Count summary output file')
+    parser.add_argument('--output_taxa_clusters', type=str,
+                        help='Taxa per cluster output file')
+    parser.add_argument('--output_taxa_processed', type=str,
+                        help='Processed taxa output file')
+    # Plot parameters
+    parser.add_argument('--simi_plot_y_min', type=float, default=95.0,
+                        help='Minimum value of the y-axis in the similarity plot')
+    parser.add_argument('--simi_plot_y_max', type=float, default=100.0,
+                        help='Maximum value of the y-axis in the similarity plot')
+
+    # Uncertain taxa configuration
+    parser.add_argument('--uncertain_taxa_use_ratio', type=float, default=0.5,
+                        help='Weight (0-1) applied to read counts of uncertain/unannotated '
+                             'taxa when they contribute to a taxon at a given level')
+    parser.add_argument('--min_to_split', type=float, default=0.45,
+                        help='Minimum read-count ratio of a secondary taxon (relative to the '
+                             'pairwise total with the dominant taxon) required to split it off')
+    parser.add_argument('--min_count_to_split', type=int, default=10,
+                        help='Pairwise read-count total at or below which a secondary taxon '
+                             'is always split off')
+
+    # Processing options
+    parser.add_argument('--show_unannotated_clusters', action='store_true', default=False,
+                        help='Show unannotated clusters in output')
+    parser.add_argument('--make_taxa_in_cluster_split', action='store_true', default=False,
+                        help='Split clusters with multiple taxa')
+    parser.add_argument('--print_empty_files', action='store_true', default=False,
+                        help='Print messages about empty annotation files')
+
+    return parser.parse_args(args_list)
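+
+# For tests, arguments can also be passed programmatically, e.g. (hypothetical
+# path): parse_arguments(['--input_cluster', 'test.clstr'])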
+
+
+# Color map for plots: RGBA tuples used to color bars per cluster.
+# (The values correspond to matplotlib's 'tab10' palette with alpha 1.0.)
+COLORMAP = [
+    (0.12156862745098039, 0.4666666666666667, 0.7058823529411765, 1.0),
+    (1.0, 0.4980392156862745, 0.054901960784313725, 1.0),
+    (0.17254901960784313, 0.6274509803921569, 0.17254901960784313, 1.0),
+    (0.8392156862745098, 0.15294117647058825, 0.1568627450980392, 1.0),
+    (0.5803921568627451, 0.403921568627451, 0.7411764705882353, 1.0),
+    (0.5490196078431373, 0.33725490196078434, 0.29411764705882354, 1.0),
+    (0.8901960784313725, 0.4666666666666667, 0.7607843137254902, 1.0),
+    (0.4980392156862745, 0.4980392156862745, 0.4980392156862745, 1.0),
+    (0.7372549019607844, 0.7411764705882353, 0.13333333333333333, 1.0),
+    (0.09019607843137255, 0.7450980392156863, 0.8117647058823529, 1.0)
+]
+
+
+def parse_cluster_file(cluster_file, annotation_file=None, print_empty=False, raise_on_error=False):
+    """
+    Parse the cd-hit-est cluster file (.clstr) and (optionally) an Excel annotation file.
+
+    It extracts cluster information (header, read count, similarity) and associates
+    taxonomic information and E-values from the annotation file based on the read header.
+
+    :param cluster_file: Path to cd-hit cluster file (.clstr).
+    :type cluster_file: str
+    :param annotation_file: Path to Excel annotation file with taxa and e-values.
+    :type annotation_file: str, optional
+    :param print_empty: Print a message if the annotation file is not found.
+    :type print_empty: bool
+    :param raise_on_error: Raise parsing errors instead of printing warnings.
+    :type raise_on_error: bool
+    :return: List of clusters, where each cluster is a dict mapping read header to a dict of read info.
+    :rtype: list[dict[str, dict]]
+    :raises ValueError: If similarity cannot be parsed from a cluster line.
+    :raises UnboundLocalError: If an unexpected error occurs during annotation
+        processing and ``raise_on_error`` is True.
+    """
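+    # Expected .clstr layout (cd-hit-est output; the "(count)" suffix on the
+    # read header is this pipeline's convention, not produced by cd-hit itself):
+    #   >Cluster 0
+    #   0  141nt, >read_1(25)... *
+    #   1  141nt, >read_2(3)... at +/98.63%
+    # '*' marks the representative sequence; other lines end with the
+    # alignment similarity percentage.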
+    clusters = []
+    current_cluster = {}
+
+    # Load annotations if provided
+    annotations = {}
+    if annotation_file and os.path.exists(annotation_file):
+        # Read the Excel annotation file
+        df = pd.read_excel(annotation_file, sheet_name='Individual_Reads', engine='openpyxl')
+
+        # Iterate over the rows and index annotations by read header
+        for _, row in df.iterrows():
+            header = row['header']  # column name as it appears in the Excel sheet
+            evalue = row['e_value']  # E-value column
+            taxa = row['taxa']  # full taxa string
+
+            annotations[header] = {'evalue': evalue, 'taxa': taxa}
+    elif annotation_file and print_empty:
+        print(f"Annotation file {annotation_file} not found")
+
+    with open(cluster_file, 'r') as f:
+        for line in f:
+            line = line.strip()
+            if line.startswith('>Cluster'):
+                # Start of new cluster, save previous if exists
+                if current_cluster:
+                    clusters.append(current_cluster)
+                    current_cluster = {}
+            else:
+                # Parse a sequence line, e.g. "0  141nt, >read_1(25)... at +/98.63%"
+                parts = line.split()
+                if len(parts) >= 3:  # parts[2] holds the ">header(count)..." field
+                    # Extract header and read count
+                    header_part = parts[2].strip('>.')
+                    header_parts = header_part.split('(')
+                    if len(header_parts) > 1 and header_parts[-1].strip(')'):
+                        header = header_parts[0]
+                        count = int(header_parts[-1].strip(')'))
+                    else:
+                        # No "(count)" suffix; skip the line rather than reuse
+                        # stale header/count values from a previous iteration
+                        print(f"No read count found in header: '{header_part}'")
+                        continue
+                    # Extract similarity
+                    similarity_part = parts[-1].strip()
+                    if '*' in similarity_part:
+                        similarity = 100.0  # Representative sequence
+                    else:
+                        matches = re.findall(r'[\d.]+', similarity_part)
+                        if matches:
+                            similarity = float(matches[-1])
+                        else:
+                            raise ValueError(f"Could not parse similarity from: '{similarity_part}'")
+                    # Get annotation info
+                    try:
+                        if header in annotations:
+                            taxa = annotations[header]['taxa']
+                            evalue = annotations[header]['evalue']
+                        else:
+                            taxa = 'Unannotated read'
+                            evalue = 'Unannotated read'
+
+                        current_cluster[header] = {
+                            'count': count,
+                            'similarity': similarity,
+                            'taxa': taxa,
+                            'evalue': evalue
+                        }
+                    except UnboundLocalError as e:
+                        if raise_on_error:
+                            raise UnboundLocalError(str(e))
+                        print(f"Error: {e}; no annotation processed for this read")
+
+    # Add the last cluster
+    if current_cluster:
+        clusters.append(current_cluster)
+
+    return clusters
+
+
+def process_cluster_data(cluster):
+    """
+    Process a single cluster to extract E-value, similarity, and taxa data for all reads.
+
+    Aggregates information from all reads in the cluster, storing read counts,
+    E-values, similarities, and taxa in lists and a dictionary.
+
+    :param cluster: Cluster data mapping read headers to read info.
+    :type cluster: dict
+    :return: A tuple containing: (list of E-values, list of similarity values, dict of taxa -> counts).
+             The first element of the E-value list ([0]) stores the count of unannotated reads.
+    :rtype: tuple[list[float | int], list[float], dict[str, int]]
+    """
+    eval_list = [0.0]  # First element for unannotated count
+    simi_list = []
+    taxa_dict = {'Unannotated read': 0}
+
+    for header, info in cluster.items():
+        count = info['count']
+        similarity = info['similarity']
+        taxa = info['taxa']
+        evalue = info['evalue']
+
+        if evalue == 'Unannotated read':
+            eval_list[0] += count
+            taxa_dict['Unannotated read'] += count
+        else:
+            try:
+                eval_val = float(evalue)
+                eval_list.extend([eval_val] * count)
+            except ValueError:
+                eval_list[0] += count
+
+            if taxa not in taxa_dict:
+                taxa_dict[taxa] = 0
+            taxa_dict[taxa] += count
+
+        # Add one similarity value per read
+        simi_list.extend([similarity] * count)
+    return eval_list, simi_list, taxa_dict
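+
+# Example (hypothetical reads; a sketch of the return shape):
+#   cluster = {'read_1': {'count': 2, 'similarity': 100.0,
+#                         'taxa': 'Animalia / ... / Apis mellifera',
+#                         'evalue': '1e-30'}}
+#   process_cluster_data(cluster)
+#   -> ([0.0, 1e-30, 1e-30], [100.0, 100.0],
+#       {'Unannotated read': 0, 'Animalia / ... / Apis mellifera': 2})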
+
+
+def calculate_cluster_taxa(taxa_dict, args):
+    """
+    Calculate the most likely taxa for a cluster based on read counts.
+
+    This function applies the 'uncertain taxa use ratio' for unannotated reads
+    and uses a recursive approach to potentially split a cluster into sub-clusters
+    if taxonomic dominance is not strong enough (based on ``min_to_split``
+    and ``min_count_to_split``).
+
+    :param taxa_dict: Mapping of taxa (full string) -> read counts.
+    :type taxa_dict: dict[str, int]
+    :param args: Parsed script arguments, including parameters for taxa calculation.
+    :type args: argparse.Namespace
+    :return: A list of refined taxa assignments (dictionaries), where each dictionary
+             represents a potentially split sub-cluster.
+    :rtype: list[dict[str, float | int]]
+    """
+    # Replace 'Unannotated read' with uncertain taxa format
+    processed_dict = {}
+    for taxa, count in taxa_dict.items():
+        if taxa == 'Unannotated read':
+            uncertain_taxa = ' / '.join(['Uncertain taxa'] * 7)
+            processed_dict[uncertain_taxa] = count
+        else:
+            processed_dict[taxa] = count
+
+    return _recursive_taxa_calculation(processed_dict, args, 0)
+
+
+def _recursive_taxa_calculation(taxa_dict, args, level):
+    """
+    Recursive helper to calculate and potentially split taxa at each taxonomic level.
+
+    :param taxa_dict: Taxa counts at the current level (or sub-group).
+    :type taxa_dict: dict[str, float | int]
+    :param args: Parsed script arguments.
+    :type args: argparse.Namespace
+    :param level: Index of the current taxonomic level (0=kingdom, max 6=species).
+    :type level: int
+    :return: List of refined taxa dictionaries.
+    :rtype: list[dict[str, float | int]]
+    """
+    if level >= 7:  # Max 7 taxonomic levels
+        return [taxa_dict]
+
+    level_dict = defaultdict(float)
+
+    # Group by taxonomic level
+    for taxa, count in taxa_dict.items():
+        taxa_parts = taxa.split(' / ')
+        if level < len(taxa_parts):
+            level_taxon = taxa_parts[level]
+            if level_taxon == 'Uncertain taxa':
+                level_dict[level_taxon] += count * args.uncertain_taxa_use_ratio
+            else:
+                level_dict[level_taxon] += count
+
+    if len(level_dict) <= 1:
+        # Only one taxon at this level, continue to next level
+        return _recursive_taxa_calculation(taxa_dict, args, level + 1)
+
+    # Sort by abundance
+    sorted_taxa = sorted(level_dict.items(), key=lambda x: x[1], reverse=True)
+
+    result = []
+    dominant_taxon = sorted_taxa[0][0]
+
+    # Check whether secondary taxa are abundant enough to split off
+    for i in range(1, len(sorted_taxa)):
+        secondary_taxon = sorted_taxa[i][0]
+        total_count = sorted_taxa[0][1] + sorted_taxa[i][1]
+        ratio = sorted_taxa[i][1] / total_count
+
+        if ratio >= args.min_to_split or total_count <= args.min_count_to_split:
+            # Split off this taxon (guard against taxa with fewer levels)
+            split_dict = {taxa: count for taxa, count in taxa_dict.items()
+                          if level < len(taxa.split(' / '))
+                          and taxa.split(' / ')[level] == secondary_taxon}
+            result.extend(_recursive_taxa_calculation(split_dict, args, level + 1))
+
+    # Process the dominant group
+    dominant_dict = {taxa: count for taxa, count in taxa_dict.items()
+                     if level < len(taxa.split(' / '))
+                     and taxa.split(' / ')[level] == dominant_taxon}
+    result.extend(_recursive_taxa_calculation(dominant_dict, args, level + 1))
+
+    return result
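+
+# Worked example (hypothetical counts, default thresholds min_to_split=0.45,
+# min_count_to_split=10): with {'A / x': 60, 'B / y': 55} at level 0 the
+# secondary ratio is 55 / 115 ≈ 0.48 >= 0.45, so 'B / y' is split off and both
+# groups are refined separately; with {'A / x': 90, 'B / y': 10} the ratio is
+# 0.10 and the pairwise total 100 > 10, so only the dominant 'A / x' group is
+# kept in the result.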
+
+
+def write_similarity_output(all_simi_data, output_file):
+    """
+    Write the similarity text output, including the mean and standard deviation,
+    and a count per similarity percentage.
+
+    :param all_simi_data: List of all similarity percentages from all reads.
+    :type all_simi_data: list[float]
+    :param output_file: Path to the output file.
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    if not all_simi_data:
+        return
+
+    avg = sum(all_simi_data) / len(all_simi_data)
+    variance = sum((s - avg) ** 2 for s in all_simi_data) / len(all_simi_data)
+    st_dev = sqrt(variance)
+
+    simi_counter = Counter(all_simi_data)
+    simi_sorted = sorted(simi_counter.items(), key=lambda x: -x[0])
+
+    with open(output_file, 'w') as f:
+        f.write(f"# Average similarity: {avg:.2f}\n")
+        f.write(f"# Standard deviation: {st_dev:.2f}\n")
+        f.write("similarity\tcount\n")
+        for similarity, count in simi_sorted:
+            f.write(f"{similarity}\t{count}\n")
+
+
+def write_evalue_output(all_eval_data, output_file):
+    """
+    Write the E-value text output, including the count of unannotated reads
+    and a count per E-value.
+
+    :param all_eval_data: List of E-values from all reads. The first element ([0]) is the count of unannotated reads.
+    :type all_eval_data: list[float | int]
+    :param output_file: Path to the output file.
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    unanno_count = all_eval_data[0]
+    eval_counter = Counter(all_eval_data[1:])
+
+    with open(output_file, 'w') as f:
+        f.write("evalue\tcount\n")
+        if unanno_count > 0:
+            f.write(f"unannotated\t{unanno_count}\n")
+
+        eval_sorted = sorted(eval_counter.items(),
+                             key=lambda x: (-x[1], float(x[0]) if isinstance(x[0], (int, float)) else float('inf')))
+        for value, count in eval_sorted:
+            f.write(f"{value}\t{count}\n")
+
+
+def write_count_output(all_eval_data, cluster_data_list, output_file):
+    """
+    Write a summary of annotated and unannotated read counts per cluster
+    and for the total sample.
+
+    :param all_eval_data: List of E-values from all reads for the total count summary.
+    :type all_eval_data: list[float | int]
+    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
+    :type cluster_data_list: list[tuple]
+    :param output_file: Path to the output file.
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    with open(output_file, 'w') as f:
+        f.write("cluster\tunannotated\tannotated\ttotal\tperc_unannotated\tperc_annotated\n")
+
+        for cluster_num, (eval_list, _, _) in enumerate(cluster_data_list):
+            unannotated = eval_list[0]
+            annotated = len(eval_list[1:])
+            total = unannotated + annotated
+
+            if total > 0:
+                perc_annotated = (annotated / total) * 100
+                perc_unannotated = (unannotated / total) * 100
+            else:
+                perc_annotated = perc_unannotated = 0
+
+            f.write(
+                f"{cluster_num}\t{unannotated}\t{annotated}\t{total}\t{perc_unannotated:.2f}\t{perc_annotated:.2f}\n")
+
+        # Add full sample summary
+        total_unannotated = all_eval_data[0]
+        total_annotated = len(all_eval_data[1:])
+        grand_total = total_unannotated + total_annotated
+
+        if grand_total > 0:
+            perc_annotated = (total_annotated / grand_total) * 100
+            perc_unannotated = (total_unannotated / grand_total) * 100
+        else:
+            perc_annotated = perc_unannotated = 0
+
+        f.write(
+            f"TOTAL\t{total_unannotated}\t{total_annotated}\t{grand_total}\t{perc_unannotated:.2f}\t{perc_annotated:.2f}\n")
+
+
+def write_taxa_clusters_output(cluster_data_list, output_file):
+    """
+    Write raw taxa information per cluster to an Excel file.
+
+    Each row contains the cluster number, read count, the full taxa string,
+    and the separate taxonomic levels (kingdom through species).
+
+    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
+    :type cluster_data_list: list[tuple]
+    :param output_file: Path to the output Excel file.
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    # Create main dataframe
+    data = []
+    for cluster_num, (_, _, taxa_dict) in enumerate(cluster_data_list):
+        for taxa, count in taxa_dict.items():
+            if count > 0:
+                # Split taxa into taxonomic levels
+                taxa_levels = taxa.split(' / ') if taxa else []
+                taxa_levels += ['Unannotated read'] * (7 - len(taxa_levels))
+                try:
+                    data.append({
+                        'cluster': cluster_num,
+                        'count': count,
+                        'taxa_full': taxa,
+                        'kingdom': taxa_levels[0],
+                        'phylum': taxa_levels[1],
+                        'class': taxa_levels[2],
+                        'order': taxa_levels[3],
+                        'family': taxa_levels[4],
+                        'genus': taxa_levels[5],
+                        'species': taxa_levels[6]
+                    })
+                except IndexError as e:
+                    # Skip entries with incomplete taxonomic data
+                    print(f"Skipped entry in cluster {cluster_num}: incomplete taxonomic data for '{taxa}', error: {e}")
+                    continue
+
+    df = pd.DataFrame(data)
+    # Write to Excel via a temporary file, then move into place
+    temp_path = output_file + ".xlsx"
+    out_dir = os.path.dirname(temp_path)
+    if out_dir:
+        os.makedirs(out_dir, exist_ok=True)
+    with pd.ExcelWriter(temp_path, engine='openpyxl') as writer:
+        df.to_excel(writer, sheet_name='Raw_Taxa_Clusters', index=False)
+    os.replace(temp_path, output_file)
+
+
+def write_taxa_processed_output(cluster_data_list, args, output_file):
+    """
+    Write processed (potentially split) taxa information to an Excel file.
+
+    This file contains the resulting sub-clusters from the taxonomic dominance
+    analysis and a separate sheet documenting the parameters used.
+
+    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
+    :type cluster_data_list: list[tuple]
+    :param args: Parsed script arguments, used for taxa calculation and settings documentation.
+    :type args: argparse.Namespace
+    :param output_file: Path to the output Excel file.
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    # Create main dataframe
+    data = []
+    for cluster_num, (_, _, taxa_dict) in enumerate(cluster_data_list):
+        processed_taxa = calculate_cluster_taxa(taxa_dict, args)
+
+        for taxa_group in processed_taxa:
+            for taxa, count in taxa_group.items():
+                if 'Uncertain taxa / Uncertain taxa / Uncertain taxa' in taxa:
+                    if args.show_unannotated_clusters:
+                        data.append({
+                            'cluster': cluster_num,
+                            'count': count,
+                            'taxa_full': 'Unannotated read',
+                            'kingdom': 'Unannotated',
+                            'phylum': 'Unannotated',
+                            'class': 'Unannotated',
+                            'order': 'Unannotated',
+                            'family': 'Unannotated',
+                            'genus': 'Unannotated',
+                            'species': 'Unannotated'
+                        })
+                else:
+                    # Split taxa into taxonomic levels
+                    taxa_levels = taxa.split(' / ') if taxa else []
+
+                    try:
+                        data.append({
+                            'cluster': cluster_num,
+                            'count': count,
+                            'taxa_full': taxa,
+                            'kingdom': taxa_levels[0],
+                            'phylum': taxa_levels[1],
+                            'class': taxa_levels[2],
+                            'order': taxa_levels[3],
+                            'family': taxa_levels[4],
+                            'genus': taxa_levels[5],
+                            'species': taxa_levels[6]
+                        })
+                    except IndexError:
+                        # Skip entries with incomplete taxonomic data
+                        print(f"Skipped entry in cluster {cluster_num}: incomplete taxonomic data for '{taxa}'")
+                        continue
+
+    df = pd.DataFrame(data)
+
+    # Create settings dataframe
+    settings_data = [
+        ['uncertain_taxa_use_ratio', args.uncertain_taxa_use_ratio],
+        ['min_to_split', args.min_to_split],
+        ['min_count_to_split', args.min_count_to_split]
+    ]
+    settings_df = pd.DataFrame(settings_data, columns=['Parameter', 'Value'])
+
+    # Write to Excel with multiple sheets via a temporary file
+    temp_path = output_file + ".xlsx"
+    out_dir = os.path.dirname(temp_path)
+    if out_dir:
+        os.makedirs(out_dir, exist_ok=True)
+    with pd.ExcelWriter(temp_path, engine='openpyxl') as writer:
+        df.to_excel(writer, sheet_name='Processed_Taxa_Clusters', index=False)
+        settings_df.to_excel(writer, sheet_name='Settings', index=False)
+
+        # Auto-adjust column widths for better readability
+        for sheet_name in writer.sheets:
+            worksheet = writer.sheets[sheet_name]
+            for column in worksheet.columns:
+                max_length = 0
+                column_letter = column[0].column_letter
+                for cell in column:
+                    try:
+                        if len(str(cell.value)) > max_length:
+                            max_length = len(str(cell.value))
+                    except Exception:
+                        pass
+                adjusted_width = min(max_length + 2, 50)  # Cap at 50 characters
+                worksheet.column_dimensions[column_letter].width = adjusted_width
+    os.replace(temp_path, output_file)
+
+
+def create_similarity_plot(all_simi_data, cluster_simi_lengths, args, output_file):
+    """
+    Create a bar plot showing the distribution of intra-cluster similarity values.
+
+    The plot uses different colors to distinguish reads belonging to different clusters.
+
+    :param all_simi_data: List of all similarity values, sorted descending.
+    :type all_simi_data: list[float]
+    :param cluster_simi_lengths: List of lengths of similarity data per cluster, used for coloring.
+    :type cluster_simi_lengths: list[int]
+    :param args: Parsed script arguments, used for plot y-limits.
+    :type args: argparse.Namespace
+    :param output_file: Path to the output plot file (e.g., .png).
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    if not all_simi_data:
+        return
+
+    sorted_simi_list = sorted(all_simi_data, reverse=True)
+    bar_positions = list(range(len(sorted_simi_list)))
+
+    # Create colormap for different clusters
+    colormap_full = []
+    for i, length in enumerate(cluster_simi_lengths):
+        color = COLORMAP[i % len(COLORMAP)]
+        colormap_full.extend([color] * length)
+
+    plt.figure(figsize=(12, 6))
+    plt.bar(bar_positions, sorted_simi_list, width=1, color=colormap_full)
+    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
+    plt.ylabel("Similarity (%)")
+    plt.xlabel("Reads")
+    plt.title("Intra-cluster Similarity Distribution")
+    plt.ylim(bottom=args.simi_plot_y_min, top=args.simi_plot_y_max)
+    plt.tight_layout()
+
+    plt.savefig(output_file, format='png', dpi=300, bbox_inches='tight')
+    plt.close()
+
+
+def create_evalue_plot(all_eval_data, cluster_eval_lengths, output_file):
+    """
+    Create a bar plot showing the distribution of E-values.
+
+    The y-axis is log-scaled and displays ``1/E-values``. Reads are ordered
+    by E-value (ascending). The plot uses different colors to distinguish reads
+    belonging to different clusters.
+
+    :param all_eval_data: List of E-values from all reads. Assumes E-values start at index 1.
+    :type all_eval_data: list[float | int]
+    :param cluster_eval_lengths: List of lengths of annotated E-value data per cluster, used for coloring.
+    :type cluster_eval_lengths: list[int]
+    :param output_file: Path to the output plot file (e.g., .png).
+    :type output_file: str
+    :return: None
+    :rtype: None
+    """
+    if len(all_eval_data) <= 1:  # Only unannotated reads
+        return
+
+    sorted_eval_list = sorted(all_eval_data[1:])  # Skip unannotated count
+
+    if not sorted_eval_list:
+        return
+
+    bar_positions = list(range(len(sorted_eval_list)))
+    bar_heights = [1 / e if e > 0 else 0 for e in sorted_eval_list]
+
+    # Create colormap for different clusters
+    colormap_full = []
+    for i, length in enumerate(cluster_eval_lengths):
+        color = COLORMAP[i % len(COLORMAP)]
+        colormap_full.extend([color] * length)
+
+    plt.figure(figsize=(12, 6))
+    plt.bar(bar_positions, bar_heights, width=1, color=colormap_full)
+    plt.yscale('log')
+    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
+    plt.ylabel("1/E-values")
+    plt.xlabel("Reads")
+    plt.title("E-value Distribution")
+    plt.tight_layout()
+
+    plt.savefig(output_file, format='png', dpi=300, bbox_inches='tight')
+    plt.close()
+
+
+def prepare_evalue_histogram(evalue_list, unannotated_list):
+    """
+    Generate histogram data for E-value distributions.
+
+    This function processes a list of E-values from BLAST or similar search
+    results, filters out invalid or zero entries, and computes histogram data
+    suitable for plotting. The histogram represents the frequency distribution
+    of E-values across all annotated hits.
+
+    :param evalue_list: List of E-values from BLAST hits
+    :type evalue_list: list[float | int]
+    :param unannotated_list: List of unannotated E-values (currently unused;
+        accepted for interface symmetry).
+    :type unannotated_list: list
+    :return: Tuple containing:
+        - **counts** (*numpy.ndarray*): Number of entries per histogram bin.
+        - **bins** (*numpy.ndarray*): Bin edges corresponding to the histogram.
+        Returns ``(None, None)`` if no valid data is available.
+    :rtype: tuple[numpy.ndarray, numpy.ndarray] | tuple[None, None]
+    :note:
+        - Only positive numeric E-values are included in the histogram.
+        - Uses 50 bins in the range (0, 1) for visualization consistency.
+    """
+    data = [ev for ev in evalue_list if isinstance(ev, (int, float)) and ev > 0]
+    if not data:
+        return None, None
+
+    counts, bins, _ = plt.hist(data, bins=50, range=(0, 1))
+    plt.close()
+    return counts, bins
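+
+# Example (hypothetical values): prepare_evalue_histogram([1e-5, 0.02, 0.5, 0], [])
+# drops the zero entry and bins the rest into 50 bins over the range (0, 1).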
+
+def create_evalue_plot_test(evalue_list, unannotated_list, output_file):
+    """
+    Create and save an E-value distribution plot, returning the computed histogram data.
+
+    This function visualizes the frequency distribution of E-values from BLAST or
+    annotation results. It saves the plot to the specified output file and returns
+    the histogram data (counts and bins) for testing with pytest.
+
+    :param evalue_list: List of numeric E-values to plot
+    :type evalue_list: list[float | int]
+    :param unannotated_list: Optional list of E-values for unannotated sequences (currently unused).
+    :type unannotated_list: list
+    :param output_file: Path where the histogram image will be saved.
+    :type output_file: str
+
+    :return: Tuple containing:
+        - **counts** (*numpy.ndarray*): Frequency counts per histogram bin.
+        - **bins** (*numpy.ndarray*): Histogram bin edges.
+        Returns ``(None, None)`` if no valid data was available for plotting.
+    :rtype: tuple[numpy.ndarray, numpy.ndarray] | tuple[None, None]
+    """
+    counts, bins = prepare_evalue_histogram(evalue_list, unannotated_list)
+    if counts is None:
+        return None, None
+
+    plt.hist([ev for ev in evalue_list if isinstance(ev, (int, float)) and ev > 0],
+             bins=50, range=(0, 1))
+    plt.xlabel("E-value")
+    plt.ylabel("Frequency")
+    plt.title("E-value Distribution")
+    plt.savefig(output_file)
+    plt.close()
+    return counts, bins
+
+
+def main(arg_list=None):
+    """
+    Main entry point of the script.
+
+    Parses arguments, processes cd-hit cluster data, aggregates results,
+    and generates requested outputs (text summaries, plots, and Excel reports).
+
+    :param arg_list: List of arguments for testing purposes.
+    :type arg_list: list, optional
+    :return: None
+    :rtype: None
+    """
+    args = parse_arguments(arg_list)
+
+    # Parse cluster file
+    clusters = parse_cluster_file(
+        args.input_cluster,
+        args.input_annotation,
+        args.print_empty_files
+    )
+    # Process each cluster
+    all_eval_data = [0]  # For full sample statistics
+    all_simi_data = []
+    cluster_eval_lengths = []
+    cluster_simi_lengths = []
+    cluster_data_list = []
+
+    for cluster in clusters:
+        eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
+        cluster_data_list.append((eval_list, simi_list, taxa_dict))
+        # Collect data for full sample plots
+        all_eval_data[0] += eval_list[0]
+        if len(eval_list) > 1:
+            all_eval_data.extend(sorted(eval_list[1:]))
+            cluster_eval_lengths.append(len(eval_list[1:]))
+
+        if simi_list:
+            all_simi_data.extend(sorted(simi_list, reverse=True))
+            cluster_simi_lengths.append(len(simi_list))
+
+    # Generate outputs based on what was requested
+    if args.output_similarity_txt:
+        write_similarity_output(all_simi_data, args.output_similarity_txt)
+
+    if args.output_similarity_plot and all_simi_data:
+        create_similarity_plot(all_simi_data, cluster_simi_lengths, args, args.output_similarity_plot)
+
+    if args.output_evalue_txt:
+        write_evalue_output(all_eval_data, args.output_evalue_txt)
+
+    if args.output_evalue_plot and len(all_eval_data) > 1:
+        create_evalue_plot(all_eval_data, cluster_eval_lengths, args.output_evalue_plot)
+
+    if args.output_count:
+        write_count_output(all_eval_data, cluster_data_list, args.output_count)
+
+    if args.output_taxa_clusters:
+        write_taxa_clusters_output(cluster_data_list, args.output_taxa_clusters)
+
+    if args.output_taxa_processed:
+        write_taxa_processed_output(cluster_data_list, args, args.output_taxa_processed)
+
+    print(f"Processing complete. Processed {len(clusters)} clusters.")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file