Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/planemo/galaxy/api.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 """A high-level interface to local Galaxy instances using bioblend.""" | |
| 2 from six import StringIO | |
| 3 | |
| 4 from planemo.bioblend import ensure_module | |
| 5 from planemo.bioblend import galaxy | |
| 6 | |
| 7 DEFAULT_MASTER_API_KEY = "test_key" | |
| 8 | |
| 9 | |
| 10 def gi(port=None, url=None, key=None): | |
| 11 """Return a bioblend ``GalaxyInstance`` for Galaxy on this port.""" | |
| 12 ensure_module() | |
| 13 if key is None: | |
| 14 key = DEFAULT_MASTER_API_KEY | |
| 15 if port is None: | |
| 16 url = url | |
| 17 else: | |
| 18 url = "http://localhost:%d" % int(port) | |
| 19 | |
| 20 return galaxy.GalaxyInstance( | |
| 21 url=url, | |
| 22 key=key | |
| 23 ) | |
| 24 | |
| 25 | |
| 26 def user_api_key(admin_gi): | |
| 27 """Use an admin authenticated account to generate a user API key.""" | |
| 28 ensure_module() | |
| 29 # TODO: thread-safe | |
| 30 users = admin_gi.users | |
| 31 all_users = users.get_users() | |
| 32 | |
| 33 user_id = None | |
| 34 for user in all_users: | |
| 35 if user["email"] == "planemo@galaxyproject.org": | |
| 36 user_id = user["id"] | |
| 37 | |
| 38 if user_id is None: | |
| 39 # TODO: Allow override with --user_api_key. | |
| 40 galaxy_config = admin_gi.config.get_config() | |
| 41 use_remote_user = bool(galaxy_config["use_remote_user"]) | |
| 42 if not use_remote_user: | |
| 43 user_response = users.create_local_user( | |
| 44 "planemo", | |
| 45 "planemo@galaxyproject.org", | |
| 46 "planemo", | |
| 47 ) | |
| 48 user_id = user_response["id"] | |
| 49 else: | |
| 50 user_response = users.create_remote_user( | |
| 51 "planemo@galaxyproject.org", | |
| 52 ) | |
| 53 user_id = user_response["id"] | |
| 54 return users.create_user_apikey(user_id) | |
| 55 | |
| 56 | |
| 57 def summarize_history(ctx, gi, history_id): | |
| 58 """Summarize a history with print() based on similar code in Galaxy for populators. | |
| 59 """ | |
| 60 if not ctx.verbose: | |
| 61 return | |
| 62 | |
| 63 if history_id is None: | |
| 64 raise ValueError("summarize_history passed empty history_id") | |
| 65 try: | |
| 66 history_contents = gi.histories.show_history(history_id, contents=True) | |
| 67 except Exception: | |
| 68 print("Failed to fetch history contents in summarize_history.") | |
| 69 return | |
| 70 | |
| 71 for history_content in history_contents: | |
| 72 history_content_id = history_content.get('id', None) | |
| 73 print("| %d - %s (HID - NAME) " % (int(history_content['hid']), history_content['name'])) | |
| 74 if history_content['history_content_type'] == 'dataset_collection': | |
| 75 history_contents_json = gi.histories.show_dataset_collection(history_id, history_content["id"]) | |
| 76 print("| Dataset Collection: %s" % history_contents_json) | |
| 77 continue | |
| 78 try: | |
| 79 dataset_info = gi.histories.show_dataset(history_id, history_content_id) | |
| 80 print("| Dataset State:") | |
| 81 print(_format_for_summary(dataset_info.get("state"), "Dataset state is unknown.")) | |
| 82 print("| Dataset Blurb:") | |
| 83 print(_format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty.")) | |
| 84 print("| Dataset Info:") | |
| 85 print(_format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty.")) | |
| 86 print("| Peek:") | |
| 87 print(_format_for_summary(dataset_info.get("peek", ""), "Peek unavilable.")) | |
| 88 except Exception: | |
| 89 print("| *PLANEMO ERROR FETCHING DATASET DETAILS*") | |
| 90 try: | |
| 91 provenance_info = _dataset_provenance(gi, history_id, history_content_id) | |
| 92 print("| Dataset Job Standard Output:") | |
| 93 print(_format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty.")) | |
| 94 print("| Dataset Job Standard Error:") | |
| 95 print(_format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty.")) | |
| 96 except Exception: | |
| 97 print("| *PLANEMO ERROR FETCHING JOB DETAILS*") | |
| 98 print("|") | |
| 99 | |
| 100 | |
| 101 def _format_for_summary(blob, empty_message, prefix="| "): | |
| 102 contents = "\n".join(["%s%s" % (prefix, line.strip()) for line in StringIO(blob).readlines() if line.rstrip("\n\r")]) | |
| 103 return contents or "%s*%s*" % (prefix, empty_message) | |
| 104 | |
| 105 | |
| 106 def _dataset_provenance(gi, history_id, id): | |
| 107 provenance = gi.histories.show_dataset_provenance(history_id, id) | |
| 108 return provenance | |
| 109 | |
| 110 | |
| 111 __all__ = ( | |
| 112 "DEFAULT_MASTER_API_KEY", | |
| 113 "gi", | |
| 114 "user_api_key", | |
| 115 ) |
