view cdhit_analysis.py @ 4:e64af72e1b8f draft default tip

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
author onnodg
date Mon, 15 Dec 2025 16:44:40 +0000
parents c6981ea453ae
children
line wrap: on
line source

"""
This script processes cluster output files from cd-hit-est for use in Galaxy.
It extracts cluster information, associates taxa from annotation files,
performs statistical calculations, and generates text and plot outputs
summarizing similarity and taxonomic distributions.

Main steps:
1. Parse cd-hit-est cluster file and (optional) annotation Excel file.
2. Process each cluster to extract similarity and taxa information.
3. Aggregate results across clusters.
4. Generate requested outputs: text summaries, plots, and Excel reports.
"""

import argparse
from collections import Counter, defaultdict
import os
import re
from math import sqrt

import matplotlib.pyplot as plt
import pandas as pd
import openpyxl

def log_message(messages, text: str):
    """Append a message to the log list."""
    messages.append(text)


def parse_arguments(args_list=None):
    """Parse command-line arguments for the script."""
    parser = argparse.ArgumentParser(description="Create taxa analysis from cd-hit cluster files")
    parser.add_argument("--input_cluster", type=str, required=True, help="Input cluster file (.clstr)")
    parser.add_argument("--input_annotation", type=str, required=False, help="Input annotation Excel file (header annotations)")

    parser.add_argument("--output_similarity_txt", type=str, help="Similarity text output file")
    parser.add_argument("--output_similarity_plot",type=str, help="Similarity plot output file (PNG)")
    parser.add_argument("--output_count", type=str, help="Count summary output file")
    parser.add_argument("--output_excel", type=str, help="Count summary output file")
    parser.add_argument("--output_taxa_clusters", action="store_true", default=False, help="Excel output: include raw taxa per cluster sheet")
    parser.add_argument("--output_taxa_processed", action="store_true", default=False, help="Excel output: include processed/LCA taxa per cluster sheet")
    parser.add_argument("--log_file", type=str, help="Optional log file with run statistics")

    parser.add_argument("--simi_plot_y_min", type=float, default=95.0, help="Minimum value of the y-axis in the similarity plot")
    parser.add_argument("--simi_plot_y_max", type=float, default=100.0, help="Maximum value of the y-axis in the similarity plot")

    parser.add_argument("--uncertain_taxa_use_ratio", type=float, default=0.5, help="Ratio at which uncertain taxa count toward the dominant taxa")
    parser.add_argument("--min_to_split", type=float, default=0.45, help=("Minimum fraction (0–0.5) for splitting a cluster taxonomically. "
            "ratio = count(second) / (count(first) + count(second))"))
    parser.add_argument("--min_count_to_split", type=int, default=10, help=("Minimum combined count of the two top taxa to avoid being forced "
            "into an 'uncertain' split at this level."))

    parser.add_argument("--min_cluster_support", type=int, default=1, help=("Minimum number of annotated reads required for a cluster to be "
          "included in the processed/LCA taxa sheet. Unannotated reads do not count toward this threshold."))

    return parser.parse_args(args_list)


COLORMAP = [
    # List of RGBA tuples for bar coloring in plots
    (0.12156862745098039, 0.4666666666666667, 0.7058823529411765, 1.0),
    (1.0, 0.4980392156862745, 0.054901960784313725, 1.0),
    (0.17254901960784313, 0.6274509803921569, 0.17254901960784313, 1.0),
    (0.8392156862745098, 0.15294117647058825, 0.1568627450980392, 1.0),
    (0.5803921568627451, 0.403921568627451, 0.7411764705882353, 1.0),
    (0.5490196078431373, 0.33725490196078434, 0.29411764705882354, 1.0),
    (0.8901960784313725, 0.4666666666666667, 0.7607843137254902, 1.0),
    (0.4980392156862745, 0.4980392156862745, 0.4980392156862745, 1.0),
    (0.7372549019607844, 0.7411764705882353, 0.13333333333333333, 1.0),
    (0.09019607843137255, 0.7450980392156863, 0.8117647058823529, 1.0),
]


