| 
4
 | 
     1 import re
 | 
| 
 | 
     2 import sys
 | 
| 
 | 
     3 import csv
 | 
| 
 | 
     4 import math
 | 
| 
 | 
     5 import argparse
 | 
| 
 | 
     6 
 | 
| 
 | 
     7 import numpy  as np
 | 
| 
 | 
     8 import pickle as pk
 | 
| 
 | 
     9 import pandas as pd
 | 
| 
 | 
    10 
 | 
| 
 | 
    11 from enum   import Enum
 | 
| 
 | 
    12 from typing import Optional, List, Dict, Tuple
 | 
| 
 | 
    13 
 | 
| 
 | 
    14 import utils.general_utils as utils
 | 
| 
 | 
    15 import utils.reaction_parsing as reactionUtils
 | 
| 
 | 
    16 
 | 
| 
 | 
    17 ########################## argparse ##########################################
 | 
| 
 | 
    18 ARGS :argparse.Namespace
 | 
| 
 | 
    19 def process_args() -> argparse.Namespace:
 | 
| 
 | 
    20     """
 | 
| 
 | 
    21     Processes command-line arguments.
 | 
| 
 | 
    22 
 | 
| 
 | 
    23     Args:
 | 
| 
 | 
    24         args (list): List of command-line arguments.
 | 
| 
 | 
    25 
 | 
| 
 | 
    26     Returns:
 | 
| 
 | 
    27         Namespace: An object containing parsed arguments.
 | 
| 
 | 
    28     """
 | 
| 
 | 
    29     parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
 | 
| 
 | 
    30                                      description = 'process some value\'s'+
 | 
| 
 | 
    31                                      ' abundances and reactions to create RPS scores.')
 | 
| 
 | 
    32     parser.add_argument('-rc', '--reaction_choice', 
 | 
| 
 | 
    33                         type = str,
 | 
| 
 | 
    34                         default = 'default',
 | 
| 
 | 
    35                         choices = ['default','custom'], 
 | 
| 
 | 
    36                         help = 'chose which type of reaction dataset you want use')
 | 
| 
 | 
    37     parser.add_argument('-cm', '--custom',
 | 
| 
 | 
    38                         type = str,
 | 
| 
 | 
    39                         help='your dataset if you want custom reactions')
 | 
| 
 | 
    40     parser.add_argument('-td', '--tool_dir',
 | 
| 
 | 
    41                         type = str,
 | 
| 
 | 
    42                         required = True,
 | 
| 
 | 
    43                         help = 'your tool directory')
 | 
| 
 | 
    44     parser.add_argument('-ol', '--out_log', 
 | 
| 
 | 
    45                         help = "Output log")    
 | 
| 
 | 
    46     parser.add_argument('-id', '--input',
 | 
| 
 | 
    47                         type = str,
 | 
| 
 | 
    48                         help = 'input dataset')
 | 
| 
 | 
    49     parser.add_argument('-rp', '--rps_output',
 | 
| 
 | 
    50                         type = str,
 | 
| 
 | 
    51                         required = True,
 | 
| 
 | 
    52                         help = 'rps output')
 | 
| 
 | 
    53     
 | 
| 
 | 
    54     args = parser.parse_args()
 | 
| 
 | 
    55     return args
 | 
| 
 | 
    56 
 | 
| 
 | 
    57 ############################ dataset name #####################################
 | 
| 
 | 
    58 def name_dataset(name_data :str, count :int) -> str:
 | 
| 
 | 
    59     """
 | 
| 
 | 
    60     Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
 | 
| 
 | 
    61 
 | 
| 
 | 
    62     Args:
 | 
| 
 | 
    63         name_data : name associated with the dataset (from frontend input params)
 | 
| 
 | 
    64         count : counter from 1 to make these names unique (external)
 | 
| 
 | 
    65 
 | 
| 
 | 
    66     Returns:
 | 
| 
 | 
    67         str : the name made unique
 | 
| 
 | 
    68     """
 | 
| 
 | 
    69     if str(name_data) == 'Dataset':
 | 
| 
 | 
    70         return str(name_data) + '_' + str(count)
 | 
| 
 | 
    71     else:
 | 
| 
 | 
    72         return str(name_data)
 | 
| 
 | 
    73     
 | 
| 
 | 
    74 
 | 
| 
 | 
    75 ############################ get_abund_data ####################################
 | 
| 
 | 
    76 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]:
 | 
| 
 | 
    77     """
 | 
| 
 | 
    78     Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are
 | 
| 
 | 
    79     metabolites and columns are cell lines.
 | 
| 
 | 
    80 
 | 
| 
 | 
    81     Args:
 | 
| 
 | 
    82         dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites.
 | 
| 
 | 
    83         cell_line_index (int): The index of the cell line of interest in the dataset.
 | 
| 
 | 
    84 
 | 
| 
 | 
    85     Returns:
 | 
| 
 | 
    86         pd.Series or None: A series containing abundance values for the specified cell line.
 | 
| 
 | 
    87                            The name of the series is the name of the cell line.
 | 
| 
 | 
    88                            Returns None if the cell index is invalid.
 | 
| 
 | 
    89     """
 | 
