Mercurial > repos > ecology > hirondelle_crim
comparison ogc_api_processes_wrapper.R @ 0:afede0916f0a draft
planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 89c188931ba43399013ebc741bc14365e53d418a
author | ecology |
---|---|
date | Fri, 07 Jun 2024 11:37:33 +0000 |
parents | |
children | 52baddd15640 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:afede0916f0a |
---|---|
1 library("httr2") | |
2 library("jsonlite") | |
3 library("getopt") | |
4 | |
5 cat("start generic wrapper service \n") | |
6 | |
7 getParameters <- function(){ | |
8 con <- file("inputs.json", "r") | |
9 lines <- readLines(con) | |
10 close(con) | |
11 | |
12 json_string <- paste(lines, collapse = "\n") | |
13 json_data <- fromJSON(json_string) | |
14 return(json_data$conditional_process) | |
15 } | |
16 | |
17 parseResponseBody <- function(body) { | |
18 hex <- c(body) | |
19 intValues <- as.integer(hex) | |
20 rawVector <- as.raw(intValues) | |
21 readableOutput <- rawToChar(rawVector) | |
22 jsonObject <- jsonlite::fromJSON(readableOutput) | |
23 return(jsonObject) | |
24 } | |
25 | |
26 getOutputs <- function(inputs, output, server) { | |
27 url <- paste(paste(server, "/processes/", sep = ""), inputs$select_process, sep = "") | |
28 request <- request(url) | |
29 response <- req_perform(request) | |
30 responseBody <- parseResponseBody(response$body) | |
31 outputs <- list() | |
32 | |
33 for (x in 1:length(responseBody$outputs)) { | |
34 outputformatName <- paste(names(responseBody$outputs[x]), "_outformat", sep="") | |
35 output_item <- list() | |
36 | |
37 for (p in names(inputs)) { | |
38 if(p == outputformatName){ | |
39 format <- list("mediaType" = inputs[[outputformatName]]) | |
40 output_item$format <- format | |
41 } | |
42 } | |
43 output_item$transmissionMode <- "reference" | |
44 outputs[[x]] <- output_item | |
45 } | |
46 | |
47 names(outputs) <- names(responseBody$outputs) | |
48 return(outputs) | |
49 } | |
50 | |
51 executeProcess <- function(url, process, requestBodyData, cookie) { | |
52 url <- paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") | |
53 requestBodyData$inputs$cookie <- NULL | |
54 requestBodyData$inputs$select_process <- NULL | |
55 | |
56 requestBodyData$inputs$s3_access_key <- requestBodyData$inputs$user_credentials$s3_access_key | |
57 requestBodyData$inputs$s3_secret_key <- requestBodyData$inputs$user_credentials$s3_secret_key | |
58 requestBodyData$inputs$user_credentials <- NULL | |
59 | |
60 body <- list() | |
61 body$inputs <- requestBodyData$inputs | |
62 body$mode <- "async" | |
63 body$response <- "document" | |
64 | |
65 response <- request(url) %>% | |
66 req_headers( | |
67 "Accept" = "application/json", | |
68 "Content-Type" = "application/json", | |
69 "Cookie" = cookie | |
70 ) %>% | |
71 req_body_json(body) %>% | |
72 req_perform() | |
73 | |
74 cat("\n Process executed") | |
75 cat("\n status: ", response$status_code) | |
76 cat("\n jobID: ", parseResponseBody(response$body)$jobID, "\n") | |
77 | |
78 jobID <- parseResponseBody(response$body)$jobID | |
79 | |
80 return(jobID) | |
81 } | |
82 | |
83 checkJobStatus <- function(server, process, jobID, cookie) { | |
84 url <- paste0(server, "processes/", process, "/jobs/", jobID) | |
85 response <- request(url) %>% | |
86 req_headers( | |
87 "Cookie" = cookie | |
88 ) %>% | |
89 req_perform() | |
90 jobStatus <- parseResponseBody(response$body)$status | |
91 jobProgress <- parseResponseBody(response$body)$progress | |
92 return(jobStatus) | |
93 } | |
94 | |
95 getStatusCode <- function(server, process, jobID, cookie) { | |
96 url <- paste0(server, "processes/", process, "/jobs/", jobID) | |
97 response <- request(url) %>% | |
98 req_headers( | |
99 "Cookie" = cookie | |
100 ) %>% | |
101 req_perform() | |
102 status_code <- response$status_code | |
103 return(status_code) | |
104 } | |
105 | |
106 getResult <- function (server, process, jobID, cookie) { | |
107 response <- request(paste0(server, "processes/", process, "/jobs/", jobID, "/results")) %>% | |
108 req_headers( | |
109 "Cookie" = cookie | |
110 ) %>% | |
111 req_perform() | |
112 return(response) | |
113 } | |
114 | |
115 retrieveResults <- function(server, process, jobID, outputData, cookie) { | |
116 status_code <- getStatusCode(server, process, jobID, cookie) | |
117 if(status_code == 200){ | |
118 status <- "running" | |
119 while(status == "running"){ | |
120 jobStatus <- checkJobStatus(server, process, jobID, cookie) | |
121 print(jobStatus) | |
122 if (jobStatus == "succeeded") { | |
123 status <- jobStatus | |
124 result <- getResult(server, process, jobID, cookie) | |
125 if (result$status_code == 200) { | |
126 resultBody <- parseResponseBody(result$body) | |
127 urls <- unname(unlist(lapply(resultBody, function(x) x$href))) | |
128 urls_with_newline <- paste(urls, collapse = "\n") | |
129 con <- file(outputData, "w") | |
130 writeLines(urls_with_newline, con = con) | |
131 close(con) | |
132 } | |
133 } else if (jobStatus == "failed") { | |
134 status <- jobStatus | |
135 } | |
136 Sys.sleep(3) | |
137 } | |
138 cat("\n done \n") | |
139 } else if (status_code1 == 400) { | |
140 print("A query parameter has an invalid value.") | |
141 } else if (status_code1 == 404) { | |
142 print("The requested URI was not found.") | |
143 } else if (status_code1 == 500) { | |
144 print("The requested URI was not found.") | |
145 } else { | |
146 print(paste("HTTP", status_code1, "Error:", resp1$status_message)) | |
147 } | |
148 } | |
149 | |
150 is_url <- function(x) { | |
151 grepl("^https?://", x) | |
152 } | |
153 | |
154 server <- "https://hirondelle.crim.ca/weaver/" | |
155 | |
156 print("--> Retrieve parameters") | |
157 inputParameters <- getParameters() | |
158 print("--> Parameters retrieved") | |
159 | |
160 args <- commandArgs(trailingOnly = TRUE) | |
161 outputLocation <- args[2] | |
162 | |
163 print("--> Retrieve outputs") | |
164 outputs <- getOutputs(inputParameters, outputLocation, server) | |
165 print("--> Outputs retrieved") | |
166 | |
167 print("--> Parse inputs") | |
168 convertedKeys <- c() | |
169 for (key in names(inputParameters)) { | |
170 if (is.character(inputParameters[[key]]) && (endsWith(inputParameters[[key]], ".dat") || endsWith(inputParameters[[key]], ".txt"))) { | |
171 con <- file(inputParameters[[key]], "r") | |
172 url_list <- list() | |
173 #while (length(line <- readLines(con, n = 1)) > 0) { | |
174 # if (is_url(line)) { | |
175 # url_list <- c(url_list, list(list(href = trimws(line)))) | |
176 # } | |
177 #} | |
178 con <- file(inputParameters[[key]], "r") | |
179 lines <- readLines(con) | |
180 close(con) | |
181 json_string <- paste(lines, collapse = "\n") | |
182 json_data <- fromJSON(json_string) | |
183 | |
184 inputParameters[[key]] <- json_data | |
185 convertedKeys <- append(convertedKeys, key) | |
186 } | |
187 else if (grepl("_Array_", key)) { | |
188 keyParts <- strsplit(key, split = "_")[[1]] | |
189 type <- keyParts[length(keyParts)] | |
190 values <- inputParameters[[key]] | |
191 value_list <- strsplit(values, split = ",") | |
192 | |
193 convertedValues <- c() | |
194 | |
195 for (value in value_list) { | |
196 if(type == "integer") { | |
197 value <- as.integer(value) | |
198 } else if (type == "numeric") { | |
199 value <- as.numeric(balue) | |
200 } else if (type == "character") { | |
201 value <- as.character(value) | |
202 } | |
203 convertedValues <- append(convertedValues, value) | |
204 | |
205 convertedKey <- "" | |
206 for (part in keyParts) { | |
207 if(part == "Array") { | |
208 break | |
209 } | |
210 convertedKey <- paste(convertedKey, paste(part, "_", sep=""), sep="") | |
211 } | |
212 convertedKey <- substr(convertedKey, 1, nchar(convertedKey)-1) | |
213 } | |
214 | |
215 inputParameters[[key]] <- convertedValues | |
216 convertedKeys <- append(convertedKeys, convertedKey) | |
217 } else { | |
218 convertedKeys <- append(convertedKeys, key) | |
219 } | |
220 } | |
221 | |
222 names(inputParameters) <- convertedKeys | |
223 print("--> Inputs parsed") | |
224 | |
225 print("--> Prepare process execution") | |
226 jsonData <- list( | |
227 "inputs" = inputParameters, | |
228 "outputs" = outputs | |
229 ) | |
230 | |
231 cookie <- inputParameters$cookie | |
232 | |
233 print("--> Execute process") | |
234 jobID <- executeProcess(server, inputParameters$select_process, jsonData, cookie) | |
235 print("--> Process executed") | |
236 | |
237 print("--> Retrieve results") | |
238 retrieveResults(server, inputParameters$select_process, jobID, outputLocation, cookie) | |
239 print("--> Results retrieved") |