comparison env/lib/python3.9/site-packages/galaxy/tool_util/verify/interactor.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import io
2 import json
3 import os
4 import re
5 import shutil
6 import sys
7 import tarfile
8 import tempfile
9 import time
10 import zipfile
11 from json import dumps
12 from logging import getLogger
13
14 import requests
15 from packaging.version import parse as parse_version, Version
16 try:
17 from nose.tools import nottest
18 except ImportError:
19 def nottest(x):
20 return x
21
22 from galaxy import util
23 from galaxy.tool_util.parser.interface import TestCollectionDef, TestCollectionOutputDef
24 from galaxy.util.bunch import Bunch
25 from . import verify
26 from .asserts import verify_assertions
27 from .wait import wait_on
28
log = getLogger(__name__)

# Off by default because it can pound the database pretty heavily
# and result in sqlite errors on larger tests or larger numbers of
# tests.
VERBOSE_ERRORS = util.asbool(os.environ.get("GALAXY_TEST_VERBOSE_ERRORS", False))
# Stage every test input first, then wait on all the uploads together.
UPLOAD_ASYNC = util.asbool(os.environ.get("GALAXY_TEST_UPLOAD_ASYNC", True))
# Separator printed between per-dataset sections of error summaries.
ERROR_MESSAGE_DATASET_SEP = "--------------------------------------"
# Default seconds to wait on jobs/uploads before timing out (24 hours).
DEFAULT_TOOL_TEST_WAIT = int(os.environ.get("GALAXY_TEST_DEFAULT_WAIT", 86400))

DEFAULT_FTYPE = 'auto'
# This following default dbkey was traditionally hg17 before Galaxy 18.05,
# restore this behavior by setting GALAXY_TEST_DEFAULT_DBKEY to hg17.
DEFAULT_DBKEY = os.environ.get("GALAXY_TEST_DEFAULT_DBKEY", "?")
43
44
class OutputsDict(dict):
    """Ordered dict that can also be accessed by index.

    >>> out = OutputsDict()
    >>> out['item1'] = 1
    >>> out['item2'] = 2
    >>> out[1] == 2 == out['item2']
    True
    """

    def __getitem__(self, item):
        # Non-integer keys behave exactly like a normal dict lookup;
        # integers are treated as positional indexes into insertion order.
        if not isinstance(item, int):
            return super().__getitem__(item)
        positional_key = list(self.keys())[item]
        return self[positional_key]
60
61
def stage_data_in_history(galaxy_interactor, tool_id, all_test_data, history=None, force_path_paste=False, maxseconds=DEFAULT_TOOL_TEST_WAIT):
    """Upload every test input dataset for a tool test into ``history``."""
    assert tool_id

    def _stage(test_data):
        # Kick off one upload; returns a callable that blocks until done.
        return galaxy_interactor.stage_data_async(
            test_data,
            history,
            tool_id,
            force_path_paste=force_path_paste,
            maxseconds=maxseconds,
        )

    if UPLOAD_ASYNC:
        # Start all uploads first, then wait on each in turn.
        pending = [_stage(test_data) for test_data in all_test_data]
        for wait in pending:
            wait()
    else:
        # Serial mode - wait for each upload before staging the next.
        for test_data in all_test_data:
            _stage(test_data)()
85
86
87 class GalaxyInteractorApi:
88
def __init__(self, **kwds):
    """Build an API interactor for the Galaxy server described by ``kwds``.

    Required keys: ``galaxy_url``, ``master_api_key``, ``keep_outputs_dir``.
    Optional keys: ``api_key``, ``test_user``, ``user_api_key_is_admin_key``,
    ``download_attempts``, ``download_sleep``, ``test_data``.
    """
    self.api_url = "%s/api" % kwds["galaxy_url"].rstrip("/")
    self.master_api_key = kwds["master_api_key"]
    # Resolve a user API key - either the one supplied, or one created for
    # the (possibly default) test user using the admin key.
    self.api_key = self.__get_user_key(kwds.get("api_key"), kwds.get("master_api_key"), test_user=kwds.get("test_user"))
    if kwds.get('user_api_key_is_admin_key', False):
        # Caller declared the user key doubles as an admin key.
        self.master_api_key = self.api_key
    self.keep_outputs_dir = kwds["keep_outputs_dir"]
    self.download_attempts = kwds.get("download_attempts", 1)
    self.download_sleep = kwds.get("download_sleep", 1)
    # Local test data directories.
    self.test_data_directories = kwds.get("test_data") or []

    # Lazily populated by the target_galaxy_version property.
    self._target_galaxy_version = None

    # Maps uploaded file names/test names to {"src": "hda", "id": <id>}.
    self.uploads = {}
104
@property
def target_galaxy_version(self):
    # Fetch and cache the server's major version (e.g. "21.01") on first use.
    if self._target_galaxy_version is None:
        self._target_galaxy_version = parse_version(self._get('version').json()['version_major'])
    return self._target_galaxy_version

@property
def supports_test_data_download(self):
    # The tool test-data download API was added in Galaxy release 19.01.
    return self.target_galaxy_version >= Version("19.01")
114
def __get_user_key(self, user_key, admin_key, test_user=None):
    """Return an API key for the test user, creating user/key as needed.

    If ``user_key`` is supplied it is returned as-is; otherwise the test
    user (default ``test@bx.psu.edu``) is ensured to exist and a fresh
    API key is minted for it using the admin key.
    """
    if not test_user:
        test_user = "test@bx.psu.edu"
    if user_key:
        return user_key
    test_user = self.ensure_user_with_email(test_user)
    return self._post("users/%s/api_key" % test_user['id'], key=admin_key).json()
122
123 # def get_tools(self):
124 # response = self._get("tools?in_panel=false")
125 # assert response.status_code == 200, "Non 200 response from tool index API. [%s]" % response.content
126 # return response.json()
127
def get_tests_summary(self):
    """Return the server-wide summary of available tool tests."""
    response = self._get("tools/tests_summary")
    failure_message = "Non 200 response from tool tests available API. [%s]" % response.content
    assert response.status_code == 200, failure_message
    return response.json()
132
def get_tool_tests(self, tool_id, tool_version=None):
    """Fetch test definitions for ``tool_id`` (optionally a specific version)."""
    url = "tools/%s/test_data" % tool_id
    if tool_version is not None:
        url = "{}?tool_version={}".format(url, tool_version)
    response = self._get(url)
    failure_message = "Non 200 response from tool test API. [%s]" % response.content
    assert response.status_code == 200, failure_message
    return response.json()
140
def verify_output_collection(self, output_collection_def, output_collection_id, history, tool_id):
    """Verify an output dataset collection against its test definition.

    Fetches the collection description from the API and delegates the
    per-element dataset checks to ``verify_collection``.
    """
    data_collection = self._get("dataset_collections/%s" % output_collection_id, data={"instance_type": "history"}).json()

    def verify_dataset(element, element_attrib, element_outfile):
        # Each leaf element wraps an HDA; verify its content and metadata.
        hda = element["object"]
        self.verify_output_dataset(
            history,
            hda_id=hda["id"],
            outfile=element_outfile,
            attributes=element_attrib,
            tool_id=tool_id
        )

    verify_collection(output_collection_def, data_collection, verify_dataset)