| 
 | 
    90     if cell_line_index < 0 or cell_line_index >= len(dataset.index):
 | 
| 
 | 
    91         print(f"Errore: This cell line index: '{cell_line_index}' is not valid.")
 | 
| 
 | 
    92         return None
 | 
| 
 | 
    93 
 | 
| 
 | 
    94     cell_line_name = dataset.columns[cell_line_index]
 | 
| 
 | 
    95     abundances_series = dataset[cell_line_name][1:]
 | 
| 
 | 
    96 
 | 
| 
 | 
    97     return abundances_series
 | 
| 
 | 
    98 
 | 
| 
 | 
    99 
 | 
| 
 | 
   100 ############################ clean_metabolite_name ####################################
 | 
| 
 | 
   101 def clean_metabolite_name(name :str) -> str:
 | 
| 
 | 
   102     """
 | 
| 
 | 
   103     Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify
 | 
| 
 | 
   104     the search of a match in the dictionary of synonyms.
 | 
| 
 | 
   105 
 | 
| 
 | 
   106     Args:
 | 
| 
 | 
   107         name : the metabolite's name, as given in the dataset.
 | 
| 
 | 
   108     
 | 
| 
 | 
   109     Returns:
 | 
| 
 | 
   110         str : a new string with the cleaned name.
 | 
| 
 | 
   111     """
 | 
| 
 | 
   112     return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower()
 | 
| 
 | 
   113 
 | 
| 
 | 
   114 
 | 
| 
 | 
   115 ############################ get_metabolite_id ####################################
 | 
| 
 | 
   116 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str:
 | 
| 
 | 
   117     """
 | 
| 
 | 
   118     Looks through a dictionary of synonyms to find a match for a given metabolite's name.
 | 
| 
 | 
   119 
 | 
| 
 | 
   120     Args:
 | 
| 
 | 
   121         name : the metabolite's name, as given in the dataset.
 | 
| 
 | 
   122         syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values.
 | 
| 
 | 
   123     
 | 
| 
 | 
   124     Returns:
 | 
| 
 | 
   125         str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use.
 | 
| 
 | 
   126         An empty string is returned if a match isn't found.
 | 
| 
 | 
   127     """
 | 
| 
 | 
   128     name = clean_metabolite_name(name)
 | 
| 
 | 
   129     for id, synonyms in syn_dict.items():
 | 
| 
 | 
   130         if name in synonyms: return id
 | 
| 
 | 
   131     
 | 
| 
 | 
   132     return ""
 | 
| 
 | 
   133 
 | 
| 
 | 
   134 ############################ check_missing_metab ####################################
 | 
| 
 | 
   135 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]:
 | 
| 
 | 
   136     """
 | 
| 
 | 
   137     Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly.
 | 
| 
 | 
   138 
 | 
| 
 | 
   139     Parameters:
 | 
| 
 | 
   140         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
 | 
| 
 | 
   141         dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines.
 | 
| 
 | 
   142         cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites.
 | 
| 
 | 
   143 
 | 
| 
 | 
   144     Returns:
 | 
| 
 | 
   145         list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1.
 | 
| 
 | 
   146 
 | 
| 
 | 
   147     Side effects:
 | 
| 
 | 
   148         dataset_by_rows : mut
 | 
| 
 | 
   149     """
 | 
| 
 | 
   150     missing_list = []
 | 
| 
 | 
   151     for reaction in reactions.values():
 | 
| 
 | 
   152         for metabolite in reaction.keys():
 | 
| 
 | 
   153           if metabolite not in dataset_by_rows:
 | 
| 
 | 
   154             dataset_by_rows[metabolite] = [1] * cell_lines_amt
 | 
| 
 | 
   155             missing_list.append(metabolite)
 | 
| 
 | 
   156 
 | 
| 
 | 
   157     return missing_list
 | 
| 
 | 
   158 
 | 
| 
 | 
   159 ############################ calculate_rps ####################################
 | 
| 
 | 
   160 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str]) -> Dict[str, float]:
 | 
| 
 | 
   161     """
 | 
| 
 | 
   162     Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample.
 | 
| 
 | 
   163     The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient
 | 
| 
 | 
   164     for each reaction using the provided coefficient and abundance values.
 | 
| 
 | 
   165 
 | 
| 
 | 
   166     Parameters:
 | 
| 
 | 
   167         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
 | 
| 
 | 
   168         abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances.
 | 
| 
 | 
   169         black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
 | 
| 
 | 
   170         missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1.
 | 
| 
 | 
   171 
 | 
| 
 | 
   172     Returns:
 | 
| 
 | 
   173         dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores.
 | 
| 
 | 
   174     """
 | 
