env/lib/python3.7/site-packages/ephemeris/run_data_managers.py @ 0:26e78fe6e8c4 (draft)
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parent | changeset |
|---|---|
| -1:000000000000 | 0:26e78fe6e8c4 |
```python
#!/usr/bin/env python
"""Run-data-managers is a tool for provisioning data on a galaxy instance.

Run-data-managers has the ability to run multiple data managers that are interdependent.
When a reference genome is needed for bwa-mem for example, Run-data-managers
can first run a data manager to fetch the fasta file and then run
another data manager that indexes the fasta file for bwa-mem.
This functionality depends on the "watch_tool_data_dir" setting in galaxy.ini being set to True.
Also, if a new data manager is installed, galaxy needs to be restarted in order for its tool_data_dir to be watched.

Run-data-managers needs a YAML file that specifies which data managers are run and with which settings.
Example files can be found `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample>`_,
`here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample.advanced>`_,
and `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.test>`_.

By default run-data-managers skips entries in the YAML file that have already been run.
It checks this in the following way:

* If the data manager has the input variable "name" or "sequence_name", it will check if the "name" column in the data table already has this entry.
  "name" takes precedence over "sequence_name".
* If the data manager has the input variable "value", "sequence_id" or "dbkey", it will check if the "value"
  column in the data table already has this entry.
  "value" takes precedence over "sequence_id", which takes precedence over "dbkey".
* If none of the above input variables are specified, the data manager will always run.
"""
import argparse
import json
import logging
import time
from collections import namedtuple

from bioblend.galaxy.tool_data import ToolDataClient
from bioblend.galaxy.tools import ToolClient
from jinja2 import Template

from . import get_galaxy_connection
from . import load_yaml_file
from .common_parser import get_common_args
from .ephemeris_log import disable_external_library_logging, setup_global_logger

DEFAULT_URL = "http://localhost"
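# Data managers whose 'data_table_reload' entries point at one of these tables
# are treated as fetch jobs and are run before any indexing jobs
# (see DataManagers.dm_is_fetcher and DataManagers.initiate_job_lists).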
DEFAULT_SOURCE_TABLES = ["all_fasta"]


def wait(gi, job_list, log):
    """
    Waits until all jobs in a list are finished or failed.
    It will check the state of the created datasets every 30s.
    It will return a tuple: (successful_jobs, failed_jobs)
    """

    failed_jobs = []
    successful_jobs = []

    # Empty list returns false and stops the loop.
    while bool(job_list):
        finished_jobs = []
        for job in job_list:
            job_hid = job['outputs'][0]['hid']
            # check if the output of the running job is either in 'ok' or 'error' state
            state = gi.datasets.show_dataset(job['outputs'][0]['id'])['state']
            if state == 'ok':
                log.info('Job %i finished with state %s.' % (job_hid, state))
                successful_jobs.append(job)
                finished_jobs.append(job)
            elif state == 'error':
                log.error('Job %i finished with state %s.' % (job_hid, state))
                job_id = job['jobs'][0]['id']
                job_details = gi.jobs.show_job(job_id, full_details=True)
                log.error(
                    "Job {job_hid}: Tool '{tool_id}' finished with exit code: {exit_code}. Stderr: {stderr}".format(
                        job_hid=job_hid,
                        **job_details
                    ))
                log.debug("Job {job_hid}: Tool '{tool_id}' stdout: {stdout}".format(
                    job_hid=job_hid,
                    **job_details
                ))
                failed_jobs.append(job)
                finished_jobs.append(job)
            else:
                log.debug('Job %i still running.' % job_hid)
        # Remove finished jobs from job_list.
        for finished_job in finished_jobs:
            job_list.remove(finished_job)
        # only sleep if job_list is not empty yet.
        if bool(job_list):
            time.sleep(30)
    return successful_jobs, failed_jobs


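# Example: get_first_valid_entry({'dbkey': 'hg38'}, ['value', 'sequence_id', 'dbkey'])
# returns 'hg38'; the earlier a key appears in key_list, the higher its precedence.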
def get_first_valid_entry(input_dict, key_list):
    """Iterates over key_list and returns the value of the first key that exists in input_dict, or None."""
    for key in key_list:
        if key in input_dict:
            return input_dict.get(key)
    return None


class DataManagers:
    def __init__(self, galaxy_instance, configuration):
        """
        :param galaxy_instance: A GalaxyInstance object (import from bioblend.galaxy)
        :param configuration: A dictionary. Examples in the ephemeris documentation.
        """
        self.gi = galaxy_instance
        self.config = configuration
        self.tool_data_client = ToolDataClient(self.gi)
        self.tool_client = ToolClient(self.gi)
        self.possible_name_keys = ['name', 'sequence_name']  # In order of importance!
        self.possible_value_keys = ['value', 'sequence_id', 'dbkey']  # In order of importance!
        self.data_managers = self.config.get('data_managers')
        self.genomes = self.config.get('genomes', '')
        self.source_tables = DEFAULT_SOURCE_TABLES
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []

    def initiate_job_lists(self):
        """
        Determines which data managers should be run to populate the data tables.
        Distinguishes between fetch jobs (download files) and index jobs.
        :return: populates self.fetch_jobs, self.skipped_fetch_jobs, self.index_jobs and self.skipped_index_jobs
        """
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []
        for dm in self.data_managers:
            jobs, skipped_jobs = self.get_dm_jobs(dm)
            if self.dm_is_fetcher(dm):
                self.fetch_jobs.extend(jobs)
                self.skipped_fetch_jobs.extend(skipped_jobs)
            else:
                self.index_jobs.extend(jobs)
                self.skipped_index_jobs.extend(skipped_jobs)

    def get_dm_jobs(self, dm):
        """Gets the job entries for a single dm. Puts entries that are already present into skipped_job_list.
        :returns job_list, skipped_job_list"""
        job_list = []
        skipped_job_list = []
        items = self.parse_items(dm.get('items', ['']))
        for item in items:
            dm_id = dm['id']
            params = dm['params']
            inputs = dict()
            # Iterate over all parameters, replace occurrences of {{item}} with the current processing item
            # and create the tool_inputs dict for running the data manager job
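            # (e.g. the param {'dbkey': '{{ item.dbkey }}'} rendered with the item
            # {'dbkey': 'hg38'} produces the input entry {'dbkey': 'hg38'})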
            for param in params:
                key, value = list(param.items())[0]
                value_template = Template(value)
                value = value_template.render(item=item)
                inputs.update({key: value})

            job = dict(tool_id=dm_id, inputs=inputs)

            data_tables = dm.get('data_table_reload', [])
            if self.input_entries_exist_in_data_tables(data_tables, inputs):
                skipped_job_list.append(job)
            else:
                job_list.append(job)
        return job_list, skipped_job_list

    def dm_is_fetcher(self, dm):
        """Checks whether the data manager fetches a sequence instead of indexing.
        This is based on the source table.
        :returns True if dm is a fetcher. False if it is not."""
        data_tables = dm.get('data_table_reload', [])
        for data_table in data_tables:
            if data_table in self.source_tables:
                return True
        return False

    def data_table_entry_exists(self, data_table_name, entry, column='value'):
        """Checks whether an entry exists in a specified column of the data table."""
        try:
            data_table_content = self.tool_data_client.show_data_table(data_table_name)
        except Exception:
            raise Exception('Table "%s" does not exist' % data_table_name)

        try:
            column_index = data_table_content.get('columns').index(column)
        except ValueError:
            raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))

        for field in data_table_content.get('fields'):
            if field[column_index] == entry:
                return True
        return False

    def input_entries_exist_in_data_tables(self, data_tables, input_dict):
        """Checks whether name and value entries from the input are already present in the data tables.
        If an entry is missing in one of the tables, this function returns False."""
        value_entry = get_first_valid_entry(input_dict, self.possible_value_keys)
        name_entry = get_first_valid_entry(input_dict, self.possible_name_keys)

        # Return False if name and value entries are both None
        if not value_entry and not name_entry:
            return False

        # Check every data table for existence of name and value
        # Return False as soon as an entry is not present
        for data_table in data_tables:
            if value_entry:
                if not self.data_table_entry_exists(data_table, value_entry, column='value'):
                    return False
            if name_entry:
                if not self.data_table_entry_exists(data_table, name_entry, column='name'):
                    return False
        # If all checks are passed the entries are present in the data tables.
        return True

    def parse_items(self, items):
        """
        Parses items with jinja2.
        :param items: the items to be parsed
        :return: the parsed items
        """
        if bool(self.genomes):
            items_template = Template(json.dumps(items))
            rendered_items = items_template.render(genomes=json.dumps(self.genomes))
            # Remove surrounding " if present
            rendered_items = rendered_items.strip('"')
            items = json.loads(rendered_items)
        return items

    def run(self, log=None, ignore_errors=False, overwrite=False):
        """
        Runs the data managers.
        :param log: The log to be used.
        :param ignore_errors: Ignore erroring data_managers. Continue regardless.
        :param overwrite: Overwrite existing entries in data tables
        """
        self.initiate_job_lists()
        all_successful_jobs = []
        all_failed_jobs = []
        all_skipped_jobs = []

        if not log:
            log = logging.getLogger()

        def run_jobs(jobs, skipped_jobs):
            job_list = []
            for skipped_job in skipped_jobs:
                if overwrite:
                    log.info('%s already run for %s. Entry will be overwritten.' %
                             (skipped_job["tool_id"], skipped_job["inputs"]))
                    jobs.append(skipped_job)
                else:
                    log.info('%s already run for %s. Skipping.' % (skipped_job["tool_id"], skipped_job["inputs"]))
                    all_skipped_jobs.append(skipped_job)
            for job in jobs:
                started_job = self.tool_client.run_tool(history_id=None, tool_id=job["tool_id"],
                                                        tool_inputs=job["inputs"])
                log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' %
                         (started_job['outputs'][0]['hid'], job["tool_id"], job["inputs"]))
                job_list.append(started_job)

            successful_jobs, failed_jobs = wait(self.gi, job_list, log)
            if failed_jobs:
                if not ignore_errors:
                    log.error('Not all jobs successful! aborting...')
                    raise RuntimeError('Not all jobs successful! aborting...')
                else:
                    log.warning('Not all jobs successful! ignoring...')
            all_successful_jobs.extend(successful_jobs)
            all_failed_jobs.extend(failed_jobs)

        log.info("Running data managers that populate the following source data tables: %s" % self.source_tables)
        run_jobs(self.fetch_jobs, self.skipped_fetch_jobs)
        log.info("Running data managers that index sequences.")
        run_jobs(self.index_jobs, self.skipped_index_jobs)

        log.info('Finished running data managers. Results:')
        log.info('Successful jobs: %i ' % len(all_successful_jobs))
        log.info('Skipped jobs: %i ' % len(all_skipped_jobs))
        log.info('Failed jobs: %i ' % len(all_failed_jobs))
        InstallResults = namedtuple("InstallResults", ["successful_jobs", "failed_jobs", "skipped_jobs"])
        return InstallResults(successful_jobs=all_successful_jobs, failed_jobs=all_failed_jobs,
                              skipped_jobs=all_skipped_jobs)


def _parser():
    """returns the parser object."""
    parent = get_common_args(log_file=True)

    parser = argparse.ArgumentParser(
        parents=[parent],
        description="Running Galaxy data managers in a defined order with defined parameters. "
                    "'watch_tool_data_dir' in galaxy config should be set to true.")
    parser.add_argument("--config", required=True,
                        help="Path to the YAML config file with the list of data managers and data to install.")
    parser.add_argument("--overwrite", action="store_true",
                        help="Disables checking whether the item already exists in the tool data table.")
    parser.add_argument("--ignore_errors", action="store_true",
                        help="Do not stop running when jobs have failed.")
    return parser


def main():
    disable_external_library_logging()
    parser = _parser()
    args = parser.parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    if args.verbose:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    gi = get_galaxy_connection(args, file=args.config, log=log, login_required=True)
    config = load_yaml_file(args.config)
    data_managers = DataManagers(gi, config)
    data_managers.run(log, args.ignore_errors, args.overwrite)


if __name__ == '__main__':
    main()
```
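
For reference, here is a minimal sketch of the configuration dictionary this script consumes and of driving `DataManagers` programmatically. The tool id, the `{{ item.dbkey }}` parameter, and the connection credentials are hypothetical; real ids depend on the data managers installed on the target Galaxy instance (see the sample YAML files linked in the module docstring).

```python
# A hedged, minimal sketch -- not taken from the ephemeris test data.
# load_yaml_file(path) would return a dictionary shaped like this one.
from bioblend.galaxy import GalaxyInstance

config = {
    'genomes': [{'dbkey': 'hg38'}, {'dbkey': 'mm10'}],
    'data_managers': [
        {
            # Hypothetical data manager tool id.
            'id': 'data_manager_fetch_genome_all_fasta_dbkey',
            # Each param is a single-key dict; values are Jinja2 templates
            # rendered with the current item (see get_dm_jobs).
            'params': [{'dbkey': '{{ item.dbkey }}'}],
            # parse_items() renders '{{ genomes }}' into the genomes list above.
            'items': '{{ genomes }}',
            # 'all_fasta' is a source table, so this runs as a fetch job.
            'data_table_reload': ['all_fasta'],
        },
    ],
}

gi = GalaxyInstance(url='http://localhost:8080', key='<api-key>')  # hypothetical instance
results = DataManagers(gi, config).run()
print('%i successful, %i failed, %i skipped' %
      (len(results.successful_jobs), len(results.failed_jobs), len(results.skipped_jobs)))
```

The command-line entry point wraps the same flow: `run-data-managers --config <yaml> [--overwrite] [--ignore_errors]`, plus the connection and logging arguments contributed by `get_common_args`.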
