# view env/lib/python3.9/site-packages/ephemeris/shed_tools.py @ 0:4f3585e2f14b draft default tip
#
# "planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
# author shellac
# date Mon, 22 Mar 2021 18:12:50 +0000
# parents
# children
# line wrap: on
# line source

"""
A tool to automate installation of tool repositories from a Galaxy Tool Shed
into an instance of Galaxy.

Shed-tools has three commands: update, test and install.

Update simply updates all the tools in a Galaxy given connection details on the command line.

Test tests the specified tools in the Galaxy Instance.

Install allows installation of tools in multiple ways.
Galaxy instance details and the installed tools can be provided in one of three
ways:

1. In the YAML format via dedicated files (a sample can be found
   `here <https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample>`_).
2. On the command line as dedicated script options (see the usage help).
3. As a single composite parameter to the script. The parameter must be a
   single, YAML-formatted string with the keys corresponding to the keys
   available for use in the YAML formatted file (for example:
   `--yaml_tool "{'owner': 'kellrott', 'tool_shed_url':
   'https://testtoolshed.g2.bx.psu.edu', 'tool_panel_section_id':
   'peak_calling', 'name': 'synapse_interface'}"`).

Only one of the methods can be used with each invocation of the script but if
more than one is provided, precedence will correspond to the order
of the items in the list above.
When installing tools, Galaxy expects any `tool_panel_section_id` provided when
installing a tool to already exist in the configuration. If the section
does not exist, the tool will be installed outside any section. See
`shed_tool_conf.xml.sample` in this directory for a sample of such file. Before
running this script to install the tools, make sure to place such file into
Galaxy's configuration directory and set Galaxy configuration option
`tool_config_file` to include it.
"""
import datetime as dt
import json
import os
import re
import time
from collections import namedtuple
from concurrent.futures import thread, ThreadPoolExecutor

import requests
import yaml
from bioblend.galaxy.client import ConnectionError
from bioblend.galaxy.toolshed import ToolShedClient
from galaxy.tool_util.verify.interactor import (
    GalaxyInteractorApi,
    verify_tool,
)
from galaxy.util import unicodify

from . import get_galaxy_connection, load_yaml_file
from .ephemeris_log import disable_external_library_logging, setup_global_logger
from .get_tool_list_from_galaxy import GiToToolYaml, the_same_repository, tools_for_repository
from .shed_tools_args import parser
from .shed_tools_methods import complete_repo_information, flatten_repo_info, VALID_KEYS

# Repository statuses that mean an installation is still in progress.
# ``InstallRepositoryManager.wait_for_install`` keeps polling while a
# repository reports one of these statuses.
NON_TERMINAL_REPOSITORY_STATES = {
    'New',
    'Cloning',
    'Setting tool versions',
    'Installing repository dependencies',
    'Installing tool dependencies',
    'Loading proprietary datatypes'
}


