env/lib/python3.9/site-packages/ephemeris/setup_data_libraries.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date:   Mon, 22 Mar 2021 18:12:50 +0000
#!/usr/bin/env python
'''Tool to set up data libraries on a Galaxy instance'''
import argparse
import logging as log
import sys
import time

import yaml
from bioblend import galaxy

from .common_parser import get_common_args


def create_legacy(gi, desc):
    destination = desc["destination"]
    if destination["type"] != "library":
        raise Exception("Only libraries may be created with pre-18.05 Galaxies using this script.")
    library_name = destination.get("name")
    library_description = destination.get("description")
    library_synopsis = destination.get("synopsis")

    # Check to see if the library already exists. If it does, do not recreate it; if it doesn't, create it.
    lib_id = None
    print("Library name: " + str(library_name))
    rmt_lib_list = gi.libraries.get_libraries(name=library_name, deleted=False)
    # Now we need to check whether the library has been deleted, since deleted=False still returns deleted libraries!
    not_deleted_rmt_lib_list = []
    folder_id = None

    if rmt_lib_list:
        for x in rmt_lib_list:
            if not x['deleted']:
                not_deleted_rmt_lib_list.append(x)
    if not_deleted_rmt_lib_list:
        lib_id = not_deleted_rmt_lib_list[0]['id']
        print("Library already exists! id: " + str(lib_id))
        folder_id = gi.libraries.show_library(lib_id)['root_folder_id']
    else:
        lib = gi.libraries.create_library(library_name, library_description, library_synopsis)
        lib_id = lib['id']
        folder_id = lib['root_folder_id']

    def populate_items(base_folder_id, has_items):
        if "items" in has_items:
            name = has_items.get("name")
            description = has_items.get("description")
            folder_id = base_folder_id
            if name:
                # Check to see if the folder already exists; if it doesn't, create it.
                rmt_folder_list = []
                folder = gi.libraries.get_folders(lib_id, folder_id)
                new_folder_name = "/" + name
                if folder and not folder[0]['name'] == "/":
                    new_folder_name = folder[0]['name'] + "/" + name
                rmt_folder_list = gi.libraries.get_folders(lib_id, name=new_folder_name)
                if rmt_folder_list:
                    folder_id = rmt_folder_list[0]['id']
                else:
                    folder = gi.libraries.create_folder(lib_id, name, description, base_folder_id=base_folder_id)
                    folder_id = folder[0]["id"]
            for item in has_items["items"]:
                populate_items(folder_id, item)
        else:
            src = has_items["src"]
            if src != "url":
                raise Exception("Pre-18.05 Galaxies only support items with src 'url'.")
            rmt_library_files = gi.folders.show_folder(base_folder_id, contents=True)['folder_contents']
            file_names = []
            for item in rmt_library_files:
                if item['type'] == 'file':
                    file_names.append(item['name'])
            if has_items['url'] not in file_names:
                try:
                    gi.libraries.upload_file_from_url(
                        lib_id,
                        has_items['url'],
                        folder_id=base_folder_id,
                        file_type=has_items['ext']
                    )
                except Exception:
                    log.exception("Could not upload %s to %s/%s", has_items['url'], lib_id, base_folder_id)
                    return None

    populate_items(folder_id, desc)
    return []

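# Illustrative sketch (not part of the module): the shape of the ``desc``
# mapping that create_legacy() walks. All names and the URL below are
# hypothetical placeholders.
#
#     desc = {
#         "destination": {"type": "library", "name": "Example Library",
#                         "description": "An example", "synopsis": "..."},
#         "items": [
#             {"name": "subfolder", "items": [
#                 {"src": "url", "url": "https://example.org/data.fastq", "ext": "fastqsanger"},
#             ]},
#         ],
#     }
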

def create_batch_api(gi, desc):
    hc = galaxy.histories.HistoryClient(gi)
    tc = galaxy.tools.ToolClient(gi)

    history = hc.create_history()
    url = "%s/tools/fetch" % gi.url
    payload = {
        'targets': [desc],
        'history_id': history["id"]
    }
    yield tc._post(payload=payload, url=url)

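# A sketch of what create_batch_api() posts to Galaxy's /api/tools/fetch
# endpoint (the history id and target values are hypothetical):
#
#     payload = {
#         "history_id": "abc123",
#         "targets": [{
#             "destination": {"type": "library", "name": "Example Library"},
#             "items": [{"src": "url", "url": "https://example.org/data.fastq",
#                        "ext": "fastqsanger"}],
#         }],
#     }
#
# Note that tc._post() is a private bioblend helper, presumably used here
# because bioblend offered no public wrapper for the fetch endpoint.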

def setup_data_libraries(gi, data, training=False, legacy=False):
    """
    Load files into a Galaxy data library.
    By default the test data from all installed tools
    will be linked into a data library.
    """

    log.info("Importing data libraries.")
    jc = galaxy.jobs.JobsClient(gi)
    config = galaxy.config.ConfigClient(gi)
    version = config.get_version()

    if legacy:
        create_func = create_legacy
    else:
        version_major = version.get("version_major", "16.01")
        create_func = create_batch_api if version_major >= "18.05" else create_legacy
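        # Note: a lexicographic string comparison, which works because Galaxy
        # releases use a zero-padded YY.MM version scheme.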

    library_def = yaml.safe_load(data)

    def normalize_items(has_items):
        # Synchronize Galaxy batch format with older training material style.
        if "files" in has_items:
            items = has_items.pop("files")
            has_items["items"] = items

        items = has_items.get("items", [])
        for item in items:
            normalize_items(item)
            src = item.get("src")
            url = item.get("url")
            if src is None and url:
                item["src"] = "url"
            if "file_type" in item:
                ext = item.pop("file_type")
                item["ext"] = ext
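
    # Example of what normalize_items() does (hypothetical values): the older
    # training-material entry
    #     {"files": [{"url": "https://example.org/data.bed", "file_type": "bed"}]}
    # is rewritten into the native batch form
    #     {"items": [{"src": "url", "url": "https://example.org/data.bed", "ext": "bed"}]}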

    # Normalize library definitions to allow the older ephemeris style and the
    # native Galaxy batch upload format.
    if "libraries" in library_def:
        # File contains multiple definitions.
        library_def["items"] = library_def.pop("libraries")

    if "destination" not in library_def:
        library_def["destination"] = {"type": "library"}
    destination = library_def["destination"]

    if training:
        destination["name"] = destination.get("name", 'Training Data')
        destination["description"] = destination.get("description", 'Data pulled from online archives.')
    else:
        destination["name"] = destination.get("name", 'New Data Library')
        destination["description"] = destination.get("description", '')

    normalize_items(library_def)

    if library_def:
        jobs = list(create_func(gi, library_def))

        job_ids = []
        if legacy:
            for job in jc.get_jobs():
                # Fetch all upload job IDs, ignoring completed ones.
                if job['tool_id'] == 'upload1' and job['state'] not in ('ok', 'error'):
                    job_ids.append(job['id'])

            # Just have to check that all upload1 jobs are terminal.
        else:
            # Otherwise get back an actual list of jobs.
            for job in jobs:
                if 'jobs' in job:
                    for subjob in job['jobs']:
                        job_ids.append(subjob['id'])

        while True:
            job_states = [jc.get_state(job) in ('ok', 'error', 'deleted') for job in job_ids]
            log.debug('Job states: %s' % ','.join([
                '%s=%s' % (job_id, job_state) for (job_id, job_state) in zip(job_ids, job_states)]))

            if all(job_states):
                break
            time.sleep(3)

    log.info("Finished importing test data.")

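# Minimal usage sketch (not part of the module; the URL, key, and file name
# are placeholders), assuming a reachable Galaxy and a valid API key:
#
#     from bioblend import galaxy
#     gi = galaxy.GalaxyInstance(url="https://galaxy.example.org", key="<api-key>")
#     with open("library.yaml") as handle:
#         setup_data_libraries(gi, handle)
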
def _parser():
    '''Constructs the parser object'''
    parent = get_common_args()
    parser = argparse.ArgumentParser(
        parents=[parent],
        description='Populate the Galaxy data library with data.'
    )
    parser.add_argument('-i', '--infile', required=True, type=argparse.FileType('r'))
    parser.add_argument('--training', default=False, action="store_true",
                        help="Set defaults that make sense for training data.")
    parser.add_argument('--legacy', default=False, action="store_true",
                        help="Use legacy APIs even for newer Galaxies that should have a batch upload API enabled.")
    return parser


def main():
    args = _parser().parse_args()
    if args.user and args.password:
        gi = galaxy.GalaxyInstance(url=args.galaxy, email=args.user, password=args.password)
    elif args.api_key:
        gi = galaxy.GalaxyInstance(url=args.galaxy, key=args.api_key)
    else:
        sys.exit('Please specify either a valid Galaxy username/password or an API key.')

    if args.verbose:
        log.basicConfig(level=log.DEBUG)

    setup_data_libraries(gi, args.infile, training=args.training, legacy=args.legacy)


if __name__ == '__main__':
    main()
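
# Example invocation (hypothetical values; the console-script name
# `setup-data-libraries` and the -g/-a flags are assumptions based on
# ephemeris' packaging and get_common_args(), and may differ):
#
#     setup-data-libraries -g https://galaxy.example.org -a <api-key> -i library.yaml --training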