diff env/lib/python3.9/site-packages/planemo/commands/cmd_list_invocations.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/planemo/commands/cmd_list_invocations.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,62 @@
+"""Module describing the planemo ``list_invocations`` command."""
+import json
+
+import click
+
+from planemo import options
+from planemo.cli import command_function
+from planemo.galaxy import profiles
+from planemo.galaxy.api import get_invocations
+from planemo.galaxy.workflows import remote_runnable_to_workflow_id
+from planemo.io import error, info
+from planemo.runnable_resolve import for_runnable_identifier
+
+try:
+    from tabulate import tabulate
+except ImportError:
+    tabulate = None  # type: ignore
+
+
+@click.command('list_invocations')
+@click.argument(
+    "workflow_identifier",
+    type=click.STRING,
+)
+@options.profile_option(required=True)
+@command_function
+def cli(ctx, workflow_identifier, **kwds):
+    """
+    Get a list of invocations for a particular workflow ID or alias.
+    """
+    info("Looking for invocations for workflow {}...".format(workflow_identifier))
+    runnable = for_runnable_identifier(ctx, workflow_identifier, kwds)
+    profile = profiles.ensure_profile(ctx, kwds.get('profile'))
+    assert runnable.is_remote_workflow_uri
+    workflow_id = remote_runnable_to_workflow_id(runnable)
+
+    invocations = get_invocations(url=profile['galaxy_url'], key=profile['galaxy_admin_key'] or profile['galaxy_user_key'], workflow_id=workflow_id)
+    if tabulate:
+        state_colors = {
+            'ok': '\033[92m',           # green
+            'running': '\033[93m',      # yellow
+            'error': '\033[91m',        # red
+            'paused': '\033[96m',       # cyan
+            'deleted': '\033[95m',      # magenta
+            'deleted_new': '\033[95m',  # magenta
+            'new': '\033[96m',          # cyan
+            'queued': '\033[93m',       # yellow
+        }
+        print(tabulate({
+                "Invocation ID": invocations.keys(),
+                "Jobs status": [', '.join(['{}{} jobs {}\033[0m'.format(state_colors[k], v, k) for k, v in inv['states'].items()]
+                                          ) for inv in invocations.values()],
+                "Invocation report URL": ['{}/workflows/invocations/report?id={}'.format(profile['galaxy_url'].strip('/'), inv_id
+                                                                                         ) for inv_id in invocations],
+                "History URL": ['{}/histories/view?id={}'.format(profile['galaxy_url'].strip('/'), invocations[inv_id]['history_id']
+                                                                 ) for inv_id in invocations]
+            }, headers="keys"))
+    else:
+        error("The tabulate package is not installed, invocations could not be listed correctly.")
+        print(json.dumps(invocations, indent=4, sort_keys=True))
+    info("{} invocations found.".format(len(invocations)))
+    return