#!/usr/bin/env python
"""Run-data-managers is a tool for provisioning data on a galaxy instance.

Run-data-managers has the ability to run multiple data managers that are interdependent.
For example, when a reference genome is needed for bwa-mem, run-data-managers
can first run a data manager that fetches the fasta file and then run
another data manager that indexes the fasta file for bwa-mem.
This functionality requires the "watch_tool_data_dir" setting in galaxy.ini to be set to True.
Also, if a new data manager is installed, Galaxy needs to be restarted in order for its tool_data_dir to be watched.

Run-data-managers needs a YAML file that specifies which data managers are run and with which settings.
Example files can be found `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample>`_,
`here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample.advanced>`_,
and `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.test>`_.

By default run-data-managers skips entries in the YAML file that have already been run.
It checks this in the following way:

  * If the data manager has input variables "name" or "sequence_name" it will check if the "name" column in the data table already has this entry.
    "name" will take precedence over "sequence_name".
  * If the data manager has input variables "value", "sequence_id" or 'dbkey' it will check if the "value"
    column in the data table already has this entry.
    Value takes precedence over sequence_id which takes precedence over dbkey.
  * If none of the above input variables are specified the data manager will always run.
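
A minimal configuration sketch (the tool id, parameter name and values below are
illustrative placeholders, not a recommendation)::

    data_managers:
      - id: toolshed.g2.bx.psu.edu/repos/devteam/data_manager_fetch_genome_dbkeys_all_fasta/data_manager_fetch_genome_all_fasta_dbkey/<version>
        params:
          - 'dbkey_source|dbkey': '{{ item }}'
        items:
          - hg38
        data_table_reload:
          - all_fasta

The tool is then invoked as, e.g., ``run-data-managers --config my_config.yml``.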
"""
import argparse
import json
import logging
import time
from collections import namedtuple

from bioblend.galaxy.tool_data import ToolDataClient
from bioblend.galaxy.tools import ToolClient
from jinja2 import Template

from . import get_galaxy_connection
from . import load_yaml_file
from .common_parser import get_common_args
from .ephemeris_log import disable_external_library_logging, setup_global_logger

DEFAULT_URL = "http://localhost"
DEFAULT_SOURCE_TABLES = ["all_fasta"]


def wait(gi, job_list, log):
    """
    Waits until all jobs in the list have finished or failed.
    It checks the state of the created datasets every 30 seconds.
    Returns a tuple: (successful_jobs, failed_jobs).
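
    Each job dict is expected to have the shape returned by ToolClient.run_tool:
    an "outputs" list whose entries carry "id" and "hid" fields, and a "jobs"
    list whose entries carry an "id" field.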
    """

    failed_jobs = []
    successful_jobs = []

    # Empty list returns false and stops the loop.
    while bool(job_list):
        finished_jobs = []
        for job in job_list:
            job_hid = job['outputs'][0]['hid']
            # check if the output of the running job is either in 'ok' or 'error' state
            state = gi.datasets.show_dataset(job['outputs'][0]['id'])['state']
            if state == 'ok':
                log.info('Job %i finished with state %s.' % (job_hid, state))
                successful_jobs.append(job)
                finished_jobs.append(job)
            elif state == 'error':
                log.error('Job %i finished with state %s.' % (job_hid, state))
                job_id = job['jobs'][0]['id']
                job_details = gi.jobs.show_job(job_id, full_details=True)
                log.error(
                    "Job {job_hid}: Tool '{tool_id}' finished with exit code: {exit_code}. Stderr: {stderr}".format(
                        job_hid=job_hid,
                        **job_details
                    ))
                log.debug("Job {job_hid}: Tool '{tool_id}' stdout: {stdout}".format(
                    job_hid=job_hid,
                    **job_details
                ))
                failed_jobs.append(job)
                finished_jobs.append(job)
            else:
                log.debug('Job %i still running.' % job_hid)
        # Remove finished jobs from job_list.
        for finished_job in finished_jobs:
            job_list.remove(finished_job)
        # only sleep if job_list is not empty yet.
        if bool(job_list):
            time.sleep(30)
    return successful_jobs, failed_jobs


def get_first_valid_entry(input_dict, key_list):
    """Iterates over key_list and returns the value of the first key that exists in the dictionary. Or returns None"""
    for key in key_list:
        if key in input_dict:
            return input_dict.get(key)
    return None


class DataManagers:
    def __init__(self, galaxy_instance, configuration):
        """
        :param galaxy_instance: A GalaxyInstance object (import from bioblend.galaxy)
        :param configuration: A dictionary. Examples in the ephemeris documentation.
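
        A typical programmatic use, mirroring main() below (URL and API key are
        placeholders)::

            from bioblend.galaxy import GalaxyInstance

            gi = GalaxyInstance(url="http://localhost", key="<admin-api-key>")
            config = load_yaml_file("run_data_managers.yaml")
            DataManagers(gi, config).run()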
        """
        self.gi = galaxy_instance
        self.config = configuration
        self.tool_data_client = ToolDataClient(self.gi)
        self.tool_client = ToolClient(self.gi)
        self.possible_name_keys = ['name', 'sequence_name']  # In order of importance!
        self.possible_value_keys = ['value', 'sequence_id', 'dbkey']  # In order of importance!
        self.data_managers = self.config.get('data_managers')
        self.genomes = self.config.get('genomes', '')
        self.source_tables = DEFAULT_SOURCE_TABLES
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []

    def initiate_job_lists(self):
        """
        Determines which data managers should be run to populate the data tables.
        Distinguishes between fetch jobs (download files) and index jobs.
        Populates self.fetch_jobs, self.skipped_fetch_jobs, self.index_jobs and self.skipped_index_jobs.
        """
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []
        for dm in self.data_managers:
            jobs, skipped_jobs = self.get_dm_jobs(dm)
            if self.dm_is_fetcher(dm):
                self.fetch_jobs.extend(jobs)
                self.skipped_fetch_jobs.extend(skipped_jobs)
            else:
                self.index_jobs.extend(jobs)
                self.skipped_index_jobs.extend(skipped_jobs)

    def get_dm_jobs(self, dm):
        """Gets the job entries for a single dm. Puts entries that already present in skipped_job_list.
        :returns job_list, skipped_job_list"""
        job_list = []
        skipped_job_list = []
        items = self.parse_items(dm.get('items', ['']))
        for item in items:
            dm_id = dm['id']
            params = dm['params']
            inputs = dict()
            # Iterate over all parameters, replace occurrences of {{item}} with the current processing item
            # and create the tool_inputs dict for running the data manager job
            for param in params:
                key, value = list(param.items())[0]
                value_template = Template(value)
                value = value_template.render(item=item)
                inputs.update({key: value})

            job = dict(tool_id=dm_id, inputs=inputs)

            data_tables = dm.get('data_table_reload', [])
            if self.input_entries_exist_in_data_tables(data_tables, inputs):
                skipped_job_list.append(job)
            else:
                job_list.append(job)
        return job_list, skipped_job_list

    def dm_is_fetcher(self, dm):
        """Checks whether the data manager fetches a sequence instead of indexing.
        This is based on the source table.
        :returns True if dm is a fetcher. False if it is not."""
        data_tables = dm.get('data_table_reload', [])
        for data_table in data_tables:
            if data_table in self.source_tables:
                return True
        return False

    def data_table_entry_exists(self, data_table_name, entry, column='value'):
        """Checks whether an entry exists in the a specified column in the data_table."""
        try:
            data_table_content = self.tool_data_client.show_data_table(data_table_name)
        except Exception:
            raise Exception('Table "%s" does not exist' % data_table_name)

        try:
            column_index = data_table_content.get('columns').index(column)
        except ValueError:
            raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))

        for field in data_table_content.get('fields'):
            if field[column_index] == entry:
                return True
        return False

    def input_entries_exist_in_data_tables(self, data_tables, input_dict):
        """Checks whether name and value entries from the input are already present in the data tables.
        If an entry is missing in of the tables, this function returns False"""
        value_entry = get_first_valid_entry(input_dict, self.possible_value_keys)
        name_entry = get_first_valid_entry(input_dict, self.possible_name_keys)

        # Return False if name and value entries are both None
        if not value_entry and not name_entry:
            return False

        # Check every data table for existence of name and value
        # Return False as soon as entry is not present
        for data_table in data_tables:
            if value_entry:
                if not self.data_table_entry_exists(data_table, value_entry, column='value'):
                    return False
            if name_entry:
                if not self.data_table_entry_exists(data_table, name_entry, column='name'):
                    return False
        # If all checks are passed the entries are present in the database tables.
        return True

    def parse_items(self, items):
        """
        Renders the items through Jinja2, exposing the "genomes" section of the config
        as a template variable.
        :param items: the items to be parsed
        :return: the parsed items
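
        For example, given "genomes: [{dbkey: hg38}]" in the config, an items entry
        of "{{ genomes }}" renders to [{"dbkey": "hg38"}] and is parsed back into a
        Python list of dicts.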
        """
        if bool(self.genomes):
            items_template = Template(json.dumps(items))
            rendered_items = items_template.render(genomes=json.dumps(self.genomes))
            # Strip the leading and trailing " that json.dumps adds around a plain string
            rendered_items = rendered_items.strip('"')
            items = json.loads(rendered_items)
        return items

    def run(self, log=None, ignore_errors=False, overwrite=False):
        """
        Runs the data managers.
        :param log: The log to be used.
        :param ignore_errors: Do not abort when a data manager job fails; continue regardless.
        :param overwrite: Overwrite existing entries in the data tables.
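        :return: a namedtuple InstallResults(successful_jobs, failed_jobs, skipped_jobs)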
        """
        self.initiate_job_lists()
        all_successful_jobs = []
        all_failed_jobs = []
        all_skipped_jobs = []

        if not log:
            log = logging.getLogger()

        def run_jobs(jobs, skipped_jobs):
            job_list = []
            for skipped_job in skipped_jobs:
                if overwrite:
                    log.info('%s already run for %s. Entry will be overwritten.' %
                             (skipped_job["tool_id"], skipped_job["inputs"]))
                    jobs.append(skipped_job)
                else:
                    log.info('%s already run for %s. Skipping.' % (skipped_job["tool_id"], skipped_job["inputs"]))
                    all_skipped_jobs.append(skipped_job)
            for job in jobs:
                started_job = self.tool_client.run_tool(history_id=None, tool_id=job["tool_id"],
                                                        tool_inputs=job["inputs"])
                log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' %
                         (started_job['outputs'][0]['hid'], job["tool_id"], job["inputs"]))
                job_list.append(started_job)

            successful_jobs, failed_jobs = wait(self.gi, job_list, log)
            if failed_jobs:
                if not ignore_errors:
                    log.error('Not all jobs successful! aborting...')
                    raise RuntimeError('Not all jobs successful! aborting...')
                else:
                    log.warning('Not all jobs successful! ignoring...')
            all_successful_jobs.extend(successful_jobs)
            all_failed_jobs.extend(failed_jobs)

        log.info("Running data managers that populate the following source data tables: %s" % self.source_tables)
        run_jobs(self.fetch_jobs, self.skipped_fetch_jobs)
        log.info("Running data managers that index sequences.")
        run_jobs(self.index_jobs, self.skipped_index_jobs)

        log.info('Finished running data managers. Results:')
        log.info('Successful jobs: %i' % len(all_successful_jobs))
        log.info('Skipped jobs: %i' % len(all_skipped_jobs))
        log.info('Failed jobs: %i' % len(all_failed_jobs))
        InstallResults = namedtuple("InstallResults", ["successful_jobs", "failed_jobs", "skipped_jobs"])
        return InstallResults(successful_jobs=all_successful_jobs, failed_jobs=all_failed_jobs,
                              skipped_jobs=all_skipped_jobs)


def _parser():
    """returns the parser object."""
    parent = get_common_args(log_file=True)

    parser = argparse.ArgumentParser(
        parents=[parent],
        description="Running Galaxy data managers in a defined order with defined parameters. "
                    "'watch_tool_data_dir' in the Galaxy config should be set to true.")
    parser.add_argument("--config", required=True,
                        help="Path to the YAML config file with the list of data managers and data to install.")
    parser.add_argument("--overwrite", action="store_true",
                        help="Disables checking whether the item already exists in the tool data table.")
    parser.add_argument("--ignore_errors", action="store_true",
                        help="Do not stop running when jobs have failed.")
    return parser


def main():
    disable_external_library_logging()
    parser = _parser()
    args = parser.parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    if args.verbose:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    gi = get_galaxy_connection(args, file=args.config, log=log, login_required=True)
    config = load_yaml_file(args.config)
    data_managers = DataManagers(gi, config)
    data_managers.run(log, args.ignore_errors, args.overwrite)


if __name__ == '__main__':
    main()