| 539 | 1 """ | 
|  | 2 Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry. | 
|  | 3 | 
|  | 4 Given a tabular dataset (metabolites x samples) and a reaction set, this script | 
|  | 5 maps metabolite names via synonyms, fills missing metabolites, and computes RPS | 
|  | 6 per reaction for each sample using a log-normalized formula. | 
|  | 7 """ | 
|  | 8 import math | 
|  | 9 import argparse | 
|  | 10 | 
|  | 11 import numpy  as np | 
|  | 12 import pickle as pk | 
|  | 13 import pandas as pd | 
|  | 14 | 
|  | 15 from typing import Optional, List, Dict | 
|  | 16 | 
|  | 17 import utils.general_utils as utils | 
|  | 18 import utils.reaction_parsing as reactionUtils | 
|  | 19 | 
|  | 20 ########################## argparse ########################################## | 
|  | 21 ARGS :argparse.Namespace | 
|  | 22 def process_args(args:List[str] = None) -> argparse.Namespace: | 
|  | 23     """ | 
|  | 24     Processes command-line arguments. | 
|  | 25 | 
|  | 26     Args: | 
|  | 27         args (list): List of command-line arguments. | 
|  | 28 | 
|  | 29     Returns: | 
|  | 30         Namespace: An object containing parsed arguments. | 
|  | 31     """ | 
|  | 32     parser = argparse.ArgumentParser( | 
|  | 33         usage='%(prog)s [options]', | 
|  | 34         description='Process abundances and reactions to create RPS scores.' | 
|  | 35     ) | 
|  | 36 | 
|  | 37     parser.add_argument("-rl", "--model_upload", type = str, | 
|  | 38         help = "path to input file containing the reactions") | 
|  | 39 | 
|  | 40     parser.add_argument('-td', '--tool_dir', | 
|  | 41                         type = str, | 
|  | 42                         required = True, | 
|  | 43                         help = 'your tool directory') | 
|  | 44     parser.add_argument('-ol', '--out_log', | 
|  | 45                         help = "Output log") | 
|  | 46     parser.add_argument('-id', '--input', | 
|  | 47                         type = str, | 
|  | 48                         required = True, | 
|  | 49                         help = 'input dataset') | 
|  | 50     parser.add_argument('-rp', '--rps_output', | 
|  | 51                         type = str, | 
|  | 52                         required = True, | 
|  | 53                         help = 'rps output') | 
|  | 54 | 
|  | 55     args = parser.parse_args(args) | 
|  | 56     return args | 
|  | 57 | 
|  | 58 ############################ dataset name ##################################### | 
|  | 59 def name_dataset(name_data :str, count :int) -> str: | 
|  | 60     """ | 
|  | 61     Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | 
|  | 62 | 
|  | 63     Args: | 
|  | 64     name_data: Name associated with the dataset (from frontend input params). | 
|  | 65     count: Counter starting at 1 to make names unique when default. | 
|  | 66 | 
|  | 67     Returns: | 
|  | 68         str : the name made unique | 
|  | 69     """ | 
|  | 70     if str(name_data) == 'Dataset': | 
|  | 71         return str(name_data) + '_' + str(count) | 
|  | 72     else: | 
|  | 73         return str(name_data) | 
|  | 74 | 
|  | 75 ############################ get_abund_data #################################### | 
|  | 76 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]: | 
|  | 77     """ | 
|  | 78     Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are | 
|  | 79     metabolites and columns are cell lines. | 
|  | 80 | 
|  | 81     Args: | 
|  | 82         dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites. | 
|  | 83         cell_line_index (int): The index of the cell line of interest in the dataset. | 
|  | 84 | 
|  | 85     Returns: | 
|  | 86         pd.Series or None: A series containing abundance values for the specified cell line. | 
|  | 87                            The name of the series is the name of the cell line. | 
|  | 88                            Returns None if the cell index is invalid. | 
|  | 89     """ | 
|  | 90     if cell_line_index < 0 or cell_line_index >= len(dataset.index): | 
|  | 91         print(f"Error: cell line index '{cell_line_index}' is not valid.") | 
|  | 92         return None | 
|  | 93 | 
|  | 94     cell_line_name = dataset.columns[cell_line_index] | 
|  | 95     abundances_series = dataset[cell_line_name][1:] | 
|  | 96 | 
|  | 97     return abundances_series | 
|  | 98 | 
|  | 99 ############################ clean_metabolite_name #################################### | 
|  | 100 def clean_metabolite_name(name :str) -> str: | 
|  | 101     """ | 
|  | 102     Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify | 
|  | 103     the search of a match in the dictionary of synonyms. | 
|  | 104 | 
|  | 105     Args: | 
|  | 106         name : the metabolite's name, as given in the dataset. | 
|  | 107 | 
|  | 108     Returns: | 
|  | 109         str : a new string with the cleaned name. | 
|  | 110     """ | 
|  | 111     return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower() | 
|  | 112 | 
|  | 113 ############################ get_metabolite_id #################################### | 
|  | 114 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str: | 
|  | 115     """ | 
|  | 116     Looks through a dictionary of synonyms to find a match for a given metabolite's name. | 
|  | 117 | 
|  | 118     Args: | 
|  | 119         name : the metabolite's name, as given in the dataset. | 
|  | 120         syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values. | 
|  | 121 | 
|  | 122     Returns: | 
|  | 123         str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use. | 
|  | 124         An empty string is returned if a match isn't found. | 
|  | 125     """ | 
|  | 126     name = clean_metabolite_name(name) | 
|  | 127     for id, synonyms in syn_dict.items(): | 
|  | 128         if name in synonyms: | 
|  | 129             return id | 
|  | 130 | 
|  | 131     return "" | 
|  | 132 | 
|  | 133 ############################ check_missing_metab #################################### | 
|  | 134 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]: | 
|  | 135     """ | 
|  | 136     Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly. | 
|  | 137 | 
|  | 138     Parameters: | 
|  | 139         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and | 
|  | 140                           stoichiometric coefficients as values. | 
|  | 141         dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines. | 
|  | 142         cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites. | 
|  | 143 | 
|  | 144     Returns: | 
|  | 145         list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1. | 
|  | 146 | 
|  | 147     Side effects: | 
|  | 148         dataset_by_rows: mutated to include missing metabolites with default abundances. | 
|  | 149     """ | 
|  | 150     missing_list = [] | 
|  | 151     for reaction in reactions.values(): | 
|  | 152         for metabolite in reaction.keys(): | 
|  | 153           if metabolite not in dataset_by_rows: | 
|  | 154             dataset_by_rows[metabolite] = [1] * cell_lines_amt | 
|  | 155             missing_list.append(metabolite) | 
|  | 156 | 
|  | 157     return missing_list | 
|  | 158 | 
|  | 159 ############################ calculate_rps #################################### | 
|  | 160 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str], substrateFreqTable: Dict[str, int]) -> Dict[str, float]: | 
|  | 161     """ | 
|  | 162     Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample. | 
|  | 163     The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient | 
|  | 164     for each reaction using the provided coefficient and abundance values. The value is then normalized, based on how frequent the metabolite is in the selected model's reactions, | 
|  | 165     and log-transformed. | 
|  | 166 | 
|  | 167     Parameters: | 
|  | 168         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | 
|  | 169         abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances. | 
|  | 170         black_list (list): A list containing metabolite names that should be excluded from the RPS calculation. | 
|  | 171         missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1. | 
|  | 172         substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value). | 
|  | 173 | 
|  | 174     Returns: | 
|  | 175         dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores. | 
|  | 176     """ | 
|  | 177     rps_scores = {} | 
|  | 178 | 
|  | 179     for reaction_name, substrates in reactions.items(): | 
|  | 180         total_contribution = 0 | 
|  | 181         metab_significant  = False | 
|  | 182         for metabolite, stoichiometry in substrates.items(): | 
|  | 183             abundance = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite] | 
|  | 184             if metabolite not in black_list and metabolite not in missing_list: | 
|  | 185                 metab_significant = True | 
|  | 186 | 
|  | 187             total_contribution += math.log((abundance + np.finfo(float).eps) / substrateFreqTable[metabolite]) * stoichiometry | 
|  | 188 | 
|  | 189         rps_scores[reaction_name] = total_contribution if metab_significant else math.nan | 
|  | 190 | 
|  | 191     return rps_scores | 
|  | 192 | 
|  | 193 ############################ rps_for_cell_lines #################################### | 
|  | 194 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]], substrateFreqTable: Dict[str, int]) -> None: | 
|  | 195     """ | 
|  | 196     Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file. | 
|  | 197 | 
|  | 198     Parameters: | 
|  | 199         dataset : the dataset's data, by rows | 
|  | 200         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | 
|  | 201         black_list (list): A list containing metabolite names that should be excluded from the RPS calculation. | 
|  | 202         syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms. | 
|  | 203         substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value). | 
|  | 204 | 
|  | 205     Returns: | 
|  | 206         None | 
|  | 207     """ | 
|  | 208 | 
|  | 209     cell_lines = dataset[0][1:] | 
|  | 210     abundances_dict = {} | 
|  | 211 | 
|  | 212     for row in dataset[1:]: | 
|  | 213         id = get_metabolite_id(row[0], syn_dict) | 
|  | 214         if id: | 
|  | 215             abundances_dict[id] = list(map(utils.Float(), row[1:])) | 
|  | 216 | 
|  | 217     missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) | 
|  | 218 | 
|  | 219     rps_scores :Dict[Dict[str, float]] = {} | 
|  | 220     for pos, cell_line_name in enumerate(cell_lines): | 
|  | 221         abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() } | 
|  | 222 | 
|  | 223         rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable) | 
|  | 224 | 
|  | 225     df = pd.DataFrame.from_dict(rps_scores) | 
|  | 226     df = df.loc[list(reactions.keys()),:] | 
|  | 227     # Optional preview: first 10 rows | 
|  | 228     # print(df.head(10)) | 
|  | 229     df.index.name = 'Reactions' | 
|  | 230     df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) | 
|  | 231 | 
|  | 232 ############################ main #################################### | 
|  | 233 def main(args:List[str] = None) -> None: | 
|  | 234     """ | 
|  | 235     Initializes everything and sets the program in motion based on the fronted input arguments. | 
|  | 236 | 
|  | 237     Returns: | 
|  | 238         None | 
|  | 239     """ | 
|  | 240     global ARGS | 
|  | 241     ARGS = process_args(args) | 
|  | 242 | 
|  | 243     # Load support data (black list and synonyms) | 
|  | 244     with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: | 
|  | 245         black_list = pk.load(bl) | 
|  | 246 | 
|  | 247     with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: | 
|  | 248         syn_dict = pk.load(sd) | 
|  | 249 | 
|  | 250     dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False) | 
|  | 251     tmp_dict = None | 
|  | 252 | 
|  | 253     # Parse custom reactions from uploaded file | 
|  | 254     reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload) | 
|  | 255     for r, s in reactions.items(): | 
|  | 256         tmp_list = list(s.keys()) | 
|  | 257         for k in tmp_list: | 
|  | 258             if k[-2] == '_': | 
|  | 259                 s[k[:-2]] = s.pop(k) | 
|  | 260     substrateFreqTable = {} | 
|  | 261     for _, substrates in reactions.items(): | 
|  | 262         for substrateName, _ in substrates.items(): | 
|  | 263             if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 | 
|  | 264             substrateFreqTable[substrateName] += 1 | 
|  | 265 | 
|  | 266     # Debug prints (can be enabled during troubleshooting) | 
|  | 267     # print(f"Reactions: {reactions}") | 
|  | 268     # print(f"Substrate Frequencies: {substrateFreqTable}") | 
|  | 269     # print(f"Synonyms: {syn_dict}") | 
|  | 270         tmp_dict = {} | 
|  | 271         for metabName, freq in substrateFreqTable.items(): | 
|  | 272             tmp_metabName = clean_metabolite_name(metabName) | 
|  | 273             for syn_key, syn_list in syn_dict.items(): | 
|  | 274                 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key): | 
|  | 275                     # print(f"Mapping {tmp_metabName} to {syn_key}") | 
|  | 276                     tmp_dict[syn_key] = syn_list | 
|  | 277                     tmp_dict[syn_key].append(tmp_metabName) | 
|  | 278 | 
|  | 279     rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) | 
|  | 280     print('Execution succeeded') | 
|  | 281 | 
|  | 282 ############################################################################## | 
|  | 283 if __name__ == "__main__": main() |