155
def verify_output(self, history_id, jobs, output_data, output_testdef, tool_id, maxseconds):
    """Wait for jobs, then verify one test output and its primary datasets.

    ``output_testdef`` supplies the expected output file, comparison
    attributes and the output's name; assertion failures are re-raised
    with the output name prefixed for readable test reports.
    """
    outfile = output_testdef.outfile
    attributes = output_testdef.attributes
    name = output_testdef.name

    self.wait_for_jobs(history_id, jobs, maxseconds)
    hid = self.__output_id(output_data)
    # TODO: Twill version verifies dataset is 'ok' in here.
    try:
        self.verify_output_dataset(history_id=history_id, hda_id=hid, outfile=outfile, attributes=attributes, tool_id=tool_id)
    except AssertionError as e:
        raise AssertionError("Output {}: {}".format(name, str(e)))

    # "Primary datasets" are extra outputs discovered at job completion;
    # match each expected designation against the job's actual outputs.
    primary_datasets = attributes.get('primary_datasets', {})
    if primary_datasets:
        job_id = self._dataset_provenance(history_id, hid)["job_id"]
        outputs = self._get("jobs/%s/outputs" % (job_id)).json()

        for designation, (primary_outfile, primary_attributes) in primary_datasets.items():
            primary_output = None
            for output in outputs:
                # Discovered outputs follow this naming convention on the job.
                if output["name"] == f'__new_primary_file_{name}|{designation}__':
                    primary_output = output
                    break

            if not primary_output:
                msg_template = "Failed to find primary dataset with designation [%s] for output with name [%s]"
                msg_args = (designation, name)
                raise Exception(msg_template % msg_args)

            primary_hda_id = primary_output["dataset"]["id"]
            try:
                self.verify_output_dataset(history_id, primary_hda_id, primary_outfile, primary_attributes, tool_id=tool_id)
            except AssertionError as e:
                raise AssertionError("Primary output {}: {}".format(name, str(e)))
191
def wait_for_jobs(self, history_id, jobs, maxseconds):
    """Block until every job dict in ``jobs`` finishes (or errors/times out)."""
    for job_dict in jobs:
        self.wait_for_job(job_dict['id'], history_id, maxseconds)
195
def verify_output_dataset(self, history_id, hda_id, outfile, attributes, tool_id):
    """Download dataset ``hda_id`` and verify content plus API metadata."""
    fetcher = self.__dataset_fetcher(history_id)
    test_data_downloader = self.__test_data_downloader(tool_id)
    # Content comparison against the expected output file.
    verify_hid(
        outfile,
        hda_id=hda_id,
        attributes=attributes,
        dataset_fetcher=fetcher,
        test_data_downloader=test_data_downloader,
        keep_outputs_dir=self.keep_outputs_dir
    )
    # Metadata comparison against the HDA's API description.
    self._verify_metadata(history_id, hda_id, attributes)
208
def _verify_metadata(self, history_id, hid, attributes):
    """Check dataset metadata.

    ftype on output maps to `file_ext` on the hda's API description, `name`, `info`,
    `dbkey` and `tags` all map to the API description directly. Other metadata attributes
    are assumed to be datatype-specific and mapped with a prefix of `metadata_`.
    """
    metadata = attributes.get('metadata', {}).copy()
    # Rewrite test keys to the names used by the dataset API description.
    for key in metadata.copy().keys():
        if key not in ['name', 'info', 'tags', 'created_from_basename']:
            new_key = "metadata_%s" % key
            metadata[new_key] = metadata[key]
            del metadata[key]
        elif key == "info":
            metadata["misc_info"] = metadata["info"]
            del metadata["info"]
    expected_file_type = attributes.get('ftype', None)
    if expected_file_type:
        metadata["file_ext"] = expected_file_type

    if metadata:
        # Brief pause before fetching - presumably to let the dataset's
        # metadata settle after job completion; TODO confirm.
        time.sleep(5)
        dataset = self._get(f"histories/{history_id}/contents/{hid}").json()
        for key, value in metadata.items():
            try:
                dataset_value = dataset.get(key, None)

                def compare(val, expected):
                    # Compare stringified forms so ints/bools match their API form.
                    if str(val) != str(expected):
                        msg = "Dataset metadata verification for [%s] failed, expected [%s] but found [%s]. Dataset API value was [%s]."
                        msg_params = (key, value, dataset_value, dataset)
                        msg = msg % msg_params
                        raise Exception(msg)

                if isinstance(dataset_value, list):
                    # Expected value is comma-separated; compare element-wise.
                    value = str(value).split(",")
                    if len(value) != len(dataset_value):
                        msg = "Dataset metadata verification for [%s] failed, expected [%s] but found [%s], lists differ in length. Dataset API value was [%s]."
                        msg_params = (key, value, dataset_value, dataset)
                        msg = msg % msg_params
                        raise Exception(msg)
                    for val, expected in zip(dataset_value, value):
                        compare(val, expected)
                else:
                    compare(dataset_value, value)
            except KeyError:
                msg = "Failed to verify dataset metadata, metadata key [%s] was not found." % key
                raise Exception(msg)
257
def wait_for_job(self, job_id, history_id=None, maxseconds=DEFAULT_TOOL_TEST_WAIT):
    """Wait until the given job is ready (ok), errors, or the wait times out."""
    self.wait_for(lambda: self.__job_ready(job_id, history_id), maxseconds=maxseconds)

def wait_for(self, func, what='tool test run', **kwd):
    """Poll ``func`` until it returns truthy, up to ``maxseconds`` seconds."""
    timeout_seconds = int(kwd.get("maxseconds", DEFAULT_TOOL_TEST_WAIT))
    wait_on(func, what, timeout_seconds)
264
def get_job_stdio(self, job_id):
    """Return the full job record (including stdout/stderr) for ``job_id``."""
    return self.__get_job_stdio(job_id).json()

def __get_job(self, job_id):
    # Basic job description.
    return self._get('jobs/%s' % job_id)

def __get_job_stdio(self, job_id):
    # full=true includes stdio in the job response.
    return self._get('jobs/%s?full=true' % job_id)
274
def new_history(self, history_name='test_history', publish_history=False):
    """Create a new history and return its encoded id.

    :param history_name: name for the new history
    :param publish_history: if True, also publish the created history
    :raises Exception: if the create request fails
    """
    create_response = self._post("histories", {"name": history_name})
    try:
        create_response.raise_for_status()
    except Exception as e:
        # Fix: "occured" -> "occurred" in the error message.
        raise Exception(f"Error occurred while creating history with name '{history_name}': {e}")
    history_id = create_response.json()['id']
    if publish_history:
        self.publish_history(history_id)
    return history_id

def publish_history(self, history_id):
    """Mark the given history as published on the server."""
    response = self._put(f'histories/{history_id}', json.dumps({'published': True}))
    response.raise_for_status()
289
@nottest
def test_data_path(self, tool_id, filename):
    """Ask the server (as admin) for the local path of a tool's test-data file.

    :param tool_id: id of the tool whose test data is requested
    :param filename: test-data file name to resolve
    :return: the JSON-decoded path response
    """
    # Fix: interpolate the requested filename into the query string; the
    # previous version sent a literal placeholder instead of the file name.
    response = self._get(f"tools/{tool_id}/test_data_path?filename={filename}", admin=True)
    return response.json()
294
@nottest
def test_data_download(self, tool_id, filename, mode='file', is_output=True):
    """Fetch test-data content for ``filename`` of ``tool_id``.

    Tries the server's test_data_download API first (Galaxy >= 19.01),
    then the server-local test data path, then any locally configured
    test-data directories.

    :param mode: 'file' returns raw bytes; 'directory' returns the path of
        a temporary directory holding the extracted/copied contents.
    :param is_output: only affects the error message when the file is missing
    :raises AssertionError: if the file cannot be located anywhere
    """
    result = None
    local_path = None

    if self.supports_test_data_download:
        # Fix: interpolate the requested filename into the query string;
        # previously a literal placeholder was sent to the server.
        response = self._get(f"tools/{tool_id}/test_data_download?filename={filename}", admin=True)
        if response.status_code == 200:
            if mode == 'file':
                result = response.content
            elif mode == 'directory':
                prefix = os.path.basename(filename)
                path = tempfile.mkdtemp(prefix=prefix)
                fileobj = io.BytesIO(response.content)
                if zipfile.is_zipfile(fileobj):
                    with zipfile.ZipFile(fileobj) as contents:
                        contents.extractall(path=path)
                else:
                    # Galaxy < 21.01 serves directories as tar archives.
                    with tarfile.open(fileobj=fileobj) as tar_contents:
                        tar_contents.extractall(path=path)
                result = path
    else:
        # We can only use local data
        local_path = self.test_data_path(tool_id, filename)

    if result is None and (local_path is None or not os.path.exists(local_path)):
        # Fall back to locally configured test-data directories.
        for test_data_directory in self.test_data_directories:
            local_path = os.path.join(test_data_directory, filename)
            if os.path.exists(local_path):
                break

    if result is None and local_path is not None and os.path.exists(local_path):
        if mode == 'file':
            with open(local_path, mode='rb') as f:
                result = f.read()
        elif mode == 'directory':
            # Make a copy, since we are going to clean up the returned path
            path = tempfile.mkdtemp()
            shutil.copytree(local_path, path)
            result = path

    if result is None:
        # Fix: restore the filename placeholders in these error messages so
        # the missing file is actually named in the failure.
        if is_output:
            raise AssertionError(f"Test output file ({filename}) is missing. If you are using planemo, try adding --update_test_data to generate it.")
        else:
            raise AssertionError(f"Test input file ({filename}) cannot be found.")

    return result
