diff gmql_rest_datasets.py @ 0:078d77023c34 draft default tip
planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
| author | geco-team |
|---|---|
| date | Tue, 26 Jun 2018 08:59:19 -0400 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/gmql_rest_datasets.py	Tue Jun 26 08:59:19 2018 -0400
@@ -0,0 +1,411 @@

# Galaxy plugin for REST access to the GMQL services
# (Datasets)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import os
import sys
import tempfile
import json

from utilities import *

module = 'repository'


def list_datasets(user, output, saveResult=True):
    """Retrieve the list of available datasets"""

    call = 'list_datasets'
    url = compose_url(module, call)

    datasets = get(url, user=user)
    ds_list = datasets['datasets']

    if saveResult:
        with open(output, 'w') as f:
            for ds in ds_list:
                f.write("{name}\t{owner}\n".format(name=ds['name'], owner=ds['owner']))
    else:
        return ds_list


def list_samples(user, output, ds):
    """List the samples of a given dataset"""

    call = 'list_samples'
    url = compose_url(module, call)

    # Specify for which dataset.
    # If it is a public dataset, the 'public.' prefix must be added to the dataset name.

    # Check whether the dataset is public or private
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    samples = get(url, user=user)
    list_s = samples['samples']

    with open(output, 'w') as f_out:
        for s in list_s:
            f_out.write("{id}\t{name}\t{ext}\n".format(id=s['id'], name=s['name'], ext=s['path'].rsplit('.', 1)[1]))


def rename_dataset(user, output, ds, new):
    """Rename a dataset in the user's private space"""

    call = 'rename_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds, newDatasetName=new)

    outcome = get(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome on stdout
    sys.stdout.write("Rename: {result}".format(result=outcome['result']))


def delete_dataset(user, output, ds):
    """Delete a dataset from the user's private space"""

    call = 'delete_dataset'
    url = compose_url(module, call)
    url = url.format(datasetName=ds)

    outcome = delete(url, user=user)

    # Return the updated list of the user's datasets
    list_datasets(user, output)

    # Write the operation outcome on stdout
    sys.stdout.write("Delete: {result}".format(result=outcome['result']))


def upload_samples_url(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset given the URLs of its samples and their schema"""

    # Compose the url for the REST call
    call = 'upload_url'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    content = dict()

    # Put back escaped '&'
    samples = samples.replace('__amp__', '&')
    schema = schema.replace('__amp__', '&')

    # If a known schema type is given, add it as a URL parameter.
    # Otherwise, treat the provided schema as the URL of a schema file.

    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # check_schema = validators.url(schema)
        # if isinstance(check_schema, validators.utils.ValidationFailure): stop_err("Schema URL not valid")
        content.update(schema_file=schema)

    # Samples are listed one per line; the list is rebuilt by splitting
    # on the escaped newline marker ('__cn__').
    samples_list = samples.split('__cn__')

    # The input regexp can allow a final empty string. Remove it if present.
    if not samples_list[-1]:
        samples_list.remove("")

    # # For each sample url, check if it is valid. If at least one is not, the upload fails
    # # and the offending URL is saved in the outcome.
    # for s in samples_list:
    #     check_url = validators.url(s)
    #     if isinstance(check_url, validators.utils.ValidationFailure):
    #         with open(output, 'w') as f_out:
    #             f_out.write("This resource couldn't be loaded (invalid url)\n")
    #             f_out.write("Line %d: %s" % (samples_list.index(s) + 1, s))
    #         stop_err("Some URLs are not valid.\nCheck the output file for details.")

    content.update(data_files=samples_list)

    result = post(url, content, user=user, params=params)

    # Return the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)
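
# Illustrative sketch (hypothetical URLs) of the escaped `samples` string that
# upload_samples_url() receives from the Galaxy tool form:
#
#   samples = "http://example.org/a.bed__cn__http://example.org/b.bed__cn__"
#   samples.replace('__amp__', '&').split('__cn__')
#   # -> ['http://example.org/a.bed', 'http://example.org/b.bed', '']
#
# The trailing empty string, when present, is removed before the POST call.
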
def upload_samples(user, output, dataset, schema, samples, updatedDsList):
    """Upload a dataset from the local instance"""

    # Compose the url for the REST call
    call = 'upload_data'
    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Files dict for the payload
    files = dict()

    # If a known schema type is given, add it as a URL parameter.
    params = dict()

    if schema in ['bed', 'bedGraph', 'NarrowPeak', 'BroadPeak', 'vcf']:
        params = add_url_param(params, module, call, schema)
    else:
        # Add the given schema file to the payload dictionary
        files.update({'schema': ('{ds}.xml'.format(ds=dataset), open(schema, 'rb'))})

    # Read the samples file and add each sample to the form object.
    # Each line has the structure:
    # FILENAME  PATH
    with open(samples, "r") as f_in:
        s = [x.rstrip('\n').split('\t') for x in f_in]
        s = [x for x in s if len(x) > 1]
        # s = [x for x in s if x[0].__contains__('.dat')]
        for i, x in enumerate(s):
            files.update({'file%d' % (i + 1): (x[0], open(x[1], 'rb'))})

    # Post call
    result = post(url, files, user=user, params=params, content_type='multiform')

    # Return the list of imported samples
    list_imported(result, output)

    # Return the updated list of datasets
    list_datasets(user, updatedDsList)
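
# Illustrative sketch (hypothetical names and paths) of the tab-separated
# `samples` file read by upload_samples(), one FILENAME<TAB>PATH pair per line:
#
#   sample1.bed	/tmp/upload/sample1.bed
#   sample2.bed	/tmp/upload/sample2.bed
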
def list_imported(result, output):
    """When uploading a dataset, the server returns a JSON object describing what has been imported.
    INPUT JSON FIELDS -
    imported: samples imported along with their metadata
    autoMetadata: samples imported without metadata (their metadata have been auto-generated)"""

    samples = list()

    if 'imported' in result:
        imported = result.get('imported')
        if imported:
            samples.append(imported)
    if 'autoMetadata' in result:
        am = result.get('autoMetadata')
        if am:
            samples.append(am)

    with open(output, 'w') as f_out:
        for sample_group in samples:
            for pos, s in enumerate(sample_group):
                if 'id' in s and s['id']:
                    s_id = s['id']
                else:
                    s_id = pos + 1
                if 'path' in s:
                    ext = s['path'].rsplit('.', 1)[1]
                else:
                    n = s['name'].rsplit('.', 1)
                    if len(n) > 1:
                        ext = n[1]
                    else:
                        ext = ''

                name = s['name']

                f_out.write("{id}\t{name}\t{ext}\n".format(id=s_id, name=name, ext=ext))


def download_samples(user, output, dataset):
    """Download the samples of the given dataset in the form of a compressed zip archive."""

    call = 'download_zip'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset)

    # Fetch the archive and stream it to the output file
    data = get(url, user=user, response_type='zip')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample(user, output, dataset, name):
    """Retrieve a sample given its name and the dataset it belongs to"""

    call = 'download_sample'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def get_sample_meta(user, output, dataset, name):
    """Retrieve a sample's metadata given its name and the dataset it belongs to"""

    call = 'download_meta'

    url = compose_url(module, call)
    url = url.format(datasetName=dataset, sample=name)

    data = get(url, user=user, response_type='file')

    with open(output, 'wb') as fd:
        for chunk in data.iter_content(chunk_size=128):
            fd.write(chunk)


def import_samples(user, ds):
    """Download all samples of a dataset, with their metadata, into the working directory"""

    # Retrieve the list of the samples in the resulting dataset.
    # The list is stored in a temporary file.
    temp = tempfile.NamedTemporaryFile(delete=False)
    list_samples(user, temp.name, ds)

    # Retrieve names and extensions of the samples
    with open(temp.name, "r") as t:
        samples = [helper_samples(line) for line in t]

    os.makedirs('samples')
    os.makedirs('metadata')

    # Fetch each sample and its metadata into the corresponding folder
    for s in samples:

        # Get the sample
        get_sample(user, "samples/{name}.{ext}".format(name=s['name'].replace('_', ''), ext=s['ext']), ds, s['name'])

        # Get its metadata
        get_sample_meta(user, "metadata/{name}.meta".format(name=s['name'].replace('_', '')), ds, s['name'])


def helper_samples(s):
    """Retrieve name and extension from a line of the samples list"""
    split = s.split('\t')
    sample = dict()
    sample.update(name=split[1])
    sample.update(ext=split[2].rstrip('\n'))

    return sample


def get_schema(user, ds, output):
    """Get the schema fields of the input dataset and save them into a file"""

    call = "schema"

    url = compose_url(module, call)

    # Check whether the dataset is public or private
    owner = ''
    for d in list_datasets(user, '', False):
        if d['name'] == ds:
            owner = d['owner']

    if owner == 'public':
        url = url.format(datasetName='public.' + ds)
    else:
        url = url.format(datasetName=ds)

    schema = get(url, user=user)

    with open(output, 'w') as f_out:
        for f in schema['fields']:
            f_out.write('{field}\t{type}\n'.format(field=f['name'], type=f['type']))


def set_columns_names(user, ds_name, samples_file, schema_file):
    """Write a galaxy.json file assigning column names and types, taken from the dataset schema, to the collected samples"""

    get_schema(user, ds_name, schema_file)

    cwd = os.getcwd().rsplit('/', 1)[0]
    galaxy_json = '/'.join([cwd, 'galaxy.json'])

    with open(schema_file, 'r') as f_in:
        columns = [x.split('\t') for x in f_in]
        column_names = [x[0] for x in columns]
        column_types = [x[1].rstrip('\n') for x in columns]

    metadata = dict()
    metadata.update(column_names=column_names,
                    column_types=column_types)

    with open(galaxy_json, 'w') as f_out:
        with open(samples_file, 'r') as f_in:
            samples_list = [x.rstrip('\n') for x in f_in]
            samples_list.pop()
            for s in samples_list:
                config = dict()
                config.update(type='new_primary_dataset',
                              filename=s,
                              metadata=metadata)
                f_out.write(json.dumps(config) + '\n')
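
# Illustrative sketch (hypothetical filename and schema fields) of a line
# written to galaxy.json by set_columns_names(), one JSON object per sample:
#
#   {"type": "new_primary_dataset", "filename": "sample1.bed",
#    "metadata": {"column_names": ["chr", "left", "right"],
#                 "column_types": ["STRING", "LONG", "LONG"]}}
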
def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)


def __main__():

    parser = argparse.ArgumentParser()
    parser.add_argument("output")
    parser.add_argument("-opt_out1")
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-samples")
    parser.add_argument("-dataset")
    parser.add_argument("-new_name")
    parser.add_argument("-schema")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'list':
        list_datasets(args.user, args.output)
    elif args.cmd == 'samples':
        list_samples(args.user, args.output, args.dataset)
    elif args.cmd == 'rename':
        rename_dataset(args.user, args.output, args.dataset, args.new_name)
    elif args.cmd == 'delete':
        delete_dataset(args.user, args.output, args.dataset)
    elif args.cmd == 'upload_url':
        upload_samples_url(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    elif args.cmd == 'upload':
        upload_samples(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
    elif args.cmd == 'import':
        import_samples(args.user, args.dataset)
    elif args.cmd == 'download':
        download_samples(args.user, args.output, args.dataset)
    elif args.cmd == 'schema':
        set_columns_names(args.user, args.dataset, args.samples, args.output)


if __name__ == "__main__":
    __main__()
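
# Illustrative invocations (hypothetical argument values); Galaxy passes the
# primary output file as the positional argument:
#
#   python gmql_rest_datasets.py datasets.tsv -user $GMQL_USER -cmd list
#   python gmql_rest_datasets.py samples.tsv -user $GMQL_USER -cmd samples -dataset my_dataset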