Mercurial > repos > ecology > aquainfra
comparison aquainfra_ogc_api_processes.R @ 0:3073e68373a5 draft default tip
planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit d07f89c8f6d6ee44c05de6b1b8047b387e76cad6
author | ecology |
---|---|
date | Thu, 10 Oct 2024 09:56:27 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:3073e68373a5 |
---|---|
1 library("httr2") | |
2 library("jsonlite") | |
3 library("getopt") | |
4 | |
# Announce on stdout that the wrapper script has started.
cat("start generic wrapper service \n", sep = "")
6 | |
# Recursively drop NULL entries from a (possibly nested) list.
#
# x: any R object. Lists are cleaned element by element; anything else is
#    returned unchanged.
# Returns the input with every NULL list element removed at all depths.
remove_null_values <- function(x) {
  # Non-list values have nothing to strip.
  if (!is.list(x)) {
    return(x)
  }
  # Clean the children first, then filter out the ones that are NULL.
  cleaned <- lapply(x, remove_null_values)
  keep <- !vapply(cleaned, is.null, logical(1))
  cleaned[keep]
}
16 | |
# Load the tool parameters from "inputs.json" in the working directory.
#
# The file is read as text, parsed as JSON, and stripped of NULL values.
# Returns the `conditional_process` element of the cleaned structure.
getParameters <- function() {
  # readLines() opens and closes the file itself when given a path.
  raw_text <- paste(readLines("inputs.json"), collapse = "\n")
  parsed <- remove_null_values(fromJSON(raw_text))
  parsed$conditional_process
}
29 | |
# Decode a response body into an R object.
#
# body: the response payload; it is coerced to integers and then to a raw
#       vector before being converted to text.
# Returns the result of parsing that text as JSON with jsonlite::fromJSON().
parseResponseBody <- function(body) {
  bytes <- as.raw(as.integer(c(body)))
  text <- rawToChar(bytes)
  jsonlite::fromJSON(text)
}
38 | |
# Build the "outputs" section of an execute request from the process
# description published by the server.
#
# inputs: named list of tool parameters; `select_process` names the process
#         and an optional `<output name>_outformat` entry selects the media
#         type for that output.
# output: unused; kept so existing callers keep working.
# server: base URL of the API, with or without a trailing slash.
# Returns a list named after the process outputs. Every entry has
# transmissionMode "reference" and, when a matching `_outformat` input
# exists, a `format` entry holding its mediaType.
getOutputs <- function(inputs, output, server) {
  # Normalise the base URL so that a trailing slash on `server` does not
  # produce "//processes/" in the request path.
  baseUrl <- sub("/+$", "", server)
  url <- paste0(baseUrl, "/processes/", inputs$select_process)

  response <- req_perform(request(url))
  processOutputs <- parseResponseBody(response$body)$outputs
  outputNames <- names(processOutputs)

  outputs <- vector("list", length(processOutputs))
  # seq_along() yields no iterations for a process without outputs, whereas
  # 1:length() would iterate over c(1, 0) and fail.
  for (i in seq_along(processOutputs)) {
    formatKey <- paste0(outputNames[i], "_outformat")
    output_item <- list()
    if (formatKey %in% names(inputs)) {
      output_item$format <- list("mediaType" = inputs[[formatKey]])
    }
    output_item$transmissionMode <- "reference"
    outputs[[i]] <- output_item
  }

  names(outputs) <- outputNames
  outputs
}
67 | |
# Submit an execution request for a process and return the job identifier.
#
# url:             base URL of the API; "processes/<process>/execution" is
#                  appended directly, so it must end with "/".
# process:         identifier of the process to execute.
# requestBodyData: list whose `inputs` element holds the process inputs.
#                  Only `inputs` is placed in the request body; the
#                  `select_process` entry is removed from it first.
# Returns the `jobID` field of the parsed response body.
executeProcess <- function(url, process, requestBodyData) {
  executionUrl <- paste0(url, "processes/", process, "/execution")

  # select_process picks the process; it is not sent as a process input.
  processInputs <- requestBodyData$inputs
  processInputs$select_process <- NULL
  payload <- list()
  payload$inputs <- processInputs

  # The "Prefer: respond-async" header is sent with the JSON body.
  executionRequest <- request(executionUrl)
  executionRequest <- req_headers(
    executionRequest,
    "Content-Type" = "application/json",
    "Prefer" = "respond-async"
  )
  executionRequest <- req_body_json(executionRequest, payload)
  response <- req_perform(executionRequest)

  cat("\n Process executed")
  cat("\n status: ", response$status_code)

  jobId <- parseResponseBody(response$body)$jobID
  return(jobId)
}
92 | |
# Fetch the current status of a job.
#
# server:  base URL of the API; "jobs/<jobID>" is appended directly.
# process: unused; kept for a uniform signature with the other job helpers.
# jobID:   identifier of the job to query.
# Returns the `status` field of the parsed job document.
checkJobStatus <- function(server, process, jobID) {
  jobUrl <- paste0(server, "jobs/", jobID)
  jobInfo <- parseResponseBody(req_perform(request(jobUrl))$body)
  jobInfo$status
}
101 | |
# Request the job document and report the HTTP status code of the response.
#
# server:  base URL of the API; "jobs/<jobID>" is appended directly.
# process: unused; kept for a uniform signature with the other job helpers.
# jobID:   identifier of the job to query.
# Prints the request URL and returns the response's `status_code`.
# NOTE(review): httr2's req_perform() raises an R error for HTTP 4xx/5xx
# responses unless req_error() is configured, so callers probably never
# receive an error status from here - confirm against the httr2 docs.
getStatusCode <- function(server, process, jobID) {
  jobUrl <- paste0(server, "jobs/", jobID)
  print(jobUrl)
  jobResponse <- req_perform(request(jobUrl))
  jobResponse$status_code
}
110 | |
# Download the results document of a job as JSON.
#
# server:  base URL of the API; "jobs/<jobID>/results?f=json" is appended.
# process: unused; kept for a uniform signature with the other job helpers.
# jobID:   identifier of the job whose results are requested.
# Returns the httr2 response object (not the parsed body).
getResult <- function(server, process, jobID) {
  resultsUrl <- paste0(server, "jobs/", jobID, "/results?f=json")
  req_perform(request(resultsUrl))
}
117 | |
# Recursively collect every "href" value found in a nested list.
#
# obj: typically a parsed JSON document. Anything that is not a list
#      yields no hrefs.
# Returns a vector with the values of all non-list elements named "href",
# in traversal order, or NULL when there are none.
findHref <- function(obj) {
  if (!is.list(obj)) {
    return(NULL)
  }

  # Walk the elements by position rather than by name, so that children of
  # unnamed lists are still searched and duplicated names each resolve to
  # their own element.
  keys <- names(obj)
  if (is.null(keys)) {
    keys <- rep("", length(obj))
  }

  hrefs <- c()
  for (i in seq_along(obj)) {
    element <- obj[[i]]
    if (is.list(element)) {
      # Descend into nested structures.
      hrefs <- c(hrefs, findHref(element))
    } else if (identical(keys[[i]], "href")) {
      hrefs <- c(hrefs, element)
    }
  }
  hrefs
}
138 | |
# Wait for a job to finish and write the URLs of its results to a file.
#
# server:     base URL of the API (ending with "/").
# process:    process identifier, forwarded to the job helpers.
# jobID:      identifier of the job to wait for.
# outputData: path of the file that receives the result URLs, one per line.
#
# The job is polled every 3 seconds until it reports a terminal status.
# On success, every "href" found in the results document is written to
# `outputData`. For a non-200 status code from the initial job request, a
# diagnostic message is printed instead.
retrieveResults <- function(server, process, jobID, outputData) {
  status_code <- getStatusCode(server, process, jobID)
  print(status_code)

  if (status_code == 200) {
    # "dismissed" is terminal as well; without it a cancelled job would be
    # polled forever.
    terminalStatuses <- c("successful", "failed", "dismissed")

    repeat {
      jobStatus <- checkJobStatus(server, process, jobID)
      print(jobStatus)
      if (length(jobStatus) != 1) {
        stop("Job status could not be determined for job ", jobID,
             call. = FALSE)
      }
      if (jobStatus %in% terminalStatuses) {
        break
      }
      Sys.sleep(3)
    }

    if (jobStatus == "successful") {
      result <- getResult(server, process, jobID)

      if (result$status_code == 200) {
        resultBody <- parseResponseBody(result$body)
        print(resultBody)

        # Collect every href in the results document.
        hrefs <- findHref(resultBody)

        if (length(hrefs) > 0) {
          # One URL per line.
          urls_with_newline <- paste(hrefs, collapse = "\n")
          print(urls_with_newline)

          con <- file(outputData, "w")
          writeLines(urls_with_newline, con = con)
          close(con)
        } else {
          print("No hrefs found.")
        }
      }
    }

    cat("\n done \n")

  } else if (status_code == 400) {
    print("A query parameter has an invalid value.")
  } else if (status_code == 404) {
    print("The requested URI was not found.")
  } else if (status_code == 500) {
    print("A server error occurred.")
  } else {
    # No response object is available here, only the numeric status code.
    print(paste("HTTP", status_code, "Error"))
  }
}
192 | |
193 | |
194 | |
# Write the given text to a file, replacing any existing content.
#
# href:       character vector to write; each element becomes one line.
# outputData: path of the destination file.
saveResult <- function(href, outputData) {
  # writeLines() opens the path for writing and closes it again itself.
  writeLines(href, con = outputData)
}
200 | |
# Test whether strings start with an http:// or https:// scheme.
#
# x: character vector to test.
# Returns a logical vector of the same length as `x`.
is_url <- function(x) {
  url_scheme <- "^https?://"
  grepl(pattern = url_scheme, x = x)
}
204 | |
# ---- Main script ----

