| 
539
 | 
     1 """
 | 
| 
 | 
     2 Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry.
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 Given a tabular dataset (metabolites x samples) and a reaction set, this script
 | 
| 
 | 
     5 maps metabolite names via synonyms, fills missing metabolites, and computes RPS
 | 
| 
 | 
     6 per reaction for each sample using a log-normalized formula.
 | 
| 
 | 
     7 """
 | 
| 
 | 
     8 import math
 | 
| 
 | 
     9 import argparse
 | 
| 
543
 | 
    10 import os
 | 
| 
539
 | 
    11 import numpy  as np
 | 
| 
 | 
    12 import pickle as pk
 | 
| 
 | 
    13 import pandas as pd
 | 
| 
 | 
    14 
 | 
| 
 | 
    15 from typing import Optional, List, Dict
 | 
| 
 | 
    16 
 | 
| 
542
 | 
    17 try:
 | 
| 
 | 
    18     from .utils import general_utils as utils
 | 
| 
 | 
    19     from .utils import reaction_parsing as reactionUtils
 | 
| 
 | 
    20 except:
 | 
| 
 | 
    21     import utils.general_utils as utils
 | 
| 
 | 
    22     import utils.reaction_parsing as reactionUtils
 | 
| 
539
 | 
    23 
 | 
| 
 | 
    24 ########################## argparse ##########################################
 | 
| 
 | 
    25 ARGS :argparse.Namespace
 | 
| 
 | 
    26 def process_args(args:List[str] = None) -> argparse.Namespace:
 | 
| 
 | 
    27     """
 | 
| 
 | 
    28     Processes command-line arguments.
 | 
| 
 | 
    29 
 | 
| 
 | 
    30     Args:
 | 
| 
 | 
    31         args (list): List of command-line arguments.
 | 
| 
 | 
    32 
 | 
| 
 | 
    33     Returns:
 | 
| 
 | 
    34         Namespace: An object containing parsed arguments.
 | 
| 
 | 
    35     """
 | 
| 
 | 
    36     parser = argparse.ArgumentParser(
 | 
| 
 | 
    37         usage='%(prog)s [options]',
 | 
| 
 | 
    38         description='Process abundances and reactions to create RPS scores.'
 | 
| 
 | 
    39     )
 | 
| 
 | 
    40     
 | 
| 
 | 
    41     parser.add_argument("-rl", "--model_upload", type = str,
 | 
| 
 | 
    42         help = "path to input file containing the reactions")
 | 
| 
 | 
    43 
 | 
| 
 | 
    44     parser.add_argument('-td', '--tool_dir',
 | 
| 
542
 | 
    45         type = str,
 | 
| 
 | 
    46         default = os.path.dirname(os.path.abspath(__file__)),
 | 
| 
 | 
    47         help = 'your tool directory (default: auto-detected package location)')
 | 
| 
 | 
    48     
 | 
| 
539
 | 
    49     parser.add_argument('-ol', '--out_log', 
 | 
| 
 | 
    50                         help = "Output log")    
 | 
| 
 | 
    51     parser.add_argument('-id', '--input',
 | 
| 
 | 
    52                         type = str,
 | 
| 
 | 
    53                         required = True,
 | 
| 
 | 
    54                         help = 'input dataset')
 | 
| 
 | 
    55     parser.add_argument('-rp', '--rps_output',
 | 
| 
 | 
    56                         type = str,
 | 
| 
 | 
    57                         required = True,
 | 
| 
 | 
    58                         help = 'rps output')
 | 
| 
 | 
    59     
 | 
| 
 | 
    60     args = parser.parse_args(args)
 | 
| 
 | 
    61     return args
 | 
| 
 | 
    62 
 | 
| 
 | 
    63 ############################ dataset name #####################################
 | 
| 
 | 
    64 def name_dataset(name_data :str, count :int) -> str:
 | 
| 
 | 
    65     """
 | 
| 
 | 
    66     Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
 | 
| 
 | 
    67 
 | 
| 
 | 
    68     Args:
 | 
| 
 | 
    69     name_data: Name associated with the dataset (from frontend input params).
 | 
| 
 | 
    70     count: Counter starting at 1 to make names unique when default.
 | 
| 
 | 
    71 
 | 
| 
 | 
    72     Returns:
 | 
| 
 | 
    73         str : the name made unique
 | 
| 
 | 
    74     """
 | 
