gmql_rest_datasets.py @ 0:078d77023c34 (draft, default, tip)
planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author: geco-team
date: Tue, 26 Jun 2018 08:59:19 -0400

# Galaxy plugin for REST access to the GMQL services
# (Datasets)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import json
import os
import sys
import tempfile

# sys and os are used below; import them explicitly rather than relying on
# whatever 'from utilities import *' happens to re-export.
from utilities import *

module = 'repository'

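# A rough sketch of the helpers this script expects from 'utilities', inferred
# from the call sites below (an assumption for the reader, not the actual
# implementation, which lives in utilities.py):
#
#   compose_url(module, call)  -> url template with '{...}' placeholders,
#                                 filled in via str.format()
#   get(url, user=..., response_type=...)  -> parsed json by default, or a
#                                 streaming response for 'zip' / 'file'
#   post(url, payload, user=..., params=..., content_type=...) -> parsed json
#   delete(url, user=...)      -> parsed json with a 'result' field
#   add_url_param(params, module, call, value) -> updated params dict
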

def list_datasets(user, output, saveResult=True):
    """Retrieve the list of available datasets"""

    call = 'list_datasets'
    url = compose_url(module, call)

    datasets = get(url, user=user)
    ds_list = datasets['datasets']  # local name must not shadow this function

    if saveResult:
        # the 'with' block closes the file; no explicit close() is needed
        with open(output, 'w') as f:
            for ds in ds_list:
                f.write("{name}\t{owner}\n".format(name=ds['name'], owner=ds['owner']))
    else:
        return ds_list

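# list_datasets writes a two-column TSV (name, owner), one dataset per line,
# e.g. (content illustrative):
#
#   Example_Dataset_1    public
#   my_uploaded_ds       some_user
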
def list_samples(user, output, ds):
    """List the samples of a given dataset"""

    call = 'list_samples'
    url = compose_url(module, call)

    # Specify for which dataset.
    # If it is a public dataset, the 'public.' prefix must be added to its name.

    # Check whether the dataset is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    samples = get(url, user=user)
    list_s = samples['samples']

    with open(output, 'w') as f_out:
        for s in list_s:
            # rsplit('.', 1)[-1] keeps only the last extension and does not
            # raise IndexError when the path contains no dot
            f_out.write("{id}\t{name}\t{ext}\n".format(id=s['id'], name=s['name'], ext=s['path'].rsplit('.', 1)[-1]))

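# The resulting file is a three-column TSV (id, name, extension), e.g.
# (content illustrative):
#
#   1    sample_0001    bed
#
# helper_samples() below parses lines of exactly this shape.
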
def rename_dataset(user, output, ds, new):
    """Rename a dataset from the user's private space"""

    call = 'rename_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds, newDatasetName=new)

    outcome = get(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome to stdout
    sys.stdout.write("Rename: {result}".format(result=outcome['result']))


def delete_dataset(user, output, ds):
    """Delete a dataset from the user's private space"""

    call = 'delete_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds)

    outcome = delete(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome to stdout
    sys.stdout.write("Delete: {result}".format(result=outcome['result']))


def upload_samples_url(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset given the urls of the samples and their schema"""

    # Compose the url for the REST call
    call = 'upload_url'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    content = dict()

    # Put back escaped '&'
    samples = samples.replace('__amp__', '&')
    schema = schema.replace('__amp__', '&')

    # If a known schema type is given, add the option to the url.
    # Otherwise, the provided schema is treated as the url of a schema file.

    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # url validation is currently disabled; it would read:
        # check_schema = validators.url(schema)
        # if isinstance(check_schema, validators.utils.ValidationFailure): stop_err("Schema URL not valid")
        content.update(schema_file=schema)

    # Samples are listed one per line; split on the escaped newline marker ('__cn__')
    samples_list = samples.split('__cn__')

    # The input regexp can allow a final empty string; drop it if present.
    # pop() removes the trailing element, whereas remove("") would delete the
    # first empty entry found.
    if not samples_list[-1]:
        samples_list.pop()

    # Per-sample url validation is likewise disabled; if re-enabled it would
    # fail the upload on the first invalid url and record which one:
    # for s in samples_list:
    #     check_url = validators.url(s)
    #     if isinstance(check_url, validators.utils.ValidationFailure):
    #         with open(output, 'w') as f_out:
    #             f_out.write("This resource couldn't be loaded (invalid url)\n")
    #             f_out.write("Line %d: %s" % (samples_list.index(s) + 1, s))
    #         stop_err("Some URLs are not valid.\nCheck the output file for details.")

    content.update(data_files=samples_list)

    result = post(url, content, user=user, params=params)

    # Report the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)

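# The posted json body therefore looks roughly like this (urls illustrative):
#
#   {
#     "schema_file": "http://.../schema.xml",   # only when no known type is given
#     "data_files": ["http://.../s1.bed", "http://.../s2.bed"]
#   }
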

def upload_samples(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset from the local instance"""

    # Compose the url for the REST call
    call = 'upload_data'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Files dict for the multipart payload
    files = dict()

    # If a known schema type is given, add the option to the url.
    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # Add the given schema file to the payload dictionary
        files.update({'schema': ('{ds}.xml'.format(ds=dataset), open(schema, 'rb'))})

    # Read the samples file and add each sample to the form object.
    # Each line has the structure:
    # FILENAME	PATH

    with open(samples, "r") as f:
        rows = [x.rstrip('\n').split('\t') for x in f]
        rows = [x for x in rows if len(x) > 1]
        # enumerate replaces the original list.index() lookup, which was
        # quadratic and ambiguous for duplicate rows
        for i, row in enumerate(rows):
            files.update({'file%d' % (i + 1): (row[0], open(row[1], 'rb'))})

    # Post call
    result = post(url, files, user=user, params=params, content_type='multiform')

    # Report the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)

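# Given a samples file such as (tab-separated, paths illustrative)
#
#   sample1.bed    /tmp/upload/sample1.bed
#   sample2.bed    /tmp/upload/sample2.bed
#
# the multipart payload becomes
#   {'file1': ('sample1.bed', <fh>), 'file2': ('sample2.bed', <fh>)}
# plus a 'schema' entry when a schema file (rather than a known type) is given.
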

def list_imported(result, output):
    """When uploading a ds, the server returns a json object describing what has been imported.
    INPUT JSON FIELDS -
    imported: samples imported with their metadata
    autoMetadata: samples imported without metadata (those have been auto generated)"""

    samples = list()

    if 'imported' in result:
        imported = result.get('imported')
        if imported:
            samples.append(imported)
    if 'autoMetadata' in result:
        am = result.get('autoMetadata')
        if am:
            samples.append(am)

    with open(output, 'w') as f_out:
        for l in samples:
            for s in l:
                if 'id' in s and s['id']:
                    id = s['id']
                else:
                    id = l.index(s) + 1
                if 'path' in s:
                    # split on the last dot only, so the real extension
                    # survives names that contain several dots
                    ext = s['path'].rsplit('.', 1)[-1]
                else:
                    n = s['name'].rsplit('.', 1)
                    if len(n) > 1:
                        ext = n[-1]
                    else:
                        ext = ''

                name = s['name']

                f_out.write("{id}\t{name}\t{ext}\n".format(id=id, name=name, ext=ext))

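# A response shaped as follows (field names per the docstring above, values
# illustrative) yields one TSV row per sample:
#
#   {
#     "imported":     [{"id": 1, "name": "s1.bed", "path": "ds/s1.bed"}],
#     "autoMetadata": [{"name": "s2.bed"}]
#   }
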

def download_samples(user, output, dataset):
    """Download the samples of the given dataset as a compressed zip archive."""

    call = 'download_zip'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Fetch the archive and stream it to the output file
    data = get(url, user=user, response_type='zip')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)

def get_sample(user, output, dataset, name):
    """Retrieve a sample given its name and the dataset it belongs to"""

    call = 'download_sample'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample_meta(user, output, dataset, name):
    """Retrieve a sample's metadata given its name and the dataset it belongs to"""

    call = 'download_meta'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def import_samples(user, ds):
    """Download all samples of a dataset, with their metadata, into the working directory"""

    # Retrieve the list of the samples in the resulting dataset.
    # The list is stored in a temporary file.
    temp = tempfile.NamedTemporaryFile(delete=False)
    list_samples(user, temp.name, ds)

    # Retrieve names and extensions of the samples.
    # A list comprehension (rather than a lazy map) makes sure the lines are
    # read before the file is closed.
    with open(temp.name, "r") as t:
        samples = [helper_samples(x) for x in t]

    os.makedirs('samples')
    os.makedirs('metadata')

    # Fetch each sample and its metadata into the matching directory

    for s in samples:

        # Get the sample
        get_sample(user, "samples/{name}.{ext}".format(name=s['name'].replace('_', ''), ext=s['ext']), ds, s['name'])

        # Get its metadata
        get_sample_meta(user, "metadata/{name}.meta".format(name=s['name'].replace('_', '')), ds, s['name'])


def helper_samples(s):
    """From one line of the samples list, retrieve the sample name and extension"""
    split = s.split('\t')
    sample = dict()
    sample.update(name=split[1])
    sample.update(ext=split[2].rstrip('\n'))

    return sample

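# e.g. helper_samples("1\tsample_0001\tbed\n")
#      -> {'name': 'sample_0001', 'ext': 'bed'}
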

def get_schema(user, ds, file):
    """Get the schema field of the input dataset and save it in file"""

    call = "schema"

    url = compose_url(module, call)

    # Check whether the dataset is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    schema = get(url, user=user)

    with open(file, 'w') as f_out:
        for f in schema['fields']:
            f_out.write('{field}\t{type}\n'.format(field=f['name'], type=f['type']))


def set_columns_names(user, ds_name, samples_file, schema_file):
    """Fetch the dataset schema and write a galaxy.json describing the output datasets"""

    get_schema(user, ds_name, schema_file)

    # galaxy.json is expected in the parent of the current working directory
    file = os.path.join(os.path.dirname(os.getcwd()), 'galaxy.json')

    with open(schema_file, 'r') as f_in:
        columns = [x.split('\t') for x in f_in]
        column_names = [x[0] for x in columns]
        column_types = [x[1].rstrip('\n') for x in columns]

    metadata = dict()
    metadata.update(column_names=column_names,
                    column_types=column_types)

    with open(file, 'w') as f_out:
        with open(samples_file, 'r') as f_in:
            # materialize the lines and strip newlines, so 'filename' does not
            # carry a trailing '\n'; the final line is discarded
            samples_list = [x.rstrip('\n') for x in f_in]
            samples_list.pop()
            for s in samples_list:
                config = dict()
                config.update(type='new_primary_dataset',
                              filename=s,
                              metadata=metadata)
                f_out.write(json.dumps(config) + '\n')

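# Each line written to galaxy.json is a json object of the form Galaxy uses to
# discover new primary datasets, e.g. (filename and columns illustrative):
#
#   {"type": "new_primary_dataset", "filename": "sample_0001.bed",
#    "metadata": {"column_names": ["chr", "start", "stop"],
#                 "column_types": ["STRING", "LONG", "LONG"]}}
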

def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)  # non-zero exit status, so failures are not reported as success


def __main__():

    parser = argparse.ArgumentParser()
    parser.add_argument("output")
    parser.add_argument("-opt_out1")
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-samples")
    parser.add_argument("-dataset")
    parser.add_argument("-new_name")
    parser.add_argument("-schema")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'list':
        list_datasets(args.user, args.output)
    elif args.cmd == 'samples':
        list_samples(args.user, args.output, args.dataset)
    elif args.cmd == 'rename':
        rename_dataset(args.user, args.output, args.dataset, args.new_name)
    elif args.cmd == 'delete':
        delete_dataset(args.user, args.output, args.dataset)
    elif args.cmd == 'upload_url':
        upload_samples_url(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    elif args.cmd == 'upload':
        upload_samples(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    elif args.cmd == 'import':
        import_samples(args.user, args.dataset)
    elif args.cmd == 'download':
        download_samples(args.user, args.output, args.dataset)
    elif args.cmd == 'schema':
        set_columns_names(args.user, args.dataset, args.samples, args.output)


if __name__ == "__main__":
    __main__()
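
# Example invocations (values illustrative; the '-user' argument takes whatever
# identifier/token the 'utilities' helpers expect for authentication):
#
#   python gmql_rest_datasets.py datasets.tsv -user <user> -cmd list
#   python gmql_rest_datasets.py samples.tsv -user <user> -cmd samples -dataset Example_Dataset_1
#   python gmql_rest_datasets.py imported.tsv -user <user> -cmd upload -dataset my_ds \
#       -schema bed -samples samples.tsv -add_output updated_list.tsv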
