diff rankfilter_GCMS/rankfilter.py @ 0:9d5f4f5f764b

Initial commit to toolshed
author pieter.lukasse@wur.nl
date Thu, 16 Jan 2014 13:10:00 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rankfilter_GCMS/rankfilter.py	Thu Jan 16 13:10:00 2014 +0100
@@ -0,0 +1,432 @@
+"""
+Copyright (C) 2011 by Velitchka Mihaleva, Wageningen University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+#Library functions definition
+#----------Begin-------------
+from numpy import array, linalg, ones
+from numpy.polynomial import polynomial
+import math
+import pdfread
+import sys
+
+
+def calibrate(standards):
+    '''
+    Calculates the RT to RI conversion: RI = a + b*RT
+    @param standards: variable containing RI and RT data
+    '''
+    A = ones((len(standards['R.T.']), 2), dtype=float)
+    A[:, 0] = array(map(float, standards['R.T.']))
+    [coeff, res, r, s] = linalg.lstsq(A, array(map(float, standards['RI'])))
+
+    return coeff
+
+
+def calibrate_poly(standards):
+    '''
+    Calculates the RT to RI conversion using a polynomial model
+    @param standards: variable containing RI and RT data
+    '''
+    retention_time = array(map(float, standards['R.T.']))
+    retention_index = array(map(float, standards['RI']))
+
+    # Fit a 3rd degree polynomial
+    fit = polynomial.polyfit(retention_time, retention_index, 3)[::-1]
+    return [fit[0], fit[1], fit[2], fit[3]]
+
+
+def calculate_poly(retention_time, poly_cal):
+    '''
+    Converts a given retention time to retention index using the calculated polynomial model
+    @param retention_time: retention_time to convert to retention index
+    @param poly_cal: result from calculating regression
+    '''
+    # Calculates RI based on given retention_time using polynomial function
+    retention_time = array(map(float, retention_time))
+    if len(retention_time) > 1:
+        ri_exp = []
+        for i in retention_time:
+            ri_exp.append(poly_cal[0] * (i ** 3) + poly_cal[1] * (i ** 2) + (i * poly_cal[2]) + poly_cal[3])
+        return ri_exp
+    else:
+        return poly_cal[0] * (retention_time ** 3) + poly_cal[1] * (retention_time ** 2) + (retention_time * poly_cal[2]) + poly_cal[3]
+
+
+def convert_rt(hit_list, coeff):
+    '''
+    Converts a given retention time to retention index using the linear model
+    @param hit_list: list holding the retention time
+    @param coeff: calculated coefficient (slope and intercept) using the linear model
+    '''
+    #Convert RT to RI
+    hit_list['RIexp'] = array(map(float, hit_list['R.T.'])) * coeff[0] + coeff[1]
+    return hit_list
+
+
+def convert_rt_poly(hit_list, poly_cal):
+    '''
+    Calls the actual RT to RI converter and returns the updated list with added RIexp value
+    @param hit_list: result list containing the retention time
+    '''
+    hit_list['RIexp'] = array(map(float, calculate_poly(hit_list['R.T.'], poly_cal)))
+    return hit_list
+
+
+def get_data(libdata, LabelCol):
+    '''
+    Retrieves datacolumns indicated by LabelCol from libdata (generic function)
+    Returns a dict with the requested column names as keys
+    @param libdata: file from which data is loaded
+    @param LabelCol: columns to retrieve
+    '''
+    from numpy import take
+    LibData = open(libdata, 'r').read().split('\n')
+
+    #Get the labels of the columns in the file
+    FirstLine = LibData.pop(0).split('\t')
+
+    FirstLine[-1] = FirstLine[-1].replace('\r', '')
+
+    # Create a temporate variable containing the all data
+    tmp_data = []
+    for ll in LibData:
+        if ll != '':
+            tmp_data.append(ll.split('\t'))
+
+    # Find the indices of the desired data
+    ind = []
+    try:
+        for key in LabelCol:
+            ind.append(FirstLine.index(key))
+    except:
+        print str(key) + " not found in first line of library (" + str(libdata) + ")"
+        print"the folowing items are found in the first line of the library:\n" + str(FirstLine)
+        sys.exit(1)
+    # Extract the desired data
+    data = []
+    for x in tmp_data:
+        data.append(take(array(x), ind))
+
+    # library_data = dict(zip(LabelCol,transpose(data)))
+    library_data = dict(zip(LabelCol, map(lambda *x: list(x), *data)))
+    return library_data
+
+
+def rank_hit(hit_list, library_data, window):
+    '''
+    Computes the Rank and % relative error
+    @param hit_list: input data
+    @param library_data: library used for reading the RIsvr data 
+    @param window: error window
+    '''
+    hit_match_ripred = []
+    hit_match_syn = []
+    # Convert 'Name' data to list in order to be indexed
+    # library_data['Name']=list(library_data['Name'])
+
+    for hit_cas, hit_name in zip(hit_list['CAS'], hit_list['Name']):
+        index = 0
+        if hit_cas != 'undef':
+            try:
+                index = library_data['CAS'].index(hit_cas.replace(' ', '').replace('-', ''))
+            except:
+                try:
+                    index = library_data['Name'].index(hit_name.replace(' ', ''))
+                except:
+                    # If for any reason the hit is not present
+                    # in the mainlib library indicate this with -999 
+                    index = 0
+        else:
+            try:
+                index = library_data['Name'].index(hit_name.replace(' ', ''))
+            except:
+                # If for any reason the hit is not present
+                # in the mainlib library indicate this with -999 
+                index = 0
+        if index != 0:
+            hit_match_ripred.append(float(library_data['RIsvr'][index]))
+            hit_match_syn.append(library_data['Synonyms'][index])
+        else:
+            hit_match_ripred.append(-999)
+            hit_match_syn.append('None')
+    hit_list['RIsvr'] = hit_match_ripred
+    hit_list['Synonyms'] = hit_match_syn
+
+    # Determine the relative difference between the experimental
+    # and the predicted RI
+    ri_err = []
+
+    for ri_exp, ri_qsar in zip(hit_list['RIexp'], hit_list['RIsvr']):
+        if int(ri_qsar) != -999:
+            ri_err.append(float(int(float(ri_qsar)) - int(float(ri_exp))) / int(float(ri_exp)) * 100)
+        else:
+            ri_err.append(-999)
+
+    # Define the rank of the hits
+    hit_rank = []
+
+    for tt in ri_err:
+        if tt == -999:
+            # The name of the hit generated with AMDIS did not match a name
+            # in the mainlib library
+            hit_rank.append(4)
+        else:
+            # Rank 1 - ri_err is within +/- window/2
+            if abs(tt) <= float(window) / 2:
+                hit_rank.append(1)
+            # Rank 2 - window/2 < ri_err <= window 
+            if abs(tt) > float(window) / 2 and abs(tt) <= float(window):
+                hit_rank.append(2)
+            # Rank 3 - ri_err > window
+            if abs(tt) > float(window):
+                hit_rank.append(3)
+    hit_list['Rank'] = hit_rank
+    hit_list['%rel.err'] = ri_err
+    return hit_list
+
+def print_to_file(hit_list, filename, keys_to_print, print_subsets=True):
+    '''
+    Writes output data to files (four output files are generated):
+        filename_ranked - the hits are ranked
+        filename_filter_in - only hits with rank 1 and 2 are retained
+        filename_filter_out - hits with rank 3 are filtered out
+        filename_filter_missed - hits with rank 4 - there was no match with the
+                                 library data
+    @param hit_list: a dictionary with the ranked hits
+    @param filename: the core of the output file names
+    @param keys_to_print: determines the order in which the data are printed
+    @param print_subsets:
+    '''
+    from numpy import take
+
+    out_ranked = open(filename["ranked"], 'w')
+    if (print_subsets):
+        out_in = open(filename["filter_in"], 'w')
+        out_out = open(filename["filter_out"], 'w')
+        out_missed = open(filename["missed"], 'w')
+
+    #Convert RIexp and RIsvr into integer for printing
+    hit_list['RIexp'] = map(int, map(math.ceil, hit_list['RIexp']))
+    hit_list['RIsvr'] = map(int, map(math.ceil, hit_list['RIsvr']))
+
+    #Establish the right order of the data to be printed    
+    tmp_items = hit_list.items()
+    tmp_keys = hit_list.keys()
+    ind = []
+
+    for key in keys_to_print:
+        ind.append(tmp_keys.index(key))
+
+    #Print the labels of the columns
+    line = '\t'.join(take(array(tmp_keys), ind))
+    out_ranked.write('%s\n' % line)
+    if (print_subsets):
+        out_in.write('%s\n' % line)
+        out_out.write('%s\n' % line)
+        out_missed.write('%s\n' % line)
+
+    #Collect the data for each hit in the right order and print them
+    #in the output file
+    i = 0
+    for name in hit_list['Name']:
+        tt = []
+        for x in iter(tmp_items):
+            # trim the value if it is a string:
+            if isinstance(x[1][i], basestring):
+                tt.append(x[1][i].strip())
+            else:
+                tt.append(x[1][i])
+        tmp1 = take(array(tt), ind)
+        line = '\t'.join(tmp1.tolist())
+
+        out_ranked.write('%s\n' % line)
+        if(print_subsets):
+            if hit_list['Rank'][i] == 4:
+                out_missed.write('%s\n' % line)
+            if hit_list['Rank'][i] == 3:
+                out_out.write('%s\n' % line)
+            if hit_list['Rank'][i] == 1 or hit_list['Rank'][i] == 2:
+                out_in.write('%s\n' % line)
+
+        i = i + 1
+
+#---------End--------------
+def main():
+    #Ranking and filtering procedure
+    if len(sys.argv) < 2:
+        print "Usage:"
+        print "python RankFilter_GC-MS.py input \n"
+        print "input is a text file that specifies the names and the location"
+        print "of the files with the sample, compounds for calibration, library"
+        print "data, the core of the name ot the outputs, and the value of the"
+        print "window used for the filtering \n"
+
+        sys.exit(1)
+
+    #------Read the input file------
+    try:
+        ifile = open(sys.argv[1], 'r').read().split('\n')
+    except:
+        print sys.argv[1], " file can not be found"
+        sys.exit()
+
+    #Get the file names for the data
+    #labels - the type of input files
+    #filenames - the names of the input files
+    labels = []
+    filenames = []
+
+    for l in ifile:
+        l = l.strip()
+        if l != '':
+            labels.append(l.split('=')[0].replace(' ', '').replace('\r', ''))
+            filenames.append(l.split('=')[1].replace(' ', '').replace('\r', ''))
+
+    InputData = dict(zip(labels, filenames))
+
+    #this part checkes if the ouput option is set. The output files are saved as the output variable as prefix for the output files
+    #if the output is not found , each output file has to be selected by forehand. This comes in handy for pipeline tools as galaxy
+    print_subsets = True
+    NDIS_is_tabular = False
+
+    if 'output' in InputData:
+        output_files = {"ranked":InputData['output'] + "_ranked", \
+        "filter_in":InputData['output'] + "_filter_in", \
+        "filter_out":InputData['output'] + "filter_out", \
+        "missed":InputData['output'] + "_missed", \
+        "missed_parse_pdf":InputData['output'] + "_missed_parse_pdf"}
+    elif 'tabular' in InputData:
+        NDIS_is_tabular = True
+        if(not "onefile" in InputData):
+            output_files = {"ranked":InputData['ranked'], \
+            "filter_in":InputData['filter_in'], \
+            "filter_out":InputData['filter_out'], \
+            "missed":InputData['missed']}
+        else:
+            print_subsets = False
+            output_files = {"ranked":InputData['onefile']}
+    else:
+        output_files = {"ranked":InputData['ranked'], \
+        "filter_in":InputData['filter_in'], \
+        "filter_out":InputData['filter_out'], \
+        "missed":InputData['missed'], \
+        "missed_parse_pdf":InputData['missed_parse_pdf']}
+
+    #------Start with calibration------
+    #Check whether a file with data for the calibration is specified
+    #Specify which data to be read from the file with standard compounds
+    LabelColStand = ['Name', 'R.T.', 'RI']
+
+    if InputData['calibration'] != 'none':
+        #get the coeffiecients for the RT to RI convertion
+
+        try:
+            ifile = open(InputData['calibration'], 'r')
+        except:
+            print "file", InputData['calibration'], "can not be found"
+            sys.exit(1)
+
+        standards = get_data(InputData['calibration'], LabelColStand)
+        if InputData['model'] == 'linear':
+            coeff = calibrate(standards)
+        elif InputData['model'] == 'poly':
+            poly_cal = calibrate_poly(standards)
+        else:
+            print "error: model ", InputData['model'], " can not be found. Use 'linear' or 'poly' "
+            sys.exit(1)
+    else:
+        #No file has been specified for the calibration
+        #Use the default coefficients
+        print 'No file has been specified for the calibration'
+        print 'WARNING: the default coefficients will be used'
+        coeff = array([29.4327, 454.5260])
+
+    if InputData['analysis_type'] == 'AMDIS':
+        try:
+            AmdisOut = open(InputData['sample'], 'r')
+            print("open ok")
+            #Specify which data to be extracted from the AMDIS output table
+            #Weighted and Reverse are measure of matching between the experimental
+            #and the library spectra. The experimental spectrum is used as template
+            #for the calculation of Weighted, whereas for Reverse the template is the
+            #library spectrum
+            LabelCol = ['CAS', 'Name', 'R.T.', 'Weighted', 'Reverse', 'Purity']
+
+            #Get the data from the AMDIS output file
+            HitList = get_data(InputData['sample'], LabelCol)
+            #Remove '>' from the names
+            HitList['Name'] = [s.replace('>', '') for s in HitList['Name']]
+        except:
+            print "the file", InputData['sample'], "can not be found"
+            sys.exit(1)
+    if InputData['analysis_type'] == 'NIST':
+        #HitList_missed - a variable of type dictionary containing the hits with the symbol ";"
+        #in the name
+        if not NDIS_is_tabular:
+            print "Warning; NDIS is not tabular format, reading PDF..\n"
+            [HitList, HitList_missed] = pdfread.getPDF(InputData['sample'])
+        else:
+            HitList = pdfread.read_tabular(InputData['sample'])
+
+    #Convert RT to RI
+    if InputData['model'] == 'linear':
+            HitList = convert_rt(HitList, coeff)
+    if InputData['model'] == 'poly':
+            print "Executing convert_rt_poly().."
+            HitList = convert_rt_poly(HitList, poly_cal)
+
+    #------Read the library data with the predicted RI------
+    try:
+        LibData = open(InputData['lib_data'], 'r')
+    except:
+        print "the file", InputData['lib_data'], "can not be found"
+        sys.exit(1)
+
+    #Specify which data to be extracted from the library data file
+    LabelColLib = ['CAS', 'Name', 'RIsvr', 'Synonyms']
+    LibraryData = get_data(InputData['lib_data'], LabelColLib)
+
+    #------Match the hits with the library data and rank them------
+    if InputData['window'] != '':
+        HitList = rank_hit(HitList, LibraryData, InputData['window'])
+    else:
+        print "No value for the window used for the filtering is specified \n"
+        sys.exit(1)
+
+    #------Print the ranked and filtered hits------
+    #Specify which data to be printed
+    if InputData['analysis_type'] == 'AMDIS':
+        keys_to_print = ['R.T.', 'CAS', 'Name', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Weighted', 'Reverse', 'Synonyms']
+    else:
+        keys_to_print = ['ID', 'R.T.', 'Name', 'CAS', 'Rank', 'RIexp', 'RIsvr', '%rel.err', 'Forward', 'Reverse', 'Synonyms', 'Library']
+
+    #skip this error output from reading a pdftotext file when file is tabular     
+    if InputData['analysis_type'] == 'NIST' and not NDIS_is_tabular:
+        out_missed_pdf = open(output_files['missed_parse_pdf'], 'w')
+        for x, y in zip(HitList_missed['Missed Compounds'], HitList_missed['RT missed Compounds']):
+            out_missed_pdf.write('%s\n' % '\t'.join([y, x]))
+        out_missed_pdf.close()
+
+    print_to_file(HitList, output_files, keys_to_print, print_subsets)
+
+if __name__ == '__main__':
+    main()