| 
 | 
   175     rps_scores = {}
 | 
| 
 | 
   176  
 | 
| 
 | 
   177     for reaction_name, substrates in reactions.items():
 | 
| 
 | 
   178         total_contribution = 1
 | 
| 
 | 
   179         metab_significant = False
 | 
| 
 | 
   180         for metabolite, stoichiometry in substrates.items():
 | 
| 
 | 
   181             temp = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite]
 | 
| 
 | 
   182             if metabolite not in black_list and metabolite not in missing_list:
 | 
| 
 | 
   183               metab_significant = True
 | 
| 
 | 
   184             total_contribution *= temp ** stoichiometry
 | 
| 
 | 
   185         
 | 
| 
 | 
   186         rps_scores[reaction_name] = total_contribution if metab_significant else math.nan
 | 
| 
 | 
   187     
 | 
| 
 | 
   188     return rps_scores
 | 
| 
 | 
   189 
 | 
| 
 | 
   190 
 | 
| 
 | 
   191 ############################ rps_for_cell_lines ####################################
 | 
| 
 | 
   192 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]]) -> None:
 | 
| 
 | 
   193     """
 | 
| 
 | 
   194     Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file.
 | 
| 
 | 
   195 
 | 
| 
 | 
   196     Parameters:
 | 
| 
 | 
   197         dataset : the dataset's data, by rows
 | 
| 
 | 
   198         reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
 | 
| 
 | 
   199         black_list (list): A list containing metabolite names that should be excluded from the RPS calculation.
 | 
| 
 | 
   200         syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms.
 | 
| 
 | 
   201 
 | 
| 
 | 
   202     Returns:
 | 
| 
 | 
   203         None
 | 
| 
 | 
   204     """
 | 
| 
 | 
   205     cell_lines = dataset[0][1:]
 | 
| 
 | 
   206     abundances_dict = {}
 | 
| 
 | 
   207 
 | 
| 
 | 
   208     translationIsApplied = ARGS.reaction_choice == "default"
 | 
| 
 | 
   209     for row in dataset[1:]:
 | 
| 
 | 
   210         id = get_metabolite_id(row[0], syn_dict) if translationIsApplied else row[0]
 | 
| 
 | 
   211         if id: abundances_dict[id] = list(map(utils.Float(), row[1:]))
 | 
| 
 | 
   212     
 | 
| 
 | 
   213     missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines)))
 | 
| 
 | 
   214     
 | 
| 
 | 
   215     rps_scores :Dict[Dict[str, float]] = {}
 | 
| 
 | 
   216     for pos, cell_line_name in enumerate(cell_lines):
 | 
| 
 | 
   217         abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() }
 | 
| 
 | 
   218         rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list)
 | 
| 
 | 
   219     
 | 
| 
 | 
   220     df = pd.DataFrame.from_dict(rps_scores)
 | 
| 
 | 
   221     df.rename(columns={'Unnamed: 0': 'Reactions'}, inplace=True)
 | 
| 
 | 
   222     df.to_csv(ARGS.rps_output, sep = '\t', na_rep = "None", index = False)
 | 
| 
 | 
   223 
 | 
| 
 | 
   224 ############################ main ####################################
 | 
| 
 | 
   225 def main() -> None:
 | 
| 
 | 
   226     """
 | 
| 
 | 
   227     Initializes everything and sets the program in motion based on the fronted input arguments.
 | 
| 
 | 
   228 
 | 
| 
 | 
   229     Returns:
 | 
| 
 | 
   230         None
 | 
| 
 | 
   231     """
 | 
| 
 | 
   232     global ARGS
 | 
| 
 | 
   233     ARGS = process_args()
 | 
| 
 | 
   234 
 | 
| 
 | 
   235     # TODO:use utils functions vvv
 | 
| 
 | 
   236     with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
 | 
| 
 | 
   237         black_list = pk.load(bl)
 | 
| 
 | 
   238 
 | 
| 
 | 
   239     with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd:
 | 
| 
 | 
   240         syn_dict = pk.load(sd)
 | 
| 
 | 
   241 
 | 
| 
 | 
   242     dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False)
 | 
| 
 | 
   243 
 | 
| 
 | 
   244     if ARGS.reaction_choice == 'default':
 | 
| 
 | 
   245         reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb'))
 | 
| 
 | 
   246     
 | 
| 
 | 
   247     elif ARGS.reaction_choice == 'custom':
 | 
| 
 | 
   248         reactions = reactionUtils.parse_custom_reactions(ARGS.custom)
 | 
| 
 | 
   249     
 | 
| 
 | 
   250     rps_for_cell_lines(dataset, reactions, black_list, syn_dict)
 | 
| 
 | 
   251     print('Execution succeded')
 | 
| 
 | 
   252 
 | 
| 
 | 
   253 ##############################################################################
 | 
| 
 | 
   254 if __name__ == "__main__":
 | 
| 
 | 
   255     main() |