changeset 11:db14ac3f6b43 draft default tip

planemo upload for repository https://github.com/ebi-gene-expression-group/container-galaxy-sc-tertiary/ commit 487508282bda9dbb68138d5c7091f46ef54fe52a
author ebi-gxa
date Wed, 19 Feb 2025 16:55:51 +0000
parents 97c2c52a7ab4
children
files decoupler_pseudobulk.py
diffstat 1 files changed, 75 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/decoupler_pseudobulk.py	Fri Nov 29 11:34:09 2024 +0000
+++ b/decoupler_pseudobulk.py	Wed Feb 19 16:55:51 2025 +0000
@@ -2,6 +2,7 @@
 
 import anndata
 import decoupler
+import numpy as np
 import pandas as pd
 
 
@@ -34,6 +35,60 @@
     )
 
 
+def create_pseudo_replicates(adata, sample_key, num_replicates, seed=None):
+    """
+    Randomly split the cells of each sample into pseudo replicates.
+
+    Adds the column ``f"{sample_key}_pseudo"`` to ``adata.obs`` in place,
+    labelling each cell ``<sample>_rep<i>``. Cells are split as evenly as
+    possible, so none is left with the bare sample name.
+
+    Parameters
+    ----------
+    adata : anndata.AnnData
+        The AnnData object.
+    sample_key : str
+        The column in adata.obs that defines the samples.
+    num_replicates : int
+        Number of pseudo replicates to create per sample (at least 1).
+    seed : int, optional
+        Seed for the shuffle; None gives a different split on each call.
+
+    Returns
+    -------
+    anndata.AnnData
+        The same AnnData object, with the pseudo replicate column added.
+
+    Raises
+    ------
+    ValueError
+        If num_replicates is smaller than 1.
+
+    Examples
+    --------
+    >>> obs = pd.DataFrame({'sample': ['A', 'A', 'B', 'B']})
+    >>> adata = anndata.AnnData(X=np.zeros((4, 2)), obs=obs)
+    >>> adata = create_pseudo_replicates(adata, 'sample', 2, seed=0)
+    >>> sorted(adata.obs['sample_pseudo'])
+    ['A_rep1', 'A_rep2', 'B_rep1', 'B_rep2']
+    """
+    if num_replicates < 1:
+        raise ValueError("num_replicates must be at least 1")
+    # A private RandomState leaves NumPy's global random state untouched.
+    rng = np.random.RandomState(seed)
+    samples = adata.obs[sample_key]
+    labels = samples.astype(str).to_numpy(dtype=object)
+    for sample in samples.unique():
+        # Positions, not index labels, so duplicated obs names are safe.
+        positions = np.flatnonzero((samples == sample).to_numpy())
+        rng.shuffle(positions)
+        # array_split spreads any remainder over the first replicates.
+        for i, chunk in enumerate(np.array_split(positions, num_replicates)):
+            labels[chunk] = labels[chunk] + f"_rep{i + 1}"
+    adata.obs[f"{sample_key}_pseudo"] = labels
+    return adata
+
+
 def prepend_c_to_index(index_value):
     if index_value and index_value[0].isdigit():
         return "C" + index_value
@@ -307,6 +362,13 @@
         factor_fields = args.factor_fields.split(",")
         check_fields(factor_fields, adata)
 
+    # Create pseudo replicates if specified
+    if args.num_pseudo_replicates:
+        adata = create_pseudo_replicates(
+            adata, args.sample_key, args.num_pseudo_replicates, seed=args.seed
+        )
+        args.sample_key = f"{args.sample_key}_pseudo"
+
     print(f"Using mode: {args.mode}")
     # Perform pseudobulk analysis
     pseudobulk_data = get_pseudobulk(
@@ -664,6 +726,19 @@
         help="Minimum total count threshold for filtering by expression",
     )
     parser.add_argument(
+        "--num_pseudo_replicates",
+        type=int,
+        choices=range(3, 1000),
+        help="Number of pseudo replicates to create per sample (at least 3)",
+        required=False
+    )
+    parser.add_argument(
+        "--seed",
+        type=int,
+        default=None,
+        help="Random seed for pseudo replicate sampling",
+    )
+    parser.add_argument(
         "--anndata_output_path",
         type=str,
         help="Path to save the filtered AnnData object or pseudobulk data",