'''Integration tests for the GCMS project'''

from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
from GCMS import library_lookup, combine_output
from GCMS.rankfilter_GCMS import rankfilter
import os.path
import sys
import unittest
import re


class IntegrationTest(unittest.TestCase):

    def test_library_lookup(self):
        '''
        Run main for data/NIST_tabular and compare produced files with references determined earlier.
        '''
        # Create out folder
        outdir = "output/"  # tempfile.mkdtemp(prefix='test_library_lookup')
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_library_lookup')
        outfile_txt = outfile_base + '.txt'

        # Build up arguments and run
        input_txt = resource_filename(__name__, "data/NIST_tabular.txt")
        library = resource_filename(__name__, "data/RIDB_subset.txt")
        regress_model = resource_filename(__name__, "data/ridb_poly_regression.txt")
        sys.argv = ['test',
                    library,
                    input_txt,
                    'Capillary',
                    'Semi-standard non-polar',
                    outfile_txt,
                    'HP-5',
                    regress_model]
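        # Note: the positional arguments above are inferred from the test data to
        # mean: RI library file, NIST tabular input, column type, column polarity
        # class, output file, GC column name and regression model file. This is an
        # assumption based on the values used here, not on library_lookup's docs.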
        # Execute main function with arguments provided through sys.argv
        library_lookup.main()
        # Compare with reference files
        reference_txt = resource_filename(__name__, 'reference/produced_library_lookup.txt')

        # Read both the reference file and the actual output file
        expected = _read_file(reference_txt)
        actual = _read_file(outfile_txt)

        # Convert the file contents to lists we can compare item by item
        expected = expected.split()
        actual = actual.split()

        for exp, act in zip(expected, actual):
            if re.match(r'\d+\.\d+', exp):
                # Numeric values: compare as floats to tolerate small rounding differences
                exp = float(exp)
                act = float(act)
                self.assertAlmostEqual(exp, act, places=5)
            else:
                # Non-numeric values: compare verbatim
                self.assertEqual(exp, act)

    def test_combine_output_simple(self):
        '''
        Run combine_output.main for data/Rankfilter.txt and data/Caslookup.txt; the
        comparison with reference files is currently commented out below.
        '''
        # Create out folder
        outdir = "output/"  # tempfile.mkdtemp(prefix='test_library_lookup')
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_combine_output')
        outfile_single_txt = outfile_base + '_single.txt'
        outfile_multi_txt = outfile_base + '_multi.txt'

        # Build up arguments and run
        input_rankfilter = resource_filename(__name__, "data/Rankfilter.txt")
        input_caslookup = resource_filename(__name__, "data/Caslookup.txt")
        sys.argv = ['test',
                    input_rankfilter,
                    input_caslookup,
                    outfile_single_txt,
                    outfile_multi_txt]
        # Execute main function with arguments provided through sys.argv
        combine_output.main()
        # Compare with reference files
        # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
        # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
        # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
        # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))

        # Clean up
        # shutil.rmtree(tempdir)

    # Helper step: the 'def_' prefix keeps unittest from picking this up as a test
    # on its own; it is invoked explicitly from test_combine_output_advanced below.
    def def_test_rank_filter_advanced(self):
        '''
        Run main of RankFilter
        '''
        # Create out folder
        outdir = "output/integration/"
        if not os.path.exists(outdir):
            os.makedirs(outdir)

        # Build up arguments and run
        input_txt = resource_filename(__name__, "data/integration/RankFilterInput_conf.txt")
        sys.argv = ['test',
                    input_txt]
        # Execute main function with arguments provided through sys.argv
        rankfilter.main()
        # Compare with reference files
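        # (No reference comparison is done here yet. test_combine_output_advanced
        # below reads output/integration/produced_rank_filter_out.txt, so the
        # RankFilter configuration used here is assumed to write its output under
        # that name; that assumption is not checked in this helper.)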

    # Helper step invoked from test_combine_output_advanced below (not auto-run by unittest).
    def def_test_library_lookup_advanced(self):
        '''
        Run library_lookup.main for data/integration/NIST_identification_results_tabular
        using the capillary column RI library and regression model.
        '''
        # Create out folder
        outdir = "output/integration/"
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_library_lookup_ADVANCED')
        outfile_txt = outfile_base + '.txt'

        # Build up arguments and run
        input_txt = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
        library = resource_filename(__name__, "data/integration/Library_RI_DB_capillary_columns-noDuplicates.txt")
        regress_model = resource_filename(__name__, "data/integration/regression_MODEL_for_columns.txt")
        sys.argv = ['test',
                    library,
                    input_txt,
                    'Capillary',
                    'Semi-standard non-polar',
                    outfile_txt,
                    'DB-5',
                    regress_model]
        # Execute main function with arguments provided through sys.argv
        library_lookup.main()
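        # The file written here (output/integration/produced_library_lookup_ADVANCED.txt)
        # is read back in by test_combine_output_advanced.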

    def test_combine_output_advanced(self):
        '''
        Variant on the simple test case above, but a bit more complex as some of the
        centrotypes have different NIST hits, which should give them different RI values.
        This test runs not only the combine output step but also the two preceding steps,
        so it ensures the integration also works on the current code of all three tools.
        '''

        # Run RankFilter
        self.def_test_rank_filter_advanced()

        # Run library CAS/RI lookup
        self.def_test_library_lookup_advanced()

        outdir = "output/integration/"
        outfile_base = os.path.join(outdir, 'produced_combine_output')
        outfile_single_txt = outfile_base + '_single.txt'
        outfile_multi_txt = outfile_base + '_multi.txt'

        # Build up arguments and run
        input_rankfilter = resource_filename(__name__, "output/integration/produced_rank_filter_out.txt")
        input_caslookup = resource_filename(__name__, "output/integration/produced_library_lookup_ADVANCED.txt")
        sys.argv = ['test',
                    input_rankfilter,
                    input_caslookup,
                    outfile_single_txt,
                    outfile_multi_txt]
        # Execute main function with arguments provided through sys.argv
        combine_output.main()
        # Compare with reference files
        # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
        # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
        # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
        # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))

        # Check 1: output single should have one record per centrotype:

        # Check 2: output single has more records than output multi:
        combine_result_single_items = combine_output._process_data(outfile_single_txt)
        combine_result_multi_items = combine_output._process_data(outfile_multi_txt)
        self.assertGreater(len(combine_result_single_items['Centrotype']),
                           len(combine_result_multi_items['Centrotype']))
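        # A possible implementation of Check 1 above (a sketch based on the stated
        # expectation that the single output has one record per centrotype, i.e. no
        # duplicate centrotype values; _process_data is assumed to return a dict of
        # column name -> list of values, as it is used elsewhere in this test):
        centrotypes_single = combine_result_single_items['Centrotype']
        self.assertEqual(len(centrotypes_single), len(set(centrotypes_single)))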

        # Check 3: library_lookup RI column, centrotype column and ri_svr column are correct:
        caslookup_items = combine_output._process_data(input_caslookup)
        rankfilter_items = combine_output._process_data(input_rankfilter)

        # Check that the caslookup RI column is correctly maintained in its original
        # order in the combined file:
        ri_caslookup = caslookup_items['RI']
        ri_combine_single = combine_result_single_items['RI']
        self.assertListEqual(ri_caslookup, ri_combine_single)

        # Check the centrotype column's integrity:
        centrotype_caslookup = caslookup_items['Centrotype']
        centrotype_combine_single = combine_result_single_items['Centrotype']
        centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID'])
        self.assertListEqual(centrotype_caslookup, centrotype_combine_single)
        self.assertListEqual(centrotype_caslookup, centrotype_rankfilter)

        # Integration and integrity checks:
        file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
        file_NIST_items = combine_output._process_data(file_NIST)
        # Check that the rank filter output has exactly the same ID items as the original NIST input file:
        self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID'])
        # Check the same for the CAS column:
        self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS'])
        # Now check the NIST CAS column against the CAS lookup results:
        cas_NIST = _get_processedcas(file_NIST_items['CAS'])
        self.assertListEqual(cas_NIST, caslookup_items['CAS'])
        # Now check the CAS of the combined result. If all checks are OK, it means the
        # CAS column's order and values remained stable throughout all steps:
        self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS'])

        # Check that the rankfilter RIsvr column is correctly maintained in its original
        # order in the combined file:
        risvr_rankfilter = rankfilter_items['RIsvr']
        risvr_combine_single = combine_result_single_items['RIsvr']
        self.assertListEqual(risvr_rankfilter, risvr_combine_single)


def _get_centrotype_rankfilter(id_list):
    '''
    Returns the list of centrotype ids given a list of IDs of the form
    e.g. 74-1.0-564-1905200-7, where the number before the first "-"
    is the centrotype id.
    '''
    result = []
    for compound_id in id_list:
        centrotype = compound_id.split('-')[0]
        result.append(centrotype)

    return result
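# For example (illustrative only): _get_centrotype_rankfilter(['74-1.0-564-1905200-7'])
# returns ['74'].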


def _get_processedcas(cas_list):
    '''
    Returns the list of CAS numbers in the form C64175 instead of 64-17-5.
    '''
    result = []
    for cas in cas_list:
        processed_cas = 'C' + str(cas.replace('-', '').strip())
        result.append(processed_cas)

    return result
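# For example (illustrative only): _get_processedcas([' 64-17-5']) returns ['C64175'].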


def _get_strippedcas(cas_list):
    '''
    Removes the leading whitespace from CAS numbers such as " 64-17-5".
    '''
    result = []
    for cas in cas_list:
        processed_cas = cas.strip()
        result.append(processed_cas)

    return result
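# For example (illustrative only): _get_strippedcas([' 64-17-5']) returns ['64-17-5'].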


def _read_file(filename):
    '''
    Helper method to quickly read a file.
    @param filename: name of the file to read
    '''
    with open(filename) as handle:
        return handle.read()
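

# A minimal runner guard (an addition, assuming these tests are meant to be run
# directly with unittest; drop it if a separate test runner is used instead):
if __name__ == '__main__':
    unittest.main()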