env/lib/python3.9/site-packages/ephemeris/run_data_managers.py @ changeset 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
#!/usr/bin/env python
"""Run-data-managers is a tool for provisioning data on a Galaxy instance.

Run-data-managers has the ability to run multiple data managers that are interdependent.
When a reference genome is needed for bwa-mem, for example, Run-data-managers
can first run a data manager to fetch the fasta file and then run
another data manager that indexes the fasta file for bwa-mem.
This functionality depends on the "watch_tool_data_dir" setting in galaxy.ini being set to True.
Also, if a new data manager is installed, Galaxy needs to be restarted in order for its tool_data_dir to be watched.

Run-data-managers needs a YAML file that specifies which data managers are run and with which settings.
Example files can be found `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample>`_,
`here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample.advanced>`_,
and `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.test>`_.
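
A minimal configuration sketch (the keys match what this module parses: a
"data_managers" list whose entries have "id", "params", "items" and
"data_table_reload", plus an optional top-level "genomes" list; the tool id
and genome values below are illustrative)::

    genomes:
      - dbkey: hg38
        name: Human (hg38)

    data_managers:
      - id: toolshed.g2.bx.psu.edu/repos/devteam/data_manager_fetch_genome_dbkeys_all_fasta/data_manager_fetch_genome_all_fasta_dbkey/0.0.2
        params:
          - 'dbkey': '{{ item.dbkey }}'
          - 'sequence_name': '{{ item.name }}'
        items: "{{ genomes }}"
        data_table_reload:
          - all_fasta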

By default run-data-managers skips entries in the yaml file that have already been run.
It checks this in the following way:

* If the data manager has input variables "name" or "sequence_name" it will check if the "name" column in the data table already has this entry.
  "name" takes precedence over "sequence_name".
* If the data manager has input variables "value", "sequence_id" or "dbkey" it will check if the "value"
  column in the data table already has this entry.
  "value" takes precedence over "sequence_id", which takes precedence over "dbkey".
* If none of the above input variables are specified, the data manager will always run.
"""
import argparse
import json
import logging
import time
from collections import namedtuple

from bioblend.galaxy.tool_data import ToolDataClient
from bioblend.galaxy.tools import ToolClient
from jinja2 import Template

from . import get_galaxy_connection
from . import load_yaml_file
from .common_parser import get_common_args
from .ephemeris_log import disable_external_library_logging, setup_global_logger

DEFAULT_URL = "http://localhost"
DEFAULT_SOURCE_TABLES = ["all_fasta"]

def wait(gi, job_list, log):
    """
    Waits until all jobs in a list have either finished or failed.
    It checks the state of the created datasets every 30s.
    It returns a tuple: (successful_jobs, failed_jobs).
    """

    failed_jobs = []
    successful_jobs = []

    # An empty list is falsy and stops the loop.
    while job_list:
        finished_jobs = []
        for job in job_list:
            job_hid = job['outputs'][0]['hid']
            # Check whether the output of the running job is in 'ok' or 'error' state.
            state = gi.datasets.show_dataset(job['outputs'][0]['id'])['state']
            if state == 'ok':
                log.info('Job %i finished with state %s.' % (job_hid, state))
                successful_jobs.append(job)
                finished_jobs.append(job)
            elif state == 'error':
                log.error('Job %i finished with state %s.' % (job_hid, state))
                job_id = job['jobs'][0]['id']
                job_details = gi.jobs.show_job(job_id, full_details=True)
                log.error(
                    "Job {job_hid}: Tool '{tool_id}' finished with exit code: {exit_code}. Stderr: {stderr}".format(
                        job_hid=job_hid,
                        **job_details
                    ))
                log.debug("Job {job_hid}: Tool '{tool_id}' stdout: {stdout}".format(
                    job_hid=job_hid,
                    **job_details
                ))
                failed_jobs.append(job)
                finished_jobs.append(job)
            else:
                log.debug('Job %i still running.' % job_hid)
        # Remove finished jobs from job_list.
        for finished_job in finished_jobs:
            job_list.remove(finished_job)
        # Only sleep if job_list is not empty yet.
        if job_list:
            time.sleep(30)
    return successful_jobs, failed_jobs
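
# Shape of the job dicts that wait() consumes, as returned by
# ToolClient.run_tool (illustrative values; only the keys accessed above
# are shown):
#
#     {'outputs': [{'id': 'f2db41e1fa331b3e', 'hid': 7}],
#      'jobs': [{'id': 'bbd44e69cb8906b5'}]}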


def get_first_valid_entry(input_dict, key_list):
    """Iterates over key_list and returns the value of the first key that is present in input_dict, or None if no key matches."""
    for key in key_list:
        if key in input_dict:
            return input_dict.get(key)
    return None
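
# For example (illustrative): with input_dict == {'sequence_name': 'hg38'} and
# key_list == ['name', 'sequence_name'], this returns 'hg38' because 'name' is
# absent; an empty input_dict returns None.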


class DataManagers:
    def __init__(self, galaxy_instance, configuration):
        """
        :param galaxy_instance: A GalaxyInstance object (import from bioblend.galaxy)
        :param configuration: A dictionary. Examples in the ephemeris documentation.
        """
        self.gi = galaxy_instance
        self.config = configuration
        self.tool_data_client = ToolDataClient(self.gi)
        self.tool_client = ToolClient(self.gi)
        self.possible_name_keys = ['name', 'sequence_name']  # In order of importance!
        self.possible_value_keys = ['value', 'sequence_id', 'dbkey']  # In order of importance!
        self.data_managers = self.config.get('data_managers')
        self.genomes = self.config.get('genomes', '')
        self.source_tables = DEFAULT_SOURCE_TABLES
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []

    def initiate_job_lists(self):
        """
        Determines which data managers should be run to populate the data tables.
        Distinguishes between fetch jobs (which download files) and index jobs.
        Populates self.fetch_jobs, self.skipped_fetch_jobs, self.index_jobs and self.skipped_index_jobs.
        """
        self.fetch_jobs = []
        self.skipped_fetch_jobs = []
        self.index_jobs = []
        self.skipped_index_jobs = []
        for dm in self.data_managers:
            jobs, skipped_jobs = self.get_dm_jobs(dm)
            if self.dm_is_fetcher(dm):
                self.fetch_jobs.extend(jobs)
                self.skipped_fetch_jobs.extend(skipped_jobs)
            else:
                self.index_jobs.extend(jobs)
                self.skipped_index_jobs.extend(skipped_jobs)

    def get_dm_jobs(self, dm):
        """Gets the job entries for a single data manager. Puts entries that are already present in skipped_job_list.
        :returns: job_list, skipped_job_list"""
        job_list = []
        skipped_job_list = []
        items = self.parse_items(dm.get('items', ['']))
        for item in items:
            dm_id = dm['id']
            params = dm['params']
            inputs = dict()
            # Iterate over all parameters, replace occurrences of {{item}} with the current item
            # and create the tool_inputs dict for running the data manager job.
            for param in params:
                key, value = list(param.items())[0]
                value_template = Template(value)
                value = value_template.render(item=item)
                inputs.update({key: value})

            job = dict(tool_id=dm_id, inputs=inputs)

            data_tables = dm.get('data_table_reload', [])
            if self.input_entries_exist_in_data_tables(data_tables, inputs):
                skipped_job_list.append(job)
            else:
                job_list.append(job)
        return job_list, skipped_job_list

    def dm_is_fetcher(self, dm):
        """Checks whether the data manager fetches a sequence instead of indexing.
        This is based on the source table.
        :returns: True if the data manager is a fetcher, False if it is not."""
        data_tables = dm.get('data_table_reload', [])
        for data_table in data_tables:
            if data_table in self.source_tables:
                return True
        return False

    def data_table_entry_exists(self, data_table_name, entry, column='value'):
        """Checks whether an entry exists in a specified column of the data table."""
        try:
            data_table_content = self.tool_data_client.show_data_table(data_table_name)
        except Exception:
            raise Exception('Table "%s" does not exist' % data_table_name)

        try:
            column_index = data_table_content.get('columns').index(column)
        except ValueError:
            # list.index raises ValueError, not IndexError, when the column is absent.
            raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))

        for field in data_table_content.get('fields'):
            if field[column_index] == entry:
                return True
        return False
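
    # Shape of the payload ToolDataClient.show_data_table returns, as read by
    # data_table_entry_exists above (illustrative values for an all_fasta table):
    #
    #     {'columns': ['value', 'dbkey', 'name', 'path'],
    #      'fields': [['hg38', 'hg38', 'Human (hg38)', '/data/hg38.fa']]}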

    def input_entries_exist_in_data_tables(self, data_tables, input_dict):
        """Checks whether name and value entries from the input are already present in the data tables.
        If an entry is missing in one of the tables, this function returns False."""
        value_entry = get_first_valid_entry(input_dict, self.possible_value_keys)
        name_entry = get_first_valid_entry(input_dict, self.possible_name_keys)

        # Return False if name and value entries are both None.
        if not value_entry and not name_entry:
            return False

        # Check every data table for the existence of name and value.
        # Return False as soon as an entry is not present.
        for data_table in data_tables:
            if value_entry:
                if not self.data_table_entry_exists(data_table, value_entry, column='value'):
                    return False
            if name_entry:
                if not self.data_table_entry_exists(data_table, name_entry, column='name'):
                    return False
        # If all checks passed, the entries are present in the data tables.
        return True

    def parse_items(self, items):
        """
        Renders the items with jinja2, substituting the configured genomes.
        :param items: the items to be parsed
        :return: the parsed items
        """
        if self.genomes:
            items_template = Template(json.dumps(items))
            rendered_items = items_template.render(genomes=json.dumps(self.genomes))
            # Remove surrounding quotes if present.
            rendered_items = rendered_items.strip('"')
            items = json.loads(rendered_items)
        return items
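
    # Illustrative walk-through (values assumed): with
    # self.genomes == [{'dbkey': 'hg38'}] and items == '{{ genomes }}',
    # json.dumps(items) gives '"{{ genomes }}"', rendering yields
    # '"[{"dbkey": "hg38"}]"', strip('"') removes the outer quotes, and
    # json.loads returns [{'dbkey': 'hg38'}].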

    def run(self, log=None, ignore_errors=False, overwrite=False):
        """
        Runs the data managers.
        :param log: The log to be used.
        :param ignore_errors: Ignore failing data managers and continue regardless.
        :param overwrite: Overwrite existing entries in the data tables.
        """
        self.initiate_job_lists()
        all_successful_jobs = []
        all_failed_jobs = []
        all_skipped_jobs = []

        if not log:
            log = logging.getLogger()

        def run_jobs(jobs, skipped_jobs):
            job_list = []
            for skipped_job in skipped_jobs:
                if overwrite:
                    log.info('%s already run for %s. Entry will be overwritten.' %
                             (skipped_job["tool_id"], skipped_job["inputs"]))
                    jobs.append(skipped_job)
                else:
                    log.info('%s already run for %s. Skipping.' % (skipped_job["tool_id"], skipped_job["inputs"]))
                    all_skipped_jobs.append(skipped_job)
            for job in jobs:
                started_job = self.tool_client.run_tool(history_id=None, tool_id=job["tool_id"],
                                                        tool_inputs=job["inputs"])
                log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' %
                         (started_job['outputs'][0]['hid'], job["tool_id"], job["inputs"]))
                job_list.append(started_job)

            successful_jobs, failed_jobs = wait(self.gi, job_list, log)
            if failed_jobs:
                if not ignore_errors:
                    log.error('Not all jobs were successful! Aborting...')
                    raise RuntimeError('Not all jobs were successful! Aborting...')
                else:
                    log.warning('Not all jobs were successful! Ignoring...')
            all_successful_jobs.extend(successful_jobs)
            all_failed_jobs.extend(failed_jobs)

        log.info("Running data managers that populate the following source data tables: %s" % self.source_tables)
        run_jobs(self.fetch_jobs, self.skipped_fetch_jobs)
        log.info("Running data managers that index sequences.")
        run_jobs(self.index_jobs, self.skipped_index_jobs)

        log.info('Finished running data managers. Results:')
        log.info('Successful jobs: %i' % len(all_successful_jobs))
        log.info('Skipped jobs: %i' % len(all_skipped_jobs))
        log.info('Failed jobs: %i' % len(all_failed_jobs))
        InstallResults = namedtuple("InstallResults", ["successful_jobs", "failed_jobs", "skipped_jobs"])
        return InstallResults(successful_jobs=all_successful_jobs, failed_jobs=all_failed_jobs,
                              skipped_jobs=all_skipped_jobs)


def _parser():
    """Returns the parser object."""
    parent = get_common_args(log_file=True)

    parser = argparse.ArgumentParser(
        parents=[parent],
        description="Running Galaxy data managers in a defined order with defined parameters. "
                    "'watch_tool_data_dir' in the Galaxy config should be set to true.")
    parser.add_argument("--config", required=True,
                        help="Path to the YAML config file with the list of data managers and data to install.")
    parser.add_argument("--overwrite", action="store_true",
                        help="Disables checking whether the item already exists in the tool data table.")
    parser.add_argument("--ignore_errors", action="store_true",
                        help="Do not stop running when jobs have failed.")
    return parser
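
# Example invocation (illustrative; the Galaxy connection and --log_file
# options come from the shared ephemeris parser returned by get_common_args):
#
#     run-data-managers --config run_data_managers.yaml \
#         --ignore_errors --log_file /tmp/run_data_managers.log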


def main():
    disable_external_library_logging()
    parser = _parser()
    args = parser.parse_args()
    log = setup_global_logger(name=__name__, log_file=args.log_file)
    if args.verbose:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    gi = get_galaxy_connection(args, file=args.config, log=log, login_required=True)
    config = load_yaml_file(args.config)
    data_managers = DataManagers(gi, config)
    data_managers.run(log, args.ignore_errors, args.overwrite)


if __name__ == '__main__':
    main()