Mercurial > repos > iuc > table_compute
view scripts/table_compute.py @ 0:1b0f96ed73f2 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/table_compute commit 1ee75135483d5db22c540bc043746cd986f85762"
author | iuc |
---|---|
date | Sat, 17 Aug 2019 16:25:37 -0400 |
parents | |
children | dddadbbac949 |
line wrap: on
line source
#!/usr/bin/env python3 """ Table Compute tool - a wrapper around pandas with parameter input validation. """ __version__ = "0.8" import csv import math from sys import argv import numpy as np import pandas as pd import userconfig as uc from safety import Safety # This should be generated in the same directory # Version command should not need to copy the config if len(argv) == 2 and argv[1] == "--version": print(__version__) exit(-1) class Utils: @staticmethod def getOneValueMathOp(op_name): "Returns a simple one value math operator such as log, sqrt, etc" return getattr(math, op_name) @staticmethod def getVectorPandaOp(op_name): "Returns a valid DataFrame vector operator" return getattr(pd.DataFrame, op_name) @staticmethod def getTwoValuePandaOp(op_name, pd_obj): "Returns a valid two value DataFrame or Series operator" return getattr(type(pd_obj), "__" + op_name + "__") # Math is imported but not directly used because users # may specify a "math.<function>" when inserting a custom # function. To remove linting errors, which break CI testing # we will just use an arbitrary math statement here. __ = math.log # Set decimal precision pd.options.display.precision = uc.Default["precision"] user_mode = uc.Default["user_mode"] user_mode_single = None out_table = None params = uc.Data["params"] if user_mode == "single": # Read in TSV file data = pd.read_csv( uc.Data["tables"][0]["reader_file"], header=uc.Data["tables"][0]["reader_header"], index_col=uc.Data["tables"][0]["reader_row_col"], keep_default_na=uc.Default["narm"], sep='\t' ) # Fix whitespace issues in index or column names data.columns = [col.strip() if type(col) is str else col for col in data.columns] data.index = [row.strip() if type(row) is str else row for row in data.index] user_mode_single = params["user_mode_single"] if user_mode_single == "precision": # Useful for changing decimal precision on write out out_table = data elif user_mode_single == "select": cols_specified = params["select_cols_wanted"] rows_specified = params["select_rows_wanted"] # Select all indexes if empty array of values if not cols_specified: cols_specified = range(len(data.columns)) if not rows_specified: rows_specified = range(len(data)) # do not use duplicate indexes # e.g. [2,3,2,5,5,4,2] to [2,3,5,4] nodupes_col = not params["select_cols_unique"] nodupes_row = not params["select_rows_unique"] if nodupes_col: cols_specified = [x for i, x in enumerate(cols_specified) if x not in cols_specified[:i]] if nodupes_row: rows_specified = [x for i, x in enumerate(rows_specified) if x not in rows_specified[:i]] out_table = data.iloc[rows_specified, cols_specified] elif user_mode_single == "filtersumval": mode = params["filtersumval_mode"] axis = params["filtersumval_axis"] operation = params["filtersumval_op"] compare_operation = params["filtersumval_compare"] value = params["filtersumval_against"] minmatch = params["filtersumval_minmatch"] if mode == "operation": # Perform axis operation summary_op = Utils.getVectorPandaOp(operation) axis_summary = summary_op(data, axis=axis) # Perform vector comparison compare_op = Utils.getTwoValuePandaOp( compare_operation, axis_summary ) axis_bool = compare_op(axis_summary, value) elif mode == "element": if operation.startswith("str_"): data = data.astype("str") value = str(value) # Convert str_eq to eq operation = operation[4:] else: value = float(value) op = Utils.getTwoValuePandaOp(operation, data) bool_mat = op(data, value) axis_bool = np.sum(bool_mat, axis=axis) >= minmatch out_table = data.loc[:, axis_bool] if axis == 0 else data.loc[axis_bool, :] elif user_mode_single == "matrixapply": # 0 - column, 1 - row axis = params["matrixapply_dimension"] # sd, mean, max, min, sum, median, summary operation = params["matrixapply_op"] if operation is None: use_custom = params["matrixapply_custom"] if use_custom: custom_func = params["matrixapply_custom_func"] def fun(vec): """Dummy Function""" return vec ss = Safety(custom_func, ['vec'], 'pd.Series') fun_string = ss.generateFunction() exec(fun_string) # SUPER DUPER SAFE... out_table = data.apply(fun, axis) else: print("No operation given") exit(-1) else: op = getattr(pd.DataFrame, operation) out_table = op(data, axis) elif user_mode_single == "element": # lt, gt, ge, etc. operation = params["element_op"] if operation is not None: op = Utils.getTwoValuePandaOp(operation, data) value = params["element_value"] try: # Could be numeric value = float(value) except ValueError: pass # generate filter matrix of True/False values bool_mat = op(data, value) else: # implement no filtering through a filter matrix filled with # True values. bool_mat = np.full(data.shape, True) # Get the main processing mode mode = params["element_mode"] if mode == "replace": replacement_val = params["element_replace"] out_table = data.mask(bool_mat, replacement_val) elif mode == "modify": mod_op = Utils.getOneValueMathOp(params["element_modify_op"]) out_table = data.mask( bool_mat, data.where(bool_mat).applymap(mod_op) ) elif mode == "scale": scale_op = Utils.getTwoValuePandaOp( params["element_scale_op"], data ) scale_value = params["element_scale_value"] out_table = data.mask( bool_mat, scale_op(data.where(bool_mat), scale_value) ) elif mode == "custom": element_customop = params["element_customop"] def fun(elem): """Dummy Function""" return elem ss = Safety(element_customop, ['elem']) fun_string = ss.generateFunction() exec(fun_string) # SUPER DUPER SAFE... out_table = data.mask( bool_mat, data.where(bool_mat).applymap(fun) ) else: print("No such element mode!", mode) exit(-1) elif user_mode_single == "fulltable": general_mode = params["mode"] if general_mode == "melt": melt_ids = params["MELT"]["melt_ids"] melt_values = params["MELT"]["melt_values"] out_table = pd.melt(data, id_vars=melt_ids, value_vars=melt_values) elif general_mode == "pivot": pivot_index = params["PIVOT"]["pivot_index"] pivot_column = params["PIVOT"]["pivot_column"] pivot_values = params["PIVOT"]["pivot_values"] out_table = data.pivot( index=pivot_index, columns=pivot_column, values=pivot_values ) elif general_mode == "custom": custom_func = params["fulltable_customop"] def fun(tableau): """Dummy Function""" return tableau ss = Safety(custom_func, ['table'], 'pd.DataFrame') fun_string = ss.generateFunction() exec(fun_string) # SUPER DUPER SAFE... out_table = fun(data) else: print("No such mode!", user_mode_single) exit(-1) elif user_mode == "multiple": table_sections = uc.Data["tables"] if not table_sections: print("Multiple table sets not given!") exit(-1) reader_skip = uc.Default["reader_skip"] # Data table = [] # 1-based handlers for users "table1", "table2", etc. table_names = [] # Actual 0-based references "table[0]", "table[1]", etc. table_names_real = [] # Read and populate tables for x, t_sect in enumerate(table_sections): tmp = pd.read_csv( t_sect["file"], header=t_sect["header"], index_col=t_sect["row_names"], keep_default_na=uc.Default["narm"], sep="\t" ) table.append(tmp) table_names.append("table" + str(x + 1)) table_names_real.append("table[" + str(x) + "]") custom_op = params["fulltable_customop"] ss = Safety(custom_op, table_names, 'pd.DataFrame') fun_string = ss.generateFunction() # Change the argument to table fun_string = fun_string.replace("fun(table1):", "fun():") # table1 to table[1] for name, name_real in zip(table_names, table_names_real): fun_string = fun_string.replace(name, name_real) fun_string = fun_string.replace("fun():", "fun(table):") exec(fun_string) # SUPER DUPER SAFE... out_table = fun(table) else: print("No such mode!", user_mode) exit(-1) if not isinstance(out_table, (pd.DataFrame, pd.Series)): print('The specified operation did not result in a table to return.') raise RuntimeError( 'The operation did not generate a pd.DataFrame or pd.Series to return.' ) out_parameters = { "sep": "\t", "float_format": "%%.%df" % pd.options.display.precision, "header": uc.Default["out_headers_col"], "index": uc.Default["out_headers_row"] } if user_mode_single not in ('matrixapply', None): out_parameters["quoting"] = csv.QUOTE_NONE out_table.to_csv(uc.Default["outtable"], **out_parameters)