Mercurial > repos > ecology > aquainfra_ogc_api_processes
comparison aquainfra_ogc_api_processes.R @ 0:0077885b6f1d draft
planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit 6db8e8425f0525fc2e5df8cb43beb3b14024d0ab
| author | ecology | 
|---|---|
| date | Mon, 14 Oct 2024 12:23:01 +0000 | 
| parents | |
| children | 1ff618d89af4 | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:0077885b6f1d | 
|---|---|
| 1 library("httr2") | |
| 2 library("jsonlite") | |
| 3 library("getopt") | |
| 4 | |
| 5 cat("start generic wrapper service \n") | |
| 6 | |
# Recursively strip NULL entries from a (possibly nested) list.
#
# x: any R object. Lists are cleaned element by element; anything that is
#    not a list is returned unchanged.
# Returns the list without its NULL elements (names of kept elements are
# preserved).
remove_null_values <- function(x) {
  if (!is.list(x)) {
    return(x)
  }
  # Clean the children first, then drop every child that is NULL.
  cleaned <- lapply(x, remove_null_values)
  is_missing <- vapply(cleaned, is.null, logical(1))
  cleaned[!is_missing]
}
| 16 | |
# Load the tool parameters from "inputs.json" in the working directory.
#
# The file is read as text, parsed as JSON, and stripped of NULL values.
# Returns the `conditional_process` entry of the cleaned structure.
getParameters <- function() {
  raw_lines <- readLines("inputs.json")
  parsed <- fromJSON(paste(raw_lines, collapse = "\n"))
  remove_null_values(parsed)$conditional_process
}
| 29 | |
# Decode an HTTP response body into an R object.
#
# body: the byte values of the response body (e.g. `response$body`).
# The bytes are converted to a character string, which is then parsed as
# JSON with jsonlite. Returns the parsed JSON.
parseResponseBody <- function(body) {
  bytes <- as.raw(as.integer(c(body)))
  jsonlite::fromJSON(rawToChar(bytes))
}
| 38 | |
# Build the "outputs" section of an execute request for the selected process.
#
# The process description is fetched from `<server>/processes/<process>` and
# one entry is created per output listed there. Each entry requests
# transmission by reference; if `inputs` contains a `<output name>_outformat`
# value, it is passed on as the output's media type.
#
# inputs: tool parameters; must contain `select_process`.
# output: unused; kept so existing callers keep working.
# server: base URL of the server, with or without a trailing "/".
# Returns a list named after the process outputs (empty when the process
# description lists no outputs).
getOutputs <- function(inputs, output, server) {
  # Strip trailing slashes so that a base URL ending in "/" does not yield
  # "//processes/" in the request path.
  base_url <- sub("/+$", "", server)
  url <- paste0(base_url, "/processes/", inputs$select_process)
  response <- req_perform(request(url))
  responseBody <- parseResponseBody(response$body)

  process_outputs <- responseBody$outputs
  output_names <- names(process_outputs)
  outputs <- vector("list", length(process_outputs))

  # seq_along() keeps the loop from running when there are no outputs.
  for (i in seq_along(process_outputs)) {
    output_item <- list()
    format_key <- paste0(output_names[i], "_outformat")
    if (format_key %in% names(inputs)) {
      output_item$format <- list("mediaType" = inputs[[format_key]])
    }
    output_item$transmissionMode <- "reference"
    outputs[[i]] <- output_item
  }

  names(outputs) <- output_names
  outputs
}
| 67 | |
# Start an asynchronous execution of a process.
#
# url: base URL of the server; "processes/<process>/execution" is appended
#      directly, so it is expected to end with "/".
# process: identifier of the process to execute.
# requestBodyData: list whose `inputs` element holds the process inputs.
#      Only `inputs` is sent; its `select_process` entry is removed first.
# Returns the `jobID` field of the server's JSON response.
executeProcess <- function(url, process, requestBodyData) {
  endpoint <- paste0(url, "processes/", process, "/execution")

  # select_process is dropped from the inputs before they are sent.
  requestBodyData$inputs$select_process <- NULL
  body <- list()
  body$inputs <- requestBodyData$inputs

  req <- request(endpoint)
  req <- req_headers(
    req,
    "Content-Type" = "application/json",
    "Prefer" = "respond-async"
  )
  req <- req_body_json(req, body)
  response <- req_perform(req)

  cat("\n Process executed")
  cat("\n status: ", response$status_code)

  parseResponseBody(response$body)$jobID
}
| 92 | |
# Query the current status of a job.
#
# server: base URL of the server; "jobs/<jobID>" is appended directly.
# process: unused; kept for interface compatibility.
# jobID: identifier of the job to look up.
# Returns the `status` field of the job's JSON status document.
checkJobStatus <- function(server, process, jobID) {
  response <- req_perform(request(paste0(server, "jobs/", jobID)))
  # The body is parsed once; the progress value that used to be extracted
  # here was never used.
  parseResponseBody(response$body)$status
}
| 101 | |
# Request the status document of a job and report the HTTP status code.
#
# server: base URL of the server; "jobs/<jobID>" is appended directly.
# process: unused; kept for interface compatibility.
# jobID: identifier of the job to look up.
# Prints the requested URL and returns the response's `status_code`.
getStatusCode <- function(server, process, jobID) {
  job_url <- paste0(server, "jobs/", jobID)
  print(job_url)
  job_response <- req_perform(request(job_url))
  job_response$status_code
}
| 110 | |
# Fetch the results document of a job as JSON.
#
# server: base URL of the server; "jobs/<jobID>/results?f=json" is appended.
# process: unused; kept for interface compatibility.
# jobID: identifier of the job whose results are requested.
# Returns the response object produced by req_perform().
getResult <- function(server, process, jobID) {
  results_url <- paste0(server, "jobs/", jobID, "/results?f=json")
  req_perform(request(results_url))
}
| 117 | |
# Recursively collect the values of all elements named "href" in a nested
# list.
#
# obj: any R object; only lists are searched.
# Returns a vector with the href values in the order they are encountered,
# or NULL when none are found.
findHref <- function(obj) {
  if (!is.list(obj)) {
    return(NULL)
  }

  # Walk the list by position rather than by name, so that unnamed lists
  # are searched too and elements sharing a name are each visited.
  keys <- names(obj)
  if (is.null(keys)) {
    keys <- rep("", length(obj))
  }

  hrefs <- c()
  for (i in seq_along(obj)) {
    element <- obj[[i]]
    if (is.list(element)) {
      # Descend into nested lists.
      hrefs <- c(hrefs, findHref(element))
    } else if (identical(keys[[i]], "href")) {
      hrefs <- c(hrefs, element)
    }
  }
  hrefs
}
| 138 | |
# Wait for a job to finish and write the URLs of its results to a file.
#
# The job's status endpoint is requested once; if that answers with HTTP 200
# the job status is polled every 3 seconds until it is "successful",
# "failed" or "dismissed". On success the results document is fetched, every
# "href" value in it is collected, and the URLs are written to `outputData`,
# one per line. Any other HTTP status of the first request is only reported.
#
# server: base URL of the server (expected to end with "/").
# process: identifier of the executed process (passed through to helpers).
# jobID: identifier of the job to wait for.
# outputData: path of the file the result URLs are written to.
# Returns NULL invisibly; called for its side effects.
retrieveResults <- function(server, process, jobID, outputData) {
  status_code <- getStatusCode(server, process, jobID)
  print(status_code)

  if (status_code == 200) {
    repeat {
      jobStatus <- checkJobStatus(server, process, jobID)
      print(jobStatus)

      if (is.null(jobStatus)) {
        stop("The job status response did not contain a 'status' field.",
             call. = FALSE)
      }

      if (jobStatus == "successful") {
        result <- getResult(server, process, jobID)

        if (result$status_code == 200) {
          resultBody <- parseResponseBody(result$body)
          print(resultBody)

          hrefs <- findHref(resultBody)

          if (length(hrefs) > 0) {
            urls_with_newline <- paste(hrefs, collapse = "\n")
            print(urls_with_newline)
            # Given a path, writeLines() opens and closes the file itself.
            writeLines(urls_with_newline, con = outputData)
          } else {
            print("No hrefs found.")
          }
        } else {
          print(paste("HTTP", result$status_code,
                      "while retrieving the job results."))
        }
        break
      }

      # "dismissed" is terminal as well; without it the loop never ends for
      # a job that was cancelled on the server.
      if (jobStatus %in% c("failed", "dismissed")) {
        break
      }

      # Only wait when another poll will follow.
      Sys.sleep(3)
    }

    cat("\n done \n")
  } else if (status_code == 400) {
    print("A query parameter has an invalid value.")
  } else if (status_code == 404) {
    print("The requested URI was not found.")
  } else if (status_code == 500) {
    print("A server error occurred.")
  } else {
    print(paste("HTTP", status_code, "Error"))
  }

  invisible(NULL)
}
| 192 | |
| 193 | |
| 194 | |
# Write `href` to the file `outputData`, one element per line, replacing any
# existing content.
saveResult <- function(href, outputData) {
  # Given a path, writeLines() opens the file for writing and closes it.
  writeLines(href, con = outputData)
}
| 200 | |
# Test whether each element of `x` starts with "http://" or "https://".
# Returns a logical vector of the same length as `x`.
is_url <- function(x) {
  url_prefix <- "^https?://"
  grepl(url_prefix, x)
}
| 204 | |
# ---- Main script ----

