Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/galaxy/tool_util/verify/interactor.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import io | |
2 import json | |
3 import os | |
4 import re | |
5 import shutil | |
6 import sys | |
7 import tarfile | |
8 import tempfile | |
9 import time | |
10 import zipfile | |
11 from json import dumps | |
12 from logging import getLogger | |
13 | |
14 import requests | |
15 from packaging.version import parse as parse_version, Version | |
16 try: | |
17 from nose.tools import nottest | |
18 except ImportError: | |
19 def nottest(x): | |
20 return x | |
21 | |
22 from galaxy import util | |
23 from galaxy.tool_util.parser.interface import TestCollectionDef, TestCollectionOutputDef | |
24 from galaxy.util.bunch import Bunch | |
25 from . import verify | |
26 from .asserts import verify_assertions | |
27 from .wait import wait_on | |
28 | |
29 log = getLogger(__name__) | |
30 | |
31 # Off by default because it can pound the database pretty heavily | |
32 # and result in sqlite errors on larger tests or larger numbers of | |
33 # tests. | |
34 VERBOSE_ERRORS = util.asbool(os.environ.get("GALAXY_TEST_VERBOSE_ERRORS", False)) | |
35 UPLOAD_ASYNC = util.asbool(os.environ.get("GALAXY_TEST_UPLOAD_ASYNC", True)) | |
36 ERROR_MESSAGE_DATASET_SEP = "--------------------------------------" | |
37 DEFAULT_TOOL_TEST_WAIT = int(os.environ.get("GALAXY_TEST_DEFAULT_WAIT", 86400)) | |
38 | |
39 DEFAULT_FTYPE = 'auto' | |
40 # This following default dbkey was traditionally hg17 before Galaxy 18.05, | |
41 # restore this behavior by setting GALAXY_TEST_DEFAULT_DBKEY to hg17. | |
42 DEFAULT_DBKEY = os.environ.get("GALAXY_TEST_DEFAULT_DBKEY", "?") | |
43 | |
44 | |
class OutputsDict(dict):
    """Insertion-ordered dict that additionally supports positional access.

    Integer subscripts are resolved to the key at that position in insertion
    order; all other subscripts behave like a normal ``dict`` lookup.

    >>> out = OutputsDict()
    >>> out['item1'] = 1
    >>> out['item2'] = 2
    >>> out[1] == 2 == out['item2']
    True
    """

    def __getitem__(self, item):
        if not isinstance(item, int):
            return super().__getitem__(item)
        # Integer: treat as an index into the keys in insertion order.
        return self[list(self.keys())[item]]
60 | |
61 | |
def stage_data_in_history(galaxy_interactor, tool_id, all_test_data, history=None, force_path_paste=False, maxseconds=DEFAULT_TOOL_TEST_WAIT):
    """Upload every test dataset required by a tool test into ``history``.

    With ``UPLOAD_ASYNC`` enabled, all uploads are submitted before waiting
    on any of them; otherwise each upload is waited on before the next is
    submitted.
    """
    assert tool_id

    def _submit(test_data):
        # Returns a callable that blocks until this upload's job finishes.
        return galaxy_interactor.stage_data_async(
            test_data,
            history,
            tool_id,
            force_path_paste=force_path_paste,
            maxseconds=maxseconds,
        )

    if UPLOAD_ASYNC:
        # Fire off every upload first, then wait for all of them.
        pending_waits = [_submit(test_data) for test_data in all_test_data]
        for wait in pending_waits:
            wait()
    else:
        # Strictly serial: finish each upload before starting the next.
        for test_data in all_test_data:
            wait = _submit(test_data)
            wait()
