diff gmql_rest_datasets.py @ 0:c74a1c7121ec draft default tip

planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author geco-team
date Tue, 26 Jun 2018 08:59:49 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/gmql_rest_datasets.py	Tue Jun 26 08:59:49 2018 -0400
@@ -0,0 +1,411 @@
+# Galaxy plugin to REST access to the GMQL services
+# (Datasets)
+# ----------------------------------------------------------------------------
+# Luana Brancato, luana.brancato@mail.polimi.it
+# ----------------------------------------------------------------------------
+
+import argparse
+
+import tempfile
+import json
+from utilities import *
+
+module = 'repository'
+
+
+def list_datasets(user, output, saveResult=True):
+    """Retrieve the list of available datasets"""
+
+    call = 'list_datasets'
+    url = compose_url(module,call)
+
+    datasets = get(url, user=user)
+    list_datasets = datasets['datasets']
+
+    if saveResult:
+        with open(output,'w') as f:
+            for ds in list_datasets:
+                f.write("{name}\t{owner}\n".format(name=ds['name'],owner=ds['owner']))
+        f.close()
+    else:
+        return list_datasets
+
+
+def list_samples(user, output, ds):
+    """List the samples of a given dataset"""
+
+    call = 'list_samples'
+    url = compose_url(module,call)
+
+    # Specify for which dataset.
+    # If it's a public dataset, the 'public.' prefix must be added to the dataset name
+
+    # Check if the ds is public or not
+    owner = ''
+    for d in list_datasets(user, '', False):
+        if d['name'] == ds :
+            owner = d['owner']
+
+    if (owner=='public'):
+        url = url.format(datasetName='public.'+ ds)
+    else :
+        url = url.format(datasetName=ds)
+
+    samples = get(url, user=user)
+    list_s = samples['samples']
+
+    with open(output, 'w') as f_out:
+        for s in list_s:
+            f_out.write("{id}\t{name}\t{ext}\n".format(id=s['id'], name=s['name'],ext=s['path'].rsplit('.',1)[1]))
+
+
+def rename_dataset(user, output, ds, new):
+    """Rename a dataset from the user's private space"""
+
+    call = 'rename_dataset'
+    url = compose_url(module,call)
+    url = url.format(datasetName=ds, newDatasetName=new)
+
+    outcome = get(url, user=user)
+
+    # Return the updated list of user's datasets
+    list_datasets(user, output)
+
+    # Write on stdout the operation outcome
+    sys.stdout.write("Rename: {result}".format(result=outcome['result']))
+
+
+def delete_dataset(user, output, ds):
+    """Delete a dataset from the user's private space"""
+
+    call = 'delete_dataset'
+    url = compose_url(module,call)
+    url = url.format(datasetName=ds)
+
+    outcome = delete(url, user=user)
+
+    #Return the updated list of user's datasets
+    list_datasets(user, output)
+
+    #Write on stdout the operation outcome
+    sys.stdout.write("Delete: {result}".format(result=outcome['result']))
+
+
+def upload_samples_url(user, output, dataset, schema, samples, updatedDsList):
+    """Upload a dataset given the urls of the samples and their schema"""
+
+    #Compose the url for the REST call
+    call = 'upload_url'
+    url = compose_url(module,call)
+    url = url.format(datasetName=dataset)
+
+    content = dict()
+
+    # Put back escaped '&'
+    samples = samples.replace('__amp__', '&')
+    schema = schema.replace('__amp__', '&')
+
+    # If schema type is given, add the option to the url. Otherwise, it check if the provided schema is a valid url.
+
+    params = dict ()
+
+    if schema in ['bed','bedGraph','NarrowPeak','BroadPeak','vcf'] :
+        params = add_url_param(params, module, call, schema)
+    else:
+        #check_schema = validators.url(schema)
+        #if isinstance(check_schema, validators.utils.ValidationFailure): stop_err("Schema URL not valid")
+        content.update(schema_file=schema)
+
+
+    # Samples are listed one per line. It lists them looking for the new line marker ('__cn__')
+    samples_list = samples.split('__cn__')
+
+    # The regexp in input can allow a final empty string. The following removes it if present.
+    if not samples_list[-1]:
+        samples_list.remove("")
+
+    # # For each sample url, check if it is valid. If at least ones is not, upload fails
+    # # and which one is saved in the outcome.
+    # for s in samples_list:
+    #     check_url = validators.url(s)
+    #     if isinstance(check_url, validators.utils.ValidationFailure):
+    #         with open(output, 'w') as f_out:
+    #             f_out.write("This resource couldn't be loaded (invalid url)\n")
+    #             f_out.write("Line %d: %s" % (samples_list.index(s) + 1, s))
+    #         stop_err("Some URLs are not valid.\nCheck the output file for details.")
+
+    content.update(data_files=samples_list)
+
+    result = post(url, content, user=user, params=params)
+
+    #Return the list of updated samples
+    list_imported(result, output)
+
+    #Return the updated list of samples
+    list_datasets(user, updatedDsList)
+
+
+def upload_samples(user, output, dataset, schema, samples, updatedDsList):
+    """Upload a dataset from the local instance"""
+
+
+    #Compose the url for the REST call
+    call = 'upload_data'
+    url = compose_url(module, call)
+    url = url.format(datasetName=dataset)
+
+    #Files dict for payload
+    files = dict ()
+
+    # If the schema type is give, add the option to the url.
+    params = dict ()
+
+
+    if schema in ['bed','bedGraph','NarrowPeak','BroadPeak','vcf'] :
+        params = add_url_param(params, module, call, schema)
+    else :
+        # Add the schema given to the payload dictionary
+        files.update({'schema' : ('{ds}.xml'.format(ds=dataset), open(schema, 'rb'))})
+
+    # Read samples file path and add them to the form object
+    # The structure is
+    #   FILENAME    PATH
+
+    with open(samples, "r") as file:
+        s = map(lambda x: x.rstrip('\n').split('\t'), file)
+        s = [x for x in s if len(x)>1]
+        #s = [x for x in s if x[0].__contains__('.dat')]
+        map(lambda x: files.update({'file%d' % (s.index(x) + 1) : (x[0], open(x[1], 'rb'))}), s)
+
+    # Post call
+
+    result = post(url, files, user=user, params=params, content_type='multiform')
+
+    #Return the list of updated samples
+    list_imported(result, output)
+
+
+    #Return the updated list of samples
+    list_datasets(user, updatedDsList)
+
+
+
+def list_imported(result, output) :
+    """When uploading a ds, the server returns a json object describing what has been imported
+    INPUT JSON FIELDS -
+    imported: samples imported with their metadata
+    autoMetadata: samples imported without metadata (those have been auto generated) """
+
+    samples = list ()
+
+    if 'imported' in result :
+        imported = result.get('imported')
+        if imported:
+            samples.append(result.get('imported'))
+    if 'autoMetadata' in result :
+        am = result.get('autoMetadata')
+        if am :
+            samples.append(result.get('autoMetadata'))
+
+
+    with open(output, 'w') as f_out:
+        for l in samples:
+            for s in l :
+                if 'id' in s and s['id']:
+                    id = s['id']
+                else :
+                    id = l.index(s) + 1
+                if 'path' in s :
+                    ext = s['path'].rsplit('.')[1]
+                else :
+                    n = s['name'].rsplit('.')
+                    if n.__len__()>1 :
+                        ext = n[1]
+                    else :
+                        ext = ''
+
+                name = s['name']
+
+                f_out.write("{id}\t{name}\t{ext}\n".format(id=id, name=name,ext=ext))
+
+
+def download_samples(user, output, dataset):
+    """ Download the samples of the given dataset in form of a compressed zip archive."""
+
+    call = 'download_zip'
+
+    url = compose_url(module, call)
+    url = url.format(datasetName=dataset)
+
+    # Fetch the archive.
+
+    data = get(url, user=user, response_type='zip')
+
+    with open(output, 'wb') as fd:
+        for chunk in data.iter_content(chunk_size=128):
+            fd.write(chunk)
+
+def get_sample(user, output, dataset, name):
+    """Retrieve a sample given its name and the dataset it belongs to"""
+
+    call = 'download_sample'
+
+    url = compose_url(module,call)
+    url = url.format(datasetName=dataset,sample=name)
+
+    data = get(url, user=user, response_type='file')
+
+    with open(output, 'wb') as fd:
+        for chunk in data.iter_content(chunk_size=128):
+            fd.write(chunk)
+
+
+def get_sample_meta(user, output, dataset, name):
+    """Retrieve a sample metadata given its name and the dataset it belongs to"""
+
+    call = 'download_meta'
+
+    url = compose_url(module, call)
+    url = url.format(datasetName=dataset, sample=name)
+
+    data = get(url, user=user, response_type='file')
+
+    with open(output, 'wb') as fd:
+        for chunk in data.iter_content(chunk_size=128):
+            fd.write(chunk)
+
+def import_samples(user, ds) :
+
+
+    # Retrieve the list of the samples in the resulting dataset
+    # The list is stored in a temporary file
+    temp = tempfile.NamedTemporaryFile(delete=False)
+    list_samples(user, temp.name, ds)
+
+    # Retrieve names and extensions of the samples
+    with open(temp.name, "r") as t:
+        samples = map(lambda x: helper_samples(x), t)
+    t.close()
+
+    os.makedirs('samples')
+    os.makedirs('metadata')
+
+    # Create a new dict containing names and actual path to files
+
+    for s in samples:
+
+        # Get the sample
+        get_sample(user, "samples/{name}.{ext}".format(name=s['name'].replace('_',''), ext=s['ext']), ds, s['name'])
+
+        # Get its metadata
+        get_sample_meta(user,"metadata/{name}.meta".format(name=s['name'].replace('_','')),ds,s['name'])
+
+def helper_samples(s):
+    """From a list of samples retrieve name and extension"""
+    split = s.split('\t')
+    sample = dict()
+    sample.update(name=split[1])
+    sample.update(ext=split[2].rstrip('\n'))
+
+    return sample
+
+
+def get_schema(user, ds, file) :
+    """Get the schema field of the input dataset and save it in file"""
+
+    call = "schema"
+
+    url = compose_url(module, call)
+
+    # Check if the ds is public or not
+    owner = ''
+    for d in list_datasets(user, '', False):
+        if d['name'] == ds :
+            owner = d['owner']
+
+    if (owner=='public'):
+        url = url.format(datasetName='public.'+ ds)
+    else :
+        url = url.format(datasetName=ds)
+
+    schema = get(url, user=user)
+
+
+    with open(file,'w') as f_out:
+        for f in schema['fields'] :
+            f_out.write('{field}\t{type}\n'.format(field=f['name'],type=f['type']))
+
+
+
+def set_columns_names(user, ds_name, samples_file, schema_file):
+
+    get_schema(user,ds_name, schema_file)
+
+    cwd = os.getcwd().rsplit('/',1)[0]
+    file = '/'.join([cwd, 'galaxy.json'])
+
+    with open(schema_file, 'r') as f_in:
+        columns = [x.split('\t') for x in f_in]
+        column_names = [x[0] for x in columns]
+        column_types = [x[1].rstrip('\n') for x in columns]
+
+    metadata = dict()
+    metadata.update(column_names=column_names,
+                    column_types=column_types)
+
+
+    with open(file, 'w') as f_out:
+        with open(samples_file, 'r') as f_in:
+            samples_list = map(lambda x: x, f_in)
+            samples_list.pop()
+            for s in samples_list:
+                config = dict()
+                config.update(type='new_primary_dataset',
+                                  filename=s,
+                                  metadata=metadata)
+                f_out.write(json.dumps(config) + '\n')
+
+
+
+def stop_err(msg):
+    sys.stderr.write("%s\n" % msg)
+    sys.exit()
+
+
+def __main__():
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("output")
+    parser.add_argument("-opt_out1")
+    parser.add_argument("-user")
+    parser.add_argument("-cmd")
+    parser.add_argument("-samples")
+    parser.add_argument("-dataset")
+    parser.add_argument("-new_name")
+    parser.add_argument("-schema")
+    parser.add_argument("-add_output")
+
+    args = parser.parse_args()
+
+    if args.cmd == 'list':
+        list_datasets(args.user, args.output)
+    if args.cmd == 'samples':
+        list_samples(args.user, args.output, args.dataset)
+    if args.cmd == 'rename' :
+        rename_dataset(args.user, args.output, args.dataset, args.new_name)
+    if args.cmd == 'delete':
+        delete_dataset(args.user, args.output, args.dataset)
+    if args.cmd == 'upload_url':
+        upload_samples_url(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
+    if args.cmd == 'upload' :
+        upload_samples(args.user, args.output, args.dataset, args.schema, args.samples, args.add_output)
+    if args.cmd == 'import':
+        import_samples(args.user, args.dataset)
+    if args.cmd == 'download' :
+        download_samples(args.user,args.output,args.dataset)
+    if args.cmd == 'schema' :
+        set_columns_names(args.user, args.dataset, args.samples, args.output)
+
+
+if __name__ == "__main__":
+    __main__()