comparison env/lib/python3.9/site-packages/planemo/galaxy/test/actions.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Actions related to running and reporting on Galaxy-specific testing."""
2
3 import io
4 import json
5 import os
6 from distutils.dir_util import copy_tree
7
8 import click
9 from galaxy.util import unicodify
10
11 from planemo.exit_codes import (
12 EXIT_CODE_GENERIC_FAILURE,
13 EXIT_CODE_NO_SUCH_TARGET,
14 EXIT_CODE_OK,
15 )
16 from planemo.galaxy.run import (
17 run_galaxy_command,
18 setup_venv,
19 )
20 from planemo.io import error, info, shell_join, warn
21 from planemo.reports import (
22 allure,
23 build_report,
24 )
25 from planemo.test.results import get_dict_value
26 from . import structures as test_structures
27
28
29 NO_XUNIT_REPORT_MESSAGE = ("Cannot locate xUnit report [%s] for tests - "
30 "required to build planemo report and summarize "
31 "tests.")
32 NO_JSON_REPORT_MESSAGE = ("Cannot locate JSON report [%s] for tests - "
33 "required to build planemo report and summarize "
34 "tests.")
35 REPORT_NOT_CHANGED = ("Galaxy failed to update test report [%s] for tests - "
36 "required to build planemo report and summarize "
37 "tests.")
38 NO_TESTS_MESSAGE = "No tests were executed - see Galaxy output for details."
39 ALL_TESTS_PASSED_MESSAGE = "All %d test(s) executed passed."
40 PROBLEM_COUNT_MESSAGE = ("There were problems with %d test(s) - out of %d "
41 "test(s) executed. See %s for detailed breakdown.")
42 GENERIC_PROBLEMS_MESSAGE = ("One or more tests failed. See %s for detailed "
43 "breakdown.")
44 GENERIC_TESTS_PASSED_MESSAGE = "No failing tests encountered."
45 TEST_DATA_UPDATED_MESSAGE = "Test data were updated and tests were rerun."
46 TEST_DATA_NOT_UPDATED_MESSAGE = "%s Therefore, no test data were updated." % ALL_TESTS_PASSED_MESSAGE
47
48
49 def run_in_config(ctx, config, run=run_galaxy_command, test_data_target_dir=None, **kwds):
50 """Run Galaxy tests with the run_tests.sh command.
51
52 The specified `config` object describes the context for tool
53 execution.
54 """
55 config_directory = config.config_directory
56 html_report_file = kwds["test_output"]
57
58 job_output_files = kwds.get("job_output_files", None)
59 if job_output_files is None:
60 job_output_files = os.path.join(config_directory, "jobfiles")
61
62 xunit_report_file = _xunit_state(kwds, config)
63 xunit_report_file_tracker = _FileChangeTracker(xunit_report_file)
64 structured_report_file = _structured_report_file(kwds, config)
65 structured_report_file_tracker = _FileChangeTracker(structured_report_file)
66
67 info("Testing using galaxy_root %s", config.galaxy_root)
68 # TODO: Allow running dockerized Galaxy here instead.
69 server_ini = os.path.join(config_directory, "galaxy.ini")
70 config.env["GALAXY_CONFIG_FILE"] = server_ini
71 config.env["GALAXY_TEST_VERBOSE_ERRORS"] = "true"
72 config.env["GALAXY_TEST_SAVE"] = job_output_files
73
74 cd_to_galaxy_command = ['cd', config.galaxy_root]
75 test_cmd = test_structures.GalaxyTestCommand(
76 html_report_file,
77 xunit_report_file,
78 structured_report_file,
79 failed=kwds.get("failed", False),
80 installed=kwds.get("installed", False),
81 ).build()
82 setup_common_startup_args = ""
83 if kwds.get("skip_venv", False):
84 setup_common_startup_args = shell_join(
85 'COMMON_STARTUP_ARGS=--skip-venv',
86 'export COMMON_STARTUP_ARGS',
87 'echo "Set COMMON_STARTUP_ARGS to ${COMMON_STARTUP_ARGS}"',
88 )
89 setup_venv_command = setup_venv(ctx, kwds)
90 cmd = shell_join(
91 cd_to_galaxy_command,
92 setup_common_startup_args,
93 setup_venv_command,
94 test_cmd,
95 )
96 action = "Testing tools"
97 return_code = run(
98 ctx,
99 cmd,
100 config.env,
101 action
102 )
103 if return_code != 0 and kwds.get('update_test_data', False):
104 for test_data_dir in [config.test_data_dir, test_data_target_dir]:
105 if test_data_dir:
106 copy_tree(job_output_files, test_data_dir)
107 kwds['test_data_updated'] = True
108 info('Test data updated. Rerunning...')
109 return_code = run(
110 ctx,
111 cmd,
112 config.env,
113 action
114 )
115
116 _check_test_outputs(xunit_report_file_tracker, structured_report_file_tracker)
117 test_results = test_structures.GalaxyTestResults(
118 structured_report_file,
119 xunit_report_file,
120 html_report_file,
121 return_code,
122 )
123
124 structured_data = test_results.structured_data
125 return handle_reports_and_summary(
126 ctx,
127 structured_data,
128 exit_code=test_results.exit_code,
129 kwds=kwds
130 )
131
132
133 def handle_reports_and_summary(ctx, structured_data, exit_code=None, kwds=None):
134 """Produce reports and print summary, return 0 if tests passed.
135
136 If ``exit_code`` is set - use underlying test source for return
137 code and test success determination, otherwise infer from supplied
138 test data.
139 """
140 if kwds is None:
141 kwds = {}
142 handle_reports(ctx, structured_data, kwds)
143 summary_exit_code = _handle_summary(
144 structured_data,
145 **kwds
146 )
147 return exit_code if exit_code is not None else summary_exit_code
148
149
150 def merge_reports(input_paths, output_path):
151 reports = []
152 for path in input_paths:
153 with io.open(path, encoding='utf-8') as f:
154 reports.append(json.load(f))
155 tests = []
156 for report in reports:
157 tests.extend(report["tests"])
158 tests = sorted(tests, key=lambda k: k['id'])
159 merged_report = {"tests": tests}
160 with io.open(output_path, mode="w", encoding='utf-8') as out:
161 out.write(unicodify(json.dumps(merged_report)))
162
163
164 def handle_reports(ctx, structured_data, kwds):
165 """Write reports based on user specified kwds."""
166 exceptions = []
167 structured_report_file = kwds.get("test_output_json", None)
168 if structured_report_file and not os.path.exists(structured_report_file):
169 try:
170 with io.open(structured_report_file, mode="w", encoding='utf-8') as f:
171 f.write(unicodify(json.dumps(structured_data)))
172 except Exception as e:
173 exceptions.append(e)
174
175 for report_type in ["html", "markdown", "text", "xunit", "junit", "allure"]:
176 try:
177 _handle_test_output_file(
178 ctx, report_type, structured_data, kwds
179 )
180 except Exception as e:
181 exceptions.append(e)
182
183 if len(exceptions) > 0:
184 raise exceptions[0]
185
186
187 def _handle_test_output_file(ctx, report_type, test_data, kwds):
188 kwd_name = "test_output"
189 if report_type != "html":
190 kwd_name = "test_output_%s" % report_type
191
192 path = kwds.get(kwd_name, None)
193 if path is None:
194 message = "No file specified for %s, skipping test output." % kwd_name
195 ctx.vlog(message)
196 return
197
198 if report_type == "allure":
199 file_modication_datatime = kwds.get("file_modication_datatime")
200 allure.write_results(path, test_data, file_modication_datatime=file_modication_datatime)
201 return
202
203 try:
204 contents = build_report.build_report(
205 test_data, report_type=report_type
206 )
207 except Exception:
208 message = "Problem producing report file %s for %s" % (
209 path, kwd_name
210 )
211 ctx.vlog(message, exception=True)
212 raise
213
214 try:
215 with io.open(path, mode='w', encoding='utf-8') as handle:
216 handle.write(unicodify(contents))
217 except Exception:
218 message = "Problem writing output file %s for %s" % (
219 kwd_name, path
220 )
221 ctx.vlog(message, exception=True)
222 raise
223
224
225 def _handle_summary(
226 structured_data,
227 **kwds
228 ):
229 summary_dict = get_dict_value("summary", structured_data)
230 num_tests = get_dict_value("num_tests", summary_dict)
231 num_failures = get_dict_value("num_failures", summary_dict)
232 num_errors = get_dict_value("num_errors", summary_dict)
233 num_problems = num_failures + num_errors
234
235 summary_exit_code = EXIT_CODE_OK
236 if num_problems > 0:
237 summary_exit_code = EXIT_CODE_GENERIC_FAILURE
238 elif num_tests == 0:
239 summary_exit_code = EXIT_CODE_NO_SUCH_TARGET
240
241 summary_style = kwds.get("summary")
242 if kwds.get('test_data_updated'):
243 info(TEST_DATA_UPDATED_MESSAGE)
244 if summary_style != "none":
245 if num_tests == 0:
246 warn(NO_TESTS_MESSAGE)
247 elif num_problems == 0:
248 if kwds.get('update_test_data') and not kwds.get('test_data_updated'):
249 info(TEST_DATA_NOT_UPDATED_MESSAGE % num_tests)
250 else:
251 info(ALL_TESTS_PASSED_MESSAGE % num_tests)
252 elif num_problems:
253 html_report_file = kwds.get("test_output")
254 message_args = (num_problems, num_tests, html_report_file)
255 message = PROBLEM_COUNT_MESSAGE % message_args
256 warn(message)
257
258 _summarize_tests_full(
259 structured_data,
260 **kwds
261 )
262
263 return summary_exit_code
264
265
266 def _summarize_tests_full(
267 structured_data,
268 **kwds
269 ):
270 tests = get_dict_value("tests", structured_data)
271 for test_case_data in tests:
272 _summarize_test_case(test_case_data, **kwds)
273
274
275 def passed(xunit_testcase_el):
276 did_pass = True
277 for child_el in list(xunit_testcase_el):
278 if child_el.tag in ["failure", "error"]:
279 did_pass = False
280 return did_pass
281
282
283 def _summarize_test_case(structured_data, **kwds):
284 summary_style = kwds.get("summary")
285 test_id = test_structures.case_id(
286 raw_id=get_dict_value("id", structured_data)
287 )
288 status = get_dict_value(
289 "status",
290 get_dict_value("data", structured_data)
291 )
292 if status != "success":
293 state = click.style("failed", bold=True, fg='red')
294 else:
295 state = click.style("passed", bold=True, fg='green')
296 click.echo(test_id.label + ": " + state)
297 if summary_style != "minimal":
298 _print_command_line(structured_data, test_id)
299
300
301 def _print_command_line(test, test_id):
302 execution_problem = test.get("execution_problem", None)
303 if execution_problem:
304 click.echo("| command: *could not execute job, no command generated* ")
305 return
306
307 job = None
308 try:
309 job = test["job"]
310 except (KeyError, IndexError):
311 click.echo("| command: *failed to find job for test object [%s]" % test)
312 return
313 try:
314 command = job["command_line"]
315 except (KeyError, IndexError):
316 click.echo("| command: *failed to find command_line for job object [%s]" % job)
317 return
318
319 click.echo("| command: %s" % command)
320
321
322 def _check_test_outputs(
323 xunit_report_file_tracker,
324 structured_report_file_tracker
325 ):
326 if not os.path.exists(xunit_report_file_tracker.path):
327 message = NO_XUNIT_REPORT_MESSAGE % xunit_report_file_tracker.path
328 error(message)
329 raise Exception(message)
330
331 if not os.path.exists(structured_report_file_tracker.path):
332 message = NO_JSON_REPORT_MESSAGE % structured_report_file_tracker.path
333 error(message)
334 raise Exception(message)
335
336 if not xunit_report_file_tracker.changed():
337 message = REPORT_NOT_CHANGED % xunit_report_file_tracker.path
338 error(message)
339 raise Exception(message)
340
341 if not structured_report_file_tracker.changed():
342 message = REPORT_NOT_CHANGED % structured_report_file_tracker.path
343 error(message)
344 raise Exception(message)
345
346
347 def _xunit_state(kwds, config):
348 # This has been supported in Galaxy for well over a year, just going to assume
349 # it from here on out.
350 # xunit_supported = True
351 # if shell("grep -q xunit '%s'/run_tests.sh" % config.galaxy_root):
352 # xunit_supported = False
353
354 xunit_report_file = kwds.get("test_output_xunit", None)
355 if xunit_report_file is None:
356 xunit_report_file = os.path.join(config.config_directory, "xunit.xml")
357
358 return xunit_report_file
359
360
361 def _structured_report_file(kwds, config):
362 # This has been supported in Galaxy for well over a year, just going to assume
363 # it from here on out.
364 # structured_data_supported = True
365 # if shell("grep -q structured_data '%s'/run_tests.sh" % config.galaxy_root):
366 # structured_data_supported = False
367
368 structured_report_file = None
369 structured_report_file = kwds.get("test_output_json", None)
370 if structured_report_file is None:
371 conf_dir = config.config_directory
372 structured_report_file = os.path.join(conf_dir, "structured_data.json")
373
374 return structured_report_file
375
376
377 class _FileChangeTracker(object):
378
379 def __init__(self, path):
380 modification_time = None
381 if os.path.exists(path):
382 modification_time = os.path.getmtime(path)
383
384 self.path = path
385 self.modification_time = modification_time
386
387 def changed(self):
388 if self.modification_time:
389 new_modification_time = os.path.getmtime(self.path)
390 return self.modification_time != new_modification_time
391 else:
392 return os.path.exists(self.path)
393
394
395 __all__ = (
396 "run_in_config",
397 "handle_reports",
398 "handle_reports_and_summary",
399 )