comparison ogc_api_processes_wrapper.R @ 0:afede0916f0a draft default tip

planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 89c188931ba43399013ebc741bc14365e53d418a
author ecology
date Fri, 07 Jun 2024 11:37:33 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:afede0916f0a
1 library("httr2")
2 library("jsonlite")
3 library("getopt")
4
5 cat("start generic wrapper service \n")
6
7 getParameters <- function(){
8 con <- file("inputs.json", "r")
9 lines <- readLines(con)
10 close(con)
11
12 json_string <- paste(lines, collapse = "\n")
13 json_data <- fromJSON(json_string)
14 return(json_data$conditional_process)
15 }
16
17 parseResponseBody <- function(body) {
18 hex <- c(body)
19 intValues <- as.integer(hex)
20 rawVector <- as.raw(intValues)
21 readableOutput <- rawToChar(rawVector)
22 jsonObject <- jsonlite::fromJSON(readableOutput)
23 return(jsonObject)
24 }
25
26 getOutputs <- function(inputs, output, server) {
27 url <- paste(paste(server, "/processes/", sep = ""), inputs$select_process, sep = "")
28 request <- request(url)
29 response <- req_perform(request)
30 responseBody <- parseResponseBody(response$body)
31 outputs <- list()
32
33 for (x in 1:length(responseBody$outputs)) {
34 outputformatName <- paste(names(responseBody$outputs[x]), "_outformat", sep="")
35 output_item <- list()
36
37 for (p in names(inputs)) {
38 if(p == outputformatName){
39 format <- list("mediaType" = inputs[[outputformatName]])
40 output_item$format <- format
41 }
42 }
43 output_item$transmissionMode <- "reference"
44 outputs[[x]] <- output_item
45 }
46
47 names(outputs) <- names(responseBody$outputs)
48 return(outputs)
49 }
50
51 executeProcess <- function(url, process, requestBodyData, cookie) {
52 url <- paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "")
53 requestBodyData$inputs$cookie <- NULL
54 requestBodyData$inputs$select_process <- NULL
55
56 requestBodyData$inputs$s3_access_key <- requestBodyData$inputs$user_credentials$s3_access_key
57 requestBodyData$inputs$s3_secret_key <- requestBodyData$inputs$user_credentials$s3_secret_key
58 requestBodyData$inputs$user_credentials <- NULL
59
60 body <- list()
61 body$inputs <- requestBodyData$inputs
62 body$mode <- "async"
63 body$response <- "document"
64
65 response <- request(url) %>%
66 req_headers(
67 "Accept" = "application/json",
68 "Content-Type" = "application/json",
69 "Cookie" = cookie
70 ) %>%
71 req_body_json(body) %>%
72 req_perform()
73
74 cat("\n Process executed")
75 cat("\n status: ", response$status_code)
76 cat("\n jobID: ", parseResponseBody(response$body)$jobID, "\n")
77
78 jobID <- parseResponseBody(response$body)$jobID
79
80 return(jobID)
81 }
82
83 checkJobStatus <- function(server, process, jobID, cookie) {
84 url <- paste0(server, "processes/", process, "/jobs/", jobID)
85 response <- request(url) %>%
86 req_headers(
87 "Cookie" = cookie
88 ) %>%
89 req_perform()
90 jobStatus <- parseResponseBody(response$body)$status
91 jobProgress <- parseResponseBody(response$body)$progress
92 return(jobStatus)
93 }
94
95 getStatusCode <- function(server, process, jobID, cookie) {
96 url <- paste0(server, "processes/", process, "/jobs/", jobID)
97 response <- request(url) %>%
98 req_headers(
99 "Cookie" = cookie
100 ) %>%
101 req_perform()
102 status_code <- response$status_code
103 return(status_code)
104 }
105
106 getResult <- function (server, process, jobID, cookie) {
107 response <- request(paste0(server, "processes/", process, "/jobs/", jobID, "/results")) %>%
108 req_headers(
109 "Cookie" = cookie
110 ) %>%
111 req_perform()
112 return(response)
113 }
114
115 retrieveResults <- function(server, process, jobID, outputData, cookie) {
116 status_code <- getStatusCode(server, process, jobID, cookie)
117 if(status_code == 200){
118 status <- "running"
119 while(status == "running"){
120 jobStatus <- checkJobStatus(server, process, jobID, cookie)
121 print(jobStatus)
122 if (jobStatus == "succeeded") {
123 status <- jobStatus
124 result <- getResult(server, process, jobID, cookie)
125 if (result$status_code == 200) {
126 resultBody <- parseResponseBody(result$body)
127 urls <- unname(unlist(lapply(resultBody, function(x) x$href)))
128 urls_with_newline <- paste(urls, collapse = "\n")
129 con <- file(outputData, "w")
130 writeLines(urls_with_newline, con = con)
131 close(con)
132 }
133 } else if (jobStatus == "failed") {
134 status <- jobStatus
135 }
136 Sys.sleep(3)
137 }
138 cat("\n done \n")
139 } else if (status_code1 == 400) {
140 print("A query parameter has an invalid value.")
141 } else if (status_code1 == 404) {
142 print("The requested URI was not found.")
143 } else if (status_code1 == 500) {
144 print("The requested URI was not found.")
145 } else {
146 print(paste("HTTP", status_code1, "Error:", resp1$status_message))
147 }
148 }
149
150 is_url <- function(x) {
151 grepl("^https?://", x)
152 }
153
154 server <- "https://hirondelle.crim.ca/weaver/"
155
156 print("--> Retrieve parameters")
157 inputParameters <- getParameters()
158 print("--> Parameters retrieved")
159
160 args <- commandArgs(trailingOnly = TRUE)
161 outputLocation <- args[2]
162
163 print("--> Retrieve outputs")
164 outputs <- getOutputs(inputParameters, outputLocation, server)
165 print("--> Outputs retrieved")
166
167 print("--> Parse inputs")
168 convertedKeys <- c()
169 for (key in names(inputParameters)) {
170 if (is.character(inputParameters[[key]]) && (endsWith(inputParameters[[key]], ".dat") || endsWith(inputParameters[[key]], ".txt"))) {
171 con <- file(inputParameters[[key]], "r")
172 url_list <- list()
173 #while (length(line <- readLines(con, n = 1)) > 0) {
174 # if (is_url(line)) {
175 # url_list <- c(url_list, list(list(href = trimws(line))))
176 # }
177 #}
178 con <- file(inputParameters[[key]], "r")
179 lines <- readLines(con)
180 close(con)
181 json_string <- paste(lines, collapse = "\n")
182 json_data <- fromJSON(json_string)
183
184 inputParameters[[key]] <- json_data
185 convertedKeys <- append(convertedKeys, key)
186 }
187 else if (grepl("_Array_", key)) {
188 keyParts <- strsplit(key, split = "_")[[1]]
189 type <- keyParts[length(keyParts)]
190 values <- inputParameters[[key]]
191 value_list <- strsplit(values, split = ",")
192
193 convertedValues <- c()
194
195 for (value in value_list) {
196 if(type == "integer") {
197 value <- as.integer(value)
198 } else if (type == "numeric") {
199 value <- as.numeric(balue)
200 } else if (type == "character") {
201 value <- as.character(value)
202 }
203 convertedValues <- append(convertedValues, value)
204
205 convertedKey <- ""
206 for (part in keyParts) {
207 if(part == "Array") {
208 break
209 }
210 convertedKey <- paste(convertedKey, paste(part, "_", sep=""), sep="")
211 }
212 convertedKey <- substr(convertedKey, 1, nchar(convertedKey)-1)
213 }
214
215 inputParameters[[key]] <- convertedValues
216 convertedKeys <- append(convertedKeys, convertedKey)
217 } else {
218 convertedKeys <- append(convertedKeys, key)
219 }
220 }
221
222 names(inputParameters) <- convertedKeys
223 print("--> Inputs parsed")
224
225 print("--> Prepare process execution")
226 jsonData <- list(
227 "inputs" = inputParameters,
228 "outputs" = outputs
229 )
230
231 cookie <- inputParameters$cookie
232
233 print("--> Execute process")
234 jobID <- executeProcess(server, inputParameters$select_process, jsonData, cookie)
235 print("--> Process executed")
236
237 print("--> Retrieve results")
238 retrieveResults(server, inputParameters$select_process, jobID, outputLocation, cookie)
239 print("--> Results retrieved")