Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/galaxy/api.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """A high-level interface to local Galaxy instances using bioblend.""" | |
2 from six import StringIO | |
3 | |
4 from planemo.bioblend import ensure_module | |
5 from planemo.bioblend import galaxy | |
6 | |
# Fallback API key: gi() substitutes this value when it is called with key=None.
DEFAULT_ADMIN_API_KEY = "test_key"
8 | |
9 | |
def gi(port=None, url=None, key=None):
    """Return a bioblend ``GalaxyInstance`` for a Galaxy server.

    :param port: port of a Galaxy server on ``localhost``. When given, it
        takes precedence and ``url`` is ignored.
    :param url: URL of the Galaxy server; only used when ``port`` is ``None``.
    :param key: API key to connect with; ``DEFAULT_ADMIN_API_KEY`` is used
        when this is ``None``.
    :returns: the result of ``galaxy.GalaxyInstance(url=..., key=...)``.
    """
    # Defined in planemo.bioblend (not shown here); presumably checks that
    # bioblend is importable before it is used -- confirm there.
    ensure_module()
    if key is None:
        key = DEFAULT_ADMIN_API_KEY
    if port is not None:
        # An explicit port always targets the local machine.
        url = "http://localhost:%d" % int(port)

    return galaxy.GalaxyInstance(url=url, key=key)
24 | |
25 | |
def test_credentials_valid(port=None, url=None, key=None, is_admin=False):
    """Check whether the given API credentials are accepted.

    A client is built with ``gi(port, url, key)`` and asked for the current
    user. If that request raises any exception the credentials are reported
    as invalid (``False``). Otherwise the result is ``True``, or -- when
    ``is_admin`` is truthy -- the ``is_admin`` value of the returned user.
    """
    client = gi(port, url, key)
    try:
        user = client.users.get_current_user()
        return user['is_admin'] if is_admin else True
    except Exception:
        # Deliberately broad: any failure here means "not valid".
        return False
37 | |
38 | |
def user_api_key(admin_gi):
    """Use an admin authenticated account to generate a user API key.

    Looks for a user whose e-mail is ``planemo@galaxyproject.org`` among
    ``admin_gi.users.get_users()``. If none exists, one is created: as a
    remote user when the Galaxy config value ``use_remote_user`` is truthy,
    otherwise as a local user.

    :param admin_gi: client object exposing ``users`` and ``config``
        (presumably a bioblend ``GalaxyInstance`` authenticated as admin).
    :returns: whatever ``users.create_user_apikey(user_id)`` returns.
    """
    ensure_module()
    # TODO: thread-safe
    planemo_email = "planemo@galaxyproject.org"
    users = admin_gi.users

    # Stop at the first matching account instead of scanning every user.
    user_id = next(
        (user["id"] for user in users.get_users() if user["email"] == planemo_email),
        None,
    )

    if user_id is None:
        # TODO: Allow override with --user_api_key.
        galaxy_config = admin_gi.config.get_config()
        if galaxy_config["use_remote_user"]:
            user_response = users.create_remote_user(planemo_email)
        else:
            user_response = users.create_local_user(
                "planemo",
                planemo_email,
                "planemo",
            )
        user_id = user_response["id"]
    return users.create_user_apikey(user_id)
68 | |
69 | |
def summarize_history(ctx, gi, history_id):
    """Summarize a history with print() based on similar code in Galaxy for populators.

    Nothing is printed unless ``ctx.verbose`` is truthy. Every remote lookup
    is best-effort: a failure is reported with a marker line and the summary
    continues with the next item.

    :param ctx: object with a ``verbose`` attribute.
    :param gi: client object whose ``histories`` attribute provides
        ``show_history``, ``show_dataset_collection``, ``show_dataset`` and
        ``show_dataset_provenance``.
    :param history_id: id of the history to summarize.
    :raises ValueError: if ``history_id`` is ``None`` (checked only in
        verbose mode).
    """
    if not ctx.verbose:
        return

    if history_id is None:
        raise ValueError("summarize_history passed empty history_id")
    try:
        history_contents = gi.histories.show_history(history_id, contents=True)
    except Exception:
        print("Failed to fetch history contents in summarize_history.")
        return

    for history_content in history_contents:
        history_content_id = history_content.get('id', None)
        print("| %d - %s (HID - NAME) " % (int(history_content['hid']), history_content['name']))
        if history_content['history_content_type'] == 'dataset_collection':
            # Guarded like the dataset lookups below so that one unreadable
            # collection does not abort the rest of the summary.
            try:
                history_contents_json = gi.histories.show_dataset_collection(history_id, history_content_id)
                print("| Dataset Collection: %s" % history_contents_json)
            except Exception:
                print("| *PLANEMO ERROR FETCHING DATASET COLLECTION DETAILS*")
            continue
        try:
            dataset_info = gi.histories.show_dataset(history_id, history_content_id)
            print("| Dataset State:")
            print(_format_for_summary(dataset_info.get("state"), "Dataset state is unknown."))
            print("| Dataset Blurb:")
            print(_format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty."))
            print("| Dataset Info:")
            print(_format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty."))
            print("| Peek:")
            print(_format_for_summary(dataset_info.get("peek", ""), "Peek unavailable."))
        except Exception:
            print("| *PLANEMO ERROR FETCHING DATASET DETAILS*")
        try:
            provenance_info = _dataset_provenance(gi, history_id, history_content_id)
            print("| Dataset Job Standard Output:")
            print(_format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty."))
            print("| Dataset Job Standard Error:")
            print(_format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty."))
        except Exception:
            print("| *PLANEMO ERROR FETCHING JOB DETAILS*")
        print("|")
112 | |
113 | |
def get_invocations(url, key, workflow_id):
    """Return a dict describing every invocation of a workflow.

    A client is created with ``gi(None, url, key)``. For each invocation
    returned by ``workflows.get_invocations(workflow_id)`` the result maps
    the invocation id to a dict with two entries: ``'states'`` (taken from
    that invocation's summary) and ``'history_id'``.
    """
    client = gi(None, url, key)
    by_invocation_id = {}
    for invocation in client.workflows.get_invocations(workflow_id):
        invocation_id = invocation['id']
        # One extra request per invocation to obtain its state summary.
        states = client.invocations.get_invocation_summary(invocation_id)['states']
        by_invocation_id[invocation_id] = {
            'states': states,
            'history_id': invocation['history_id'],
        }
    return by_invocation_id
121 | |
122 | |
123 def _format_for_summary(blob, empty_message, prefix="| "): | |
124 contents = "\n".join(["%s%s" % (prefix, line.strip()) for line in StringIO(blob).readlines() if line.rstrip("\n\r")]) | |
125 return contents or "%s*%s*" % (prefix, empty_message) | |
126 | |
127 | |
128 def _dataset_provenance(gi, history_id, id): | |
129 provenance = gi.histories.show_dataset_provenance(history_id, id) | |
130 return provenance | |
131 | |
132 | |
# Names exported by ``from <this module> import *``.
# NOTE(review): test_credentials_valid, summarize_history and get_invocations
# are defined above without a leading underscore but are not listed here --
# confirm whether that omission is intentional.
__all__ = (
    "DEFAULT_ADMIN_API_KEY",
    "gi",
    "user_api_key",
)