344
def __output_id(self, output_data):
    """Extract an output id from either a dict description or a bare id.

    The tools API yields dicts ({id: <id>, output_name: <name>, ...});
    the workflow API may hand the encoded id over directly.
    """
    try:
        return output_data.get('id')
    except AttributeError:
        # Not dict-like - already a plain id.
        return output_data
353
def stage_data_async(self, test_data, history_id, tool_id, force_path_paste=False, maxseconds=DEFAULT_TOOL_TEST_WAIT):
    """Upload one test input via the ``upload1`` tool; return a wait callable.

    Returns a zero-argument function that blocks until the upload job
    completes (up to ``maxseconds``). Uploaded datasets are recorded in
    ``self.uploads`` under the file path, its base name and the display name.
    """
    fname = test_data['fname']
    tool_input = {
        "file_type": test_data['ftype'],
        "dbkey": test_data['dbkey'],
    }
    metadata = test_data.get("metadata", {})
    if not hasattr(metadata, "items"):
        raise Exception(f"Invalid metadata description found for input [{fname}] - [{metadata}]")
    for name, value in test_data.get('metadata', {}).items():
        tool_input["files_metadata|%s" % name] = value

    composite_data = test_data['composite_data']
    if composite_data:
        # Composite inputs stage several files into one dataset.
        files = {}
        for i, file_name in enumerate(composite_data):
            if force_path_paste:
                # Reference the server-local path instead of uploading content.
                file_path = self.test_data_path(tool_id, file_name)
                tool_input.update({
                    "files_%d|url_paste" % i: "file://" + file_path
                })
            else:
                file_content = self.test_data_download(tool_id, file_name, is_output=False)
                files["files_%s|file_data" % i] = file_content
            tool_input.update({
                "files_%d|type" % i: "upload_dataset",
            })
        name = test_data['name']
    else:
        # Simple single-file upload.
        name = os.path.basename(fname)
        tool_input.update({
            "files_0|NAME": name,
            "files_0|type": "upload_dataset",
        })
        files = {}
        if force_path_paste:
            file_name = self.test_data_path(tool_id, fname)
            tool_input.update({
                "files_0|url_paste": "file://" + file_name
            })
        else:
            file_content = self.test_data_download(tool_id, fname, is_output=False)
            files = {
                "files_0|file_data": file_content
            }
    submit_response_object = self.__submit_tool(history_id, "upload1", tool_input, extra_data={"type": "upload_dataset"}, files=files)
    submit_response = ensure_tool_run_response_okay(submit_response_object, "upload dataset %s" % name)
    assert "outputs" in submit_response, "Invalid response from server [%s], expecting outputs in response." % submit_response
    outputs = submit_response["outputs"]
    assert len(outputs) > 0, "Invalid response from server [%s], expecting an output dataset." % submit_response
    dataset = outputs[0]
    hid = dataset['id']
    # Let later test steps reference this upload by path, base name or name.
    self.uploads[os.path.basename(fname)] = self.uploads[fname] = self.uploads[name] = {"src": "hda", "id": hid}
    assert "jobs" in submit_response, "Invalid response from server [%s], expecting jobs in response." % submit_response
    jobs = submit_response["jobs"]
    assert len(jobs) > 0, "Invalid response from server [%s], expecting a job." % submit_response
    return lambda: self.wait_for_job(jobs[0]["id"], history_id, maxseconds=maxseconds)
411
def run_tool(self, testdef, history_id, resource_parameters=None):
    """Execute the tool described by ``testdef`` in ``history_id``.

    :return: ``Bunch(inputs, outputs, output_collections, jobs)``
    :raises RunToolException: if the server rejects the submission
    """
    # We need to handle the case where we've uploaded a valid compressed file since the upload
    # tool will have uncompressed it on the fly.
    resource_parameters = resource_parameters or {}
    inputs_tree = testdef.inputs.copy()
    # Replace test-level values with references to staged uploads/collections.
    for key, value in inputs_tree.items():
        values = [value] if not isinstance(value, list) else value
        new_values = []
        for value in values:
            if isinstance(value, TestCollectionDef):
                hdca_id = self._create_collection(history_id, value)
                new_values = [dict(src="hdca", id=hdca_id)]
            elif value in self.uploads:
                new_values.append(self.uploads[value])
            else:
                new_values.append(value)
        inputs_tree[key] = new_values

    if resource_parameters:
        inputs_tree["__job_resource|__job_resource__select"] = "yes"
        for key, value in resource_parameters.items():
            inputs_tree["__job_resource|%s" % key] = value

    # HACK: Flatten single-value lists. Required when using expand_grouping
    for key, value in inputs_tree.items():
        if isinstance(value, list) and len(value) == 1:
            inputs_tree[key] = value[0]

    # Retry submission while input datasets are still being processed.
    submit_response = None
    for _ in range(DEFAULT_TOOL_TEST_WAIT):
        submit_response = self.__submit_tool(history_id, tool_id=testdef.tool_id, tool_input=inputs_tree)
        if _are_tool_inputs_not_ready(submit_response):
            print("Tool inputs not ready yet")
            time.sleep(1)
            continue
        else:
            break

    submit_response_object = ensure_tool_run_response_okay(submit_response, "execute tool", inputs_tree)
    try:
        return Bunch(
            inputs=inputs_tree,
            outputs=self.__dictify_outputs(submit_response_object),
            output_collections=self.__dictify_output_collections(submit_response_object),
            jobs=submit_response_object['jobs'],
        )
    except KeyError:
        message = "Error creating a job for these tool inputs - %s" % submit_response_object['err_msg']
        raise RunToolException(message, inputs_tree)
461
def _create_collection(self, history_id, collection_def):
    """Create a dataset collection in ``history_id``; return its encoded id."""
    payload = {
        "name": collection_def.name,
        "element_identifiers": dumps(self._element_identifiers(collection_def)),
        "collection_type": collection_def.collection_type,
        "history_id": history_id,
    }
    return self._post("dataset_collections", data=payload).json()["id"]
470
def _element_identifiers(self, collection_def):
    """Recursively build the element_identifiers payload for a collection."""
    element_identifiers = []
    for element_dict in collection_def.elements:
        element_identifier = element_dict["element_identifier"]
        element_def = element_dict["element_definition"]
        if isinstance(element_def, TestCollectionDef):
            # Nested collection - describe it inline with its own elements.
            subelement_identifiers = self._element_identifiers(element_def)
            element = dict(
                name=element_identifier,
                src="new_collection",
                collection_type=element_def.collection_type,
                element_identifiers=subelement_identifiers
            )
        else:
            # Leaf dataset - reference the previously staged upload record.
            element = self.uploads[element_def["value"]].copy()
            element["name"] = element_identifier
            tags = element_def.get("attributes").get("tags")
            if tags:
                element["tags"] = tags.split(",")
        element_identifiers.append(element)
    return element_identifiers