class InstallRepositoryManager(object):
    """Manages the installation of new repositories on a galaxy instance"""

    def __init__(self, galaxy_instance):
        """Keep the Galaxy connection and create a Tool Shed client bound to it."""
        self.gi = galaxy_instance
        self.tool_shed_client = ToolShedClient(galaxy_instance)

    def installed_repositories(self):
        """Return the ``tools`` entry of the tool list generated from this Galaxy instance."""
        tool_yaml = GiToToolYaml(
            gi=self.gi,
            skip_tool_panel_section_name=False,
            get_data_managers=True,
            get_all_tools=True,
        )
        return tool_yaml.tool_list.get("tools")

    def filter_installed_repos(self, repos, check_revision=True):
        # TODO: Find a speedier algorithm.
        """This filters a list of repositories"""
        not_installed_repos = []
        already_installed_repos = []
        if check_revision:
            # If we want to check if revisions are equal, flatten the list,
            # so each repository - revision combination has its own entry
            installed_repos = flatten_repo_info(self.installed_repositories())
        else:
            # If we do not care about revision equality, do not do the flatten
            # action to limit the number of comparisons.
            installed_repos = self.installed_repositories()

        for repo in repos:
            for installed_repo in installed_repos:
                if the_same_repository(installed_repo, repo, check_revision):
                    already_installed_repos.append(repo)
                    break
            else:  # This executes when the for loop completes and no match has been found.
                not_installed_repos.append(repo)
        FilterResults = namedtuple("FilterResults", ["not_installed_repos", "already_installed_repos"])
        return FilterResults(already_installed_repos=already_installed_repos, not_installed_repos=not_installed_repos)

    def install_repositories(self,
                             repositories,
                             log=None,
                             force_latest_revision=False,
                             default_toolshed='https://toolshed.g2.bx.psu.edu/',
                             default_install_tool_dependencies=False,
                             default_install_resolver_dependencies=True,
                             default_install_repository_dependencies=True):
        """Install a list of tools on the current galaxy"""
        if not repositories:
            raise ValueError("Empty list of tools was given")
        installation_start = dt.datetime.now()
        installed_repositories = []
        skipped_repositories = []
        errored_repositories = []
        counter = 0

        # Check repos for invalid keys
        for repo in repositories:
            for key in repo.keys():
                if key not in VALID_KEYS and key != 'revisions':
                    if log:
                        log.warning("'{0}' not a valid key. Will be skipped during parsing".format(key))

        # Start by flattening the repo list per revision
        flattened_repos = flatten_repo_info(repositories)
        total_num_repositories = len(flattened_repos)

        # Complete the repo information, and make sure each repository has a revision
        repository_list = []
        for repository in flattened_repos:
            start = dt.datetime.now()
            try:
                complete_repo = complete_repo_information(
                    repository,
                    default_toolshed_url=default_toolshed,
                    require_tool_panel_info=True,
                    default_install_tool_dependencies=default_install_tool_dependencies,
                    default_install_resolver_dependencies=default_install_resolver_dependencies,
                    default_install_repository_dependencies=default_install_repository_dependencies,
                    force_latest_revision=force_latest_revision)
                repository_list.append(complete_repo)
            except Exception as e:
                # We'll run through the loop come whatever may, we log the errored repositories at the end anyway.
                if log:
                    log_repository_install_error(repository, start, unicodify(e), log)
                errored_repositories.append(repository)

        # Filter out already installed repos
        filtered_repos = self.filter_installed_repos(repository_list)

        for skipped_repo in filtered_repos.already_installed_repos:
            counter += 1
            if log:
                log_repository_install_skip(skipped_repo, counter, total_num_repositories, log)
            skipped_repositories.append(skipped_repo)

        # Install repos
        for repository in filtered_repos.not_installed_repos:
            counter += 1
            if log:
                log_repository_install_start(repository, counter=counter, installation_start=installation_start, log=log,
                                             total_num_repositories=total_num_repositories)
            result = self.install_repository_revision(repository, log)
            if result == "error":
                errored_repositories.append(repository)
            elif result == "skipped":
                skipped_repositories.append(repository)
            elif result == "installed":
                installed_repositories.append(repository)

        # Log results
        if log:
            log.info("Installed repositories ({0}): {1}".format(
                len(installed_repositories),
                [(
                    t['name'],
                    t.get('changeset_revision')
                ) for t in installed_repositories])
            )
            log.info("Skipped repositories ({0}): {1}".format(
                len(skipped_repositories),
                [(
                    t['name'],
                    t.get('changeset_revision')
                ) for t in skipped_repositories])
            )
            log.info("Errored repositories ({0}): {1}".format(
                len(errored_repositories),
                [(
                    t['name'],
                    t.get('changeset_revision', "")
                ) for t in errored_repositories])
            )
            log.info("All repositories have been installed.")
            log.info("Total run time: {0}".format(dt.datetime.now() - installation_start))
        InstallResults = namedtuple("InstallResults",
                                    ["installed_repositories", "errored_repositories", "skipped_repositories"])
        return InstallResults(installed_repositories=installed_repositories,
                              skipped_repositories=skipped_repositories,
                              errored_repositories=errored_repositories)

    def update_repositories(self, repositories=None, log=None, **kwargs):
        if not repositories:  # Repositories None or empty list
            repositories = self.installed_repositories()
        else:
            filtered_repos = self.filter_installed_repos(repositories, check_revision=False)
            if filtered_repos.not_installed_repos:
                if log:
                    log.warning("The following tools are not installed and will not be upgraded: {0}".format(
                        filtered_repos.not_installed_repos))
            repositories = filtered_repos.already_installed_repos
        return self.install_repositories(repositories, force_latest_revision=True, log=log, **kwargs)

    def test_tools(self,
                   test_json,
                   repositories=None,
                   log=None,
                   test_user_api_key=None,
                   test_user="ephemeris@galaxyproject.org",
                   parallel_tests=1,
                   ):
        """Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.

        The tests are submitted to a thread pool and a JSON report is written to
        ``test_json`` once the pool has shut down, also when the run was cut short.

        :param test_json: path of the JSON report file to write.
        :param repositories: repositories whose tools are tested; when ``None`` or
            empty, ``self.installed_repositories()`` is used instead.
        :param log: optional logger; nothing is logged when it is ``None``.
        :param test_user_api_key: API key passed to ``_get_interactor``.
        :param test_user: test user passed to ``_get_interactor``.
        :param parallel_tests: ``max_workers`` of the thread pool running the tests.
        """
        tool_test_start = dt.datetime.now()
        # Shared result lists: the submitted test jobs append to them.
        tests_passed = []
        test_exceptions = []

        if not repositories:  # If repositories is None or empty list
            # Consider a variant of this that doesn't even consume a tool list YAML? target
            # something like installed_repository_revisions(self.gi)
            repositories = self.installed_repositories()

        target_repositories = flatten_repo_info(repositories)

        # Collect the tools of every targeted repository.
        installed_tools = []
        for target_repository in target_repositories:
            repo_tools = tools_for_repository(self.gi, target_repository)
            installed_tools.extend(repo_tools)

        all_test_results = []
        galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
        # A single history is created here and handed to every tool test.
        test_history = galaxy_interactor.new_history()

        with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
            try:
                for tool in installed_tools:
                    self._test_tool(executor=executor,
                                    tool=tool,
                                    galaxy_interactor=galaxy_interactor,
                                    test_history=test_history,
                                    log=log,
                                    tool_test_results=all_test_results,
                                    tests_passed=tests_passed,
                                    test_exceptions=test_exceptions,
                                    )
            finally:
                # Always write report, even if test was cancelled.
                try:
                    # Block until every submitted test has finished.
                    executor.shutdown(wait=True)
                except KeyboardInterrupt:
                    # Interrupted while waiting: drop the bookkeeping of the worker
                    # threads via private attributes of the executor and of the
                    # ``concurrent.futures.thread`` module.
                    # NOTE(review): presumably so that nothing keeps waiting on the
                    # still-running workers at exit - relies on private internals, confirm.
                    executor._threads.clear()
                    thread._threads_queues.clear()
                n_passed = len(tests_passed)
                n_failed = len(test_exceptions)
                # Every entry of ``test_exceptions`` is reported as an error;
                # 'failures' and 'skips' are always written as 0.
                report_obj = {
                    'version': '0.1',
                    'suitename': 'Ephemeris tool tests targeting %s' % self.gi.base_url,
                    'results': {
                        'total': n_passed + n_failed,
                        'errors': n_failed,
                        'failures': 0,
                        'skips': 0,
                    },
                    # Sorted by test id so the order does not depend on completion order.
                    'tests': sorted(all_test_results, key=lambda el: el['id']),
                }
                with open(test_json, "w") as f:
                    json.dump(report_obj, f)
                if log:
                    log.info("Report written to '%s'", os.path.abspath(test_json))
                    log.info("Passed tool tests ({0}): {1}".format(
                        n_passed,
                        [t for t in tests_passed])
                    )
                    # ``test_exceptions`` holds (id, exception) tuples; only the ids are listed.
                    log.info("Failed tool tests ({0}): {1}".format(
                        n_failed,
                        [t[0] for t in test_exceptions])
                    )
                    log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

    def _get_interactor(self, test_user, test_user_api_key):
        """Build the ``GalaxyInteractorApi`` used to run tool tests.

        :param test_user: test user handed to the interactor, only when no API
            key could be determined.
        :param test_user_api_key: API key to run the tests with. When ``None``,
            the key of the Galaxy connection is used if the ``whoami`` endpoint
            returns a value for it.
        :return: a configured ``GalaxyInteractorApi`` instance.
        """
        if test_user_api_key is None:
            # NOTE(review): a ``None`` answer presumably means the connection key
            # does not belong to a regular user (e.g. a master key) - confirm.
            whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
            if whoami is not None:
                test_user_api_key = self.gi.key
        galaxy_interactor_kwds = {
            # Strip only the trailing "/api" path component. An unanchored
            # substitution would also eat "/api" elsewhere in the URL, e.g. turn
            # "http://api.example.org/api" into "http:/.example.org".
            "galaxy_url": re.sub(r'/api/?$', '', self.gi.url),
            "master_api_key": self.gi.key,
            "api_key": test_user_api_key,  # TODO
            "keep_outputs_dir": '',
        }
        if test_user_api_key is None:
            galaxy_interactor_kwds["test_user"] = test_user
        galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
        return galaxy_interactor

    @staticmethod
    def _test_tool(executor,
                   tool,
                   galaxy_interactor,
                   tool_test_results,
                   tests_passed,
                   test_exceptions,
                   log,
                   test_history=None,
                   ):
        if test_history is None:
            test_history = galaxy_interactor.new_history()
        tool_id = tool["id"]
        tool_version = tool["version"]
        try:
            tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
        except Exception as e:
            if log:
                log.warning("Fetching test definition for tool '%s' failed", tool_id, exc_info=True)
            test_exceptions.append((tool_id, e))
            Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])
            return Results(tool_test_results=tool_test_results,
                           tests_passed=tests_passed,
                           test_exceptions=test_exceptions)
        test_indices = list(range(len(tool_test_dicts)))

        for test_index in test_indices:
            test_id = tool_id + "-" + str(test_index)

            def run_test(index, test_id):

                def register(job_data):
                    tool_test_results.append({
                        'id': test_id,
                        'has_data': True,
                        'data': job_data,
                    })

                try:
                    if log:
                        log.info("Executing test '%s'", test_id)
                    verify_tool(
                        tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
                        register_job_data=register, quiet=True, test_history=test_history,
                    )
                    tests_passed.append(test_id)
                    if log:
                        log.info("Test '%s' passed", test_id)
                except Exception as e:
                    if log:
                        log.warning("Test '%s' failed", test_id, exc_info=True)
                    test_exceptions.append((test_id, e))

            executor.submit(run_test, test_index, test_id)

    def install_repository_revision(self, repository, log):
        default_err_msg = ('All repositories that you are attempting to install '
                           'have been previously installed.')
        start = dt.datetime.now()
        try:
            repository['new_tool_panel_section_label'] = repository.pop('tool_panel_section_label')
            response = self.tool_shed_client.install_repository_revision(**repository)
            if isinstance(response, dict) and response.get('status', None) == 'ok':
                # This rare case happens if a repository is already installed but
                # was not recognised as such in the above check. In such a
                # case the return value looks like this:
                # {u'status': u'ok', u'message': u'No repositories were
                #  installed, possibly because the selected repository has
                #  already been installed.'}
                if log:
                    log.debug("\tRepository {0} is already installed.".format(repository['name']))
            if log:
                log_repository_install_success(
                    repository=repository,
                    start=start,
                    log=log)
            return "installed"
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            if default_err_msg in unicodify(e):
                # THIS SHOULD NOT HAPPEN DUE TO THE CHECKS EARLIER
                if log:
                    log.debug("\tRepository %s already installed (at revision %s)" %
                              (repository['name'], repository['changeset_revision']))
                return "skipped"
            elif "504" in unicodify(e) or 'Connection aborted' in unicodify(e):
                if log:
                    log.debug("Timeout during install of %s, extending wait to 1h", repository['name'])
                success = self.wait_for_install(repository=repository, log=log, timeout=3600)
                if success:
                    if log:
                        log_repository_install_success(
                            repository=repository,
                            start=start,
                            log=log)
                    return "installed"
                else:
                    if log:
                        log_repository_install_error(
                            repository=repository,
                            start=start, msg=e.body,
                            log=log)
                    return "error"
            else:
                if log:
                    log_repository_install_error(
                        repository=repository,
                        start=start, msg=e.body,
                        log=log)
                return "error"

    def wait_for_install(self, repository, log=None, timeout=3600):
        """
        If nginx times out, we look into the list of installed repositories
        and try to determine if a repository of the same namer/owner is still installing.
        Returns True if install finished successfully,
        returns False when timeout is exceeded or installation has failed.
        """
        # We request a repository revision, but Galaxy may decide to install the next downloable revision.
        # This ensures we have a revision to track, and if not, finds the revision that is actually being installed
        name = repository['name']
        owner = repository['owner']
        changeset_revision = repository['changeset_revision']
        installed_repos = self.tool_shed_client.get_repositories()
        non_terminal = [r for r in installed_repos if r['name'] == name and r['owner'] == owner and r['status'] in NON_TERMINAL_REPOSITORY_STATES]
        assert len(non_terminal) > 0, "Repository '%s' from owner '%s' not in currently installling Repositories" % (name, owner)
        installing_repo_id = None
        if len(non_terminal) == 1:
            # Unambiguous, we wait for this repo
            installing_repo_id = non_terminal[0]['id']
        if len(non_terminal) > 1:
            # More than one repo with the requested name and owner in installing status.
            # If any repo is of the requested changeset revision we wait for this repo.
            for installing_repo in non_terminal:
                if installing_repo['changeset_revision'] == changeset_revision:
                    installing_repo_id = installing_repo['id']
                    break
        if not installing_repo_id:
            # We may have a repo that is permanently in a non-terminal state (e.g because of restart during installation).
            # Raise an exception and continue with the remaining repos.
            msg = "Multiple repositories for name '%s', owner '%s' found in non-terminal states. Please uninstall all non-terminal repositories."
            raise AssertionError(msg % (name, owner))
        start = dt.datetime.now()
        while (dt.datetime.now() - start) < dt.timedelta(seconds=timeout):
            try:
                installed_repo = self.tool_shed_client.show_repository(installing_repo_id)
                status = installed_repo['status']
                if status == 'Installed':
                    return True
                elif status == 'Error':
                    return False
                elif status in NON_TERMINAL_REPOSITORY_STATES:
                    time.sleep(10)
                else:
                    raise AssertionError("Repository name '%s', owner '%s' in unknown status '%s'" % (name, owner, status))
            except ConnectionError as e:
                if log:
                    log.warning('Failed to get repositories list: %s', unicodify(e))
                time.sleep(10)
        return False


