Mercurial > repos > ecology > aquainfra_ogc_api_processes
comparison aquainfra_ogc_api_processes.R @ 7:78768c06b60f draft
planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit 176da50f4af1ff7f4e0e325984c8edd27fe93926
| author | ecology |
|---|---|
| date | Mon, 21 Jul 2025 06:19:15 +0000 |
| parents | af00a67d3649 |
| children | 71f754d61a04 |
comparison
equal
deleted
inserted
replaced
| 6:5acff8859db2 | 7:78768c06b60f |
|---|---|
| 1 library("httr2") | 1 library("httr2") |
| 2 library("jsonlite") | 2 library("jsonlite") |
| 3 library("getopt") | 3 library("getopt") |
| 4 | 4 |
| 5 cat("start generic wrapper service \n") | 5 cat("START GENERIC WRAPPER SERVICE \n") |
| 6 | 6 |
| 7 remove_null_values <- function(x) { | 7 remove_null_values <- function(x) { |
| 8 if (is.list(x)) { | 8 if (is.list(x)) { |
| 9 x <- lapply(x, remove_null_values) | 9 x <- lapply(x, remove_null_values) |
| 10 x <- x[!sapply(x, is.null)] | 10 x <- x[!sapply(x, is.null)] |
| 31 readableOutput <- rawToChar(rawVector) | 31 readableOutput <- rawToChar(rawVector) |
| 32 jsonObject <- jsonlite::fromJSON(readableOutput) | 32 jsonObject <- jsonlite::fromJSON(readableOutput) |
| 33 return(jsonObject) | 33 return(jsonObject) |
| 34 } | 34 } |
| 35 | 35 |
| 36 getOutputs <- function(inputs, output, server) { | |
| 37 url <- | |
| 38 paste(paste(server, "processes/", sep = ""), | |
| 39 inputs$select_process, | |
| 40 sep = "") | |
| 41 print(url) | |
| 42 request <- request(url) | |
| 43 response <- req_perform(request) | |
| 44 responseBody <- parseResponseBody(response$body) | |
| 45 outputs <- list() | |
| 46 | |
| 47 for (x in 1:length(responseBody$outputs)) { | |
| 48 outputformatName <- | |
| 49 paste(names(responseBody$outputs[x]), "_outformat", sep = "") | |
| 50 output_item <- list() | |
| 51 | |
| 52 for (p in names(inputs)) { | |
| 53 if (p == outputformatName) { | |
| 54 format <- list("mediaType" = inputs[[outputformatName]]) | |
| 55 output_item$format <- format | |
| 56 } | |
| 57 } | |
| 58 output_item$transmissionMode <- "reference" | |
| 59 outputs[[x]] <- output_item | |
| 60 } | |
| 61 | |
| 62 names(outputs) <- names(responseBody$outputs) | |
| 63 return(outputs) | |
| 64 } | |
| 65 | |
| 66 executeProcess <- function(url, process, requestBodyData) { | 36 executeProcess <- function(url, process, requestBodyData) { |
| 67 url <- | 37 url <- |
| 68 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") | 38 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") |
| 69 requestBodyData$inputs$select_process <- NULL | 39 requestBodyData$inputs$select_process <- NULL |
| 70 | 40 |
| 75 req_headers("Content-Type" = "application/json", | 45 req_headers("Content-Type" = "application/json", |
| 76 "Prefer" = "respond-async") %>% | 46 "Prefer" = "respond-async") %>% |
| 77 req_body_json(body) %>% | 47 req_body_json(body) %>% |
| 78 req_perform() | 48 req_perform() |
| 79 | 49 |
| 80 cat("\n Process executed") | 50 cat("\n 3.1: Process executed") |
| 81 cat("\n status: ", response$status_code) | 51 cat("\n 3.1: Status code: ", response$status_code) |
| 82 jobId <- parseResponseBody(response$body)$jobID | 52 jobId <- parseResponseBody(response$body)$jobID |
| 53 cat("\n 3.1: Job ID: ", jobId, "\n") | |
| 83 | 54 |
| 84 return(jobId) | 55 return(jobId) |
| 85 } | 56 } |
| 86 | 57 |
| 87 checkJobStatus <- function(server, process, jobID) { | 58 checkJobStatus <- function(server, process, jobID) { |
| 91 jobStatus <- parseResponseBody(response$body)$status | 62 jobStatus <- parseResponseBody(response$body)$status |
| 92 jobProgress <- parseResponseBody(response$body)$progress | 63 jobProgress <- parseResponseBody(response$body)$progress |
| 93 return(jobStatus) | 64 return(jobStatus) |
| 94 } | 65 } |
| 95 | 66 |
| 96 getStatusCode <- function(server, process, jobID) { | 67 getStatusCode <- function(url) { |
| 97 url <- paste0(server, "jobs/", jobID) | |
| 98 print(url) | |
| 99 response <- request(url) %>% | 68 response <- request(url) %>% |
| 100 req_perform() | 69 req_perform() |
| 101 status_code <- response$status_code | 70 status_code <- response$status_code |
| 102 return(status_code) | 71 return(status_code) |
| 103 } | 72 } |
| 123 } | 92 } |
| 124 return(hrefs) | 93 return(hrefs) |
| 125 } | 94 } |
| 126 | 95 |
| 127 retrieveResults <- function(server, process, jobID, outputData) { | 96 retrieveResults <- function(server, process, jobID, outputData) { |
| 128 status_code <- getStatusCode(server, process, jobID) | 97 url <- paste0(server, "jobs/", jobID) |
| 129 print(status_code) | 98 cat(" 4.1: Job URL: ", url) |
| 99 status_code <- getStatusCode(url) | |
| 100 cat("\n 4.2: Status code: ", status_code, "\n") | |
| 130 | 101 |
| 131 if (status_code == 200) { | 102 if (status_code == 200) { |
| 132 status <- "running" | 103 status <- "running" |
| 133 | 104 |
| 134 while (status == "running") { | 105 while (status == "running") { |
| 135 jobStatus <- checkJobStatus(server, process, jobID) | 106 jobStatus <- checkJobStatus(server, process, jobID) |
| 136 print(jobStatus) | 107 cat(" 4.3: Job status: ", jobStatus, "\n") |
| 137 | 108 |
| 138 if (jobStatus == "successful") { | 109 if (jobStatus == "successful") { |
| 139 status <- jobStatus | 110 status <- jobStatus |
| 140 result <- getResult(server, process, jobID) | 111 result <- getResult(server, process, jobID) |
| 141 | 112 |
| 142 if (result$status_code == 200) { | 113 if (result$status_code == 200) { |
| 143 resultBody <- parseResponseBody(result$body) | 114 resultBody <- parseResponseBody(result$body) |
| 115 cat("\n 4.4 Outputs: \n") | |
| 144 print(resultBody) | 116 print(resultBody) |
| 145 hrefs <- findHref(resultBody) | 117 hrefs <- findHref(resultBody) |
| 146 | 118 |
| 147 if (length(hrefs) > 0) { | 119 if (length(hrefs) > 0) { |
| 148 urls_with_newline <- paste(hrefs, collapse = "\n") | 120 urls_with_newline <- paste(hrefs, collapse = "\n") |
| 149 print(urls_with_newline) | |
| 150 con <- file(outputData, "w") | 121 con <- file(outputData, "w") |
| 151 writeLines(urls_with_newline, con = con) | 122 writeLines(urls_with_newline, con = con) |
| 152 close(con) | 123 close(con) |
| 153 } else { | 124 } else { |
| 154 print("No hrefs found.") | 125 stop(paste0("Job failed. No hrefs found. See details at: ", server, "jobs/", jobID)) |
| 155 } | 126 } |
| 156 } | 127 } |
| 157 } else if (jobStatus == "failed") { | 128 } else if (jobStatus == "failed") { |
| 158 status <- jobStatus | 129 stop(paste0("Job failed. See details at: ", server, "jobs/", jobID)) |
| 159 } | 130 } |
| 160 Sys.sleep(3) | 131 Sys.sleep(3) |
| 161 } | 132 } |
| 162 | |
| 163 cat("\n done \n") | |
| 164 | 133 |
| 165 } else if (status_code1 == 400) { | 134 } else if (status_code1 == 400) { |
| 166 print("A query parameter has an invalid value.") | 135 print("A query parameter has an invalid value.") |
| 167 } else if (status_code1 == 404) { | 136 } else if (status_code1 == 404) { |
| 168 print("The requested URI was not found.") | 137 print("The requested URI was not found.") |
| 183 grepl("^https?://", x) | 152 grepl("^https?://", x) |
| 184 } | 153 } |
| 185 | 154 |
| 186 server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/" | 155 server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/" |
| 187 | 156 |
| 188 print("--> Retrieve parameters") | 157 cat("\n1: START RETRIEVING PARAMETERS\n\n") |
| 189 inputParameters <- getParameters() | 158 inputParameters <- getParameters() |
| 190 print("--> Parameters retrieved") | 159 print(inputParameters) |
| 160 cat("1: END RETRIEVING PARAMETERS\n") | |
| 191 | 161 |
| 192 args <- commandArgs(trailingOnly = TRUE) | 162 args <- commandArgs(trailingOnly = TRUE) |
| 193 outputLocation <- args[2] | 163 outputLocation <- args[2] |
| 194 | 164 |
| 195 print("--> Retrieve outputs") | 165 cat("\n2: START PARSING INPUTS\n\n") |
| 196 outputs <- getOutputs(inputParameters, outputLocation, server) | |
| 197 print("--> Outputs retrieved") | |
| 198 | |
| 199 print("--> Parse inputs") | |
| 200 | |
| 201 convertedKeys <- c() | 166 convertedKeys <- c() |
| 202 | 167 |
| 203 for (key in names(inputParameters)) { | 168 for (key in names(inputParameters)) { |
| 204 if (is.character(inputParameters[[key]]) && | 169 if (is.character(inputParameters[[key]]) && |
| 205 (endsWith(inputParameters[[key]], ".dat") || | 170 (endsWith(inputParameters[[key]], ".dat") || |
| 207 con <- file(inputParameters[[key]], "r") | 172 con <- file(inputParameters[[key]], "r") |
| 208 url_list <- list() | 173 url_list <- list() |
| 209 | 174 |
| 210 con <- file(inputParameters[[key]], "r") | 175 con <- file(inputParameters[[key]], "r") |
| 211 lines <- readLines(con) | 176 lines <- readLines(con) |
| 212 print(length(lines)) | |
| 213 close(con) | 177 close(con) |
| 214 | 178 |
| 215 json_string <- paste(lines, collapse = "\n") | 179 json_string <- paste(lines, collapse = ",") |
| 216 inputParameters[[key]] <- json_string | 180 inputParameters[[key]] <- json_string |
| 217 | 181 |
| 218 convertedKeys <- append(convertedKeys, key) | 182 convertedKeys <- append(convertedKeys, key) |
| 219 } | |
| 220 else if (grepl("_Array_", key)) { | |
| 221 keyParts <- strsplit(key, split = "_")[[1]] | |
| 222 type <- keyParts[length(keyParts)] | |
| 223 values <- inputParameters[[key]] | |
| 224 value_list <- strsplit(values, split = ",") | |
| 225 convertedValues <- c() | |
| 226 | |
| 227 for (value in value_list) { | |
| 228 if (type == "integer") { | |
| 229 value <- as.integer(value) | |
| 230 } else if (type == "numeric") { | |
| 231 value <- as.numeric(value) | |
| 232 } else if (type == "character") { | |
| 233 value <- as.character(value) | |
| 234 } | |
| 235 convertedValues <- append(convertedValues, value) | |
| 236 | |
| 237 convertedKey <- "" | |
| 238 for (part in keyParts) { | |
| 239 if (part == "Array") { | |
| 240 break | |
| 241 } | |
| 242 convertedKey <- | |
| 243 paste(convertedKey, paste(part, "_", sep = ""), sep = "") | |
| 244 } | |
| 245 convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1) | |
| 246 } | |
| 247 | |
| 248 inputParameters[[key]] <- convertedValues | |
| 249 convertedKeys <- append(convertedKeys, convertedKey) | |
| 250 } else { | 183 } else { |
| 251 if (!is.null(inputParameters[[key]])) { | 184 if (!is.null(inputParameters[[key]])) { |
| 252 convertedKeys <- append(convertedKeys, key) | 185 convertedKeys <- append(convertedKeys, key) |
| 253 } | 186 } |
| 254 } | 187 } |
| 255 } | 188 } |
| 189 names(inputParameters) <- convertedKeys | |
| 256 print(inputParameters) | 190 print(inputParameters) |
| 257 names(inputParameters) <- convertedKeys | 191 cat("2: END PARSING INPUTSs\n") |
| 258 print("--> Inputs parsed") | 192 |
| 259 | 193 cat("\n3: START EXECUTING PROCESS\n") |
| 260 print("--> Prepare process execution") | 194 jsonData <- list("inputs" = inputParameters) |
| 261 jsonData <- list("inputs" = inputParameters, | |
| 262 "outputs" = outputs) | |
| 263 | |
| 264 print("--> Execute process") | |
| 265 jobId <- executeProcess(server, inputParameters$select_process, jsonData) | 195 jobId <- executeProcess(server, inputParameters$select_process, jsonData) |
| 266 print("--> Process executed") | 196 cat("\n3: END EXECUTING PROCESS\n") |
| 267 | 197 |
| 268 print("--> Retrieve results") | 198 cat("\n4: START RETRIEVING RESULTS\n\n") |
| 269 retrieveResults(server, inputParameters$select_process, jobId, outputLocation) | 199 retrieveResults(server, inputParameters$select_process, jobId, outputLocation) |
| 270 print("--> Results retrieved") | 200 cat("4: END RETRIEVING RESULTS\n") |
| 201 | |
| 202 cat("\n5: DONE.") |