# Base URL of the OGC API Processes server. It must end with "/" because the
# job and execution URLs are built by appending directly to it.
server <- "https://aqua.igb-berlin.de/pygeoapi-dev/"

print("--> Retrieve parameters")
inputParameters <- getParameters()
print("--> Parameters retrieved")

# The second trailing command-line argument is the file that receives the
# result URLs. Check it up front so a missing argument fails before any
# remote work is started.
args <- commandArgs(trailingOnly = TRUE)
if (length(args) < 2) {
  stop("Expected the output file path as second command-line argument.",
       call. = FALSE)
}
outputLocation <- args[2]

print("--> Retrieve outputs")
outputs <- getOutputs(inputParameters, outputLocation, server)
print("--> Outputs retrieved")

print("--> Parse inputs")
# Names the inputs get in the request, in the same order as inputParameters.
convertedKeys <- c()
for (key in names(inputParameters)) {
  paramValue <- inputParameters[[key]]

  # A single string ending in .dat/.txt is a path to a file holding the
  # actual input. The length check keeps && and || on scalar operands.
  isInputFile <- is.character(paramValue) && length(paramValue) == 1 &&
    (endsWith(paramValue, ".dat") || endsWith(paramValue, ".txt"))

  if (isInputFile) {
    # readLines() opens and closes the file itself, so no connection leaks.
    lines <- readLines(paramValue)
    print("--------------------------------------------------------------------1")
    print(length(lines))
    if (length(lines) == 0) {
      stop("Input file for parameter '", key, "' is empty.", call. = FALSE)
    }

    # TRUE when the file holds exactly one line that is an https URL.
    isSingleHttpsUrl <- length(lines) == 1 && startsWith(lines, "https")

    if (isSingleHttpsUrl && endsWith(lines, ".jp2")) {
      print("--------------------------------------------------------------------2")
      tmp <- list()
      tmp$href <- lines
      tmp$type <- "image/jp2"
      inputParameters[[key]] <- tmp
    } else if (isSingleHttpsUrl && endsWith(lines, ".zip")) {
      print("--------------------------------------------------------------------3")
      json_string <- paste(lines, collapse = "\n")
      inputParameters[[key]] <- json_string
    } else if (isSingleHttpsUrl &&
               (endsWith(lines, ".xlsx") || endsWith(lines, ".csv") ||
                grepl("f=csv", lines))) {
      print("--------------------------------------------------------------------4")
      json_string <- paste(lines, collapse = "\n")
      inputParameters[[key]] <- json_string
    } else if (inputParameters$select_process == "plot-image" ||
               inputParameters$select_process == "reproject-image") {
      print("--------------------------------------------------------------------5")
      tmp <- list()
      tmp$href <- lines
      tmp$type <- "image/tiff; application=geotiff"
      if (inputParameters$select_process == "reproject-image") {
        tmp$type <- "image/tiff; subtype=geotiff"
      }
      inputParameters[[key]] <- tmp
    } else {
      # Anything else is treated as a JSON document.
      print("-----------------------------------6")
      json_string <- paste(lines, collapse = "\n")
      json_data <- fromJSON(json_string)
      inputParameters[[key]] <- json_data
    }
    convertedKeys <- append(convertedKeys, key)
  } else if (grepl("_Array_", key)) {
    # Keys of the form <name>_Array_<type> carry a comma-separated list that
    # is converted to a vector of <type> and sent under <name>.
    keyParts <- strsplit(key, split = "_")[[1]]
    type <- keyParts[length(keyParts)]
    rawValues <- unlist(strsplit(paramValue, split = ","))

    convertedValues <- switch(
      type,
      integer = as.integer(rawValues),
      numeric = as.numeric(rawValues),
      character = as.character(rawValues),
      rawValues
    )

    # The request name is everything before the first "Array" part.
    nameParts <- keyParts[seq_len(match("Array", keyParts) - 1)]
    convertedKey <- paste(nameParts, collapse = "_")

    inputParameters[[key]] <- convertedValues
    print("-------------------------")
    print(convertedValues)
    print("-------------------------")
    convertedKeys <- append(convertedKeys, convertedKey)
  } else {
    print("-------------------------")
    print(key)
    print(paramValue)
    if (!is.null(paramValue)) {
      convertedKeys <- append(convertedKeys, key)
    }
    print("-------------------------")
  }
}
print(inputParameters)
names(inputParameters) <- convertedKeys
print("--> Inputs parsed")

print("--> Prepare process execution")
jsonData <- list("inputs" = inputParameters,
                 "outputs" = outputs)

print("--> Execute process")
jobId <- executeProcess(server, inputParameters$select_process, jsonData)
print("--> Process executed")

print("--> Retrieve results")
retrieveResults(server, inputParameters$select_process, jobId, outputLocation)
print("--> Results retrieved")