"""
This script processes cluster output files from cd-hit-est for use in Galaxy.
It extracts cluster information, associates taxa and e-values from annotation files,
performs statistical calculations, and generates text and plot outputs
summarizing similarity and taxonomic distributions.

Main steps:
1. Parse cd-hit-est cluster file and (optional) annotation file.
2. Process each cluster to extract similarity, taxa, and e-value information.
3. Aggregate results across clusters.
4. Generate requested outputs: text summaries, plots, and Excel reports.
"""

import argparse
import os
import re
from collections import Counter, defaultdict
from math import sqrt

import matplotlib.pyplot as plt
import openpyxl  # imported so a missing Excel engine fails fast
import pandas as pd


def parse_arguments(args_list=None):
    """Parse command-line arguments for the script."""
    parser = argparse.ArgumentParser(
        description='Create taxa analysis from cd-hit cluster files')
    parser.add_argument('--input_cluster', type=str, required=True,
                        help='Input cluster file (.clstr)')
    parser.add_argument('--input_annotation', type=str, required=False,
                        help='Input annotation file (Excel workbook with an Individual_Reads sheet)')

    # Galaxy output files
    parser.add_argument('--output_similarity_txt', type=str,
                        help='Similarity text output file')
    parser.add_argument('--output_similarity_plot', type=str,
                        help='Similarity plot output file')
    parser.add_argument('--output_evalue_txt', type=str,
                        help='E-value text output file')
    parser.add_argument('--output_evalue_plot', type=str,
                        help='E-value plot output file')
    parser.add_argument('--output_count', type=str,
                        help='Count summary output file')
    parser.add_argument('--output_taxa_clusters', type=str,
                        help='Taxa per cluster output file')
    parser.add_argument('--output_taxa_processed', type=str,
                        help='Processed taxa output file')
    # Plot parameters
    parser.add_argument('--simi_plot_y_min', type=float, default=95.0,
                        help='Minimum value of the y-axis in the similarity plot')
    parser.add_argument('--simi_plot_y_max', type=float, default=100.0,
                        help='Maximum value of the y-axis in the similarity plot')

    # Uncertain taxa configuration
    parser.add_argument('--uncertain_taxa_use_ratio', type=float, default=0.5,
                        help='Weight applied to uncertain (unannotated) reads when counting them toward a taxon')
    parser.add_argument('--min_to_split', type=float, default=0.45,
                        help='Minimum ratio of a secondary taxon to the combined count required to split a cluster')
    parser.add_argument('--min_count_to_split', type=int, default=10,
                        help='Count threshold at or below which a secondary taxon is always split off')

    # Processing options
    parser.add_argument('--show_unannotated_clusters', action='store_true', default=False,
                        help='Show unannotated clusters in output')
    parser.add_argument('--make_taxa_in_cluster_split', action='store_true', default=False,
                        help='Split clusters with multiple taxa')
    parser.add_argument('--print_empty_files', action='store_true', default=False,
                        help='Print messages about empty annotation files')

    return parser.parse_args(args_list)


# Color map for plots: ten RGBA tuples (matplotlib's 'tab10' palette),
# cycled to color bars per cluster
COLORMAP = [
    (0.12156862745098039, 0.4666666666666667, 0.7058823529411765, 1.0),
    (1.0, 0.4980392156862745, 0.054901960784313725, 1.0),
    (0.17254901960784313, 0.6274509803921569, 0.17254901960784313, 1.0),
    (0.8392156862745098, 0.15294117647058825, 0.1568627450980392, 1.0),
    (0.5803921568627451, 0.403921568627451, 0.7411764705882353, 1.0),
    (0.5490196078431373, 0.33725490196078434, 0.29411764705882354, 1.0),
    (0.8901960784313725, 0.4666666666666667, 0.7607843137254902, 1.0),
    (0.4980392156862745, 0.4980392156862745, 0.4980392156862745, 1.0),
    (0.7372549019607844, 0.7411764705882353, 0.13333333333333333, 1.0),
    (0.09019607843137255, 0.7450980392156863, 0.8117647058823529, 1.0)
]
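
# The same palette could be regenerated programmatically; a minimal sketch,
# assuming matplotlib's qualitative 'tab10' colormap (which these values match):
#
#   COLORMAP = [plt.cm.tab10(i) for i in range(10)]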


def parse_cluster_file(cluster_file, annotation_file=None, print_empty=False, raise_on_error=False):
    """
    Parse the cd-hit-est cluster file (.clstr) and (optionally) an Excel annotation file.

    It extracts cluster information (header, read count, similarity) and associates
    taxonomic information and E-values from the annotation file based on the read header.

    :param cluster_file: Path to cd-hit cluster file (.clstr).
    :type cluster_file: str
    :param annotation_file: Path to Excel annotation file with taxa and e-values.
    :type annotation_file: str, optional
    :param print_empty: Print a message if the annotation file is not found or empty.
    :type print_empty: bool
    :param raise_on_error: Raise parsing errors instead of printing warnings.
    :type raise_on_error: bool
    :return: List of clusters, where each cluster is a dict mapping read header to a dict of read info.
    :rtype: list[dict[str, dict]]
    :raises ValueError: If similarity cannot be parsed from a cluster line.
    :raises UnboundLocalError: Re-raised during annotation processing when ``raise_on_error`` is True.
    """
    clusters = []
    current_cluster = {}

    # Load annotations if provided
    annotations = {}
    if annotation_file and os.path.exists(annotation_file):
        # Read the Excel file
        df = pd.read_excel(annotation_file, sheet_name='Individual_Reads', engine='openpyxl')

        # Iterate over the rows
        for _, row in df.iterrows():
            header = row['header']  # column names as they appear in the Excel sheet
            evalue = row['e_value']
            taxa = row['taxa']

            annotations[header] = {'evalue': evalue, 'taxa': taxa}
    elif annotation_file and print_empty:
        print(f"Annotation file {annotation_file} not found or empty")
    with open(cluster_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('>Cluster'):
                # Start of new cluster, save previous if exists
                if current_cluster:
                    clusters.append(current_cluster)
                    current_cluster = {}
            else:
                # Parse sequence line
                parts = line.split()
                if len(parts) >= 3:
                    # Extract header and count; headers look like '>name(count)...'
                    header_part = parts[2].strip('>.')
                    header_parts = header_part.split('(')
                    header = header_parts[0]
                    if len(header_parts) > 1 and header_parts[-1].strip(')'):
                        count = int(header_parts[-1].strip(')'))
                    else:
                        print('no count')
                        count = 0
                        header = ''
                    # Extract similarity
                    similarity_part = parts[-1].strip()
                    if '*' in similarity_part:
                        similarity = 100.0  # Representative sequence
                    else:
                        matches = re.findall(r'[\d.]+', similarity_part)
                        if matches:
                            similarity = float(matches[-1])
                        else:
                            raise ValueError(f"Could not parse similarity from: '{similarity_part}'")
                    # Get annotation info
                    try:
                        if header in annotations:
                            taxa = annotations[header]['taxa']
                            evalue = annotations[header]['evalue']
                        else:
                            taxa = 'Unannotated read'
                            evalue = 'Unannotated read'

                        current_cluster[header] = {
                            'count': count,
                            'similarity': similarity,
                            'taxa': taxa,
                            'evalue': evalue
                        }
                    except UnboundLocalError as e:
                        if raise_on_error:
                            raise
                        print(f"Error: {e}, no annotations found")

    # Add the last cluster
    if current_cluster:
        clusters.append(current_cluster)

    return clusters


def process_cluster_data(cluster):
    """
    Process a single cluster to extract E-value, similarity, and taxa data for all reads.

    Aggregates information from all reads in the cluster, storing read counts,
    E-values, similarities, and taxa in lists and a dictionary.

    :param cluster: Cluster data mapping read headers to read info.
    :type cluster: dict
    :return: A tuple containing: (list of E-values, list of similarity values, dict of taxa -> counts).
             The first element of the E-value list ([0]) stores the count of unannotated reads.
    :rtype: tuple[list[float | int], list[float], dict[str, int]]
    """
    eval_list = [0.0]  # First element for unannotated count
    simi_list = []
    taxa_dict = {'Unannotated read': 0}

    for header, info in cluster.items():
        count = info['count']
        similarity = info['similarity']
        taxa = info['taxa']
        evalue = info['evalue']

        if evalue == 'Unannotated read':
            eval_list[0] += count
            taxa_dict['Unannotated read'] += count
        else:
            try:
                eval_val = float(evalue)
                for _ in range(count):
                    eval_list.append(eval_val)
            except ValueError:
                eval_list[0] += count

            if taxa not in taxa_dict:
                taxa_dict[taxa] = 0
            taxa_dict[taxa] += count

        # Add similarity values
        for _ in range(count):
            simi_list.append(similarity)
    return eval_list, simi_list, taxa_dict
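
# Illustrative doctest-style example of the expected shapes (values are
# hypothetical):
#
#   >>> cluster = {'read_1': {'count': 2, 'similarity': 99.5,
#   ...                       'taxa': 'Animalia / Chordata', 'evalue': '1e-50'}}
#   >>> process_cluster_data(cluster)
#   ([0.0, 1e-50, 1e-50], [99.5, 99.5], {'Unannotated read': 0, 'Animalia / Chordata': 2})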


def calculate_cluster_taxa(taxa_dict, args):
    """
    Calculate the most likely taxa for a cluster based on read counts.

    This function applies the 'uncertain taxa use ratio' for unannotated reads
    and uses a recursive approach to potentially split a cluster into sub-clusters
    if taxonomic dominance is not strong enough (based on ``min_to_split``
    and ``min_count_to_split``).

    :param taxa_dict: Mapping of taxa (full string) -> read counts.
    :type taxa_dict: dict[str, int]
    :param args: Parsed script arguments, including parameters for taxa calculation.
    :type args: argparse.Namespace
    :return: A list of refined taxa assignments (dictionaries), where each dictionary
             represents a potentially split sub-cluster.
    :rtype: list[dict[str, float | int]]
    """
    # Replace 'Unannotated read' with uncertain taxa format
    processed_dict = {}
    for taxa, count in taxa_dict.items():
        if taxa == 'Unannotated read':
            uncertain_taxa = ' / '.join(['Uncertain taxa'] * 7)
            processed_dict[uncertain_taxa] = count
        else:
            processed_dict[taxa] = count

    return _recursive_taxa_calculation(processed_dict, args, 0)


def _recursive_taxa_calculation(taxa_dict, args, level):
    """
    Recursive helper to calculate and potentially split taxa at each taxonomic level.

    :param taxa_dict: Taxa counts at the current level (or sub-group).
    :type taxa_dict: dict[str, float | int]
    :param args: Parsed script arguments.
    :type args: argparse.Namespace
    :param level: Index of the current taxonomic level (0=kingdom, max 6=species).
    :type level: int
    :return: List of refined taxa dictionaries.
    :rtype: list[dict[str, float | int]]
    """
    if level >= 7:  # Max 7 taxonomic levels
        return [taxa_dict]

    level_dict = defaultdict(float)

    # Group by taxonomic level
    for taxa, count in taxa_dict.items():
        taxa_parts = taxa.split(' / ')
        if level < len(taxa_parts):
            level_taxon = taxa_parts[level]
            if level_taxon == 'Uncertain taxa':
                level_dict[level_taxon] += count * args.uncertain_taxa_use_ratio
            else:
                level_dict[level_taxon] += count

    if len(level_dict) <= 1:
        # Only one taxon at this level, continue to next level
        return _recursive_taxa_calculation(taxa_dict, args, level + 1)

    # Sort by abundance
    sorted_taxa = sorted(level_dict.items(), key=lambda x: x[1], reverse=True)

    result = []
    dominant_taxon = sorted_taxa[0][0]

    # Check if we should split
    for i in range(1, len(sorted_taxa)):
        secondary_taxon = sorted_taxa[i][0]
        total_count = sorted_taxa[0][1] + sorted_taxa[i][1]
        ratio = sorted_taxa[i][1] / total_count

        if ratio >= args.min_to_split or total_count <= args.min_count_to_split:
            # Split off this taxon
            split_dict = {taxa: count for taxa, count in taxa_dict.items()
                          if level < len(taxa.split(' / '))
                          and taxa.split(' / ')[level] == secondary_taxon}
            result.extend(_recursive_taxa_calculation(split_dict, args, level + 1))

    # Process the dominant group
    dominant_dict = {taxa: count for taxa, count in taxa_dict.items()
                     if level < len(taxa.split(' / '))
                     and taxa.split(' / ')[level] == dominant_taxon}
    result.extend(_recursive_taxa_calculation(dominant_dict, args, level + 1))

    return result
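
# Worked example of the split rule above (counts are hypothetical): with
# min_to_split=0.45 and min_count_to_split=10, a level grouped as
# {'Animalia': 60, 'Fungi': 55} has ratio 55 / (60 + 55) ~= 0.478 >= 0.45,
# so the 'Fungi' reads are split into their own sub-cluster. With
# {'Animalia': 95, 'Fungi': 5}, the ratio is 0.05 and the total of 100
# exceeds min_count_to_split, so no split occurs and only the dominant
# 'Animalia' group is carried forward by the recursion.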


def write_similarity_output(all_simi_data, output_file):
    """
    Write the similarity text output, including the mean and standard deviation,
    and a count per similarity percentage.

    :param all_simi_data: List of all similarity percentages from all reads.
    :type all_simi_data: list[float]
    :param output_file: Path to the output file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    if not all_simi_data:
        return

    avg = sum(all_simi_data) / len(all_simi_data)
    variance = sum((s - avg) ** 2 for s in all_simi_data) / len(all_simi_data)
    st_dev = sqrt(variance)

    simi_counter = Counter(all_simi_data)
    simi_sorted = sorted(simi_counter.items(), key=lambda x: -x[0])

    with open(output_file, 'w') as f:
        f.write(f"# Average similarity: {avg:.2f}\n")
        f.write(f"# Standard deviation: {st_dev:.2f}\n")
        f.write("similarity\tcount\n")
        for similarity, count in simi_sorted:
            f.write(f"{similarity}\t{count}\n")


def write_evalue_output(all_eval_data, output_file):
    """
    Write the E-value text output, including the count of unannotated reads
    and a count per E-value.

    :param all_eval_data: List of E-values from all reads. The first element ([0]) is the count of unannotated reads.
    :type all_eval_data: list[float | int]
    :param output_file: Path to the output file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    unanno_count = all_eval_data[0]
    eval_counter = Counter(all_eval_data[1:])

    with open(output_file, 'w') as f:
        f.write("evalue\tcount\n")
        if unanno_count > 0:
            f.write(f"unannotated\t{unanno_count}\n")

        eval_sorted = sorted(eval_counter.items(),
                             key=lambda x: (-x[1], float(x[0]) if isinstance(x[0], (int, float)) else float('inf')))
        for value, count in eval_sorted:
            f.write(f"{value}\t{count}\n")


def write_count_output(all_eval_data, cluster_data_list, output_file):
    """
    Write a summary of annotated and unannotated read counts per cluster
    and for the total sample.

    :param all_eval_data: List of E-values from all reads for the total count summary.
    :type all_eval_data: list[float | int]
    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
    :type cluster_data_list: list[tuple]
    :param output_file: Path to the output file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    with open(output_file, 'w') as f:
        f.write("cluster\tunannotated\tannotated\ttotal\tperc_unannotated\tperc_annotated\n")

        for cluster_num, (eval_list, _, _) in enumerate(cluster_data_list):
            unannotated = eval_list[0]
            annotated = len(eval_list[1:])
            total = unannotated + annotated

            if total > 0:
                perc_annotated = (annotated / total) * 100
                perc_unannotated = (unannotated / total) * 100
            else:
                perc_annotated = perc_unannotated = 0

            f.write(
                f"{cluster_num}\t{unannotated}\t{annotated}\t{total}\t{perc_unannotated:.2f}\t{perc_annotated:.2f}\n")

        # Add full sample summary
        total_unannotated = all_eval_data[0]
        total_annotated = len(all_eval_data[1:])
        grand_total = total_unannotated + total_annotated

        if grand_total > 0:
            perc_annotated = (total_annotated / grand_total) * 100
            perc_unannotated = (total_unannotated / grand_total) * 100
        else:
            perc_annotated = perc_unannotated = 0

        f.write(
            f"TOTAL\t{total_unannotated}\t{total_annotated}\t{grand_total}\t{perc_unannotated:.2f}\t{perc_annotated:.2f}\n")


def write_taxa_clusters_output(cluster_data_list, output_file):
    """
    Write raw taxa information per cluster to an Excel file.

    Each row contains the cluster number, read count, the full taxa string,
    and the separate taxonomic levels (kingdom through species).

    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
    :type cluster_data_list: list[tuple]
    :param output_file: Path to the output Excel file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    # Create main dataframe
    data = []
    for cluster_num, (_, _, taxa_dict) in enumerate(cluster_data_list):
        for taxa, count in taxa_dict.items():
            if count > 0:
                # Split taxa into taxonomic levels
                taxa_levels = taxa.split(' / ') if taxa else []
                taxa_levels += ['Unannotated read'] * (7 - len(taxa_levels))
                try:
                    data.append({
                        'cluster': cluster_num,
                        'count': count,
                        'taxa_full': taxa,
                        'kingdom': taxa_levels[0],
                        'phylum': taxa_levels[1],
                        'class': taxa_levels[2],
                        'order': taxa_levels[3],
                        'family': taxa_levels[4],
                        'genus': taxa_levels[5],
                        'species': taxa_levels[6]
                    })
                except IndexError as e:
                    # Skip entries with incomplete taxonomic data
                    print(f"Skipped entry in cluster {cluster_num}: incomplete taxonomic data for '{taxa}, error: {e}'")
                    continue

    df = pd.DataFrame(data)
    # Write atomically: build next to the target, then rename into place
    temp_path = output_file + ".xlsx"
    out_dir = os.path.dirname(temp_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with pd.ExcelWriter(temp_path, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Raw_Taxa_Clusters', index=False)
    os.replace(temp_path, output_file)


def write_taxa_processed_output(cluster_data_list, args, output_file):
    """
    Write processed (potentially split) taxa information to an Excel file.

    This file contains the resulting sub-clusters from the taxonomic dominance
    analysis and a separate sheet documenting the parameters used.

    :param cluster_data_list: List of tuples (eval_list, simi_list, taxa_dict) per cluster.
    :type cluster_data_list: list[tuple]
    :param args: Parsed script arguments, used for taxa calculation and settings documentation.
    :type args: argparse.Namespace
    :param output_file: Path to the output Excel file.
    :type output_file: str
    :return: None
    :rtype: None
    """
    # Create main dataframe
    data = []
    for cluster_num, (_, _, taxa_dict) in enumerate(cluster_data_list):
        processed_taxa = calculate_cluster_taxa(taxa_dict, args)

        for taxa_group in processed_taxa:
            for taxa, count in taxa_group.items():
                if 'Uncertain taxa / Uncertain taxa / Uncertain taxa' in taxa:
                    if args.show_unannotated_clusters:
                        data.append({
                            'cluster': cluster_num,
                            'count': count,
                            'taxa_full': 'Unannotated read',
                            'kingdom': 'Unannotated',
                            'phylum': 'Unannotated',
                            'class': 'Unannotated',
                            'order': 'Unannotated',
                            'family': 'Unannotated',
                            'genus': 'Unannotated',
                            'species': 'Unannotated'
                        })
                else:
                    # Split taxa into taxonomic levels
                    taxa_levels = taxa.split(' / ') if taxa else []

                    try:
                        data.append({
                            'cluster': cluster_num,
                            'count': count,
                            'taxa_full': taxa,
                            'kingdom': taxa_levels[0],
                            'phylum': taxa_levels[1],
                            'class': taxa_levels[2],
                            'order': taxa_levels[3],
                            'family': taxa_levels[4],
                            'genus': taxa_levels[5],
                            'species': taxa_levels[6]
                        })
                    except IndexError:
                        # Skip entries with incomplete taxonomic data
                        print(f"Skipped entry in cluster {cluster_num}: incomplete taxonomic data for '{taxa}'")
                        continue

    df = pd.DataFrame(data)

    # Create settings dataframe
    settings_data = [
        ['uncertain_taxa_use_ratio', args.uncertain_taxa_use_ratio],
        ['min_to_split', args.min_to_split],
        ['min_count_to_split', args.min_count_to_split]
    ]
    settings_df = pd.DataFrame(settings_data, columns=['Parameter', 'Value'])

    # Write to Excel with multiple sheets, atomically via a temporary file
    temp_path = output_file + ".xlsx"
    out_dir = os.path.dirname(temp_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with pd.ExcelWriter(temp_path, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Processed_Taxa_Clusters', index=False)
        settings_df.to_excel(writer, sheet_name='Settings', index=False)

        # Auto-adjust column widths for better readability
        for sheet_name in writer.sheets:
            worksheet = writer.sheets[sheet_name]
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        if len(str(cell.value)) > max_length:
                            max_length = len(str(cell.value))
                    except Exception:
                        pass
                adjusted_width = min(max_length + 2, 50)  # Cap at 50 characters
                worksheet.column_dimensions[column_letter].width = adjusted_width
    os.replace(temp_path, output_file)


def create_similarity_plot(all_simi_data, cluster_simi_lengths, args, output_file):
    """
    Create a bar plot showing the distribution of intra-cluster similarity values.

    The plot uses different colors to distinguish reads belonging to different clusters.

    :param all_simi_data: List of all similarity values, sorted descending.
    :type all_simi_data: list[float]
    :param cluster_simi_lengths: List of lengths of similarity data per cluster, used for coloring.
    :type cluster_simi_lengths: list[int]
    :param args: Parsed script arguments, used for plot y-limits.
    :type args: argparse.Namespace
    :param output_file: Path to the output plot file (e.g., .png).
    :type output_file: str
    :return: None
    :rtype: None
    """
    if not all_simi_data:
        return

    # NOTE: the global sort reorders reads across cluster boundaries, so the
    # per-cluster colors below are only approximate when clusters overlap
    sorted_simi_list = sorted(all_simi_data, reverse=True)
    bar_positions = list(range(len(sorted_simi_list)))

    # Create colormap for different clusters
    colormap_full = []
    for i, length in enumerate(cluster_simi_lengths):
        color = COLORMAP[i % len(COLORMAP)]
        colormap_full.extend([color] * length)

    plt.figure(figsize=(12, 6))
    plt.bar(bar_positions, sorted_simi_list, width=1, color=colormap_full)
    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
    plt.ylabel("Similarity (%)")
    plt.xlabel("Reads")
    plt.title("Intra-cluster Similarity Distribution")
    plt.ylim(ymin=args.simi_plot_y_min, ymax=args.simi_plot_y_max)
    plt.tight_layout()

    plt.savefig(output_file, format='png', dpi=300, bbox_inches='tight')
    plt.close()


def create_evalue_plot(all_eval_data, cluster_eval_lengths, output_file):
    """
    Create a bar plot showing the distribution of E-values.

    The y-axis is log-scaled and displays ``1/E-values``. Reads are ordered
    by E-value (ascending). The plot uses different colors to distinguish reads
    belonging to different clusters.

    :param all_eval_data: List of E-values from all reads. Assumes E-values start at index 1.
    :type all_eval_data: list[float | int]
    :param cluster_eval_lengths: List of lengths of annotated E-value data per cluster, used for coloring.
    :type cluster_eval_lengths: list[int]
    :param output_file: Path to the output plot file (e.g., .png).
    :type output_file: str
    :return: None
    :rtype: None
    """
    if len(all_eval_data) <= 1:  # Only unannotated reads
        return

    # Skip the unannotated count; note the global sort reorders reads across
    # cluster boundaries, so the per-cluster colors are only approximate
    sorted_eval_list = sorted(all_eval_data[1:])

    if not sorted_eval_list:
        return

    bar_positions = list(range(len(sorted_eval_list)))
    bar_heights = [1 / e if e > 0 else 0 for e in sorted_eval_list]

    # Create colormap for different clusters
    colormap_full = []
    for i, length in enumerate(cluster_eval_lengths):
        color = COLORMAP[i % len(COLORMAP)]
        colormap_full.extend([color] * length)

    plt.figure(figsize=(12, 6))
    plt.bar(bar_positions, bar_heights, width=1, color=colormap_full)
    plt.yscale('log')
    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
    plt.ylabel("1/E-values")
    plt.xlabel("Reads")
    plt.title("E-value Distribution")
    plt.tight_layout()

    plt.savefig(output_file, format='png', dpi=300, bbox_inches='tight')
    plt.close()


def prepare_evalue_histogram(evalue_list, unannotated_list):
    """
    Generate histogram data for E-value distributions.

    This function processes a list of E-values from BLAST or similar search
    results, filters out invalid or zero entries, and computes histogram data
    suitable for plotting. The histogram represents the frequency distribution
    of E-values across all annotated hits.

    :param evalue_list: List of E-values from BLAST hits
    :type evalue_list: list[float | int]
    :param unannotated_list: List of E-values for unannotated reads (currently unused).
    :type unannotated_list: list
    :return: Tuple containing:
        - **counts** (*numpy.ndarray*): Number of entries per histogram bin.
        - **bins** (*numpy.ndarray*): Bin edges corresponding to the histogram.
        Returns ``(None, None)`` if no valid data is available.
    :rtype: tuple[numpy.ndarray, numpy.ndarray] | tuple[None, None]
    :note:
        - Only positive numeric E-values are included in the histogram.
        - Uses 50 bins in the range (0, 1) for visualization consistency.
    """
    data = [ev for ev in evalue_list if isinstance(ev, (int, float)) and ev > 0]
    if not data:
        return None, None

    counts, bins, _ = plt.hist(data, bins=50, range=(0, 1))
    plt.close()
    return counts, bins
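
# Illustrative doctest-style usage (values are hypothetical):
#
#   >>> counts, bins = prepare_evalue_histogram([1e-30, 0.2, 'n/a'], [])
#   >>> int(counts.sum())   # the non-numeric entry is filtered out
#   2
#   >>> len(bins)           # 50 bins give 51 edges
#   51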

def create_evalue_plot_test(evalue_list, unannotated_list, output_file):
    """
    Create and save an E-value distribution plot, returning the computed histogram data.

    This function visualizes the frequency distribution of E-values from BLAST or
    annotation results. It saves the plot to the specified output file and returns
    the histogram data (counts and bins) for testing with pytests.

    :param evalue_list: List of numeric E-values to plot
    :type evalue_list: list[float | int]
    :param unannotated_list: Optional list of E-values for unannotated sequences (currently unused).
    :type unannotated_list: list
    :param output_file: Path where the histogram image will be saved.
    :type output_file: str

    :return: Tuple containing:
        - **counts** (*numpy.ndarray*): Frequency counts per histogram bin.
        - **bins** (*numpy.ndarray*): Histogram bin edges.
        Returns ``(None, None)`` if no valid data was available for plotting.
    :rtype: tuple[numpy.ndarray, numpy.ndarray] | tuple[None, None]
    """
    counts, bins = prepare_evalue_histogram(evalue_list, unannotated_list)
    if counts is None:
        return None, None

    plt.hist([ev for ev in evalue_list if isinstance(ev, (int, float)) and ev > 0],
             bins=50, range=(0, 1))
    plt.xlabel("E-value")
    plt.ylabel("Frequency")
    plt.title("E-value Distribution")
    plt.savefig(output_file)
    plt.close()
    return counts, bins


def main(arg_list=None):
    """
    Main entry point of the script.

    Parses arguments, processes cd-hit cluster data, aggregates results,
    and generates requested outputs (text summaries, plots, and Excel reports).

    :param arg_list: List of arguments for testing purposes.
    :type arg_list: list, optional
    :return: None
    :rtype: None
    """
    args = parse_arguments(arg_list)
    # Parse cluster file
    clusters = parse_cluster_file(
        args.input_cluster,
        args.input_annotation,
        args.print_empty_files
    )
    # Process each cluster
    all_eval_data = [0]  # For full sample statistics
    all_simi_data = []
    cluster_eval_lengths = []
    cluster_simi_lengths = []
    cluster_data_list = []

    for cluster in clusters:
        eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
        cluster_data_list.append((eval_list, simi_list, taxa_dict))
        # Collect data for full sample plots
        all_eval_data[0] += eval_list[0]
        if len(eval_list) > 1:
            all_eval_data.extend(sorted(eval_list[1:]))
            cluster_eval_lengths.append(len(eval_list[1:]))

        if simi_list:
            all_simi_data.extend(sorted(simi_list, reverse=True))
            cluster_simi_lengths.append(len(simi_list))

    # Generate outputs based on what was requested
    if args.output_similarity_txt:
        write_similarity_output(all_simi_data, args.output_similarity_txt)

    if args.output_similarity_plot and all_simi_data:
        create_similarity_plot(all_simi_data, cluster_simi_lengths, args, args.output_similarity_plot)

    if args.output_evalue_txt:
        write_evalue_output(all_eval_data, args.output_evalue_txt)

    if args.output_evalue_plot and len(all_eval_data) > 1:
        create_evalue_plot(all_eval_data, cluster_eval_lengths, args.output_evalue_plot)

    if args.output_count:
        write_count_output(all_eval_data, cluster_data_list, args.output_count)

    if args.output_taxa_clusters:
        write_taxa_clusters_output(cluster_data_list, args.output_taxa_clusters)

    if args.output_taxa_processed:
        write_taxa_processed_output(cluster_data_list, args, args.output_taxa_processed)

    print(f"Processing complete. Processed {len(clusters)} clusters.")


if __name__ == "__main__":
    main()