Mercurial > repos > iuc > ipfp_normalisation
comparison ipfp_normalisation.py @ 0:8b5e4ea144a5 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ipfp_normalisation commit 1facbf5b9d74f0f7cd1f9346acb405a2e327c639
| author | iuc |
|---|---|
| date | Tue, 04 Feb 2025 09:11:16 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:8b5e4ea144a5 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 """ | |
| 3 IPFP Normalisation | |
| 4 """ | |
| 5 import argparse | |
| 6 import sys | |
| 7 | |
| 8 import numpy as np | |
| 9 | |
| 10 | |
| 11 def throw_error(msg, exit_code=1): | |
| 12 sys.stderr.write(msg) | |
| 13 sys.exit(exit_code) | |
| 14 | |
| 15 | |
| 16 def ipfp(data, precision=1e-5, maxIterations=50): | |
| 17 """ | |
| 18 Return the normalized version of the input data (matrix) as an ndarray | |
| 19 :param data: np.ndArray | |
| 20 :param precision: float combined allowed deviation (residual error) of col and row means from TARGET (=1) | |
| 21 :param maxIterations: int maximum amount of iterations (1x row and 1x col per iteration) | |
| 22 :return normalizedData: np.ndArray normalized data | |
| 23 """ | |
| 24 try: | |
| 25 assert isinstance(data, np.ndarray) and data.dtype in ['float64', 'int64'] | |
| 26 assert precision > 0 | |
| 27 assert isinstance(maxIterations, int) and maxIterations > 0 | |
| 28 except AssertionError: | |
| 29 throw_error("Invalid input parameters. Please check that the input data consists of floats or integers, precision > 0 and maxIterations is a positive integer.") | |
| 30 # replace zeros with nan | |
| 31 if (data < 0).any(): | |
| 32 throw_error("Negative values detected, only use positive values.") | |
| 33 | |
| 34 zeros = (data == 0) | |
| 35 if zeros.any(): | |
| 36 print("Zero values detected; replacing with NA.") | |
| 37 data = data.astype(float) | |
| 38 data[zeros] = np.nan | |
| 39 | |
| 40 # initialize variables | |
| 41 Nrows, Ncols = data.shape | |
| 42 convergenceTrail = np.asarray([np.nan] * (2 * maxIterations)) | |
| 43 convergence = np.inf | |
| 44 normalized_data = data | |
| 45 TARGET = 1 | |
| 46 | |
| 47 i = 0 # number of current iteration | |
| 48 # without reshaping the ndarrays, they have shape (x,) (no second value) and the procedure fails. | |
| 49 # main loop; iterates until convergence is reached (i.e., L1-norm below variable <h>) or the maximum number of | |
| 50 # iteration cycles is surpassed. | |
| 51 while convergence > precision and i < maxIterations: | |
| 52 # fit the rows | |
| 53 Ri = TARGET * np.asarray(1 / np.nanmean(normalized_data, 1)).reshape(Nrows,) | |
| 54 normalized_data = (normalized_data.T * Ri).T | |
| 55 | |
| 56 # calculate deviation from column marginals; row deviation is zero at even indices. (index start = 0) | |
| 57 convergenceTrail[2 * i] = Nrows * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 0) - TARGET)) | |
| 58 | |
| 59 # fit the columns | |
| 60 Si = TARGET * np.asarray(1 / np.nanmean(normalized_data, 0)).reshape(Ncols,) | |
| 61 normalized_data *= Si | |
| 62 # calculate deviation from row marginals; column deviation is zero at odd indices. (index start = 0) | |
| 63 convergenceTrail[2 * i + 1] = Ncols * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 1) - TARGET)) | |
| 64 | |
| 65 convergence = convergenceTrail[2 * i + 1] | |
| 66 i += 1 | |
| 67 | |
| 68 if i == maxIterations: | |
| 69 throw_error(f"Max number of IPFP iterations ({maxIterations}) reached. Attained precision: {convergence}.") | |
| 70 | |
| 71 return normalized_data | |
| 72 | |
| 73 | |
| 74 def main(): | |
| 75 parser = argparse.ArgumentParser(description="IPFP Normalisation") | |
| 76 parser.add_argument('-i', '--input', help="Input file", required=True, metavar="FILE") | |
| 77 parser.add_argument('-p', '--precision', help="Precision", default=1e-5, type=float) | |
| 78 parser.add_argument('-m', '--maxIterations', help="Max iterations", default=50, type=int) | |
| 79 parser.add_argument('-s', '--skipHeaders', help="Skip headers, skips the first n lines", default=0, type=int) | |
| 80 | |
| 81 args = parser.parse_args() | |
| 82 | |
| 83 try: | |
| 84 data = np.genfromtxt(args.input, skip_header=args.skipHeaders, filling_values=np.nan, delimiter='\t') | |
| 85 normalized_data = ipfp(data, args.precision, args.maxIterations) | |
| 86 np.savetxt("output.tsv", normalized_data, delimiter='\t') | |
| 87 | |
| 88 except Exception as e: | |
| 89 throw_error(str(e)) | |
| 90 | |
| 91 | |
| 92 if __name__ == "__main__": | |
| 93 main() |
