Mercurial > repos > iuc > ipfp_normalisation
comparison ipfp_normalisation.py @ 0:8b5e4ea144a5 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ipfp_normalisation commit 1facbf5b9d74f0f7cd1f9346acb405a2e327c639
author | iuc |
---|---|
date | Tue, 04 Feb 2025 09:11:16 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:8b5e4ea144a5 |
---|---|
1 #!/usr/bin/env python | |
2 """ | |
3 IPFP Normalisation | |
4 """ | |
5 import argparse | |
6 import sys | |
7 | |
8 import numpy as np | |
9 | |
10 | |
11 def throw_error(msg, exit_code=1): | |
12 sys.stderr.write(msg) | |
13 sys.exit(exit_code) | |
14 | |
15 | |
16 def ipfp(data, precision=1e-5, maxIterations=50): | |
17 """ | |
18 Return the normalized version of the input data (matrix) as an ndarray | |
19 :param data: np.ndArray | |
20 :param precision: float combined allowed deviation (residual error) of col and row means from TARGET (=1) | |
21 :param maxIterations: int maximum amount of iterations (1x row and 1x col per iteration) | |
22 :return normalizedData: np.ndArray normalized data | |
23 """ | |
24 try: | |
25 assert isinstance(data, np.ndarray) and data.dtype in ['float64', 'int64'] | |
26 assert precision > 0 | |
27 assert isinstance(maxIterations, int) and maxIterations > 0 | |
28 except AssertionError: | |
29 throw_error("Invalid input parameters. Please check that the input data consists of floats or integers, precision > 0 and maxIterations is a positive integer.") | |
30 # replace zeros with nan | |
31 if (data < 0).any(): | |
32 throw_error("Negative values detected, only use positive values.") | |
33 | |
34 zeros = (data == 0) | |
35 if zeros.any(): | |
36 print("Zero values detected; replacing with NA.") | |
37 data = data.astype(float) | |
38 data[zeros] = np.nan | |
39 | |
40 # initialize variables | |
41 Nrows, Ncols = data.shape | |
42 convergenceTrail = np.asarray([np.nan] * (2 * maxIterations)) | |
43 convergence = np.inf | |
44 normalized_data = data | |
45 TARGET = 1 | |
46 | |
47 i = 0 # number of current iteration | |
48 # without reshaping the ndarrays, they have shape (x,) (no second value) and the procedure fails. | |
49 # main loop; iterates until convergence is reached (i.e., L1-norm below variable <h>) or the maximum number of | |
50 # iteration cycles is surpassed. | |
51 while convergence > precision and i < maxIterations: | |
52 # fit the rows | |
53 Ri = TARGET * np.asarray(1 / np.nanmean(normalized_data, 1)).reshape(Nrows,) | |
54 normalized_data = (normalized_data.T * Ri).T | |
55 | |
56 # calculate deviation from column marginals; row deviation is zero at even indices. (index start = 0) | |
57 convergenceTrail[2 * i] = Nrows * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 0) - TARGET)) | |
58 | |
59 # fit the columns | |
60 Si = TARGET * np.asarray(1 / np.nanmean(normalized_data, 0)).reshape(Ncols,) | |
61 normalized_data *= Si | |
62 # calculate deviation from row marginals; column deviation is zero at odd indices. (index start = 0) | |
63 convergenceTrail[2 * i + 1] = Ncols * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 1) - TARGET)) | |
64 | |
65 convergence = convergenceTrail[2 * i + 1] | |
66 i += 1 | |
67 | |
68 if i == maxIterations: | |
69 throw_error(f"Max number of IPFP iterations ({maxIterations}) reached. Attained precision: {convergence}.") | |
70 | |
71 return normalized_data | |
72 | |
73 | |
74 def main(): | |
75 parser = argparse.ArgumentParser(description="IPFP Normalisation") | |
76 parser.add_argument('-i', '--input', help="Input file", required=True, metavar="FILE") | |
77 parser.add_argument('-p', '--precision', help="Precision", default=1e-5, type=float) | |
78 parser.add_argument('-m', '--maxIterations', help="Max iterations", default=50, type=int) | |
79 parser.add_argument('-s', '--skipHeaders', help="Skip headers, skips the first n lines", default=0, type=int) | |
80 | |
81 args = parser.parse_args() | |
82 | |
83 try: | |
84 data = np.genfromtxt(args.input, skip_header=args.skipHeaders, filling_values=np.nan, delimiter='\t') | |
85 normalized_data = ipfp(data, args.precision, args.maxIterations) | |
86 np.savetxt("output.tsv", normalized_data, delimiter='\t') | |
87 | |
88 except Exception as e: | |
89 throw_error(str(e)) | |
90 | |
91 | |
92 if __name__ == "__main__": | |
93 main() |