Mercurial > repos > ebi-gxa > decoupler_pathway_inference
changeset 11:db14ac3f6b43 draft default tip
planemo upload for repository https://github.com/ebi-gene-expression-group/container-galaxy-sc-tertiary/ commit 487508282bda9dbb68138d5c7091f46ef54fe52a
author | ebi-gxa |
---|---|
date | Wed, 19 Feb 2025 16:55:51 +0000 |
parents | 97c2c52a7ab4 |
children | |
files | decoupler_pseudobulk.py |
diffstat | 1 files changed, 75 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/decoupler_pseudobulk.py Fri Nov 29 11:34:09 2024 +0000 +++ b/decoupler_pseudobulk.py Wed Feb 19 16:55:51 2025 +0000 @@ -2,6 +2,7 @@ import anndata import decoupler +import numpy as np import pandas as pd @@ -34,6 +35,60 @@ ) +def create_pseudo_replicates(adata, sample_key, num_replicates, seed=None): + """ + Create pseudo replicates for each sample in the sample_key groups. + + Parameters + ---------- + adata : anndata.AnnData + The AnnData object. + sample_key : str + The column in adata.obs that defines the samples. + num_replicates : int + Number of pseudo replicates to create per sample. + + Returns + ------- + anndata.AnnData + The AnnData object with pseudo replicates. + + Examples + -------- + >>> import anndata + >>> import pandas as pd + >>> import numpy as np + >>> data = { + ... 'obs': pd.DataFrame({'sample': ['A', 'A', 'B', 'B']}), + ... 'X': np.array([[1, 0], [0, 1], [1, 1], [0, 0]]) + ... } + >>> adata = anndata.AnnData(X=data['X'], obs=data['obs']) + >>> adata = create_pseudo_replicates(adata, 'sample', 2) + >>> adata.obs['sample_pseudo'].tolist() + ['A_rep1', 'A_rep2', 'B_rep1', 'B_rep2'] + """ + if seed is not None: + np.random.seed(seed) + + new_sample_key = f"{sample_key}_pseudo" + adata.obs[new_sample_key] = adata.obs[sample_key].astype(str) + + for sample in adata.obs[sample_key].unique(): + sample_indices = adata.obs[ + adata.obs[sample_key] == sample].index.to_numpy() + np.random.shuffle(sample_indices) # Shuffle the indices to randomize + replicate_size = int(len(sample_indices) / num_replicates) + for i in range(num_replicates): + start_idx = i * replicate_size + end_idx = start_idx + replicate_size + replicate_indices = sample_indices[start_idx:end_idx] + adata.obs.loc[replicate_indices, new_sample_key] = ( + adata.obs.loc[replicate_indices, new_sample_key] + f"_rep{i+1}" + ) + + return adata + + def prepend_c_to_index(index_value): if index_value and index_value[0].isdigit(): return "C" + index_value @@ -307,6 +362,13 @@ factor_fields = args.factor_fields.split(",") check_fields(factor_fields, adata) + # Create pseudo replicates if specified + if args.num_pseudo_replicates: + adata = create_pseudo_replicates( + adata, args.sample_key, args.num_pseudo_replicates, seed=args.seed + ) + args.sample_key = f"{args.sample_key}_pseudo" + print(f"Using mode: {args.mode}") # Perform pseudobulk analysis pseudobulk_data = get_pseudobulk( @@ -664,6 +726,19 @@ help="Minimum total count threshold for filtering by expression", ) parser.add_argument( + "--num_pseudo_replicates", + type=int, + choices=range(3, 1000), + help="Number of pseudo replicates to create per sample (at least 3)", + required=False + ) + parser.add_argument( + "--seed", + type=int, + default=None, + help="Random seed for pseudo replicate sampling", + ) + parser.add_argument( "--anndata_output_path", type=str, help="Path to save the filtered AnnData object or pseudobulk data",