comparison aquainfra_ogc_api_processes.R @ 0:3073e68373a5 draft default tip

planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit d07f89c8f6d6ee44c05de6b1b8047b387e76cad6
author ecology
date Thu, 10 Oct 2024 09:56:27 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3073e68373a5
1 library("httr2")
2 library("jsonlite")
3 library("getopt")
4
5 cat("start generic wrapper service \n")
6
# Recursively drop NULL entries from a nested list.
#
# Args:
#   x: any R object; only lists are traversed.
# Returns:
#   `x` unchanged when it is not a list; otherwise a list in which every
#   element has been cleaned recursively and all elements that are NULL
#   have been removed.
remove_null_values <- function(x) {
  if (!is.list(x)) {
    return(x)
  }
  cleaned <- lapply(x, remove_null_values)
  # vapply() always yields a logical vector (logical(0) for an empty list),
  # whereas sapply() returns an empty list in that case.
  is_missing <- vapply(cleaned, is.null, logical(1))
  cleaned[!is_missing]
}
16
# Load the tool parameters from "inputs.json" in the working directory.
#
# Returns:
#   The `conditional_process` entry of the parsed JSON document, after all
#   NULL values have been stripped from it.
getParameters <- function() {
  # readLines() on a path opens the file for reading and closes it again.
  raw_lines <- readLines("inputs.json")
  parsed <- fromJSON(paste(raw_lines, collapse = "\n"))

  without_nulls <- remove_null_values(parsed)
  without_nulls$conditional_process
}
29
# Decode an HTTP response body and parse it as JSON.
#
# Args:
#   body: the response body; it is coerced to integers and then to raw bytes.
# Returns:
#   The R object produced by jsonlite::fromJSON() for the decoded text.
parseResponseBody <- function(body) {
  byte_codes <- as.integer(c(body))
  json_text <- rawToChar(as.raw(byte_codes))
  jsonlite::fromJSON(json_text)
}
38
# Build the `outputs` section of an execution request for one process.
#
# The process description is fetched from `<server>/processes/<process>` and
# one entry is created per output listed in its `outputs` member. Each entry
# gets `transmissionMode = "reference"`; when `inputs` contains an element
# named `<output name>_outformat`, its value is added as
# `format$mediaType`.
#
# Args:
#   inputs: named list of tool parameters; `select_process` names the process.
#   output: not used by this function; kept so existing callers keep working.
#   server: base URL of the service, with or without a trailing slash.
# Returns:
#   A list with one element per declared output, named like the outputs of
#   the process description (an empty list when none are declared).
getOutputs <- function(inputs, output, server) {
  # Strip trailing slashes so the joined URL never contains "//processes".
  base_url <- sub("/+$", "", server)
  url <- paste0(base_url, "/processes/", inputs$select_process)
  response <- req_perform(request(url))
  responseBody <- parseResponseBody(response$body)

  declared <- responseBody$outputs
  outputs <- vector("list", length(declared))

  # seq_along() yields no iterations for an empty description, whereas
  # 1:length() would iterate over c(1, 0).
  for (x in seq_along(declared)) {
    outputformatName <- paste0(names(declared[x]), "_outformat")
    output_item <- list()

    if (outputformatName %in% names(inputs)) {
      output_item$format <- list("mediaType" = inputs[[outputformatName]])
    }
    output_item$transmissionMode <- "reference"
    outputs[[x]] <- output_item
  }

  names(outputs) <- names(declared)
  outputs
}
67
# Start an asynchronous execution of a process and return its job id.
#
# Args:
#   url: base URL of the service; "processes/<process>/execution" is appended
#        to it as-is.
#   process: identifier of the process to execute.
#   requestBodyData: list whose `inputs` element holds the process inputs.
# Returns:
#   The `jobID` member of the parsed response body.
executeProcess <- function(url, process, requestBodyData) {
  execution_url <- paste0(url, "processes/", process, "/execution")

  # `select_process` is removed from the inputs before they are sent.
  requestBodyData$inputs$select_process <- NULL

  # Only the inputs are placed in the request body.
  payload <- list()
  payload$inputs <- requestBodyData$inputs

  execution_request <- request(execution_url)
  execution_request <- req_headers(
    execution_request,
    "Content-Type" = "application/json",
    "Prefer" = "respond-async"
  )
  execution_request <- req_body_json(execution_request, payload)
  response <- req_perform(execution_request)

  cat("\n Process executed")
  cat("\n status: ", response$status_code)

  parseResponseBody(response$body)$jobID
}
92
# Query the current status of a job.
#
# Args:
#   server: base URL of the service; "jobs/<jobID>" is appended to it as-is.
#   process: not used by this function; kept for interface compatibility.
#   jobID: identifier of the job to look up.
# Returns:
#   The `status` member of the parsed job description.
checkJobStatus <- function(server, process, jobID) {
  url <- paste0(server, "jobs/", jobID)
  response <- req_perform(request(url))
  # The body is parsed once; only the status is needed by callers.
  parseResponseBody(response$body)$status
}
101
# Request the job resource and report the HTTP status code of the response.
#
# Args:
#   server: base URL of the service; "jobs/<jobID>" is appended to it as-is.
#   process: not used by this function.
#   jobID: identifier of the job to look up.
# Returns:
#   The `status_code` field of the response. The requested URL is printed
#   before the request is sent.
getStatusCode <- function(server, process, jobID) {
  job_url <- paste0(server, "jobs/", jobID)
  print(job_url)
  job_response <- req_perform(request(job_url))
  job_response$status_code
}
110
# Fetch the results document of a job as JSON.
#
# Args:
#   server: base URL of the service; "jobs/<jobID>/results?f=json" is
#           appended to it as-is.
#   process: not used by this function.
#   jobID: identifier of the job whose results are requested.
# Returns:
#   The response object returned by req_perform().
getResult <- function(server, process, jobID) {
  results_url <- paste0(server, "jobs/", jobID, "/results?f=json")
  results_request <- request(results_url)
  req_perform(results_request)
}
117
# Recursively collect the values of all elements named "href" in a nested
# list.
#
# Elements are visited by position, so unnamed lists (for example JSON arrays
# of objects) and lists with duplicate names are searched as well.
#
# Args:
#   obj: any R object; only lists are traversed.
# Returns:
#   A vector with every "href" value found, in traversal order, or NULL when
#   `obj` is not a list or contains no such element.
findHref <- function(obj) {
  if (!is.list(obj)) {
    return(NULL)
  }

  keys <- names(obj)
  if (is.null(keys)) {
    # Unnamed list: nothing can be an "href" itself, but children may be.
    keys <- rep("", length(obj))
  }

  found <- lapply(seq_along(obj), function(position) {
    element <- obj[[position]]
    if (is.list(element)) {
      findHref(element)
    } else if (identical(keys[[position]], "href")) {
      element
    } else {
      NULL
    }
  })

  # unlist() returns NULL when nothing was collected.
  unlist(found, use.names = FALSE)
}
138
# Wait for a job to finish and write the links to its results to a file.
#
# When the job resource answers with HTTP 200, the job status is polled every
# three seconds until it is "successful", "failed" or "dismissed". For a
# successful job the results document is fetched, all "href" values in it are
# collected and written to `outputData`, joined by newlines. Any other
# initial status code only prints a message.
#
# Args:
#   server: base URL of the service, passed on to the request helpers.
#   process: process identifier, passed on to the request helpers.
#   jobID: identifier of the job to wait for.
#   outputData: path of the file that receives the result links.
retrieveResults <- function(server, process, jobID, outputData) {
  status_code <- getStatusCode(server, process, jobID)
  print(status_code)

  if (status_code == 200) {
    # "dismissed" is a final state too; without it a dismissed job would be
    # polled forever.
    terminal_states <- c("successful", "failed", "dismissed")

    repeat {
      jobStatus <- checkJobStatus(server, process, jobID)
      print(jobStatus)
      if (jobStatus %in% terminal_states) {
        break
      }
      # Wait only while the job can still change state.
      Sys.sleep(3)
    }

    if (jobStatus == "successful") {
      result <- getResult(server, process, jobID)

      if (result$status_code == 200) {
        resultBody <- parseResponseBody(result$body)
        print(resultBody)

        hrefs <- findHref(resultBody)

        if (length(hrefs) > 0) {
          urls_with_newline <- paste(hrefs, collapse = "\n")
          print(urls_with_newline)

          # writeLines() opens and closes the file itself for a path.
          writeLines(urls_with_newline, con = outputData)
        } else {
          print("No hrefs found.")
        }
      }
    }

    cat("\n done \n")

    # NOTE(review): httr2's req_perform() signals an R error for 4xx/5xx
    # responses by default, so the branches below are presumably only reached
    # if getStatusCode() is changed to return such codes - confirm.
  } else if (status_code == 400) {
    print("A query parameter has an invalid value.")
  } else if (status_code == 404) {
    print("The requested URI was not found.")
  } else if (status_code == 500) {
    print("A server error occurred.")
  } else {
    print(paste("HTTP", status_code, "Error"))
  }
}
192
193
194
# Write result links to a file, one element of `href` per line.
#
# Args:
#   href: character vector to write.
#   outputData: path of the file to create or overwrite.
saveResult <- function(href, outputData) {
  # Given a path, writeLines() opens the file for writing and closes it.
  writeLines(href, con = outputData)
}
200
# Test whether strings start with "http://" or "https://".
#
# Args:
#   x: character vector to test.
# Returns:
#   A logical vector with one entry per element of `x`.
is_url <- function(x) {
  url_scheme_pattern <- "^https?://"
  grepl(pattern = url_scheme_pattern, x = x)
}
204
# Base URL of the service all requests are sent to. Most helpers append
# their path to it without adding a slash, so it ends with one.
server <- "https://aqua.igb-berlin.de/pygeoapi-dev/"

