Mercurial > repos > galaxyp > appendfdr
view append_fdr.py @ 0:ef7cc296f063 draft default tip
Initial commit.
author | galaxyp |
---|---|
date | Fri, 10 May 2013 16:42:08 -0400 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python import optparse import sys try: # Ubuntu deps: gfortan libblas-dev liblapack-dev # pip deps: numpy scipy from math import sqrt from scipy.optimize import root from numpy import arange, exp, concatenate, sum, log, array, seterr except ImportError: # Allow this script to be used for global FDR even # if these dependencies are not present. pass SEPARATORS = {"TAB": "\t", "SPACE": " ", "COMMA": "," } def __main__(): run_script() def append_fdr(input_file, output, settings): sorted_scores, accum_hits, accum_decoys = _accum_decoys(input_file, settings) fdr_array = compute_fdr(sorted_scores, accum_hits, accum_decoys, settings) index = 0 for line in __read_lines(input_file): if not line or line.startswith('#'): continue entry = Entry(line, settings, index) this_fdr = fdr_array[entry.score] new_line = "%s%s%f" % (line, settings["separator"], this_fdr) print >> output, new_line index += 1 def compute_fdr(sorted_scores, accum_hits, accum_decoys, settings): fdr_type = settings["fdr_type"] compute_functions = {"global_conservative": _compute_fdr_global_conservative, "global_permissive": _compute_fdr_global_permissive, #"pspep": _compute_pspep } return compute_functions[fdr_type](sorted_scores, accum_hits, accum_decoys, settings) #return compute_functions[fdr_type](all_hits_array, decoy_hits_array, settings) def _compute_pspep(all_hits, decoy_hits, settings): scaling = _get_scaling(settings) seterr(all="ignore") sigma = array([sqrt(x) if x > 0 else 0.2 for x in decoy_hits]) if isinstance(all_hits, list): all_hits = array(all_hits) if isinstance(decoy_hits, list): decoy_hits = array(decoy_hits) searchSeg = concatenate((exp(arange(-8, 9, 2)), -1 * exp(arange(-8, 9, 2)))) bestResids = sys.float_info.max bestResidsComb = [0.0, 0.0, 0.0] for aEst in searchSeg: for bEst in searchSeg: for cEst in searchSeg: try: sol = _non_linear_fit(aEst, bEst, cEst, all_hits, decoy_hits, sigma) if sol[3] and sol[3] < bestResids: bestResids = sol[3] bestResidsComb = sol[0:3] except: pass (a, b, c) = bestResidsComb[0:3] fdr_local = scaling * (exp(b * (all_hits - a)) / (exp(b * (all_hits - a)) + 1)) * c return fdr_local def _get_scaling(settings): scaling = float(settings.get("scaling", "2.0")) return scaling def _non_linear_fit(aEst, bEst, cEst, all_hits, decoy_hits, sigma, scaling=2): guess = [aEst, bEst, cEst] def f(a, b, c): return c * (log(exp(b * (all_hits - a)) + 1) - log(exp(-b * a) + 1)) / b def fcn(p): a = p[0] b = p[1] c = p[2] return (decoy_hits - f(a, b, c)) / sigma solution = root(fcn, guess, method='lm') a = solution.x[0] b = solution.x[1] c = solution.x[2] resids = sum((decoy_hits - f(a, b, c)) ** 2) / len(all_hits) return (a, b, c, resids) def _compute_fdr_global_conservative(sorted_scores, accum_hits, accum_decoys, settings): raw_fdrs = build_raw_fdr_dict(sorted_scores, accum_hits, accum_decoys, settings) fdrs = {} max_fdr = -1 for score in sorted_scores: raw_fdr = raw_fdrs[score] if raw_fdr > max_fdr: max_fdr = raw_fdr fdrs[score] = max_fdr return fdrs def _compute_fdr_global_permissive(sorted_scores, accum_hits, accum_decoys, settings): raw_fdrs = build_raw_fdr_dict(sorted_scores, accum_hits, accum_decoys, settings) fdrs = {} index = len(sorted_scores) - 1 min_fdr = 1 while index >= 0: score = sorted_scores[index] raw_fdr = raw_fdrs[score] if raw_fdr < min_fdr: min_fdr = raw_fdr fdrs[score] = min_fdr index -= 1 return fdrs def build_raw_fdr_dict(sorted_scores, accum_hits, accum_decoys, settings): scaling = _get_scaling(settings) fdrs = {} for score in sorted_scores: fdrs[score] = (scaling * accum_decoys[score]) / accum_hits[score] return fdrs def __read_lines(input_file): with open(input_file, 'r') as input: for i, line in enumerate(input): line = line.rstrip('\r\n') yield line def __read_entries(input_file, settings): total_hits = 0 for line in __read_lines(input_file): if not line or line.startswith('#'): continue entry = Entry(line, settings, total_hits) total_hits = total_hits + 1 yield entry class Entry(object): def __init__(self, line, settings, index): self.settings = settings line_parts = line.split(settings["separator"]) self.identifier = line_parts[settings["identifiers_index"]] if settings["score_column"]: self.score = float(line_parts[settings["score_column"]]) else: self.score = index @property def is_decoy(self): return self.identifier.startswith(self.settings["decoy_prefix"]) def _accum_decoys(input_file, settings): hits_at_score = {} decoys_at_score = {} for entry in __read_entries(input_file, settings): score = entry.score score_total = hits_at_score.get(score, 0) + 1 score_decoys = decoys_at_score.get(score, 0) + (1 if entry.is_decoy else 0) hits_at_score[score] = score_total decoys_at_score[score] = score_decoys sorted_scores = sorted(hits_at_score, reverse=not settings["invert_score"]) accum_hits = {} accum_decoys = {} accum_hit_count = 0 accum_decoy_count = 0 for score in sorted_scores: accum_decoy_count += decoys_at_score[score] accum_hit_count += hits_at_score[score] accum_hits[score] = accum_hit_count accum_decoys[score] = accum_decoy_count return (sorted_scores, accum_hits, accum_decoys) def _build_arrays(input_file, settings, sorted_scores, accum_hits, accum_decoys): all_hits = [] decoy_hits = [] for entry in __read_entries(input_file, settings): score = entry.score all_hits.append(accum_hits[score]) decoy_hits.append(accum_decoys[score]) return (all_hits, decoy_hits) def run_script(): parser = optparse.OptionParser() parser.add_option("--input") parser.add_option("--output") parser.add_option("--decoy_prefix") parser.add_option("--identifiers_column") parser.add_option("--separator", default="TAB") parser.add_option("--fdr_type", default="global_conservative") parser.add_option("--scaling") parser.add_option("--score_column", default=None) # By default higher score is better. parser.add_option("--invert_score", default=False, action="store_true") (options, args) = parser.parse_args() decoy_prefix = options.decoy_prefix identifiers_column = options.identifiers_column score_column = options.score_column separator = SEPARATORS[options.separator] settings = {"decoy_prefix": decoy_prefix, "identifiers_index": int(identifiers_column) - 1, "fdr_type": options.fdr_type, "separator": separator, "scaling": options.scaling, "invert_score": options.invert_score } if score_column: settings["score_column"] = int(score_column) - 1 else: settings["score_column"] = None # Assume data is descending, use index as score and invert. settings["invert_score"] = True with open(options.output, 'w') as output: append_fdr(options.input, output, settings) if __name__ == '__main__': __main__()