# env/lib/python3.9/site-packages/planemo/galaxy/activity.py @ 0:4f3585e2f14b (draft, default, tip)
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | (none) |
| children | (none) |
The file is new in this revision; the comparison is against the null revision: `-1:000000000000` → `0:4f3585e2f14b`.
| 1 """Module provides generic interface to running Galaxy tools and workflows.""" | |
| 2 | |
| 3 import json | |
| 4 import os | |
| 5 import sys | |
| 6 import tempfile | |
| 7 import time | |
| 8 import traceback | |
| 9 | |
| 10 import bioblend | |
| 11 from bioblend.util import attach_file | |
| 12 from galaxy.tool_util.client.staging import ( | |
| 13 StagingInterace, | |
| 14 ) | |
| 15 from galaxy.tool_util.cwl.util import ( | |
| 16 invocation_to_output, | |
| 17 output_properties, | |
| 18 output_to_cwl_json, | |
| 19 tool_response_to_output, | |
| 20 ) | |
| 21 from galaxy.tool_util.parser import get_tool_source | |
| 22 from galaxy.tool_util.verify.interactor import galaxy_requests_post | |
| 23 from galaxy.util import ( | |
| 24 safe_makedirs, | |
| 25 unicodify, | |
| 26 ) | |
| 27 from requests.exceptions import RequestException | |
| 28 from six.moves.urllib.parse import urljoin | |
| 29 | |
| 30 from planemo.galaxy.api import summarize_history | |
| 31 from planemo.io import wait_on | |
| 32 from planemo.runnable import ( | |
| 33 ErrorRunResponse, | |
| 34 get_outputs, | |
| 35 RunnableType, | |
| 36 SuccessfulRunResponse, | |
| 37 ) | |
| 38 | |
| 39 DEFAULT_HISTORY_NAME = "CWL Target History" | |
| 40 ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. " | |
| 41 "You may need to enable verbose logging and determine why the tool did not load. [%s]") | |
| 42 | |
| 43 | |
| 44 def execute(ctx, config, runnable, job_path, **kwds): | |
| 45 """Execute a Galaxy activity.""" | |
| 46 try: | |
| 47 return _execute(ctx, config, runnable, job_path, **kwds) | |
| 48 except Exception as e: | |
| 49 if ctx.verbose: | |
| 50 ctx.vlog("Failed to execute Galaxy activity, throwing ErrorRunResponse") | |
| 51 traceback.print_exc(file=sys.stdout) | |
| 52 return ErrorRunResponse(unicodify(e)) | |
| 53 | |
| 54 | |
| 55 def _verified_tool_id(runnable, user_gi): | |
| 56 tool_id = _tool_id(runnable.path) | |
| 57 try: | |
| 58 user_gi.tools.show_tool(tool_id) | |
| 59 except Exception as e: | |
| 60 raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e)) | |
| 61 return tool_id | |
| 62 | |
| 63 | |
| 64 def _inputs_representation(runnable): | |
| 65 if runnable.type == RunnableType.cwl_tool: | |
| 66 inputs_representation = "cwl" | |
| 67 else: | |
| 68 inputs_representation = "galaxy" | |
| 69 return inputs_representation | |
| 70 | |
| 71 | |
| 72 def log_contents_str(config): | |
| 73 if hasattr(config, "log_contents"): | |
| 74 return config.log_contents | |
| 75 else: | |
| 76 return "No log for this engine type." | |
| 77 | |
| 78 | |
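`execute` deliberately converts any exception into an `ErrorRunResponse` instead of propagating it, so callers branch on the returned response object. A minimal sketch of that contract, assuming the `was_successful`/`error_message` attributes defined by the `planemo.runnable` response classes and `ctx`/`config`/`runnable` objects already prepared by planemo's engine setup:

```python
# Hypothetical caller; ctx, config and runnable come from planemo's engine setup.
response = execute(ctx, config, runnable, "job.json")
if response.was_successful:
    print("outputs:", response.outputs_dict)
else:
    # ErrorRunResponse carries the unicodified exception message.
    print("run failed:", response.error_message)
```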
```python
class PlanemoStagingInterface(StagingInterace):

    def __init__(self, ctx, runnable, user_gi, version_major):
        self._ctx = ctx
        self._user_gi = user_gi
        self._runnable = runnable
        self._version_major = version_major

    def _post(self, api_path, payload, files_attached=False):
        params = dict(key=self._user_gi.key)
        url = urljoin(self._user_gi.url, "api/" + api_path)
        return galaxy_requests_post(url, data=payload, params=params, as_json=True).json()

    def _attach_file(self, path):
        return attach_file(path)

    def _handle_job(self, job_response):
        job_id = job_response["id"]
        _wait_for_job(self._user_gi, job_id)

    @property
    def use_fetch_api(self):
        # hack around this not working for galaxy_tools - why is that :(
        return self._runnable.type != RunnableType.galaxy_tool and self._version_major >= "20.09"

    # extension point for planemo to override logging
    def _log(self, message):
        self._ctx.vlog(message)
```
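Two details worth noting in `PlanemoStagingInterface`: the `use_fetch_api` check compares `version_major` lexicographically as a string, which is sound for Galaxy's zero-padded `YY.MM` release scheme, and `_post` relies on `urljoin` semantics to build its endpoint. Because a BioBlend `gi.url` conventionally ends in `/api` with no trailing slash, `urljoin` drops that final segment and the `"api/" + api_path` relative part restores it. A quick demonstration (the server address is hypothetical):

```python
from six.moves.urllib.parse import urljoin

base = "https://usegalaxy.example.org/api"  # hypothetical BioBlend gi.url
# urljoin removes the trailing "api" segment of the base, then the relative
# path re-adds it, so the endpoint is not doubled to /api/api/...
assert urljoin(base, "api/tools") == "https://usegalaxy.example.org/api/tools"
assert urljoin(base, "api/histories") == "https://usegalaxy.example.org/api/histories"
```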
```python
def _execute(ctx, config, runnable, job_path, **kwds):
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    try:
        job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
    except Exception:
        ctx.vlog("Problem with staging in data for Galaxy activities...")
        raise
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run CWL tool, final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id_for_runnable(runnable)
        ctx.vlog("Found Galaxy workflow ID [%s] for URI [%s]" % (workflow_id, runnable.uri))
        # TODO: Use the following when BioBlend 0.14 is released
        # invocation = user_gi.workflows.invoke_workflow(
        #     workflow_id,
        #     inputs=job_dict,
        #     history_id=history_id,
        #     allow_tool_state_corrections=True,
        #     inputs_by="name",
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/workflows/%s/invocations" % (
            user_gi.url,
            workflow_id,
        )
        invocation = user_gi.workflows._post(payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        final_invocation_state = 'new'
        error_message = ""
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
            assert final_invocation_state == 'scheduled'
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            error_message = "Final invocation state is [%s]" % final_invocation_state
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow, final history state is [%s]." % final_state
            error_message = msg if not error_message else "%s. %s" % (error_message, msg)
            ctx.vlog(msg)
            summarize_history(ctx, user_gi, history_id)
        else:
            ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
            'history_state': final_state,
            'invocation_state': final_invocation_state,
            'error_message': error_message,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response
```
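The TODO above tracks a real BioBlend release: `workflows.invoke_workflow` gained the keyword arguments used here, after which the hand-rolled `_post` against the invocations URL can become a plain client call. A hedged sketch with placeholder IDs, assuming a BioBlend (≥ 0.14) `GalaxyInstance`:

```python
from bioblend.galaxy import GalaxyInstance

user_gi = GalaxyInstance("https://usegalaxy.example.org", key="...")  # hypothetical
workflow_id = "0123456789abcdef"                                      # hypothetical
history_id = "fedcba9876543210"                                       # hypothetical
job_dict = {"input_reads": {"src": "hda", "id": "abc123"}}            # hypothetical staged inputs

# Equivalent of the raw _post above, via the BioBlend client API.
invocation = user_gi.workflows.invoke_workflow(
    workflow_id,
    inputs=job_dict,
    history_id=history_id,
    allow_tool_state_corrections=True,
    inputs_by="name",
)
invocation_id = invocation["id"]
```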
```python
def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):  # noqa C901
    # only upload objects as files/collections for CWL workflows...
    tool_or_workflow = "tool" if runnable.type != RunnableType.cwl_workflow else "workflow"
    to_posix_lines = runnable.type.is_galaxy_artifact
    job_dict, datasets = PlanemoStagingInterface(ctx, runnable, user_gi, config.version_major).stage(
        tool_or_workflow,
        history_id=history_id,
        job_path=job_path,
        use_path_paste=config.use_path_paste,
        to_posix_lines=to_posix_lines,
    )

    if datasets:
        ctx.vlog("uploaded datasets [%s] for activity, checking history state" % datasets)
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to run job, final job state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        raise Exception(msg)

    return job_dict, datasets


def _file_path_to_name(file_path):
    if file_path is not None:
        name = os.path.basename(file_path)
    else:
        name = "defaultname"
    return name
```
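`stage_in` delegates to `PlanemoStagingInterface.stage`, which reads the job document at `job_path`, uploads every file it references into the target history, and returns a `job_dict` with those references rewritten to Galaxy datasets. A hypothetical CWL-style job document, for illustration only:

```python
import json

# One File input (uploaded by stage_in) plus one plain parameter.
job = {
    "input_reads": {"class": "File", "path": "reads.fastq"},
    "min_quality": 20,
}
with open("job.json", "w") as f:
    json.dump(job, f, indent=2)
```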
```python
class GalaxyBaseRunResponse(SuccessfulRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/histories/%s/contents/%s/extra_files" % (
            self._user_gi.url, self._history_id, dataset_details["id"]
        )
        extra_files = self._user_gi.jobs._get(url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have output directories
                # and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)

        for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
            output_src = self.output_src(runnable_output)
            if not is_cwl and output_src["src"] == "hda":
                output_dataset_id = output_src["id"]
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)
            else:
                output_dataset_id = output_src["id"]
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,  # sic: this is the upstream spelling of the keyword
                )
                if is_cwl:
                    output_dict_value = cwl_output
                else:

                    def attach_file_properties(collection, cwl_output):
                        elements = collection["elements"]
                        assert len(elements) == len(cwl_output)
                        for element, cwl_output_element in zip(elements, cwl_output):
                            element["_output_object"] = cwl_output_element
                            if isinstance(cwl_output_element, list):
                                assert "elements" in element["object"]
                                attach_file_properties(element["object"], cwl_output_element)

                    output_metadata = self._get_metadata("dataset_collection", output_dataset_id)
                    attach_file_properties(output_metadata, cwl_output)
                    output_dict_value = output_metadata

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)
```
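The value stored per output label depends on the branch taken above: the parsed JSON payload for `expression.json` datasets, a CWL-style file object from `output_properties` for ordinary datasets, or (for Galaxy collections) the collection metadata with `_output_object` entries attached. An illustrative sketch of what `outputs_dict` can end up holding; the exact fields come from the Galaxy API and `galaxy.tool_util.cwl.util`, so treat this as an assumption about shape, not a spec:

```python
outputs_dict = {
    # ordinary dataset output -> CWL-style File properties
    "out_file1": {
        "class": "File",
        "path": "/tmp/tmpXXXX/out_file1.txt",  # hypothetical download destination
        "basename": "out_file1.txt",
        # output_properties also fills in size/checksum style fields
    },
    # expression.json dataset -> the parsed JSON value itself
    "count": 42,
}
```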
```python
    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def invocation_details(self):
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = user_gi.make_get_request(url, params=data, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


class GalaxyToolRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_src(output)["src"] == "hdca"

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_src(self, output):
        outputs = self.api_run_response["outputs"]
        output_collections = self.api_run_response["output_collections"]
        output_id = output.get_id()
        output_src = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for output in outputs:
            if output["output_name"] == output_id:
                output_src = {"src": "hda", "id": output["id"]}
        for output_collection in output_collections:
            if output_collection["output_name"] == output_id:
                output_src = {"src": "hdca", "id": output_collection["id"]}
        return output_src
```
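`output_src` resolves an output label by scanning the saved tool-run response: datasets are listed under `outputs` and collections under `output_collections`, each entry pairing an `output_name` with its encoded ID. An illustrative, heavily trimmed response shape (the IDs are fake):

```python
api_run_response = {
    "jobs": [{"id": "job123"}],
    "outputs": [
        {"output_name": "out_file1", "id": "hda456"},
    ],
    "output_collections": [
        {"output_name": "paired_output", "id": "hdca789"},
    ],
}
# output_src for "out_file1"     -> {"src": "hda", "id": "hda456"}
# output_src for "paired_output" -> {"src": "hdca", "id": "hdca789"}
```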
```python
class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
        history_state='ok',
        invocation_state='ok',
        error_message=None,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id
        self._invocation_details = {}
        self._cached_invocation = None
        self.history_state = history_state
        self.invocation_state = invocation_state
        self.error_message = error_message
        self._invocation_details = self.collect_invocation_details(invocation_id)

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_src(self, output):
        invocation = self._invocation
        # Use newer workflow outputs API.

        output_name = output.get_id()
        if output_name in invocation["outputs"]:
            return invocation["outputs"][output.get_id()]
        elif output_name in invocation["output_collections"]:
            return invocation["output_collections"][output.get_id()]
        else:
            raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))

    def collect_invocation_details(self, invocation_id=None):
        gi = self._user_gi
        invocation_steps = {}
        invocation = self.get_invocation(invocation_id)
        for step in invocation['steps']:
            step_label_or_index = "{}. {}".format(step['order_index'], step['workflow_step_label'] or 'Unnamed step')
            workflow_step = gi.invocations.show_invocation_step(self._invocation_id, step['id'])
            workflow_step['subworkflow'] = None
            subworkflow_invocation_id = workflow_step.get('subworkflow_invocation_id')
            if subworkflow_invocation_id:
                workflow_step['subworkflow'] = self.collect_invocation_details(subworkflow_invocation_id)
            workflow_step_job_details = [self._user_gi.jobs.show_job(j['id'], full_details=True) for j in workflow_step['jobs']]
            workflow_step['jobs'] = workflow_step_job_details
            invocation_steps[step_label_or_index] = workflow_step
        return invocation_steps

    @property
    def invocation_details(self):
        return self._invocation_details

    def get_invocation(self, invocation_id):
        return self._user_gi.invocations.show_invocation(invocation_id)

    @property
    def _invocation(self):
        if self._cached_invocation is None:
            self._cached_invocation = self.get_invocation(self._invocation_id)
        return self._cached_invocation

    @property
    def was_successful(self):
        return self.history_state == 'ok' and self.invocation_state == 'scheduled'
```
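`collect_invocation_details` flattens an invocation into a dict keyed by `"<order_index>. <step label>"`, expands each step's jobs into full job details, and recurses into subworkflow invocations. Roughly the following shape, with fields trimmed and values hypothetical:

```python
invocation_details = {
    "0. Unnamed step": {   # e.g. an input step with no jobs
        "jobs": [],
        "subworkflow": None,
    },
    "1. trim_reads": {
        "jobs": [{"id": "job123", "state": "ok"}],  # full show_job payloads in practice
        "subworkflow": None,  # or a nested dict of this same shape
    },
}
```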
```python
def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME) or DEFAULT_HISTORY_NAME
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def get_dict_from_workflow(gi, workflow_id):
    return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except RequestException:
                end_time = time.time()
                # A slow failure (> 45s of the 60s budget) looks like a server
                # timeout rather than a hard error, so only those are retried.
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(ctx, gi, history_id, states):
    params = {"history_id": history_id}
    jobs_url = gi.url + '/jobs'
    jobs = gi.jobs._get(url=jobs_url, params=params)
    target_jobs = [j for j in jobs if j["state"] in states]
    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
    # Used to wait for active jobs and then wait for history, but now this is called
    # after upload is complete and after the invocation has been done scheduling - so
    # no need to wait for active jobs anymore I think.

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None

    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


__all__ = (
    "execute",
)
```
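`_wait_on_state` treats `running`, `queued`, `new`, and `ready` as non-terminal and polls until `planemo.io.wait_on` sees anything else, with a 24-hour timeout. A hedged re-implementation of the polling contract assumed here, to show how `polling_backoff` stretches the interval between checks (this is an illustration, not planemo's actual `wait_on`):

```python
import time


def wait_on_sketch(check, desc, timeout, polling_backoff=0):
    # Poll check() until it returns a non-None value; each miss lengthens
    # the sleep by polling_backoff seconds so busy servers are hit less often.
    delta = 0.25
    start = time.time()
    while time.time() - start < timeout:
        result = check()
        if result is not None:
            return result
        time.sleep(delta)
        delta += polling_backoff
    raise Exception("Timed out waiting on %s." % desc)
```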
