view env/lib/python3.9/site-packages/galaxy/util/tool_shed/common_util.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import errno
import json
import logging
import os
from urllib.parse import urljoin

from routes import url_for

from galaxy import util
from galaxy.util.tool_shed import encoding_util, xml_util

log = logging.getLogger(__name__)

# Owner name passed to the tool shed when querying the migrated repositories.
REPOSITORY_OWNER = 'devteam'
# not valid for installed Galaxy, fix
MIGRATE_DIR = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__),
        *([os.pardir] * 3),
        'galaxy', 'tool_shed', 'galaxy_install', 'migrate',
    )
)
# Sub-directories of MIGRATE_DIR.
TOOL_MIGRATION_SCRIPTS_DIR = os.path.join(MIGRATE_DIR, 'scripts')
TOOL_MIGRATION_VERSIONS_DIR = os.path.join(MIGRATE_DIR, 'versions')


def accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies):
    """
    Append each entry of tool_dependencies that is not yet in all_tool_dependencies.

    Nothing is added when the tool shed was not accessible or when tool_dependencies is empty.
    The list all_tool_dependencies is extended in place and also returned.
    """
    if not (tool_shed_accessible and tool_dependencies):
        return all_tool_dependencies
    for candidate in tool_dependencies:
        if candidate in all_tool_dependencies:
            continue
        all_tool_dependencies.append(candidate)
    return all_tool_dependencies


def check_for_missing_tools(app, tool_panel_configs, latest_tool_migration_script_number):
    """
    Determine which tools listed in the given tool panel configs have been migrated to the tool shed.

    The 000x_tools.xml migration script selected by latest_tool_migration_script_number names the tool
    shed and the repositories (with their tool config files) that were migrated.  For every repository
    the tool shed is asked for the tool dependencies of the repository itself and of each of its
    repository dependencies.  The tool config file names are then matched against the 'file' attribute
    of the <tool> tags (top-level or inside a <section>) of each file in tool_panel_configs.

    Returns a tuple (tool_shed_accessible, missing_tool_configs_dict).  missing_tool_configs_dict maps
    a tool config file name to a dict with the keys 'tool_dependencies' and 'repository_dependencies'.
    (False, {}) is returned when the migration script cannot be parsed.

    Raises Exception when the tool shed named in the migration script has no entry in the tool shed
    registry.
    """
    # Get the 000x_tools.xml file associated with the current migrate_tools version number.
    tools_xml_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                                       os.pardir, 'galaxy_install',
                                                       'migrate', 'scripts',
                                                       '%04d_tools.xml' % latest_tool_migration_script_number))
    # Parse the XML and load the file attributes for later checking against the proprietary tool_panel_config.
    migrated_tool_configs_dict = {}
    tree, error_message = xml_util.parse_xml(tools_xml_file_path)
    if tree is None:
        return False, {}
    root = tree.getroot()
    tool_shed = root.get('name')
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    if not tool_shed_url:
        exception_msg = f'\n\nThe entry for the main Galaxy tool shed at {tool_shed} is missing from the {app.config.tool_sheds_config} file.  '
        exception_msg += 'The entry for this tool shed must always be available in this file, so re-add it before attempting to start your Galaxy server.\n'
        raise Exception(exception_msg)
    # The default behavior is that the tool shed is down.
    tool_shed_accessible = False
    missing_tool_configs_dict = {}
    for elem in root:
        if elem.tag != 'repository':
            continue
        repository_dependencies = []
        all_tool_dependencies = []
        repository_name = elem.get('name')
        changeset_revision = elem.get('changeset_revision')
        tool_shed_accessible, repository_dependencies_dict = get_repository_dependencies(app,
                                                                                         tool_shed_url,
                                                                                         repository_name,
                                                                                         REPOSITORY_OWNER,
                                                                                         changeset_revision)
        if not tool_shed_accessible:
            # Stop at the first repository for which the tool shed could not be reached.
            break
        # Accumulate all tool dependencies defined for repository dependencies for display to the user.
        for rd_key, rd_tups in repository_dependencies_dict.items():
            if rd_key in ['root_key', 'description']:
                continue
            for rd_tup in rd_tups:
                # Use dedicated names here: re-using changeset_revision (or tool_shed) would clobber the
                # values of the migrated repository that are still needed after this loop.
                _, rd_name, rd_owner, rd_changeset_revision, _, _ = parse_repository_dependency_tuple(rd_tup)
                # Query every repository dependency, not only the last tuple of each list.
                tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
                                                                                tool_shed_url,
                                                                                rd_name,
                                                                                rd_owner,
                                                                                rd_changeset_revision)
                all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies)
        # Tool dependencies of the migrated repository itself, at the revision named in the migration script.
        tool_shed_accessible, tool_dependencies = get_tool_dependencies(app,
                                                                        tool_shed_url,
                                                                        repository_name,
                                                                        REPOSITORY_OWNER,
                                                                        changeset_revision)
        all_tool_dependencies = accumulate_tool_dependencies(tool_shed_accessible, tool_dependencies, all_tool_dependencies)
        for tool_elem in elem.findall('tool'):
            tool_config_file_name = tool_elem.get('file')
            if tool_config_file_name:
                # We currently do nothing with repository dependencies except install them (we do not display repositories that will be
                # installed to the user).  However, we'll store them in the following dictionary in case we choose to display them in the
                # future.
                dependencies_dict = dict(tool_dependencies=all_tool_dependencies,
                                         repository_dependencies=repository_dependencies)
                migrated_tool_configs_dict[tool_config_file_name] = dependencies_dict
    if tool_shed_accessible:
        # Parse the proprietary tool_panel_configs (the default is tool_conf.xml) and generate the list of missing tool config file names.
        for tool_panel_config in tool_panel_configs:
            tree, error_message = xml_util.parse_xml(tool_panel_config)
            if not tree:
                continue
            for elem in tree.getroot():
                if elem.tag == 'tool':
                    missing_tool_configs_dict = check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict)
                elif elem.tag == 'section':
                    for section_elem in elem:
                        if section_elem.tag == 'tool':
                            missing_tool_configs_dict = check_tool_tag_set(section_elem, migrated_tool_configs_dict, missing_tool_configs_dict)
    return tool_shed_accessible, missing_tool_configs_dict


