Mercurial > repos > iuc > ipfp_normalisation
view ipfp_normalisation.py @ 0:8b5e4ea144a5 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ipfp_normalisation commit 1facbf5b9d74f0f7cd1f9346acb405a2e327c639
author | iuc |
---|---|
date | Tue, 04 Feb 2025 09:11:16 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python """ IPFP Normalisation """ import argparse import sys import numpy as np def throw_error(msg, exit_code=1): sys.stderr.write(msg) sys.exit(exit_code) def ipfp(data, precision=1e-5, maxIterations=50): """ Return the normalized version of the input data (matrix) as an ndarray :param data: np.ndArray :param precision: float combined allowed deviation (residual error) of col and row means from TARGET (=1) :param maxIterations: int maximum amount of iterations (1x row and 1x col per iteration) :return normalizedData: np.ndArray normalized data """ try: assert isinstance(data, np.ndarray) and data.dtype in ['float64', 'int64'] assert precision > 0 assert isinstance(maxIterations, int) and maxIterations > 0 except AssertionError: throw_error("Invalid input parameters. Please check that the input data consists of floats or integers, precision > 0 and maxIterations is a positive integer.") # replace zeros with nan if (data < 0).any(): throw_error("Negative values detected, only use positive values.") zeros = (data == 0) if zeros.any(): print("Zero values detected; replacing with NA.") data = data.astype(float) data[zeros] = np.nan # initialize variables Nrows, Ncols = data.shape convergenceTrail = np.asarray([np.nan] * (2 * maxIterations)) convergence = np.inf normalized_data = data TARGET = 1 i = 0 # number of current iteration # without reshaping the ndarrays, they have shape (x,) (no second value) and the procedure fails. # main loop; iterates until convergence is reached (i.e., L1-norm below variable <h>) or the maximum number of # iteration cycles is surpassed. while convergence > precision and i < maxIterations: # fit the rows Ri = TARGET * np.asarray(1 / np.nanmean(normalized_data, 1)).reshape(Nrows,) normalized_data = (normalized_data.T * Ri).T # calculate deviation from column marginals; row deviation is zero at even indices. (index start = 0) convergenceTrail[2 * i] = Nrows * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 0) - TARGET)) # fit the columns Si = TARGET * np.asarray(1 / np.nanmean(normalized_data, 0)).reshape(Ncols,) normalized_data *= Si # calculate deviation from row marginals; column deviation is zero at odd indices. (index start = 0) convergenceTrail[2 * i + 1] = Ncols * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 1) - TARGET)) convergence = convergenceTrail[2 * i + 1] i += 1 if i == maxIterations: throw_error(f"Max number of IPFP iterations ({maxIterations}) reached. Attained precision: {convergence}.") return normalized_data def main(): parser = argparse.ArgumentParser(description="IPFP Normalisation") parser.add_argument('-i', '--input', help="Input file", required=True, metavar="FILE") parser.add_argument('-p', '--precision', help="Precision", default=1e-5, type=float) parser.add_argument('-m', '--maxIterations', help="Max iterations", default=50, type=int) parser.add_argument('-s', '--skipHeaders', help="Skip headers, skips the first n lines", default=0, type=int) args = parser.parse_args() try: data = np.genfromtxt(args.input, skip_header=args.skipHeaders, filling_values=np.nan, delimiter='\t') normalized_data = ipfp(data, args.precision, args.maxIterations) np.savetxt("output.tsv", normalized_data, delimiter='\t') except Exception as e: throw_error(str(e)) if __name__ == "__main__": main()