def parse_cluster_file(cluster_file, annotation_file, raise_on_error=False, log_messages=None):
    """Parse CD-HIT cluster output and attach taxonomic annotations.

    Cluster entries are parsed for read headers, counts, and similarities.
    If an annotation Excel file is supplied, taxa, seq_id and source fields
    are mapped per read from sheet ``Individual_Reads``.

    :param cluster_file: Path to the CD-HIT cluster file.
    :type cluster_file: str
    :param annotation_file: Optional Excel annotation file.
    :type annotation_file: str or None
    :param raise_on_error: Whether to raise parsing errors directly.
    :type raise_on_error: bool
    :param log_messages: Optional message collector for logging.
    :type log_messages: list[str] or None
    :return: List of clusters, where each cluster is a dict keyed by header.
    :rtype: list[dict]
    """
    clusters = []
    current_cluster = {}

    annotations = {}
    if annotation_file and os.path.exists(annotation_file):
        try:
            df = pd.read_excel(
                annotation_file,
                sheet_name="Individual_Reads",
                engine="openpyxl",
            )
            required_cols = {"header", "seq_id", "source", "taxa"}
            missing = required_cols - set(df.columns)
            if missing:
                msg = (
                    f"Annotation file missing columns {missing}, "
                    "continuing without annotations."
                )
                if log_messages is not None:
                    log_message(log_messages, msg)
            else:
                for _, row in df.iterrows():
                    header = str(row["header"])
                    seq_id = str(row["seq_id"])
                    source = str(row["source"])
                    taxa = str(row["taxa"])
                    annotations[header] = {
                        "seq_id": seq_id,
                        "source": source,
                        "taxa": taxa,
                    }
                if log_messages is not None:
                    log_message(
                        log_messages,
                        f"Loaded {len(annotations)} annotated headers",
                    )
        except Exception as exc:
            msg = (
                "Failed to read annotation file; proceeding without annotations. "
                f"Details: {exc}"
            )
            if log_messages is not None:
                log_message(log_messages, msg)
    else:
        if log_messages is not None:
            if annotation_file:
                log_message(
                    log_messages,
                    "Annotation file not found; proceeding without annotations.",
                )
            else:
                log_message(
                    log_messages,
                    "No annotation file provided; proceeding without annotations.",
                )

    with open(cluster_file, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            if line.startswith(">Cluster"):
                # Start of new cluster, save previous if exists
                if current_cluster:
                    clusters.append(current_cluster)
                    current_cluster = {}
            else:
                parts = line.split()
                if len(parts) < 3:
                    continue

                header_part = parts[2].strip(">.")
                header_parts = header_part.split("(")
                header = header_parts[0]
                count = 0
                if len(header_parts) > 1:
                    last_part = header_parts[-1].strip(")")
                    if last_part:
                        try:
                            count = int(last_part)
                        except ValueError:
                            count = 0

                similarity_part = parts[-1].strip()
                if "*" in similarity_part:
                    similarity = 100.0
                else:
                    matches = re.findall(r"[\d.]+", similarity_part)
                    if matches:
                        similarity = float(matches[-1])
                    else:
                        msg = f"Could not parse similarity from: '{similarity_part}'"
                        if raise_on_error:
                            raise ValueError(msg)
                        if log_messages is not None:
                            log_message(log_messages, f"WARNING: {msg}")
                        similarity = 0.0

                if header in annotations:
                    taxa = annotations[header]["taxa"]
                    seq_id = annotations[header]["seq_id"]
                    source = annotations[header]["source"]
                else:
                    taxa = "Unannotated read"
                    seq_id = "Unannotated read"
                    source = "Unannotated read"

                current_cluster[header] = {
                    "count": count,
                    "similarity": similarity,
                    "taxa": taxa,
                    "seq_id": seq_id,
                    "source": source,
                }

    if current_cluster:
        clusters.append(current_cluster)

    if log_messages is not None:
        log_message(log_messages, f"Parsed {len(clusters)} clusters")

    return clusters


def process_cluster_data(cluster: dict):
    """Convert a raw cluster into similarity lists and aggregated taxa counts.

    Similarity values are expanded based on read count. Taxa are grouped
    by unique taxonomic labels, tracking supporting seq_ids and sources.

    :param cluster: Mapping of read header to cluster metadata.
    :type cluster: dict
    :return: similarity_list, taxa_map, annotated_count, unannotated_count
    :rtype: tuple[list[float], dict, int, int]
    """
    similarity_list: list[float] = []
    taxa_map: dict[str, dict] = {}
    annotated_count = 0
    unannotated_count = 0

    for _header, info in cluster.items():
        count = info["count"]
        similarity = info["similarity"]
        taxa = info["taxa"]
        seq_id = info["seq_id"]
        source = info["source"]

        similarity_list.extend([similarity] * count)

        key = taxa if seq_id != "Unannotated read" else "Unannotated read"
        if key not in taxa_map:
            taxa_map[key] = {
                "count": 0,
                "seq_ids": set(),
                "sources": set(),
            }

        taxa_map[key]["count"] += count
        taxa_map[key]["seq_ids"].add(seq_id)
        taxa_map[key]["sources"].add(source)

        if seq_id == "Unannotated read":
            unannotated_count += count
        else:
            annotated_count += count

    return similarity_list, taxa_map, annotated_count, unannotated_count



def calculate_cluster_taxa(taxa_dict, args):
    """Resolve cluster-level taxa using weighted recursive LCA splitting.

    Unannotated reads are treated as fully uncertain taxa but contribute
    partially via ``uncertain_taxa_use_ratio`` when determining dominant levels.

    :param taxa_dict: Mapping of taxa string to total read count.
    :type taxa_dict: dict[str, int]
    :param args: Argument namespace containing resolution parameters.
    :type args: argparse.Namespace
    :return: List of taxonomic groups after recursive resolution.
    :rtype: list[dict[str, int]]
    """
    processed_dict = {}
    for taxa, count in taxa_dict.items():
        if taxa == "Unannotated read":
            uncertain_taxa = " / ".join(["Uncertain taxa"] * 7)
            processed_dict[uncertain_taxa] = count
        else:
            processed_dict[taxa] = count

    return _recursive_taxa_calculation(processed_dict, args, 0)


def _recursive_taxa_calculation(taxa_dict, args, level):
    """Recursive helper performing level-by-level weighted LCA resolution.

    At each taxonomic rank, taxa are grouped and compared. If multiple
    taxa exceed the configured split thresholds, the cluster is divided
    and recursion continues deeper.

    :param taxa_dict: Mapping of taxa to count for this recursion stage.
    :type taxa_dict: dict[str, int]
    :param args: Parameter namespace controlling uncertainty and split rules.
    :type args: argparse.Namespace
    :param level: Current taxonomy depth (0–6).
    :type level: int
    :return: One or more resolved taxon groups at deeper levels.
    :rtype: list[dict[str, int]]
    """
    if level >= 7:
        return [taxa_dict]

    level_dict = defaultdict(float)
    for taxa, count in taxa_dict.items():
        taxa_parts = taxa.split(" / ")
        if level < len(taxa_parts):
            level_taxon = taxa_parts[level]
            if level_taxon == "Uncertain taxa":
                level_dict[level_taxon] += count * args.uncertain_taxa_use_ratio
            else:
                level_dict[level_taxon] += count

    if len(level_dict) <= 1:
        return _recursive_taxa_calculation(taxa_dict, args, level + 1)

    sorted_taxa = sorted(level_dict.items(), key=lambda x: x[1], reverse=True)
    result = []
    dominant_taxon = sorted_taxa[0][0]

    for i in range(1, len(sorted_taxa)):
        secondary_taxon = sorted_taxa[i][0]
        total_count = sorted_taxa[0][1] + sorted_taxa[i][1]
        ratio = sorted_taxa[i][1] / total_count

        if ratio >= args.min_to_split or total_count <= args.min_count_to_split:
            split_dict = {
                taxa: count
                for taxa, count in taxa_dict.items()
                if taxa.split(" / ")[level] == secondary_taxon
            }
            result.extend(_recursive_taxa_calculation(split_dict, args, level + 1))

    dominant_dict = {
        taxa: count
        for taxa, count in taxa_dict.items()
        if taxa.split(" / ")[level] == dominant_taxon
    }
    result.extend(_recursive_taxa_calculation(dominant_dict, args, level + 1))

    return result



def write_similarity_output(cluster_data_list, output_file, log_messages=None):
    """Write similarity statistics and per-cluster distributions.

    Output includes mean, standard deviation, and counts of each
    similarity value per cluster.

    :param cluster_data_list: List of processed cluster dictionaries.
    :type cluster_data_list: list[dict]
    :param output_file: Destination path for the TSV summary.
    :type output_file: str
    :param log_messages: Optional log collector.
    :type log_messages: list[str] or None
    :return: None
    :rtype: None
    """
    all_simi = []
    pair_counter = Counter()

    for cluster_index, cluster_data in enumerate(cluster_data_list):
        simi_list = cluster_data["similarities"]
        if not simi_list:
            continue
        all_simi.extend(simi_list)
        for s in simi_list:
            pair_counter[(cluster_index, s)] += 1

    if not all_simi:
        if log_messages is not None:
            log_message(
                log_messages,
                "No similarity data found; similarity text file not written.",
            )
        return

    avg = sum(all_simi) / len(all_simi)
    variance = sum((s - avg) ** 2 for s in all_simi) / len(all_simi)
    st_dev = sqrt(variance)

    with open(output_file, "w") as f:
        f.write(f"# Average similarity (all reads): {avg:.2f}\n")
        f.write(f"# Standard deviation (all reads): {st_dev:.2f}\n")
        f.write("cluster\tsimilarity\tcount\n")

        for (cluster_index, similarity), count in sorted(
            pair_counter.items(), key=lambda x: (x[0][0], -x[0][1])
        ):
            f.write(f"{cluster_index}\t{similarity}\t{count}\n")

    if log_messages is not None:
        log_message(log_messages, "Similarity text summary written succesfully")
        log_message(
            log_messages, f"Similarity mean = {avg:.2f}, std = {st_dev:.2f}"
        )


def write_count_output(cluster_data_list, output_file, log_messages=None):
    """Write counts of annotated and unannotated reads per cluster.

    A summary row containing total and percentage values is appended.

    :param cluster_data_list: List of processed clusters.
    :type cluster_data_list: list[dict]
    :param output_file: Path for the count summary text file.
    :type output_file: str
    :param log_messages: Optional logging accumulator.
    :type log_messages: list[str] or None
    :return: None
    :rtype: None
    """
    total_annotated = 0
    total_unannotated = 0

    with open(output_file, "w") as f:
        f.write(
            "cluster\tunannotated\tannotated\ttotal\tperc_unannotated\tperc_annotated\n"
        )

        for cluster_index, cluster_data in enumerate(cluster_data_list):
            unannotated = cluster_data["unannotated"]
            annotated = cluster_data["annotated"]
            total = unannotated + annotated

            total_annotated += annotated
            total_unannotated += unannotated

            if total > 0:
                perc_annotated = (annotated / total) * 100
                perc_unannotated = (unannotated / total) * 100
            else:
                perc_annotated = perc_unannotated = 0.0

            f.write(
                f"{cluster_index}\t{unannotated}\t{annotated}\t{total}"
                f"\t{perc_unannotated:.2f}\t{perc_annotated:.2f}\n"
            )

        grand_total = total_annotated + total_unannotated
        if grand_total > 0:
            perc_annotated = (total_annotated / grand_total) * 100
            perc_unannotated = (total_unannotated / grand_total) * 100
        else:
            perc_annotated = perc_unannotated = 0.0

        f.write(
            f"TOTAL\t{total_unannotated}\t{total_annotated}\t{grand_total}"
            f"\t{perc_unannotated:.2f}\t{perc_annotated:.2f}\n"
        )

    if log_messages is not None:
        log_message(log_messages, "Count summary written succesfully")
        log_message(
            log_messages,
            f"TOTAL annotated={total_annotated}, unannotated={total_unannotated}, total={grand_total}",
        )


def write_taxa_excel(cluster_data_list, args, output_file, write_raw: bool, write_processed: bool, log_messages=None):
    """Write raw and processed taxa data into a combined Excel report.

    Generates up to three sheets:

        - ``Raw_Taxa_Clusters``: Full taxa per read cluster (if enabled).
        - ``Processed_Taxa_Clusters``: LCA-resolved taxa (if enabled).
        - ``Settings``: Parameters used for taxonomic resolution.

    seq_id and source tracking are kept only in the raw sheet.

    :param cluster_data_list: List containing similarity/taxa data per cluster.
    :type cluster_data_list: list[dict]
    :param args: Parsed arguments containing LCA configuration.
    :type args: argparse.Namespace
    :param output_file: Path to the combined Excel file.
    :type output_file: str
    :param write_raw: Whether to include the raw taxa sheet.
    :type write_raw: bool
    :param write_processed: Whether to include the processed/LCA sheet.
    :type write_processed: bool
    :param log_messages: Optional log collector.
    :type log_messages: list[str] or None
    :return: None
    :rtype: None
    """
    raw_rows = []
    processed_rows = []

    if write_raw:
        for cluster_index, cluster_data in enumerate(cluster_data_list):
            taxa_map = cluster_data["taxa_map"]
            for taxa, info in taxa_map.items():
                count = info["count"]
                if count <= 0:
                    continue

                taxa_levels = taxa.split(" / ") if taxa else []
                while len(taxa_levels) < 7:
                    taxa_levels.append("Unannotated")

                seq_ids_str = ",".join(sorted(info["seq_ids"]))
                sources_str = ",".join(sorted(info["sources"]))

                raw_rows.append(
                    {
                        "cluster": cluster_index,
                        "count": count,
                        "seq_id": seq_ids_str,
                        "source": sources_str,
                        "taxa_full": taxa,
                        "kingdom": taxa_levels[0],
                        "phylum": taxa_levels[1],
                        "class": taxa_levels[2],
                        "order": taxa_levels[3],
                        "family": taxa_levels[4],
                        "genus": taxa_levels[5],
                        "species": taxa_levels[6],
                    }
                )

    if write_processed:
        for cluster_index, cluster_data in enumerate(cluster_data_list):
            taxa_map = cluster_data["taxa_map"]

            annotated_support = sum(
                info["count"]
                for taxa, info in taxa_map.items()
                if taxa != "Unannotated read"
            )

            if annotated_support < args.min_cluster_support:
                continue

            taxa_counts = {taxa: info["count"] for taxa, info in taxa_map.items()}
            processed_groups = calculate_cluster_taxa(taxa_counts, args)

            for group in processed_groups:
                for taxa, count in group.items():
                    if count <= 0:
                        continue

                    if "Uncertain taxa / Uncertain taxa / Uncertain taxa" in taxa:
                        continue
                    else:
                        info = taxa_map.get(taxa)
                        seq_ids_set = info["seq_ids"]
                        sources_set = info["sources"]
                        taxa_full = taxa
                        taxa_levels = taxa.split(" / ") if taxa else []
                        while len(taxa_levels) < 7:
                            taxa_levels.append("Unannotated")

                    seq_ids_str = ",".join(sorted(seq_ids_set))
                    sources_str = ",".join(sorted(sources_set))
                    processed_rows.append(
                        {
                            "cluster": cluster_index,
                            "count": count,
                            "seq_id": seq_ids_str,
                            "source": sources_str,
                            "taxa_full": taxa_full,
                            "kingdom": taxa_levels[0],
                            "phylum": taxa_levels[1],
                            "class": taxa_levels[2],
                            "order": taxa_levels[3],
                            "family": taxa_levels[4],
                            "genus": taxa_levels[5],
                            "species": taxa_levels[6],
                        }
                    )

    if not raw_rows and not processed_rows:
        if log_messages is not None:
            log_message(
                log_messages,
                "No taxa data to write; taxa Excel file not created.",
            )
        return

    raw_df = pd.DataFrame(raw_rows) if raw_rows else None
    processed_df = pd.DataFrame(processed_rows) if processed_rows else None

    settings_data = [
        ["uncertain_taxa_use_ratio", args.uncertain_taxa_use_ratio],
        ["min_to_split", args.min_to_split],
        ["min_count_to_split", args.min_count_to_split],
        ["min_cluster_support", args.min_cluster_support]
    ]
    settings_df = pd.DataFrame(settings_data, columns=["Parameter", "Value"])

    temp_path = output_file + ".tmp.xlsx"
    os.makedirs(os.path.dirname(temp_path), exist_ok=True)
    with pd.ExcelWriter(temp_path, engine="openpyxl") as writer:
        if raw_df is not None:
            raw_df.to_excel(
                writer, sheet_name="Raw_Taxa_Clusters", index=False
            )
        if processed_df is not None:
            processed_df.to_excel(
                writer, sheet_name="Processed_Taxa_Clusters", index=False
            )
        settings_df.to_excel(writer, sheet_name="Settings", index=False)

        for sheet_name in writer.sheets:
            worksheet = writer.sheets[sheet_name]
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        cell_len = len(str(cell.value))
                        if cell_len > max_length:
                            max_length = cell_len
                    except AttributeError:
                        pass
                adjusted_width = min(max_length + 2, 50)
                worksheet.column_dimensions[column_letter].width = adjusted_width

    os.replace(temp_path, output_file)

    if log_messages is not None:
        if raw_df is not None:
            log_message(log_messages, "Raw taxa per cluster written succesfully")
        if processed_df is not None:
            log_message(
                log_messages,
                "Processed taxa (split clusters) written succesfully",
            )


def create_similarity_plot(all_simi_data, cluster_simi_lengths, args, output_file, log_messages=None):
    """Create and save a similarity distribution bar plot.

    Bars are colored per cluster using a fixed repeating colormap.

    :param all_simi_data: All similarity values across clusters.
    :type all_simi_data: list[float]
    :param cluster_simi_lengths: Per-cluster similarity list lengths.
    :type cluster_simi_lengths: list[int]
    :param args: Namespace containing y-axis plot limits.
    :type args: argparse.Namespace
    :param output_file: Output PNG file path.
    :type output_file: str
    :param log_messages: Optional log message list.
    :type log_messages: list[str] or None
    :return: None
    :rtype: None
    """
    if not all_simi_data:
        if log_messages is not None:
            log_message(log_messages, "No similarity data; similarity plot not created.")
        return

    sorted_simi_list = sorted(all_simi_data, reverse=True)
    bar_positions = list(range(len(sorted_simi_list)))

    colormap_full = []
    for i, length in enumerate(cluster_simi_lengths):
        color = COLORMAP[i % len(COLORMAP)]
        colormap_full.extend([color] * length)

    plt.figure(figsize=(12, 6))
    plt.bar(bar_positions, sorted_simi_list, width=1, color=colormap_full)
    plt.grid(axis="y", linestyle="--", color="gray", alpha=0.7)
    plt.ylabel("Similarity (%)")
    plt.xlabel("Reads")
    plt.title("Intra-cluster Similarity Distribution")
    plt.ylim(ymin=args.simi_plot_y_min, ymax=args.simi_plot_y_max)
    plt.tight_layout()

    plt.savefig(output_file, format="png", dpi=300, bbox_inches="tight")
    plt.close()

    if log_messages is not None:
        log_message(log_messages, "Similarity plot written succesfully")


def summarize_and_log(cluster_data_list, all_simi_data, log_messages, args):
    """Compute global summary statistics and append them to the log.

    Summary includes:
        - total clusters
        - total annotated/unannotated reads
        - per-cluster annotation presence
        - top taxa distribution
        - similarity mean and standard deviation

    :param cluster_data_list: List of processed cluster descriptors.
    :type cluster_data_list: list[dict]
    :param all_simi_data: All similarity values from all clusters.
    :type all_simi_data: list[float]
    :param log_messages: List collecting log output lines.
    :type log_messages: list[str]
    :param args: Argument namespace with configuration parameters.
    :type args: argparse.Namespace
    :return: None
    :rtype: None
    """
    total_clusters = len(cluster_data_list)
    total_annotated = sum(c["annotated"] for c in cluster_data_list)
    total_unannotated = sum(c["unannotated"] for c in cluster_data_list)
    grand_total = total_annotated + total_unannotated

    clusters_with_annotations = sum(
        1 for c in cluster_data_list if c["annotated"] > 0
    )
    clusters_unannotated_only = total_clusters - clusters_with_annotations

    log_message(log_messages, "=== SUMMARY ===")
    log_message(log_messages, f"Clusters parsed: {total_clusters}")
    log_message(log_messages, f"Total reads: {grand_total}")
    log_message(log_messages, f"Annotated reads: {total_annotated}")
    log_message(log_messages, f"Unannotated reads: {total_unannotated}")
    log_message(log_messages, f"Clusters with annotations: {clusters_with_annotations}")
    log_message(log_messages, f"Clusters fully unannotated: {clusters_unannotated_only}")
    log_message(log_messages, f"Minimum cluster support for processed taxa: {args.min_cluster_support}")

    taxa_counter = Counter()
    for c in cluster_data_list:
        for taxa, info in c["taxa_map"].items():
            if taxa == "Unannotated read":
                continue
            taxa_counter[taxa] += info["count"]

    if taxa_counter:
        log_message(log_messages, "=== TAXA SUMMARY (top 10) ===")
        for taxa, count in taxa_counter.most_common(10):
            log_message(log_messages, f"{taxa}: {count} reads")
        log_message(log_messages, f"Total unique taxa: {len(taxa_counter)}")
    else:
        log_message(log_messages, "No annotated taxa found.")

    if all_simi_data:
        avg = sum(all_simi_data) / len(all_simi_data)
        variance = sum((s - avg) ** 2 for s in all_simi_data) / len(all_simi_data)
        st_dev = sqrt(variance)
        log_message(log_messages, "=== SIMILARITY SUMMARY ===")
        log_message(log_messages, f"Mean similarity: {avg:.2f}")
        log_message(log_messages, f"Std similarity: {st_dev:.2f}")
    else:
        log_message(log_messages, "No similarity values available for summary.")



def main(arg_list=None):
    """Main entry point for CD-HIT cluster processing.

    Orchestrates parsing, cluster processing, statistic aggregation,
    visualization, and generation of all requested outputs.

    :param arg_list: Optional list of arguments (used by tests).
    :type arg_list: list[str] or None
    :return: None
    :rtype: None
    """
    args = parse_arguments(arg_list)
    log_messages: list[str] = []
    log_message(log_messages, "=== CD-HIT cluster processing started ===")

    log_message(log_messages, "cluster_file: provided")
    log_message(
        log_messages,
        "annotation_file: provided" if args.input_annotation else "annotation_file: none",
    )

    log_message(log_messages, "=== PARAMETERS ===")
    skip_keys = {
        "output_similarity_txt",
        "output_similarity_plot",
        "output_count",
        "output_excel",
        "input_cluster",
        "input_annotation",
        "log_file",
    }
    for key, value in vars(args).items():
        if key in skip_keys:
            continue
        log_message(log_messages, f"{key}: {value}")
    log_message(log_messages, "=== PARAMETERS END===")

    clusters = parse_cluster_file(
        args.input_cluster,
        args.input_annotation,
        raise_on_error=False,
        log_messages=log_messages,
    )

    cluster_data_list = []
    all_simi_data: list[float] = []
    cluster_simi_lengths: list[int] = []

    for cluster in clusters:
        simi_list, taxa_map, annotated_count, unannotated_count = process_cluster_data(
            cluster
        )
        cluster_data = {
            "similarities": simi_list,
            "taxa_map": taxa_map,
            "annotated": annotated_count,
            "unannotated": unannotated_count,
        }
        cluster_data_list.append(cluster_data)

        if simi_list:
            all_simi_data.extend(simi_list)
            cluster_simi_lengths.append(len(simi_list))

    if args.output_similarity_txt:
        write_similarity_output(
            cluster_data_list, args.output_similarity_txt, log_messages)

    if args.output_similarity_plot:
        create_similarity_plot(
            all_simi_data,
            cluster_simi_lengths,
            args,
            args.output_similarity_plot,
            log_messages)

    if args.output_count:
        write_count_output(cluster_data_list, args.output_count, log_messages)

    if args.output_excel:
        write_raw = bool(args.output_taxa_clusters)
        write_processed = bool(args.output_taxa_processed)

        if not write_raw and not write_processed:
            if log_messages is not None:
                log_message(log_messages, "output_excel provided but no taxa sheet flags set; no Excel file written.")
        else:
            write_taxa_excel(cluster_data_list,
                args,
                args.output_excel,
                write_raw=write_raw,
                write_processed=write_processed,
                log_messages=log_messages)
    else:
        if args.output_taxa_clusters or args.output_taxa_processed:
            if log_messages is not None:
                log_message(
                    log_messages,
                    "WARNING: Raw/processed taxa output flags set but no --output_excel path provided; skipping Excel output."
                )

    summarize_and_log(cluster_data_list, all_simi_data, log_messages, args)
    log_message(log_messages, "=== CD-HIT cluster processing finished ===")

    if args.log_file:
        os.makedirs(os.path.dirname(args.log_file), exist_ok=True)
        with open(args.log_file, "w") as f:
            f.write("\n".join(log_messages))


if __name__ == "__main__":
    main()