gmql_rest_datasets.py @ 0:078d77023c34 (draft, default, tip) — geco-team / gmql_upload
planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author: geco-team
date:   Tue, 26 Jun 2018 08:59:19 -0400
# Galaxy plugin for REST access to the GMQL services
# (Datasets)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import json
import os
import sys
import tempfile

from utilities import *

module = 'repository'

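# NOTE: compose_url, get, post, delete and add_url_param come from the local
# `utilities` module imported above. Their exact signatures are not shown in
# this file; the calls below assume that compose_url(module, call) returns a
# URL template for the given REST endpoint and that get/post/delete wrap the
# corresponding HTTP requests, authenticated with the given user.
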
def list_datasets(user, output, saveResult=True):
    """Retrieve the list of available datasets"""

    call = 'list_datasets'
    url = compose_url(module, call)

    datasets = get(url, user=user)
    ds_list = datasets['datasets']

    if saveResult:
        # The file is closed automatically when the with block exits
        with open(output, 'w') as f:
            for ds in ds_list:
                f.write("{name}\t{owner}\n".format(name=ds['name'], owner=ds['owner']))
    else:
        return ds_list


def list_samples(user, output, ds):
    """List the samples of a given dataset"""

    call = 'list_samples'
    url = compose_url(module, call)

    # Specify for which dataset.
    # If it's a public dataset, the 'public.' prefix must be added to the dataset name

    # Check if the ds is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    samples = get(url, user=user)
    list_s = samples['samples']

    with open(output, 'w') as f_out:
        for s in list_s:
            f_out.write("{id}\t{name}\t{ext}\n".format(id=s['id'], name=s['name'], ext=s['path'].rsplit('.', 1)[1]))


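# The lines written by list_samples follow the "ID<TAB>NAME<TAB>EXTENSION"
# layout that helper_samples() parses further down. Illustrative (assumed)
# example line:
#   1    TCGA_sample_01    bed
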
def rename_dataset(user, output, ds, new):
    """Rename a dataset from the user's private space"""

    call = 'rename_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds, newDatasetName=new)

    outcome = get(url, user=user)

    # Return the updated list of user's datasets
    list_datasets(user, output)

    # Write on stdout the operation outcome
    sys.stdout.write("Rename: {result}\n".format(result=outcome['result']))


def delete_dataset(user, output, ds):
    """Delete a dataset from the user's private space"""

    call = 'delete_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds)

    outcome = delete(url, user=user)

    # Return the updated list of user's datasets
    list_datasets(user, output)

    # Write on stdout the operation outcome
    sys.stdout.write("Delete: {result}\n".format(result=outcome['result']))


def upload_samples_url(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset given the urls of the samples and their schema"""

    # Compose the url for the REST call
    call = 'upload_url'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    content = dict()

    # Put back escaped '&'
    samples = samples.replace('__amp__', '&')
    schema = schema.replace('__amp__', '&')

    # If a schema type is given, add the option to the url.
    # Otherwise, the provided schema is assumed to be the url of a schema file.

    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # check_schema = validators.url(schema)
        # if isinstance(check_schema, validators.utils.ValidationFailure): stop_err("Schema URL not valid")
        content.update(schema_file=schema)

    # Samples are listed one per line; split them on the new-line marker ('__cn__')
    samples_list = samples.split('__cn__')

    # The regexp in input can allow a final empty string. The following removes it if present.
    if not samples_list[-1]:
        samples_list.remove("")

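    # Illustrative split (the sample URLs are assumed values):
    #   "http://host/s1.bed__cn__http://host/s2.bed__cn__"
    #     -> ['http://host/s1.bed', 'http://host/s2.bed']
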
    # # For each sample url, check if it is valid. If at least one is not, the upload fails
    # # and the offending url is saved in the outcome.
    # for s in samples_list:
    #     check_url = validators.url(s)
    #     if isinstance(check_url, validators.utils.ValidationFailure):
    #         with open(output, 'w') as f_out:
    #             f_out.write("This resource couldn't be loaded (invalid url)\n")
    #             f_out.write("Line %d: %s" % (samples_list.index(s) + 1, s))
    #         stop_err("Some URLs are not valid.\nCheck the output file for details.")

    content.update(data_files=samples_list)

    result = post(url, content, user=user, params=params)

    # Report the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)


def upload_samples(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset from the local instance"""

    # Compose the url for the REST call
    call = 'upload_data'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Files dict for payload
    files = dict()

    # If a schema type is given, add the option to the url.
    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # Add the given schema file to the payload dictionary
        files.update({'schema': ('{ds}.xml'.format(ds=dataset), open(schema, 'rb'))})

    # Read the samples file and add each sample to the form object.
    # Each line has the structure:
    #   FILENAME	PATH
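    # Illustrative (assumed) line: "sample1.bed<TAB>/galaxy/files/dataset_42.dat"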

    with open(samples, "r") as sample_file:
        s = [x.rstrip('\n').split('\t') for x in sample_file]
        s = [x for x in s if len(x) > 1]
        # s = [x for x in s if x[0].__contains__('.dat')]
        for i, x in enumerate(s, start=1):
            files.update({'file%d' % i: (x[0], open(x[1], 'rb'))})

    # Post call
    result = post(url, files, user=user, params=params, content_type='multiform')

    # Report the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)


def list_imported(result, output):
    """When uploading a ds, the server returns a json object describing what has been imported.
    INPUT JSON FIELDS:
    imported: samples imported with their metadata
    autoMetadata: samples imported without metadata (their metadata have been auto-generated)"""

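    # Illustrative (assumed) shape of the server response:
    #   {"imported":     [{"id": 1, "name": "s1.bed", "path": "regions/s1.bed"}],
    #    "autoMetadata": [{"name": "s2.bed"}]}
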
    samples = list()

    if 'imported' in result:
        imported = result.get('imported')
        if imported:
            samples.append(imported)
    if 'autoMetadata' in result:
        am = result.get('autoMetadata')
        if am:
            samples.append(am)

    with open(output, 'w') as f_out:
        for l in samples:
            for s in l:
                if 'id' in s and s['id']:
                    id = s['id']
                else:
                    id = l.index(s) + 1
                if 'path' in s:
                    ext = s['path'].rsplit('.', 1)[1]
                else:
                    n = s['name'].rsplit('.', 1)
                    if len(n) > 1:
                        ext = n[1]
                    else:
                        ext = ''

                name = s['name']

                f_out.write("{id}\t{name}\t{ext}\n".format(id=id, name=name, ext=ext))


def download_samples(user, output, dataset):
    """Download the samples of the given dataset in form of a compressed zip archive."""

    call = 'download_zip'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Fetch the archive.
    data = get(url, user=user, response_type='zip')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)

def get_sample(user, output, dataset, name):
    """Retrieve a sample given its name and the dataset it belongs to"""

    call = 'download_sample'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample_meta(user, output, dataset, name):
    """Retrieve a sample's metadata given its name and the dataset it belongs to"""

    call = 'download_meta'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)

def import_samples(user, ds):
    """Download all the samples of a dataset, with their metadata, into the working directory"""

    # Retrieve the list of the samples in the resulting dataset.
    # The list is stored in a temporary file.
    temp = tempfile.NamedTemporaryFile(delete=False)
    list_samples(user, temp.name, ds)

    # Retrieve names and extensions of the samples
    with open(temp.name, "r") as t:
        samples = [helper_samples(x) for x in t]

    os.makedirs('samples')
    os.makedirs('metadata')

    # Fetch each sample and its metadata, building the paths from name and extension

    for s in samples:

        # Get the sample (underscores are removed from the file name)
        get_sample(user, "samples/{name}.{ext}".format(name=s['name'].replace('_', ''), ext=s['ext']), ds, s['name'])

        # Get its metadata
        get_sample_meta(user, "metadata/{name}.meta".format(name=s['name'].replace('_', '')), ds, s['name'])

def helper_samples(s):
    """From a line of the samples list, retrieve the sample name and extension"""
    split = s.split('\t')
    sample = dict()
    sample.update(name=split[1])
    sample.update(ext=split[2].rstrip('\n'))

    return sample


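# Illustrative round trip (assumed values): the list_samples line
# "1\tMy_Sample\tbed\n" yields {'name': 'My_Sample', 'ext': 'bed'}.
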
def get_schema(user, ds, file):
    """Get the schema field of the input dataset and save it in file"""

    call = "schema"

    url = compose_url(module, call)

    # Check if the ds is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    schema = get(url, user=user)

    with open(file, 'w') as f_out:
        for f in schema['fields']:
            f_out.write('{field}\t{type}\n'.format(field=f['name'], type=f['type']))


def set_columns_names(user, ds_name, samples_file, schema_file):
    """Fetch the dataset schema and emit a galaxy.json file setting column names and types"""

    get_schema(user, ds_name, schema_file)

    # galaxy.json is written one level above the current working directory
    cwd = os.getcwd().rsplit('/', 1)[0]
    file = '/'.join([cwd, 'galaxy.json'])

    with open(schema_file, 'r') as f_in:
        columns = [x.split('\t') for x in f_in]
        column_names = [x[0] for x in columns]
        column_types = [x[1].rstrip('\n') for x in columns]

    metadata = dict()
    metadata.update(column_names=column_names,
                    column_types=column_types)

    with open(file, 'w') as f_out:
        with open(samples_file, 'r') as f_in:
            # Strip trailing newlines so they do not end up inside the filenames
            samples_list = [x.rstrip('\n') for x in f_in]
            # The last entry of the samples list is not emitted
            samples_list.pop()
            for s in samples_list:
                config = dict()
                config.update(type='new_primary_dataset',
                              filename=s,
                              metadata=metadata)
                f_out.write(json.dumps(config) + '\n')


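# Each line written by set_columns_names is a JSON object telling Galaxy to
# create a new primary dataset with the given column metadata. Illustrative
# (assumed) line:
#   {"type": "new_primary_dataset", "filename": "s1.bed",
#    "metadata": {"column_names": ["chr", "start"], "column_types": ["str", "int"]}}
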
def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)


def __main__():

    parser = argparse.ArgumentParser()
    parser.add_argument("output")
    parser.add_argument("-opt_out1")
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-samples")
    parser.add_argument("-dataset")
    parser.add_argument("-new_name")
    parser.add_argument("-schema")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'list':
        list_datasets(args.user, args.output)
    elif args.cmd == 'samples':
        list_samples(args.user, args.output, args.dataset)
    elif args.cmd == 'rename':
        rename_dataset(args.user, args.output, args.dataset, args.new_name)
    elif args.cmd == 'delete':
        delete_dataset(args.user, args.output, args.dataset)
    elif args.cmd == 'upload_url':
        upload_samples_url(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    elif args.cmd == 'upload':
        upload_samples(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    elif args.cmd == 'import':
        import_samples(args.user, args.dataset)
    elif args.cmd == 'download':
        download_samples(args.user, args.output, args.dataset)
    elif args.cmd == 'schema':
        set_columns_names(args.user, args.dataset, args.samples, args.output)


if __name__ == "__main__":
    __main__()
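
# Example invocation (illustrative values; in practice the Galaxy tool wrapper
# builds this command line):
#   python gmql_rest_datasets.py datasets.tsv -user <user_token> -cmd list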