comparison env/lib/python3.9/site-packages/planemo/commands/cmd_list_invocations.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Module describing the planemo ``list_invocations`` command."""
2 import json
3
4 import click
5
6 from planemo import options
7 from planemo.cli import command_function
8 from planemo.galaxy import profiles
9 from planemo.galaxy.api import get_invocations
10 from planemo.galaxy.workflows import remote_runnable_to_workflow_id
11 from planemo.io import error, info
12 from planemo.runnable_resolve import for_runnable_identifier
13
14 try:
15 from tabulate import tabulate
16 except ImportError:
17 tabulate = None # type: ignore
18
19
20 @click.command('list_invocations')
21 @click.argument(
22 "workflow_identifier",
23 type=click.STRING,
24 )
25 @options.profile_option(required=True)
26 @command_function
27 def cli(ctx, workflow_identifier, **kwds):
28 """
29 Get a list of invocations for a particular workflow ID or alias.
30 """
31 info("Looking for invocations for workflow {}...".format(workflow_identifier))
32 runnable = for_runnable_identifier(ctx, workflow_identifier, kwds)
33 profile = profiles.ensure_profile(ctx, kwds.get('profile'))
34 assert runnable.is_remote_workflow_uri
35 workflow_id = remote_runnable_to_workflow_id(runnable)
36
37 invocations = get_invocations(url=profile['galaxy_url'], key=profile['galaxy_admin_key'] or profile['galaxy_user_key'], workflow_id=workflow_id)
38 if tabulate:
39 state_colors = {
40 'ok': '\033[92m', # green
41 'running': '\033[93m', # yellow
42 'error': '\033[91m', # red
43 'paused': '\033[96m', # cyan
44 'deleted': '\033[95m', # magenta
45 'deleted_new': '\033[95m', # magenta
46 'new': '\033[96m', # cyan
47 'queued': '\033[93m', # yellow
48 }
49 print(tabulate({
50 "Invocation ID": invocations.keys(),
51 "Jobs status": [', '.join(['{}{} jobs {}\033[0m'.format(state_colors[k], v, k) for k, v in inv['states'].items()]
52 ) for inv in invocations.values()],
53 "Invocation report URL": ['{}/workflows/invocations/report?id={}'.format(profile['galaxy_url'].strip('/'), inv_id
54 ) for inv_id in invocations],
55 "History URL": ['{}/histories/view?id={}'.format(profile['galaxy_url'].strip('/'), invocations[inv_id]['history_id']
56 ) for inv_id in invocations]
57 }, headers="keys"))
58 else:
59 error("The tabulate package is not installed, invocations could not be listed correctly.")
60 print(json.dumps(invocations, indent=4, sort_keys=True))
61 info("{} invocations found.".format(len(invocations)))
62 return