gmql_rest_datasets.py @ 0:c74a1c7121ec (draft, default, tip)

planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author geco-team
date Tue, 26 Jun 2018 08:59:49 -0400
# Galaxy plugin to REST access to the GMQL services
# (Datasets)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import os
import sys
import tempfile
import json
from utilities import *

module = 'repository'

def list_datasets(user, output, saveResult=True):
    """Retrieve the list of available datasets"""

    call = 'list_datasets'
    url = compose_url(module, call)

    datasets = get(url, user=user)
    list_datasets = datasets['datasets']

    if saveResult:
        with open(output, 'w') as f:
            for ds in list_datasets:
                f.write("{name}\t{owner}\n".format(name=ds['name'], owner=ds['owner']))
    else:
        return list_datasets


def list_samples(user, output, ds):
    """List the samples of a given dataset"""

    call = 'list_samples'
    url = compose_url(module, call)

    # Specify for which dataset.
    # If it is a public dataset, the 'public.' prefix must be added to the dataset name.

    # Check whether the dataset is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    samples = get(url, user=user)
    list_s = samples['samples']

    with open(output, 'w') as f_out:
        for s in list_s:
            f_out.write("{id}\t{name}\t{ext}\n".format(id=s['id'], name=s['name'], ext=s['path'].rsplit('.', 1)[1]))


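# For reference, list_samples() writes one sample per line as
# id <TAB> name <TAB> extension, e.g. (hypothetical values):
#   1 <TAB> HG00096 <TAB> bed

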
def rename_dataset(user, output, ds, new):
    """Rename a dataset from the user's private space"""

    call = 'rename_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds, newDatasetName=new)

    outcome = get(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome on stdout
    sys.stdout.write("Rename: {result}".format(result=outcome['result']))


def delete_dataset(user, output, ds):
    """Delete a dataset from the user's private space"""

    call = 'delete_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds)

    outcome = delete(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome on stdout
    sys.stdout.write("Delete: {result}".format(result=outcome['result']))


def upload_samples_url(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset given the URLs of the samples and their schema"""

    # Compose the url for the REST call
    call = 'upload_url'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    content = dict()

    # Put back escaped '&'
    samples = samples.replace('__amp__', '&')
    schema = schema.replace('__amp__', '&')

    # If a known schema type is given, add it as an option to the url.
    # Otherwise, the provided schema is treated as the URL of a schema file.

    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # check_schema = validators.url(schema)
        # if isinstance(check_schema, validators.utils.ValidationFailure): stop_err("Schema URL not valid")
        content.update(schema_file=schema)

    # Samples are listed one per line; split them on the new line marker ('__cn__')
    samples_list = samples.split('__cn__')

    # The regexp in input can allow a final empty string. The following removes it if present.
    if not samples_list[-1]:
        samples_list.remove("")

    # # For each sample url, check if it is valid. If at least one is not, the upload fails
    # # and the offending one is saved in the outcome.
    # for s in samples_list:
    #     check_url = validators.url(s)
    #     if isinstance(check_url, validators.utils.ValidationFailure):
    #         with open(output, 'w') as f_out:
    #             f_out.write("This resource couldn't be loaded (invalid url)\n")
    #             f_out.write("Line %d: %s" % (samples_list.index(s) + 1, s))
    #         stop_err("Some URLs are not valid.\nCheck the output file for details.")

    content.update(data_files=samples_list)

    result = post(url, content, user=user, params=params)

    # Return the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)


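# A minimal sketch of the inputs upload_samples_url() expects (hypothetical values,
# following the markers handled above: '__cn__' separates sample URLs, '__amp__' escapes '&'):
#   samples = "http://example.org/s1.bed__cn__http://example.org/s2.bed"
#   schema  = "bed"   # or the URL of a schema file

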
def upload_samples(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset from the local instance"""

    # Compose the url for the REST call
    call = 'upload_data'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Files dict for the payload
    files = dict()

    # If a known schema type is given, add it as an option to the url.
    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # Add the given schema file to the payload dictionary
        files.update({'schema': ('{ds}.xml'.format(ds=dataset), open(schema, 'rb'))})

    # Read the sample file paths and add them to the form object.
    # The structure is one sample per line:
    # FILENAME <TAB> PATH

    with open(samples, "r") as file:
        s = [x.rstrip('\n').split('\t') for x in file]
        s = [x for x in s if len(x) > 1]
        # s = [x for x in s if x[0].__contains__('.dat')]
        for i, x in enumerate(s):
            files.update({'file%d' % (i + 1): (x[0], open(x[1], 'rb'))})

    # Post call
    result = post(url, files, user=user, params=params, content_type='multiform')

    # Return the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)


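# A samples listing file consumed by upload_samples() would look roughly like this
# (hypothetical names and paths, tab-separated as described above):
#   sample1.bed <TAB> /tmp/upload/sample1.bed
#   sample2.bed <TAB> /tmp/upload/sample2.bed

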
def list_imported(result, output):
    """When uploading a dataset, the server returns a json object describing what has been imported.
    INPUT JSON FIELDS -
    imported: samples imported with their metadata
    autoMetadata: samples imported without metadata (their metadata have been auto generated)"""

    samples = list()

    if 'imported' in result:
        imported = result.get('imported')
        if imported:
            samples.append(imported)
    if 'autoMetadata' in result:
        am = result.get('autoMetadata')
        if am:
            samples.append(am)

    with open(output, 'w') as f_out:
        for l in samples:
            for i, s in enumerate(l):
                if 'id' in s and s['id']:
                    id = s['id']
                else:
                    id = i + 1
                if 'path' in s:
                    ext = s['path'].rsplit('.', 1)[1]
                else:
                    n = s['name'].rsplit('.', 1)
                    if len(n) > 1:
                        ext = n[1]
                    else:
                        ext = ''

                name = s['name']

                f_out.write("{id}\t{name}\t{ext}\n".format(id=id, name=name, ext=ext))


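# For illustration only, list_imported() handles a result shaped roughly like this
# (field names as in the docstring above; the values are hypothetical):
#   {"imported":     [{"id": 1, "name": "s1.bed", "path": "s1.bed"}],
#    "autoMetadata": [{"name": "s2.bed"}]}

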
def download_samples(user, output, dataset):
    """Download the samples of the given dataset as a compressed zip archive."""

    call = 'download_zip'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Fetch the archive.
    data = get(url, user=user, response_type='zip')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample(user, output, dataset, name):
    """Retrieve a sample given its name and the dataset it belongs to"""

    call = 'download_sample'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample_meta(user, output, dataset, name):
    """Retrieve a sample's metadata given its name and the dataset it belongs to"""

    call = 'download_meta'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def import_samples(user, ds):
    """Download all samples of a dataset, with their metadata, into the working directory"""

    # Retrieve the list of the samples in the resulting dataset.
    # The list is stored in a temporary file.
    temp = tempfile.NamedTemporaryFile(delete=False)
    list_samples(user, temp.name, ds)

    # Retrieve names and extensions of the samples
    with open(temp.name, "r") as t:
        samples = [helper_samples(x) for x in t]

    os.makedirs('samples')
    os.makedirs('metadata')

    # Fetch each sample and its metadata into the corresponding folder

    for s in samples:

        # Get the sample
        get_sample(user, "samples/{name}.{ext}".format(name=s['name'].replace('_', ''), ext=s['ext']), ds, s['name'])

        # Get its metadata
        get_sample_meta(user, "metadata/{name}.meta".format(name=s['name'].replace('_', '')), ds, s['name'])


def helper_samples(s):
    """From a line of the samples list retrieve name and extension"""
    split = s.split('\t')
    sample = dict()
    sample.update(name=split[1])
    sample.update(ext=split[2].rstrip('\n'))

    return sample


def get_schema(user, ds, file):
    """Get the schema field of the input dataset and save it in file"""

    call = "schema"

    url = compose_url(module, call)

    # Check whether the dataset is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    schema = get(url, user=user)

    with open(file, 'w') as f_out:
        for f in schema['fields']:
            f_out.write('{field}\t{type}\n'.format(field=f['name'], type=f['type']))


def set_columns_names(user, ds_name, samples_file, schema_file):
    """Fetch the dataset schema and write a galaxy.json file assigning column names
    and types to every sample listed in samples_file"""

    get_schema(user, ds_name, schema_file)

    # galaxy.json is written in the parent of the current working directory
    cwd = os.getcwd().rsplit('/', 1)[0]
    file = '/'.join([cwd, 'galaxy.json'])

    with open(schema_file, 'r') as f_in:
        columns = [x.split('\t') for x in f_in]
        column_names = [x[0] for x in columns]
        column_types = [x[1].rstrip('\n') for x in columns]

    metadata = dict()
    metadata.update(column_names=column_names,
                    column_types=column_types)

    with open(file, 'w') as f_out:
        with open(samples_file, 'r') as f_in:
            samples_list = [x for x in f_in]
            samples_list.pop()
            for s in samples_list:
                config = dict()
                config.update(type='new_primary_dataset',
                              filename=s,
                              metadata=metadata)
                f_out.write(json.dumps(config) + '\n')


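# Each line written to galaxy.json is a JSON object along these lines (a sketch based on
# set_columns_names() above; actual column names and types come from the dataset schema):
#   {"type": "new_primary_dataset", "filename": <line from samples_file>,
#    "metadata": {"column_names": [...], "column_types": [...]}}

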
def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit()


def __main__():

    parser = argparse.ArgumentParser()
    parser.add_argument("output")
    parser.add_argument("-opt_out1")
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-samples")
    parser.add_argument("-dataset")
    parser.add_argument("-new_name")
    parser.add_argument("-schema")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'list':
        list_datasets(args.user, args.output)
    if args.cmd == 'samples':
        list_samples(args.user, args.output, args.dataset)
    if args.cmd == 'rename':
        rename_dataset(args.user, args.output, args.dataset, args.new_name)
    if args.cmd == 'delete':
        delete_dataset(args.user, args.output, args.dataset)
    if args.cmd == 'upload_url':
        upload_samples_url(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    if args.cmd == 'upload':
        upload_samples(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    if args.cmd == 'import':
        import_samples(args.user, args.dataset)
    if args.cmd == 'download':
        download_samples(args.user, args.output, args.dataset)
    if args.cmd == 'schema':
        set_columns_names(args.user, args.dataset, args.samples, args.output)


if __name__ == "__main__":
    __main__()
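

# Example invocations (a sketch; in practice the Galaxy tool wrapper supplies these
# arguments, and the -user value is whatever credential the utilities helpers expect):
#   python gmql_rest_datasets.py datasets.tsv -user <user> -cmd list
#   python gmql_rest_datasets.py samples.tsv -user <user> -cmd samples -dataset <dataset_name>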