comparison ipfp_normalisation.py @ 0:8b5e4ea144a5 draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ipfp_normalisation commit 1facbf5b9d74f0f7cd1f9346acb405a2e327c639
author iuc
date Tue, 04 Feb 2025 09:11:16 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:8b5e4ea144a5
1 #!/usr/bin/env python
2 """
3 IPFP Normalisation
4 """
5 import argparse
6 import sys
7
8 import numpy as np
9
10
11 def throw_error(msg, exit_code=1):
12 sys.stderr.write(msg)
13 sys.exit(exit_code)
14
15
16 def ipfp(data, precision=1e-5, maxIterations=50):
17 """
18 Return the normalized version of the input data (matrix) as an ndarray
19 :param data: np.ndArray
20 :param precision: float combined allowed deviation (residual error) of col and row means from TARGET (=1)
21 :param maxIterations: int maximum amount of iterations (1x row and 1x col per iteration)
22 :return normalizedData: np.ndArray normalized data
23 """
24 try:
25 assert isinstance(data, np.ndarray) and data.dtype in ['float64', 'int64']
26 assert precision > 0
27 assert isinstance(maxIterations, int) and maxIterations > 0
28 except AssertionError:
29 throw_error("Invalid input parameters. Please check that the input data consists of floats or integers, precision > 0 and maxIterations is a positive integer.")
30 # replace zeros with nan
31 if (data < 0).any():
32 throw_error("Negative values detected, only use positive values.")
33
34 zeros = (data == 0)
35 if zeros.any():
36 print("Zero values detected; replacing with NA.")
37 data = data.astype(float)
38 data[zeros] = np.nan
39
40 # initialize variables
41 Nrows, Ncols = data.shape
42 convergenceTrail = np.asarray([np.nan] * (2 * maxIterations))
43 convergence = np.inf
44 normalized_data = data
45 TARGET = 1
46
47 i = 0 # number of current iteration
48 # without reshaping the ndarrays, they have shape (x,) (no second value) and the procedure fails.
49 # main loop; iterates until convergence is reached (i.e., L1-norm below variable <h>) or the maximum number of
50 # iteration cycles is surpassed.
51 while convergence > precision and i < maxIterations:
52 # fit the rows
53 Ri = TARGET * np.asarray(1 / np.nanmean(normalized_data, 1)).reshape(Nrows,)
54 normalized_data = (normalized_data.T * Ri).T
55
56 # calculate deviation from column marginals; row deviation is zero at even indices. (index start = 0)
57 convergenceTrail[2 * i] = Nrows * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 0) - TARGET))
58
59 # fit the columns
60 Si = TARGET * np.asarray(1 / np.nanmean(normalized_data, 0)).reshape(Ncols,)
61 normalized_data *= Si
62 # calculate deviation from row marginals; column deviation is zero at odd indices. (index start = 0)
63 convergenceTrail[2 * i + 1] = Ncols * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 1) - TARGET))
64
65 convergence = convergenceTrail[2 * i + 1]
66 i += 1
67
68 if i == maxIterations:
69 throw_error(f"Max number of IPFP iterations ({maxIterations}) reached. Attained precision: {convergence}.")
70
71 return normalized_data
72
73
74 def main():
75 parser = argparse.ArgumentParser(description="IPFP Normalisation")
76 parser.add_argument('-i', '--input', help="Input file", required=True, metavar="FILE")
77 parser.add_argument('-p', '--precision', help="Precision", default=1e-5, type=float)
78 parser.add_argument('-m', '--maxIterations', help="Max iterations", default=50, type=int)
79 parser.add_argument('-s', '--skipHeaders', help="Skip headers, skips the first n lines", default=0, type=int)
80
81 args = parser.parse_args()
82
83 try:
84 data = np.genfromtxt(args.input, skip_header=args.skipHeaders, filling_values=np.nan, delimiter='\t')
85 normalized_data = ipfp(data, args.precision, args.maxIterations)
86 np.savetxt("output.tsv", normalized_data, delimiter='\t')
87
88 except Exception as e:
89 throw_error(str(e))
90
91
92 if __name__ == "__main__":
93 main()