comparison env/lib/python3.9/site-packages/ephemeris/shed_tools.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """
2 A tool to automate installation of tool repositories from a Galaxy Tool Shed
3 into an instance of Galaxy.
4
5 Shed-tools has three commands: update, test and install.
6
7 Update simply updates all the tools in a Galaxy given connection details on the command line.
8
9 Test tests the specified tools in the Galaxy Instance.
10
11 Install allows installation of tools in multiple ways.
12 Galaxy instance details and the installed tools can be provided in one of three
13 ways:
14
15 1. In the YAML format via dedicated files (a sample can be found
16 `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_).
17 2. On the command line as dedicated script options (see the usage help).
18 3. As a single composite parameter to the script. The parameter must be a
19 single, YAML-formatted string with the keys corresponding to the keys
20 available for use in the YAML formatted file (for example:
21 `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url':
22 'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id':
23 'peak_calling', 'name': 'synapse_interface'}"`).
24
25 Only one of the methods can be used with each invocation of the script but if
26 more than one are provided, precedence will correspond to order
27 of the items in the list above.
28 When installing tools, Galaxy expects any `tool_panel_section_id` provided when
29 installing a tool to already exist in the configuration. If the section
30 does not exist, the tool will be installed outside any section. See
31 `shed_tool_conf.xml.sample` in this directory for a sample of such file. Before
32 running this script to install the tools, make sure to place such file into
33 Galaxy's configuration directory and set Galaxy configuration option
34 `tool_config_file` to include it.
35 """
36 import datetime as dt
37 import json
38 import os
39 import re
40 import time
41 from collections import namedtuple
42 from concurrent.futures import thread, ThreadPoolExecutor
43
44 import requests
45 import yaml
46 from bioblend.galaxy.client import ConnectionError
47 from bioblend.galaxy.toolshed import ToolShedClient
48 from galaxy.tool_util.verify.interactor import (
49 GalaxyInteractorApi,
50 verify_tool,
51 )
52 from galaxy.util import unicodify
53
54 from . import get_galaxy_connection, load_yaml_file
55 from .ephemeris_log import disable_external_library_logging, setup_global_logger
56 from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository
57 from .shed_tools_args import parser
58 from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS
59
# Repository statuses that wait_for_install() treats as "still in progress":
# while a repository reports one of these, polling continues.
NON_TERMINAL_REPOSITORY_STATES = {
    "New",
    "Cloning",
    "Setting tool versions",
    "Installing repository dependencies",
    "Installing tool dependencies",
    "Loading proprietary datatypes",
}
68
69
class InstallRepositoryManager(object):
    """Manages the installation, updating and testing of repositories on a galaxy instance"""

    def __init__(self,
                 galaxy_instance):
        """Keep the Galaxy connection and create a Tool Shed client bound to it."""
        self.gi = galaxy_instance
        self.tool_shed_client = ToolShedClient(self.gi)

    def installed_repositories(self):
        """Return the "tools" entry of the tool list generated from the Galaxy instance."""
        return GiToToolYaml(
            gi=self.gi,
            skip_tool_panel_section_name=False,
            get_data_managers=True,
            get_all_tools=True
        ).tool_list.get("tools")

    def filter_installed_repos(self, repos, check_revision=True):
        """Split ``repos`` into repositories that still need installing and ones already present.

        :param repos: list of repository dictionaries to classify.
        :param check_revision: when True the installed list is flattened so that
            every repository/revision combination is compared separately; when
            False the unflattened list is used to limit the number of comparisons.
        :return: ``FilterResults`` namedtuple with the fields
            ``not_installed_repos`` and ``already_installed_repos``.
        """
        # TODO: Find a speedier algorithm.
        installed_repos = self.installed_repositories()
        if check_revision:
            installed_repos = flatten_repo_info(installed_repos)

        not_installed_repos = []
        already_installed_repos = []
        for repo in repos:
            is_installed = any(
                the_same_repository(installed_repo, repo, check_revision)
                for installed_repo in installed_repos
            )
            if is_installed:
                already_installed_repos.append(repo)
            else:
                not_installed_repos.append(repo)

        FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
        return FilterResults(already_installed_repos=already_installed_repos,
                             not_installed_repos=not_installed_repos)

    def install_repositories(self,
                             repositories,
                             log=None,
                             force_latest_revision=False,
                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
                             default_install_tool_dependencies=False,
                             default_install_resolver_dependencies=True,
                             default_install_repository_dependencies=True):
        """Install a list of tools on the current galaxy

        :param repositories: non-empty list of repository dictionaries.
        :param log: optional logger; when omitted nothing is logged.
        :param force_latest_revision: forwarded to ``complete_repo_information``.
        :param default_toolshed: forwarded as ``default_toolshed_url``.
        :param default_install_tool_dependencies: forwarded to ``complete_repo_information``.
        :param default_install_resolver_dependencies: forwarded to ``complete_repo_information``.
        :param default_install_repository_dependencies: forwarded to ``complete_repo_information``.
        :return: ``InstallResults`` namedtuple with the fields
            ``installed_repositories``, ``errored_repositories`` and
            ``skipped_repositories``.
        :raises ValueError: if ``repositories`` is empty.
        """
        if not repositories:
            raise ValueError("Empty list of tools was given")
        installation_start = dt.datetime.now()
        installed_repositories = []
        skipped_repositories = []
        errored_repositories = []
        counter = 0

        # Check repos for invalid keys
        if log:
            for repo in repositories:
                for key in repo:
                    if key not in VALID_KEYS and key != 'revisions':
                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))

        # Start by flattening the repo list per revision
        flattened_repos = flatten_repo_info(repositories)
        total_num_repositories = len(flattened_repos)

        # Complete the repo information, and make sure each repository has a revision
        repository_list = []
        for repository in flattened_repos:
            start = dt.datetime.now()
            try:
                complete_repo = complete_repo_information(
                    repository,
                    default_toolshed_url=default_toolshed,
                    require_tool_panel_info=True,
                    default_install_tool_dependencies=default_install_tool_dependencies,
                    default_install_resolver_dependencies=default_install_resolver_dependencies,
                    default_install_repository_dependencies=default_install_repository_dependencies,
                    force_latest_revision=force_latest_revision)
                repository_list.append(complete_repo)
            except Exception as e:
                # We'll run through the loop come whatever may, we log the errored repositories at the end anyway.
                if log:
                    log_repository_install_error(repository, start, unicodify(e), log)
                errored_repositories.append(repository)

        # Filter out already installed repos
        filtered_repos = self.filter_installed_repos(repository_list)

        for skipped_repo in filtered_repos.already_installed_repos:
            counter += 1
            if log:
                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
            skipped_repositories.append(skipped_repo)

        # Install repos
        for repository in filtered_repos.not_installed_repos:
            counter += 1
            if log:
                log_repository_install_start(repository, counter=counter, installation_start=installation_start,
                                             log=log, total_num_repositories=total_num_repositories)
            result = self.install_repository_revision(repository, log)
            if result == "error":
                errored_repositories.append(repository)
            elif result == "skipped":
                skipped_repositories.append(repository)
            elif result == "installed":
                installed_repositories.append(repository)

        # Log results
        if log:
            def summarize(repos, missing_revision=None):
                # Repositories whose completion failed may lack a name or a
                # revision, so never index the dictionaries directly here.
                return [(t.get('name'), t.get('changeset_revision', missing_revision)) for t in repos]

            log.info("Installed repositories ({0}): {1}".format(
                len(installed_repositories), summarize(installed_repositories)))
            log.info("Skipped repositories ({0}): {1}".format(
                len(skipped_repositories), summarize(skipped_repositories)))
            log.info("Errored repositories ({0}): {1}".format(
                len(errored_repositories), summarize(errored_repositories, "")))
            log.info("All repositories have been installed.")
            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))

        InstallResults = namedtuple("InstallResults",
                                    ["installed_repositories", "errored_repositories", "skipped_repositories"])
        return InstallResults(installed_repositories=installed_repositories,
                              skipped_repositories=skipped_repositories,
                              errored_repositories=errored_repositories)

    def update_repositories(self, repositories=None, log=None, **kwargs):
        """Install the latest revision of the given (or all installed) repositories.

        When ``repositories`` is None or empty, every installed repository is
        updated. Otherwise only those already installed are updated and a
        warning is logged for the others. Extra keyword arguments are passed on
        to ``install_repositories``, whose result is returned.
        """
        if not repositories:  # Repositories None or empty list
            repositories = self.installed_repositories()
        else:
            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
            if filtered_repos.not_installed_repos and log:
                log.warning("The following tools are not installed and will not be upgraded: {0}".format(
                    filtered_repos.not_installed_repos))
            repositories = filtered_repos.already_installed_repos
        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)

    def test_tools(self,
                   test_json,
                   repositories=None,
                   log=None,
                   test_user_api_key=None,
                   test_user="ephemeris@galaxyproject.org",
                   parallel_tests=1,
                   ):
        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.

        :param test_json: path of the JSON report that is written at the end.
        :param repositories: repositories whose tools are tested; None or an
            empty list selects all installed repositories.
        :param log: optional logger.
        :param test_user_api_key: API key handed to ``_get_interactor``.
        :param test_user: test user handed to ``_get_interactor``.
        :param parallel_tests: ``max_workers`` of the thread pool running the tests.
        """
        tool_test_start = dt.datetime.now()
        tests_passed = []
        test_exceptions = []

        if not repositories:  # If repositories is None or empty list
            # Consider a variant of this that doesn't even consume a tool list YAML? target
            # something like installed_repository_revisions(self.gi)
            repositories = self.installed_repositories()

        target_repositories = flatten_repo_info(repositories)

        installed_tools = []
        for target_repository in target_repositories:
            installed_tools.extend(tools_for_repository(self.gi, target_repository))

        all_test_results = []
        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
        test_history = galaxy_interactor.new_history()

        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
            try:
                for tool in installed_tools:
                    self._test_tool(executor=executor,
                                    tool=tool,
                                    galaxy_interactor=galaxy_interactor,
                                    test_history=test_history,
                                    log=log,
                                    tool_test_results=all_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions,
                                    )
            finally:
                # Always write report, even if test was cancelled.
                try:
                    executor.shutdown(wait=True)
                except KeyboardInterrupt:
                    # NOTE(review): reaches into concurrent.futures internals,
                    # presumably so pending workers are not joined after an
                    # interrupt - confirm before touching.
                    executor._threads.clear()
                    thread._threads_queues.clear()
                n_passed = len(tests_passed)
                n_failed = len(test_exceptions)
                report_obj = {
                    'version': '0.1',
                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
                    'results': {
                        'total': n_passed + n_failed,
                        'errors': n_failed,
                        'failures': 0,
                        'skips': 0,
                    },
                    'tests': sorted(all_test_results, key=lambda el: el['id']),
                }
                with open(test_json, "w") as f:
                    json.dump(report_obj, f)
                if log:
                    log.info("Report written to '%s'", os.path.abspath(test_json))
                    log.info("Passed tool tests ({0}): {1}".format(
                        n_passed,
                        list(tests_passed))
                    )
                    log.info("Failed tool tests ({0}): {1}".format(
                        n_failed,
                        [t[0] for t in test_exceptions])
                    )
                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

    @staticmethod
    def _strip_api_suffix(api_url):
        """Return ``api_url`` without its trailing ``/api`` path component.

        Only the suffix is removed, so an ``/api`` substring elsewhere in the
        URL (for example ``http://host/apiserver/api``) is left intact.
        """
        return re.sub(r'/api/?$', '', api_url)

    def _get_interactor(self, test_user, test_user_api_key):
        """Build the ``GalaxyInteractorApi`` used to run tool tests.

        When no test user API key is given and the ``whoami`` request returns a
        non-null body, the connection's own key is used. If there is still no
        key afterwards, ``test_user`` is passed to the interactor instead.
        """
        if test_user_api_key is None:
            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
            if whoami is not None:
                test_user_api_key = self.gi.key
        galaxy_interactor_kwds = {
            "galaxy_url": self._strip_api_suffix(self.gi.url),
            "master_api_key": self.gi.key,
            "api_key": test_user_api_key,  # TODO
            "keep_outputs_dir": '',
        }
        if test_user_api_key is None:
            galaxy_interactor_kwds["test_user"] = test_user
        return GalaxyInteractorApi(**galaxy_interactor_kwds)

    @staticmethod
    def _test_tool(executor,
                   tool,
                   galaxy_interactor,
                   tool_test_results,
                   tests_passed,
                   test_exceptions,
                   log,
                   test_history=None,
                   ):
        """Submit every test of ``tool`` to ``executor``.

        The three result lists are appended to in place: by this method when
        the test definitions cannot be fetched, and by the submitted jobs as
        they finish. The returned ``Results`` namedtuple holds those same list
        objects, so it may still be filling up when this method returns.
        """
        Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])
        results = Results(tool_test_results=tool_test_results,
                          tests_passed=tests_passed,
                          test_exceptions=test_exceptions)
        if test_history is None:
            test_history = galaxy_interactor.new_history()
        tool_id = tool["id"]
        tool_version = tool["version"]
        try:
            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
        except Exception as e:
            if log:
                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
            test_exceptions.append((tool_id, e))
            return results

        def run_test(index, test_id):
            # index and test_id are parameters (not closed-over loop variables)
            # so each submitted job keeps its own values.

            def register(job_data):
                tool_test_results.append({
                    'id': test_id,
                    'has_data': True,
                    'data': job_data,
                })

            try:
                if log:
                    log.info("Executing test '%s'", test_id)
                verify_tool(
                    tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
                    register_job_data=register, quiet=True, test_history=test_history,
                )
                tests_passed.append(test_id)
                if log:
                    log.info("Test '%s' passed", test_id)
            except Exception as e:
                if log:
                    log.warning("Test '%s' failed", test_id, exc_info=True)
                test_exceptions.append((test_id, e))

        for test_index in range(len(tool_test_dicts)):
            executor.submit(run_test, test_index, tool_id + "-" + str(test_index))
        return results

    def install_repository_revision(self, repository, log):
        """Install one repository revision and report the outcome.

        ``repository`` is modified in place: its ``tool_panel_section_label``
        entry, if present, is renamed to ``new_tool_panel_section_label`` before
        the dictionary is passed as keyword arguments to the Tool Shed client.

        :return: ``"installed"``, ``"skipped"`` or ``"error"``.
        """
        default_err_msg = ('All repositories that you are attempting to install '
                           'have been previously installed.')
        start = dt.datetime.now()
        try:
            # Guarded so a missing label, or a second call on the same
            # dictionary, does not raise KeyError.
            if 'tool_panel_section_label' in repository:
                repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label')
            response = self.tool_shed_client.install_repository_revision(**repository)
            if isinstance(response, dict) and response.get('status', None) == 'ok':
                # This rare case happens if a repository is already installed but
                # was not recognised as such in the above check. In such a
                # case the return value looks like this:
                # {u'status': u'ok', u'message': u'No repositories were
                # installed, possibly because the selected repository has
                # already been installed.'}
                if log:
                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
            if log:
                log_repository_install_success(
                    repository=repository,
                    start=start,
                    log=log)
            return "installed"
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            error_text = unicodify(e)
            # requests' ConnectionError has no ``body`` attribute; fall back to
            # the exception text instead of failing with AttributeError.
            error_msg = getattr(e, 'body', error_text)

            if default_err_msg in error_text:
                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
                if log:
                    log.debug("\tRepository %s already installed (at revision %s)" %
                              (repository['name'], repository['changeset_revision']))
                return "skipped"

            if "504" in error_text or 'Connection aborted' in error_text:
                if log:
                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
                try:
                    success = self.wait_for_install(repository=repository, log=log, timeout=3600)
                except AssertionError as wait_error:
                    # wait_for_install could not find or track the repository;
                    # count this one as errored so the remaining repositories
                    # are still processed.
                    success = False
                    error_msg = unicodify(wait_error)
                if success:
                    if log:
                        log_repository_install_success(
                            repository=repository,
                            start=start,
                            log=log)
                    return "installed"

            if log:
                log_repository_install_error(
                    repository=repository,
                    start=start, msg=error_msg,
                    log=log)
            return "error"

    def wait_for_install(self, repository, log=None, timeout=3600):
        """
        If nginx times out, we look into the list of installed repositories
        and try to determine if a repository of the same name/owner is still installing.
        Returns True if install finished successfully,
        returns False when timeout is exceeded or installation has failed.
        Raises AssertionError when no matching installing repository is found,
        when several match and none has the requested revision, or when the
        repository reports an unknown status.
        """
        # We request a repository revision, but Galaxy may decide to install the next downloadable revision.
        # This ensures we have a revision to track, and if not, finds the revision that is actually being installed
        name = repository['name']
        owner = repository['owner']
        changeset_revision = repository['changeset_revision']
        installed_repos = self.tool_shed_client.get_repositories()
        non_terminal = [r for r in installed_repos
                        if r['name'] == name and r['owner'] == owner and r['status'] in NON_TERMINAL_REPOSITORY_STATES]
        if not non_terminal:
            # Raised explicitly (not via ``assert``) so the check survives ``python -O``.
            raise AssertionError(
                "Repository '%s' from owner '%s' not in currently installling Repositories" % (name, owner))

        installing_repo_id = None
        if len(non_terminal) == 1:
            # Unambiguous, we wait for this repo
            installing_repo_id = non_terminal[0]['id']
        else:
            # More than one repo with the requested name and owner in installing status.
            # If any repo is of the requested changeset revision we wait for this repo.
            for installing_repo in non_terminal:
                if installing_repo['changeset_revision'] == changeset_revision:
                    installing_repo_id = installing_repo['id']
                    break
        if not installing_repo_id:
            # We may have a repo that is permanently in a non-terminal state (e.g because of restart during installation).
            # Raise an exception and continue with the remaining repos.
            msg = "Multiple repositories for name '%s', owner '%s' found in non-terminal states. Please uninstall all non-terminal repositories."
            raise AssertionError(msg % (name, owner))

        max_wait = dt.timedelta(seconds=timeout)
        start = dt.datetime.now()
        while (dt.datetime.now() - start) < max_wait:
            try:
                installed_repo = self.tool_shed_client.show_repository(installing_repo_id)
                status = installed_repo['status']
                if status == 'Installed':
                    return True
                elif status == 'Error':
                    return False
                elif status in NON_TERMINAL_REPOSITORY_STATES:
                    time.sleep(10)
                else:
                    raise AssertionError("Repository name '%s', owner '%s' in unknown status '%s'" % (name, owner, status))
            except ConnectionError as e:
                if log:
                    log.warning('Failed to get repositories list: %s', unicodify(e))
                time.sleep(10)
        return False
477
478
def log_repository_install_error(repository, start, msg, log):
    """
    Log a failed repository installation at error level.

    :param repository: repository dictionary; name, owner and revision are
        read with ``.get`` so partially filled dictionaries are accepted.
    :param start: ``datetime`` at which the installation attempt began.
    :param msg: error description to include in the log line.
    :param log: logger that receives the message.
    """
    elapsed_seconds = (dt.datetime.now() - start).total_seconds()
    log.error(
        "\t* Error installing a repository (after %s seconds)! Name: %s, owner: %s, revision: %s, error: %s",
        elapsed_seconds,
        repository.get('name', ""),
        repository.get('owner', ""),
        repository.get('changeset_revision', ""),
        msg)
491
492
def log_repository_install_success(repository, start, log):
    """
    Emit a debug line reporting that ``repository`` was installed.

    The line contains the repository name, the time elapsed since ``start``
    and the changeset revision.
    Repositories that finish in error still count as successful installs currently.
    """
    elapsed = str(dt.datetime.now() - start)
    message = "\trepository {0} installed successfully (in {1}) at revision {2}".format(
        repository['name'],
        elapsed,
        repository['changeset_revision'],
    )
    log.debug(message)
506
507
def log_repository_install_skip(repository, counter, total_num_repositories, log):
    """Emit a debug line saying ``repository`` is already installed and is skipped."""
    progress = (counter, total_num_repositories)
    details = (repository['name'], repository['changeset_revision'])
    log.debug("(%s/%s) repository %s already installed at revision %s. Skipping." % (progress + details))
518
519
def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
    """
    Emit a debug line announcing that installation of ``repository`` begins.

    The target section shown is the tool panel section id, or the section
    label when the id is falsy. "TRT" is the time elapsed since
    ``installation_start``.
    """
    template = '({0}/{1}) Installing repository {2} from {3} to section "{4}" at revision {5} (TRT: {6})'
    log.debug(template.format(
        counter,
        total_num_repositories,
        repository['name'],
        repository['owner'],
        repository['tool_panel_section_id'] or repository['tool_panel_section_label'],
        repository['changeset_revision'],
        dt.datetime.now() - installation_start,
    ))
531
532
def args_to_repos(args):
    """
    Build the list of repository dictionaries described by the parsed arguments.

    Sources are tried in this order and the first one present wins: a tool
    list file (its "tools" entry), a single YAML-formatted string, then the
    individual name/owner options. With none of them, an empty list is returned.
    """
    if args.tool_list_file:
        return load_yaml_file(args.tool_list_file)['tools']

    if args.tool_yaml:
        return [yaml.safe_load(args.tool_yaml)]

    if not (args.name and args.owner):
        return []

    single_repo = {
        "owner": args.owner,
        "name": args.name,
        "tool_panel_section_id": args.tool_panel_section_id,
        "tool_panel_section_label": args.tool_panel_section_label,
        "revisions": args.revisions,
    }
    if args.tool_shed_url:
        single_repo["tool_shed_url"] = args.tool_shed_url
    return [single_repo]
553
554
def main():
    """
    Command-line entry point.

    Parses the arguments, connects to Galaxy and runs the requested action
    (``update``, ``install`` or ``test``). After an install or update, the
    affected repositories are tested when ``--test`` or ``--test_existing``
    was requested.

    :raises NotImplementedError: if the parsed action is not one of the three
        known actions.
    """
    disable_external_library_logging()
    args = parser().parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
    install_repository_manager = InstallRepositoryManager(gi)

    repos = args_to_repos(args)

    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
    else:
        tool_list = dict()

    # Get some of the other installation arguments.
    # A truthy value in the tool list file wins; otherwise the command line
    # value (False when the option does not exist) is used.
    kwargs = dict(
        default_install_tool_dependencies=(
            tool_list.get("install_tool_dependencies")
            or getattr(args, "install_tool_dependencies", False)),
        default_install_repository_dependencies=(
            tool_list.get("install_repository_dependencies")
            or getattr(args, "install_repository_dependencies", False)),
        default_install_resolver_dependencies=(
            tool_list.get("install_resolver_dependencies")
            or getattr(args, "install_resolver_dependencies", False)))

    # Start installing/updating and store the results in install_results.
    # Or do testing if the action is `test`
    install_results = None
    if args.action == "update":
        install_results = install_repository_manager.update_repositories(
            repositories=repos,
            log=log,
            **kwargs)
    elif args.action == "install":
        install_results = install_repository_manager.install_repositories(
            repos,
            log=log,
            force_latest_revision=args.force_latest_revision,
            **kwargs)
    elif args.action == "test":
        install_repository_manager.test_tools(
            test_json=args.test_json,
            repositories=repos,
            log=log,
            test_user_api_key=args.test_user_api_key,
            test_user=args.test_user,
            parallel_tests=args.parallel_tests,
        )
    else:
        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

    # Run tests on the install results if required.
    # The parentheses matter: without them ``and`` binds tighter than ``or``,
    # so test_existing alone would dereference install_results even when it is
    # None (the `test` action).
    if install_results and (args.test or args.test_existing):
        # Copy so the list held by install_results is not extended in place.
        to_be_tested_repositories = list(install_results.installed_repositories)
        if args.test_existing:
            to_be_tested_repositories.extend(install_results.skipped_repositories)
        if to_be_tested_repositories:
            install_repository_manager.test_tools(
                test_json=args.test_json,
                repositories=to_be_tested_repositories,
                log=log,
                test_user_api_key=args.test_user_api_key,
                test_user=args.test_user,
                parallel_tests=args.parallel_tests,
            )


if __name__ == "__main__":
    main()