| 
 | 
    75     if str(name_data) == 'Dataset':
 | 
| 
 | 
    76         return str(name_data) + '_' + str(count)
 | 
| 
 | 
    77     else:
 | 
| 
 | 
    78         return str(name_data)
 | 
| 
 | 
    79 
 | 
| 
 | 
    80 ############################ get_abund_data ####################################
 | 
| 
 | 
    81 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]:
 | 
| 
 | 
    82     """
 | 
| 
 | 
    83     Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are
 | 
| 
 | 
    84     metabolites and columns are cell lines.
 | 
| 
 | 
    85 
 | 
| 
 | 
    86     Args:
 | 
| 
 | 
    87         dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites.
 | 
| 
 | 
    88         cell_line_index (int): The index of the cell line of interest in the dataset.
 | 
| 
 | 
    89 
 | 
| 
 | 
    90     Returns:
 | 
| 
 | 
    91         pd.Series or None: A series containing abundance values for the specified cell line.
 | 
| 
 | 
    92                            The name of the series is the name of the cell line.
 | 
| 
 | 
    93                            Returns None if the cell index is invalid.
 | 
| 
 | 
    94     """
 | 
| 
 | 
    95     if cell_line_index < 0 or cell_line_index >= len(dataset.index):
 | 
| 
 | 
    96         print(f"Error: cell line index '{cell_line_index}' is not valid.")
 | 
| 
 | 
    97         return None
 | 
| 
 | 
    98 
 | 
| 
 | 
    99     cell_line_name = dataset.columns[cell_line_index]
 | 
| 
 | 
   100     abundances_series = dataset[cell_line_name][1:]
 | 
| 
 | 
   101 
 | 
| 
 | 
   102     return abundances_series
 | 
| 
 | 
   103 
 | 
| 
 | 
   104 ############################ clean_metabolite_name ####################################
 | 
| 
 | 
   105 def clean_metabolite_name(name :str) -> str:
 | 
| 
 | 
   106     """
 | 
| 
 | 
   107     Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify
 | 
| 
 | 
   108     the search of a match in the dictionary of synonyms.
 | 
| 
 | 
   109 
 | 
| 
 | 
   110     Args:
 | 
| 
 | 
   111         name : the metabolite's name, as given in the dataset.
 | 
| 
 | 
   112     
 | 
| 
 | 
   113     Returns:
 | 
| 
 | 
   114         str : a new string with the cleaned name.
 | 
| 
 | 
   115     """
 | 
| 
 | 
   116     return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower()
 | 
| 
 | 
   117 
 | 
| 
 | 
   118 ############################ get_metabolite_id ####################################
 | 
| 
 | 
   119 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str:
 | 
| 
 | 
   120     """
 | 
| 
 | 
   121     Looks through a dictionary of synonyms to find a match for a given metabolite's name.
 | 
| 
 | 
   122 
 | 
| 
 | 
   123     Args:
 | 
| 
 | 
   124         name : the metabolite's name, as given in the dataset.
 | 
| 
 | 
   125         syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values.
 | 
| 
 | 
   126     
 | 
| 
 | 
   127     Returns:
 | 
| 
 | 
   128         str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use.
 | 
| 
 | 
   129         An empty string is returned if a match isn't found.
 | 
| 
 | 
   130     """
 | 
| 
 | 
   131     name = clean_metabolite_name(name)
 | 
| 
 | 
   132     for id, synonyms in syn_dict.items():
 | 
| 
 | 
   133         if name in synonyms:
 | 
| 
 | 
   134             return id
 | 
| 
 | 
   135     
 | 
| 
 | 
   136     return ""
 | 
| 
 | 
   137 
 | 
| 
 | 
   138 ############################ check_missing_metab ####################################
 | 
| 
 | 
   139 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]:
 | 
| 
 | 
   140     """
 | 
| 
 | 
   141     Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly.
 | 
| 
 | 
   142 
 | 
| 
 | 
   143     Parameters:
 | 
| 
 | 
   144         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and 
 | 
| 
 | 
   145                           stoichiometric coefficients as values.
 | 
| 
 | 
   146         dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines.
 | 
| 
 | 
   147         cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites.
 | 
| 
 | 
   148 
 | 
| 
 | 
   149     Returns:
 | 
| 
 | 
   150         list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1.
 | 
| 
 | 
   151 
 | 
| 
 | 
   152     Side effects:
 | 
| 
 | 
   153         dataset_by_rows: mutated to include missing metabolites with default abundances.
 | 
| 
 | 
   154     """
 | 