def check_tool_tag_set(elem, migrated_tool_configs_dict, missing_tool_configs_dict):
    """
    Record the tool referenced by elem as missing if it is one of the migrated tool configs.

    A migrated tool config matches when its key equals either the 'file' attribute of elem or the
    base name of that attribute.  Matches are stored in missing_tool_configs_dict under the base
    name; the dict is updated in place and returned.
    """
    file_path = elem.get('file', None)
    if not file_path:
        return missing_tool_configs_dict
    name = os.path.basename(file_path)
    accepted_keys = (file_path, name)
    for config_key, dependencies in migrated_tool_configs_dict.items():
        if config_key in accepted_keys:
            missing_tool_configs_dict[name] = dependencies
    return missing_tool_configs_dict


def generate_clone_url_for_installed_repository(app, repository):
    """Build the clone URL of a repository installed into Galaxy from its tool shed, owner and name."""
    shed_name = str(repository.tool_shed)
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, shed_name)
    clone_path = ['repos', str(repository.owner), str(repository.name)]
    return util.build_url(tool_shed_url, pathspec=clone_path)


def generate_clone_url_for_repository_in_tool_shed(user, repository):
    """
    Build the clone URL of a repository hosted in the tool shed.

    When a user is given, the user's username is embedded in the URL as '<username>@' right after
    the protocol.
    """
    base_url = url_for('/', qualified=True).rstrip('/')
    if user:
        protocol, base = base_url.split('://')
        username = '%s@' % user.username
        prefix = f'{protocol}://{username}{base}'
    else:
        prefix = base_url
    return f'{prefix}/repos/{repository.user.username}/{repository.name}'