# Base URL of the OGC API Processes server. The helper functions append
# "processes/..." and "jobs/..." directly, so it has to end with "/".
server <- "https://aqua.igb-berlin.de/pygeoapi-dev/"

print("--> Retrieve parameters")
inputParameters <- getParameters()
#print(inputParameters)
print("--> Parameters retrieved")

# The second trailing command-line argument is the file the result URLs are
# written to.
args <- commandArgs(trailingOnly = TRUE)
outputLocation <- args[2]

print("--> Retrieve outputs")
outputs <- getOutputs(inputParameters, outputLocation, server)
print("--> Outputs retrieved")

print("--> Parse inputs")

# TRUE when a parameter value is a single path ending in ".dat" or ".txt".
# The length and NA checks keep `||` from seeing a vector or a missing value.
isInputFile <- function(value) {
  is.character(value) && length(value) == 1 && !is.na(value) &&
    (endsWith(value, ".dat") || endsWith(value, ".txt"))
}

paramNames <- names(inputParameters)
# One output name per parameter, kept positional so names and values cannot
# get out of step; parameters that are not renamed keep their original name.
convertedKeys <- paramNames

for (i in seq_along(paramNames)) {
  key <- paramNames[[i]]
  value <- inputParameters[[i]]

  if (isInputFile(value)) {
    # Given a path, readLines() opens and closes the file itself.
    lines <- readLines(value)
    print("--------------------------------------------------------------------1")
    print(length(lines))

    if (length(lines) == 0) {
      stop("Input file for parameter '", key, "' is empty: ", value,
           call. = FALSE)
    }

    # The URL shortcuts below only apply to a file holding exactly one
    # https URL.
    singleHttps <- length(lines) == 1 && startsWith(lines, "https")

    if (singleHttps && endsWith(lines, ".jp2")) {
      print("--------------------------------------------------------------------2")
      newValue <- list(href = lines, type = "image/jp2")
    } else if (singleHttps && endsWith(lines, ".zip")) {
      print("--------------------------------------------------------------------3")
      newValue <- paste(lines, collapse = "\n")
    } else if (singleHttps &&
               (endsWith(lines, ".xlsx") || endsWith(lines, ".csv") ||
                grepl("f=csv", lines))) {
      print("--------------------------------------------------------------------4")
      newValue <- paste(lines, collapse = "\n")
    } else if (isTRUE(inputParameters$select_process == "plot-image") ||
               isTRUE(inputParameters$select_process == "reproject-image")) {
      print("--------------------------------------------------------------------5")
      newValue <- list(href = lines, type = "image/tiff; application=geotiff")
      if (inputParameters$select_process == "reproject-image") {
        newValue$type <- "image/tiff; subtype=geotiff"
      }
    } else {
      print("-----------------------------------6")
      # Anything else is treated as a JSON document.
      newValue <- fromJSON(paste(lines, collapse = "\n"))
    }

    # Assign through `[` with a wrapped value so that a NULL result does not
    # silently delete the parameter.
    inputParameters[i] <- list(newValue)
  } else if (grepl("_Array_", key)) {
    # Keys look like "<name>_Array_<type>": the value is a comma-separated
    # string that is split and converted to <type>, and the parameter is
    # renamed to the part of the key in front of "_Array".
    keyParts <- strsplit(key, split = "_")[[1]]
    type <- keyParts[length(keyParts)]

    items <- unlist(strsplit(as.character(value), split = ","))
    if (is.null(items)) {
      items <- character(0)
    }

    if (type == "integer") {
      convertedValues <- as.integer(items)
    } else if (type == "numeric") {
      convertedValues <- as.numeric(items)
    } else {
      convertedValues <- items
    }

    # A key matching "_Array_" always has a part equal to "Array" that is
    # preceded by at least one other part.
    arrayPos <- match("Array", keyParts)
    convertedKeys[[i]] <- paste(keyParts[seq_len(arrayPos - 1)], collapse = "_")

    inputParameters[i] <- list(convertedValues)
    print("-------------------------")
    print(convertedValues)
    print("-------------------------")
  } else {
    print("-------------------------")
    print(key)
    print(value)
    print("-------------------------")
  }
}

print(inputParameters)
names(inputParameters) <- convertedKeys
#print(inputParameters)
print("--> Inputs parsed")

print("--> Prepare process execution")
jsonData <- list("inputs" = inputParameters,
                 "outputs" = outputs)

print("--> Execute process")
jobId <- executeProcess(server, inputParameters$select_process, jsonData)
print("--> Process executed")

print("--> Retrieve results")
retrieveResults(server, inputParameters$select_process, jobId, outputLocation)
print("--> Results retrieved")
