Mercurial > repos > pieterlukasse > prims_metabolomics
view rankfilter_GCMS/rankfilter.py @ 57:963684611ccb
fix for xcms support in msclust
author | pieter.lukasse@wur.nl |
---|---|
date | Fri, 12 Dec 2014 12:06:36 +0100 |
parents | 9d5f4f5f764b |
children |
line wrap: on
line source
""" Copyright (C) 2011 by Velitchka Mihaleva, Wageningen University Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ #Library functions definition #----------Begin------------- from numpy import array, linalg, ones from numpy.polynomial import polynomial import math import pdfread import sys def calibrate(standards): ''' Calculates the RT to RI conversion: RI = a + b*RT @param standards: variable containing RI and RT data ''' A = ones((len(standards['R.T.']), 2), dtype=float) A[:, 0] = array(map(float, standards['R.T.'])) [coeff, res, r, s] = linalg.lstsq(A, array(map(float, standards['RI']))) return coeff def calibrate_poly(standards): ''' Calculates the RT to RI conversion using a polynomial model @param standards: variable containing RI and RT data ''' retention_time = array(map(float, standards['R.T.'])) retention_index = array(map(float, standards['RI'])) # Fit a 3rd degree polynomial fit = polynomial.polyfit(retention_time, retention_index, 3)[::-1] return [fit[0], fit[1], fit[2], fit[3]] def calculate_poly(retention_time, poly_cal): ''' Converts a given retention time to retention index using the calculated polynomial model @param retention_time: retention_time to convert to retention index @param poly_cal: result from calculating regression ''' # Calculates RI based on given retention_time using polynomial function retention_time = array(map(float, retention_time)) if len(retention_time) > 1: ri_exp = [] for i in retention_time: ri_exp.append(poly_cal[0] * (i ** 3) + poly_cal[1] * (i ** 2) + (i * poly_cal[2]) + poly_cal[3]) return ri_exp else: return poly_cal[0] * (retention_time ** 3) + poly_cal[1] * (retention_time ** 2) + (retention_time * poly_cal[2]) + poly_cal[3] def convert_rt(hit_list, coeff): ''' Converts a given retention time to retention index using the linear model @param hit_list: list holding the retention time @param coeff: calculated coefficient (slope and intercept) using the linear model ''' #Convert RT to RI hit_list['RIexp'] = array(map(float, hit_list['R.T.'])) * coeff[0] + coeff[1] return hit_list def convert_rt_poly(hit_list, poly_cal): ''' Calls the actual RT to RI converter and returns the updated list with added RIexp value @param hit_list: result list containing the retention time ''' hit_list['RIexp'] = array(map(float, calculate_poly(hit_list['R.T.'], poly_cal))) return hit_list def get_data(libdata, LabelCol): ''' Retrieves datacolumns indicated by LabelCol from libdata (generic function) Returns a dict with the requested column names as keys @param libdata: file from which data is loaded @param LabelCol: columns to retrieve ''' from numpy import take LibData = open(libdata, 'r').read().split('\n') #Get the labels of the columns in the file FirstLine = LibData.pop(0).split('\t') FirstLine[-1] = FirstLine[-1].replace('\r', '') # Create a temporate variable containing the all data tmp_data = [] for ll in LibData: if ll != '': tmp_data.append(ll.split('\t')) # Find the indices of the desired data ind = [] try: for key in LabelCol: ind.append(FirstLine.index(key)) except: print str(key) + " not found in first line of library (" + str(libdata) + ")" print"the folowing items are found in the first line of the library:\n" + str(FirstLine) sys.exit(1) # Extract the desired data data = [] for x in tmp_data: data.append(take(array(x), ind)) # library_data = dict(zip(LabelCol,transpose(data))) library_data = dict(zip(LabelCol, map(lambda *x: list(x), *data))) return library_data def rank_hit(hit_list, library_data, window): ''' Computes the Rank and % relative error @param hit_list: input data @param library_data: library used for reading the RIsvr data @param window: error window ''' hit_match_ripred = [] hit_match_syn = [] # Convert 'Name' data to list in order to be indexed # library_data['Name']=list(library_data['Name']) for hit_cas, hit_name in zip(hit_list['CAS'], hit_list['Name']): index = 0 if hit_cas != 'undef': try: index = library_data['CAS'].index(hit_cas.replace(' ', '').replace('-', '')) except: try: index = library_data['Name'].index(hit_name.replace(' ', '')) except: # If for any reason the hit is not present # in the mainlib library indicate this with -999 index = 0 else: try: index = library_data['Name'].index(hit_name.replace(' ', '')) except: # If for any reason the hit is not present # in the mainlib library indicate this with -999 index = 0 if index != 0: hit_match_ripred.append(float(library_data['RIsvr'][index])) hit_match_syn.append(library_data['Synonyms'][index]) else: hit_match_ripred.append(-999) hit_match_syn.append('None') hit_list['RIsvr'] = hit_match_ripred hit_list['Synonyms'] = hit_match_syn # Determine the relative difference between the experimental # and the predicted RI ri_err = [] for ri_exp, ri_qsar in zip(hit_list['RIexp'], hit_list['RIsvr']): if int(ri_qsar) != -999: ri_err.append(float(int(float(ri_qsar)) - int(float(ri_exp))) / int(float(ri_exp)) * 100) else: ri_err.append(-999) # Define the rank of the hits hit_rank = [] for tt in ri_err: if tt == -999: # The name of the hit generated with AMDIS did not match a name # in the mainlib library hit_rank.append(4) else: # Rank 1 - ri_err is within +/- window/2 if abs(tt) <= float(window) / 2: hit_rank.append(1) # Rank 2 - window/2 < ri_err <= window if abs(tt) > float(window) / 2 and abs(tt) <= float(window): hit_rank.append(2) # Rank 3 - ri_err > window if abs(tt) > float(window): hit_rank.append(3) hit_list['Rank'] = hit_rank hit_list['%rel.err'] = ri_err return hit_list def print_to_file(hit_list, filename, keys_to_print, print_subsets=True): ''' Writes output data to files (four output files are generated): filename_ranked - the hits are ranked filename_filter_in - only hits with rank 1 and 2 are retained filename_filter_out - hits with rank 3 are filtered out filename_filter_missed - hits with rank 4 - there was no match with the library data @param hit_list: a dictionary with the ranked hits @param filename: the core of the output file names @param keys_to_print: determines the order in which the data are printed @param print_subsets: ''' from numpy import take out_ranked = open(filename["ranked"], 'w') if (print_subsets): out_in = open(filename["filter_in"], 'w') out_out = open(filename["filter_out"], 'w') out_missed = open(filename["missed"], 'w') #Convert RIexp and RIsvr into integer for printing hit_list['RIexp'] = map(int, map(math.ceil, hit_list['RIexp'])) hit_list['RIsvr'] = map(int, map(math.ceil, hit_list['RIsvr'])) #Establish the right order of the data to be printed tmp_items = hit_list.items() tmp_keys = hit_list.keys() ind = [] for key in keys_to_print: ind.append(tmp_keys.index(key)) #Print the labels of the columns line = '\t'.join(take(array(tmp_keys), ind)) out_ranked.write('%s\n' % line) if (print_subsets): out_in.write('%s\n' % line) out_out.write('%s\n' % line) out_missed.write('%s\n' % line) #Collect the data for each hit in the right order and print them #in the output file i = 0 for name in hit_list['Name']: tt = [] for x in iter(tmp_items): # trim the value if it is a string: if isinstance(x[1][i], basestring): tt.append(x[1][i].strip()) else: tt.append(x[1][i]) tmp1 = take(array(tt), ind) line = '\t'.join(tmp1.tolist()) out_ranked.write('%s\n' % line) if(print_subsets): if hit_list['Rank'][i] == 4: out_missed.write('%s\n' % line) if hit_list['Rank'][i] == 3: out_out.write('%s\n' % line) if hit_list['Rank'][i] == 1 or hit_list['Rank'][i] == 2: out_in.write('%s\n' % line) i = i + 1 #---------End-------------- def main(): #Ranking and filtering procedure if len(sys.argv) < 2: print "Usage:" print "python RankFilter_GC-MS.py input \n" print "input is a text file that specifies the names and the location" print "of the files with the sample, compounds for calibration, library" print "data, the core of the name ot the outputs, and the value of the" print "window used for the filtering \n" sys.exit(1) #------Read the input file------ try: ifile = open(sys.argv[1], 'r').read().split('\n') except: print sys.argv[1], " file can not be found" sys.exit() #Get the file names for the data #labels - the type of input files #filenames - the names of the input files labels = [] filenames = [] for l in ifile: l = l.strip() if l != '': labels.append(l.split('=')[0].replace(' ', '').replace('\r', '')) filenames.append(l.split('=')[1].replace(' ', '').replace('\r', '')) InputData = dict(zip(labels, filenames)) #this part checkes if the ouput option is set. The output files are saved as the output variable as prefix for the output files #if the output is not found , each output file has to be selected by forehand. This comes in handy for pipeline tools as galaxy print_subsets = True NDIS_is_tabular = False if 'output' in InputData: output_files = {"ranked":InputData['output'] + "_ranked", \ "filter_in":InputData['output'] + "_filter_in", \ "filter_out":InputData['output'] + "filter_out", \ "missed":InputData['output'] + "_missed", \ "missed_parse_pdf":InputData['output'] + "_missed_parse_pdf"} elif 'tabular' in InputData: NDIS_is_tabular = True if(not "onefile" in InputData): output_files = {"ranked":InputData['ranked'], \ "filter_in":InputData['filter_in'], \ "filter_out":InputData['filter_out'], \ "missed":InputData['missed']} else: print_subsets = False output_files = {"ranked":InputData['onefile']} else: output_files = {"ranked":InputData['ranked'], \ "filter_in":InputData['filter_in'], \ "filter_out":InputData['filter_out'], \ "missed":InputData['missed'], \ "missed_parse_pdf":InputData['missed_parse_pdf']} #------Start with calibration------ #Check whether a file with data for the calibration is specified #Specify which data to be read from the file with standard compounds LabelColStand = ['Name', 'R.T.', 'RI'] if InputData['calibration'] != 'none': #get the coeffiecients for the RT to RI convertion try: ifile = open(InputData['calibration'], 'r') except: print "file", InputData['calibration'], "can not be found" sys.exit(1) standards = get_data(InputData['calibration'], LabelColStand) if InputData['model'] == 'linear': coeff = calibrate(standards) elif InputData['model'] == 'poly': poly_cal = calibrate_poly(standards) else: print "error: model ", InputData['model'], " can not be found. Use 'linear' or 'poly' " sys.exit(1) else: #No file has been specified for the calibration #Use the default coefficients print 'No file has been specified for the calibration' print 'WARNING: the default coefficients will be used' coeff = array([29.4327, 454.5260]) if InputData['analysis_type'] == 'AMDIS': try: AmdisOut = open(InputData['sample'], 'r') print("open ok") #Specify which data to be extracted from the AMDIS output table #Weighted and Reverse are measure of matching between the experimental #and the library spectra. The experimental spectrum is used as template #for the calculation of Weighted, whereas for Reverse the template is the #library spectrum LabelCol = ['CAS', 'Name', 'R.T.', 'Weighted', 'Reverse', 'Purity'] #Get the data from the AMDIS output file HitList = get_data(InputData['sample'], LabelCol) #Remove '>' from the names HitList['Name'] = [s.replace('>', '') for s in HitList['Name']] except: print "the file", InputData['sample'], "can not be found" sys.exit(1) if InputData['analysis_type'] == 'NIST': #HitList_missed - a variable of type dictionary containing the hits with the symbol ";" #in the name if not NDIS_is_tabular: print "Warning; NDIS is not tabular format, reading PDF..\n" [HitList, HitList_missed] = pdfread.getPDF(InputData['sample']) else: HitList = pdfread.read_tabular(InputData['sample']) #Convert RT to RI if InputData['model'] == 'linear': HitList = convert_rt(HitList, coeff) if InputData['model'] == 'poly': print "Executing convert_rt_poly().." HitList = convert_rt_poly(HitList, poly_cal) #------Read the library data with the predicted RI------ try: LibData = open(InputData['lib_data'], 'r') except: print "the file", InputData['lib_data'], "can not be found" sys.exit(1) #Specify which data to be extracted from the library data file LabelColLib = ['CAS', 'Name', 'RIsvr', 'Synonyms'] LibraryData = get_data(InputData['lib_data'], LabelColLib) #------Match the hits with the library data and rank them------ if InputData['window'] != '': HitList = rank_hit(HitList, LibraryData, InputData['window']) else: print "No value for the window used for the filtering is specified \n" sys.exit(1) #------Print the ranked and filtered hits------ #Specify which data to be printed if InputData['analysis_type'] == 'AMDIS': keys_to_print = ['R.T.', 'CAS', 'Name', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Weighted', 'Reverse', 'Synonyms'] else: keys_to_print = ['ID', 'R.T.', 'Name', 'CAS', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Forward', 'Reverse', 'Synonyms', 'Library'] #skip this error output from reading a pdftotext file when file is tabular if InputData['analysis_type'] == 'NIST' and not NDIS_is_tabular: out_missed_pdf = open(output_files['missed_parse_pdf'], 'w') for x, y in zip(HitList_missed['Missed Compounds'], HitList_missed['RT missed Compounds']): out_missed_pdf.write('%s\n' % '\t'.join([y, x])) out_missed_pdf.close() print_to_file(HitList, output_files, keys_to_print, print_subsets) if __name__ == '__main__': main()