def generate_clone_url_from_repo_info_tup(app, repo_info_tup):
    """Build a clone URL from a repository dependency tuple (tool shed, name, owner, changeset revision, ...)."""
    # Example tuple: ['http://localhost:9009', 'blast_datatypes', 'test', '461a4216e8ab', False]
    parsed_tup = parse_repository_dependency_tuple(repo_info_tup)
    # Only the first three items are needed; the changeset_revision is deliberately left out of clone urls.
    toolshed, name, owner = parsed_tup[:3]
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, toolshed)
    clone_path = ['repos', owner, name]
    return util.build_url(tool_shed_url, pathspec=clone_path)


def get_non_shed_tool_panel_configs(app):
    """Return the tool panel config files that are not shed-related (the default is tool_conf.xml)."""
    non_shed_configs = []
    for config_filename in app.config.tool_configs:
        try:
            tree, error_message = xml_util.parse_xml(config_filename)
        except OSError as exc:
            # A missing shed tool config is tolerated only when it was not explicitly configured.
            is_absent_default_shed_conf = (
                config_filename == app.config.shed_tool_conf
                and not app.config.shed_tool_conf_set
                and exc.errno == errno.ENOENT
            )
            if not is_absent_default_shed_conf:
                raise
            continue
        if tree is None:
            continue
        # Any config file that includes a tool_path attribute in the root tag set like the following is shed-related.
        # <toolbox tool_path="database/shed_tools">
        if tree.getroot().get('tool_path', None) is None:
            non_shed_configs.append(config_filename)
    return non_shed_configs


def get_repository_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
    """
    Ask the tool shed for the repository dependencies of one repository revision.

    Returns a tuple (tool_shed_accessible, repository_dependencies_dict).  Any exception raised by
    the request is logged as a warning and reported as (False, {}).  The dict stays empty when the
    response is at most two characters long.
    """
    pathspec = ['repository', 'get_repository_dependencies']
    params = dict(name=repository_name, owner=repository_owner, changeset_revision=changeset_revision)
    try:
        raw_text = util.url_get(tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params)
    except Exception as e:
        log.warning("The URL\n%s\nraised the exception:\n%s\n", util.build_url(tool_shed_url, pathspec=pathspec, params=params), e)
        return False, {}
    repository_dependencies_dict = {}
    if len(raw_text) > 2:
        encoded_text = json.loads(util.unicodify(raw_text))
        repository_dependencies_dict = encoding_util.tool_shed_decode(encoded_text)
    return True, repository_dependencies_dict


def get_protocol_from_tool_shed_url(tool_shed_url):
    """
    Return the lower-cased protocol of tool_shed_url, defaulting to 'http'.

    The default is returned when tool_shed_url is None, when it contains no '://' separator after
    its first character, or when it is not a string-like object (that last case is also logged).
    Previously only the exception path returned the default and a URL without a protocol fell
    through and returned None.
    """
    default_protocol = 'http'
    # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
    # that value when creating a header row, so this case is expected and not logged.
    if tool_shed_url is None:
        return default_protocol
    try:
        if tool_shed_url.find('://') > 0:
            return tool_shed_url.split('://')[0].lower()
    except Exception:
        # The tool_shed_url is not None but could not be inspected, so we have a problem.
        log.exception("Handled exception getting the protocol from Tool Shed URL %s", str(tool_shed_url))
    # Default to HTTP protocol.
    return default_protocol


def get_tool_dependencies(app, tool_shed_url, repository_name, repository_owner, changeset_revision):
    """
    Ask the tool shed for the tool dependencies of one repository revision.

    Returns a tuple (tool_shed_accessible, tool_dependencies) where tool_dependencies is a list of
    (name, version, type) tuples.  Any exception raised by the request is logged as a warning and
    reported as (False, []).
    """
    pathspec = ['repository', 'get_tool_dependencies']
    params = dict(name=repository_name, owner=repository_owner, changeset_revision=changeset_revision)
    try:
        text = util.url_get(tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params)
    except Exception as e:
        log.warning("The URL\n%s\nraised the exception:\n%s\n", util.build_url(tool_shed_url, pathspec=pathspec, params=params), e)
        return False, []
    if not text:
        return True, []
    tool_dependencies_dict = encoding_util.tool_shed_decode(text)
    tool_dependencies = [
        (requirements['name'], requirements['version'], requirements['type'])
        for requirements in tool_dependencies_dict.values()
    ]
    return True, tool_dependencies