print("--> Retrieve parameters")
inputParameters <- getParameters()
print("--> Parameters retrieved")

# The second trailing command-line argument is used as the result file path.
args <- commandArgs(trailingOnly = TRUE)
outputLocation <- args[2]

print("--> Retrieve outputs")
outputs <- getOutputs(
  inputs = inputParameters,
  output = outputLocation,
  server = server
)
print("--> Outputs retrieved")
218
print("--> Parse inputs")
# Names the inputs will carry in the request, kept aligned by position with
# inputParameters so the final renaming cannot shift entries.
convertedKeys <- names(inputParameters)

for (index in seq_along(inputParameters)) {
  key <- convertedKeys[[index]]
  value <- inputParameters[[index]]

  # A single string ending in .dat or .txt is treated as the path of a file
  # whose content describes the actual input. The length and NA checks keep
  # `||` from receiving anything but one TRUE/FALSE value.
  is_file_reference <- is.character(value) && length(value) == 1 &&
    !is.na(value) &&
    (endsWith(value, ".dat") || endsWith(value, ".txt"))

  if (is_file_reference) {
    # readLines() on a path opens and closes the file itself, so no
    # connection is left open.
    lines <- readLines(value)
    print("--------------------------------------------------------------------1")
    print(length(lines))

    # Exactly one line is required for the link shortcuts below; an empty
    # file therefore falls through to the later branches instead of feeding
    # a zero-length value to `&&`.
    is_single_https_line <- length(lines) == 1 && startsWith(lines, "https")

    if (is_single_https_line && endsWith(lines, ".jp2")) {
      print("--------------------------------------------------------------------2")
      inputParameters[[index]] <- list(href = lines, type = "image/jp2")
    } else if (is_single_https_line && endsWith(lines, ".zip")) {
      print("--------------------------------------------------------------------3")
      inputParameters[[index]] <- paste(lines, collapse = "\n")
    } else if (is_single_https_line &&
               (endsWith(lines, ".xlsx") ||
                endsWith(lines, ".csv") ||
                grepl("f=csv", lines))) {
      print("--------------------------------------------------------------------4")
      inputParameters[[index]] <- paste(lines, collapse = "\n")
    } else if (isTRUE(inputParameters$select_process %in%
                      c("plot-image", "reproject-image"))) {
      print("--------------------------------------------------------------------5")
      tmp <- list()
      tmp$href <- lines
      tmp$type <- "image/tiff; application=geotiff"
      if (inputParameters$select_process == "reproject-image") {
        tmp$type <- "image/tiff; subtype=geotiff"
      }
      inputParameters[[index]] <- tmp
    } else {
      # Anything else is expected to be a JSON document.
      print("-----------------------------------6")
      json_data <- fromJSON(paste(lines, collapse = "\n"))
      # Single-bracket assignment keeps the entry even if the file parsed to
      # NULL, so positions and names stay aligned.
      inputParameters[index] <- list(json_data)
    }
  } else if (grepl("_Array_", key)) {
    # Keys look like <name>_Array_<type>: the value is a comma-separated
    # string that becomes a vector of <type>, sent under <name>.
    keyParts <- strsplit(key, split = "_")[[1]]
    type <- keyParts[length(keyParts)]

    parts <- unlist(strsplit(as.character(value), split = ","))
    if (is.null(parts)) {
      parts <- character(0)
    }
    convertedValues <- switch(
      type,
      integer = as.integer(parts),
      numeric = as.numeric(parts),
      character = as.character(parts),
      parts
    )
    # NOTE(review): httr2's req_body_json() unboxes length-one vectors by
    # default, so a single value is presumably sent as a scalar rather than
    # a one-element array - confirm this is what the server expects.
    inputParameters[[index]] <- convertedValues

    print("-------------------------")
    print(convertedValues)
    print("-------------------------")

    # The request name is everything before the first "Array" token.
    array_position <- match("Array", keyParts)
    convertedKeys[[index]] <- paste(
      keyParts[seq_len(array_position - 1)],
      collapse = "_"
    )
  } else {
    print("-------------------------")
    print(key)
    print(value)
    print("-------------------------")
  }
}

print(inputParameters)
names(inputParameters) <- convertedKeys
print("--> Inputs parsed")
318
print("--> Prepare process execution")
# Bundle the parsed inputs and the output definitions for executeProcess().
jsonData <- list(
  "inputs" = inputParameters,
  "outputs" = outputs
)

print("--> Execute process")
jobId <- executeProcess(
  server,
  inputParameters$select_process,
  jsonData
)
print("--> Process executed")

print("--> Retrieve results")
# Waits for the job and writes the result links to outputLocation.
retrieveResults(
  server,
  inputParameters$select_process,
  jobId,
  outputLocation
)
print("--> Results retrieved")