Mercurial > repos > bgruening > sucos_clustering
diff sucos_cluster.py @ 0:f80cfac80c53 draft
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/chemicaltoolbox/sucos commit ef86cfa5f7ab5043de420511211579d03df58645"
author | bgruening |
---|---|
date | Wed, 02 Oct 2019 12:58:19 -0400 |
parents | |
children | b8725fec8c7b |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sucos_cluster.py Wed Oct 02 12:58:19 2019 -0400 @@ -0,0 +1,134 @@ +#!/usr/bin/env python +""" +Cluster a set of molecules based on their 3D overlays as determined by the SuCOS score. + +This will generate a set of SD files, one for each cluster of molecules (presumably corresponding to a +binding pocket in the protein target). + + +SuCOS is the work of Susan Leung. +GitHub: https://github.com/susanhleung/SuCOS +Publication: https://doi.org/10.26434/chemrxiv.8100203.v1 +""" + +import sucos, utils +import argparse, gzip +from rdkit import Chem +import numpy as np +import pandas as pd +from scipy.cluster.hierarchy import linkage, fcluster + +### start main execution ######################################### + + +def calc_distance_matrix(mols): + """ + Calculate a full distance matrix for the given molecules. Identical molecules get a score of 0.0 with the maximum + distance possible being 1.0. + :param mols: A list of molecules. It must be possible to iterate through this list multiple times + :return: A NxN 2D array of distance scores, with N being the number of molecules in the input + """ + + # TODO - do we need to calculate both sides of the matrix? Tanimoto is supposed to be a symmetric distance measure, + # but the matrix that is generated does not seem to be symmetric. + + mol_fm_tuples = [] + for mol in mols: + features = sucos.getRawFeatures(mol) + mol_fm_tuples.append((mol, features)) + + matrix = [] + for tuple1 in mol_fm_tuples: + tmp = [] + for tuple2 in mol_fm_tuples: + if tuple1[0] == tuple2[0]: + tmp.append(0.0) + else: + #utils.log("Calculating SuCOS between", mol1, mol2) + sucos_score, fm_score, tani_score = sucos.get_SucosScore(tuple1[0], tuple2[0], + tani=True, ref_features=tuple1[1], query_features=tuple2[1]) + tmp.append(1.0 - sucos_score) + matrix.append(tmp) + + + return matrix + + +def cluster(matrix, threshold=0.8): + """ + Cluster the supplied distance matrix returning an array of clusters. + :param matrix: the distance matrix, as calculated with the calc_distance_matrix function. + :param threshold: The clustering cuttoff. The default of 0.8 is a reasonable value to use. + :return: An array of clusters, each cluster being an array of the indices from the matrix. + """ + + indexes = [x for x in range(0, len(matrix))] + cols = [x for x in range(0, len(matrix[0]))] + #utils.log("indexes", indexes) + #utils.log("cols", cols) + df = pd.DataFrame(matrix, columns=cols, index=indexes) + utils.log("DataFrame:", df.shape) + #utils.log(df) + indices = np.triu_indices(df.shape[0], k=1) + #utils.log("Indices:", indices) + t = np.array(df)[indices] + Z = linkage(t, 'average') + lig_clusters = [] + cluster_arr = fcluster(Z, t=threshold, criterion='distance') + for i in range(np.amax(cluster_arr)): + clus = df.columns[np.argwhere(cluster_arr==i+1)] + lig_clusters.append([x[0] for x in clus.tolist()]) + + utils.log("Clusters", lig_clusters) + return lig_clusters + +def write_clusters_to_sdfs(mols, clusters, basename, gzip=False): + """ + Write the molecules to SDF files, 1 file for each cluster. + :param mols The molecules to write: + :param clusters The clusters, as returned by the cluster function: + :param basename The basename for the file name. e.g. if basename is 'output' then files like + output1.sdf, output2.sdf will be written: + :param gzip Whether to gzip the output + :return: + """ + + i = 0 + for cluster in clusters: + i += 1 + filename = basename + str(i) + ".sdf" + if gzip: + filename += ".gz" + utils.log("Writing ", len(cluster), "molecules in cluster", i, "to file", filename) + output_file = utils.open_file_for_writing(filename) + writer = Chem.SDWriter(output_file) + for index in cluster: + mol = mols[index] + writer.write(mol) + writer.flush() + writer.close() + output_file.close() + + + +def main(): + parser = argparse.ArgumentParser(description='Clustering with SuCOS and RDKit') + parser.add_argument('-i', '--input', help='Input file in SDF format. Can be gzipped (*.gz).') + parser.add_argument('-o', '--output', default="cluster", help="Base name for output files in SDF format. " + + "e.g. if value is 'output' then files like output1.sdf, output2.sdf will be created") + parser.add_argument('--gzip', action='store_true', help='Gzip the outputs generating files like output1.sdf.gz, output2.sdf.gz') + parser.add_argument('-t', '--threshold', type=float, default=0.8, help='Clustering threshold') + + args = parser.parse_args() + utils.log("SuCOS Cluster Args: ", args) + + input_file = utils.open_file_for_reading(args.input) + suppl = Chem.ForwardSDMolSupplier(input_file) + mols = list(suppl) + matrix = calc_distance_matrix(mols) + clusters = cluster(matrix, threshold=args.threshold) + write_clusters_to_sdfs(mols, clusters, args.output, gzip=args.gzip) + + +if __name__ == "__main__": + main() \ No newline at end of file