def log_repository_install_error(repository, start, msg, log):
    """
    Log a failed repository installation at error level.

    :param repository: repository dictionary; missing ``name``, ``owner`` or
        ``changeset_revision`` entries are logged as empty strings.
    :param start: ``datetime`` at which the installation attempt started.
    :param msg: error description to include in the log line.
    :param log: logger receiving the message.
    :return: None
    """
    elapsed = dt.datetime.now() - start
    log.error(
        # One literal instead of implicitly concatenated pieces, which dropped
        # the space between "Name: %s," and "owner".
        "\t* Error installing a repository (after %s seconds)! Name: %s, owner: %s, revision: %s, error: %s",
        # The message announces seconds, so log a number of seconds rather
        # than the "H:MM:SS" rendering of the timedelta.
        elapsed.total_seconds(),
        repository.get('name', ""),
        repository.get('owner', ""),
        repository.get('changeset_revision', ""),
        msg)


def log_repository_install_success(repository, start, log):
    """
    Write a debug message for a repository installation that completed.

    The message contains the repository name, the time elapsed since ``start``
    and the changeset revision.
    Repositories that finish in error still count as successful installs currently.
    """
    elapsed = dt.datetime.now() - start
    details = (repository['name'], str(elapsed), repository['changeset_revision'])
    message = "\trepository %s installed successfully (in %s) at revision %s" % details
    log.debug(message)


