Mercurial > repos > ecology > aquainfra
diff aquainfra_ogc_api_processes.R @ 0:3073e68373a5 draft default tip
planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit d07f89c8f6d6ee44c05de6b1b8047b387e76cad6
author | ecology |
---|---|
date | Thu, 10 Oct 2024 09:56:27 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/aquainfra_ogc_api_processes.R Thu Oct 10 09:56:27 2024 +0000 @@ -0,0 +1,329 @@ +library("httr2") +library("jsonlite") +library("getopt") + +cat("start generic wrapper service \n") + +remove_null_values <- function(x) { + # Check if the input is a list + if (is.list(x)) { + # Remove NULL values and apply the function recursively to sublists + x <- lapply(x, remove_null_values) + x <- x[!sapply(x, is.null)] + } + return(x) +} + +getParameters <- function() { + con <- file("inputs.json", "r") + lines <- readLines(con) + close(con) + + json_string <- paste(lines, collapse = "\n") + json_data <- fromJSON(json_string) + + # Remove NULL values from json_data + cleaned_json_data <- remove_null_values(json_data) + return(cleaned_json_data$conditional_process) +} + +parseResponseBody <- function(body) { + hex <- c(body) + intValues <- as.integer(hex) + rawVector <- as.raw(intValues) + readableOutput <- rawToChar(rawVector) + jsonObject <- jsonlite::fromJSON(readableOutput) + return(jsonObject) +} + +getOutputs <- function(inputs, output, server) { + url <- + paste(paste(server, "/processes/", sep = ""), + inputs$select_process, + sep = "") + request <- request(url) + response <- req_perform(request) + responseBody <- parseResponseBody(response$body) + outputs <- list() + + for (x in 1:length(responseBody$outputs)) { + outputformatName <- + paste(names(responseBody$outputs[x]), "_outformat", sep = "") + output_item <- list() + + for (p in names(inputs)) { + if (p == outputformatName) { + format <- list("mediaType" = inputs[[outputformatName]]) + output_item$format <- format + } + } + output_item$transmissionMode <- "reference" + outputs[[x]] <- output_item + } + + names(outputs) <- names(responseBody$outputs) + return(outputs) +} + +executeProcess <- function(url, process, requestBodyData) { + url <- + paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") + requestBodyData$inputs$select_process <- NULL + + body <- list() + body$inputs <- requestBodyData$inputs + + response <- request(url) %>% + req_headers("Content-Type" = "application/json", + "Prefer" = "respond-async") %>% + req_body_json(body) %>% + req_perform() + + cat("\n Process executed") + cat("\n status: ", response$status_code) + #if ( process == "barplot-trend-results") { + # process = "batplot-trend-results" + #} + #href <- parseResponseBody(response$body)$outputs[[gsub("-", "_", process)]]$href + jobId <- parseResponseBody(response$body)$jobID + + return(jobId) +} + +checkJobStatus <- function(server, process, jobID) { + url <- paste0(server, "jobs/", jobID) + response <- request(url) %>% + req_perform() + jobStatus <- parseResponseBody(response$body)$status + jobProgress <- parseResponseBody(response$body)$progress + return(jobStatus) +} + +getStatusCode <- function(server, process, jobID) { + url <- paste0(server, "jobs/", jobID) + print(url) + response <- request(url) %>% + req_perform() + status_code <- response$status_code + return(status_code) +} + +getResult <- function (server, process, jobID) { + response <- + request(paste0(server, "jobs/", jobID, "/results?f=json")) %>% + req_perform() + return(response) +} + +# Recursive function to search for href in a nested list +findHref <- function(obj) { + hrefs <- c() # Initialize an empty vector to store hrefs + + if (is.list(obj)) { + # If the object is a list, loop through its elements + for (name in names(obj)) { + element <- obj[[name]] + + if (is.list(element)) { + # Recursively search if the element is another list + hrefs <- c(hrefs, findHref(element)) + } else if (name == "href") { + # If the element has a name "href", capture its value + hrefs <- c(hrefs, element) + } + } + } + return(hrefs) +} + +retrieveResults <- function(server, process, jobID, outputData) { + status_code <- getStatusCode(server, process, jobID) + print(status_code) + + if (status_code == 200) { + status <- "running" + + while (status == "running") { + jobStatus <- checkJobStatus(server, process, jobID) + print(jobStatus) + + if (jobStatus == "successful") { + status <- jobStatus + result <- getResult(server, process, jobID) + + if (result$status_code == 200) { + resultBody <- parseResponseBody(result$body) + print(resultBody) + + # Call the recursive function to find all hrefs + hrefs <- findHref(resultBody) + + if (length(hrefs) > 0) { + # Collapse the URLs with a newline + urls_with_newline <- paste(hrefs, collapse = "\n") + print(urls_with_newline) + + # Write the URLs to a file + con <- file(outputData, "w") + writeLines(urls_with_newline, con = con) + close(con) + } else { + print("No hrefs found.") + } + } + } else if (jobStatus == "failed") { + status <- jobStatus + } + Sys.sleep(3) + } + + cat("\n done \n") + + } else if (status_code1 == 400) { + print("A query parameter has an invalid value.") + } else if (status_code1 == 404) { + print("The requested URI was not found.") + } else if (status_code1 == 500) { + print("The requested URI was not found.") + } else { + print(paste("HTTP", status_code1, "Error:", resp1$status_message)) + } +} + + + +saveResult <- function(href, outputData) { + con <- file(outputData, "w") + writeLines(href, con = con) + close(con) +} + +is_url <- function(x) { + grepl("^https?://", x) +} + +server <- "https://aqua.igb-berlin.de/pygeoapi-dev/" + +print("--> Retrieve parameters") +inputParameters <- getParameters() +#print(inputParameters) +print("--> Parameters retrieved") + +args <- commandArgs(trailingOnly = TRUE) +outputLocation <- args[2] + +print("--> Retrieve outputs") +outputs <- getOutputs(inputParameters, outputLocation, server) +print("--> Outputs retrieved") + +print("--> Parse inputs") +convertedKeys <- c() +for (key in names(inputParameters)) { + if (is.character(inputParameters[[key]]) && + (endsWith(inputParameters[[key]], ".dat") || + endsWith(inputParameters[[key]], ".txt"))) { + con <- file(inputParameters[[key]], "r") + url_list <- list() + #while (length(line <- readLines(con, n = 1)) > 0) { + # if (is_url(line)) { + # url_list <- c(url_list, list(list(href = trimws(line)))) + # } + #} + con <- file(inputParameters[[key]], "r") + lines <- readLines(con) + print("--------------------------------------------------------------------1") + print(length(lines)) + close(con) + if (!length(lines) > 1 && endsWith(lines, ".jp2") && startsWith(lines, "https")) { + print("--------------------------------------------------------------------2") + tmp <- list() + tmp$href <- lines + tmp$type <- "image/jp2" + inputParameters[[key]] <- tmp + } + else if (!length(lines) > 1 && endsWith(lines, ".zip") && startsWith(lines, "https")) { + print("--------------------------------------------------------------------3") + json_string <- paste(lines, collapse = "\n") + inputParameters[[key]] <- json_string + } else if (!length(lines) > 1 && (endsWith(lines, ".xlsx") || endsWith(lines, ".csv") || grepl("f=csv", lines)) && startsWith(lines, "https")) { + print("--------------------------------------------------------------------4") + json_string <- paste(lines, collapse = "\n") + inputParameters[[key]] <- json_string + } else if (inputParameters$select_process == "plot-image" || + inputParameters$select_process == "reproject-image") { + print("--------------------------------------------------------------------5") + tmp <- list() + tmp$href <- lines + tmp$type <- "image/tiff; application=geotiff" + if (inputParameters$select_process == "reproject-image") { + tmp$type <- "image/tiff; subtype=geotiff" + } + inputParameters[[key]] <- tmp + } else { + print("-----------------------------------6") + json_string <- paste(lines, collapse = "\n") + json_data <- fromJSON(json_string) + inputParameters[[key]] <- json_data + } + convertedKeys <- append(convertedKeys, key) + } + else if (grepl("_Array_", key)) { + keyParts <- strsplit(key, split = "_")[[1]] + type <- keyParts[length(keyParts)] + values <- inputParameters[[key]] + value_list <- strsplit(values, split = ",") + convertedValues <- c() + + for (value in value_list) { + if (type == "integer") { + value <- as.integer(value) + } else if (type == "numeric") { + value <- as.numeric(value) + } else if (type == "character") { + value <- as.character(value) + } + convertedValues <- append(convertedValues, value) + + convertedKey <- "" + for (part in keyParts) { + if (part == "Array") { + break + } + convertedKey <- + paste(convertedKey, paste(part, "_", sep = ""), sep = "") + } + convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1) + } + + inputParameters[[key]] <- convertedValues + print("-------------------------") + print(convertedValues) + print("-------------------------") + convertedKeys <- append(convertedKeys, convertedKey) + } else { + print("-------------------------") + print(key) + print(inputParameters[[key]]) + if (!is.null(inputParameters[[key]])) { + convertedKeys <- append(convertedKeys, key) + } + print("-------------------------") + + } +} +print(inputParameters) +names(inputParameters) <- convertedKeys +#print(inputParameters) +print("--> Inputs parsed") + +print("--> Prepare process execution") +jsonData <- list("inputs" = inputParameters, + "outputs" = outputs) + +print("--> Execute process") +jobId <- executeProcess(server, inputParameters$select_process, jsonData) +print("--> Process executed") + +print("--> Retrieve results") +retrieveResults(server, inputParameters$select_process, jobId, outputLocation) +print("--> Results retrieved") \ No newline at end of file