Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/planemo/galaxy/api.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/planemo/galaxy/api.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,115 +0,0 @@ -"""A high-level interface to local Galaxy instances using bioblend.""" -from six import StringIO - -from planemo.bioblend import ensure_module -from planemo.bioblend import galaxy - -DEFAULT_MASTER_API_KEY = "test_key" - - -def gi(port=None, url=None, key=None): - """Return a bioblend ``GalaxyInstance`` for Galaxy on this port.""" - ensure_module() - if key is None: - key = DEFAULT_MASTER_API_KEY - if port is None: - url = url - else: - url = "http://localhost:%d" % int(port) - - return galaxy.GalaxyInstance( - url=url, - key=key - ) - - -def user_api_key(admin_gi): - """Use an admin authenticated account to generate a user API key.""" - ensure_module() - # TODO: thread-safe - users = admin_gi.users - all_users = users.get_users() - - user_id = None - for user in all_users: - if user["email"] == "planemo@galaxyproject.org": - user_id = user["id"] - - if user_id is None: - # TODO: Allow override with --user_api_key. - galaxy_config = admin_gi.config.get_config() - use_remote_user = bool(galaxy_config["use_remote_user"]) - if not use_remote_user: - user_response = users.create_local_user( - "planemo", - "planemo@galaxyproject.org", - "planemo", - ) - user_id = user_response["id"] - else: - user_response = users.create_remote_user( - "planemo@galaxyproject.org", - ) - user_id = user_response["id"] - return users.create_user_apikey(user_id) - - -def summarize_history(ctx, gi, history_id): - """Summarize a history with print() based on similar code in Galaxy for populators. - """ - if not ctx.verbose: - return - - if history_id is None: - raise ValueError("summarize_history passed empty history_id") - try: - history_contents = gi.histories.show_history(history_id, contents=True) - except Exception: - print("Failed to fetch history contents in summarize_history.") - return - - for history_content in history_contents: - history_content_id = history_content.get('id', None) - print("| %d - %s (HID - NAME) " % (int(history_content['hid']), history_content['name'])) - if history_content['history_content_type'] == 'dataset_collection': - history_contents_json = gi.histories.show_dataset_collection(history_id, history_content["id"]) - print("| Dataset Collection: %s" % history_contents_json) - continue - try: - dataset_info = gi.histories.show_dataset(history_id, history_content_id) - print("| Dataset State:") - print(_format_for_summary(dataset_info.get("state"), "Dataset state is unknown.")) - print("| Dataset Blurb:") - print(_format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty.")) - print("| Dataset Info:") - print(_format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty.")) - print("| Peek:") - print(_format_for_summary(dataset_info.get("peek", ""), "Peek unavilable.")) - except Exception: - print("| *PLANEMO ERROR FETCHING DATASET DETAILS*") - try: - provenance_info = _dataset_provenance(gi, history_id, history_content_id) - print("| Dataset Job Standard Output:") - print(_format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty.")) - print("| Dataset Job Standard Error:") - print(_format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty.")) - except Exception: - print("| *PLANEMO ERROR FETCHING JOB DETAILS*") - print("|") - - -def _format_for_summary(blob, empty_message, prefix="| "): - contents = "\n".join(["%s%s" % (prefix, line.strip()) for line in StringIO(blob).readlines() if line.rstrip("\n\r")]) - return contents or "%s*%s*" % (prefix, empty_message) - - -def _dataset_provenance(gi, history_id, id): - provenance = gi.histories.show_dataset_provenance(history_id, id) - return provenance - - -__all__ = ( - "DEFAULT_MASTER_API_KEY", - "gi", - "user_api_key", -)