def log_repository_install_skip(repository, counter, total_num_repositories, log):
    """Write a debug message saying that an already installed repository is skipped."""
    template = "({0}/{1}) repository {2} already installed at revision {3}. Skipping."
    message = template.format(
        counter,
        total_num_repositories,
        repository['name'],
        repository['changeset_revision'],
    )
    log.debug(message)


def log_repository_install_start(repository, counter, total_num_repositories, installation_start, log):
    """Write a debug message announcing the installation of a repository.

    The message names the tool panel section id, or the section label when
    the id is empty, and the total run time (TRT) since ``installation_start``.
    """
    name = repository['name']
    owner = repository['owner']
    section = repository['tool_panel_section_id'] or repository['tool_panel_section_label']
    revision = repository['changeset_revision']
    total_run_time = dt.datetime.now() - installation_start
    fields = (counter, total_num_repositories, name, owner, section, revision, total_run_time)
    log.debug('(%s/%s) Installing repository %s from %s to section "%s" at revision %s (TRT: %s)' % fields)


def args_to_repos(args):
    """Build the list of repository dictionaries described by the parsed arguments.

    Sources are tried in this order: the ``tools`` entry of the tool list file,
    the YAML string given as ``tool_yaml``, then the individual ``name`` /
    ``owner`` options. An empty list is returned when none of them is set.
    """
    if args.tool_list_file:
        return load_yaml_file(args.tool_list_file)['tools']
    if args.tool_yaml:
        return [yaml.safe_load(args.tool_yaml)]
    if not (args.name and args.owner):
        return []

    single_repo = {
        'owner': args.owner,
        'name': args.name,
        'tool_panel_section_id': args.tool_panel_section_id,
        'tool_panel_section_label': args.tool_panel_section_label,
        'revisions': args.revisions,
    }
    # The Tool Shed URL is only recorded when one was given.
    if args.tool_shed_url:
        single_repo["tool_shed_url"] = args.tool_shed_url
    return [single_repo]


