env/lib/python3.9/site-packages/planemo/galaxy/activity.py @ 0:4f3585e2f14b (draft, default, tip)

commit:  "planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author:  shellac
date:    Mon, 22 Mar 2021 18:12:50 +0000
parents: (none)
children: (none)
1 """Module provides generic interface to running Galaxy tools and workflows.""" | |
2 | |
3 import json | |
4 import os | |
5 import sys | |
6 import tempfile | |
7 import time | |
8 import traceback | |
9 | |
10 import bioblend | |
11 from bioblend.util import attach_file | |
12 from galaxy.tool_util.client.staging import ( | |
    StagingInterace,  # (sic) the class name is spelled this way upstream in galaxy-tool-util
)
from galaxy.tool_util.cwl.util import (
    invocation_to_output,
    output_properties,
    output_to_cwl_json,
    tool_response_to_output,
)
from galaxy.tool_util.parser import get_tool_source
from galaxy.tool_util.verify.interactor import galaxy_requests_post
from galaxy.util import (
    safe_makedirs,
    unicodify,
)
from requests.exceptions import RequestException
from six.moves.urllib.parse import urljoin

from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
    ErrorRunResponse,
    get_outputs,
    RunnableType,
    SuccessfulRunResponse,
)

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
                    "You may need to enable verbose logging and determine why the tool did not load. [%s]")


def execute(ctx, config, runnable, job_path, **kwds):
    """Execute a Galaxy activity."""
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        if ctx.verbose:
            ctx.vlog("Failed to execute Galaxy activity, throwing ErrorRunResponse")
            traceback.print_exc(file=sys.stdout)
        return ErrorRunResponse(unicodify(e))


def _verified_tool_id(runnable, user_gi):
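    """Return the tool ID for ``runnable``, verifying Galaxy can load the tool."""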
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
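    """Return the tool-state representation ("cwl" or "galaxy") for a runnable."""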
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
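    """Return the engine log contents for ``config``, if the engine exposes them."""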
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


class PlanemoStagingInterface(StagingInterace):
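    """Adapt galaxy-tool-util's staging interface to BioBlend and the planemo context."""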

    def __init__(self, ctx, runnable, user_gi, version_major):
        self._ctx = ctx
        self._user_gi = user_gi
        self._runnable = runnable
        self._version_major = version_major

    def _post(self, api_path, payload, files_attached=False):
        params = dict(key=self._user_gi.key)
        url = urljoin(self._user_gi.url, "api/" + api_path)
        return galaxy_requests_post(url, data=payload, params=params, as_json=True).json()

    def _attach_file(self, path):
        return attach_file(path)

    def _handle_job(self, job_response):
        job_id = job_response["id"]
        _wait_for_job(self._user_gi, job_id)

    @property
    def use_fetch_api(self):
        # hack around this not working for galaxy_tools - why is that :(
        return self._runnable.type != RunnableType.galaxy_tool and self._version_major >= "20.09"

    # extension point for planemo to override logging
    def _log(self, message):
        self._ctx.vlog(message)


def _execute(ctx, config, runnable, job_path, **kwds):
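    """Run a tool or workflow in Galaxy and return a run response object."""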
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    try:
        job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
    except Exception:
        ctx.vlog("Problem with staging in data for Galaxy activities...")
        raise
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run tool, final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id_for_runnable(runnable)
        ctx.vlog("Found Galaxy workflow ID [%s] for URI [%s]" % (workflow_id, runnable.uri))
        # TODO: Use the following when BioBlend 0.14 is released
        # invocation = user_gi.workflows.invoke_workflow(
        #     workflow_id,
        #     inputs=job_dict,
        #     history_id=history_id,
        #     allow_tool_state_corrections=True,
        #     inputs_by="name",
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/workflows/%s/invocations" % (
            user_gi.url,
            workflow_id,
        )
        invocation = user_gi.workflows._post(payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        final_invocation_state = 'new'
        error_message = ""
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
            assert final_invocation_state == 'scheduled'
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            error_message = "Final invocation state is [%s]" % final_invocation_state
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow, final history state is [%s]." % final_state
            error_message = msg if not error_message else "%s. %s" % (error_message, msg)
            ctx.vlog(msg)
            summarize_history(ctx, user_gi, history_id)
        else:
            ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
            'history_state': final_state,
            'invocation_state': final_invocation_state,
            'error_message': error_message,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response


def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):  # noqa C901
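    """Upload the job's input data to Galaxy and wait for the uploads to finish."""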
    # only upload objects as files/collections for CWL workflows...
    tool_or_workflow = "tool" if runnable.type != RunnableType.cwl_workflow else "workflow"
    to_posix_lines = runnable.type.is_galaxy_artifact
    job_dict, datasets = PlanemoStagingInterface(ctx, runnable, user_gi, config.version_major).stage(
        tool_or_workflow,
        history_id=history_id,
        job_path=job_path,
        use_path_paste=config.use_path_paste,
        to_posix_lines=to_posix_lines,
    )

    if datasets:
        ctx.vlog("uploaded datasets [%s] for activity, checking history state" % datasets)
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to stage in data, final history state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        raise Exception(msg)

    return job_dict, datasets


def _file_path_to_name(file_path):
    if file_path is not None:
        name = os.path.basename(file_path)
    else:
        name = "defaultname"
    return name


class GalaxyBaseRunResponse(SuccessfulRunResponse):
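    """Common behavior for tool and workflow run responses, including output collection."""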

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/histories/%s/contents/%s/extra_files" % (
            self._user_gi.url, self._history_id, dataset_details["id"]
        )
        extra_files = self._user_gi.jobs._get(url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
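        """Download each runnable output and record it in ``self._outputs_dict``."""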
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have output directories
                # and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)

        for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
            output_src = self.output_src(runnable_output)
            if not is_cwl and output_src["src"] == "hda":
                output_dataset_id = output_src["id"]
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)
            else:
                output_dataset_id = output_src["id"]
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,  # (sic) parameter name as spelled in galaxy-tool-util at this version
                )
                if is_cwl:
                    output_dict_value = cwl_output
                else:

                    def attach_file_properties(collection, cwl_output):
                        elements = collection["elements"]
                        assert len(elements) == len(cwl_output)
                        for element, cwl_output_element in zip(elements, cwl_output):
                            element["_output_object"] = cwl_output_element
                            if isinstance(cwl_output_element, list):
                                assert "elements" in element["object"]
                                attach_file_properties(element["object"], cwl_output_element)

                    output_metadata = self._get_metadata("dataset_collection", output_dataset_id)
                    attach_file_properties(output_metadata, cwl_output)
                    output_dict_value = output_metadata

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def invocation_details(self):
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
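        """Download a dataset into ``output_directory`` and return its local path."""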
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
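        """Stream a history dataset's contents to ``to_path`` via the display endpoint."""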
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = user_gi.make_get_request(url, params=data, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


class GalaxyToolRunResponse(GalaxyBaseRunResponse):
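    """Run response for a single Galaxy or CWL tool execution."""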

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_src(output)["src"] == "hdca"

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_src(self, output):
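        """Find the dataset ("hda") or collection ("hdca") source for a tool output."""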
        outputs = self.api_run_response["outputs"]
        output_collections = self.api_run_response["output_collections"]
        output_id = output.get_id()
        output_src = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for output in outputs:
            if output["output_name"] == output_id:
                output_src = {"src": "hda", "id": output["id"]}
        for output_collection in output_collections:
            if output_collection["output_name"] == output_id:
                output_src = {"src": "hdca", "id": output_collection["id"]}
        return output_src


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):
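    """Run response for a Galaxy workflow invocation."""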

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
        history_state='ok',
        invocation_state='ok',
        error_message=None,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id
        self._invocation_details = {}
        self._cached_invocation = None
        self.history_state = history_state
        self.invocation_state = invocation_state
        self.error_message = error_message
        self._invocation_details = self.collect_invocation_details(invocation_id)

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_src(self, output):
        invocation = self._invocation
        # Use newer workflow outputs API.

        output_name = output.get_id()
        if output_name in invocation["outputs"]:
            return invocation["outputs"][output.get_id()]
        elif output_name in invocation["output_collections"]:
            return invocation["output_collections"][output.get_id()]
        else:
            raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))

    def collect_invocation_details(self, invocation_id=None):
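        """Gather per-step details for an invocation, recursing into subworkflows and fetching full job info."""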
        gi = self._user_gi
        invocation_steps = {}
        invocation = self.get_invocation(invocation_id)
        for step in invocation['steps']:
            step_label_or_index = "{}. {}".format(step['order_index'], step['workflow_step_label'] or 'Unnamed step')
            workflow_step = gi.invocations.show_invocation_step(self._invocation_id, step['id'])
            workflow_step['subworkflow'] = None
            subworkflow_invocation_id = workflow_step.get('subworkflow_invocation_id')
            if subworkflow_invocation_id:
                workflow_step['subworkflow'] = self.collect_invocation_details(subworkflow_invocation_id)
            workflow_step_job_details = [self._user_gi.jobs.show_job(j['id'], full_details=True) for j in workflow_step['jobs']]
            workflow_step['jobs'] = workflow_step_job_details
            invocation_steps[step_label_or_index] = workflow_step
        return invocation_steps

    @property
    def invocation_details(self):
        return self._invocation_details

    def get_invocation(self, invocation_id):
        return self._user_gi.invocations.show_invocation(invocation_id)

    @property
    def _invocation(self):
        if self._cached_invocation is None:
            self._cached_invocation = self.get_invocation(self._invocation_id)
        return self._cached_invocation

    @property
    def was_successful(self):
        return self.history_state == 'ok' and self.invocation_state == 'scheduled'


def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
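    """Return the target history ID, creating a named history when none was supplied."""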
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME) or DEFAULT_HISTORY_NAME
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def get_dict_from_workflow(gi, workflow_id):
    return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
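    """Call ``f(gi)`` with a temporary 60-second timeout, retrying timed-out requests up to five times."""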
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except RequestException:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(ctx, gi, history_id, states):
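    """Return True if the history has any job in one of the given states."""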
    params = {"history_id": history_id}
    jobs_url = gi.url + '/jobs'
    jobs = gi.jobs._get(url=jobs_url, params=params)
    target_jobs = [j for j in jobs if j["state"] in states]
    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
    # Used to wait for active jobs and then wait for history, but now this is called
    # after upload is complete and after the invocation has been done scheduling - so
    # no need to wait for active jobs anymore I think.

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):
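    """Poll ``state_func`` until a terminal state is reached, with a 24-hour timeout."""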

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None

    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


__all__ = (
    "execute",
)