492
def __dictify_output_collections(self, submit_response):
    """Map output collection name -> collection description."""
    return {
        collection.get("output_name"): collection
        for collection in submit_response['output_collections']
    }

def __dictify_outputs(self, datasets_object):
    """Map output name -> output while preserving positional access.

    Convert outputs list to a dictionary that can be accessed by
    output_name so we can be flexible about ordering of outputs, while
    OutputsDict still allows legacy access by list index.
    """
    outputs_dict = OutputsDict()
    for output in datasets_object['outputs']:
        outputs_dict[output.get("output_name")] = output
    return outputs_dict
508
def output_hid(self, output_data):
    """Return the encoded id of an output dataset description."""
    return output_data['id']

def delete_history(self, history):
    """Delete the given history on the server."""
    self._delete(f"histories/{history}")
514
def __job_ready(self, job_id, history_id=None):
    """Return True when the job is ok, None while pending; raise on error.

    On error, optionally dumps a verbose summary of the history first
    (controlled by GALAXY_TEST_VERBOSE_ERRORS).
    """
    if job_id is None:
        raise ValueError("__job_ready passed empty job_id")
    job_json = self._get("jobs/%s" % job_id).json()
    state = job_json['state']
    try:
        return self._state_ready(state, error_msg="Job in error state.")
    except Exception:
        if VERBOSE_ERRORS and history_id is not None:
            self._summarize_history(history_id)
        raise
