view env/lib/python3.9/site-packages/planemo/commands/cmd_list_invocations.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Module describing the planemo ``list_invocations`` command."""
import json

import click

from planemo import options
from planemo.cli import command_function
from planemo.galaxy import profiles
from planemo.galaxy.api import get_invocations
from planemo.galaxy.workflows import remote_runnable_to_workflow_id
from planemo.io import error, info
from planemo.runnable_resolve import for_runnable_identifier

try:
    from tabulate import tabulate
except ImportError:
    tabulate = None  # type: ignore


@click.command('list_invocations')
@click.argument(
    "workflow_identifier",
    type=click.STRING,
)
@options.profile_option(required=True)
@command_function
def cli(ctx, workflow_identifier, **kwds):
    """
    Get a list of invocations for a particular workflow ID or alias.
    """
    info("Looking for invocations for workflow {}...".format(workflow_identifier))
    runnable = for_runnable_identifier(ctx, workflow_identifier, kwds)
    profile = profiles.ensure_profile(ctx, kwds.get('profile'))
    assert runnable.is_remote_workflow_uri
    workflow_id = remote_runnable_to_workflow_id(runnable)

    invocations = get_invocations(url=profile['galaxy_url'], key=profile['galaxy_admin_key'] or profile['galaxy_user_key'], workflow_id=workflow_id)
    if tabulate:
        state_colors = {
            'ok': '\033[92m',           # green
            'running': '\033[93m',      # yellow
            'error': '\033[91m',        # red
            'paused': '\033[96m',       # cyan
            'deleted': '\033[95m',      # magenta
            'deleted_new': '\033[95m',  # magenta
            'new': '\033[96m',          # cyan
            'queued': '\033[93m',       # yellow
        }
        print(tabulate({
                "Invocation ID": invocations.keys(),
                "Jobs status": [', '.join(['{}{} jobs {}\033[0m'.format(state_colors[k], v, k) for k, v in inv['states'].items()]
                                          ) for inv in invocations.values()],
                "Invocation report URL": ['{}/workflows/invocations/report?id={}'.format(profile['galaxy_url'].strip('/'), inv_id
                                                                                         ) for inv_id in invocations],
                "History URL": ['{}/histories/view?id={}'.format(profile['galaxy_url'].strip('/'), invocations[inv_id]['history_id']
                                                                 ) for inv_id in invocations]
            }, headers="keys"))
    else:
        error("The tabulate package is not installed, invocations could not be listed correctly.")
        print(json.dumps(invocations, indent=4, sort_keys=True))
    info("{} invocations found.".format(len(invocations)))
    return