Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/general_utils.py @ 414:5086145cfb96 draft
Uploaded
author | francesco_lapi |
---|---|
date | Mon, 08 Sep 2025 21:54:14 +0000 |
parents | 7a3ccf066b2c |
children | 4a248b45273c |
comparison
equal
deleted
inserted
replaced
413:7a3ccf066b2c | 414:5086145cfb96 |
---|---|
15 | 15 |
16 import zipfile | 16 import zipfile |
17 import gzip | 17 import gzip |
18 import bz2 | 18 import bz2 |
19 from io import StringIO | 19 from io import StringIO |
20 import os | |
21 sys.path.insert(0, os.path.dirname(__file__)) | |
22 import rule_parsing as rulesUtils | |
23 import reaction_parsing as reactionUtils | |
24 | |
25 | 20 |
26 | 21 |
27 | 22 |
28 class ValueErr(Exception): | 23 class ValueErr(Exception): |
29 def __init__(self, param_name, expected, actual): | 24 def __init__(self, param_name, expected, actual): |
778 | 773 |
779 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") | 774 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") |
780 | 775 |
781 # Seconda passata: aggiungi le reazioni | 776 # Seconda passata: aggiungi le reazioni |
782 reactions_added = 0 | 777 reactions_added = 0 |
778 reactions_skipped = 0 | |
783 | 779 |
784 for idx, row in df.iterrows(): | 780 for idx, row in df.iterrows(): |
785 reaction_id = str(row['ReactionID']).strip() | |
786 reaction_formula = str(row['Reaction']).strip() | |
787 | |
788 # Salta reazioni senza formula | |
789 if not reaction_formula or reaction_formula == 'nan': | |
790 raise ValueError(f"Formula della reazione mancante {reaction_id}") | |
791 | |
792 # Crea la reazione | |
793 reaction = Reaction(reaction_id) | |
794 reaction.name = reaction_id | |
795 | |
796 # Imposta bounds | |
797 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 | |
798 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 | |
799 | |
800 # Aggiungi gene rule se presente | |
801 if pd.notna(row['Rule']) and str(row['Rule']).strip(): | |
802 reaction.gene_reaction_rule = str(row['Rule']).strip() | |
803 | |
804 # Parse della formula della reazione | |
805 try: | 781 try: |
806 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) | 782 reaction_id = str(row['ReactionID']).strip() |
807 except Exception as e: | 783 reaction_formula = str(row['Reaction']).strip() |
808 print(f"Errore nel parsing della reazione {reaction_id}: {e}") | 784 |
809 reactions_skipped += 1 | 785 # Salta reazioni senza formula |
810 continue | 786 if not reaction_formula or reaction_formula == 'nan': |
811 | 787 raise ValueError(f"Formula della reazione mancante {reaction_id}") |
812 # Aggiungi la reazione al modello | 788 |
813 model.add_reactions([reaction]) | 789 # Crea la reazione |
814 reactions_added += 1 | 790 reaction = Reaction(reaction_id) |
791 reaction.name = reaction_id | |
792 | |
793 # Imposta bounds | |
794 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 | |
795 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 | |
796 | |
797 # Aggiungi gene rule se presente | |
798 if pd.notna(row['Rule']) and str(row['Rule']).strip(): | |
799 reaction.gene_reaction_rule = str(row['Rule']).strip() | |
800 | |
801 # Parse della formula della reazione | |
802 try: | |
803 parse_reaction_formula(reaction, reaction_formula, metabolites_dict) | |
804 except Exception as e: | |
805 print(f"Errore nel parsing della reazione {reaction_id}: {e}") | |
806 reactions_skipped += 1 | |
807 continue | |
808 | |
809 # Aggiungi la reazione al modello | |
810 model.add_reactions([reaction]) | |
811 reactions_added += 1 | |
815 | 812 |
816 | 813 |
817 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") | 814 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") |
818 | 815 |
819 # Imposta l'obiettivo di biomassa | 816 # Imposta l'obiettivo di biomassa |
977 except Exception as e: | 974 except Exception as e: |
978 validation['growth_rate'] = None | 975 validation['growth_rate'] = None |
979 validation['status'] = f"Error: {e}" | 976 validation['status'] = f"Error: {e}" |
980 | 977 |
981 return validation | 978 return validation |
982 | |
983 | |
984 ################################- DATA GENERATION -################################ | |
985 ReactionId = str | |
986 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | |
987 """ | |
988 Generates a dictionary mapping reaction ids to rules from the model. | |
989 | |
990 Args: | |
991 model : the model to derive data from. | |
992 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. | |
993 | |
994 Returns: | |
995 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. | |
996 Dict[ReactionId, str] : the generated dictionary of raw rules. | |
997 """ | |
998 # Is the below approach convoluted? yes | |
999 # Ok but is it inefficient? probably | |
1000 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) | |
1001 _ruleGetter = lambda reaction : reaction.gene_reaction_rule | |
1002 ruleExtractor = (lambda reaction : | |
1003 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter | |
1004 | |
1005 return { | |
1006 reaction.id : ruleExtractor(reaction) | |
1007 for reaction in model.reactions | |
1008 if reaction.gene_reaction_rule } | |
1009 | |
1010 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: | |
1011 """ | |
1012 Generates a dictionary mapping reaction ids to reaction formulas from the model. | |
1013 | |
1014 Args: | |
1015 model : the model to derive data from. | |
1016 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. | |
1017 | |
1018 Returns: | |
1019 Dict[ReactionId, str] : the generated dictionary. | |
1020 """ | |
1021 | |
1022 unparsedReactions = { | |
1023 reaction.id : reaction.reaction | |
1024 for reaction in model.reactions | |
1025 if reaction.reaction | |
1026 } | |
1027 | |
1028 if not asParsed: return unparsedReactions | |
1029 | |
1030 return reactionUtils.create_reaction_dict(unparsedReactions) | |
1031 | |
1032 def get_medium(model:cobra.Model) -> pd.DataFrame: | |
1033 trueMedium=[] | |
1034 for r in model.reactions: | |
1035 positiveCoeff=0 | |
1036 for m in r.metabolites: | |
1037 if r.get_coefficient(m.id)>0: | |
1038 positiveCoeff=1; | |
1039 if (positiveCoeff==0 and r.lower_bound<0): | |
1040 trueMedium.append(r.id) | |
1041 | |
1042 df_medium = pd.DataFrame() | |
1043 df_medium["reaction"] = trueMedium | |
1044 return df_medium | |
1045 | |
1046 def generate_bounds(model:cobra.Model) -> pd.DataFrame: | |
1047 | |
1048 rxns = [] | |
1049 for reaction in model.reactions: | |
1050 rxns.append(reaction.id) | |
1051 | |
1052 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns) | |
1053 | |
1054 for reaction in model.reactions: | |
1055 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] | |
1056 return bounds | |
1057 | |
1058 | |
1059 | |
1060 def generate_compartments(model: cobra.Model) -> pd.DataFrame: | |
1061 """ | |
1062 Generates a DataFrame containing compartment information for each reaction. | |
1063 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) | |
1064 | |
1065 Args: | |
1066 model: the COBRA model to extract compartment data from. | |
1067 | |
1068 Returns: | |
1069 pd.DataFrame: DataFrame with ReactionID and compartment columns | |
1070 """ | |
1071 pathway_data = [] | |
1072 | |
1073 # First pass: determine the maximum number of pathways any reaction has | |
1074 max_pathways = 0 | |
1075 reaction_pathways = {} | |
1076 | |
1077 for reaction in model.reactions: | |
1078 # Get unique pathways from all metabolites in the reaction | |
1079 if type(reaction.annotation['pathways']) == list: | |
1080 reaction_pathways[reaction.id] = reaction.annotation['pathways'] | |
1081 max_pathways = max(max_pathways, len(reaction.annotation['pathways'])) | |
1082 else: | |
1083 reaction_pathways[reaction.id] = [reaction.annotation['pathways']] | |
1084 | |
1085 # Create column names for pathways | |
1086 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)] | |
1087 | |
1088 # Second pass: create the data | |
1089 for reaction_id, pathways in reaction_pathways.items(): | |
1090 row = {"ReactionID": reaction_id} | |
1091 | |
1092 # Fill pathway columns | |
1093 for i in range(max_pathways): | |
1094 col_name = pathway_columns[i] | |
1095 if i < len(pathways): | |
1096 row[col_name] = pathways[i] | |
1097 else: | |
1098 row[col_name] = None # or "" if you prefer empty strings | |
1099 | |
1100 pathway_data.append(row) | |
1101 | |
1102 return pd.DataFrame(pathway_data) |