Mercurial > repos > bimib > cobraxy
comparison COBRAxy/rps_generator.py @ 489:97eea560a10f draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Mon, 29 Sep 2025 10:33:26 +0000 |
| parents | 187cee1a00e2 |
| children |
comparison
equal
deleted
inserted
replaced
| 488:e0bcc61b2feb | 489:97eea560a10f |
|---|---|
| 1 """ | |
| 2 Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry. | |
| 3 | |
| 4 Given a tabular dataset (metabolites x samples) and a reaction set, this script | |
| 5 maps metabolite names via synonyms, fills missing metabolites, and computes RPS | |
| 6 per reaction for each sample using a log-normalized formula. | |
| 7 """ | |
| 1 import math | 8 import math |
| 2 import argparse | 9 import argparse |
| 3 | 10 |
| 4 import numpy as np | 11 import numpy as np |
| 5 import pickle as pk | 12 import pickle as pk |
| 20 args (list): List of command-line arguments. | 27 args (list): List of command-line arguments. |
| 21 | 28 |
| 22 Returns: | 29 Returns: |
| 23 Namespace: An object containing parsed arguments. | 30 Namespace: An object containing parsed arguments. |
| 24 """ | 31 """ |
| 25 parser = argparse.ArgumentParser(usage = '%(prog)s [options]', | 32 parser = argparse.ArgumentParser( |
| 26 description = 'process some value\'s'+ | 33 usage='%(prog)s [options]', |
| 27 ' abundances and reactions to create RPS scores.') | 34 description='Process abundances and reactions to create RPS scores.' |
| 28 parser.add_argument('-rc', '--reaction_choice', | 35 ) |
| 29 type = str, | 36 |
| 30 default = 'default', | 37 parser.add_argument("-rl", "--model_upload", type = str, |
| 31 choices = ['default','custom'], | 38 help = "path to input file containing the reactions") |
| 32 help = 'chose which type of reaction dataset you want use') | 39 |
| 33 parser.add_argument('-cm', '--custom', | |
| 34 type = str, | |
| 35 help='your dataset if you want custom reactions') | |
| 36 parser.add_argument('-td', '--tool_dir', | 40 parser.add_argument('-td', '--tool_dir', |
| 37 type = str, | 41 type = str, |
| 38 required = True, | 42 required = True, |
| 39 help = 'your tool directory') | 43 help = 'your tool directory') |
| 40 parser.add_argument('-ol', '--out_log', | 44 parser.add_argument('-ol', '--out_log', |
| 55 def name_dataset(name_data :str, count :int) -> str: | 59 def name_dataset(name_data :str, count :int) -> str: |
| 56 """ | 60 """ |
| 57 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | 61 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. |
| 58 | 62 |
| 59 Args: | 63 Args: |
| 60 name_data : name associated with the dataset (from frontend input params) | 64 name_data: Name associated with the dataset (from frontend input params). |
| 61 count : counter from 1 to make these names unique (external) | 65 count: Counter starting at 1 to make names unique when default. |
| 62 | 66 |
| 63 Returns: | 67 Returns: |
| 64 str : the name made unique | 68 str : the name made unique |
| 65 """ | 69 """ |
| 66 if str(name_data) == 'Dataset': | 70 if str(name_data) == 'Dataset': |
| 82 pd.Series or None: A series containing abundance values for the specified cell line. | 86 pd.Series or None: A series containing abundance values for the specified cell line. |
| 83 The name of the series is the name of the cell line. | 87 The name of the series is the name of the cell line. |
| 84 Returns None if the cell index is invalid. | 88 Returns None if the cell index is invalid. |
| 85 """ | 89 """ |
| 86 if cell_line_index < 0 or cell_line_index >= len(dataset.index): | 90 if cell_line_index < 0 or cell_line_index >= len(dataset.index): |
| 87 print(f"Errore: This cell line index: '{cell_line_index}' is not valid.") | 91 print(f"Error: cell line index '{cell_line_index}' is not valid.") |
| 88 return None | 92 return None |
| 89 | 93 |
| 90 cell_line_name = dataset.columns[cell_line_index] | 94 cell_line_name = dataset.columns[cell_line_index] |
| 91 abundances_series = dataset[cell_line_name][1:] | 95 abundances_series = dataset[cell_line_name][1:] |
| 92 | 96 |
| 119 str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use. | 123 str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use. |
| 120 An empty string is returned if a match isn't found. | 124 An empty string is returned if a match isn't found. |
| 121 """ | 125 """ |
| 122 name = clean_metabolite_name(name) | 126 name = clean_metabolite_name(name) |
| 123 for id, synonyms in syn_dict.items(): | 127 for id, synonyms in syn_dict.items(): |
| 124 if name in synonyms: return id | 128 if name in synonyms: |
| 129 return id | |
| 125 | 130 |
| 126 return "" | 131 return "" |
| 127 | 132 |
| 128 ############################ check_missing_metab #################################### | 133 ############################ check_missing_metab #################################### |
| 129 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]: | 134 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]: |
| 130 """ | 135 """ |
| 131 Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly. | 136 Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly. |
| 132 | 137 |
| 133 Parameters: | 138 Parameters: |
| 134 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | 139 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and |
| 140 stoichiometric coefficients as values. | |
| 135 dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines. | 141 dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines. |
| 136 cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites. | 142 cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites. |
| 137 | 143 |
| 138 Returns: | 144 Returns: |
| 139 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1. | 145 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1. |
| 140 | 146 |
| 141 Side effects: | 147 Side effects: |
| 142 dataset_by_rows : mut | 148 dataset_by_rows: mutated to include missing metabolites with default abundances. |
| 143 """ | 149 """ |
| 144 missing_list = [] | 150 missing_list = [] |
| 145 for reaction in reactions.values(): | 151 for reaction in reactions.values(): |
| 146 for metabolite in reaction.keys(): | 152 for metabolite in reaction.keys(): |
| 147 if metabolite not in dataset_by_rows: | 153 if metabolite not in dataset_by_rows: |
| 197 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value). | 203 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value). |
| 198 | 204 |
| 199 Returns: | 205 Returns: |
| 200 None | 206 None |
| 201 """ | 207 """ |
| 208 | |
| 202 cell_lines = dataset[0][1:] | 209 cell_lines = dataset[0][1:] |
| 203 abundances_dict = {} | 210 abundances_dict = {} |
| 204 | 211 |
| 205 translationIsApplied = ARGS.reaction_choice == "default" | |
| 206 for row in dataset[1:]: | 212 for row in dataset[1:]: |
| 207 id = get_metabolite_id(row[0], syn_dict) if translationIsApplied else row[0] | 213 id = get_metabolite_id(row[0], syn_dict) |
| 208 if id: abundances_dict[id] = list(map(utils.Float(), row[1:])) | 214 if id: |
| 209 | 215 abundances_dict[id] = list(map(utils.Float(), row[1:])) |
| 216 | |
| 210 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) | 217 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) |
| 211 | 218 |
| 212 rps_scores :Dict[Dict[str, float]] = {} | 219 rps_scores :Dict[Dict[str, float]] = {} |
| 213 for pos, cell_line_name in enumerate(cell_lines): | 220 for pos, cell_line_name in enumerate(cell_lines): |
| 214 abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() } | 221 abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() } |
| 222 | |
| 215 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable) | 223 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable) |
| 216 | 224 |
| 217 df = pd.DataFrame.from_dict(rps_scores) | 225 df = pd.DataFrame.from_dict(rps_scores) |
| 218 | 226 df = df.loc[list(reactions.keys()),:] |
| 227 # Optional preview: first 10 rows | |
| 228 # print(df.head(10)) | |
| 219 df.index.name = 'Reactions' | 229 df.index.name = 'Reactions' |
| 220 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) | 230 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) |
| 221 | 231 |
| 222 ############################ main #################################### | 232 ############################ main #################################### |
| 223 def main(args:List[str] = None) -> None: | 233 def main(args:List[str] = None) -> None: |
| 228 None | 238 None |
| 229 """ | 239 """ |
| 230 global ARGS | 240 global ARGS |
| 231 ARGS = process_args(args) | 241 ARGS = process_args(args) |
| 232 | 242 |
| 233 # TODO:use utils functions vvv | 243 # Load support data (black list and synonyms) |
| 234 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: | 244 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: |
| 235 black_list = pk.load(bl) | 245 black_list = pk.load(bl) |
| 236 | 246 |
| 237 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: | 247 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: |
| 238 syn_dict = pk.load(sd) | 248 syn_dict = pk.load(sd) |
| 239 | 249 |
| 240 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False) | 250 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False) |
| 241 | 251 tmp_dict = None |
| 242 if ARGS.reaction_choice == 'default': | 252 |
| 243 reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb')) | 253 # Parse custom reactions from uploaded file |
| 244 substrateFreqTable = pk.load(open(ARGS.tool_dir + '/local/pickle files/substrate_frequencies.pickle', 'rb')) | 254 reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload) |
| 245 | 255 for r, s in reactions.items(): |
| 246 elif ARGS.reaction_choice == 'custom': | 256 tmp_list = list(s.keys()) |
| 247 reactions = reactionUtils.parse_custom_reactions(ARGS.custom) | 257 for k in tmp_list: |
| 248 substrateFreqTable = {} | 258 if k[-2] == '_': |
| 249 for _, substrates in reactions.items(): | 259 s[k[:-2]] = s.pop(k) |
| 250 for substrateName, _ in substrates.items(): | 260 substrateFreqTable = {} |
| 251 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 | 261 for _, substrates in reactions.items(): |
| 252 substrateFreqTable[substrateName] += 1 | 262 for substrateName, _ in substrates.items(): |
| 263 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 | |
| 264 substrateFreqTable[substrateName] += 1 | |
| 265 | |
| 266 # Debug prints (can be enabled during troubleshooting) | |
| 267 # print(f"Reactions: {reactions}") | |
| 268 # print(f"Substrate Frequencies: {substrateFreqTable}") | |
| 269 # print(f"Synonyms: {syn_dict}") | |
| 270 tmp_dict = {} | |
| 271 for metabName, freq in substrateFreqTable.items(): | |
| 272 tmp_metabName = clean_metabolite_name(metabName) | |
| 273 for syn_key, syn_list in syn_dict.items(): | |
| 274 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key): | |
| 275 # print(f"Mapping {tmp_metabName} to {syn_key}") | |
| 276 tmp_dict[syn_key] = syn_list | |
| 277 tmp_dict[syn_key].append(tmp_metabName) | |
| 253 | 278 |
| 254 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) | 279 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) |
| 255 print('Execution succeded') | 280 print('Execution succeeded') |
| 256 | 281 |
| 257 ############################################################################## | 282 ############################################################################## |
| 258 if __name__ == "__main__": main() | 283 if __name__ == "__main__": main() |
