clustering_from_distmat.py @ 2:f8ee933de3ca (draft, default, tip)
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/clustering_from_distmat/ commit e4b7fe74660f4d57fce7a5708bdbddaf769cc968
author:   iuc
date:     Mon, 16 Sep 2024 14:57:17 +0000
parents:  c0b01c55a0e0
children: (none)
import argparse
import sys
from collections import Counter

import scipy


def linkage_as_newick(linkage, tip_names):
    # Build a Newick string from a scipy linkage matrix,
    # using tip_names as the leaf labels.
    newick_parts = tip_names[::]
    within_cluster_distances = [0] * len(tip_names)
    for step in linkage:
        n1 = int(step[0])
        n2 = int(step[1])
        d = float(step[2])
        d1 = d - within_cluster_distances[n1]
        d2 = d - within_cluster_distances[n2]
        id1 = newick_parts[n1]
        id2 = newick_parts[n2]
        part = f'({id1}:{d1 / 2},{id2}:{d2 / 2})'
        within_cluster_distances.append(d)
        newick_parts.append(part)
    return newick_parts[-1].format(*newick_parts) + ';'


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'infile',
        help='Distance matrix input file'
    )
    parser.add_argument(
        'out_prefix',
        help="Output prefix"
    )
    parser.add_argument(
        '-m', '--method',
        default="average",
        choices=[
            "single", "complete", "average", "weighted",
            "centroid", "median", "ward"
        ],
        help="Clustering method to use"
    )
    missing_names = parser.add_mutually_exclusive_group()
    missing_names.add_argument(
        "--nc", "--no-colnames",
        action="store_true",
        help="Indicate that the distance matrix input does not feature column names"
    )
    missing_names.add_argument(
        "--nr", "--no-rownames",
        action="store_true",
        help="Indicate that the distance matrix input does not feature row names"
    )
    cut_mode = parser.add_mutually_exclusive_group()
    cut_mode.add_argument(
        "-n", "--n-clusters",
        nargs="*",
        type=int
    )
    cut_mode.add_argument(
        "--height",
        nargs="*",
        type=float
    )
    parser.add_argument("-s", "--min-cluster-size", type=int, default=2)
    args = parser.parse_args()

    # read from input and check that
    # we have been passed a symmetric distance matrix
    with open(args.infile) as i:
        col_count = None
        row_count = 0
        matrix = []
        if args.nc:
            col_names = col_count = None
        else:
            while True:
                # skip leading empty lines
                line = next(i).rstrip("\n\r")
                if line:
                    break
            if args.nr:
                col_names = line.split("\t")
            else:
                # first column is for row names, rest are column names
                col_names = line.split("\t")[1:]
            col_count = len(col_names)
            if not col_count:
                sys.exit(
                    'No data columns found. '
                    'By default, this tool expects tabular input with column names on the first line '
                    'and a row name in the first column of each row followed by data columns. '
                    'Use --no-colnames or --no-rownames to modify the expected format.'
                )
        for line in i:
            if not line.strip():
                # skip empty lines
                continue
            row_count += 1
            if col_count is not None and row_count > col_count:
                sys.exit(
                    'This tool expects a symmetric distance matrix with an equal number of rows and columns, '
                    'but got more rows than columns.'
                )
            if args.nr:
                row_name = None
                row_data = line.rstrip("\n\r").split("\t")
            else:
                row_name, *row_data = line.rstrip("\n\r").split("\t")
            if col_count is None:
                col_count = len(row_data)
                col_names = [None] * col_count
            col_name = col_names[row_count - 1]
            if not row_name and col_name:
                # tolerate omitted row names, use col name instead
                row_name = col_name
            elif row_name and not col_name:
                # likewise for column names
                # plus update list of col names with row name
                col_name = col_names[row_count - 1] = row_name
            elif not row_name and not col_name:
                sys.exit(
                    'Each sample in the distance matrix must have its name specified via a row name, a column name, or both, '
                    f'but found no name for sample number {row_count}'
                )
            if row_name != col_name:
                sys.exit(
                    'This tool expects a symmetric distance matrix with identical names for rows and columns, '
                    f'but got "{col_name}" in column {row_count} and "{row_name}" on row {row_count}.'
                )
            if len(row_data) != col_count:
                sys.exit(
                    'This tool expects a symmetric distance matrix with the same number of columns on each row, '
                    f'but row {row_count} ("{row_name}") has {len(row_data)} columns instead of {col_count}.'
                )
            try:
                matrix.append([float(x) for x in row_data])
            except ValueError as e:
                if args.nr:
                    sys.exit(str(e) + f' on row {row_count}')
                else:
                    sys.exit(str(e) + f' on row {row_count} ("{row_name}")')
        if row_count < col_count:
            sys.exit(
                'This tool expects a symmetric distance matrix with an equal number of rows and columns, '
                'but got more columns than rows.'
            )

    # turn the distance matrix into "condensed" vector form
    # this gives us further checks and raises ValueErrors if:
    # - the values on the diagonal aren't zero
    # - the upper and lower triangle of the matrix aren't identical
    D = scipy.spatial.distance.squareform(matrix)
    # perform the requested clustering and retrieve the result as a linkage object
    linkage = scipy.cluster.hierarchy.linkage(D, args.method)
    with open(args.out_prefix + '.tree.newick', 'w') as o:
        o.write(linkage_as_newick(linkage, col_names))

    # cut the tree as specified and report sample to cluster assignments
    if args.n_clusters or args.height:
        if args.n_clusters:
            cut_values = args.n_clusters
            colname_template = "cluster_id_n{}"
        else:
            cut_values = args.height
            colname_template = "cluster_id_h{}"
        header_cols = ["sample"] + [
            colname_template.format(x) for x in cut_values
        ]
        cut_result = scipy.cluster.hierarchy.cut_tree(
            linkage, args.n_clusters, args.height
        )
        # Go through the cut results once to determine cluster sizes.
        # In the final report, the ids of clusters with fewer members than
        # args.min_cluster_size will be masked with "-".
        # The remaining cluster ids will be renumbered to start from 1.
        # This has to be done for each clustering resulting from the
        # user-specified cut_values.
        cluster_member_counts = [Counter() for _ in cut_values]
        effective_cluster_ids = [{} for _ in cut_values]
        for cluster_ids in cut_result:
            for cl_count, cl_id, eff_id in zip(
                cluster_member_counts, cluster_ids, effective_cluster_ids
            ):
                cl_count[cl_id] += 1
        for counter, eff_ids in zip(
            cluster_member_counts, effective_cluster_ids
        ):
            eff_id = 1
            for item, count in counter.items():
                # Since Python 3.7, Counter objects (like dicts) preserve
                # insertion order so we can be sure that in the mapping
                # constructed below, clusters will get renumbered in
                # the order they will be reported later.
                if count >= args.min_cluster_size:
                    eff_ids[item] = str(eff_id)
                    eff_id += 1
                else:
                    eff_ids[item] = "-"
        # build and write the cluster assignment report
        # with remapped cluster ids
        cluster_assignments = []
        for name, cluster_ids in zip(col_names, cut_result):
            cluster_assignments.append(
                [name] + [
                    eff_ids[c]
                    for c, eff_ids in zip(cluster_ids, effective_cluster_ids)
                ]
            )
        with open(args.out_prefix + '.cluster_assignments.tsv', 'w') as o:
            print("\t".join(header_cols), file=o)
            for ass in cluster_assignments:
                print("\t".join(ass), file=o)
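
For reference, the snippet below is a minimal standalone sketch (not part of the tool above) of the SciPy calls the script builds on, applied to a made-up 4x4 distance matrix; the values and the 2-cluster cut are illustrative only.

# Illustrative sketch: condensed-form conversion, hierarchical clustering,
# and tree cutting on a small, made-up symmetric distance matrix.
import scipy.cluster.hierarchy
import scipy.spatial.distance

matrix = [
    [0.0, 2.0, 6.0, 10.0],
    [2.0, 0.0, 5.0, 9.0],
    [6.0, 5.0, 0.0, 4.0],
    [10.0, 9.0, 4.0, 0.0],
]
# condensed vector form; raises ValueError if the matrix is not symmetric
# or its diagonal is not all zeros
D = scipy.spatial.distance.squareform(matrix)
# each linkage row holds the indices of the two merged nodes, the merge
# distance and the size of the resulting cluster
linkage = scipy.cluster.hierarchy.linkage(D, "average")
# one column of cluster ids per requested number of clusters;
# here: the sample-to-cluster assignments for a 2-cluster cut
print(scipy.cluster.hierarchy.cut_tree(linkage, n_clusters=[2]))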