def get_tool_shed_repository_ids(as_string=False, **kwd):
    """
    Collect tool shed repository ids from the received keyword arguments.

    The ids are taken from 'tool_shed_repository_ids', or from 'id' when the former is empty.  The
    single value of 'tool_shed_repository_id', when present, is appended if it is not already
    listed.  When no id was found this way, 'ordered_tsr_ids' is used instead.

    Returns the ids as a list, or as a comma-separated string when as_string is true (an empty
    list / empty string when nothing was supplied).

    NOTE(review): util.listify appears to return a list (never None) for a missing value, which
    made the original 'is not None' check always true and the 'ordered_tsr_ids' fallback
    unreachable - confirm against util.listify.
    """
    # Copy the list so that appending below never mutates a list owned by the caller.
    tsridslist = list(util.listify(kwd.get('tool_shed_repository_ids', None)) or
                      util.listify(kwd.get('id', None)) or
                      [])
    tsrid = kwd.get('tool_shed_repository_id', None)
    if tsrid is not None and tsrid not in tsridslist:
        tsridslist.append(tsrid)
    if not tsridslist:
        # Nothing was passed through the usual keys, so fall back to the ordered ids.
        tsridslist = list(util.listify(kwd.get('ordered_tsr_ids', None)) or [])
    if as_string:
        return ','.join(tsridslist)
    return tsridslist


def get_tool_shed_url_from_tool_shed_registry(app, tool_shed):
    """
    Look up the registered URL of a tool shed.

    tool_shed is something like: toolshed.g2.bx.psu.edu (a protocol, if present, is removed first).
    The first URL in the tool shed registry that contains this value is returned without trailing
    slashes, e.g. http://toolshed.g2.bx.psu.edu.  None is returned when no registered URL matches.
    """
    cleaned_tool_shed = remove_protocol_from_tool_shed_url(tool_shed)
    for registered_url in app.tool_shed_registry.tool_sheds.values():
        if cleaned_tool_shed in registered_url:
            return registered_url.rstrip('/')
    # The tool shed from which the repository was originally installed must no longer be configured in tool_sheds_conf.xml.
    return None


def get_tool_shed_repository_url(app, tool_shed, owner, name):
    """
    Return the 'view/<owner>/<name>' URL of a repository on the given tool shed.

    When the tool shed cannot be resolved through the registry, the empty lookup result is
    returned unchanged.
    """
    tool_shed_url = get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    if not tool_shed_url:
        return tool_shed_url
    # urljoin drops the last part of a base URL that does not end with a forward slash,
    # so make sure the base ends with one.
    base_url = f'{tool_shed_url}/'
    return urljoin(base_url, f'view/{owner}/{name}')


def get_user_by_username(app, username):
    """Return the user with the given username, or None when the query raises any exception."""
    sa_session = app.model.context.current
    try:
        user_model = app.model.User
        matching_users = sa_session.query(user_model).filter(user_model.table.c.username == username)
        return matching_users.one()
    except Exception:
        return None


def handle_galaxy_url(trans, **kwd):
    """
    Return the Galaxy URL for this request, keeping it in the 'toolshedgalaxyurl' cookie.

    A non-empty 'galaxy_url' keyword is stored in the cookie and returned; otherwise the value
    currently held by the cookie is returned.
    """
    galaxy_url = kwd.get('galaxy_url', None)
    if not galaxy_url:
        return trans.get_cookie(name='toolshedgalaxyurl')
    trans.set_cookie(galaxy_url, name='toolshedgalaxyurl')
    return galaxy_url


