Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/galaxy/api.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 """A high-level interface to local Galaxy instances using bioblend.""" | |
| 2 from six import StringIO | |
| 3 | |
| 4 from planemo.bioblend import ensure_module | |
| 5 from planemo.bioblend import galaxy | |
| 6 | |
# API key that gi() falls back to when the caller does not supply one.
DEFAULT_ADMIN_API_KEY = "test_key"
| 8 | |
| 9 | |
def gi(port=None, url=None, key=None):
    """Return a bioblend ``GalaxyInstance`` for the requested Galaxy server.

    :param port: if given, the instance targets ``http://localhost:<port>``
        and ``url`` is ignored.
    :param url: URL used only when ``port`` is ``None``.
    :param key: API key; ``DEFAULT_ADMIN_API_KEY`` is used when ``None``.
    :return: the object built by ``galaxy.GalaxyInstance(url=..., key=...)``.
    """
    ensure_module()
    api_key = DEFAULT_ADMIN_API_KEY if key is None else key
    # An explicit port always takes precedence over the supplied URL.
    target_url = url if port is None else "http://localhost:%d" % int(port)
    return galaxy.GalaxyInstance(url=target_url, key=api_key)
| 24 | |
| 25 | |
def test_credentials_valid(port=None, url=None, key=None, is_admin=False):
    """Check whether the given API credentials are accepted.

    Builds a client via ``gi(port, url, key)`` and asks it for the current
    user. Any exception raised while doing so is treated as "not valid".

    :param is_admin: when true, the result is the ``is_admin`` field of the
        returned user record instead of a plain ``True``.
    :return: ``True`` (or the user's ``is_admin`` value) on success,
        ``False`` if the lookup raised.
    """
    client = gi(port, url, key)
    try:
        account = client.users.get_current_user()
        # The lookup of 'is_admin' stays inside the try so a missing field
        # is reported as invalid credentials rather than raised.
        return account['is_admin'] if is_admin else True
    except Exception:
        return False
| 37 | |
| 38 | |
def user_api_key(admin_gi):
    """Use an admin authenticated account to generate a user API key.

    Looks for a user whose email is ``planemo@galaxyproject.org``; if none is
    found, one is created (as a remote user when the Galaxy config reports
    ``use_remote_user``, otherwise as a local user). A new API key is then
    created for that user and returned.

    :param admin_gi: client exposing ``users`` and ``config`` sub-clients.
    :return: whatever ``users.create_user_apikey(user_id)`` returns.
    """
    ensure_module()
    # TODO: thread-safe
    planemo_email = "planemo@galaxyproject.org"
    user_client = admin_gi.users

    # Scan every user; the last record with a matching email wins.
    user_id = None
    for existing in user_client.get_users():
        if existing["email"] == planemo_email:
            user_id = existing["id"]

    if user_id is None:
        # TODO: Allow override with --user_api_key.
        server_config = admin_gi.config.get_config()
        if bool(server_config["use_remote_user"]):
            created = user_client.create_remote_user(
                planemo_email,
            )
        else:
            created = user_client.create_local_user(
                "planemo",
                planemo_email,
                "planemo",
            )
        user_id = created["id"]
    return user_client.create_user_apikey(user_id)
| 68 | |
| 69 | |
def summarize_history(ctx, gi, history_id):
    """Summarize a history with print() based on similar code in Galaxy for populators.

    Nothing is printed unless ``ctx.verbose`` is truthy. Every remote lookup
    made while summarizing is best-effort: a failure prints an error marker
    and the summary continues with the next item.

    :param ctx: object exposing a ``verbose`` attribute.
    :param gi: client exposing a ``histories`` sub-client (this parameter
        shadows the module-level ``gi`` function inside this body).
    :param history_id: id of the history to summarize.
    :raises ValueError: if ``history_id`` is ``None`` while verbose.
    """
    if not ctx.verbose:
        return

    if history_id is None:
        raise ValueError("summarize_history passed empty history_id")
    try:
        history_contents = gi.histories.show_history(history_id, contents=True)
    except Exception:
        print("Failed to fetch history contents in summarize_history.")
        return

    for history_content in history_contents:
        history_content_id = history_content.get('id', None)
        print("| %d - %s (HID - NAME) " % (int(history_content['hid']), history_content['name']))
        if history_content['history_content_type'] == 'dataset_collection':
            # Guarded like the dataset lookups below, so one unreachable
            # collection does not abort the summary of the remaining items.
            try:
                history_contents_json = gi.histories.show_dataset_collection(history_id, history_content_id)
                print("| Dataset Collection: %s" % history_contents_json)
            except Exception:
                print("| *PLANEMO ERROR FETCHING DATASET COLLECTION DETAILS*")
            continue
        try:
            dataset_info = gi.histories.show_dataset(history_id, history_content_id)
            print("| Dataset State:")
            print(_format_for_summary(dataset_info.get("state"), "Dataset state is unknown."))
            print("| Dataset Blurb:")
            print(_format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty."))
            print("| Dataset Info:")
            print(_format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty."))
            print("| Peek:")
            print(_format_for_summary(dataset_info.get("peek", ""), "Peek unavailable."))
        except Exception:
            print("| *PLANEMO ERROR FETCHING DATASET DETAILS*")
        try:
            provenance_info = _dataset_provenance(gi, history_id, history_content_id)
            print("| Dataset Job Standard Output:")
            print(_format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty."))
            print("| Dataset Job Standard Error:")
            print(_format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty."))
        except Exception:
            print("| *PLANEMO ERROR FETCHING JOB DETAILS*")
        print("|")
| 112 | |
| 113 | |
def get_invocations(url, key, workflow_id):
    """Map each invocation id of a workflow to its states and history id.

    :param url: Galaxy URL handed to ``gi``.
    :param key: API key handed to ``gi``.
    :param workflow_id: workflow whose invocations are listed.
    :return: dict of ``{invocation_id: {'states': ..., 'history_id': ...}}``
        where ``states`` is taken from the invocation summary.
    """
    inv_gi = gi(None, url, key)
    summaries = {}
    for invocation in inv_gi.workflows.get_invocations(workflow_id):
        invocation_id = invocation['id']
        # One extra request per invocation to obtain its state summary.
        states = inv_gi.invocations.get_invocation_summary(invocation_id)['states']
        summaries[invocation_id] = {
            'states': states,
            'history_id': invocation['history_id'],
        }
    return summaries
| 121 | |
| 122 | |
| 123 def _format_for_summary(blob, empty_message, prefix="| "): | |
| 124 contents = "\n".join(["%s%s" % (prefix, line.strip()) for line in StringIO(blob).readlines() if line.rstrip("\n\r")]) | |
| 125 return contents or "%s*%s*" % (prefix, empty_message) | |
| 126 | |
| 127 | |
| 128 def _dataset_provenance(gi, history_id, id): | |
| 129 provenance = gi.histories.show_dataset_provenance(history_id, id) | |
| 130 return provenance | |
| 131 | |
| 132 | |
# Names exported by a star-import of this module; the other public helpers
# defined in this file are not listed here.
__all__ = (
    "DEFAULT_ADMIN_API_KEY",
    "gi",
    "user_api_key",
)
