gmql_rest_datasets.py @ 0:078d77023c34 (draft, default, tip)
planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author: geco-team
date:   Tue, 26 Jun 2018 08:59:19 -0400
# Galaxy plugin for REST access to the GMQL services (Datasets)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import json
import os
import sys
import tempfile

from utilities import *

module = 'repository'


def list_datasets(user, output, saveResult=True):
    """Retrieve the list of available datasets"""

    call = 'list_datasets'
    url = compose_url(module, call)

    datasets = get(url, user=user)
    list_datasets = datasets['datasets']

    if saveResult:
        with open(output, 'w') as f:
            for ds in list_datasets:
                f.write("{name}\t{owner}\n".format(name=ds['name'], owner=ds['owner']))
    else:
        return list_datasets


def list_samples(user, output, ds):
    """List the samples of a given dataset"""

    call = 'list_samples'
    url = compose_url(module, call)

    # Specify for which dataset. If it is a public dataset, the 'public.'
    # prefix must be added to the dataset name, so check the owner first.
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    samples = get(url, user=user)
    list_s = samples['samples']

    with open(output, 'w') as f_out:
        for s in list_s:
            f_out.write("{id}\t{name}\t{ext}\n".format(id=s['id'], name=s['name'], ext=s['path'].rsplit('.', 1)[1]))


def rename_dataset(user, output, ds, new):
    """Rename a dataset in the user's private space"""

    call = 'rename_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds, newDatasetName=new)

    outcome = get(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome on stdout
    sys.stdout.write("Rename: {result}".format(result=outcome['result']))


def delete_dataset(user, output, ds):
    """Delete a dataset from the user's private space"""

    call = 'delete_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds)

    outcome = delete(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome on stdout
    sys.stdout.write("Delete: {result}".format(result=outcome['result']))


def upload_samples_url(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset given the urls of the samples and their schema"""

    # Compose the url for the REST call
    call = 'upload_url'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    content = dict()

    # Put back escaped '&'
    samples = samples.replace('__amp__', '&')
    schema = schema.replace('__amp__', '&')

    # If a known schema type is given, add it as an option to the url.
    # Otherwise the provided schema is expected to be the url of a schema file.
    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # check_schema = validators.url(schema)
        # if isinstance(check_schema, validators.utils.ValidationFailure):
        #     stop_err("Schema URL not valid")
        content.update(schema_file=schema)

    # Samples are listed one per line; split them on the Galaxy newline marker ('__cn__')
    samples_list = samples.split('__cn__')

    # The input regexp can allow a trailing empty string; remove it if present.
    if not samples_list[-1]:
        samples_list.remove("")

    # For each sample url, check if it is valid. If at least one is not, the upload
    # fails and the offending url is reported in the output.
    # for s in samples_list:
    #     check_url = validators.url(s)
    #     if isinstance(check_url, validators.utils.ValidationFailure):
    #         with open(output, 'w') as f_out:
    #             f_out.write("This resource couldn't be loaded (invalid url)\n")
    #             f_out.write("Line %d: %s" % (samples_list.index(s) + 1, s))
    #         stop_err("Some URLs are not valid.\nCheck the output file for details.")

    content.update(data_files=samples_list)

    result = post(url, content, user=user, params=params)

    # Return the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)


def upload_samples(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset from the local instance"""

    # Compose the url for the REST call
    call = 'upload_data'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Files dict for the payload
    files = dict()

    # If a known schema type is given, add it as an option to the url.
    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # Add the given schema file to the payload dictionary
        files.update({'schema': ('{ds}.xml'.format(ds=dataset), open(schema, 'rb'))})

    # Read the samples file and add each sample to the form object.
    # Each line has the structure:
    # FILENAME <tab> PATH
    with open(samples, "r") as f_in:
        s = [x.rstrip('\n').split('\t') for x in f_in]
        s = [x for x in s if len(x) > 1]
        # s = [x for x in s if '.dat' in x[0]]
        for i, x in enumerate(s):
            files.update({'file%d' % (i + 1): (x[0], open(x[1], 'rb'))})

    # Post call
    result = post(url, files, user=user, params=params, content_type='multiform')

    # Return the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)


def list_imported(result, output):
    """When uploading a dataset, the server returns a json object describing what has been imported.

    INPUT JSON FIELDS
        imported: samples imported with their metadata
        autoMetadata: samples imported without metadata (metadata have been auto generated)
    """

    samples = list()

    if 'imported' in result:
        imported = result.get('imported')
        if imported:
            samples.append(imported)

    if 'autoMetadata' in result:
        am = result.get('autoMetadata')
        if am:
            samples.append(am)

    with open(output, 'w') as f_out:
        for l in samples:
            for s in l:
                if 'id' in s and s['id']:
                    id = s['id']
                else:
                    id = l.index(s) + 1
                if 'path' in s:
                    ext = s['path'].rsplit('.')[1]
                else:
                    n = s['name'].rsplit('.')
                    if len(n) > 1:
                        ext = n[1]
                    else:
                        ext = ''

                name = s['name']

                f_out.write("{id}\t{name}\t{ext}\n".format(id=id, name=name, ext=ext))

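# Illustrative example (not an actual server response) of the json object that
# list_imported() above expects; the keys follow the docstring, the values are made up:
#
#   {
#     "imported":     [{"id": 1, "name": "sample_1.bed", "path": "datasets/job_1/sample_1.bed"}],
#     "autoMetadata": [{"name": "sample_2.bed"}]
#   }
#
# Each entry is written to the output file as "id <tab> name <tab> extension".
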
def download_samples(user, output, dataset):
    """Download the samples of the given dataset as a compressed zip archive."""

    call = 'download_zip'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Fetch the archive and stream it to the output file.
    data = get(url, user=user, response_type='zip')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample(user, output, dataset, name):
    """Retrieve a sample given its name and the dataset it belongs to"""

    call = 'download_sample'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample_meta(user, output, dataset, name):
    """Retrieve the metadata of a sample given its name and the dataset it belongs to"""

    call = 'download_meta'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def import_samples(user, ds):

    # Retrieve the list of the samples in the resulting dataset.
    # The list is stored in a temporary file.
    temp = tempfile.NamedTemporaryFile(delete=False)
    list_samples(user, temp.name, ds)

    # Retrieve names and extensions of the samples
    with open(temp.name, "r") as t:
        samples = [helper_samples(x) for x in t]

    os.makedirs('samples')
    os.makedirs('metadata')

    # Download each sample and its metadata into the new directories
    for s in samples:
        # Get the sample
        get_sample(user, "samples/{name}.{ext}".format(name=s['name'].replace('_', ''), ext=s['ext']), ds, s['name'])
        # Get its metadata
        get_sample_meta(user, "metadata/{name}.meta".format(name=s['name'].replace('_', '')), ds, s['name'])


def helper_samples(s):
    """From a line of the samples list, retrieve name and extension"""
    split = s.split('\t')
    sample = dict()
    sample.update(name=split[1])
    sample.update(ext=split[2].rstrip('\n'))

    return sample


def get_schema(user, ds, file):
    """Get the schema fields of the input dataset and save them in file"""

    call = "schema"
    url = compose_url(module, call)

    # Check whether the dataset is public or not
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    schema = get(url, user=user)

    with open(file, 'w') as f_out:
        for f in schema['fields']:
            f_out.write('{field}\t{type}\n'.format(field=f['name'], type=f['type']))


def set_columns_names(user, ds_name, samples_file, schema_file):

    get_schema(user, ds_name, schema_file)

    cwd = os.getcwd().rsplit('/', 1)[0]
    file = '/'.join([cwd, 'galaxy.json'])

    with open(schema_file, 'r') as f_in:
        columns = [x.split('\t') for x in f_in]

    column_names = [x[0] for x in columns]
    column_types = [x[1].rstrip('\n') for x in columns]

    metadata = dict()
    metadata.update(column_names=column_names,
                    column_types=column_types)

    with open(file, 'w') as f_out:
        with open(samples_file, 'r') as f_in:
            samples_list = [x.rstrip('\n') for x in f_in]
            samples_list.pop()
            for s in samples_list:
                config = dict()
                config.update(type='new_primary_dataset',
                              filename=s,
                              metadata=metadata)
                f_out.write(json.dumps(config) + '\n')


def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)


def __main__():

    parser = argparse.ArgumentParser()
    parser.add_argument("output")
    parser.add_argument("-opt_out1")
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-samples")
    parser.add_argument("-dataset")
    parser.add_argument("-new_name")
    parser.add_argument("-schema")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'list':
        list_datasets(args.user, args.output)
    if args.cmd == 'samples':
        list_samples(args.user, args.output, args.dataset)
    if args.cmd == 'rename':
        rename_dataset(args.user, args.output, args.dataset, args.new_name)
    if args.cmd == 'delete':
        delete_dataset(args.user, args.output, args.dataset)
    if args.cmd == 'upload_url':
        upload_samples_url(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    if args.cmd == 'upload':
        upload_samples(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    if args.cmd == 'import':
        import_samples(args.user, args.dataset)
    if args.cmd == 'download':
        download_samples(args.user, args.output, args.dataset)
    if args.cmd == 'schema':
        set_columns_names(args.user, args.dataset, args.samples, args.output)


if __name__ == "__main__":
    __main__()
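
# Example invocations (placeholder file names, dataset name, and token; in practice
# the Galaxy tool wrapper builds this command line and supplies the auth token):
#
#   python gmql_rest_datasets.py datasets.tsv -user <user_token> -cmd list
#   python gmql_rest_datasets.py samples.tsv  -user <user_token> -cmd samples -dataset <dataset_name>
#   python gmql_rest_datasets.py updated.tsv  -user <user_token> -cmd delete  -dataset <dataset_name>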