Mercurial > repos > bimib > cobraxy
comparison COBRAxy/src/rps_generator.py @ 539:2fb97466e404 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 |
| parents | |
| children | fcdbc81feb45 |
comparison
equal
deleted
inserted
replaced
| 538:fd53d42348bd | 539:2fb97466e404 |
|---|---|
| 1 """ | |
| 2 Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry. | |
| 3 | |
| 4 Given a tabular dataset (metabolites x samples) and a reaction set, this script | |
| 5 maps metabolite names via synonyms, fills missing metabolites, and computes RPS | |
| 6 per reaction for each sample using a log-normalized formula. | |
| 7 """ | |
| 8 import math | |
| 9 import argparse | |
| 10 | |
| 11 import numpy as np | |
| 12 import pickle as pk | |
| 13 import pandas as pd | |
| 14 | |
| 15 from typing import Optional, List, Dict | |
| 16 | |
| 17 import utils.general_utils as utils | |
| 18 import utils.reaction_parsing as reactionUtils | |
| 19 | |
| 20 ########################## argparse ########################################## | |
| 21 ARGS :argparse.Namespace | |
| 22 def process_args(args:List[str] = None) -> argparse.Namespace: | |
| 23 """ | |
| 24 Processes command-line arguments. | |
| 25 | |
| 26 Args: | |
| 27 args (list): List of command-line arguments. | |
| 28 | |
| 29 Returns: | |
| 30 Namespace: An object containing parsed arguments. | |
| 31 """ | |
| 32 parser = argparse.ArgumentParser( | |
| 33 usage='%(prog)s [options]', | |
| 34 description='Process abundances and reactions to create RPS scores.' | |
| 35 ) | |
| 36 | |
| 37 parser.add_argument("-rl", "--model_upload", type = str, | |
| 38 help = "path to input file containing the reactions") | |
| 39 | |
| 40 parser.add_argument('-td', '--tool_dir', | |
| 41 type = str, | |
| 42 required = True, | |
| 43 help = 'your tool directory') | |
| 44 parser.add_argument('-ol', '--out_log', | |
| 45 help = "Output log") | |
| 46 parser.add_argument('-id', '--input', | |
| 47 type = str, | |
| 48 required = True, | |
| 49 help = 'input dataset') | |
| 50 parser.add_argument('-rp', '--rps_output', | |
| 51 type = str, | |
| 52 required = True, | |
| 53 help = 'rps output') | |
| 54 | |
| 55 args = parser.parse_args(args) | |
| 56 return args | |
| 57 | |
| 58 ############################ dataset name ##################################### | |
| 59 def name_dataset(name_data :str, count :int) -> str: | |
| 60 """ | |
| 61 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | |
| 62 | |
| 63 Args: | |
| 64 name_data: Name associated with the dataset (from frontend input params). | |
| 65 count: Counter starting at 1 to make names unique when default. | |
| 66 | |
| 67 Returns: | |
| 68 str : the name made unique | |
| 69 """ | |
| 70 if str(name_data) == 'Dataset': | |
| 71 return str(name_data) + '_' + str(count) | |
| 72 else: | |
| 73 return str(name_data) | |
| 74 | |
| 75 ############################ get_abund_data #################################### | |
| 76 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]: | |
| 77 """ | |
| 78 Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are | |
| 79 metabolites and columns are cell lines. | |
| 80 | |
| 81 Args: | |
| 82 dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites. | |
| 83 cell_line_index (int): The index of the cell line of interest in the dataset. | |
| 84 | |
| 85 Returns: | |
| 86 pd.Series or None: A series containing abundance values for the specified cell line. | |
| 87 The name of the series is the name of the cell line. | |
| 88 Returns None if the cell index is invalid. | |
| 89 """ | |
| 90 if cell_line_index < 0 or cell_line_index >= len(dataset.index): | |
| 91 print(f"Error: cell line index '{cell_line_index}' is not valid.") | |
| 92 return None | |
| 93 | |
| 94 cell_line_name = dataset.columns[cell_line_index] | |
| 95 abundances_series = dataset[cell_line_name][1:] | |
| 96 | |
| 97 return abundances_series | |
| 98 | |
| 99 ############################ clean_metabolite_name #################################### | |
| 100 def clean_metabolite_name(name :str) -> str: | |
| 101 """ | |
| 102 Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify | |
| 103 the search of a match in the dictionary of synonyms. | |
| 104 | |
| 105 Args: | |
| 106 name : the metabolite's name, as given in the dataset. | |
| 107 | |
| 108 Returns: | |
| 109 str : a new string with the cleaned name. | |
| 110 """ | |
| 111 return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower() | |
| 112 | |
| 113 ############################ get_metabolite_id #################################### | |
| 114 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str: | |
| 115 """ | |
| 116 Looks through a dictionary of synonyms to find a match for a given metabolite's name. | |
| 117 | |
| 118 Args: | |
| 119 name : the metabolite's name, as given in the dataset. | |
| 120 syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values. | |
| 121 | |
| 122 Returns: | |
| 123 str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use. | |
| 124 An empty string is returned if a match isn't found. | |
| 125 """ | |
| 126 name = clean_metabolite_name(name) | |
| 127 for id, synonyms in syn_dict.items(): | |
| 128 if name in synonyms: | |
| 129 return id | |
| 130 | |
| 131 return "" | |
| 132 | |
| 133 ############################ check_missing_metab #################################### | |
| 134 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]: | |
| 135 """ | |
| 136 Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly. | |
| 137 | |
| 138 Parameters: | |
| 139 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and | |
| 140 stoichiometric coefficients as values. | |
| 141 dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines. | |
| 142 cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites. | |
| 143 | |
| 144 Returns: | |
| 145 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1. | |
| 146 | |
| 147 Side effects: | |
| 148 dataset_by_rows: mutated to include missing metabolites with default abundances. | |
| 149 """ | |
| 150 missing_list = [] | |
| 151 for reaction in reactions.values(): | |
| 152 for metabolite in reaction.keys(): | |
| 153 if metabolite not in dataset_by_rows: | |
| 154 dataset_by_rows[metabolite] = [1] * cell_lines_amt | |
| 155 missing_list.append(metabolite) | |
| 156 | |
| 157 return missing_list | |
| 158 | |
| 159 ############################ calculate_rps #################################### | |
| 160 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str], substrateFreqTable: Dict[str, int]) -> Dict[str, float]: | |
| 161 """ | |
| 162 Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample. | |
| 163 The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient | |
| 164 for each reaction using the provided coefficient and abundance values. The value is then normalized, based on how frequent the metabolite is in the selected model's reactions, | |
| 165 and log-transformed. | |
| 166 | |
| 167 Parameters: | |
| 168 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | |
| 169 abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances. | |
| 170 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation. | |
| 171 missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1. | |
| 172 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value). | |
| 173 | |
| 174 Returns: | |
| 175 dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores. | |
| 176 """ | |
| 177 rps_scores = {} | |
| 178 | |
| 179 for reaction_name, substrates in reactions.items(): | |
| 180 total_contribution = 0 | |
| 181 metab_significant = False | |
| 182 for metabolite, stoichiometry in substrates.items(): | |
| 183 abundance = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite] | |
| 184 if metabolite not in black_list and metabolite not in missing_list: | |
| 185 metab_significant = True | |
| 186 | |
| 187 total_contribution += math.log((abundance + np.finfo(float).eps) / substrateFreqTable[metabolite]) * stoichiometry | |
| 188 | |
| 189 rps_scores[reaction_name] = total_contribution if metab_significant else math.nan | |
| 190 | |
| 191 return rps_scores | |
| 192 | |
| 193 ############################ rps_for_cell_lines #################################### | |
| 194 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]], substrateFreqTable: Dict[str, int]) -> None: | |
| 195 """ | |
| 196 Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file. | |
| 197 | |
| 198 Parameters: | |
| 199 dataset : the dataset's data, by rows | |
| 200 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | |
| 201 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation. | |
| 202 syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms. | |
| 203 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value). | |
| 204 | |
| 205 Returns: | |
| 206 None | |
| 207 """ | |
| 208 | |
| 209 cell_lines = dataset[0][1:] | |
| 210 abundances_dict = {} | |
| 211 | |
| 212 for row in dataset[1:]: | |
| 213 id = get_metabolite_id(row[0], syn_dict) | |
| 214 if id: | |
| 215 abundances_dict[id] = list(map(utils.Float(), row[1:])) | |
| 216 | |
| 217 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) | |
| 218 | |
| 219 rps_scores :Dict[Dict[str, float]] = {} | |
| 220 for pos, cell_line_name in enumerate(cell_lines): | |
| 221 abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() } | |
| 222 | |
| 223 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable) | |
| 224 | |
| 225 df = pd.DataFrame.from_dict(rps_scores) | |
| 226 df = df.loc[list(reactions.keys()),:] | |
| 227 # Optional preview: first 10 rows | |
| 228 # print(df.head(10)) | |
| 229 df.index.name = 'Reactions' | |
| 230 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) | |
| 231 | |
| 232 ############################ main #################################### | |
| 233 def main(args:List[str] = None) -> None: | |
| 234 """ | |
| 235 Initializes everything and sets the program in motion based on the fronted input arguments. | |
| 236 | |
| 237 Returns: | |
| 238 None | |
| 239 """ | |
| 240 global ARGS | |
| 241 ARGS = process_args(args) | |
| 242 | |
| 243 # Load support data (black list and synonyms) | |
| 244 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: | |
| 245 black_list = pk.load(bl) | |
| 246 | |
| 247 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: | |
| 248 syn_dict = pk.load(sd) | |
| 249 | |
| 250 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False) | |
| 251 tmp_dict = None | |
| 252 | |
| 253 # Parse custom reactions from uploaded file | |
| 254 reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload) | |
| 255 for r, s in reactions.items(): | |
| 256 tmp_list = list(s.keys()) | |
| 257 for k in tmp_list: | |
| 258 if k[-2] == '_': | |
| 259 s[k[:-2]] = s.pop(k) | |
| 260 substrateFreqTable = {} | |
| 261 for _, substrates in reactions.items(): | |
| 262 for substrateName, _ in substrates.items(): | |
| 263 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 | |
| 264 substrateFreqTable[substrateName] += 1 | |
| 265 | |
| 266 # Debug prints (can be enabled during troubleshooting) | |
| 267 # print(f"Reactions: {reactions}") | |
| 268 # print(f"Substrate Frequencies: {substrateFreqTable}") | |
| 269 # print(f"Synonyms: {syn_dict}") | |
| 270 tmp_dict = {} | |
| 271 for metabName, freq in substrateFreqTable.items(): | |
| 272 tmp_metabName = clean_metabolite_name(metabName) | |
| 273 for syn_key, syn_list in syn_dict.items(): | |
| 274 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key): | |
| 275 # print(f"Mapping {tmp_metabName} to {syn_key}") | |
| 276 tmp_dict[syn_key] = syn_list | |
| 277 tmp_dict[syn_key].append(tmp_metabName) | |
| 278 | |
| 279 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) | |
| 280 print('Execution succeeded') | |
| 281 | |
| 282 ############################################################################## | |
| 283 if __name__ == "__main__": main() |
