"""Module provides generic interface to running Galaxy tools and workflows."""

import json
import os
import sys
import tempfile
import time
import traceback

import bioblend
from bioblend.util import attach_file
from galaxy.tool_util.client.staging import (
    StagingInterace,  # (sic) spelling matches the upstream galaxy-tool-util class name
)
from galaxy.tool_util.cwl.util import (
    invocation_to_output,
    output_properties,
    output_to_cwl_json,
    tool_response_to_output,
)
from galaxy.tool_util.parser import get_tool_source
from galaxy.tool_util.verify.interactor import galaxy_requests_post
from galaxy.util import (
    safe_makedirs,
    unicodify,
)
from requests.exceptions import RequestException
from six.moves.urllib.parse import urljoin

from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
    ErrorRunResponse,
    get_outputs,
    RunnableType,
    SuccessfulRunResponse,
)

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
                    "You may need to enable verbose logging and determine why the tool did not load. [%s]")


def execute(ctx, config, runnable, job_path, **kwds):
    """Execute a Galaxy activity."""
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        if ctx.verbose:
            ctx.vlog("Failed to execute Galaxy activity, throwing ErrorRunResponse")
            traceback.print_exc(file=sys.stdout)
        return ErrorRunResponse(unicodify(e))
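

# Illustrative usage sketch (comments only, not executed). ``ctx`` and ``config``
# come from planemo's CLI and Galaxy engine layers; the tool and job file names
# here are assumptions for illustration:
#
#     from planemo.runnable import for_path
#     runnable = for_path("my_tool.xml")
#     response = execute(ctx, config, runnable, "job.yml")
#     if not response.was_successful:
#         print(response.log)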


def _verified_tool_id(runnable, user_gi):
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


class PlanemoStagingInterface(StagingInterace):

    def __init__(self, ctx, runnable, user_gi, version_major):
        self._ctx = ctx
        self._user_gi = user_gi
        self._runnable = runnable
        self._version_major = version_major

    def _post(self, api_path, payload, files_attached=False):
        params = dict(key=self._user_gi.key)
        url = urljoin(self._user_gi.url, "api/" + api_path)
        return galaxy_requests_post(url, data=payload, params=params, as_json=True).json()

    def _attach_file(self, path):
        return attach_file(path)

    def _handle_job(self, job_response):
        job_id = job_response["id"]
        _wait_for_job(self._user_gi, job_id)

    @property
    def use_fetch_api(self):
        # Hack: the fetch API does not currently work for galaxy_tool runnables
        # (reason unknown), so fall back to the older upload API for them.
        return self._runnable.type != RunnableType.galaxy_tool and self._version_major >= "20.09"
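
    # Hedged note: ``version_major`` is a string such as "20.09"; plain string
    # comparison works here because Galaxy releases use zero-padded YY.MM numbering.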

    # extension point for planemo to override logging
    def _log(self, message):
        self._ctx.vlog(message)


def _execute(ctx, config, runnable, job_path, **kwds):
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    try:
        job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
    except Exception:
        ctx.vlog("Problem with staging in data for Galaxy activities...")
        raise
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run CWL tool job final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id_for_runnable(runnable)
        ctx.vlog("Found Galaxy workflow ID [%s] for URI [%s]" % (workflow_id, runnable.uri))
        # TODO: Use the following when BioBlend 0.14 is released
        # invocation = user_gi.workflows.invoke_workflow(
        #    workflow_id,
        #    inputs=job_dict,
        #    history_id=history_id,
        #    allow_tool_state_corrections=True,
        #    inputs_by="name",
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/workflows/%s/invocations" % (
            user_gi.url,
            workflow_id,
        )
        invocation = user_gi.workflows._post(payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        final_invocation_state = 'new'
        error_message = ""
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
            assert final_invocation_state == 'scheduled'
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            error_message = "Final invocation state is [%s]" % final_invocation_state
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow final history state is [%s]." % final_state
            error_message = msg if not error_message else "%s. %s" % (error_message, msg)
            ctx.vlog(msg)
            summarize_history(ctx, user_gi, history_id)
        else:
            ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
            'history_state': final_state,
            'invocation_state': final_invocation_state,
            'error_message': error_message,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response


def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):  # noqa C901
    # Only CWL workflows stage objects as files/collections; everything else stages as a "tool".
    tool_or_workflow = "tool" if runnable.type != RunnableType.cwl_workflow else "workflow"
    to_posix_lines = runnable.type.is_galaxy_artifact
    job_dict, datasets = PlanemoStagingInterface(ctx, runnable, user_gi, config.version_major).stage(
        tool_or_workflow,
        history_id=history_id,
        job_path=job_path,
        use_path_paste=config.use_path_paste,
        to_posix_lines=to_posix_lines,
    )

    if datasets:
        ctx.vlog("uploaded datasets [%s] for activity, checking history state" % datasets)
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to run job final job state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        raise Exception(msg)

    return job_dict, datasets
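

# Shape sketch of stage_in's return value, inferred from the usage above:
#
#     job_dict -- the job inputs with local paths replaced by staged Galaxy
#                 objects, ready for the tool/workflow payloads built in _execute()
#     datasets -- a list of (dataset, path) tuples, where ``dataset["id"]`` is the
#                 uploaded dataset id and ``path`` is the local source file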


def _file_path_to_name(file_path):
    if file_path is not None:
        name = os.path.basename(file_path)
    else:
        name = "defaultname"
    return name


class GalaxyBaseRunResponse(SuccessfulRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/histories/%s/contents/%s/extra_files" % (
            self._user_gi.url, self._history_id, dataset_details["id"]
        )
        extra_files = self._user_gi.jobs._get(url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can have output directories
                # and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)

        for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
            output_src = self.output_src(runnable_output)
            if not is_cwl and output_src["src"] == "hda":
                output_dataset_id = output_src["id"]
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)
            else:
                output_dataset_id = output_src["id"]
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,  # (sic) keyword spelling matches the upstream galaxy-tool-util signature
                )
                if is_cwl:
                    output_dict_value = cwl_output
                else:

                    def attach_file_properties(collection, cwl_output):
                        elements = collection["elements"]
                        assert len(elements) == len(cwl_output)
                        for element, cwl_output_element in zip(elements, cwl_output):
                            element["_output_object"] = cwl_output_element
                            if isinstance(cwl_output_element, list):
                                assert "elements" in element["object"]
                                attach_file_properties(element["object"], cwl_output_element)

                    output_metadata = self._get_metadata("dataset_collection", output_dataset_id)
                    attach_file_properties(output_metadata, cwl_output)
                    output_dict_value = output_metadata

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def invocation_details(self):
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = user_gi.make_get_request(url, params=data, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


class GalaxyToolRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both outputs and output
        # collections and throw an exception if not found in either place,
        # instead of just assuming all non-datasets are collections.
        return self.output_src(output)["src"] == "hdca"

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_src(self, output):
        outputs = self.api_run_response["outputs"]
        output_collections = self.api_run_response["output_collections"]
        output_id = output.get_id()
        output_src = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for tool_output in outputs:
            if tool_output["output_name"] == output_id:
                output_src = {"src": "hda", "id": tool_output["id"]}
        for output_collection in output_collections:
            if output_collection["output_name"] == output_id:
                output_src = {"src": "hdca", "id": output_collection["id"]}
        return output_src
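
    # Shape sketch (assumed, based on the lookups above) of the tool API response:
    #
    #     api_run_response["outputs"]            -> [{"output_name": ..., "id": ...}, ...]
    #     api_run_response["output_collections"] -> [{"output_name": ..., "id": ...}, ...]
    #
    # output_src() maps a runnable output label to {"src": "hda" | "hdca", "id": ...}.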


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
        history_state='ok',
        invocation_state='ok',
        error_message=None,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id
        self._invocation_details = {}
        self._cached_invocation = None
        self.history_state = history_state
        self.invocation_state = invocation_state
        self.error_message = error_message
        self._invocation_details = self.collect_invocation_details(invocation_id)

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_src(self, output):
        invocation = self._invocation
        # Use newer workflow outputs API.

        output_name = output.get_id()
        if output_name in invocation["outputs"]:
            return invocation["outputs"][output_name]
        elif output_name in invocation["output_collections"]:
            return invocation["output_collections"][output_name]
        else:
            raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))

    def collect_invocation_details(self, invocation_id=None):
        gi = self._user_gi
        invocation_steps = {}
        invocation = self.get_invocation(invocation_id)
        for step in invocation['steps']:
            step_label_or_index = "{}. {}".format(step['order_index'], step['workflow_step_label'] or 'Unnamed step')
            workflow_step = gi.invocations.show_invocation_step(self._invocation_id, step['id'])
            workflow_step['subworkflow'] = None
            subworkflow_invocation_id = workflow_step.get('subworkflow_invocation_id')
            if subworkflow_invocation_id:
                workflow_step['subworkflow'] = self.collect_invocation_details(subworkflow_invocation_id)
            workflow_step_job_details = [self._user_gi.jobs.show_job(j['id'], full_details=True) for j in workflow_step['jobs']]
            workflow_step['jobs'] = workflow_step_job_details
            invocation_steps[step_label_or_index] = workflow_step
        return invocation_steps

    @property
    def invocation_details(self):
        return self._invocation_details

    def get_invocation(self, invocation_id):
        return self._user_gi.invocations.show_invocation(invocation_id)

    @property
    def _invocation(self):
        if self._cached_invocation is None:
            self._cached_invocation = self.get_invocation(self._invocation_id)
        return self._cached_invocation

    @property
    def was_successful(self):
        return self.history_state == 'ok' and self.invocation_state == 'scheduled'


def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME) or DEFAULT_HISTORY_NAME
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def get_dict_from_workflow(gi, workflow_id):
    return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except RequestException:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timedout, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(ctx, gi, history_id, states):
    params = {"history_id": history_id}
    jobs_url = gi.url + '/jobs'
    jobs = gi.jobs._get(url=jobs_url, params=params)
    target_jobs = [j for j in jobs if j["state"] in states]
    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
    # This used to wait for active jobs and then for the history, but it is now
    # called after uploads complete and after the invocation has finished
    # scheduling, so waiting for active jobs should no longer be necessary.

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None
    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state
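

# Minimal usage sketch for _wait_on_state (``state_func`` here is hypothetical;
# any callable returning a dict with a "state" key works):
#
#     def state_func():
#         return user_gi.jobs.show_job(job_id)
#
#     final_state = _wait_on_state(state_func, polling_backoff=1)
#
# States "running", "queued", "new" and "ready" are treated as non-terminal and
# polled again (via planemo.io.wait_on) for up to 24 hours.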


__all__ = (
    "execute",
)