diff irods_upload.py @ 2:0641ea2f75b1 draft

"planemo upload commit b2a00d9c24285fef0fb131d1832ecf4c337e5038-dirty"
author rhohensinner
date Fri, 02 Jul 2021 09:40:25 +0000
parents
children d2be2eb8350f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/irods_upload.py	Fri Jul 02 09:40:25 2021 +0000
@@ -0,0 +1,340 @@
+#!/usr/bin/env python
+# Processes uploads from the user.
+
+# WARNING: Changes in this tool (particularly as related to parsing) may need
+# to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools
+from __future__ import print_function
+
+import errno
+import os
+import shutil
+import sys
+sys.path.append("/home/richard/galaxy/lib")  # hard-coded path to a local Galaxy checkout's lib directory
+from json import dump, load, loads
+
+from galaxy.datatypes import sniff
+from galaxy.datatypes.registry import Registry
+from galaxy.datatypes.upload_util import handle_upload, UploadProblemException
+from galaxy.util import (
+    bunch,
+    safe_makedirs,
+    unicodify
+)
+from galaxy.util.compression_utils import CompressedFile
+
+assert sys.version_info[:2] >= (2, 7)
+
+
+_file_sources = None
+
+
+def get_file_sources():
+    global _file_sources
+    if _file_sources is None:
+        from galaxy.files import ConfiguredFileSources
+        file_sources = None
+        if os.path.exists("file_sources.json"):
+            file_sources_as_dict = None
+            with open("file_sources.json") as f:
+                file_sources_as_dict = load(f)
+            if file_sources_as_dict is not None:
+                file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
+        if file_sources is None:
+            file_sources = ConfiguredFileSources.from_dict([])
+        _file_sources = file_sources
+    return _file_sources
+
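+# Note: get_file_sources() lazily deserializes the ConfiguredFileSources registry
+# on first use and caches it in the module-level _file_sources, so calls such as
+# sniff.stream_url_to_file(url, file_sources=get_file_sources()) further below
+# only pay the deserialization cost once per job.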
+
+def file_err(msg, dataset):
+    # never remove a server-side upload
+    if dataset.type not in ('server_dir', 'path_paste'):
+        try:
+            os.remove(dataset.path)
+        except Exception:
+            pass
+    return dict(type='dataset',
+                ext='data',
+                dataset_id=dataset.dataset_id,
+                stderr=msg,
+                failed=True)
+
+
+def safe_dict(d):
+    """Recursively clone JSON structure with unicode dictionary keys."""
+    if isinstance(d, dict):
+        return {unicodify(k): safe_dict(v) for k, v in d.items()}
+    elif isinstance(d, list):
+        return [safe_dict(x) for x in d]
+    else:
+        return d
+
+
+def parse_outputs(args):
+    rval = {}
+    for arg in args:
+        id, files_path, path = arg.split(':', 2)
+        rval[int(id)] = (path, files_path)
+    return rval
+
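+# Worked example (values are illustrative): each "<id>:<files_path>:<path>" spec
+# on the command line becomes an entry keyed by dataset id, e.g.
+#
+#     parse_outputs(["7:/job/extra_files:/job/outputs/dataset_7.dat"])
+#     # -> {7: ("/job/outputs/dataset_7.dat", "/job/extra_files")}
+#
+# The maxsplit of 2 means the final path component may itself contain colons.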
+
+def add_file(dataset, registry, output_path):
+    ext = None
+    compression_type = None
+    line_count = None
+    link_data_only_str = dataset.get('link_data_only', 'copy_files')
+    if link_data_only_str not in ['link_to_files', 'copy_files']:
+        raise UploadProblemException("Invalid setting '%s' for option link_data_only - upload request misconfigured" % link_data_only_str)
+    link_data_only = link_data_only_str == 'link_to_files'
+
+    # run_as_real_user is inferred from the Galaxy config (an external chmod of the
+    # inputs indicates that the job is executed as the real user).
+    # If this is True we always purge the supplied upload inputs so they are cleaned up and we reuse their
+    # paths during data conversions since this user already owns that path.
+    # The older in_place check covers upload jobs created before 18.01; TODO: remove in 19.XX. xref #5206
+    run_as_real_user = dataset.get('run_as_real_user', False) or dataset.get("in_place", False)
+
+    # purge_source defaults to True unless this is an FTP import and
+    # ftp_upload_purge has been overridden to False in Galaxy's config.
+    # We set purge_source to False if:
+    # - the job does not have write access to the file, e.g. when running as the
+    #   real user
+    # - the files are uploaded from external paths.
+    purge_source = dataset.get('purge_source', True) and not run_as_real_user and dataset.type not in ('server_dir', 'path_paste')
+
+    # in_place is True unless we are running as the real user or importing external paths, i.e.
+    # only for a real upload and not for a path paste or FTP import.
+    # in_place should always be False when running as the real user because the uploaded file will
+    # be owned by Galaxy rather than the user, and it should be False for external paths so Galaxy
+    # doesn't modify files it does not control.
+    in_place = not run_as_real_user and dataset.type not in ('server_dir', 'path_paste', 'ftp_import')
+
+    # Based on the check_upload_content Galaxy config option (on by default), this enables some
+    # security-related checks on the uploaded content, but can prevent uploads from working in some cases.
+    check_content = dataset.get('check_content', True)
+
+    # auto_decompress is a request flag that can be swapped off to prevent Galaxy from automatically
+    # decompressing archive files before sniffing.
+    auto_decompress = dataset.get('auto_decompress', True)
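+    # Rough sketch (not an exhaustive schema) of the per-dataset request entry these
+    # flags are read from; only keys referenced in this function are shown and the
+    # values are illustrative:
+    #
+    #     {"dataset_id": 7, "type": "url", "file_type": "auto", "name": "sample.tsv",
+    #      "link_data_only": "copy_files", "purge_source": True, "check_content": True,
+    #      "auto_decompress": True, "to_posix_lines": True, "space_to_tab": False, ...}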
+    try:
+        dataset.file_type
+    except AttributeError:
+        raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.')
+
+    if dataset.type == 'url':
+        try:
+            dataset.path = sniff.stream_url_to_file(dataset.path, file_sources=get_file_sources())
+        except Exception as e:
+            raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, unicodify(e)))
+
+    # Make sure the uploaded temporary file actually exists
+    if not os.path.exists(dataset.path):
+        raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path)
+
+    stdout, ext, datatype, is_binary, converted_path = handle_upload(
+        registry=registry,
+        path=dataset.path,
+        requested_ext=dataset.file_type,
+        name=dataset.name,
+        tmp_prefix='data_id_%s_upload_' % dataset.dataset_id,
+        tmp_dir=output_adjacent_tmpdir(output_path),
+        check_content=check_content,
+        link_data_only=link_data_only,
+        in_place=in_place,
+        auto_decompress=auto_decompress,
+        convert_to_posix_lines=dataset.to_posix_lines,
+        convert_spaces_to_tabs=dataset.space_to_tab,
+    )
+
+    # Strip compression extension from name
+    if compression_type and not getattr(datatype, 'compressed', False) and dataset.name.endswith('.' + compression_type):
+        dataset.name = dataset.name[:-len('.' + compression_type)]
+
+    # Move dataset
+    if link_data_only:
+        # Never alter a file that will not be copied to Galaxy's local file store.
+        if datatype.dataset_content_needs_grooming(dataset.path):
+            err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
+                '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
+            raise UploadProblemException(err_msg)
+    if not link_data_only:
+        # Move the dataset to its "real" path. converted_path is a tempfile so we move it even if purge_source is False.
+        if purge_source or converted_path:
+            try:
+                # The user indicated the original file should be purged and we have a converted_path tempfile
+                if purge_source and converted_path:
+                    shutil.move(converted_path, output_path)
+                    os.remove(dataset.path)
+                else:
+                    shutil.move(converted_path or dataset.path, output_path)
+            except OSError as e:
+                # We may not have permission to remove the input
+                if e.errno != errno.EACCES:
+                    raise
+        else:
+            shutil.copy(dataset.path, output_path)
+
+    # Write the job info
+    stdout = stdout or 'uploaded %s file' % ext
+    info = dict(type='dataset',
+                dataset_id=dataset.dataset_id,
+                ext=ext,
+                stdout=stdout,
+                name=dataset.name,
+                line_count=line_count)
+    if dataset.get('uuid', None) is not None:
+        info['uuid'] = dataset.get('uuid')
+    # FIXME: does this belong here? also not output-adjacent-tmpdir aware =/
+    if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path):
+        # Groom the dataset content if necessary
+        datatype.groom_dataset_content(output_path)
+    return info
+
+
+def add_composite_file(dataset, registry, output_path, files_path):
+    datatype = None
+
+    # Find data type
+    if dataset.file_type is not None:
+        datatype = registry.get_datatype_by_extension(dataset.file_type)
+
+    def to_path(path_or_url):
+        is_url = '://' in path_or_url  # TODO: use a more robust URL check
+        if is_url:
+            try:
+                temp_name = sniff.stream_url_to_file(path_or_url, file_sources=get_file_sources())
+            except Exception as e:
+                raise UploadProblemException('Unable to fetch %s\n%s' % (path_or_url, unicodify(e)))
+
+            return temp_name, is_url
+
+        return path_or_url, is_url
+
+    def make_files_path():
+        safe_makedirs(files_path)
+
+    def stage_file(name, composite_file_path, is_binary=False):
+        dp = composite_file_path['path']
+        path, is_url = to_path(dp)
+        if is_url:
+            dataset.path = path
+            dp = path
+
+        auto_decompress = composite_file_path.get('auto_decompress', True)
+        if auto_decompress and not datatype.composite_type and CompressedFile.can_decompress(dp):
+            # It isn't an explicitly composite datatype, so these are just extra files to attach
+            # as composite data. It'd be better if Galaxy were communicating this to the tool
+            # a little more explicitly, so we didn't need to dispatch on the datatype and could
+            # attach arbitrary extra composite data to an existing composite datatype if need
+            # be. Perhaps that would be a mistake, though.
+            CompressedFile(dp).extract(files_path)
+        else:
+            tmpdir = output_adjacent_tmpdir(output_path)
+            tmp_prefix = 'data_id_%s_convert_' % dataset.dataset_id
+            sniff.handle_composite_file(
+                datatype,
+                dp,
+                files_path,
+                name,
+                is_binary,
+                tmpdir,
+                tmp_prefix,
+                composite_file_path,
+            )
+
+    # Do we have pre-defined composite files from the datatype definition?
+    if dataset.composite_files:
+        make_files_path()
+        for name, value in dataset.composite_files.items():
+            value = bunch.Bunch(**value)
+            if value.name not in dataset.composite_file_paths:
+                raise UploadProblemException("Failed to find file_path %s in %s" % (value.name, dataset.composite_file_paths))
+            if dataset.composite_file_paths[value.name] is None and not value.optional:
+                raise UploadProblemException('A required composite data file was not provided (%s)' % name)
+            elif dataset.composite_file_paths[value.name] is not None:
+                composite_file_path = dataset.composite_file_paths[value.name]
+                stage_file(name, composite_file_path, value.is_binary)
+
+    # Do we have ad-hoc user-supplied composite files?
+    elif dataset.composite_file_paths:
+        make_files_path()
+        for key, composite_file in dataset.composite_file_paths.items():
+            stage_file(key, composite_file)  # TODO: replace these defaults
+
+    # Move the dataset to its "real" path
+    primary_file_path, _ = to_path(dataset.primary_file)
+    shutil.move(primary_file_path, output_path)
+
+    # Write the job info
+    return dict(type='dataset',
+                dataset_id=dataset.dataset_id,
+                stdout='uploaded %s file' % dataset.file_type)
+
+
+def __read_paramfile(path):
+    with open(path) as fh:
+        obj = load(fh)
+    # If there's a single dataset in an old-style paramfile it'll still parse, but it'll be a dict
+    assert isinstance(obj, list)
+    return obj
+
+
+def __read_old_paramfile(path):
+    datasets = []
+    with open(path) as fh:
+        for line in fh:
+            datasets.append(loads(line))
+    return datasets
+
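+# For reference, the two accepted paramfile layouts (contents illustrative): the
+# new-style file is a single JSON list of dataset dicts, e.g.
+#
+#     [{"dataset_id": 1, "type": "file", ...}, {"dataset_id": 2, "type": "url", ...}]
+#
+# while the old-style file holds one JSON object per line:
+#
+#     {"dataset_id": 1, "type": "file", ...}
+#     {"dataset_id": 2, "type": "url", ...}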
+
+def __write_job_metadata(metadata):
+    # TODO: make upload/set_metadata compatible with https://github.com/galaxyproject/galaxy/pull/4437
+    with open('galaxy.json', 'w') as fh:
+        for meta in metadata:
+            dump(meta, fh)
+            fh.write('\n')
+
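+# The resulting galaxy.json holds one JSON object per processed dataset, mirroring
+# the dicts returned by add_file()/add_composite_file()/file_err(), e.g. (values
+# illustrative):
+#
+#     {"type": "dataset", "dataset_id": 7, "ext": "tabular",
+#      "stdout": "uploaded tabular file", "name": "sample.tsv", "line_count": null}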
+
+def output_adjacent_tmpdir(output_path):
+    """For temp files that will ultimately be moved to output_path anyway,
+    create them directly in output_path's directory so shutil.move will
+    work optimally.
+    """
+    return os.path.dirname(output_path)
+
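+# E.g. output_adjacent_tmpdir("/data/000/dataset_7.dat") returns "/data/000" (paths
+# illustrative); temp files created there share the output's filesystem, so the
+# final shutil.move is a cheap rename rather than a cross-device copy.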
+
+def __main__():
+
+    if len(sys.argv) < 4:
+        print('usage: irods_upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...', file=sys.stderr)
+        sys.exit(1)
+
+    output_paths = parse_outputs(sys.argv[4:])
+
+    registry = Registry()
+    registry.load_datatypes(root_dir=sys.argv[1], config=sys.argv[2])
+
+    try:
+        datasets = __read_paramfile(sys.argv[3])
+    except (ValueError, AssertionError):
+        datasets = __read_old_paramfile(sys.argv[3])
+
+    metadata = []
+    for dataset in datasets:
+        dataset = bunch.Bunch(**safe_dict(dataset))
+        try:
+            output_path = output_paths[int(dataset.dataset_id)][0]
+        except Exception:
+            print('Output path for dataset %s not found on command line' % dataset.dataset_id, file=sys.stderr)
+            sys.exit(1)
+        try:
+            if dataset.type == 'composite':
+                files_path = output_paths[int(dataset.dataset_id)][1]
+                metadata.append(add_composite_file(dataset, registry, output_path, files_path))
+            else:
+                metadata.append(add_file(dataset, registry, output_path))
+        except UploadProblemException as e:
+            metadata.append(file_err(unicodify(e), dataset))
+    __write_job_metadata(metadata)
+
+
+if __name__ == '__main__':
+    __main__()
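+# Example invocation (paths are illustrative), matching the argv layout parsed in
+# __main__() above:
+#
+#     python irods_upload.py /path/to/galaxy /path/to/datatypes_conf.xml \
+#         upload_params.json 1:/job/extra_files:/job/outputs/dataset_1.dat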