| 
 | 
   155     missing_list = []
 | 
| 
 | 
   156     for reaction in reactions.values():
 | 
| 
 | 
   157         for metabolite in reaction.keys():
 | 
| 
 | 
   158           if metabolite not in dataset_by_rows:
 | 
| 
 | 
   159             dataset_by_rows[metabolite] = [1] * cell_lines_amt
 | 
| 
 | 
   160             missing_list.append(metabolite)
 | 
| 
 | 
   161 
 | 
| 
 | 
   162     return missing_list
 | 
| 
 | 
   163 
 | 
| 
 | 
   164 ############################ calculate_rps ####################################
 | 
| 
 | 
   165 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str], substrateFreqTable: Dict[str, int]) -> Dict[str, float]:
 | 
| 
 | 
   166     """
 | 
| 
 | 
   167     Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample.
 | 
| 
 | 
   168     The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient
 | 
| 
 | 
   169     for each reaction using the provided coefficient and abundance values. The value is then normalized, based on how frequent the metabolite is in the selected model's reactions,
 | 
| 
 | 
   170     and log-transformed.
 | 
| 
 | 
   171 
 | 
| 
 | 
   172     Parameters:
 | 
| 
 | 
   173         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
 | 
| 
 | 
   174         abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances.
 | 
| 
 | 
   175         black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
 | 
| 
 | 
   176         missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1.
 | 
| 
 | 
   177         substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value).
 | 
| 
 | 
   178         
 | 
| 
 | 
   179     Returns:
 | 
| 
 | 
   180         dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores.
 | 
| 
 | 
   181     """
 | 
| 
 | 
   182     rps_scores = {}
 | 
| 
 | 
   183 
 | 
| 
 | 
   184     for reaction_name, substrates in reactions.items():
 | 
| 
 | 
   185         total_contribution = 0
 | 
| 
 | 
   186         metab_significant  = False
 | 
| 
 | 
   187         for metabolite, stoichiometry in substrates.items():
 | 
| 
 | 
   188             abundance = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite]
 | 
| 
 | 
   189             if metabolite not in black_list and metabolite not in missing_list:
 | 
| 
 | 
   190                 metab_significant = True
 | 
| 
 | 
   191             
 | 
| 
 | 
   192             total_contribution += math.log((abundance + np.finfo(float).eps) / substrateFreqTable[metabolite]) * stoichiometry
 | 
| 
 | 
   193         
 | 
| 
 | 
   194         rps_scores[reaction_name] = total_contribution if metab_significant else math.nan
 | 
| 
 | 
   195     
 | 
| 
 | 
   196     return rps_scores
 | 
| 
 | 
   197 
 | 
| 
 | 
   198 ############################ rps_for_cell_lines ####################################
 | 
| 
 | 
   199 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]], substrateFreqTable: Dict[str, int]) -> None:
 | 
| 
 | 
   200     """
 | 
| 
 | 
   201     Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file.
 | 
| 
 | 
   202 
 | 
| 
 | 
   203     Parameters:
 | 
| 
 | 
   204         dataset : the dataset's data, by rows
 | 
| 
 | 
   205         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
 | 
| 
 | 
   206         black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
 | 
| 
 | 
   207         syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms.
 | 
| 
 | 
   208         substrateFreqTable (dict): A dictionary where each metabolite name (key) is associated with how many times it shows up in the model's reactions (value).
 | 
| 
 | 
   209 
 | 
| 
 | 
   210     Returns:
 | 
| 
 | 
   211         None
 | 
| 
 | 
   212     """
 | 
| 
 | 
   213 
 | 
| 
 | 
   214     cell_lines = dataset[0][1:]
 | 
| 
 | 
   215     abundances_dict = {}
 | 
| 
 | 
   216 
 | 
| 
 | 
   217     for row in dataset[1:]:
 | 
| 
 | 
   218         id = get_metabolite_id(row[0], syn_dict)
 | 
| 
 | 
   219         if id:
 | 