def main():
    """Command line entry point: update, install or test repositories.

    Parses the arguments, connects to Galaxy and runs the requested action.
    After an ``update`` or ``install`` the affected repositories are tested
    when ``--test`` or ``--test_existing`` was requested.

    :raises NotImplementedError: if the parsed action is not one of
        ``update``, ``install`` or ``test``.
    """
    disable_external_library_logging()
    args = parser().parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    gi = get_galaxy_connection(args, file=args.tool_list_file, log=log, login_required=True)
    install_repository_manager = InstallRepositoryManager(gi)

    repos = args_to_repos(args)

    if args.tool_list_file:
        tool_list = load_yaml_file(args.tool_list_file)
    else:
        tool_list = dict()

    # Get some of the other installation arguments. A truthy value from the
    # tool list file wins over the command line option.
    kwargs = dict(
        default_install_tool_dependencies=(
            tool_list.get("install_tool_dependencies")
            or getattr(args, "install_tool_dependencies", False)),
        default_install_repository_dependencies=(
            tool_list.get("install_repository_dependencies")
            or getattr(args, "install_repository_dependencies", False)),
        default_install_resolver_dependencies=(
            tool_list.get("install_resolver_dependencies")
            or getattr(args, "install_resolver_dependencies", False)))

    # Start installing/updating and store the results in install_results.
    # Or do testing if the action is `test`
    install_results = None
    if args.action == "update":
        install_results = install_repository_manager.update_repositories(
            repositories=repos,
            log=log,
            **kwargs)
    elif args.action == "install":
        install_results = install_repository_manager.install_repositories(
            repos,
            log=log,
            force_latest_revision=args.force_latest_revision,
            **kwargs)
    elif args.action == "test":
        install_repository_manager.test_tools(
            test_json=args.test_json,
            repositories=repos,
            log=log,
            test_user_api_key=args.test_user_api_key,
            test_user=args.test_user,
            parallel_tests=args.parallel_tests,
        )
    else:
        raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

    # Run tests on the install results if required.
    # The parentheses matter: without them the condition reads
    # ``(install_results and args.test) or args.test_existing`` and the body
    # would dereference ``install_results`` while it is still None.
    if install_results and (args.test or args.test_existing):
        # Work on a copy so the result tuple's list is not extended in place.
        to_be_tested_repositories = list(install_results.installed_repositories)
        if args.test_existing:
            to_be_tested_repositories.extend(install_results.skipped_repositories)
        if to_be_tested_repositories:
            install_repository_manager.test_tools(
                test_json=args.test_json,
                repositories=to_be_tested_repositories,
                log=log,
                test_user_api_key=args.test_user_api_key,
                test_user=args.test_user,
                parallel_tests=args.parallel_tests,
            )


# Run the command line interface when this module is executed as a script.
if __name__ == "__main__":
    main()