Mercurial > repos > ecology > hirondelle_crim
view ogc_api_processes_wrapper.R @ 1:52baddd15640 draft default tip
planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 8b4b58222af2c77abd41dd8f17862a24ca7d3381
author | ecology |
---|---|
date | Fri, 06 Sep 2024 10:30:30 +0000 |
parents | afede0916f0a |
children |
line wrap: on
line source
library("httr2") library("jsonlite") library("getopt") cat("start generic wrapper service \n") remove_null_values <- function(x) { # Check if the input is a list if (is.list(x)) { # Remove NULL values and apply the function recursively to sublists x <- lapply(x, remove_null_values) x <- x[!sapply(x, is.null)] } return(x) } getParameters <- function() { con <- file("inputs.json", "r") lines <- readLines(con) close(con) json_string <- paste(lines, collapse = "\n") json_data <- fromJSON(json_string) # Remove NULL values from json_data cleaned_json_data <- remove_null_values(json_data) return(cleaned_json_data$conditional_process) } parseResponseBody <- function(body) { hex <- c(body) intValues <- as.integer(hex) rawVector <- as.raw(intValues) readableOutput <- rawToChar(rawVector) jsonObject <- jsonlite::fromJSON(readableOutput) return(jsonObject) } getOutputs <- function(inputs, output, server) { url <- paste(paste(server, "/processes/", sep = ""), inputs$select_process, sep = "") request <- request(url) response <- req_perform(request) responseBody <- parseResponseBody(response$body) outputs <- list() for (x in 1:length(responseBody$outputs)) { outputformatName <- paste(names(responseBody$outputs[x]), "_outformat", sep = "") output_item <- list() for (p in names(inputs)) { if (p == outputformatName) { format <- list("mediaType" = inputs[[outputformatName]]) output_item$format <- format } } output_item$transmissionMode <- "reference" outputs[[x]] <- output_item } names(outputs) <- names(responseBody$outputs) return(outputs) } executeProcess <- function(url, process, requestBodyData, cookie) { url <- paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") requestBodyData$inputs$cookie <- NULL requestBodyData$inputs$select_process <- NULL requestBodyData$inputs$s3_access_key <- requestBodyData$inputs$user_credentials$s3_access_key requestBodyData$inputs$s3_secret_key <- requestBodyData$inputs$user_credentials$s3_secret_key requestBodyData$inputs$user_credentials <- NULL if (process == "plot-image") { tmp <- requestBodyData$inputs$color_scale color_scale <- gsub("__ob__", "[", tmp) color_scale <- gsub("__cb__", "]", color_scale) requestBodyData$inputs$color_scale <- color_scale #print(requestBodyData$inputs$color_scale) } if (process == "calculate-band") { requestBodyData$inputs$name <- "output" } if (process == "reproject-image") { requestBodyData$inputs$output_name <- "output" } #requestBodyData$inputs$input_image$href <- "https://hirondelle.crim.ca/wpsoutputs/weaver/public/test-data/S2A_MSIL2A_20190701T110621_N0500_R137_T29SPC_20230604T023542_turbidity.tiff" body <- list() body$inputs <- requestBodyData$inputs #print(body$inputs) body$mode <- "async" body$response <- "document" #print(body$inputs) response <- request(url) %>% req_headers("Accept" = "application/json", "Content-Type" = "application/json", "Cookie" = cookie) %>% req_body_json(body) %>% req_perform() cat("\n Process executed") cat("\n status: ", response$status_code) cat("\n jobID: ", parseResponseBody(response$body)$jobID, "\n") jobID <- parseResponseBody(response$body)$jobID return(jobID) } checkJobStatus <- function(server, process, jobID, cookie) { url <- paste0(server, "processes/", process, "/jobs/", jobID) response <- request(url) %>% req_headers("Cookie" = cookie) %>% req_perform() jobStatus <- parseResponseBody(response$body)$status jobProgress <- parseResponseBody(response$body)$progress return(jobStatus) } getStatusCode <- function(server, process, jobID, cookie) { url <- paste0(server, "processes/", process, "/jobs/", jobID) response <- request(url) %>% req_headers("Cookie" = cookie) %>% req_perform() status_code <- response$status_code return(status_code) } getResult <- function (server, process, jobID, cookie) { response <- request(paste0(server, "processes/", process, "/jobs/", jobID, "/results")) %>% req_headers("Cookie" = cookie) %>% req_perform() return(response) } retrieveResults <- function(server, process, jobID, outputData, cookie) { status_code <- getStatusCode(server, process, jobID, cookie) if (status_code == 200) { status <- "running" while (status == "running") { jobStatus <- checkJobStatus(server, process, jobID, cookie) print(jobStatus) if (jobStatus == "succeeded") { status <- jobStatus result <- getResult(server, process, jobID, cookie) if (result$status_code == 200) { resultBody <- parseResponseBody(result$body) #print(resultBody) if (process == "select-products-sentinel2") { urls <- unname(unlist(lapply(resultBody, function(x) x$value))) } else if (process == "download-band-sentinel2-product-safe" || process == "calculate-band" || process == "plot-image" || process == "reproject-image") { urls <- unname(unlist(lapply(resultBody, function(x) x$href))) } urls_with_newline <- paste(urls, collapse = "\n") con <- file(outputData, "w") writeLines(urls_with_newline, con = con) close(con) } } else if (jobStatus == "failed") { status <- jobStatus } Sys.sleep(3) } cat("\n done \n") } else if (status_code1 == 400) { print("A query parameter has an invalid value.") } else if (status_code1 == 404) { print("The requested URI was not found.") } else if (status_code1 == 500) { print("The requested URI was not found.") } else { print(paste("HTTP", status_code1, "Error:", resp1$status_message)) } } is_url <- function(x) { grepl("^https?://", x) } server <- "https://hirondelle.crim.ca/weaver/" print("--> Retrieve parameters") inputParameters <- getParameters() #print(inputParameters) print("--> Parameters retrieved") args <- commandArgs(trailingOnly = TRUE) outputLocation <- args[2] print("--> Retrieve outputs") outputs <- getOutputs(inputParameters, outputLocation, server) print("--> Outputs retrieved") print("--> Parse inputs") convertedKeys <- c() for (key in names(inputParameters)) { if (is.character(inputParameters[[key]]) && (endsWith(inputParameters[[key]], ".dat") || endsWith(inputParameters[[key]], ".txt"))) { con <- file(inputParameters[[key]], "r") url_list <- list() #while (length(line <- readLines(con, n = 1)) > 0) { # if (is_url(line)) { # url_list <- c(url_list, list(list(href = trimws(line)))) # } #} con <- file(inputParameters[[key]], "r") lines <- readLines(con) print("--------------------------------------------------------------------1") print(length(lines)) close(con) if (!length(lines) > 1 && endsWith(lines, ".jp2") && startsWith(lines, "https")) { print("--------------------------------------------------------------------2") tmp <- list() tmp$href <- lines tmp$type <- "image/jp2" inputParameters[[key]] <- tmp } else if (!length(lines) > 1 && endsWith(lines, ".SAFE") && startsWith(lines, "s3:")) { print("--------------------------------------------------------------------3") json_string <- paste(lines, collapse = "\n") inputParameters[[key]] <- json_string } else if (inputParameters$select_process == "plot-image" || inputParameters$select_process == "reproject-image") { print("--------------------------------------------------------------------4") tmp <- list() tmp$href <- lines tmp$type <- "image/tiff; application=geotiff" if (inputParameters$select_process == "reproject-image") { tmp$type <- "image/tiff; subtype=geotiff" } inputParameters[[key]] <- tmp } else { print("-----------------------------------5") json_string <- paste(lines, collapse = "\n") json_data <- fromJSON(json_string) inputParameters[[key]] <- json_data } convertedKeys <- append(convertedKeys, key) } else if (grepl("_Array_", key)) { keyParts <- strsplit(key, split = "_")[[1]] type <- keyParts[length(keyParts)] values <- inputParameters[[key]] value_list <- strsplit(values, split = ",") convertedValues <- c() for (value in value_list) { if (type == "integer") { value <- as.integer(value) } else if (type == "numeric") { value <- as.numeric(balue) } else if (type == "character") { value <- as.character(value) } convertedValues <- append(convertedValues, value) convertedKey <- "" for (part in keyParts) { if (part == "Array") { break } convertedKey <- paste(convertedKey, paste(part, "_", sep = ""), sep = "") } convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1) } inputParameters[[key]] <- convertedValues #print("-------------------------") #print(convertedValues) #print("-------------------------") convertedKeys <- append(convertedKeys, convertedKey) } else { #print("-------------------------") #print(key) #print(inputParameters[[key]]) if (!is.null(inputParameters[[key]])) { convertedKeys <- append(convertedKeys, key) } #print("-------------------------") } } #print(inputParameters) names(inputParameters) <- convertedKeys #print(inputParameters) print("--> Inputs parsed") print("--> Prepare process execution") jsonData <- list("inputs" = inputParameters, "outputs" = outputs) cookie <- inputParameters$cookie print("--> Execute process") jobID <- executeProcess(server, inputParameters$select_process, jsonData, cookie) print("--> Process executed") print("--> Retrieve results") retrieveResults(server, inputParameters$select_process, jobID, outputLocation, cookie) print("--> Results retrieved")