comparison COBRAxy/utils/CBS_backend.py @ 456:a6e45049c1b9 draft default tip

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 17:28:45 +0000
parents 0a3ca20848f3
children
comparison
equal deleted inserted replaced
455:4e2bc80764b6 456:a6e45049c1b9
1 """
2 CBS backend utilities using GLPK for constraint-based sampling.
3
4 This module builds and solves LP problems from COBRA models, supports random
5 objective function generation (CBS), and provides both swiglpk-based and
6 COBRApy-based sampling fallbacks.
7 """
1 from swiglpk import * 8 from swiglpk import *
2 import random 9 import random
3 import pandas as pd 10 import pandas as pd
4 import numpy as np 11 import numpy as np
5 import cobra as cb 12 import cobra as cb
6 13
7 # Initialize LP problem 14 # Initialize LP problem
8 def initialize_lp_problem(S): 15 def initialize_lp_problem(S):
16 """
17 Prepare sparse matrix structures for GLPK given a stoichiometric matrix.
18
19 Args:
20 S: Stoichiometric matrix (DOK or similar) as returned by cobra.util.create_stoichiometric_matrix.
21
22 Returns:
23 tuple: (len_vector, values, indexes, ia, ja, ar, nrows, ncol)
24 - len_vector: number of non-zero entries
25 - values: list of non-zero values
26 - indexes: list of (row, col) indices for non-zero entries
27 - ia, ja, ar: GLPK-ready arrays for the sparse matrix
28 - nrows, ncol: matrix shape
29 """
9 30
10 len_vector=len(S.keys()) 31 len_vector=len(S.keys())
11 values=list(S.values()) 32 values=list(S.values())
12 indexes=list(S.keys()) 33 indexes=list(S.keys())
13 ia = intArray(len_vector+1); 34 ia = intArray(len_vector+1);
27 48
28 return len_vector, values, indexes, ia, ja, ar, nrows, ncol 49 return len_vector, values, indexes, ia, ja, ar, nrows, ncol
29 50
30 51
31 52
32 # Solve LP problem from the structure of the metabolic model
33 def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar, 53 def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar,
34 obj_coefs,reactions,return_lp=False): 54 obj_coefs,reactions,return_lp=False):
55 """
56 Create and solve a GLPK LP problem for a metabolic model.
57
58 Args:
59 lb, ub: Lower/upper bounds per reaction (lists of floats).
60 nrows, ncol: Dimensions of the S matrix.
61 len_vector, ia, ja, ar: Sparse matrix data prepared for GLPK.
62 obj_coefs: Objective function coefficients (list of floats).
63 reactions: Reaction identifiers (list of str), used for output mapping.
64 return_lp: If True, also return the GLPK problem object (caller must delete).
65
66 Returns:
67 tuple: (fluxes, Z) or (fluxes, Z, lp) if return_lp=True.
68 """
35 69
36 70
37 lp = glp_create_prob(); 71 lp = glp_create_prob();
38 glp_set_prob_name(lp, "sample"); 72 glp_set_prob_name(lp, "sample");
39 glp_set_obj_dir(lp, GLP_MAX); 73 glp_set_obj_dir(lp, GLP_MAX);
58 except Exception as e: 92 except Exception as e:
59 glp_delete_prob(lp) 93 glp_delete_prob(lp)
60 raise Exception(e) 94 raise Exception(e)
61 95
62 96
63 # Solve LP problem from the structure of the metabolic model
64 def solve_lp_problem(lp,obj_coefs,reactions): 97 def solve_lp_problem(lp,obj_coefs,reactions):
98 """
99 Configure and solve an LP with GLPK using provided objective coefficients.
100
101 Args:
102 lp: GLPK problem handle.
103 obj_coefs: Objective coefficients.
104 reactions: Reaction identifiers (unused in computation; length used for output).
105
106 Returns:
107 tuple: (fluxes, Z) where fluxes is a list of primal column values and Z is the objective value.
108 """
65 109
66 # Set the coefficients of the objective function 110 # Set the coefficients of the objective function
67 i=1 111 i=1
68 for ind_coef in obj_coefs: 112 for ind_coef in obj_coefs:
69 glp_set_obj_coef(lp, i, ind_coef); 113 glp_set_obj_coef(lp, i, ind_coef);
99 raise Exception(e) 143 raise Exception(e)
100 finally: 144 finally:
101 # Re-enable terminal output after solving 145 # Re-enable terminal output after solving
102 glp_term_out(GLP_ON) 146 glp_term_out(GLP_ON)
103 147
104 # Create LP structure
105 def create_lp_structure(model): 148 def create_lp_structure(model):
149 """
150 Extract the LP structure (S matrix, bounds, objective) from a COBRA model.
151
152 Args:
153 model (cobra.Model): The COBRA model.
154
155 Returns:
156 tuple: (S, lb, ub, coefs_obj, reactions)
157 """
106 158
107 reactions=[el.id for el in model.reactions] 159 reactions=[el.id for el in model.reactions]
108 coefs_obj=[reaction.objective_coefficient for reaction in model.reactions] 160 coefs_obj=[reaction.objective_coefficient for reaction in model.reactions]
109 161
110 # Lower and upper bounds 162 # Lower and upper bounds
114 # Create S matrix 166 # Create S matrix
115 S=cb.util.create_stoichiometric_matrix(model,array_type="dok") 167 S=cb.util.create_stoichiometric_matrix(model,array_type="dok")
116 168
117 return S,lb,ub,coefs_obj,reactions 169 return S,lb,ub,coefs_obj,reactions
118 170
119 # CBS sampling interface
120 def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample): 171 def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample):
172 """
173 Sample fluxes using GLPK by iterating over random objective functions.
174
175 Args:
176 model: COBRA model.
177 nsample: Number of samples to generate.
178 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem).
179 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions).
180
181 Returns:
182 None
183 """
121 184
122 S,lb,ub,coefs_obj,reactions = create_lp_structure(model) 185 S,lb,ub,coefs_obj,reactions = create_lp_structure(model)
123 len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S) 186 len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S)
124 187
125 for i in range(nsample): 188 for i in range(nsample):
126 189
127 coefs_obj=coefficients_df.iloc[:,i].values 190 coefs_obj=coefficients_df.iloc[:,i].values
128 191
129 if coefs_obj[-1]==1: #minimize 192 if coefs_obj[-1]==1: # minimize
130 coefs_obj= coefs_obj[0:-1] * -1 193 coefs_obj= coefs_obj[0:-1] * -1
131 else: 194 else:
132 coefs_obj=coefs_obj[0:-1] 195 coefs_obj=coefs_obj[0:-1]
133 196
134 fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector, 197 fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector,
135 ia, ja, ar, coefs_obj,reactions,return_lp=False) 198 ia, ja, ar, coefs_obj,reactions,return_lp=False)
136 df_sample.loc[i] = fluxes 199 df_sample.loc[i] = fluxes
137 pass 200 return
138 201
139 def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample): 202 def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample):
203 """
204 Fallback sampling using COBRApy's optimize with per-sample randomized objectives.
205
206 Args:
207 model: COBRA model.
208 nsample: Number of samples to generate.
209 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem).
210 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions).
211
212 Returns:
213 None
214 """
140 215
141 for i in range(nsample): 216 for i in range(nsample):
142 217
143 dict_coeff={} 218 dict_coeff={}
144 if(coefficients_df.iloc[-1][i]==1): 219 if(coefficients_df.iloc[-1][i]==1):
145 type_problem = -1 #minimize 220 type_problem = -1 # minimize
146 else: 221 else:
147 type_problem = 1 222 type_problem = 1 # maximize
148 223
149 for rxn in [reaction.id for reaction in model.reactions]: 224 for rxn in [reaction.id for reaction in model.reactions]:
150 dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem 225 dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem
151 226
152 model.objective = dict_coeff 227 model.objective = dict_coeff
153 solution = model.optimize().fluxes 228 solution = model.optimize().fluxes
154 for rxn, flux in solution.items(): 229 for rxn, flux in solution.items():
155 df_sample.loc[i][rxn] = flux 230 df_sample.loc[i][rxn] = flux
156 231
157 pass 232 return
158 233
159 # Create random coefficients for CBS
160 def randomObjectiveFunction(model, n_samples, df_fva, seed=0): 234 def randomObjectiveFunction(model, n_samples, df_fva, seed=0):
161 235 """
162 236 Create random objective function coefficients for CBS sampling.
163 #reactions = model.reactions 237
164 reactions = [reaction.id for reaction in model.reactions] 238 The last row 'type_of_problem' encodes 0 for maximize and 1 for minimize.
165 cont=seed 239
166 list_ex=reactions.copy() 240 Args:
167 list_ex.append("type_of_problem") 241 model: COBRA model.
168 coefficients_df = pd.DataFrame(index=list_ex,columns=[str(i) for i in range(n_samples)]) 242 n_samples: Number of random objectives to generate.
169 243 df_fva: Flux Variability Analysis results with 'minimum' and 'maximum' per reaction.
170 for i in range(0, n_samples): 244 seed: Seed for reproducibility.
171 245
172 cont=cont+1 246 Returns:
247 pandas.DataFrame: Coefficients DataFrame indexed by reaction IDs plus 'type_of_problem'.
248 """
249 # reactions = model.reactions
250 reactions = [reaction.id for reaction in model.reactions]
251 cont = seed
252 list_ex = reactions.copy()
253 list_ex.append("type_of_problem")
254 coefficients_df = pd.DataFrame(index=list_ex, columns=[str(i) for i in range(n_samples)])
255
256 for i in range(0, n_samples):
257
258 cont = cont + 1
259 random.seed(cont)
260
261 # Generate a random threshold in [0, 1]
262 threshold = random.random()
263
264 for reaction in reactions:
265
266 cont = cont + 1
173 random.seed(cont) 267 random.seed(cont)
174 268
175 # Genera un numero casuale tra 0 e 1 269 val = random.random()
176 threshold = random.random() #coefficiente tra 0 e 1 270
177 271 if val > threshold:
178 for reaction in reactions: 272
179 273 cont = cont + 1
180 cont=cont+1
181 random.seed(cont) 274 random.seed(cont)
182 275
183 val=random.random() 276 # Coefficient in [-1, 1]
184 277 c = 2 * random.random() - 1
185 if val>threshold: 278
186 279 val_max = np.max([abs(df_fva.loc[reaction, "minimum"]), abs(df_fva.loc[reaction, "maximum"])])
187 cont=cont+1 280
188 random.seed(cont) 281 if val_max != 0: # only if FVA is non-zero
189 282 coefficients_df.loc[reaction, str(i)] = c / val_max # scale by FVA
190 c=2*random.random()-1 #coefficiente tra -1 e 1
191
192 val_max = np.max([abs(df_fva.loc[reaction,"minimum"]), abs(df_fva.loc[reaction,"maximum"])])
193
194 if val_max!=0: #solo se la fva รจ diversa da zero
195 coefficients_df.loc[reaction,str(i)] = c/val_max #divido per la fva
196 else:
197 coefficients_df.loc[reaction,str(i)] = 0
198
199 else: 283 else:
200 coefficients_df.loc[reaction,str(i)] = 0 284 coefficients_df.loc[reaction, str(i)] = 0
201 285
202 cont=cont+1
203 random.seed(cont)
204
205 if random.random()<0.5:
206 coefficients_df.loc["type_of_problem",str(i)] = 0 #maximize
207 else: 286 else:
208 coefficients_df.loc["type_of_problem",str(i)] = 1 #minimize 287 coefficients_df.loc[reaction, str(i)] = 0
209 288
210 return coefficients_df 289 cont = cont + 1
290 random.seed(cont)
291
292 if random.random() < 0.5:
293 coefficients_df.loc["type_of_problem", str(i)] = 0 # maximize
294 else:
295 coefficients_df.loc["type_of_problem", str(i)] = 1 # minimize
296
297 return coefficients_df