def handle_tool_shed_url_protocol(app, shed_url):
    """
    Resolve shed_url to the currently valid tool shed URL, since the protocol may change over time.

    For the 'galaxy' app the protocol is removed and the URL is looked up in the tool shed
    registry; for any other app the qualified root URL from url_for is used.  If anything raises,
    shed_url is returned unchanged.
    """
    try:
        if app.name != 'galaxy':
            return str(url_for('/', qualified=True)).rstrip('/')
        bare_url = remove_protocol_from_tool_shed_url(shed_url)
        return get_tool_shed_url_from_tool_shed_registry(app, bare_url)
    except Exception:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if shed_url is not None:
            log.exception("Handled exception removing protocol from URL %s", str(shed_url))
        return shed_url


def parse_repository_dependency_tuple(repository_dependency_tuple, contains_error=False):
    """
    Normalize a repository dependency tuple to its full length.

    The tuple holds tool_shed, name, owner and changeset_revision, optionally followed by
    prior_installation_required and only_if_compiling_contained_td.  When contains_error is true,
    the last item of the tuple is an error value.

    Returns (tool_shed, name, owner, changeset_revision, prior_installation_required,
    only_if_compiling_contained_td), with the error appended as a seventh item when contains_error
    is true.

    Raises ValueError when the tuple does not have 4 to 6 items (5 to 7 with an error).  Such tuples
    previously failed with an UnboundLocalError on the return statement.
    """
    values = list(repository_dependency_tuple)
    expected_lengths = '5 to 7' if contains_error else '4 to 6'
    # With an error the payload is one item shorter than the received tuple.
    payload_length = len(values) - 1 if contains_error else len(values)
    if not 4 <= payload_length <= 6:
        raise ValueError('Expected a repository dependency tuple with %s items, got %d: %r'
                         % (expected_lengths, len(values), repository_dependency_tuple))
    trailing = (values.pop(),) if contains_error else ()
    # Default both prior_installation_required and only_if_compiling_contained_td to False in cases where metadata should be reset on the
    # repository containing the repository_dependency definition.
    defaults = ['False', 'False']
    values.extend(defaults[len(values) - 4:])
    return tuple(values) + trailing


def remove_port_from_tool_shed_url(tool_shed_url):
    """
    Return the received partial Tool Shed URL cut at its first colon, without trailing slashes.

    A value without a colon (or starting with one) is only stripped of trailing slashes.  If the
    value cannot be processed it is returned unchanged.
    """
    try:
        colon_index = tool_shed_url.find(':')
        # Eliminate the port, if any, since it will result in an invalid directory name.
        without_port = tool_shed_url[:colon_index] if colon_index > 0 else tool_shed_url
        return without_port.rstrip('/')
    except Exception:
        # We receive a lot of calls here where the tool_shed_url is None.  The container_util uses
        # that value when creating a header row.  If the tool_shed_url is not None, we have a problem.
        if tool_shed_url is not None:
            log.exception("Handled exception removing the port from Tool Shed URL %s", str(tool_shed_url))
        return tool_shed_url


def remove_protocol_and_port_from_tool_shed_url(tool_shed_url):
    """Strip the protocol first and then the port from the received Tool Shed URL, if either exists."""
    without_protocol = remove_protocol_from_tool_shed_url(tool_shed_url)
    return remove_port_from_tool_shed_url(without_protocol)


def remove_protocol_and_user_from_clone_url(repository_clone_url):
    """Return the clone URL without its protocol and authenticated user (if present) and without trailing slashes."""
    # Checked in this order:
    #   '@'  - an url that includes an authenticated user, something like:
    #          http://test@bx.psu.edu:9009/repos/some_username/column
    #   '//' - an url that includes only a protocol, something like:
    #          http://bx.psu.edu:9009/repos/some_username/column
    for separator in ('@', '//'):
        if repository_clone_url.find(separator) > 0:
            remainder = repository_clone_url.split(separator)[1]
            break
    else:
        remainder = repository_clone_url
    return remainder.rstrip('/')


def remove_protocol_from_tool_shed_url(tool_shed_url):
    """Strip the protocol, if any, from the received Tool Shed URL by delegating to util.remove_protocol_from_url."""
    partial_url = util.remove_protocol_from_url(tool_shed_url)
    return partial_url