view env/lib/python3.9/site-packages/ephemeris/setup_data_libraries.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python
'''Tool to setup data libraries on a galaxy instance'''
import argparse
import logging as log
import sys
import time

import yaml
from bioblend import galaxy

from .common_parser import get_common_args


def create_legacy(gi, desc):
    """Create a data library and populate it through the non-batch library API.

    ``desc`` must carry a ``destination`` mapping whose ``type`` is
    ``"library"``; its optional ``name``, ``description`` and ``synopsis``
    describe the library.  A non-deleted library with the same name is reused,
    otherwise a new one is created.  Nested ``items`` that have a ``name``
    become folders, and leaf entries (which must have ``src == "url"``) are
    uploaded from their ``url`` unless a file with that name is already
    present in the target folder.

    Upload failures are logged and do not abort the run.

    :param gi: Galaxy instance object exposing ``libraries`` and ``folders`` clients.
    :param desc: normalized library definition (a dict).
    :return: always an empty list; no job descriptions are returned here.
    :raises Exception: if the destination is not a library or a leaf item has
        a ``src`` other than ``"url"``.
    """
    target = desc["destination"]
    if target["type"] != "library":
        raise Exception("Only libraries may be created with pre-18.05 Galaxies using this script.")

    wanted_name = target.get("name")
    print("Library name: " + str(wanted_name))

    # Even with deleted=False the server may still hand back deleted
    # libraries, so they are filtered out a second time on this side.
    candidates = gi.libraries.get_libraries(name=wanted_name, deleted=False) or []
    surviving = [candidate for candidate in candidates if not candidate['deleted']]

    if surviving:
        lib_id = surviving[0]['id']
        print("Library already exists! id: " + str(lib_id))
        root_folder_id = gi.libraries.show_library(lib_id)['root_folder_id']
    else:
        created = gi.libraries.create_library(
            wanted_name,
            target.get("description"),
            target.get("synopsis"),
        )
        lib_id = created['id']
        root_folder_id = created['root_folder_id']

    def ensure_folder(parent_id, name, description):
        # Return the id of the folder called ``name`` under ``parent_id``,
        # creating it when no folder with the resulting full path exists.
        parent = gi.libraries.get_folders(lib_id, parent_id)
        if parent and parent[0]['name'] != "/":
            full_path = parent[0]['name'] + "/" + name
        else:
            full_path = "/" + name
        matches = gi.libraries.get_folders(lib_id, name=full_path)
        if matches:
            return matches[0]['id']
        made = gi.libraries.create_folder(lib_id, name, description, base_folder_id=parent_id)
        return made[0]["id"]

    def upload_if_missing(parent_id, entry):
        # Upload a single URL entry unless the folder already lists a file
        # whose name equals the URL.
        if entry["src"] != "url":
            raise Exception("For pre-18.05 Galaxies only support URLs src items are supported.")
        contents = gi.folders.show_folder(parent_id, contents=True)['folder_contents']
        present = [content['name'] for content in contents if content['type'] == 'file']
        if entry['url'] in present:
            return
        try:
            gi.libraries.upload_file_from_url(
                lib_id,
                entry['url'],
                folder_id=parent_id,
                file_type=entry['ext']
            )
        except Exception:
            log.exception("Could not upload %s to %s/%s", entry['url'], lib_id, parent_id)

    def populate_items(base_folder_id, has_items):
        # Walk the definition tree: containers recurse, leaves upload.
        if "items" not in has_items:
            upload_if_missing(base_folder_id, has_items)
            return None
        name = has_items.get("name")
        if name:
            target_folder_id = ensure_folder(base_folder_id, name, has_items.get("description"))
        else:
            target_folder_id = base_folder_id
        for child in has_items["items"]:
            populate_items(target_folder_id, child)
        return None

    populate_items(root_folder_id, desc)
    return []


def create_batch_api(gi, desc):
    """Submit ``desc`` as a single target to the ``tools/fetch`` endpoint.

    A new history is created first and its id is sent along with the request.
    This is a generator: it yields exactly one value, the response of the
    POST, and nothing is sent until the generator is iterated.

    :param gi: Galaxy instance object; its ``url`` attribute is the base of
        the request URL.
    :param desc: normalized library definition sent as the only fetch target.
    """
    history_client = galaxy.histories.HistoryClient(gi)
    tool_client = galaxy.tools.ToolClient(gi)

    new_history = history_client.create_history()
    fetch_url = "%s/tools/fetch" % gi.url
    yield tool_client._post(
        payload={
            'targets': [desc],
            'history_id': new_history["id"],
        },
        url=fetch_url,
    )


