diff ipfp_normalisation.py @ 0:8b5e4ea144a5 draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ipfp_normalisation commit 1facbf5b9d74f0f7cd1f9346acb405a2e327c639
author iuc
date Tue, 04 Feb 2025 09:11:16 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ipfp_normalisation.py	Tue Feb 04 09:11:16 2025 +0000
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+"""
+IPFP Normalisation
+"""
+import argparse
+import sys
+
+import numpy as np
+
+
+def throw_error(msg, exit_code=1):
+    sys.stderr.write(msg)
+    sys.exit(exit_code)
+
+
+def ipfp(data, precision=1e-5, maxIterations=50):
+    """
+    Return the normalized version of the input data (matrix) as an ndarray
+    :param data:				np.ndArray
+    :param precision:			float		combined allowed deviation (residual error) of col and row means from TARGET (=1)
+    :param maxIterations:		int			maximum amount of iterations (1x row and 1x col per iteration)
+    :return normalizedData:		np.ndArray	normalized data
+    """
+    try:
+        assert isinstance(data, np.ndarray) and data.dtype in ['float64', 'int64']
+        assert precision > 0
+        assert isinstance(maxIterations, int) and maxIterations > 0
+    except AssertionError:
+        throw_error("Invalid input parameters. Please check that the input data consists of floats or integers, precision > 0 and maxIterations is a positive integer.")
+    # replace zeros with nan
+    if (data < 0).any():
+        throw_error("Negative values detected, only use positive values.")
+
+    zeros = (data == 0)
+    if zeros.any():
+        print("Zero values detected; replacing with NA.")
+        data = data.astype(float)
+        data[zeros] = np.nan
+
+    # initialize variables
+    Nrows, Ncols = data.shape
+    convergenceTrail = np.asarray([np.nan] * (2 * maxIterations))
+    convergence = np.inf
+    normalized_data = data
+    TARGET = 1
+
+    i = 0  # number of current iteration
+    # without reshaping the ndarrays, they have shape (x,) (no second value) and the procedure fails.
+    # main loop; iterates until convergence is reached (i.e., L1-norm below variable <h>) or the maximum number of
+    # iteration cycles is surpassed.
+    while convergence > precision and i < maxIterations:
+        # fit the rows
+        Ri = TARGET * np.asarray(1 / np.nanmean(normalized_data, 1)).reshape(Nrows,)
+        normalized_data = (normalized_data.T * Ri).T
+
+        # calculate deviation from column marginals; row deviation is zero at even indices. (index start = 0)
+        convergenceTrail[2 * i] = Nrows * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 0) - TARGET))
+
+        # fit the columns
+        Si = TARGET * np.asarray(1 / np.nanmean(normalized_data, 0)).reshape(Ncols,)
+        normalized_data *= Si
+        # calculate deviation from row marginals; column deviation is zero at odd indices. (index start = 0)
+        convergenceTrail[2 * i + 1] = Ncols * 0.5 * np.nansum(np.abs(np.nanmean(normalized_data, 1) - TARGET))
+
+        convergence = convergenceTrail[2 * i + 1]
+        i += 1
+
+    if i == maxIterations:
+        throw_error(f"Max number of IPFP iterations ({maxIterations}) reached. Attained precision: {convergence}.")
+
+    return normalized_data
+
+
+def main():
+    parser = argparse.ArgumentParser(description="IPFP Normalisation")
+    parser.add_argument('-i', '--input', help="Input file", required=True, metavar="FILE")
+    parser.add_argument('-p', '--precision', help="Precision", default=1e-5, type=float)
+    parser.add_argument('-m', '--maxIterations', help="Max iterations", default=50, type=int)
+    parser.add_argument('-s', '--skipHeaders', help="Skip headers, skips the first n lines", default=0, type=int)
+
+    args = parser.parse_args()
+
+    try:
+        data = np.genfromtxt(args.input, skip_header=args.skipHeaders, filling_values=np.nan, delimiter='\t')
+        normalized_data = ipfp(data, args.precision, args.maxIterations)
+        np.savetxt("output.tsv", normalized_data, delimiter='\t')
+
+    except Exception as e:
+        throw_error(str(e))
+
+
+if __name__ == "__main__":
+    main()