Mercurial > repos > pieterlukasse > prims_metabolomics
diff rankfilter_GCMS/rankfilter.py @ 0:9d5f4f5f764b
Initial commit to toolshed
author | pieter.lukasse@wur.nl |
---|---|
date | Thu, 16 Jan 2014 13:10:00 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rankfilter_GCMS/rankfilter.py Thu Jan 16 13:10:00 2014 +0100 @@ -0,0 +1,432 @@ +""" +Copyright (C) 2011 by Velitchka Mihaleva, Wageningen University + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +#Library functions definition +#----------Begin------------- +from numpy import array, linalg, ones +from numpy.polynomial import polynomial +import math +import pdfread +import sys + + +def calibrate(standards): + ''' + Calculates the RT to RI conversion: RI = a + b*RT + @param standards: variable containing RI and RT data + ''' + A = ones((len(standards['R.T.']), 2), dtype=float) + A[:, 0] = array(map(float, standards['R.T.'])) + [coeff, res, r, s] = linalg.lstsq(A, array(map(float, standards['RI']))) + + return coeff + + +def calibrate_poly(standards): + ''' + Calculates the RT to RI conversion using a polynomial model + @param standards: variable containing RI and RT data + ''' + retention_time = array(map(float, standards['R.T.'])) + retention_index = array(map(float, standards['RI'])) + + # Fit a 3rd degree polynomial + fit = polynomial.polyfit(retention_time, retention_index, 3)[::-1] + return [fit[0], fit[1], fit[2], fit[3]] + + +def calculate_poly(retention_time, poly_cal): + ''' + Converts a given retention time to retention index using the calculated polynomial model + @param retention_time: retention_time to convert to retention index + @param poly_cal: result from calculating regression + ''' + # Calculates RI based on given retention_time using polynomial function + retention_time = array(map(float, retention_time)) + if len(retention_time) > 1: + ri_exp = [] + for i in retention_time: + ri_exp.append(poly_cal[0] * (i ** 3) + poly_cal[1] * (i ** 2) + (i * poly_cal[2]) + poly_cal[3]) + return ri_exp + else: + return poly_cal[0] * (retention_time ** 3) + poly_cal[1] * (retention_time ** 2) + (retention_time * poly_cal[2]) + poly_cal[3] + + +def convert_rt(hit_list, coeff): + ''' + Converts a given retention time to retention index using the linear model + @param hit_list: list holding the retention time + @param coeff: calculated coefficient (slope and intercept) using the linear model + ''' + #Convert RT to RI + hit_list['RIexp'] = array(map(float, hit_list['R.T.'])) * coeff[0] + coeff[1] + return hit_list + + +def convert_rt_poly(hit_list, poly_cal): + ''' + Calls the actual RT to RI converter and returns the updated list with added RIexp value + @param hit_list: result list containing the retention time + ''' + hit_list['RIexp'] = array(map(float, calculate_poly(hit_list['R.T.'], poly_cal))) + return hit_list + + +def get_data(libdata, LabelCol): + ''' + Retrieves datacolumns indicated by LabelCol from libdata (generic function) + Returns a dict with the requested column names as keys + @param libdata: file from which data is loaded + @param LabelCol: columns to retrieve + ''' + from numpy import take + LibData = open(libdata, 'r').read().split('\n') + + #Get the labels of the columns in the file + FirstLine = LibData.pop(0).split('\t') + + FirstLine[-1] = FirstLine[-1].replace('\r', '') + + # Create a temporate variable containing the all data + tmp_data = [] + for ll in LibData: + if ll != '': + tmp_data.append(ll.split('\t')) + + # Find the indices of the desired data + ind = [] + try: + for key in LabelCol: + ind.append(FirstLine.index(key)) + except: + print str(key) + " not found in first line of library (" + str(libdata) + ")" + print"the folowing items are found in the first line of the library:\n" + str(FirstLine) + sys.exit(1) + # Extract the desired data + data = [] + for x in tmp_data: + data.append(take(array(x), ind)) + + # library_data = dict(zip(LabelCol,transpose(data))) + library_data = dict(zip(LabelCol, map(lambda *x: list(x), *data))) + return library_data + + +def rank_hit(hit_list, library_data, window): + ''' + Computes the Rank and % relative error + @param hit_list: input data + @param library_data: library used for reading the RIsvr data + @param window: error window + ''' + hit_match_ripred = [] + hit_match_syn = [] + # Convert 'Name' data to list in order to be indexed + # library_data['Name']=list(library_data['Name']) + + for hit_cas, hit_name in zip(hit_list['CAS'], hit_list['Name']): + index = 0 + if hit_cas != 'undef': + try: + index = library_data['CAS'].index(hit_cas.replace(' ', '').replace('-', '')) + except: + try: + index = library_data['Name'].index(hit_name.replace(' ', '')) + except: + # If for any reason the hit is not present + # in the mainlib library indicate this with -999 + index = 0 + else: + try: + index = library_data['Name'].index(hit_name.replace(' ', '')) + except: + # If for any reason the hit is not present + # in the mainlib library indicate this with -999 + index = 0 + if index != 0: + hit_match_ripred.append(float(library_data['RIsvr'][index])) + hit_match_syn.append(library_data['Synonyms'][index]) + else: + hit_match_ripred.append(-999) + hit_match_syn.append('None') + hit_list['RIsvr'] = hit_match_ripred + hit_list['Synonyms'] = hit_match_syn + + # Determine the relative difference between the experimental + # and the predicted RI + ri_err = [] + + for ri_exp, ri_qsar in zip(hit_list['RIexp'], hit_list['RIsvr']): + if int(ri_qsar) != -999: + ri_err.append(float(int(float(ri_qsar)) - int(float(ri_exp))) / int(float(ri_exp)) * 100) + else: + ri_err.append(-999) + + # Define the rank of the hits + hit_rank = [] + + for tt in ri_err: + if tt == -999: + # The name of the hit generated with AMDIS did not match a name + # in the mainlib library + hit_rank.append(4) + else: + # Rank 1 - ri_err is within +/- window/2 + if abs(tt) <= float(window) / 2: + hit_rank.append(1) + # Rank 2 - window/2 < ri_err <= window + if abs(tt) > float(window) / 2 and abs(tt) <= float(window): + hit_rank.append(2) + # Rank 3 - ri_err > window + if abs(tt) > float(window): + hit_rank.append(3) + hit_list['Rank'] = hit_rank + hit_list['%rel.err'] = ri_err + return hit_list + +def print_to_file(hit_list, filename, keys_to_print, print_subsets=True): + ''' + Writes output data to files (four output files are generated): + filename_ranked - the hits are ranked + filename_filter_in - only hits with rank 1 and 2 are retained + filename_filter_out - hits with rank 3 are filtered out + filename_filter_missed - hits with rank 4 - there was no match with the + library data + @param hit_list: a dictionary with the ranked hits + @param filename: the core of the output file names + @param keys_to_print: determines the order in which the data are printed + @param print_subsets: + ''' + from numpy import take + + out_ranked = open(filename["ranked"], 'w') + if (print_subsets): + out_in = open(filename["filter_in"], 'w') + out_out = open(filename["filter_out"], 'w') + out_missed = open(filename["missed"], 'w') + + #Convert RIexp and RIsvr into integer for printing + hit_list['RIexp'] = map(int, map(math.ceil, hit_list['RIexp'])) + hit_list['RIsvr'] = map(int, map(math.ceil, hit_list['RIsvr'])) + + #Establish the right order of the data to be printed + tmp_items = hit_list.items() + tmp_keys = hit_list.keys() + ind = [] + + for key in keys_to_print: + ind.append(tmp_keys.index(key)) + + #Print the labels of the columns + line = '\t'.join(take(array(tmp_keys), ind)) + out_ranked.write('%s\n' % line) + if (print_subsets): + out_in.write('%s\n' % line) + out_out.write('%s\n' % line) + out_missed.write('%s\n' % line) + + #Collect the data for each hit in the right order and print them + #in the output file + i = 0 + for name in hit_list['Name']: + tt = [] + for x in iter(tmp_items): + # trim the value if it is a string: + if isinstance(x[1][i], basestring): + tt.append(x[1][i].strip()) + else: + tt.append(x[1][i]) + tmp1 = take(array(tt), ind) + line = '\t'.join(tmp1.tolist()) + + out_ranked.write('%s\n' % line) + if(print_subsets): + if hit_list['Rank'][i] == 4: + out_missed.write('%s\n' % line) + if hit_list['Rank'][i] == 3: + out_out.write('%s\n' % line) + if hit_list['Rank'][i] == 1 or hit_list['Rank'][i] == 2: + out_in.write('%s\n' % line) + + i = i + 1 + +#---------End-------------- +def main(): + #Ranking and filtering procedure + if len(sys.argv) < 2: + print "Usage:" + print "python RankFilter_GC-MS.py input \n" + print "input is a text file that specifies the names and the location" + print "of the files with the sample, compounds for calibration, library" + print "data, the core of the name ot the outputs, and the value of the" + print "window used for the filtering \n" + + sys.exit(1) + + #------Read the input file------ + try: + ifile = open(sys.argv[1], 'r').read().split('\n') + except: + print sys.argv[1], " file can not be found" + sys.exit() + + #Get the file names for the data + #labels - the type of input files + #filenames - the names of the input files + labels = [] + filenames = [] + + for l in ifile: + l = l.strip() + if l != '': + labels.append(l.split('=')[0].replace(' ', '').replace('\r', '')) + filenames.append(l.split('=')[1].replace(' ', '').replace('\r', '')) + + InputData = dict(zip(labels, filenames)) + + #this part checkes if the ouput option is set. The output files are saved as the output variable as prefix for the output files + #if the output is not found , each output file has to be selected by forehand. This comes in handy for pipeline tools as galaxy + print_subsets = True + NDIS_is_tabular = False + + if 'output' in InputData: + output_files = {"ranked":InputData['output'] + "_ranked", \ + "filter_in":InputData['output'] + "_filter_in", \ + "filter_out":InputData['output'] + "filter_out", \ + "missed":InputData['output'] + "_missed", \ + "missed_parse_pdf":InputData['output'] + "_missed_parse_pdf"} + elif 'tabular' in InputData: + NDIS_is_tabular = True + if(not "onefile" in InputData): + output_files = {"ranked":InputData['ranked'], \ + "filter_in":InputData['filter_in'], \ + "filter_out":InputData['filter_out'], \ + "missed":InputData['missed']} + else: + print_subsets = False + output_files = {"ranked":InputData['onefile']} + else: + output_files = {"ranked":InputData['ranked'], \ + "filter_in":InputData['filter_in'], \ + "filter_out":InputData['filter_out'], \ + "missed":InputData['missed'], \ + "missed_parse_pdf":InputData['missed_parse_pdf']} + + #------Start with calibration------ + #Check whether a file with data for the calibration is specified + #Specify which data to be read from the file with standard compounds + LabelColStand = ['Name', 'R.T.', 'RI'] + + if InputData['calibration'] != 'none': + #get the coeffiecients for the RT to RI convertion + + try: + ifile = open(InputData['calibration'], 'r') + except: + print "file", InputData['calibration'], "can not be found" + sys.exit(1) + + standards = get_data(InputData['calibration'], LabelColStand) + if InputData['model'] == 'linear': + coeff = calibrate(standards) + elif InputData['model'] == 'poly': + poly_cal = calibrate_poly(standards) + else: + print "error: model ", InputData['model'], " can not be found. Use 'linear' or 'poly' " + sys.exit(1) + else: + #No file has been specified for the calibration + #Use the default coefficients + print 'No file has been specified for the calibration' + print 'WARNING: the default coefficients will be used' + coeff = array([29.4327, 454.5260]) + + if InputData['analysis_type'] == 'AMDIS': + try: + AmdisOut = open(InputData['sample'], 'r') + print("open ok") + #Specify which data to be extracted from the AMDIS output table + #Weighted and Reverse are measure of matching between the experimental + #and the library spectra. The experimental spectrum is used as template + #for the calculation of Weighted, whereas for Reverse the template is the + #library spectrum + LabelCol = ['CAS', 'Name', 'R.T.', 'Weighted', 'Reverse', 'Purity'] + + #Get the data from the AMDIS output file + HitList = get_data(InputData['sample'], LabelCol) + #Remove '>' from the names + HitList['Name'] = [s.replace('>', '') for s in HitList['Name']] + except: + print "the file", InputData['sample'], "can not be found" + sys.exit(1) + if InputData['analysis_type'] == 'NIST': + #HitList_missed - a variable of type dictionary containing the hits with the symbol ";" + #in the name + if not NDIS_is_tabular: + print "Warning; NDIS is not tabular format, reading PDF..\n" + [HitList, HitList_missed] = pdfread.getPDF(InputData['sample']) + else: + HitList = pdfread.read_tabular(InputData['sample']) + + #Convert RT to RI + if InputData['model'] == 'linear': + HitList = convert_rt(HitList, coeff) + if InputData['model'] == 'poly': + print "Executing convert_rt_poly().." + HitList = convert_rt_poly(HitList, poly_cal) + + #------Read the library data with the predicted RI------ + try: + LibData = open(InputData['lib_data'], 'r') + except: + print "the file", InputData['lib_data'], "can not be found" + sys.exit(1) + + #Specify which data to be extracted from the library data file + LabelColLib = ['CAS', 'Name', 'RIsvr', 'Synonyms'] + LibraryData = get_data(InputData['lib_data'], LabelColLib) + + #------Match the hits with the library data and rank them------ + if InputData['window'] != '': + HitList = rank_hit(HitList, LibraryData, InputData['window']) + else: + print "No value for the window used for the filtering is specified \n" + sys.exit(1) + + #------Print the ranked and filtered hits------ + #Specify which data to be printed + if InputData['analysis_type'] == 'AMDIS': + keys_to_print = ['R.T.', 'CAS', 'Name', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Weighted', 'Reverse', 'Synonyms'] + else: + keys_to_print = ['ID', 'R.T.', 'Name', 'CAS', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Forward', 'Reverse', 'Synonyms', 'Library'] + + #skip this error output from reading a pdftotext file when file is tabular + if InputData['analysis_type'] == 'NIST' and not NDIS_is_tabular: + out_missed_pdf = open(output_files['missed_parse_pdf'], 'w') + for x, y in zip(HitList_missed['Missed Compounds'], HitList_missed['RT missed Compounds']): + out_missed_pdf.write('%s\n' % '\t'.join([y, x])) + out_missed_pdf.close() + + print_to_file(HitList, output_files, keys_to_print, print_subsets) + +if __name__ == '__main__': + main()