0
|
1 '''
|
|
2 Logic for searching a Retention Index database file given output from NIST
|
|
3 '''
|
|
4 import match_library
|
|
5 import re
|
|
6 import sys
|
|
7 import csv
|
|
8
|
|
9 __author__ = "Marcel Kempenaar"
|
|
10 __contact__ = "brs@nbic.nl"
|
|
11 __copyright__ = "Copyright, 2012, Netherlands Bioinformatics Centre"
|
|
12 __license__ = "MIT"
|
|
13
|
|
14 def create_lookup_table(library_file, column_type_name, statphase):
|
|
15 '''
|
|
16 Creates a dictionary holding the contents of the library to be searched
|
|
17 @param library_file: library to read
|
|
18 @param column_type_name: the columns type name
|
|
19 @param statphase: the columns stationary phase
|
|
20 '''
|
|
21 (data, header) = match_library.read_library(library_file)
|
|
22 # Test for presence of required columns
|
|
23 if ('columntype' not in header or
|
|
24 'columnphasetype' not in header or
|
|
25 'cas' not in header):
|
|
26 raise IOError('Missing columns in ', library_file)
|
|
27
|
|
28 column_type_column = header.index("columntype")
|
|
29 statphase_column = header.index("columnphasetype")
|
|
30 cas_column = header.index("cas")
|
|
31
|
|
32 filtered_library = [line for line in data if line[column_type_column] == column_type_name
|
|
33 and line[statphase_column] == statphase]
|
|
34 lookup_dict = {}
|
|
35 for element in filtered_library:
|
|
36 # Here the cas_number is set to the numeric part of the cas_column value, so if the
|
|
37 # cas_column value is 'C1433' then cas_number will be '1433'
|
|
38 cas_number = str(re.findall(r'\d+', (element[cas_column]).strip())[0])
|
|
39 try:
|
|
40 lookup_dict[cas_number].append(element)
|
|
41 except KeyError:
|
|
42 lookup_dict[cas_number] = [element]
|
|
43 return lookup_dict
|
|
44
|
|
45
|
|
46 def _preferred(hits, pref, ctype, polar, model, method):
|
|
47 '''
|
|
48 Returns all entries in the lookup_dict that have the same column name, type and polarity
|
|
49 as given by the user, uses regression if selected given the model and method to use. The
|
|
50 regression is applied on the column with the best R-squared value in the model
|
|
51 @param hits: all entries in the lookup_dict for the given CAS number
|
|
52 @param pref: preferred GC-column, can be one or more names
|
|
53 @param ctype: column type (capillary etc.)
|
|
54 @param polar: polarity (polar / non-polar etc.)
|
|
55 @param model: data loaded from file containing regression models
|
|
56 @param method: supported regression method (i.e. poly(nomial) or linear)
|
|
57 '''
|
|
58 match = []
|
|
59 for column in pref:
|
|
60 for hit in hits:
|
|
61 if hit[4] == ctype and hit[5] == polar and hit[6] == column:
|
|
62 # Create copy of found hit since it will be altered downstream
|
|
63 match.extend(hit)
|
|
64 return match, False
|
|
65
|
|
66 # No hit found for current CAS number, return if not performing regression
|
|
67 if not model:
|
|
68 return False, False
|
|
69
|
|
70 # Perform regression
|
|
71 for column in pref:
|
|
72 if column not in model:
|
|
73 break
|
|
74 # Order regression candidates by R-squared value (last element)
|
|
75 order = sorted(model[column].items(), key=lambda col: col[1][-1])
|
|
76 # Create list of regression candidate column names
|
|
77 regress_columns = list(reversed([column for (column, _) in order]))
|
|
78 # Names of available columns
|
|
79 available = [hit[6] for hit in hits]
|
|
80
|
|
81 # TODO: combine Rsquared and number of datapoints to get the best regression match
|
|
82 '''
|
|
83 # Iterate regression columns (in order) and retrieve their models
|
|
84 models = {}
|
|
85 for col in regress_columns:
|
|
86 if col in available:
|
|
87 hit = list(hits[available.index(col)])
|
|
88 if hit[4] == ctype:
|
|
89 # models contains all model data including residuals [-2] and rsquared [-1]
|
|
90 models[pref[0]] = model[pref[0]][hit[6]]
|
|
91 # Get the combined maximum for residuals and rsquared
|
|
92 best_match = models[]
|
|
93 # Apply regression
|
|
94 if method == 'poly':
|
|
95 regressed = _apply_poly_regression(best_match, hit[6], float(hit[3]), model)
|
|
96 if regressed:
|
|
97 hit[3] = regressed
|
|
98 else:
|
|
99 return False, False
|
|
100 else:
|
|
101 hit[3] = _apply_linear_regression(best_match, hit[6], float(hit[3]), model)
|
|
102 match.extend(hit)
|
|
103 return match, hit[6]
|
|
104 '''
|
|
105
|
|
106 for col in regress_columns:
|
|
107 if col in available:
|
|
108 hit = list(hits[available.index(col)])
|
|
109 if hit[4] == ctype:
|
|
110 # Perform regression using a column for which regression is possible
|
|
111 if method == 'poly':
|
|
112 # Polynomial is only possible within a set border, if the RI falls outside
|
|
113 # of this border, skip this lookup
|
|
114 regressed = _apply_poly_regression(pref[0], hit[6], float(hit[3]), model)
|
|
115 if regressed:
|
|
116 hit[3] = regressed
|
|
117 else:
|
|
118 return False, False
|
|
119 else:
|
|
120 hit[3] = _apply_linear_regression(pref[0], hit[6], float(hit[3]), model)
|
|
121 match.extend(hit)
|
|
122 return match, hit[6]
|
|
123
|
|
124 return False, False
|
|
125
|
|
126
|
|
127
|
|
128 def default_hit(row, cas_nr, compound_id):
|
|
129 '''
|
|
130 This method will return a "default"/empty hit for cases where the
|
|
131 method _preferred() returns False (i.e. a RI could not be found
|
|
132 for the given cas nr, also not via regression.
|
|
133 '''
|
|
134 return [
|
|
135 #'CAS',
|
|
136 'C' + cas_nr,
|
|
137 #'NAME',
|
|
138 '',
|
|
139 #'FORMULA',
|
|
140 '',
|
|
141 #'RI',
|
|
142 '0.0',
|
|
143 #'Column.type',
|
|
144 '',
|
|
145 #'Column.phase.type',
|
|
146 '',
|
|
147 #'Column.name',
|
|
148 '',
|
|
149 #'phase.coding',
|
|
150 ' ',
|
|
151 #'CAS_column.Name',
|
|
152 '',
|
|
153 #'Centrotype', -> NOTE THAT compound_id is not ALWAYS centrotype...depends on MsClust algorithm used...for now only one MsClust algorithm is used so it is not an issue, but this should be updated/corrected once that changes
|
|
154 compound_id,
|
|
155 #'Regression.Column.Name',
|
|
156 '',
|
|
157 #'min',
|
|
158 '',
|
|
159 #'max',
|
|
160 '',
|
|
161 #'nr.duplicates',
|
|
162 '']
|
|
163
|
|
164
|
|
165 def format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method):
|
|
166 '''
|
|
167 Looks up the compounds in the library lookup table and formats the results
|
|
168 @param lookup_dict: dictionary containing the library to be searched
|
|
169 @param nist_tabular_filename: NIST output file to be matched
|
|
170 @param pref: (list of) column-name(s) to look for
|
|
171 @param ctype: column type of interest
|
|
172 @param polar: polarity of the used column
|
|
173 @param model: data loaded from file containing regression models
|
|
174 @param method: supported regression method (i.e. poly(nomial) or linear)
|
|
175 '''
|
|
176 (nist_tabular_list, header_clean) = match_library.read_library(nist_tabular_filename)
|
|
177 # Retrieve indices of the CAS and compound_id columns (exit if not present)
|
|
178 try:
|
|
179 casi = header_clean.index("cas")
|
|
180 idi = header_clean.index("id")
|
|
181 except:
|
|
182 raise IOError("'CAS' or 'compound_id' not found in header of library file")
|
|
183
|
|
184 data = []
|
|
185 for row in nist_tabular_list:
|
|
186 casf = str(row[casi].replace('-', '').strip())
|
|
187 compound_id = str(row[idi].split('-')[0])
|
|
188 if casf in lookup_dict:
|
|
189 found_hit, regress = _preferred(lookup_dict[casf], pref, ctype, polar, model, method)
|
|
190 if found_hit:
|
|
191 # Keep cas nr as 'C'+ numeric part:
|
|
192 found_hit[0] = 'C' + casf
|
|
193 # Add compound id
|
|
194 found_hit.insert(9, compound_id)
|
|
195 # Add information on regression process
|
|
196 found_hit.insert(10, regress if regress else 'None')
|
|
197 # Replace column index references with actual number of duplicates
|
|
198 dups = len(found_hit[-1].split(','))
|
|
199 if dups > 1:
|
|
200 found_hit[-1] = str(dups + 1)
|
|
201 else:
|
|
202 found_hit[-1] = '0'
|
|
203 data.append(found_hit)
|
|
204 found_hit = ''
|
|
205 else:
|
|
206 data.append(default_hit(row, casf, compound_id))
|
|
207 else:
|
|
208 data.append(default_hit(row, casf, compound_id))
|
|
209
|
|
210 casf = ''
|
|
211 compound_id = ''
|
|
212 found_hit = []
|
|
213 dups = []
|
|
214 return data
|
|
215
|
|
216
|
|
217 def _save_data(content, outfile):
|
|
218 '''
|
|
219 Write to output file
|
|
220 @param content: content to write
|
|
221 @param outfile: file to write to
|
|
222 '''
|
|
223 # header
|
|
224 header = ['CAS',
|
|
225 'NAME',
|
|
226 'FORMULA',
|
|
227 'RI',
|
|
228 'Column.type',
|
|
229 'Column.phase.type',
|
|
230 'Column.name',
|
|
231 'phase.coding',
|
|
232 'CAS_column.Name',
|
|
233 'Centrotype',
|
|
234 'Regression.Column.Name',
|
|
235 'min',
|
|
236 'max',
|
|
237 'nr.duplicates']
|
|
238 output_handle = csv.writer(open(outfile, 'wb'), delimiter="\t")
|
|
239 output_handle.writerow(header)
|
|
240 for entry in content:
|
|
241 output_handle.writerow(entry)
|
|
242
|
|
243
|
|
244 def _read_model(model_file):
|
|
245 '''
|
|
246 Creates an easy to search dictionary for getting the regression parameters
|
|
247 for each valid combination of GC-columns
|
|
248 @param model_file: filename containing the regression models
|
|
249 '''
|
|
250 regress = list(csv.reader(open(model_file, 'rU'), delimiter='\t'))
|
|
251 if len(regress.pop(0)) > 9:
|
|
252 method = 'poly'
|
|
253 else:
|
|
254 method = 'linear'
|
|
255
|
|
256 model = {}
|
|
257 # Create new dictionary for each GC-column
|
|
258 for line in regress:
|
|
259 model[line[0]] = {}
|
|
260
|
|
261 # Add data
|
|
262 for line in regress:
|
|
263 if method == 'poly':
|
|
264 model[line[0]][line[1]] = [float(col) for col in line[2:11]]
|
|
265 else: # linear
|
|
266 model[line[0]][line[1]] = [float(col) for col in line[2:9]]
|
|
267
|
|
268 return model, method
|
|
269
|
|
270
|
|
271 def _apply_poly_regression(column1, column2, retention_index, model):
|
|
272 '''
|
|
273 Calculates a new retention index (RI) value using a given 3rd-degree polynomial
|
|
274 model based on data from GC columns 1 and 2
|
|
275 @param column1: name of the selected GC-column
|
|
276 @param column2: name of the GC-column to use for regression
|
|
277 @param retention_index: RI to convert
|
|
278 @param model: dictionary containing model information for all GC-columns
|
|
279 '''
|
|
280 coeff = model[column1][column2]
|
|
281 # If the retention index to convert is within range of the data the model is based on, perform regression
|
|
282 if coeff[4] < retention_index < coeff[5]:
|
|
283 return (coeff[3] * (retention_index ** 3) + coeff[2] * (retention_index ** 2) +
|
|
284 (retention_index * coeff[1]) + coeff[0])
|
|
285 else:
|
|
286 return False
|
|
287
|
|
288
|
|
289 def _apply_linear_regression(column1, column2, retention_index, model):
|
|
290 '''
|
|
291 Calculates a new retention index (RI) value using a given linear model based on data
|
|
292 from GC columns 1 and 2
|
|
293 @param column1: name of the selected GC-column
|
|
294 @param column2: name of the GC-column to use for regression
|
|
295 @param retention_index: RI to convert
|
|
296 @param model: dictionary containing model information for all GC-columns
|
|
297 '''
|
|
298 # TODO: No use of limits
|
|
299 coeff = model[column1][column2]
|
|
300 return coeff[1] * retention_index + coeff[0]
|
|
301
|
|
302
|
|
303 def main():
|
|
304 '''
|
|
305 Library Lookup main function
|
|
306 '''
|
|
307 library_file = sys.argv[1]
|
|
308 nist_tabular_filename = sys.argv[2]
|
|
309 ctype = sys.argv[3]
|
|
310 polar = sys.argv[4]
|
|
311 outfile = sys.argv[5]
|
|
312 pref = sys.argv[6:-1]
|
|
313 regress = sys.argv[-1]
|
|
314
|
|
315 if regress != 'False':
|
|
316 model, method = _read_model(regress)
|
|
317 else:
|
|
318 model, method = False, None
|
|
319
|
|
320 lookup_dict = create_lookup_table(library_file, ctype, polar)
|
|
321 data = format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method)
|
|
322
|
|
323 _save_data(data, outfile)
|
|
324
|
|
325
|
|
326 if __name__ == "__main__":
|
|
327 main()
|