Mercurial > repos > ecology > aquainfra_importer
diff data_source.py @ 0:cc18c3bf2666 draft default tip
planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/aquainfra_importer commit 2b586af4c987b6105356736f875d2517b25a8be6
author | ecology |
---|---|
date | Tue, 14 May 2024 20:10:29 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/data_source.py Tue May 14 20:10:29 2024 +0000 @@ -0,0 +1,119 @@ +#!/usr/bin/env python +# Retrieves data from external data source applications and +# stores in a dataset file. +# +# Data source application parameters are temporarily stored +# in the dataset file. +import json +import os +import sys +from urllib.parse import urlencode, urlparse +from urllib.request import urlopen + +from galaxy.datatypes import sniff +from galaxy.datatypes.registry import Registry +from galaxy.util import ( + DEFAULT_SOCKET_TIMEOUT, + get_charset_from_http_headers, + stream_to_open_named_file, +) + +GALAXY_PARAM_PREFIX = "GALAXY" +GALAXY_ROOT_DIR = os.path.realpath( + os.path.join(os.path.dirname(__file__), os.pardir, os.pardir) +) +GALAXY_DATATYPES_CONF_FILE = os.path.join( + GALAXY_ROOT_DIR, "datatypes_conf.xml" +) + + +def main(): + if len(sys.argv) >= 3: + max_file_size = int(sys.argv[2]) + else: + max_file_size = 0 + + with open(sys.argv[1]) as fh: + params = json.load(fh) + + out_data_name = params["output_data"][0]["out_data_name"] + + URL = params["param_dict"].get("URL", None) + URL_method = params["param_dict"].get("URL_method", "get") + + datatypes_registry = Registry() + datatypes_registry.load_datatypes( + root_dir=params["job_config"]["GALAXY_ROOT_DIR"], + config=params["job_config"]["GALAXY_DATATYPES_CONF_FILE"], + ) + + for data_dict in params["output_data"]: + cur_filename = data_dict["file_name"] + cur_URL = params["param_dict"].get( + "%s|%s|URL" % (GALAXY_PARAM_PREFIX, + data_dict["out_data_name"]), URL + ) + if not cur_URL or urlparse(cur_URL).scheme not in ("http", "https", + "ftp"): + open(cur_filename, "w").write("") + sys.exit( + "The remote data source application has not sent " + "back a URL parameter in the request." + ) + + try: + if URL_method == "get": + page = urlopen(cur_URL, timeout=DEFAULT_SOCKET_TIMEOUT) + elif URL_method == "post": + param_dict = params["param_dict"] + page = urlopen( + cur_URL, + urlencode(param_dict["incoming_request_params"]).encode( + "utf-8" + ), + timeout=DEFAULT_SOCKET_TIMEOUT, + ) + except Exception as e: + sys.exit( + "The remote data source application may " + "be off line, please try again later. Error: %s" + % str(e) + ) + if max_file_size: + file_size = int(page.info().get("Content-Length", 0)) + if file_size > max_file_size: + sys.exit( + "The requested data size (%d bytes) exceeds the maximum" + "allowed size (%d bytes) on this server." + % (file_size, max_file_size) + ) + try: + cur_filename = stream_to_open_named_file( + page, + os.open( + cur_filename, + os.O_WRONLY | os.O_TRUNC | os.O_CREAT + ), + cur_filename, + source_encoding=get_charset_from_http_headers(page.headers), + ) + except Exception as e: + sys.exit("Unable to fetch %s:\n%s" % (cur_URL, e)) + + try: + ext = sniff.handle_uploaded_dataset_file( + cur_filename, datatypes_registry, ext=data_dict["ext"] + ) + except Exception as e: + sys.exit(str(e)) + + tool_provided_metadata = {out_data_name: {"ext": ext}} + + with open( + params["job_config"]["TOOL_PROVIDED_JOB_METADATA_FILE"], "w" + ) as json_file: + json.dump(tool_provided_metadata, json_file) + + +if __name__ == "__main__": + main()