def setup_data_libraries(gi, data, training=False, legacy=False):
    """
    Load files into a Galaxy data library.

    The library definition is parsed from ``data`` with ``yaml.safe_load``,
    normalized so that both the older ephemeris/training layout
    (``libraries`` / ``files`` / ``file_type``) and the batch layout
    (``items`` / ``ext``) are accepted, and then handed to either
    ``create_batch_api`` or ``create_legacy``.  The call blocks, polling every
    three seconds, until all upload jobs it tracks are in a terminal state.

    :param gi: Galaxy instance object used to build the job and config clients.
    :param data: YAML text or an open file object accepted by ``yaml.safe_load``.
    :param training: when true, use training-oriented defaults for the
        library name and description if the definition does not set them.
    :param legacy: when true, always use ``create_legacy`` regardless of the
        server version.
    :return: None
    """

    log.info("Importing data libraries.")
    jc = galaxy.jobs.JobsClient(gi)
    config = galaxy.config.ConfigClient(gi)
    version = config.get_version()

    # The legacy path is used either on request or because the server predates
    # the batch API.  Keep this decision in one flag so the job-waiting logic
    # below matches the creation function that actually ran.
    # NOTE: version_major is compared as a string; the missing-key default of
    # "16.01" selects the legacy path.
    use_legacy = legacy or version.get("version_major", "16.01") < "18.05"
    create_func = create_legacy if use_legacy else create_batch_api

    library_def = yaml.safe_load(data)
    if library_def is None:
        # An empty YAML document parses to None; there is nothing to import.
        log.warning("No data library definition found; nothing to import.")
        return

    def normalize_items(has_items):
        # Synchronize Galaxy batch format with older training material style.
        if "files" in has_items:
            has_items["items"] = has_items.pop("files")

        for item in has_items.get("items", []):
            normalize_items(item)
            # An entry with a URL but no explicit source is a URL upload.
            if item.get("src") is None and item.get("url"):
                item["src"] = "url"
            if "file_type" in item:
                item["ext"] = item.pop("file_type")

    # Normalize library definitions to allow older ephemeris style and native Galaxy batch
    # upload formats.
    if "libraries" in library_def:
        # File contains multiple definitions.
        library_def["items"] = library_def.pop("libraries")

    destination = library_def.setdefault("destination", {"type": "library"})

    if training:
        destination.setdefault("name", 'Training Data')
        destination.setdefault("description", 'Data pulled from online archives.')
    else:
        destination.setdefault("name", 'New Data Library')
        destination.setdefault("description", '')

    normalize_items(library_def)

    responses = list(create_func(gi, library_def))
    terminal_states = ('ok', 'error', 'deleted')

    job_ids = []
    if use_legacy:
        # The legacy path returns no job descriptions, so wait on every
        # upload1 job that is not yet in a terminal state.
        for job in jc.get_jobs():
            if job['tool_id'] == 'upload1' and job['state'] not in terminal_states:
                job_ids.append(job['id'])
    else:
        # The batch API responses list the jobs they spawned.
        for response in responses:
            if 'jobs' in response:
                for subjob in response['jobs']:
                    job_ids.append(subjob['id'])

    while True:
        job_states = [(job_id, jc.get_state(job_id)) for job_id in job_ids]
        log.debug('Job states: %s', ','.join(
            '%s=%s' % (job_id, job_state) for (job_id, job_state) in job_states))

        if all(job_state in terminal_states for (_, job_state) in job_states):
            break
        time.sleep(3)

    log.info("Finished importing test data.")


def _parser():
    '''Build the command-line parser for this tool.

    The parser inherits the shared options from ``get_common_args`` and adds
    the required input file plus the ``--training`` and ``--legacy`` switches.
    '''
    cli = argparse.ArgumentParser(
        parents=[get_common_args()],
        description='Populate the Galaxy data library with data.'
    )
    cli.add_argument('-i', '--infile', required=True, type=argparse.FileType('r'))

    # Both remaining options are plain boolean switches that default to off.
    switches = (
        ('--training',
         "Set defaults that make sense for training data."),
        ('--legacy',
         "Use legacy APIs even for newer Galaxies that should have a batch upload API enabled."),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, default=False, action="store_true", help=help_text)
    return cli


def main():
    """Command-line entry point: parse arguments, connect, and import the data.

    A username/password pair takes precedence over an API key.  When neither
    is supplied the process exits with an explanatory message.
    """
    options = _parser().parse_args()

    if options.user and options.password:
        credentials = {"email": options.user, "password": options.password}
    elif options.api_key:
        credentials = {"key": options.api_key}
    else:
        sys.exit('Please specify either a valid Galaxy username/password or an API key.')
    gi = galaxy.GalaxyInstance(url=options.galaxy, **credentials)

    if options.verbose:
        log.basicConfig(level=log.DEBUG)

    setup_data_libraries(
        gi,
        options.infile,
        training=options.training,
        legacy=options.legacy,
    )


# Invoke the command-line entry point only when this file is executed as the
# main program; importing the module does not call main().
if __name__ == '__main__':
    main()