Mercurial > repos > bimib > cobraxy
comparison COBRAxy/custom_data_generator_beta.py @ 411:6b015d3184ab draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Mon, 08 Sep 2025 21:07:34 +0000 |
| parents | 187cee1a00e2 |
| children | 5086145cfb96 |
comparison
equal
deleted
inserted
replaced
| 410:d660c5b03c14 | 411:6b015d3184ab |
|---|---|
| 70 | 70 |
| 71 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 71 except Exception as e: raise utils.DataErr(file_path, e.__str__()) |
| 72 raise utils.DataErr(file_path, | 72 raise utils.DataErr(file_path, |
| 73 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") | 73 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") |
| 74 | 74 |
| 75 ################################- DATA GENERATION -################################ | 75 |
| 76 ReactionId = str | |
| 77 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | |
| 78 """ | |
| 79 Generates a dictionary mapping reaction ids to rules from the model. | |
| 80 | |
| 81 Args: | |
| 82 model : the model to derive data from. | |
| 83 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. | |
| 84 | |
| 85 Returns: | |
| 86 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. | |
| 87 Dict[ReactionId, str] : the generated dictionary of raw rules. | |
| 88 """ | |
| 89 # Is the below approach convoluted? yes | |
| 90 # Ok but is it inefficient? probably | |
| 91 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) | |
| 92 _ruleGetter = lambda reaction : reaction.gene_reaction_rule | |
| 93 ruleExtractor = (lambda reaction : | |
| 94 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter | |
| 95 | |
| 96 return { | |
| 97 reaction.id : ruleExtractor(reaction) | |
| 98 for reaction in model.reactions | |
| 99 if reaction.gene_reaction_rule } | |
| 100 | |
| 101 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: | |
| 102 """ | |
| 103 Generates a dictionary mapping reaction ids to reaction formulas from the model. | |
| 104 | |
| 105 Args: | |
| 106 model : the model to derive data from. | |
| 107 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. | |
| 108 | |
| 109 Returns: | |
| 110 Dict[ReactionId, str] : the generated dictionary. | |
| 111 """ | |
| 112 | |
| 113 unparsedReactions = { | |
| 114 reaction.id : reaction.reaction | |
| 115 for reaction in model.reactions | |
| 116 if reaction.reaction | |
| 117 } | |
| 118 | |
| 119 if not asParsed: return unparsedReactions | |
| 120 | |
| 121 return reactionUtils.create_reaction_dict(unparsedReactions) | |
| 122 | |
| 123 def get_medium(model:cobra.Model) -> pd.DataFrame: | |
| 124 trueMedium=[] | |
| 125 for r in model.reactions: | |
| 126 positiveCoeff=0 | |
| 127 for m in r.metabolites: | |
| 128 if r.get_coefficient(m.id)>0: | |
| 129 positiveCoeff=1; | |
| 130 if (positiveCoeff==0 and r.lower_bound<0): | |
| 131 trueMedium.append(r.id) | |
| 132 | |
| 133 df_medium = pd.DataFrame() | |
| 134 df_medium["reaction"] = trueMedium | |
| 135 return df_medium | |
| 136 | |
| 137 def generate_bounds(model:cobra.Model) -> pd.DataFrame: | |
| 138 | |
| 139 rxns = [] | |
| 140 for reaction in model.reactions: | |
| 141 rxns.append(reaction.id) | |
| 142 | |
| 143 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns) | |
| 144 | |
| 145 for reaction in model.reactions: | |
| 146 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] | |
| 147 return bounds | |
| 148 | |
| 149 | |
| 150 | |
| 151 def generate_compartments(model: cobra.Model) -> pd.DataFrame: | |
| 152 """ | |
| 153 Generates a DataFrame containing compartment information for each reaction. | |
| 154 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) | |
| 155 | |
| 156 Args: | |
| 157 model: the COBRA model to extract compartment data from. | |
| 158 | |
| 159 Returns: | |
| 160 pd.DataFrame: DataFrame with ReactionID and compartment columns | |
| 161 """ | |
| 162 pathway_data = [] | |
| 163 | |
| 164 # First pass: determine the maximum number of pathways any reaction has | |
| 165 max_pathways = 0 | |
| 166 reaction_pathways = {} | |
| 167 | |
| 168 for reaction in model.reactions: | |
| 169 # Get unique pathways from all metabolites in the reaction | |
| 170 if type(reaction.annotation['pathways']) == list: | |
| 171 reaction_pathways[reaction.id] = reaction.annotation['pathways'] | |
| 172 max_pathways = max(max_pathways, len(reaction.annotation['pathways'])) | |
| 173 else: | |
| 174 reaction_pathways[reaction.id] = [reaction.annotation['pathways']] | |
| 175 | |
| 176 # Create column names for pathways | |
| 177 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)] | |
| 178 | |
| 179 # Second pass: create the data | |
| 180 for reaction_id, pathways in reaction_pathways.items(): | |
| 181 row = {"ReactionID": reaction_id} | |
| 182 | |
| 183 # Fill pathway columns | |
| 184 for i in range(max_pathways): | |
| 185 col_name = pathway_columns[i] | |
| 186 if i < len(pathways): | |
| 187 row[col_name] = pathways[i] | |
| 188 else: | |
| 189 row[col_name] = None # or "" if you prefer empty strings | |
| 190 | |
| 191 pathway_data.append(row) | |
| 192 | |
| 193 return pd.DataFrame(pathway_data) | |
| 194 | 76 |
| 195 | 77 |
| 196 ###############################- FILE SAVING -################################ | 78 ###############################- FILE SAVING -################################ |
| 197 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 79 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: |
| 198 """ | 80 """ |
| 294 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": | 176 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": |
| 295 | 177 |
| 296 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) | 178 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) |
| 297 | 179 |
| 298 # generate data | 180 # generate data |
| 299 rules = generate_rules(model, asParsed = False) | 181 rules = utils.generate_rules(model, asParsed = False) |
| 300 reactions = generate_reactions(model, asParsed = False) | 182 reactions = utils.generate_reactions(model, asParsed = False) |
| 301 bounds = generate_bounds(model) | 183 bounds = utils.generate_bounds(model) |
| 302 medium = get_medium(model) | 184 medium = utils.get_medium(model) |
| 303 if ARGS.name == "ENGRO2": | 185 if ARGS.name == "ENGRO2": |
| 304 compartments = generate_compartments(model) | 186 compartments = utils.generate_compartments(model) |
| 305 | 187 |
| 306 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) | 188 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) |
| 307 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) | 189 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) |
| 308 | 190 |
| 309 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 191 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) |
| 322 | 204 |
| 323 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data") | 205 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data") |
| 324 | 206 |
| 325 #merged.to_csv(out_file, sep = '\t', index = False) | 207 #merged.to_csv(out_file, sep = '\t', index = False) |
| 326 | 208 |
| 327 | |
| 328 #### | 209 #### |
| 329 | |
| 330 | 210 |
| 331 if not ARGS.out_tabular: | 211 if not ARGS.out_tabular: |
| 332 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) | 212 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) |
| 333 save_as_tabular_df(merged, ARGS.out_tabular) | 213 save_as_tabular_df(merged, ARGS.out_tabular) |
| 334 expected = ARGS.out_tabular | 214 expected = ARGS.out_tabular |