85 | |
86 | |
class GalaxyInteractorApi:
    """Client for the subset of the Galaxy REST API used to run tool tests.

    Handles staging test data into histories, submitting tool executions,
    waiting on jobs, and verifying output datasets, collections and metadata.

    Required ``kwds``: ``galaxy_url``, ``master_api_key``, ``keep_outputs_dir``.
    Optional: ``api_key``, ``test_user``, ``user_api_key_is_admin_key``,
    ``download_attempts``, ``download_sleep``, ``test_data``.
    """

    def __init__(self, **kwds):
        self.api_url = "%s/api" % kwds["galaxy_url"].rstrip("/")
        self.master_api_key = kwds["master_api_key"]
        self.api_key = self.__get_user_key(kwds.get("api_key"), kwds.get("master_api_key"), test_user=kwds.get("test_user"))
        if kwds.get('user_api_key_is_admin_key', False):
            self.master_api_key = self.api_key
        self.keep_outputs_dir = kwds["keep_outputs_dir"]
        self.download_attempts = kwds.get("download_attempts", 1)
        self.download_sleep = kwds.get("download_sleep", 1)
        # Local test data directories.
        self.test_data_directories = kwds.get("test_data") or []

        # Lazily resolved by the target_galaxy_version property.
        self._target_galaxy_version = None

        # Maps uploaded test-data names (path, basename and test name) to
        # {"src": "hda", "id": <hda_id>} source dicts for later tool inputs.
        self.uploads = {}

    @property
    def target_galaxy_version(self):
        """Parsed major version of the connected Galaxy server (cached)."""
        if self._target_galaxy_version is None:
            self._target_galaxy_version = parse_version(self._get('version').json()['version_major'])
        return self._target_galaxy_version

    @property
    def supports_test_data_download(self):
        # The test_data_download API endpoint was added in Galaxy 19.01.
        return self.target_galaxy_version >= Version("19.01")

    def __get_user_key(self, user_key, admin_key, test_user=None):
        """Return an API key for ``test_user``, creating the user if needed."""
        if not test_user:
            test_user = "test@bx.psu.edu"
        if user_key:
            return user_key
        test_user = self.ensure_user_with_email(test_user)
        return self._post("users/%s/api_key" % test_user['id'], key=admin_key).json()

    def get_tests_summary(self):
        """Return the server-side summary of all tool tests available."""
        response = self._get("tools/tests_summary")
        assert response.status_code == 200, "Non 200 response from tool tests available API. [%s]" % response.content
        return response.json()

    def get_tool_tests(self, tool_id, tool_version=None):
        """Return the test definitions for ``tool_id`` (optionally a specific version)."""
        url = "tools/%s/test_data" % tool_id
        if tool_version is not None:
            url += "?tool_version=%s" % tool_version
        response = self._get(url)
        assert response.status_code == 200, "Non 200 response from tool test API. [%s]" % response.content
        return response.json()

    def verify_output_collection(self, output_collection_def, output_collection_id, history, tool_id):
        """Verify each element of an output collection against its test definition."""
        data_collection = self._get("dataset_collections/%s" % output_collection_id, data={"instance_type": "history"}).json()

        def verify_dataset(element, element_attrib, element_outfile):
            hda = element["object"]
            self.verify_output_dataset(
                history,
                hda_id=hda["id"],
                outfile=element_outfile,
                attributes=element_attrib,
                tool_id=tool_id
            )

        verify_collection(output_collection_def, data_collection, verify_dataset)

    def verify_output(self, history_id, jobs, output_data, output_testdef, tool_id, maxseconds):
        """Wait for ``jobs`` and verify one output dataset (plus any primary datasets)."""
        outfile = output_testdef.outfile
        attributes = output_testdef.attributes
        name = output_testdef.name

        self.wait_for_jobs(history_id, jobs, maxseconds)
        hid = self.__output_id(output_data)
        # TODO: Twill version verifies dataset is 'ok' in here.
        try:
            self.verify_output_dataset(history_id=history_id, hda_id=hid, outfile=outfile, attributes=attributes, tool_id=tool_id)
        except AssertionError as e:
            raise AssertionError("Output {}: {}".format(name, str(e)))

        primary_datasets = attributes.get('primary_datasets', {})
        if primary_datasets:
            job_id = self._dataset_provenance(history_id, hid)["job_id"]
            outputs = self._get("jobs/%s/outputs" % (job_id)).json()

        for designation, (primary_outfile, primary_attributes) in primary_datasets.items():
            primary_output = None
            for output in outputs:
                if output["name"] == f'__new_primary_file_{name}|{designation}__':
                    primary_output = output
                    break

            if not primary_output:
                msg_template = "Failed to find primary dataset with designation [%s] for output with name [%s]"
                msg_args = (designation, name)
                raise Exception(msg_template % msg_args)

            primary_hda_id = primary_output["dataset"]["id"]
            try:
                self.verify_output_dataset(history_id, primary_hda_id, primary_outfile, primary_attributes, tool_id=tool_id)
            except AssertionError as e:
                raise AssertionError("Primary output {}: {}".format(name, str(e)))

    def wait_for_jobs(self, history_id, jobs, maxseconds):
        """Block until every job in ``jobs`` reaches a terminal state."""
        for job in jobs:
            self.wait_for_job(job['id'], history_id, maxseconds)

    def verify_output_dataset(self, history_id, hda_id, outfile, attributes, tool_id):
        """Verify dataset content against ``outfile``/``attributes``, then its metadata."""
        fetcher = self.__dataset_fetcher(history_id)
        test_data_downloader = self.__test_data_downloader(tool_id)
        verify_hid(
            outfile,
            hda_id=hda_id,
            attributes=attributes,
            dataset_fetcher=fetcher,
            test_data_downloader=test_data_downloader,
            keep_outputs_dir=self.keep_outputs_dir
        )
        self._verify_metadata(history_id, hda_id, attributes)

    def _verify_metadata(self, history_id, hid, attributes):
        """Check dataset metadata.

        ftype on output maps to `file_ext` on the hda's API description, `name`, `info`,
        `dbkey` and `tags` all map to the API description directly. Other metadata attributes
        are assumed to be datatype-specific and mapped with a prefix of `metadata_`.
        """
        metadata = attributes.get('metadata', {}).copy()
        for key in metadata.copy().keys():
            if key not in ['name', 'info', 'tags', 'created_from_basename']:
                new_key = "metadata_%s" % key
                metadata[new_key] = metadata[key]
                del metadata[key]
            elif key == "info":
                metadata["misc_info"] = metadata["info"]
                del metadata["info"]
        expected_file_type = attributes.get('ftype', None)
        if expected_file_type:
            metadata["file_ext"] = expected_file_type

        if metadata:
            # Give the server a moment to finish setting metadata before fetching.
            time.sleep(5)
            dataset = self._get(f"histories/{history_id}/contents/{hid}").json()
            for key, value in metadata.items():
                try:
                    dataset_value = dataset.get(key, None)

                    def compare(val, expected):
                        if str(val) != str(expected):
                            msg = "Dataset metadata verification for [%s] failed, expected [%s] but found [%s]. Dataset API value was [%s]."
                            msg_params = (key, value, dataset_value, dataset)
                            msg = msg % msg_params
                            raise Exception(msg)

                    if isinstance(dataset_value, list):
                        value = str(value).split(",")
                        if len(value) != len(dataset_value):
                            msg = "Dataset metadata verification for [%s] failed, expected [%s] but found [%s], lists differ in length. Dataset API value was [%s]."
                            msg_params = (key, value, dataset_value, dataset)
                            msg = msg % msg_params
                            raise Exception(msg)
                        for val, expected in zip(dataset_value, value):
                            compare(val, expected)
                    else:
                        compare(dataset_value, value)
                except KeyError:
                    msg = "Failed to verify dataset metadata, metadata key [%s] was not found." % key
                    raise Exception(msg)

    def wait_for_job(self, job_id, history_id=None, maxseconds=DEFAULT_TOOL_TEST_WAIT):
        """Block until ``job_id`` is terminal; raises if the job errors."""
        self.wait_for(lambda: self.__job_ready(job_id, history_id), maxseconds=maxseconds)

    def wait_for(self, func, what='tool test run', **kwd):
        """Poll ``func`` until it returns non-None or ``maxseconds`` elapses."""
        walltime_exceeded = int(kwd.get("maxseconds", DEFAULT_TOOL_TEST_WAIT))
        wait_on(func, what, walltime_exceeded)

    def get_job_stdio(self, job_id):
        """Return the full job record (including stdout/stderr) for ``job_id``."""
        job_stdio = self.__get_job_stdio(job_id).json()
        return job_stdio

    def __get_job(self, job_id):
        return self._get('jobs/%s' % job_id)

    def __get_job_stdio(self, job_id):
        return self._get('jobs/%s?full=true' % job_id)

    def new_history(self, history_name='test_history', publish_history=False):
        """Create a new history and return its id, optionally publishing it."""
        create_response = self._post("histories", {"name": history_name})
        try:
            create_response.raise_for_status()
        except Exception as e:
            raise Exception(f"Error occurred while creating history with name '{history_name}': {e}")
        history_id = create_response.json()['id']
        if publish_history:
            self.publish_history(history_id)
        return history_id

    def publish_history(self, history_id):
        """Mark the given history as published."""
        response = self._put(f'histories/{history_id}', json.dumps({'published': True}))
        response.raise_for_status()

    @nottest
    def test_data_path(self, tool_id, filename):
        """Ask the server for the on-disk path of a tool's test-data file (admin only)."""
        response = self._get(f"tools/{tool_id}/test_data_path?filename={filename}", admin=True)
        return response.json()

    @nottest
    def test_data_download(self, tool_id, filename, mode='file', is_output=True):
        """Fetch a tool test-data file, from the server if possible, else locally.

        ``mode='file'`` returns the file content as bytes; ``mode='directory'``
        returns a path to a temporary directory containing the data.
        Raises AssertionError when the data cannot be located anywhere.
        """
        result = None
        local_path = None

        if self.supports_test_data_download:
            response = self._get(f"tools/{tool_id}/test_data_download?filename={filename}", admin=True)
            if response.status_code == 200:
                if mode == 'file':
                    result = response.content
                elif mode == 'directory':
                    prefix = os.path.basename(filename)
                    path = tempfile.mkdtemp(prefix=prefix)
                    fileobj = io.BytesIO(response.content)
                    if zipfile.is_zipfile(fileobj):
                        with zipfile.ZipFile(fileobj) as contents:
                            contents.extractall(path=path)
                    else:
                        # Galaxy < 21.01
                        with tarfile.open(fileobj=fileobj) as tar_contents:
                            tar_contents.extractall(path=path)
                    result = path
        else:
            # We can only use local data
            local_path = self.test_data_path(tool_id, filename)

        if result is None and (local_path is None or not os.path.exists(local_path)):
            for test_data_directory in self.test_data_directories:
                local_path = os.path.join(test_data_directory, filename)
                if os.path.exists(local_path):
                    break

        if result is None and local_path is not None and os.path.exists(local_path):
            if mode == 'file':
                with open(local_path, mode='rb') as f:
                    result = f.read()
            elif mode == 'directory':
                # Make a copy, since we are going to clean up the returned path.
                # mkdtemp creates the destination, so copytree needs
                # dirs_exist_ok (Python >= 3.8) to avoid FileExistsError.
                path = tempfile.mkdtemp()
                shutil.copytree(local_path, path, dirs_exist_ok=True)
                result = path

        if result is None:
            if is_output:
                raise AssertionError(f"Test output file ({filename}) is missing. If you are using planemo, try adding --update_test_data to generate it.")
            else:
                raise AssertionError(f"Test input file ({filename}) cannot be found.")

        return result

    def __output_id(self, output_data):
        # Allow data structure coming out of tools API - {id: <id>, output_name: <name>, etc...}
        # or simple id as comes out of workflow API.
        try:
            output_id = output_data.get('id')
        except AttributeError:
            output_id = output_data
        return output_id

    def stage_data_async(self, test_data, history_id, tool_id, force_path_paste=False, maxseconds=DEFAULT_TOOL_TEST_WAIT):
        """Submit an upload job for one test dataset; return a wait callable.

        Registers the resulting HDA under the dataset's path, basename and
        test name in ``self.uploads`` for later use as tool inputs.
        """
        fname = test_data['fname']
        tool_input = {
            "file_type": test_data['ftype'],
            "dbkey": test_data['dbkey'],
        }
        metadata = test_data.get("metadata", {})
        if not hasattr(metadata, "items"):
            raise Exception(f"Invalid metadata description found for input [{fname}] - [{metadata}]")
        for name, value in test_data.get('metadata', {}).items():
            tool_input["files_metadata|%s" % name] = value

        composite_data = test_data['composite_data']
        if composite_data:
            files = {}
            for i, file_name in enumerate(composite_data):
                if force_path_paste:
                    file_path = self.test_data_path(tool_id, file_name)
                    tool_input.update({
                        "files_%d|url_paste" % i: "file://" + file_path
                    })
                else:
                    file_content = self.test_data_download(tool_id, file_name, is_output=False)
                    files["files_%s|file_data" % i] = file_content
                tool_input.update({
                    "files_%d|type" % i: "upload_dataset",
                })
            name = test_data['name']
        else:
            name = os.path.basename(fname)
            tool_input.update({
                "files_0|NAME": name,
                "files_0|type": "upload_dataset",
            })
            files = {}
            if force_path_paste:
                file_name = self.test_data_path(tool_id, fname)
                tool_input.update({
                    "files_0|url_paste": "file://" + file_name
                })
            else:
                file_content = self.test_data_download(tool_id, fname, is_output=False)
                files = {
                    "files_0|file_data": file_content
                }
        submit_response_object = self.__submit_tool(history_id, "upload1", tool_input, extra_data={"type": "upload_dataset"}, files=files)
        submit_response = ensure_tool_run_response_okay(submit_response_object, "upload dataset %s" % name)
        assert "outputs" in submit_response, "Invalid response from server [%s], expecting outputs in response." % submit_response
        outputs = submit_response["outputs"]
        assert len(outputs) > 0, "Invalid response from server [%s], expecting an output dataset." % submit_response
        dataset = outputs[0]
        hid = dataset['id']
        self.uploads[os.path.basename(fname)] = self.uploads[fname] = self.uploads[name] = {"src": "hda", "id": hid}
        assert "jobs" in submit_response, "Invalid response from server [%s], expecting jobs in response." % submit_response
        jobs = submit_response["jobs"]
        assert len(jobs) > 0, "Invalid response from server [%s], expecting a job." % submit_response
        return lambda: self.wait_for_job(jobs[0]["id"], history_id, maxseconds=maxseconds)

    def run_tool(self, testdef, history_id, resource_parameters=None):
        """Execute the tool described by ``testdef`` and return a Bunch of results.

        Input values naming previously uploaded datasets are rewritten to hda
        references; TestCollectionDef values are materialized as HDCAs first.
        """
        # We need to handle the case where we've uploaded a valid compressed file since the upload
        # tool will have uncompressed it on the fly.
        resource_parameters = resource_parameters or {}
        inputs_tree = testdef.inputs.copy()
        for key, value in inputs_tree.items():
            values = [value] if not isinstance(value, list) else value
            new_values = []
            for value in values:
                if isinstance(value, TestCollectionDef):
                    hdca_id = self._create_collection(history_id, value)
                    new_values = [dict(src="hdca", id=hdca_id)]
                elif value in self.uploads:
                    new_values.append(self.uploads[value])
                else:
                    new_values.append(value)
            inputs_tree[key] = new_values

        if resource_parameters:
            inputs_tree["__job_resource|__job_resource__select"] = "yes"
            for key, value in resource_parameters.items():
                inputs_tree["__job_resource|%s" % key] = value

        # HACK: Flatten single-value lists. Required when using expand_grouping
        for key, value in inputs_tree.items():
            if isinstance(value, list) and len(value) == 1:
                inputs_tree[key] = value[0]

        submit_response = None
        # Retry while the server reports inputs still being processed (err_code 400015).
        for _ in range(DEFAULT_TOOL_TEST_WAIT):
            submit_response = self.__submit_tool(history_id, tool_id=testdef.tool_id, tool_input=inputs_tree)
            if _are_tool_inputs_not_ready(submit_response):
                print("Tool inputs not ready yet")
                time.sleep(1)
                continue
            else:
                break

        submit_response_object = ensure_tool_run_response_okay(submit_response, "execute tool", inputs_tree)
        try:
            return Bunch(
                inputs=inputs_tree,
                outputs=self.__dictify_outputs(submit_response_object),
                output_collections=self.__dictify_output_collections(submit_response_object),
                jobs=submit_response_object['jobs'],
            )
        except KeyError:
            message = "Error creating a job for these tool inputs - %s" % submit_response_object['err_msg']
            raise RunToolException(message, inputs_tree)

    def _create_collection(self, history_id, collection_def):
        """Create an HDCA from a TestCollectionDef; return its id."""
        create_payload = dict(
            name=collection_def.name,
            element_identifiers=dumps(self._element_identifiers(collection_def)),
            collection_type=collection_def.collection_type,
            history_id=history_id,
        )
        return self._post("dataset_collections", data=create_payload).json()["id"]

    def _element_identifiers(self, collection_def):
        """Recursively build the element_identifiers payload for a collection."""
        element_identifiers = []
        for element_dict in collection_def.elements:
            element_identifier = element_dict["element_identifier"]
            element_def = element_dict["element_definition"]
            if isinstance(element_def, TestCollectionDef):
                subelement_identifiers = self._element_identifiers(element_def)
                element = dict(
                    name=element_identifier,
                    src="new_collection",
                    collection_type=element_def.collection_type,
                    element_identifiers=subelement_identifiers
                )
            else:
                element = self.uploads[element_def["value"]].copy()
                element["name"] = element_identifier
                tags = element_def.get("attributes").get("tags")
                if tags:
                    element["tags"] = tags.split(",")
            element_identifiers.append(element)
        return element_identifiers

    def __dictify_output_collections(self, submit_response):
        output_collections_dict = {}
        for output_collection in submit_response['output_collections']:
            output_collections_dict[output_collection.get("output_name")] = output_collection
        return output_collections_dict

    def __dictify_outputs(self, datasets_object):
        # Convert outputs list to a dictionary that can be accessed by
        # output_name so can be more flexible about ordering of outputs
        # but also allows fallback to legacy access as list mode.
        outputs_dict = OutputsDict()

        for output in datasets_object['outputs']:
            outputs_dict[output.get("output_name")] = output
        return outputs_dict

    def output_hid(self, output_data):
        return output_data['id']

    def delete_history(self, history):
        self._delete(f"histories/{history}")

    def __job_ready(self, job_id, history_id=None):
        if job_id is None:
            raise ValueError("__job_ready passed empty job_id")
        job_json = self._get("jobs/%s" % job_id).json()
        state = job_json['state']
        try:
            return self._state_ready(state, error_msg="Job in error state.")
        except Exception:
            if VERBOSE_ERRORS and history_id is not None:
                self._summarize_history(history_id)
            raise

    def _summarize_history(self, history_id):
        """Print a debugging summary of a history's datasets and jobs."""
        if history_id is None:
            raise ValueError("_summarize_history passed empty history_id")
        print("Problem in history with id %s - summary of history's datasets and jobs below." % history_id)
        try:
            history_contents = self.__contents(history_id)
        except Exception:
            print("*TEST FRAMEWORK FAILED TO FETCH HISTORY DETAILS*")
            return

        for history_content in history_contents:

            dataset = history_content

            print(ERROR_MESSAGE_DATASET_SEP)
            dataset_id = dataset.get('id', None)
            print("| %d - %s (HID - NAME) " % (int(dataset['hid']), dataset['name']))
            if history_content['history_content_type'] == 'dataset_collection':
                history_contents_json = self._get("histories/{}/contents/dataset_collections/{}".format(history_id, history_content["id"])).json()
                print("| Dataset Collection: %s" % history_contents_json)
                print("|")
                continue

            try:
                dataset_info = self._dataset_info(history_id, dataset_id)
                print("| Dataset State:")
                print(self.format_for_summary(dataset_info.get("state"), "Dataset state is unknown."))
                print("| Dataset Blurb:")
                print(self.format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty."))
                print("| Dataset Info:")
                print(self.format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty."))
                print("| Peek:")
                print(self.format_for_summary(dataset_info.get("peek", ""), "Peek unavailable."))
            except Exception:
                print("| *TEST FRAMEWORK ERROR FETCHING DATASET DETAILS*")
            try:
                provenance_info = self._dataset_provenance(history_id, dataset_id)
                print("| Dataset Job Standard Output:")
                print(self.format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty."))
                print("| Dataset Job Standard Error:")
                print(self.format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty."))
            except Exception:
                print("| *TEST FRAMEWORK ERROR FETCHING JOB DETAILS*")
            print("|")
        try:
            jobs_json = self._get("jobs?history_id=%s" % history_id).json()
            for job_json in jobs_json:
                print(ERROR_MESSAGE_DATASET_SEP)
                print("| Job %s" % job_json["id"])
                print("| State: ")
                print(self.format_for_summary(job_json.get("state", ""), "Job state is unknown."))
                print("| Update Time:")
                print(self.format_for_summary(job_json.get("update_time", ""), "Job update time is unknown."))
                print("| Create Time:")
                print(self.format_for_summary(job_json.get("create_time", ""), "Job create time is unknown."))
            print("|")
            print(ERROR_MESSAGE_DATASET_SEP)
        except Exception:
            print(ERROR_MESSAGE_DATASET_SEP)
            print("*TEST FRAMEWORK FAILED TO FETCH HISTORY JOBS*")
            print(ERROR_MESSAGE_DATASET_SEP)

    def format_for_summary(self, blob, empty_message, prefix="| "):
        """Prefix each non-blank line of ``blob``; fall back to ``empty_message``."""
        contents = "\n".join(f"{prefix}{line.strip()}" for line in io.StringIO(blob).readlines() if line.rstrip("\n\r"))
        return contents or f"{prefix}*{empty_message}*"

    def _dataset_provenance(self, history_id, id):
        provenance = self._get(f"histories/{history_id}/contents/{id}/provenance").json()
        return provenance

    def _dataset_info(self, history_id, id):
        dataset_json = self._get(f"histories/{history_id}/contents/{id}").json()
        return dataset_json

    def __contents(self, history_id):
        history_contents_response = self._get("histories/%s/contents" % history_id)
        history_contents_response.raise_for_status()
        return history_contents_response.json()

    def _state_ready(self, state_str, error_msg):
        """Map a job state to True (ok), raise (error), or None (still running)."""
        if state_str == 'ok':
            return True
        elif state_str == 'error':
            raise Exception(error_msg)
        return None

    def __submit_tool(self, history_id, tool_id, tool_input, extra_data=None, files=None):
        extra_data = extra_data or {}
        data = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=dumps(tool_input),
            **extra_data
        )
        return self._post("tools", files=files, data=data)

    def ensure_user_with_email(self, email, password=None):
        """Return the user record for ``email``, creating the user if absent."""
        admin_key = self.master_api_key
        all_users_response = self._get('users', key=admin_key)
        try:
            all_users_response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise Exception(f"Failed to verify user with email [{email}] exists - perhaps you're targeting the wrong Galaxy server or using an incorrect admin API key. HTTP error: {e}")
        all_users = all_users_response.json()
        try:
            test_user = [user for user in all_users if user["email"] == email][0]
        except IndexError:
            username = re.sub(r"[^a-z-\d]", '--', email.lower())
            password = password or 'testpass'
            # If remote user middleware is enabled - this endpoint consumes
            # ``remote_user_email`` otherwise it requires ``email``, ``password``
            # and ``username``.
            data = dict(
                remote_user_email=email,
                email=email,
                password=password,
                username=username,
            )
            test_user = self._post('users', data, key=admin_key).json()
        return test_user

    def __test_data_downloader(self, tool_id):
        def test_data_download(filename, mode='file'):
            return self.test_data_download(tool_id, filename, mode=mode)
        return test_data_download

    def __dataset_fetcher(self, history_id):
        def fetcher(hda_id, base_name=None):
            url = f"histories/{history_id}/contents/{hda_id}/display?raw=true"
            if base_name:
                url += "&filename=%s" % base_name
            response = None
            # Retry transient 500s up to self.download_attempts times.
            for _ in range(self.download_attempts):
                response = self._get(url)
                if response.status_code == 500:
                    print(f"Retrying failed download with status code {response.status_code}")
                    time.sleep(self.download_sleep)
                    continue
                else:
                    break

            response.raise_for_status()
            return response.content

        return fetcher

    def api_key_header(self, key, admin, anon):
        """Build the x-api-key header dict (empty for anonymous requests)."""
        header = {}
        if not anon:
            if not key:
                key = self.api_key if not admin else self.master_api_key
            header['x-api-key'] = key
        return header

    def _post(self, path, data=None, files=None, key=None, admin=False, anon=False, json=False):
        # If json=True, use post payload using request's json parameter instead of the data
        # parameter (i.e. assume the contents is a jsonified blob instead of form parameters
        # with individual parameters jsonified if needed).
        headers = self.api_key_header(key=key, admin=admin, anon=anon)
        url = f"{self.api_url}/{path}"
        return galaxy_requests_post(url, data=data, files=files, as_json=json, headers=headers)

    def _delete(self, path, data=None, key=None, admin=False, anon=False):
        headers = self.api_key_header(key=key, admin=admin, anon=anon)
        return requests.delete(f"{self.api_url}/{path}", params=data, headers=headers)

    def _patch(self, path, data=None, key=None, admin=False, anon=False):
        headers = self.api_key_header(key=key, admin=admin, anon=anon)
        return requests.patch(f"{self.api_url}/{path}", data=data, headers=headers)

    def _put(self, path, data=None, key=None, admin=False, anon=False):
        headers = self.api_key_header(key=key, admin=admin, anon=anon)
        return requests.put(f"{self.api_url}/{path}", data=data, headers=headers)

    def _get(self, path, data=None, key=None, admin=False, anon=False):
        headers = self.api_key_header(key=key, admin=admin, anon=anon)
        # Tolerate paths that already include the /api prefix.
        if path.startswith("/api"):
            path = path[len("/api"):]
        url = f"{self.api_url}/{path}"
        # no data for GET
        return requests.get(url, params=data, headers=headers)
708 | |
709 | |
def ensure_tool_run_response_okay(submit_response_object, request_desc, inputs=None):
    """Validate a tool submission HTTP response and return its parsed JSON.

    For non-200 responses, extract the most specific error message available
    (dbkey parameter errors, then ``err_msg``) and raise RunToolException,
    flagging whether any dynamic parameter error was reported.
    """
    if submit_response_object.status_code == 200:
        return submit_response_object.json()

    message = None
    dynamic_param_error = False
    try:
        err_response = submit_response_object.json()
        if "param_errors" in err_response:
            param_errors = err_response["param_errors"]
            if "dbkey" in param_errors:
                dbkey_val = param_errors["dbkey"].get("parameter_value")
                message = "Invalid dbkey specified [%s]" % dbkey_val
            for err_value in param_errors.values():
                if isinstance(err_value, dict) and err_value.get("is_dynamic"):
                    dynamic_param_error = True
        if message is None:
            message = err_response.get("err_msg") or None
    except Exception:
        # Body was not valid JSON; fall through to the generic message.
        pass
    if message is None:
        message = "Request to %s failed - invalid JSON content returned from Galaxy server [%s]" % (
            request_desc,
            submit_response_object.text,
        )
    raise RunToolException(message, inputs, dynamic_param_error=dynamic_param_error)
736 | |
737 | |
738 def _are_tool_inputs_not_ready(submit_response): | |
739 if submit_response.status_code != 400: | |
740 return False | |
741 try: | |
742 submit_json = submit_response.json() | |
743 return submit_json.get("err_code") == 400015 | |
744 except Exception: | |
745 return False | |
746 | |
747 | |
class RunToolException(Exception):
    """Raised when a tool execution request is rejected by the Galaxy server.

    Carries the submitted inputs (when known) and whether the failure
    involved a dynamically-computed parameter.
    """

    def __init__(self, message, inputs=None, dynamic_param_error=False):
        super().__init__(message)
        # Tool inputs sent with the failing request, if available.
        self.inputs = inputs
        # True when a dynamic (server-side computed) parameter caused the error.
        self.dynamic_param_error = dynamic_param_error
754 | |
755 | |
756 # Galaxy specific methods - rest of this can be used with arbitrary files and such. | |
def verify_hid(filename, hda_id, attributes, test_data_downloader, hid="", dataset_fetcher=None, keep_outputs_dir=False):
    """Fetch a history dataset by id and verify its content against the test
    expectations in ``attributes``/``filename``."""
    assert dataset_fetcher is not None

    def verify_extra_files(extra_files):
        # Delegate extra-file checks, closing over this dataset's id/fetchers.
        _verify_extra_files_content(
            extra_files,
            hda_id,
            dataset_fetcher=dataset_fetcher,
            test_data_downloader=test_data_downloader,
            keep_outputs_dir=keep_outputs_dir,
        )

    verify(
        "",
        dataset_fetcher(hda_id),
        attributes=attributes,
        filename=filename,
        get_filecontent=test_data_downloader,
        keep_outputs_dir=keep_outputs_dir,
        verify_extra_files=verify_extra_files,
    )
774 | |
775 | |
def verify_collection(output_collection_def, data_collection, verify_dataset):
    """Verify a tool-produced dataset collection against its test definition.

    :param output_collection_def: test definition with expected name,
        collection_type, element count and per-element tests.
    :param data_collection: collection dict as returned by the Galaxy API
        (must contain "collection_type" and "elements").
    :param verify_dataset: callable(element, attributes, outfile) used to
        check each leaf dataset element.
    :raises AssertionError: on mismatched type, count, missing identifiers,
        out-of-order elements, or failing element content checks.
    """
    name = output_collection_def.name

    # Check the collection type (e.g. "list", "paired") if one was declared.
    expected_collection_type = output_collection_def.collection_type
    if expected_collection_type:
        collection_type = data_collection["collection_type"]
        if expected_collection_type != collection_type:
            template = "Expected output collection [%s] to be of type [%s], was of type [%s]."
            message = template % (name, expected_collection_type, collection_type)
            raise AssertionError(message)

    # Check the number of top-level elements if a count was declared.
    expected_element_count = output_collection_def.count
    if expected_element_count:
        actual_element_count = len(data_collection["elements"])
        if expected_element_count != actual_element_count:
            template = "Expected output collection [%s] to have %s elements, but it had %s."
            message = template % (name, expected_element_count, actual_element_count)
            raise AssertionError(message)

    def get_element(elements, id):
        # Linear scan for the element with the given identifier. Returns
        # False (not None) when absent; callers only test truthiness.
        for element in elements:
            if element["element_identifier"] == id:
                return element
        return False

    def verify_elements(element_objects, element_tests):
        # sorted_test_ids = [None] * len(element_tests)
        # Identifiers that must appear in this relative order in the collection.
        expected_sort_order = []

        eo_ids = [_["element_identifier"] for _ in element_objects]
        for element_identifier, element_test in element_tests.items():
            # element_test is either a bare attributes dict or an
            # (outfile, attributes) pair.
            if isinstance(element_test, dict):
                element_outfile, element_attrib = None, element_test
            else:
                element_outfile, element_attrib = element_test
            if 'expected_sort_order' in element_attrib:
                expected_sort_order.append(element_identifier)

            element = get_element(element_objects, element_identifier)
            if not element:
                template = "Failed to find identifier '%s' in the tool generated collection elements %s"
                message = template % (element_identifier, eo_ids)
                raise AssertionError(message)

            element_type = element["element_type"]
            if element_type != "dataset_collection":
                # Leaf dataset - verify its content directly.
                verify_dataset(element, element_attrib, element_outfile)
            if element_type == "dataset_collection":
                # Nested collection - recurse into its elements.
                elements = element["object"]["elements"]
                verify_elements(elements, element_attrib.get("elements", {}))

        if len(expected_sort_order) > 0:
            # Walk element_objects once, consuming identifiers from
            # expected_sort_order; each must be found at or after the position
            # where the previous one matched, otherwise order is violated.
            i = 0
            for element_identifier in expected_sort_order:
                element = None
                while i < len(element_objects):
                    if element_objects[i]["element_identifier"] == element_identifier:
                        element = element_objects[i]
                        i += 1
                        break
                    i += 1
                if element is None:
                    template = "Collection identifier '%s' found out of order, expected order of %s for the tool generated collection elements %s"
                    message = template % (element_identifier, expected_sort_order, eo_ids)
                    raise AssertionError(message)

    verify_elements(data_collection["elements"], output_collection_def.element_tests)
843 | |
844 | |
def _verify_composite_datatype_file_content(file_name, hda_id, base_name=None, attributes=None, dataset_fetcher=None, test_data_downloader=None, keep_outputs_dir=False, mode='file'):
    """Verify a single component file of a composite dataset against its
    expected content, prefixing any failure with the component name."""
    assert dataset_fetcher is not None

    data = dataset_fetcher(hda_id, base_name)
    label = "History item %s" % hda_id
    try:
        verify(
            label,
            data,
            attributes=attributes,
            filename=file_name,
            get_filecontent=test_data_downloader,
            keep_outputs_dir=keep_outputs_dir,
            mode=mode,
        )
    except AssertionError as err:
        # Prefix the underlying diff with which composite file failed.
        errmsg = f'Composite file ({base_name}) of {label} different than expected, difference:\n'
        errmsg += util.unicodify(err)
        raise AssertionError(errmsg)
864 | |
865 | |
def _verify_extra_files_content(extra_files, hda_id, dataset_fetcher, test_data_downloader, keep_outputs_dir):
    """Verify every declared extra file (or directory of files) of a dataset.

    Directory entries are downloaded and extracted first and each contained
    file is verified individually; extracted directories are always removed
    afterwards.
    """
    to_verify = []
    extracted_dirs = []
    for extra_file in extra_files:
        file_type = extra_file["type"]
        file_name = extra_file["name"]
        file_attributes = extra_file["attributes"]
        file_value = extra_file["value"]

        if file_type == 'file':
            to_verify.append((file_name, file_value, file_attributes, file_type))
        elif file_type == 'directory':
            # Download and extract the expected directory, then enqueue every
            # file inside it (keyed by its path relative to the extraction root).
            extracted_path = test_data_downloader(file_value, mode='directory')
            extracted_dirs.append(extracted_path)
            for root, _dirs, files in util.path.safe_walk(extracted_path):
                for member in files:
                    relative = os.path.relpath(os.path.join(root, member), extracted_path)
                    to_verify.append((relative, os.path.join(extracted_path, relative), file_attributes, file_type))
        else:
            raise ValueError('unknown extra_files type: %s' % file_type)
    try:
        for name, path, attribs, file_type in to_verify:
            _verify_composite_datatype_file_content(path, hda_id, base_name=name, attributes=attribs, dataset_fetcher=dataset_fetcher, test_data_downloader=test_data_downloader, keep_outputs_dir=keep_outputs_dir, mode=file_type)
    finally:
        # Clean up extracted expected-data directories even on failure.
        for extracted in extracted_dirs:
            shutil.rmtree(extracted)
893 | |
894 | |
class NullClientTestConfig:
    """No-op client test configuration: never supplies per-test config."""

    def get_test_config(self, job_data):
        """Always return None - there is no configuration override."""
        return None
899 | |
900 | |
class DictClientTestConfig:
    """Client test configuration backed by a plain dictionary.

    Keys are tool ids (optionally "<tool_id>/<tool_version>"); values map a
    tool version (or "default") to a dict of test indices (or "default")
    pointing at configuration entries.
    """

    def __init__(self, tools):
        self._tools = tools or {}

    def get_test_config(self, job_data):
        # TODO: allow short ids, allow versions below outer id instead of key concatenation.
        tool_id = job_data.get("tool_id")
        tool_version = job_data.get("tool_version")
        is_default = False
        version_entry = None
        tool_entry = self._tools.get(tool_id)
        if tool_entry is None:
            # Fall back on a combined "<tool_id>/<tool_version>" key.
            combined_key = f"{tool_id}/{tool_version}"
            if combined_key in self._tools:
                version_entry = self._tools[combined_key]
        else:
            if tool_version in tool_entry:
                version_entry = tool_entry[tool_version]
            elif "default" in tool_entry:
                version_entry = tool_entry["default"]
                is_default = True

        if version_entry:
            # Prefer an exact test-index match (int, then string form).
            test_index = job_data.get("test_index")
            if test_index in version_entry:
                return version_entry[test_index]
            elif str(test_index) in version_entry:
                return version_entry[str(test_index)]
            if 'default' in version_entry:
                return version_entry['default']
            elif is_default:
                # The version-level "default" doubles as the config itself.
                return version_entry
        return None
937 | |
938 | |
def verify_tool(tool_id,
                galaxy_interactor,
                resource_parameters=None,
                register_job_data=None,
                test_index=0,
                tool_version=None,
                quiet=False,
                test_history=None,
                no_history_cleanup=False,
                publish_history=False,
                force_path_paste=False,
                maxseconds=DEFAULT_TOOL_TEST_WAIT,
                tool_test_dicts=None,
                client_test_config=None,
                skip_with_reference_data=False,
                skip_on_dynamic_param_errors=False):
    """Run one test of a tool against a Galaxy server and verify its outputs.

    Stages the test's input datasets into a history, runs the tool, waits for
    the job and checks datasets/collections/stdio against the test definition.
    A structured dict describing the run (status, timing, inputs, problems)
    is reported through ``register_job_data`` when that callback is provided.

    :param tool_id: id of the tool to test.
    :param galaxy_interactor: client object used for all Galaxy API calls.
    :param resource_parameters: optional job resource parameters.
    :param register_job_data: optional callable receiving the structured
        job-data dict describing the outcome.
    :param test_index: index of the test within the tool's test list.
    :param tool_version: tool version to test (falls back to the version in
        the test dictionary, if any).
    :param quiet: suppress printing job stdio on failure.
    :param test_history: existing history to reuse; a fresh one is created
        (and deleted unless ``no_history_cleanup``) when None.
    :param publish_history: publish a newly created test history.
    :param force_path_paste: stage inputs via server-side path paste.
    :param maxseconds: per-test timeout in seconds.
    :param tool_test_dicts: pre-fetched test dictionaries (skips API lookup).
    :param client_test_config: per-client test configuration overrides.
    :param skip_with_reference_data: skip tests needing data tables/loc files.
    :param skip_on_dynamic_param_errors: report dynamic-parameter execution
        failures as "skip" instead of "error".
    """
    if resource_parameters is None:
        resource_parameters = {}
    if client_test_config is None:
        client_test_config = NullClientTestConfig()
    tool_test_dicts = tool_test_dicts or galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
    tool_test_dict = tool_test_dicts[test_index]
    # Normalize the test dictionary - older providers may omit these keys.
    if "test_index" not in tool_test_dict:
        tool_test_dict["test_index"] = test_index
    if "tool_id" not in tool_test_dict:
        tool_test_dict["tool_id"] = tool_id
    if tool_version is None and "tool_version" in tool_test_dict:
        tool_version = tool_test_dict.get("tool_version")

    job_data = {
        "tool_id": tool_id,
        "tool_version": tool_version,
        "test_index": test_index,
    }
    client_config = client_test_config.get_test_config(job_data)
    skip_message = None
    if client_config is not None:
        job_data.update(client_config)
        skip_message = job_data.get("skip")

    if not skip_message and skip_with_reference_data:
        required_data_tables = tool_test_dict.get("required_data_tables")
        required_loc_files = tool_test_dict.get("required_loc_files")
        # TODO: actually hit the API and see if these tables are available.
        if required_data_tables:
            skip_message = f"Skipping test because of required data tables ({required_data_tables})"
        if required_loc_files:
            skip_message = f"Skipping test because of required loc files ({required_loc_files})"

    if skip_message:
        job_data["status"] = "skip"
        # BUGFIX: guard the callback - previously a skipped test with no
        # register_job_data callback raised TypeError here (the finally block
        # below already guards the same call).
        if register_job_data is not None:
            register_job_data(job_data)
        return

    tool_test_dict.setdefault('maxseconds', maxseconds)
    testdef = ToolTestDescription(tool_test_dict)
    _handle_def_errors(testdef)

    created_history = False
    if test_history is None:
        created_history = True
        history_name = f"Tool Test History for {tool_id}/{tool_version}-{test_index}"
        test_history = galaxy_interactor.new_history(history_name=history_name, publish_history=publish_history)

    # Upload data to test_history, run the tool and check the outputs - record
    # API input, job info, tool run exception, as well as exceptions related to
    # job output checking and register they with the test plugin so it can
    # record structured information.
    tool_inputs = None
    job_stdio = None
    job_output_exceptions = None
    tool_execution_exception = None
    input_staging_exception = None
    expected_failure_occurred = False
    begin_time = time.time()
    try:
        try:
            stage_data_in_history(
                galaxy_interactor,
                tool_id,
                testdef.test_data(),
                history=test_history,
                force_path_paste=force_path_paste,
                maxseconds=maxseconds,
            )
        except Exception as e:
            input_staging_exception = e
            raise
        try:
            tool_response = galaxy_interactor.run_tool(testdef, test_history, resource_parameters=resource_parameters)
            data_list, jobs, tool_inputs = tool_response.outputs, tool_response.jobs, tool_response.inputs
            data_collection_list = tool_response.output_collections
        except RunToolException as e:
            tool_inputs = e.inputs
            tool_execution_exception = e
            # A failed submission is acceptable only when the test expects failure.
            if not testdef.expect_failure:
                raise e
            else:
                expected_failure_occurred = True
        except Exception as e:
            tool_execution_exception = e
            raise e

        if not expected_failure_occurred:
            assert data_list or data_collection_list

            try:
                job_stdio = _verify_outputs(testdef, test_history, jobs, tool_id, data_list, data_collection_list, galaxy_interactor, quiet=quiet)
            except JobOutputsError as e:
                job_stdio = e.job_stdio
                job_output_exceptions = e.output_exceptions
                raise e
            except Exception as e:
                job_output_exceptions = [e]
                raise e
    finally:
        # Always report structured job data, whatever the outcome above.
        if register_job_data is not None:
            end_time = time.time()
            job_data["time_seconds"] = end_time - begin_time
            if tool_inputs is not None:
                job_data["inputs"] = tool_inputs
            if job_stdio is not None:
                job_data["job"] = job_stdio
            status = "success"
            if job_output_exceptions:
                job_data["output_problems"] = [util.unicodify(_) for _ in job_output_exceptions]
                status = "failure"
            if tool_execution_exception:
                job_data["execution_problem"] = util.unicodify(tool_execution_exception)
                dynamic_param_error = getattr(tool_execution_exception, "dynamic_param_error", False)
                job_data["dynamic_param_error"] = dynamic_param_error
                status = "error" if not skip_on_dynamic_param_errors or not dynamic_param_error else "skip"
            if input_staging_exception:
                job_data["execution_problem"] = "Input staging problem: %s" % util.unicodify(input_staging_exception)
                status = "error"
            job_data["status"] = status
            register_job_data(job_data)

    if created_history and not no_history_cleanup:
        galaxy_interactor.delete_history(test_history)
1079 | |
1080 | |
1081 def _handle_def_errors(testdef): | |
1082 # If the test generation had an error, raise | |
1083 if testdef.error: | |
1084 if testdef.exception: | |
1085 if isinstance(testdef.exception, Exception): | |
1086 raise testdef.exception | |
1087 else: | |
1088 raise Exception(testdef.exception) | |
1089 else: | |
1090 raise Exception("Test parse failure") | |
1091 | |
1092 | |
def _verify_outputs(testdef, history, jobs, tool_id, data_list, data_collection_list, galaxy_interactor, quiet=False):
    """Verify a tool run's job status, output datasets, collections and stdio.

    Accumulates every verification failure instead of stopping at the first
    and raises a single JobOutputsError wrapping them all; on success returns
    the job's stdio dictionary.

    :param testdef: ToolTestDescription for this test.
    :param history: encoded id of the history the job ran in.
    :param jobs: jobs created by the run - must be exactly one.
    :param tool_id: id of the tool under test.
    :param data_list: mapping (or legacy list) of output datasets.
    :param data_collection_list: mapping of output collections by name.
    :param galaxy_interactor: client used for API calls during verification.
    :param quiet: when False, print job stdout/stderr on the first failure.
    :returns: the job stdio dict when all checks pass.
    :raises JobOutputsError: when any output verification failed.
    """
    assert len(jobs) == 1, "Test framework logic error, somehow tool test resulted in more than one job."
    job = jobs[0]

    maxseconds = testdef.maxseconds
    if testdef.num_outputs is not None:
        # Check the declared total output count (datasets + collections).
        expected = testdef.num_outputs
        actual = len(data_list) + len(data_collection_list)
        if expected != actual:
            message_template = "Incorrect number of outputs - expected %d, found %s."
            message = message_template % (expected, actual)
            raise Exception(message)
    found_exceptions = []

    def register_exception(e):
        # Accumulate failures; dump the job's stdio once, on the first one.
        # NOTE: closes over job_stdio, which is assigned after the job wait
        # below - all callers run after that assignment.
        if not found_exceptions and not quiet:
            # Only print this stuff out once.
            for stream in ['stdout', 'stderr']:
                if stream in job_stdio:
                    print(_format_stream(job_stdio[stream], stream=stream, format=True), file=sys.stderr)
        found_exceptions.append(e)

    if testdef.expect_failure:
        if testdef.outputs:
            raise Exception("Cannot specify outputs in a test expecting failure.")

    # Wait for the job to complete and register expections if the final
    # status was not what test was expecting.
    job_failed = False
    try:
        galaxy_interactor.wait_for_job(job['id'], history, maxseconds)
    except Exception as e:
        job_failed = True
        if not testdef.expect_failure:
            found_exceptions.append(e)

    job_stdio = galaxy_interactor.get_job_stdio(job['id'])

    if not job_failed and testdef.expect_failure:
        error = AssertionError("Expected job to fail but Galaxy indicated the job successfully completed.")
        register_exception(error)

    # Check the declared exit code, if any (compared as strings).
    expect_exit_code = testdef.expect_exit_code
    if expect_exit_code is not None:
        exit_code = job_stdio["exit_code"]
        if str(expect_exit_code) != str(exit_code):
            error = AssertionError(f"Expected job to complete with exit code {expect_exit_code}, found {exit_code}")
            register_exception(error)

    # Verify each declared output dataset.
    for output_index, output_dict in enumerate(testdef.outputs):
        # Get the correct hid
        name = output_dict["name"]
        outfile = output_dict["value"]
        attributes = output_dict["attributes"]
        output_testdef = Bunch(name=name, outfile=outfile, attributes=attributes)
        try:
            output_data = data_list[name]
        except (TypeError, KeyError):
            # Legacy - fall back on ordered data list access if data_list is
            # just a list (case with twill variant or if output changes its
            # name).
            if hasattr(data_list, "values"):
                output_data = list(data_list.values())[output_index]
            else:
                output_data = data_list[len(data_list) - len(testdef.outputs) + output_index]
        assert output_data is not None
        try:
            galaxy_interactor.verify_output(history, jobs, output_data, output_testdef=output_testdef, tool_id=tool_id, maxseconds=maxseconds)
        except Exception as e:
            register_exception(e)

    other_checks = {
        "command_line": "Command produced by the job",
        "command_version": "Tool version indicated during job execution",
        "stdout": "Standard output of the job",
        "stderr": "Standard error of the job",
    }
    # TODO: Only hack the stdio like this for older profile, for newer tool profiles
    # add some syntax for asserting job messages maybe - or just drop this because exit
    # code and regex on stdio can be tested directly - so this is really testing Galaxy
    # core handling more than the tool.
    job_messages = job_stdio.get("job_messages") or []
    stdout_prefix = ""
    stderr_prefix = ""
    # Fold Galaxy-detected job messages back into the stream text so that
    # stdio assertions below can match on them.
    for job_message in job_messages:
        message_type = job_message.get("type")
        if message_type == "regex" and job_message.get("stream") == "stderr":
            stderr_prefix += (job_message.get("desc") or '') + "\n"
        elif message_type == "regex" and job_message.get("stream") == "stdout":
            stdout_prefix += (job_message.get("desc") or '') + "\n"
        elif message_type == "exit_code":
            stderr_prefix += (job_message.get("desc") or '') + "\n"
        else:
            raise Exception(f"Unknown job message type [{message_type}] in [{job_message}]")

    # Run declared assertions against command line, version, stdout, stderr.
    for what, description in other_checks.items():
        if getattr(testdef, what, None) is not None:
            try:
                raw_data = job_stdio[what]
                assertions = getattr(testdef, what)
                if what == "stdout":
                    data = stdout_prefix + raw_data
                elif what == "stderr":
                    data = stderr_prefix + raw_data
                else:
                    data = raw_data
                verify_assertions(data, assertions)
            except AssertionError as err:
                errmsg = '%s different than expected\n' % description
                errmsg += util.unicodify(err)
                register_exception(AssertionError(errmsg))

    # Verify each declared output collection.
    for output_collection_def in testdef.output_collections:
        try:
            name = output_collection_def.name
            # TODO: data_collection_list is clearly a bad name for dictionary.
            if name not in data_collection_list:
                template = "Failed to find output [%s], tool outputs include [%s]"
                message = template % (name, ",".join(data_collection_list.keys()))
                raise AssertionError(message)

            # Data collection returned from submission, elements may have been populated after
            # the job completed so re-hit the API for more information.
            data_collection_id = data_collection_list[name]["id"]
            galaxy_interactor.verify_output_collection(output_collection_def, data_collection_id, history, tool_id)
        except Exception as e:
            register_exception(e)

    if found_exceptions:
        raise JobOutputsError(found_exceptions, job_stdio)
    else:
        return job_stdio
1225 | |
1226 | |
1227 def _format_stream(output, stream, format): | |
1228 output = output or '' | |
1229 if format: | |
1230 msg = "---------------------- >> begin tool %s << -----------------------\n" % stream | |
1231 msg += output + "\n" | |
1232 msg += "----------------------- >> end tool %s << ------------------------\n" % stream | |
1233 else: | |
1234 msg = output | |
1235 return msg | |
1236 | |
1237 | |
class JobOutputsError(AssertionError):
    """Aggregate of all output-verification failures for a single job.

    The exception message joins every underlying failure; the individual
    exceptions and the job's stdio dict stay accessible for structured
    reporting.
    """

    def __init__(self, output_exceptions, job_stdio):
        combined = "\n".join(map(util.unicodify, output_exceptions))
        super().__init__(combined)
        # Raw stdio dict of the job whose outputs failed verification.
        self.job_stdio = job_stdio
        # The individual verification exceptions, in registration order.
        self.output_exceptions = output_exceptions
1245 | |
1246 | |
class ToolTestDescription:
    """
    Encapsulates information about a tool test, and allows creation of a
    dynamic TestCase class (the unittest framework is very class oriented,
    doing dynamic tests in this way allows better integration)
    """

    def __init__(self, processed_test_dict):
        assert "test_index" in processed_test_dict, "Invalid processed test description, must have a 'test_index' for naming, etc.."
        test_index = processed_test_dict["test_index"]
        self.test_index = test_index
        self.name = processed_test_dict.get('name', 'Test-%d' % (test_index + 1))

        timeout = processed_test_dict.get('maxseconds', DEFAULT_TOOL_TEST_WAIT)
        self.maxseconds = int(timeout) if timeout is not None else None

        assert "tool_id" in processed_test_dict, "Invalid processed test description, must have a 'tool_id' for naming, etc.."
        self.tool_id = processed_test_dict["tool_id"]
        self.tool_version = processed_test_dict.get("tool_version")

        self.required_files = processed_test_dict.get("required_files", [])
        self.required_data_tables = processed_test_dict.get("required_data_tables", [])
        self.required_loc_files = processed_test_dict.get("required_loc_files", [])

        # Input values arriving as dicts carrying a "model_class" marker are
        # collection definitions and get deserialized accordingly.
        self.inputs = {
            key: TestCollectionDef.from_dict(value) if isinstance(value, dict) and value.get("model_class") else value
            for key, value in processed_test_dict.get("inputs", {}).items()
        }

        self.outputs = processed_test_dict.get("outputs", [])
        self.num_outputs = processed_test_dict.get("num_outputs", None)

        self.error = processed_test_dict.get("error", False)
        self.exception = processed_test_dict.get("exception", None)

        self.output_collections = [TestCollectionOutputDef.from_dict(d) for d in processed_test_dict.get("output_collections", [])]
        self.command_line = processed_test_dict.get("command_line", None)
        self.command_version = processed_test_dict.get("command_version", None)
        self.stdout = processed_test_dict.get("stdout", None)
        self.stderr = processed_test_dict.get("stderr", None)
        self.expect_exit_code = processed_test_dict.get("expect_exit_code", None)
        self.expect_failure = processed_test_dict.get("expect_failure", False)

    def test_data(self):
        """
        Iterator over metadata representing the required files for upload.
        """
        return test_data_iter(self.required_files)

    def to_dict(self):
        """Return a JSON-serializable dict representation of this test."""
        serializable_inputs = {
            key: value.to_dict() if hasattr(value, "to_dict") else value
            for key, value in self.inputs.items()
        }

        return {
            "inputs": serializable_inputs,
            "outputs": self.outputs,
            "output_collections": [collection.to_dict() for collection in self.output_collections],
            "num_outputs": self.num_outputs,
            "command_line": self.command_line,
            "command_version": self.command_version,
            "stdout": self.stdout,
            "stderr": self.stderr,
            "expect_exit_code": self.expect_exit_code,
            "expect_failure": self.expect_failure,
            "name": self.name,
            "test_index": self.test_index,
            "tool_id": self.tool_id,
            "tool_version": self.tool_version,
            "required_files": self.required_files,
            "required_data_tables": self.required_data_tables,
            "required_loc_files": self.required_loc_files,
            "error": self.error,
            "exception": self.exception,
        }
1330 | |
1331 | |
@nottest
def test_data_iter(required_files):
    """Yield an upload-metadata dict for each required test file.

    Each entry combines the filename with ftype/dbkey/metadata defaults and
    applies any 'edit_attributes' directives (only renaming is supported).
    """
    for fname, extra in required_files:
        upload_spec = {
            'fname': fname,
            'metadata': extra.get('metadata', {}),
            'composite_data': extra.get('composite_data', []),
            'ftype': extra.get('ftype', DEFAULT_FTYPE),
            'dbkey': extra.get('dbkey', DEFAULT_DBKEY),
        }

        # currently only renaming is supported
        for edit_att in extra.get('edit_attributes', []):
            if edit_att.get('type', None) == 'name':
                new_name = edit_att.get('value', None)
                assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
                upload_spec['name'] = new_name
            else:
                raise Exception('edit_attributes type (%s) is unimplemented' % edit_att.get('type', None))

        yield upload_spec
1354 | |
1355 | |
def galaxy_requests_post(url, data=None, files=None, as_json=False, params=None, headers=None):
    """Handle some Galaxy conventions and work around requests issues.

    This is admittedly kind of hacky, so the interface may change frequently - be
    careful on reuse.

    If ``as_json`` is True, use post payload using request's json parameter instead
    of the data parameter (i.e. assume the contents is a json-ified blob instead of
    form parameters with individual parameters json-ified if needed). requests doesn't
    allow files to be specified with the json parameter - so rewrite the parameters
    to handle that if as_json is True with specified files.

    :param url: full URL to POST to.
    :param data: payload dict; may carry encoded files under a "__files" key.
    :param files: files dict for requests; falls back to data["__files"].
    :param as_json: send the payload via requests' json parameter when possible.
    :param params: query-string parameters.
    :param headers: extra HTTP headers.
    :returns: the requests Response object.
    """
    params = params or {}
    # BUGFIX: work on a shallow copy so the caller's dict is not mutated
    # (previously "__files" was deleted from, and params merged into, the
    # caller's own dictionary).
    data = dict(data or {})

    # handle encoded files
    if files is None:
        # if not explicitly passed, check __files... convention used in tool testing
        # and API testing code
        files = data.pop("__files", None)

    # files doesn't really work with json, so dump the parameters
    # and do a normal POST with request's data parameter.
    if bool(files) and as_json:
        as_json = False
        new_items = {}
        for key, val in data.items():
            if isinstance(val, (dict, list)):
                new_items[key] = dumps(val)
        data.update(new_items)

    kwd = {
        'files': files,
    }
    if headers:
        kwd['headers'] = headers
    if as_json:
        kwd['json'] = data
        kwd['params'] = params
    else:
        data.update(params)
        kwd['data'] = data

    return requests.post(url, **kwd)