comparison COBRAxy/rps_generator_beta.py @ 406:187cee1a00e2 draft

Uploaded
author francesco_lapi
date Mon, 08 Sep 2025 14:44:15 +0000
parents
children a6e45049c1b9
comparison
equal deleted inserted replaced
405:716b1a638fb5 406:187cee1a00e2
1 import math
2 import argparse
3
4 import numpy as np
5 import pickle as pk
6 import pandas as pd
7
8 from typing import Optional, List, Dict
9
10 import utils.general_utils as utils
11 import utils.reaction_parsing as reactionUtils
12
13 ########################## argparse ##########################################
14 ARGS :argparse.Namespace
15 def process_args(args:List[str] = None) -> argparse.Namespace:
16 """
17 Processes command-line arguments.
18
19 Args:
20 args (list): List of command-line arguments.
21
22 Returns:
23 Namespace: An object containing parsed arguments.
24 """
25 parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
26 description = 'process some value\'s'+
27 ' abundances and reactions to create RPS scores.')
28
29 parser.add_argument("-rl", "--model_upload", type = str,
30 help = "path to input file containing the reactions")
31
32 # model_upload custom
33 parser.add_argument('-td', '--tool_dir',
34 type = str,
35 required = True,
36 help = 'your tool directory')
37 parser.add_argument('-ol', '--out_log',
38 help = "Output log")
39 parser.add_argument('-id', '--input',
40 type = str,
41 required = True,
42 help = 'input dataset')
43 parser.add_argument('-rp', '--rps_output',
44 type = str,
45 required = True,
46 help = 'rps output')
47
48 args = parser.parse_args(args)
49 return args
50
51 ############################ dataset name #####################################
52 def name_dataset(name_data :str, count :int) -> str:
53 """
54 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
55
56 Args:
57 name_data : name associated with the dataset (from frontend input params)
58 count : counter from 1 to make these names unique (external)
59
60 Returns:
61 str : the name made unique
62 """
63 if str(name_data) == 'Dataset':
64 return str(name_data) + '_' + str(count)
65 else:
66 return str(name_data)
67
68 ############################ get_abund_data ####################################
69 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]:
70 """
71 Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are
72 metabolites and columns are cell lines.
73
74 Args:
75 dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites.
76 cell_line_index (int): The index of the cell line of interest in the dataset.
77
78 Returns:
79 pd.Series or None: A series containing abundance values for the specified cell line.
80 The name of the series is the name of the cell line.
81 Returns None if the cell index is invalid.
82 """
83 if cell_line_index < 0 or cell_line_index >= len(dataset.index):
84 print(f"Errore: This cell line index: '{cell_line_index}' is not valid.")
85 return None
86
87 cell_line_name = dataset.columns[cell_line_index]
88 abundances_series = dataset[cell_line_name][1:]
89
90 return abundances_series
91
92 ############################ clean_metabolite_name ####################################
93 def clean_metabolite_name(name :str) -> str:
94 """
95 Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify
96 the search of a match in the dictionary of synonyms.
97
98 Args:
99 name : the metabolite's name, as given in the dataset.
100
101 Returns:
102 str : a new string with the cleaned name.
103 """
104 return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower()
105
106 ############################ get_metabolite_id ####################################
107 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str:
108 """
109 Looks through a dictionary of synonyms to find a match for a given metabolite's name.
110
111 Args:
112 name : the metabolite's name, as given in the dataset.
113 syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values.
114
115 Returns:
116 str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use.
117 An empty string is returned if a match isn't found.
118 """
119 name = clean_metabolite_name(name)
120 for id, synonyms in syn_dict.items():
121 if name in synonyms:
122 return id
123
124 return ""
125
126 ############################ check_missing_metab ####################################
127 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]:
128 """
129 Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly.
130
131 Parameters:
132 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and
133 stoichiometric coefficients as values.
134 dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines.
135 cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites.
136
137 Returns:
138 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1.
139
140 Side effects:
141 dataset_by_rows : mut
142 """
143 missing_list = []
144 for reaction in reactions.values():
145 for metabolite in reaction.keys():
146 if metabolite not in dataset_by_rows:
147 dataset_by_rows[metabolite] = [1] * cell_lines_amt
148 missing_list.append(metabolite)
149
150 return missing_list
151
152 ############################ calculate_rps ####################################
153 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str], substrateFreqTable: Dict[str, int]) -> Dict[str, float]:
154 """
155 Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample.
156 The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient
157 for each reaction using the provided coefficient and abundance values. The value is then normalized, based on how frequent the metabolite is in the selected model's reactions,
158 and log-transformed.
159
160 Parameters:
161 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
162 abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances.
163 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
164 missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1.
165 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value).
166
167 Returns:
168 dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores.
169 """
170 rps_scores = {}
171
172 for reaction_name, substrates in reactions.items():
173 total_contribution = 0
174 metab_significant = False
175 for metabolite, stoichiometry in substrates.items():
176 abundance = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite]
177 if metabolite not in black_list and metabolite not in missing_list:
178 metab_significant = True
179
180 total_contribution += math.log((abundance + np.finfo(float).eps) / substrateFreqTable[metabolite]) * stoichiometry
181
182 rps_scores[reaction_name] = total_contribution if metab_significant else math.nan
183
184 return rps_scores
185
186 ############################ rps_for_cell_lines ####################################
187 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]], substrateFreqTable: Dict[str, int]) -> None:
188 """
189 Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file.
190
191 Parameters:
192 dataset : the dataset's data, by rows
193 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
194 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
195 syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms.
196 substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value).
197
198 Returns:
199 None
200 """
201
202 cell_lines = dataset[0][1:]
203 abundances_dict = {}
204
205 for row in dataset[1:]:
206 id = get_metabolite_id(row[0], syn_dict) #if translationIsApplied else row[0]
207 if id:
208 abundances_dict[id] = list(map(utils.Float(), row[1:]))
209
210 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines)))
211
212 rps_scores :Dict[Dict[str, float]] = {}
213 for pos, cell_line_name in enumerate(cell_lines):
214 abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() }
215
216 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable)
217
218 df = pd.DataFrame.from_dict(rps_scores)
219 df = df.loc[list(reactions.keys()),:]
220 print(df.head(10))
221 df.index.name = 'Reactions'
222 df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True)
223
224 ############################ main ####################################
225 def main(args:List[str] = None) -> None:
226 """
227 Initializes everything and sets the program in motion based on the fronted input arguments.
228
229 Returns:
230 None
231 """
232 global ARGS
233 ARGS = process_args(args)
234
235 # TODO:use utils functions vvv
236 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
237 black_list = pk.load(bl)
238
239 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd:
240 syn_dict = pk.load(sd)
241
242 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False)
243 tmp_dict = None
244 #if ARGS.reaction_choice == 'default':
245 # reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb'))
246 # substrateFreqTable = pk.load(open(ARGS.tool_dir + '/local/pickle files/substrate_frequencies.pickle', 'rb'))
247
248 #elif ARGS.reaction_choice == 'custom':
249 reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload)
250 for r, s in reactions.items():
251 tmp_list = list(s.keys())
252 for k in tmp_list:
253 if k[-2] == '_':
254 s[k[:-2]] = s.pop(k)
255 substrateFreqTable = {}
256 for _, substrates in reactions.items():
257 for substrateName, _ in substrates.items():
258 if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0
259 substrateFreqTable[substrateName] += 1
260
261 print(f"Reactions: {reactions}")
262 print(f"Substrate Frequencies: {substrateFreqTable}")
263 print(f"Synonyms: {syn_dict}")
264 tmp_dict = {}
265 for metabName, freq in substrateFreqTable.items():
266 tmp_metabName = clean_metabolite_name(metabName)
267 for syn_key, syn_list in syn_dict.items():
268 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key):
269 print(f"Mapping {tmp_metabName} to {syn_key}")
270 tmp_dict[syn_key] = syn_list
271 tmp_dict[syn_key].append(tmp_metabName)
272
273 rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable)
274 print('Execution succeded')
275
276 ##############################################################################
277 if __name__ == "__main__": main()