comparison env/lib/python3.9/site-packages/planemo/galaxy/activity.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000 (2021-03-22)
1 """Module provides generic interface to running Galaxy tools and workflows."""
2
3 import json
4 import os
5 import sys
6 import tempfile
7 import time
8 import traceback
9
10 import bioblend
11 from bioblend.util import attach_file
12 from galaxy.tool_util.client.staging import (
13 StagingInterace,
14 )
15 from galaxy.tool_util.cwl.util import (
16 invocation_to_output,
17 output_properties,
18 output_to_cwl_json,
19 tool_response_to_output,
20 )
21 from galaxy.tool_util.parser import get_tool_source
22 from galaxy.tool_util.verify.interactor import galaxy_requests_post
23 from galaxy.util import (
24 safe_makedirs,
25 unicodify,
26 )
27 from requests.exceptions import RequestException
28 from six.moves.urllib.parse import urljoin
29
30 from planemo.galaxy.api import summarize_history
31 from planemo.io import wait_on
32 from planemo.runnable import (
33 ErrorRunResponse,
34 get_outputs,
35 RunnableType,
36 SuccessfulRunResponse,
37 )
38
39 DEFAULT_HISTORY_NAME = "CWL Target History"
40 ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
41 "You may need to enable verbose logging and determine why the tool did not load. [%s]")
42
43
def execute(ctx, config, runnable, job_path, **kwds):
    """Execute a Galaxy activity."""
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        if ctx.verbose:
            ctx.vlog("Failed to execute Galaxy activity, throwing ErrorRunResponse")
            traceback.print_exc(file=sys.stdout)
        return ErrorRunResponse(unicodify(e))


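# Editor's note - a minimal usage sketch (ctx, config, runnable and job_path are
# hypothetical caller-supplied objects): failures inside _execute surface as an
# ErrorRunResponse value rather than an exception, so callers branch on the
# response instead of wrapping execute() in try/except:
#
#     response = execute(ctx, config, runnable, job_path)
#     if not response.was_successful:
#         ctx.log("Galaxy activity failed, log:\n%s" % response.log)
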
def _verified_tool_id(runnable, user_gi):
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


79 class PlanemoStagingInterface(StagingInterace):
80
81 def __init__(self, ctx, runnable, user_gi, version_major):
82 self._ctx = ctx
83 self._user_gi = user_gi
84 self._runnable = runnable
85 self._version_major = version_major
86
87 def _post(self, api_path, payload, files_attached=False):
88 params = dict(key=self._user_gi.key)
89 url = urljoin(self._user_gi.url, "api/" + api_path)
90 return galaxy_requests_post(url, data=payload, params=params, as_json=True).json()
91
92 def _attach_file(self, path):
93 return attach_file(path)
94
95 def _handle_job(self, job_response):
96 job_id = job_response["id"]
97 _wait_for_job(self._user_gi, job_id)
98
99 @property
100 def use_fetch_api(self):
101 # hack around this not working for galaxy_tools - why is that :(
102 return self._runnable.type != RunnableType.galaxy_tool and self._version_major >= "20.09"
103
104 # extension point for planemo to override logging
105 def _log(self, message):
106 self._ctx.vlog(message)
107
108
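# Editor's sketch (hypothetical values): _post above issues a key-authenticated
# request against the raw Galaxy API, so api_path="tools" becomes
# POST <galaxy_url>/api/tools?key=<user key> with the staging payload as the
# JSON body, e.g.:
#
#     interface = PlanemoStagingInterface(ctx, runnable, user_gi, "21.01")
#     interface._post("tools", {"history_id": "deadbeef"})   # hypothetical IDs
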
def _execute(ctx, config, runnable, job_path, **kwds):
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    try:
        job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
    except Exception:
        ctx.vlog("Problem with staging in data for Galaxy activities...")
        raise
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run CWL tool, final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id_for_runnable(runnable)
        ctx.vlog("Found Galaxy workflow ID [%s] for URI [%s]" % (workflow_id, runnable.uri))
        # TODO: Use the following when BioBlend 0.14 is released
        # invocation = user_gi.workflows.invoke_workflow(
        #     workflow_id,
        #     inputs=job_dict,
        #     history_id=history_id,
        #     allow_tool_state_corrections=True,
        #     inputs_by="name",
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/workflows/%s/invocations" % (
            user_gi.url,
            workflow_id,
        )
        invocation = user_gi.workflows._post(payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        final_invocation_state = 'new'
        error_message = ""
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
            assert final_invocation_state == 'scheduled'
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            error_message = "Final invocation state is [%s]" % final_invocation_state
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow, final history state is [%s]." % final_state
            error_message = msg if not error_message else "%s. %s" % (error_message, msg)
            ctx.vlog(msg)
            summarize_history(ctx, user_gi, history_id)
        else:
            ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
            'history_state': final_state,
            'invocation_state': final_invocation_state,
            'error_message': error_message,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response


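# Editor's note: only "id" is read from the invocation POST in _execute above;
# an abbreviated (hypothetical) response body looks like
#
#     {"id": "f2db41e1fa331b3e", "state": "new", "history_id": "...", ...}
#
# and that id then drives the _wait_for_invocation/_wait_for_history polling.
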
def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):  # noqa C901
    # only upload objects as files/collections for CWL workflows...
    tool_or_workflow = "tool" if runnable.type != RunnableType.cwl_workflow else "workflow"
    to_posix_lines = runnable.type.is_galaxy_artifact
    job_dict, datasets = PlanemoStagingInterface(ctx, runnable, user_gi, config.version_major).stage(
        tool_or_workflow,
        history_id=history_id,
        job_path=job_path,
        use_path_paste=config.use_path_paste,
        to_posix_lines=to_posix_lines,
    )

    if datasets:
        ctx.vlog("uploaded datasets [%s] for activity, checking history state" % datasets)
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to run job, final job state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        raise Exception(msg)

    return job_dict, datasets


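# Editor's sketch (hypothetical IDs): stage_in returns the rewritten job
# dictionary plus the upload records, roughly
#
#     job_dict == {"input1": {"src": "hda", "id": "2891970512fa2d5a"}}
#     datasets == [(upload_response_dict, "/local/path/input1.fastq")]
#
# job_dict is what gets posted as the tool/workflow inputs in _execute above.
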
def _file_path_to_name(file_path):
    if file_path is not None:
        name = os.path.basename(file_path)
    else:
        name = "defaultname"
    return name


class GalaxyBaseRunResponse(SuccessfulRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/histories/%s/contents/%s/extra_files" % (
            self._user_gi.url, self._history_id, dataset_details["id"]
        )
        extra_files = self._user_gi.jobs._get(url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have
                # output directories and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)

        for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
            output_src = self.output_src(runnable_output)
            if not is_cwl and output_src["src"] == "hda":
                output_dataset_id = output_src["id"]
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)
            else:
                output_dataset_id = output_src["id"]
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,  # sic - upstream keyword spelling in galaxy-tool-util
                )
                if is_cwl:
                    output_dict_value = cwl_output
                else:

                    def attach_file_properties(collection, cwl_output):
                        elements = collection["elements"]
                        assert len(elements) == len(cwl_output)
                        for element, cwl_output_element in zip(elements, cwl_output):
                            element["_output_object"] = cwl_output_element
                            if isinstance(cwl_output_element, list):
                                assert "elements" in element["object"]
                                attach_file_properties(element["object"], cwl_output_element)

                    output_metadata = self._get_metadata("dataset_collection", output_dataset_id)
                    attach_file_properties(output_metadata, cwl_output)
                    output_dict_value = output_metadata

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def invocation_details(self):
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = user_gi.make_get_request(url, params=data, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


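# Editor's note: the get_dataset closure in collect_outputs above returns
# {"path": <downloaded file>, "basename": <name>}; output_properties (from
# galaxy.tool_util.cwl.util) then expands that into a CWL-style File object
# with class/location/size/checksum fields computed from the downloaded file.
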
class GalaxyToolRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_src(output)["src"] == "hdca"

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_src(self, output):
        outputs = self.api_run_response["outputs"]
        output_collections = self.api_run_response["output_collections"]
        output_id = output.get_id()
        output_src = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        # distinct loop names avoid shadowing the runnable output parameter
        for tool_output in outputs:
            if tool_output["output_name"] == output_id:
                output_src = {"src": "hda", "id": tool_output["id"]}
        for output_collection in output_collections:
            if output_collection["output_name"] == output_id:
                output_src = {"src": "hdca", "id": output_collection["id"]}
        return output_src


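# Editor's sketch (hypothetical IDs): output_src resolves a runnable output
# label against the tool-run response, yielding
#
#     {"src": "hda", "id": "2891970512fa2d5a"}     # plain dataset output
#     {"src": "hdca", "id": "f2db41e1fa331b3e"}    # collection output
#
# and is_collection simply tests for the "hdca" case.
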
class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
        history_state='ok',
        invocation_state='ok',
        error_message=None,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id
        self._invocation_details = {}
        self._cached_invocation = None
        self.history_state = history_state
        self.invocation_state = invocation_state
        self.error_message = error_message
        self._invocation_details = self.collect_invocation_details(invocation_id)

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_src(self, output):
        invocation = self._invocation
        # Use newer workflow outputs API.

        output_name = output.get_id()
        if output_name in invocation["outputs"]:
            return invocation["outputs"][output_name]
        elif output_name in invocation["output_collections"]:
            return invocation["output_collections"][output_name]
        else:
            raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))

    def collect_invocation_details(self, invocation_id=None):
        gi = self._user_gi
        invocation_steps = {}
        invocation = self.get_invocation(invocation_id)
        for step in invocation['steps']:
            step_label_or_index = "{}. {}".format(step['order_index'], step['workflow_step_label'] or 'Unnamed step')
            workflow_step = gi.invocations.show_invocation_step(self._invocation_id, step['id'])
            workflow_step['subworkflow'] = None
            subworkflow_invocation_id = workflow_step.get('subworkflow_invocation_id')
            if subworkflow_invocation_id:
                workflow_step['subworkflow'] = self.collect_invocation_details(subworkflow_invocation_id)
            workflow_step_job_details = [self._user_gi.jobs.show_job(j['id'], full_details=True) for j in workflow_step['jobs']]
            workflow_step['jobs'] = workflow_step_job_details
            invocation_steps[step_label_or_index] = workflow_step
        return invocation_steps

    @property
    def invocation_details(self):
        return self._invocation_details

    def get_invocation(self, invocation_id):
        return self._user_gi.invocations.show_invocation(invocation_id)

    @property
    def _invocation(self):
        if self._cached_invocation is None:
            self._cached_invocation = self.get_invocation(self._invocation_id)
        return self._cached_invocation

    @property
    def was_successful(self):
        return self.history_state == 'ok' and self.invocation_state == 'scheduled'


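# Editor's note: a workflow run counts as successful only when both waits in
# _execute converged - the invocation reached 'scheduled' AND the history
# reached 'ok':
#
#     response.was_successful  # True only for ('ok', 'scheduled')
#
# any other combination leaves was_successful False with error_message set.
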
def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME) or DEFAULT_HISTORY_NAME
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


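# Editor's sketch: an explicit history_id kwarg wins; otherwise a new history
# is created, named by history_name or the module default, e.g. (hypothetical):
#
#     _history_id(user_gi, history_name="planemo test run")
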
def get_dict_from_workflow(gi, workflow_id):
    return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except RequestException:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


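# Editor's sketch: _retry_on_timeouts wraps a single GalaxyInstance call, e.g.
#
#     history = _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
#
# only failures that took roughly 45s or longer are treated as server timeouts
# and retried (up to 5 attempts); fast RequestExceptions propagate immediately.
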
def has_jobs_in_states(ctx, gi, history_id, states):
    params = {"history_id": history_id}
    jobs_url = gi.url + '/jobs'
    jobs = gi.jobs._get(url=jobs_url, params=params)
    target_jobs = [j for j in jobs if j["state"] in states]
    return len(target_jobs) > 0


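# Editor's sketch (hypothetical call site): check a history for still-active
# work before tearing it down -
#
#     if has_jobs_in_states(ctx, gi, history_id, ["new", "queued", "running"]):
#         ctx.vlog("history still has active jobs")
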
def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
    # Used to wait for active jobs and then wait for history, but now this is called
    # after upload is complete and after the invocation has finished scheduling - so
    # there should be no need to wait for active jobs anymore, I think.

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None

    timeout = 60 * 60 * 24  # 24 hours
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


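# Editor's sketch (hypothetical job ID): any endpoint whose JSON carries a
# "state" key can drive _wait_on_state -
#
#     def state_func():
#         return gi.jobs.show_job("abc123")
#
#     final_state = _wait_on_state(state_func, polling_backoff=2)
#
# get_state returns None while the state is still active, and wait_on polls
# until a terminal state appears or the 24 hour timeout elapses.
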
__all__ = (
    "execute",
)