526
def _summarize_history(self, history_id):
    """Print a debugging summary of a history's datasets and jobs.

    Best-effort only: each API call is wrapped so failures inside the
    summary never mask the original test error.
    """
    if history_id is None:
        raise ValueError("_summarize_history passed empty history_id")
    print("Problem in history with id %s - summary of history's datasets and jobs below." % history_id)
    try:
        history_contents = self.__contents(history_id)
    except Exception:
        print("*TEST FRAMEWORK FAILED TO FETCH HISTORY DETAILS*")
        return

    for history_content in history_contents:

        dataset = history_content

        print(ERROR_MESSAGE_DATASET_SEP)
        dataset_id = dataset.get('id', None)
        print("| %d - %s (HID - NAME) " % (int(dataset['hid']), dataset['name']))
        if history_content['history_content_type'] == 'dataset_collection':
            # Collections have no provenance/peek; just dump the description.
            history_contents_json = self._get("histories/{}/contents/dataset_collections/{}".format(history_id, history_content["id"])).json()
            print("| Dataset Collection: %s" % history_contents_json)
            print("|")
            continue

        try:
            dataset_info = self._dataset_info(history_id, dataset_id)
            print("| Dataset State:")
            print(self.format_for_summary(dataset_info.get("state"), "Dataset state is unknown."))
            print("| Dataset Blurb:")
            print(self.format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty."))
            print("| Dataset Info:")
            print(self.format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty."))
            print("| Peek:")
            print(self.format_for_summary(dataset_info.get("peek", ""), "Peek unavilable."))
        except Exception:
            print("| *TEST FRAMEWORK ERROR FETCHING DATASET DETAILS*")
        try:
            # Provenance carries the generating job's stdout/stderr.
            provenance_info = self._dataset_provenance(history_id, dataset_id)
            print("| Dataset Job Standard Output:")
            print(self.format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty."))
            print("| Dataset Job Standard Error:")
            print(self.format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty."))
        except Exception:
            print("| *TEST FRAMEWORK ERROR FETCHING JOB DETAILS*")
        print("|")
    try:
        jobs_json = self._get("jobs?history_id=%s" % history_id).json()
        for job_json in jobs_json:
            print(ERROR_MESSAGE_DATASET_SEP)
            print("| Job %s" % job_json["id"])
            print("| State: ")
            print(self.format_for_summary(job_json.get("state", ""), "Job state is unknown."))
            print("| Update Time:")
            print(self.format_for_summary(job_json.get("update_time", ""), "Job update time is unknown."))
            print("| Create Time:")
            print(self.format_for_summary(job_json.get("create_time", ""), "Job create time is unknown."))
            print("|")
        print(ERROR_MESSAGE_DATASET_SEP)
    except Exception:
        print(ERROR_MESSAGE_DATASET_SEP)
        print("*TEST FRAMEWORK FAILED TO FETCH HISTORY JOBS*")
        print(ERROR_MESSAGE_DATASET_SEP)
588
def format_for_summary(self, blob, empty_message, prefix="| "):
    """Prefix each non-blank line of ``blob``; fall back to ``empty_message``."""
    lines = io.StringIO(blob).readlines()
    formatted = [f"{prefix}{line.strip()}" for line in lines if line.rstrip("\n\r")]
    if formatted:
        return "\n".join(formatted)
    return f"{prefix}*{empty_message}*"
592
def _dataset_provenance(self, history_id, id):
    """Fetch the provenance record (job id, stdio) for a dataset."""
    return self._get(f"histories/{history_id}/contents/{id}/provenance").json()

def _dataset_info(self, history_id, id):
    """Fetch the API description of a dataset in a history."""
    return self._get(f"histories/{history_id}/contents/{id}").json()

def __contents(self, history_id):
    """Return the history's content listing, raising on HTTP errors."""
    response = self._get("histories/%s/contents" % history_id)
    response.raise_for_status()
    return response.json()
605
def _state_ready(self, state_str, error_msg):
    """Map a state string to True (ok), None (still pending), or raise on error."""
    if state_str == 'error':
        raise Exception(error_msg)
    return True if state_str == 'ok' else None
612
def __submit_tool(self, history_id, tool_id, tool_input, extra_data=None, files=None):
    """POST a tool execution request; the inputs tree is JSON-encoded."""
    payload = {
        "history_id": history_id,
        "tool_id": tool_id,
        "inputs": dumps(tool_input),
        **(extra_data or {}),
    }
    return self._post("tools", files=files, data=payload)
622
def ensure_user_with_email(self, email, password=None):
    """Return the user record for ``email``, creating the user if needed.

    Requires the admin (master) API key. The creation payload includes
    ``remote_user_email`` so it also works when remote-user middleware
    is enabled.
    """
    admin_key = self.master_api_key
    all_users_response = self._get('users', key=admin_key)
    try:
        all_users_response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise Exception(f"Failed to verify user with email [{email}] exists - perhaps you're targetting the wrong Galaxy server or using an incorrect admin API key. HTTP error: {e}")
    all_users = all_users_response.json()
    try:
        test_user = [user for user in all_users if user["email"] == email][0]
    except IndexError:
        # No such user yet - create one with a username derived from the email.
        username = re.sub(r"[^a-z-\d]", '--', email.lower())
        password = password or 'testpass'
        # If remote user middleware is enabled - this endpoint consumes
        # ``remote_user_email`` otherwise it requires ``email``, ``password``
        # and ``username``.
        data = dict(
            remote_user_email=email,
            email=email,
            password=password,
            username=username,
        )
        test_user = self._post('users', data, key=admin_key).json()
    return test_user
647
def __test_data_downloader(self, tool_id):
    """Return a download callback bound to ``tool_id`` for verify routines."""
    def test_data_download(filename, mode='file'):
        return self.test_data_download(tool_id, filename, mode=mode)
    return test_data_download

def __dataset_fetcher(self, history_id):
    """Return a callback that downloads raw dataset content from ``history_id``."""
    def fetcher(hda_id, base_name=None):
        url = f"histories/{history_id}/contents/{hda_id}/display?raw=true"
        if base_name:
            url += "&filename=%s" % base_name
        # Retry transient server errors (HTTP 500) a configurable number
        # of times, sleeping between attempts.
        response = None
        for _attempt in range(self.download_attempts):
            response = self._get(url)
            if response.status_code != 500:
                break
            print(f"Retrying failed download with status code {response.status_code}")
            time.sleep(self.download_sleep)

        response.raise_for_status()
        return response.content

    return fetcher
672
def api_key_header(self, key, admin, anon):
    """Build the ``x-api-key`` header dict (empty for anonymous requests)."""
    if anon:
        return {}
    # Fall back to the stored user or admin key when none is supplied.
    effective_key = key or (self.master_api_key if admin else self.api_key)
    return {'x-api-key': effective_key}
680
def _post(self, path, data=None, files=None, key=None, admin=False, anon=False, json=False):
    """POST to an API path with the appropriate key header.

    If json=True, use post payload using request's json parameter instead of the data
    parameter (i.e. assume the contents is a jsonified blob instead of form parameters
    with individual parameters jsonified if needed).
    """
    headers = self.api_key_header(key=key, admin=admin, anon=anon)
    url = f"{self.api_url}/{path}"
    return galaxy_requests_post(url, data=data, files=files, as_json=json, headers=headers)

def _delete(self, path, data=None, key=None, admin=False, anon=False):
    # DELETE with optional query parameters.
    headers = self.api_key_header(key=key, admin=admin, anon=anon)
    return requests.delete(f"{self.api_url}/{path}", params=data, headers=headers)

def _patch(self, path, data=None, key=None, admin=False, anon=False):
    headers = self.api_key_header(key=key, admin=admin, anon=anon)
    return requests.patch(f"{self.api_url}/{path}", data=data, headers=headers)

def _put(self, path, data=None, key=None, admin=False, anon=False):
    headers = self.api_key_header(key=key, admin=admin, anon=anon)
    return requests.put(f"{self.api_url}/{path}", data=data, headers=headers)

def _get(self, path, data=None, key=None, admin=False, anon=False):
    headers = self.api_key_header(key=key, admin=admin, anon=anon)
    # Tolerate callers passing full "/api/..." paths.
    if path.startswith("/api"):
        path = path[len("/api"):]
    url = f"{self.api_url}/{path}"
    # no data for GET
    return requests.get(url, params=data, headers=headers)
708
709
def ensure_tool_run_response_okay(submit_response_object, request_desc, inputs=None):
    """Return the parsed JSON of a tool-run response or raise RunToolException.

    Tries to extract a useful error message (invalid dbkey, dynamic
    parameter errors, server err_msg) from a non-200 response body.
    """
    if submit_response_object.status_code != 200:
        message = None
        dynamic_param_error = False
        try:
            err_response = submit_response_object.json()
            if "param_errors" in err_response:
                param_errors = err_response["param_errors"]
                if "dbkey" in param_errors:
                    # Invalid dbkey is common enough to warrant a targeted message.
                    dbkey_err_obj = param_errors["dbkey"]
                    dbkey_val = dbkey_err_obj.get("parameter_value")
                    message = "Invalid dbkey specified [%s]" % dbkey_val
                for value in param_errors.values():
                    if isinstance(value, dict) and value.get("is_dynamic"):
                        dynamic_param_error = True
            if message is None:
                message = err_response.get("err_msg") or None
        except Exception:
            # invalid JSON content.
            pass
        if message is None:
            template = "Request to %s failed - invalid JSON content returned from Galaxy server [%s]"
            message = template % (request_desc, submit_response_object.text)
        raise RunToolException(message, inputs, dynamic_param_error=dynamic_param_error)
    submit_response = submit_response_object.json()
    return submit_response
736
737
def _are_tool_inputs_not_ready(submit_response):
    """True if submission failed only because input datasets aren't ready yet."""
    # Galaxy signals "inputs not ready" with HTTP 400 and err_code 400015.
    if submit_response.status_code != 400:
        return False
    try:
        return submit_response.json().get("err_code") == 400015
    except Exception:
        # Body wasn't JSON - some other kind of 400 failure.
        return False
746
747
class RunToolException(Exception):
    """Raised when submitting a tool run to the Galaxy API fails.

    ``inputs`` holds the submitted inputs tree (if available) and
    ``dynamic_param_error`` records whether a dynamic parameter caused
    the failure.
    """

    def __init__(self, message, inputs=None, dynamic_param_error=False):
        super().__init__(message)
        self.inputs = inputs
        self.dynamic_param_error = dynamic_param_error
754
755
# Galaxy specific methods - rest of this can be used with arbitrary files and such.
def verify_hid(filename, hda_id, attributes, test_data_downloader, hid="", dataset_fetcher=None, keep_outputs_dir=False):
    """Download dataset ``hda_id`` and verify it against the expected file/attributes."""
    assert dataset_fetcher is not None

    def verify_extra_files(extra_files):
        # Delegate verification of composite/extra files to the shared helper.
        _verify_extra_files_content(extra_files, hda_id, dataset_fetcher=dataset_fetcher, test_data_downloader=test_data_downloader, keep_outputs_dir=keep_outputs_dir)

    data = dataset_fetcher(hda_id)
    item_label = ""
    verify(
        item_label,
        data,
        attributes=attributes,
        filename=filename,
        get_filecontent=test_data_downloader,
        keep_outputs_dir=keep_outputs_dir,
        verify_extra_files=verify_extra_files,
    )
774
775
def verify_collection(output_collection_def, data_collection, verify_dataset):
    """Verify a dataset collection against its test definition.

    Checks the collection type and element count when specified, then
    recursively verifies each expected element via ``verify_dataset`` and,
    when requested through ``expected_sort_order`` attributes, the relative
    order of elements.
    """
    name = output_collection_def.name

    expected_collection_type = output_collection_def.collection_type
    if expected_collection_type:
        collection_type = data_collection["collection_type"]
        if expected_collection_type != collection_type:
            template = "Expected output collection [%s] to be of type [%s], was of type [%s]."
            message = template % (name, expected_collection_type, collection_type)
            raise AssertionError(message)

    expected_element_count = output_collection_def.count
    if expected_element_count:
        actual_element_count = len(data_collection["elements"])
        if expected_element_count != actual_element_count:
            template = "Expected output collection [%s] to have %s elements, but it had %s."
            message = template % (name, expected_element_count, actual_element_count)
            raise AssertionError(message)

    def get_element(elements, id):
        # Linear search by identifier; False signals "not found".
        for element in elements:
            if element["element_identifier"] == id:
                return element
        return False

    def verify_elements(element_objects, element_tests):
        # sorted_test_ids = [None] * len(element_tests)
        expected_sort_order = []

        eo_ids = [_["element_identifier"] for _ in element_objects]
        for element_identifier, element_test in element_tests.items():
            # element_test is either an attribute dict or an (outfile, attribs) pair.
            if isinstance(element_test, dict):
                element_outfile, element_attrib = None, element_test
            else:
                element_outfile, element_attrib = element_test
            if 'expected_sort_order' in element_attrib:
                expected_sort_order.append(element_identifier)

            element = get_element(element_objects, element_identifier)
            if not element:
                template = "Failed to find identifier '%s' in the tool generated collection elements %s"
                message = template % (element_identifier, eo_ids)
                raise AssertionError(message)

            element_type = element["element_type"]
            if element_type != "dataset_collection":
                verify_dataset(element, element_attrib, element_outfile)
            if element_type == "dataset_collection":
                # Recurse into nested collections.
                elements = element["object"]["elements"]
                verify_elements(elements, element_attrib.get("elements", {}))

        if len(expected_sort_order) > 0:
            # Walk the actual elements once; each expected identifier must
            # appear at or after the previous identifier's position.
            i = 0
            for element_identifier in expected_sort_order:
                element = None
                while i < len(element_objects):
                    if element_objects[i]["element_identifier"] == element_identifier:
                        element = element_objects[i]
                        i += 1
                        break
                    i += 1
                if element is None:
                    template = "Collection identifier '%s' found out of order, expected order of %s for the tool generated collection elements %s"
                    message = template % (element_identifier, expected_sort_order, eo_ids)
                    raise AssertionError(message)

    verify_elements(data_collection["elements"], output_collection_def.element_tests)
843
844
def _verify_composite_datatype_file_content(file_name, hda_id, base_name=None, attributes=None, dataset_fetcher=None, test_data_downloader=None, keep_outputs_dir=False, mode='file'):
    """Fetch one component file of a composite dataset and verify it.

    Any ``AssertionError`` raised by ``verify`` is re-raised with context
    identifying which composite file of which history item differed.
    """
    assert dataset_fetcher is not None

    fetched_content = dataset_fetcher(hda_id, base_name)
    item_label = "History item %s" % hda_id
    try:
        verify(
            item_label,
            fetched_content,
            attributes=attributes,
            filename=file_name,
            get_filecontent=test_data_downloader,
            keep_outputs_dir=keep_outputs_dir,
            mode=mode,
        )
    except AssertionError as err:
        message = f'Composite file ({base_name}) of {item_label} different than expected, difference:\n'
        message += util.unicodify(err)
        raise AssertionError(message)
864
865
def _verify_extra_files_content(extra_files, hda_id, dataset_fetcher, test_data_downloader, keep_outputs_dir):
    """Verify each declared extra file belonging to a dataset.

    Entries of type ``file`` are checked directly. Entries of type
    ``directory`` are downloaded, walked recursively, and every contained
    file is checked; downloaded directories are always removed afterwards.
    """
    to_verify = []
    extracted_dirs = []
    for extra_file_dict in extra_files:
        file_type = extra_file_dict["type"]
        file_name = extra_file_dict["name"]
        file_attributes = extra_file_dict["attributes"]
        file_value = extra_file_dict["value"]

        if file_type == 'file':
            to_verify.append((file_name, file_value, file_attributes, file_type))
        elif file_type == 'directory':
            extracted_path = test_data_downloader(file_value, mode='directory')
            extracted_dirs.append(extracted_path)
            for root, _dirs, files in util.path.safe_walk(extracted_path):
                for entry in files:
                    rel_name = os.path.relpath(os.path.join(root, entry), extracted_path)
                    to_verify.append((rel_name, os.path.join(extracted_path, rel_name), file_attributes, file_type))
        else:
            raise ValueError('unknown extra_files type: %s' % file_type)
    try:
        for name, path, attributes, file_type in to_verify:
            _verify_composite_datatype_file_content(path, hda_id, base_name=name, attributes=attributes, dataset_fetcher=dataset_fetcher, test_data_downloader=test_data_downloader, keep_outputs_dir=keep_outputs_dir, mode=file_type)
    finally:
        # Clean up any extracted directory trees even when verification fails.
        for directory in extracted_dirs:
            shutil.rmtree(directory)
893
894
class NullClientTestConfig:
    """Client-side test configuration that never supplies overrides."""

    def get_test_config(self, job_data):
        """Return ``None`` - no per-job configuration exists."""
        return None
899
900
class DictClientTestConfig:
    """Client-side test configuration backed by a nested dictionary.

    The mapping is keyed by tool id (or the flat ``"tool_id/tool_version"``
    form), then by tool version (or ``"default"``), then by test index
    (int or string form, or ``"default"``).
    """

    def __init__(self, tools):
        # A falsy mapping means "no configuration at all".
        self._tools = tools or {}

    def get_test_config(self, job_data):
        # TODO: allow short ids, allow versions below outer id instead of key concatenation.
        tool_id = job_data.get("tool_id")
        tool_version = job_data.get("tool_version")
        version_config = None
        version_is_default = False
        per_tool = self._tools.get(tool_id)
        if per_tool is None:
            # Fall back to a flat "tool_id/tool_version" key.
            version_config = self._tools.get(f"{tool_id}/{tool_version}")
        elif tool_version in per_tool:
            version_config = per_tool[tool_version]
        elif "default" in per_tool:
            version_config = per_tool["default"]
            version_is_default = True

        if version_config:
            test_index = job_data.get("test_index")
            # Accept the index either as-is or in its string form.
            for key in (test_index, str(test_index)):
                if key in version_config:
                    return version_config[key]
            if 'default' in version_config:
                return version_config['default']
            if version_is_default:
                return version_config
        return None
937
938
def verify_tool(tool_id,
                galaxy_interactor,
                resource_parameters=None,
                register_job_data=None,
                test_index=0,
                tool_version=None,
                quiet=False,
                test_history=None,
                no_history_cleanup=False,
                publish_history=False,
                force_path_paste=False,
                maxseconds=DEFAULT_TOOL_TEST_WAIT,
                tool_test_dicts=None,
                client_test_config=None,
                skip_with_reference_data=False,
                skip_on_dynamic_param_errors=False):
    """Run one tool test against a Galaxy instance and verify its outputs.

    :param tool_id: id of the tool to test.
    :param galaxy_interactor: interactor used to talk to the Galaxy API.
    :param resource_parameters: optional job resource parameters.
    :param register_job_data: optional callback receiving a structured dict
        describing the test result (inputs, job stdio, status, timing).
    :param test_index: which of the tool's tests to run.
    :param tool_version: version to test; falls back to the version recorded
        in the test dictionary when not supplied.
    :param quiet: suppress job stdio printing on the first failure.
    :param test_history: existing history to reuse; a fresh one is created
        (and deleted unless ``no_history_cleanup``) when ``None``.
    :param publish_history: publish a newly created test history.
    :param force_path_paste: stage inputs by path pasting.
    :param maxseconds: maximum time to wait for staging and the job.
    :param tool_test_dicts: pre-fetched test dictionaries; fetched from the
        interactor when ``None``.
    :param client_test_config: per-client overrides (e.g. skip directives).
    :param skip_with_reference_data: skip tests that declare required data
        tables or loc files.
    :param skip_on_dynamic_param_errors: report dynamic parameter errors as
        skips instead of errors.
    """
    if resource_parameters is None:
        resource_parameters = {}
    if client_test_config is None:
        client_test_config = NullClientTestConfig()
    tool_test_dicts = tool_test_dicts or galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
    tool_test_dict = tool_test_dicts[test_index]
    if "test_index" not in tool_test_dict:
        tool_test_dict["test_index"] = test_index
    if "tool_id" not in tool_test_dict:
        tool_test_dict["tool_id"] = tool_id
    if tool_version is None and "tool_version" in tool_test_dict:
        tool_version = tool_test_dict.get("tool_version")

    job_data = {
        "tool_id": tool_id,
        "tool_version": tool_version,
        "test_index": test_index,
    }
    client_config = client_test_config.get_test_config(job_data)
    skip_message = None
    if client_config is not None:
        job_data.update(client_config)
        skip_message = job_data.get("skip")

    if not skip_message and skip_with_reference_data:
        required_data_tables = tool_test_dict.get("required_data_tables")
        required_loc_files = tool_test_dict.get("required_loc_files")
        # TODO: actually hit the API and see if these tables are available.
        if required_data_tables:
            skip_message = f"Skipping test because of required data tables ({required_data_tables})"
        if required_loc_files:
            skip_message = f"Skipping test because of required loc files ({required_loc_files})"

    if skip_message:
        job_data["status"] = "skip"
        # register_job_data defaults to None and is guarded everywhere else
        # in this function - calling it unconditionally here crashed skipped
        # tests run without a callback.
        if register_job_data is not None:
            register_job_data(job_data)
        return

    tool_test_dict.setdefault('maxseconds', maxseconds)
    testdef = ToolTestDescription(tool_test_dict)
    _handle_def_errors(testdef)

    created_history = False
    if test_history is None:
        created_history = True
        history_name = f"Tool Test History for {tool_id}/{tool_version}-{test_index}"
        test_history = galaxy_interactor.new_history(history_name=history_name, publish_history=publish_history)

    # Upload data to test_history, run the tool and check the outputs - record
    # API input, job info, tool run exception, as well as exceptions related to
    # job output checking and register them with the test plugin so it can
    # record structured information.
    tool_inputs = None
    job_stdio = None
    job_output_exceptions = None
    tool_execution_exception = None
    input_staging_exception = None
    expected_failure_occurred = False
    begin_time = time.time()
    try:
        try:
            stage_data_in_history(
                galaxy_interactor,
                tool_id,
                testdef.test_data(),
                history=test_history,
                force_path_paste=force_path_paste,
                maxseconds=maxseconds,
            )
        except Exception as e:
            input_staging_exception = e
            raise
        try:
            tool_response = galaxy_interactor.run_tool(testdef, test_history, resource_parameters=resource_parameters)
            data_list, jobs, tool_inputs = tool_response.outputs, tool_response.jobs, tool_response.inputs
            data_collection_list = tool_response.output_collections
        except RunToolException as e:
            tool_inputs = e.inputs
            tool_execution_exception = e
            if not testdef.expect_failure:
                raise e
            else:
                # Tool failed to run, but the test declared expect_failure.
                expected_failure_occurred = True
        except Exception as e:
            tool_execution_exception = e
            raise e

        if not expected_failure_occurred:
            assert data_list or data_collection_list

            try:
                job_stdio = _verify_outputs(testdef, test_history, jobs, tool_id, data_list, data_collection_list, galaxy_interactor, quiet=quiet)
            except JobOutputsError as e:
                job_stdio = e.job_stdio
                job_output_exceptions = e.output_exceptions
                raise e
            except Exception as e:
                job_output_exceptions = [e]
                raise e
    finally:
        # Always report structured results, whatever happened above.
        if register_job_data is not None:
            end_time = time.time()
            job_data["time_seconds"] = end_time - begin_time
            if tool_inputs is not None:
                job_data["inputs"] = tool_inputs
            if job_stdio is not None:
                job_data["job"] = job_stdio
            status = "success"
            if job_output_exceptions:
                job_data["output_problems"] = [util.unicodify(_) for _ in job_output_exceptions]
                status = "failure"
            if tool_execution_exception:
                job_data["execution_problem"] = util.unicodify(tool_execution_exception)
                dynamic_param_error = getattr(tool_execution_exception, "dynamic_param_error", False)
                job_data["dynamic_param_error"] = dynamic_param_error
                status = "error" if not skip_on_dynamic_param_errors or not dynamic_param_error else "skip"
            if input_staging_exception:
                job_data["execution_problem"] = "Input staging problem: %s" % util.unicodify(input_staging_exception)
                status = "error"
            job_data["status"] = status
            register_job_data(job_data)

    if created_history and not no_history_cleanup:
        galaxy_interactor.delete_history(test_history)
1079
1080
1081 def _handle_def_errors(testdef):
1082 # If the test generation had an error, raise
1083 if testdef.error:
1084 if testdef.exception:
1085 if isinstance(testdef.exception, Exception):
1086 raise testdef.exception
1087 else:
1088 raise Exception(testdef.exception)
1089 else:
1090 raise Exception("Test parse failure")
1091
1092
def _verify_outputs(testdef, history, jobs, tool_id, data_list, data_collection_list, galaxy_interactor, quiet=False):
    """Wait on the test's single job and verify all declared outputs.

    Collects every verification failure (output datasets, collections,
    stdio/exit-code assertions) and raises them together as a
    ``JobOutputsError``; returns the job's stdio dictionary on success.
    """
    assert len(jobs) == 1, "Test framework logic error, somehow tool test resulted in more than one job."
    job = jobs[0]

    maxseconds = testdef.maxseconds
    if testdef.num_outputs is not None:
        # Check the declared total output count (datasets + collections).
        expected = testdef.num_outputs
        actual = len(data_list) + len(data_collection_list)
        if expected != actual:
            message_template = "Incorrect number of outputs - expected %d, found %s."
            message = message_template % (expected, actual)
            raise Exception(message)
    found_exceptions = []

    def register_exception(e):
        # Record a verification failure. job_stdio is a late-bound closure
        # variable - it is assigned below before this is ever called.
        if not found_exceptions and not quiet:
            # Only print this stuff out once.
            for stream in ['stdout', 'stderr']:
                if stream in job_stdio:
                    print(_format_stream(job_stdio[stream], stream=stream, format=True), file=sys.stderr)
        found_exceptions.append(e)

    if testdef.expect_failure:
        if testdef.outputs:
            raise Exception("Cannot specify outputs in a test expecting failure.")

    # Wait for the job to complete and register exceptions if the final
    # status was not what test was expecting.
    job_failed = False
    try:
        galaxy_interactor.wait_for_job(job['id'], history, maxseconds)
    except Exception as e:
        job_failed = True
        if not testdef.expect_failure:
            found_exceptions.append(e)

    job_stdio = galaxy_interactor.get_job_stdio(job['id'])

    if not job_failed and testdef.expect_failure:
        error = AssertionError("Expected job to fail but Galaxy indicated the job successfully completed.")
        register_exception(error)

    expect_exit_code = testdef.expect_exit_code
    if expect_exit_code is not None:
        exit_code = job_stdio["exit_code"]
        # Compare as strings - the expected code may come from XML as text.
        if str(expect_exit_code) != str(exit_code):
            error = AssertionError(f"Expected job to complete with exit code {expect_exit_code}, found {exit_code}")
            register_exception(error)

    for output_index, output_dict in enumerate(testdef.outputs):
        # Get the correct hid
        name = output_dict["name"]
        outfile = output_dict["value"]
        attributes = output_dict["attributes"]
        output_testdef = Bunch(name=name, outfile=outfile, attributes=attributes)
        try:
            output_data = data_list[name]
        except (TypeError, KeyError):
            # Legacy - fall back on ordered data list access if data_list is
            # just a list (case with twill variant or if output changes its
            # name).
            if hasattr(data_list, "values"):
                output_data = list(data_list.values())[output_index]
            else:
                output_data = data_list[len(data_list) - len(testdef.outputs) + output_index]
        assert output_data is not None
        try:
            galaxy_interactor.verify_output(history, jobs, output_data, output_testdef=output_testdef, tool_id=tool_id, maxseconds=maxseconds)
        except Exception as e:
            register_exception(e)

    # Map of testdef attribute name -> human readable description used in
    # failure messages below.
    other_checks = {
        "command_line": "Command produced by the job",
        "command_version": "Tool version indicated during job execution",
        "stdout": "Standard output of the job",
        "stderr": "Standard error of the job",
    }
    # TODO: Only hack the stdio like this for older profile, for newer tool profiles
    # add some syntax for asserting job messages maybe - or just drop this because exit
    # code and regex on stdio can be tested directly - so this is really testing Galaxy
    # core handling more than the tool.
    job_messages = job_stdio.get("job_messages") or []
    stdout_prefix = ""
    stderr_prefix = ""
    # Replay Galaxy's detected job messages into the stdio streams so that
    # stdio assertions written against older tool profiles still match.
    for job_message in job_messages:
        message_type = job_message.get("type")
        if message_type == "regex" and job_message.get("stream") == "stderr":
            stderr_prefix += (job_message.get("desc") or '') + "\n"
        elif message_type == "regex" and job_message.get("stream") == "stdout":
            stdout_prefix += (job_message.get("desc") or '') + "\n"
        elif message_type == "exit_code":
            stderr_prefix += (job_message.get("desc") or '') + "\n"
        else:
            raise Exception(f"Unknown job message type [{message_type}] in [{job_message}]")

    for what, description in other_checks.items():
        if getattr(testdef, what, None) is not None:
            try:
                raw_data = job_stdio[what]
                assertions = getattr(testdef, what)
                if what == "stdout":
                    data = stdout_prefix + raw_data
                elif what == "stderr":
                    data = stderr_prefix + raw_data
                else:
                    data = raw_data
                verify_assertions(data, assertions)
            except AssertionError as err:
                errmsg = '%s different than expected\n' % description
                errmsg += util.unicodify(err)
                register_exception(AssertionError(errmsg))

    for output_collection_def in testdef.output_collections:
        try:
            name = output_collection_def.name
            # TODO: data_collection_list is clearly a bad name for dictionary.
            if name not in data_collection_list:
                template = "Failed to find output [%s], tool outputs include [%s]"
                message = template % (name, ",".join(data_collection_list.keys()))
                raise AssertionError(message)

            # Data collection returned from submission, elements may have been populated after
            # the job completed so re-hit the API for more information.
            data_collection_id = data_collection_list[name]["id"]
            galaxy_interactor.verify_output_collection(output_collection_def, data_collection_id, history, tool_id)
        except Exception as e:
            register_exception(e)

    if found_exceptions:
        raise JobOutputsError(found_exceptions, job_stdio)
    else:
        return job_stdio
1225
1226
1227 def _format_stream(output, stream, format):
1228 output = output or ''
1229 if format:
1230 msg = "---------------------- >> begin tool %s << -----------------------\n" % stream
1231 msg += output + "\n"
1232 msg += "----------------------- >> end tool %s << ------------------------\n" % stream
1233 else:
1234 msg = output
1235 return msg
1236
1237
class JobOutputsError(AssertionError):
    """Aggregate of all output-verification failures for a single job.

    Carries the job's stdio dictionary and the individual exceptions so
    callers can report structured failure information.
    """

    def __init__(self, output_exceptions, job_stdio):
        combined = "\n".join(util.unicodify(e) for e in output_exceptions)
        super().__init__(combined)
        self.job_stdio = job_stdio
        self.output_exceptions = output_exceptions
1245
1246
class ToolTestDescription:
    """
    Encapsulates information about a tool test, and allows creation of a
    dynamic TestCase class (the unittest framework is very class oriented,
    doing dynamic tests in this way allows better integration)
    """

    def __init__(self, processed_test_dict):
        # ``processed_test_dict`` must carry at least "test_index" and
        # "tool_id"; every other key is optional with a sensible default.
        assert "test_index" in processed_test_dict, "Invalid processed test description, must have a 'test_index' for naming, etc.."
        test_index = processed_test_dict["test_index"]
        name = processed_test_dict.get('name', 'Test-%d' % (test_index + 1))
        maxseconds = processed_test_dict.get('maxseconds', DEFAULT_TOOL_TEST_WAIT)
        if maxseconds is not None:
            # Normalize to int - the value may arrive as a string.
            maxseconds = int(maxseconds)

        self.test_index = test_index
        assert "tool_id" in processed_test_dict, "Invalid processed test description, must have a 'tool_id' for naming, etc.."
        self.tool_id = processed_test_dict["tool_id"]
        self.tool_version = processed_test_dict.get("tool_version")
        self.name = name
        self.maxseconds = maxseconds
        self.required_files = processed_test_dict.get("required_files", [])
        self.required_data_tables = processed_test_dict.get("required_data_tables", [])
        self.required_loc_files = processed_test_dict.get("required_loc_files", [])

        # Inputs serialized as dicts carrying a "model_class" marker are
        # test collection definitions - revive those into objects.
        inputs = processed_test_dict.get("inputs", {})
        loaded_inputs = {}
        for key, value in inputs.items():
            if isinstance(value, dict) and value.get("model_class"):
                loaded_inputs[key] = TestCollectionDef.from_dict(value)
            else:
                loaded_inputs[key] = value

        self.inputs = loaded_inputs
        self.outputs = processed_test_dict.get("outputs", [])
        self.num_outputs = processed_test_dict.get("num_outputs", None)

        # error/exception record problems encountered while generating the
        # test itself (consumed by _handle_def_errors).
        self.error = processed_test_dict.get("error", False)
        self.exception = processed_test_dict.get("exception", None)

        self.output_collections = [TestCollectionOutputDef.from_dict(d) for d in processed_test_dict.get("output_collections", [])]
        self.command_line = processed_test_dict.get("command_line", None)
        self.command_version = processed_test_dict.get("command_version", None)
        self.stdout = processed_test_dict.get("stdout", None)
        self.stderr = processed_test_dict.get("stderr", None)
        self.expect_exit_code = processed_test_dict.get("expect_exit_code", None)
        self.expect_failure = processed_test_dict.get("expect_failure", False)

    def test_data(self):
        """
        Iterator over metadata representing the required files for upload.
        """
        return test_data_iter(self.required_files)

    def to_dict(self):
        """Serialize this description back to a plain dictionary.

        Inverse of ``__init__``: inputs exposing a ``to_dict`` method
        (revived collection definitions) are re-serialized.
        """
        inputs_dict = {}
        for key, value in self.inputs.items():
            if hasattr(value, "to_dict"):
                inputs_dict[key] = value.to_dict()
            else:
                inputs_dict[key] = value

        return {
            "inputs": inputs_dict,
            "outputs": self.outputs,
            "output_collections": [_.to_dict() for _ in self.output_collections],
            "num_outputs": self.num_outputs,
            "command_line": self.command_line,
            "command_version": self.command_version,
            "stdout": self.stdout,
            "stderr": self.stderr,
            "expect_exit_code": self.expect_exit_code,
            "expect_failure": self.expect_failure,
            "name": self.name,
            "test_index": self.test_index,
            "tool_id": self.tool_id,
            "tool_version": self.tool_version,
            "required_files": self.required_files,
            "required_data_tables": self.required_data_tables,
            "required_loc_files": self.required_loc_files,
            "error": self.error,
            "exception": self.exception,
        }
1330
1331
@nottest
def test_data_iter(required_files):
    """Yield an upload-ready metadata dict for each required test file.

    Applies any ``edit_attributes`` directives from the test definition;
    only renaming (``type="name"``) is currently supported.
    """
    for file_name, extra in required_files:
        upload_spec = {
            'fname': file_name,
            'metadata': extra.get('metadata', {}),
            'composite_data': extra.get('composite_data', []),
            'ftype': extra.get('ftype', DEFAULT_FTYPE),
            'dbkey': extra.get('dbkey', DEFAULT_DBKEY),
        }

        # currently only renaming is supported
        for edit_att in extra.get('edit_attributes', []):
            if edit_att.get('type', None) == 'name':
                new_name = edit_att.get('value', None)
                assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
                upload_spec['name'] = new_name
            else:
                raise Exception('edit_attributes type (%s) is unimplemented' % edit_att.get('type', None))

        yield upload_spec
1354
1355
def galaxy_requests_post(url, data=None, files=None, as_json=False, params=None, headers=None):
    """Handle some Galaxy conventions and work around requests issues.

    This is admittedly kind of hacky, so the interface may change frequently - be
    careful on reuse.

    With ``as_json=True`` the payload is sent via request's ``json`` parameter
    (the whole body is a json-ified blob) instead of ``data`` (form parameters
    with values json-ified individually as needed). Because requests does not
    allow ``files`` together with ``json``, the parameters are rewritten to a
    form POST when both are supplied.
    """
    params = params or {}
    data = data or {}

    # Encoded files may ride along in the payload under the "__files" key
    # (a convention of the tool/API testing code) - pop them out if so.
    if files is None:
        files = data.get("__files", None)
        if files is not None:
            del data["__files"]

    # files doesn't work with json= in requests, so JSON-dump structured
    # values and fall back to a normal form POST via data=.
    if bool(files) and as_json:
        as_json = False
        dumped = {key: dumps(value) for key, value in data.items() if isinstance(value, (dict, list))}
        data.update(dumped)

    post_kwds = {
        'files': files,
    }
    if headers:
        post_kwds['headers'] = headers
    if as_json:
        post_kwds['json'] = data
        post_kwds['params'] = params
    else:
        data.update(params)
        post_kwds['data'] = data

    return requests.post(url, **post_kwds)