| 
 | 
   220             abundances_dict[id] = list(map(utils.Float(), row[1:]))
 | 
| 
 | 
   221 
 | 
| 
 | 
   222     missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines)))
 | 
| 
 | 
   223 
 | 
| 
 | 
   224     rps_scores :Dict[Dict[str, float]] = {}
 | 
| 
 | 
   225     for pos, cell_line_name in enumerate(cell_lines):
 | 
| 
 | 
   226         abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() }
 | 
| 
 | 
   227 
 | 
| 
 | 
   228         rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable)
 | 
| 
 | 
   229     
 | 
| 
 | 
   230     df = pd.DataFrame.from_dict(rps_scores)
 | 
| 
 | 
   231     df = df.loc[list(reactions.keys()),:]
 | 
| 
 | 
   232     # Optional preview: first 10 rows
 | 
| 
 | 
   233     # print(df.head(10))
 | 
| 
 | 
   234     df.index.name = 'Reactions'
 | 
| 
 | 
   235     df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True)
 | 
| 
 | 
   236 
 | 
| 
 | 
   237 ############################ main ####################################
 | 
| 
 | 
   238 def main(args:List[str] = None) -> None:
 | 
| 
 | 
   239     """
 | 
| 
 | 
   240     Initializes everything and sets the program in motion based on the fronted input arguments.
 | 
| 
 | 
   241 
 | 
| 
 | 
   242     Returns:
 | 
| 
 | 
   243         None
 | 
| 
 | 
   244     """
 | 
| 
 | 
   245     global ARGS
 | 
| 
 | 
   246     ARGS = process_args(args)
 | 
| 
 | 
   247 
 | 
| 
 | 
   248     # Load support data (black list and synonyms)
 | 
| 
 | 
   249     with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
 | 
| 
 | 
   250         black_list = pk.load(bl)
 | 
| 
 | 
   251 
 | 
| 
 | 
   252     with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd:
 | 
| 
 | 
   253         syn_dict = pk.load(sd)
 | 
| 
 | 
   254 
 | 
| 
 | 
   255     dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False)
 | 
| 
 | 
   256     tmp_dict = None
 | 
| 
 | 
   257 
 | 
| 
 | 
   258     # Parse custom reactions from uploaded file
 | 
| 
 | 
   259     reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload)
 | 
| 
 | 
   260     for r, s in reactions.items():
 | 
| 
 | 
   261         tmp_list = list(s.keys())
 | 
| 
 | 
   262         for k in tmp_list:
 | 
| 
 | 
   263             if k[-2] == '_':
 | 
| 
 | 
   264                 s[k[:-2]] = s.pop(k)
 | 
| 
 | 
   265     substrateFreqTable = {}
 | 
| 
 | 
   266     for _, substrates in reactions.items():
 | 
| 
 | 
   267         for substrateName, _ in substrates.items():
 | 
| 
 | 
   268             if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0
 | 
| 
 | 
   269             substrateFreqTable[substrateName] += 1
 | 
| 
 | 
   270 
 | 
| 
 | 
   271     # Debug prints (can be enabled during troubleshooting)
 | 
| 
 | 
   272     # print(f"Reactions: {reactions}")
 | 
| 
 | 
   273     # print(f"Substrate Frequencies: {substrateFreqTable}")
 | 
| 
 | 
   274     # print(f"Synonyms: {syn_dict}")
 | 
| 
 | 
   275         tmp_dict = {}
 | 
| 
 | 
   276         for metabName, freq in substrateFreqTable.items():
 | 
| 
 | 
   277             tmp_metabName = clean_metabolite_name(metabName)
 | 
| 
 | 
   278             for syn_key, syn_list in syn_dict.items():
 | 
| 
 | 
   279                 if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key):
 | 
| 
 | 
   280                     # print(f"Mapping {tmp_metabName} to {syn_key}")
 | 
| 
 | 
   281                     tmp_dict[syn_key] = syn_list
 | 
| 
 | 
   282                     tmp_dict[syn_key].append(tmp_metabName)
 | 
| 
 | 
   283 
 | 
| 
 | 
   284     rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable)
 | 
| 
 | 
   285     print('Execution succeeded')
 | 
| 
 | 
   286 
 | 
| 
 | 
   287 ##############################################################################
 | 
| 
 | 
   288 if __name__ == "__main__": main()
 |