Mercurial > repos > ecology > zoo_project_ogc_api_processes
comparison ogc_api_processes_wrapper.R @ 0:f6288dd4b77a draft
planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 4089d69de9c54df0930f55572e95bf3d22bf5e70
author | ecology |
---|---|
date | Sat, 04 May 2024 08:42:42 +0000 |
parents | |
children | bcb6009d5af7 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:f6288dd4b77a |
---|---|
1 library("httr2") | |
2 library("jsonlite") | |
3 library("getopt") | |
4 | |
# Announce on stdout that the wrapper has started.
cat("start generic wrapper service ", "\n", sep = "")
6 | |
# Read the tool parameters from "inputs.json" in the working directory.
#
# Only the first line of the file is read and parsed as JSON; the
# `conditional_process` element of the parsed object is returned.
# The connection is closed on exit, including when reading or parsing fails.
getParameters <- function() {
  con <- file("inputs.json", "r")
  on.exit(close(con), add = TRUE)
  first_line <- readLines(con, n = 1)
  json <- fromJSON(first_line)
  json$conditional_process
}
15 | |
# Decode a response body into an R object.
#
# `body` is converted to integers, then to raw bytes, then to a character
# string, and that string is parsed as JSON with jsonlite.
parseResponseBody <- function(body) {
  body_text <- rawToChar(as.raw(as.integer(c(body))))
  jsonlite::fromJSON(body_text)
}
24 | |
# Build the "outputs" section of an execute request for the selected process.
#
# The process description is fetched from `<server>/processes/<process>` and
# one entry is created per declared output. Every entry requests
# transmissionMode "reference"; when `inputs` contains a key named
# "<output name>_outformat", its value is used as the output's media type.
#
# inputs: named list of tool parameters; `inputs$select_process` is the
#         process identifier.
# output: unused; kept so existing callers keep working.
# server: base URL of the service, with or without a trailing slash.
#
# Returns a list named after the process outputs (empty when the process
# declares none).
getOutputs <- function(inputs, output, server) {
  # Strip trailing slashes so the joined URL never contains "//processes/".
  base_url <- sub("/+$", "", server)
  url <- paste0(base_url, "/processes/", inputs$select_process)
  response <- req_perform(request(url))
  responseBody <- parseResponseBody(response$body)

  declared <- responseBody$outputs
  outputs <- vector("list", length(declared))

  # seq_along() avoids the 1:0 trap when no outputs are declared.
  for (x in seq_along(declared)) {
    outputformatName <- paste0(names(declared[x]), "_outformat")
    output_item <- list()
    if (outputformatName %in% names(inputs)) {
      output_item$format <- list("mediaType" = inputs[[outputformatName]])
    }
    output_item$transmissionMode <- "reference"
    outputs[[x]] <- output_item
  }

  names(outputs) <- names(declared)
  outputs
}
49 | |
# Submit an asynchronous execute request and return the identifier of the
# created job.
#
# url:             base URL of the service; "processes/<process>/execution"
#                  is appended directly, so it is expected to end with "/".
# process:         process identifier.
# requestBodyData: list that is sent as the JSON request body.
# output:          unused; kept so existing callers keep working.
#
# Prints the HTTP status and the job id. Stops with an error when the
# response carries no `jobID`, instead of returning NULL to the caller.
executeProcess <- function(url, process, requestBodyData, output) {
  endpoint <- paste0(url, "processes/", process, "/execution")

  # NOTE(review): "/*" is not a valid media range ("*/*" was probably
  # intended); left unchanged here, confirm how the server handles it.
  response <- request(endpoint) %>%
    req_headers(
      "accept" = "/*",
      "Prefer" = "respond-async;return=representation",
      "Content-Type" = "application/json"
    ) %>%
    req_body_json(requestBodyData) %>%
    req_perform()

  # Decode the body once and reuse the result.
  jobID <- parseResponseBody(response$body)$jobID

  cat("\n Process executed")
  cat("\n status: ", response$status_code)
  cat("\n jobID: ", jobID, "\n")

  if (is.null(jobID)) {
    stop("The execute response did not contain a jobID.", call. = FALSE)
  }

  jobID
}
69 | |
# Query `<server>jobs/<jobID>`, print the job's status and progress, and
# return the status.
checkJobStatus <- function(server, jobID) {
  job_url <- paste0(server, "jobs/", jobID)
  job_request <- req_headers(request(job_url), "accept" = "application/json")
  job_info <- parseResponseBody(req_perform(job_request)$body)

  jobStatus <- job_info$status
  cat(paste0("\n status: ", jobStatus, ", progress: ", job_info$progress))
  return(jobStatus)
}
81 | |
# Request `<server>jobs/<jobID>` and return the HTTP status code of the
# response.
getStatusCode <- function(server, jobID) {
  job_request <- req_headers(
    request(paste0(server, "jobs/", jobID)),
    "accept" = "application/json"
  )
  job_response <- req_perform(job_request)
  return(job_response$status_code)
}
91 | |
# Request `<server>jobs/<jobID>/results` and return the response object
# as is (the body is not decoded here).
getResult <- function(server, jobID) {
  results_url <- paste0(server, "jobs/", jobID, "/results")
  results_request <- req_headers(
    request(results_url),
    "accept" = "application/json"
  )
  return(req_perform(results_request))
}
100 | |
# Poll a job until it reaches a terminal state and, on success, write the
# result links to "<outputData>_result_urls.txt".
#
# server:     base URL of the service, ending with "/".
# jobID:      identifier returned by executeProcess().
# outputData: prefix of the output file name.
#
# The job is polled every 3 seconds while it is not "successful", "failed"
# or "dismissed". On success the `href` of every result entry is written,
# one per line. When the initial status request does not return 200, a
# message describing the status code is printed instead.
retrieveResults <- function(server, jobID, outputData) {
  status_code <- getStatusCode(server, jobID)
  if (status_code == 200) {
    cat("running")
    # "dismissed" is terminal as well; without it the loop would never end.
    terminal_states <- c("successful", "failed", "dismissed")
    repeat {
      jobStatus <- checkJobStatus(server, jobID)
      if (length(jobStatus) != 1) {
        stop("The job status response did not contain a status.",
             call. = FALSE)
      }
      if (jobStatus %in% terminal_states) {
        break
      }
      # Wait only while the job is still in progress.
      Sys.sleep(3)
    }
    if (jobStatus == "successful") {
      result <- getResult(server, jobID)
      if (result$status_code == 200) {
        resultBody <- parseResponseBody(result$body)
        urls <- unname(unlist(lapply(resultBody, function(x) x$href)))
        urls_with_newline <- paste(urls, collapse = "\n")
        # Write straight to the file rather than redirecting stdout with
        # sink(), so console output is never left diverted on error.
        cat(urls_with_newline, "\n",
            file = paste0(outputData, "_result_urls.txt"))
      }
    }
    cat("\n done \n")
  } else if (status_code == 400) {
    print("A query parameter has an invalid value.")
  } else if (status_code == 404) {
    print("The requested URI was not found.")
  } else if (status_code == 500) {
    print("A server error occurred.")
  } else {
    print(paste("HTTP", status_code, "Error"))
  }
}
135 | |
# Test whether each element of `x` starts with "http://" or "https://".
# Returns a logical vector of the same length as `x`.
is_url <- function(x) {
  url_prefix <- "^https?://"
  grepl(pattern = url_prefix, x = x)
}
139 | |
# ---- Driver --------------------------------------------------------------
# Base URL of the OGC API - Processes service; it must end with "/" because
# the helper functions append "processes/..." and "jobs/..." directly.
server <- "https://ospd.geolabs.fr:8300/ogc-api/"

inputParameters <- getParameters()

outputLocation <- inputParameters$outputData

outputs <- getOutputs(inputParameters, outputLocation, server)

# A parameter whose value is a single path ending in ".dat" or ".txt" is
# treated as a file listing URLs: it is replaced by a list of
# list(href = <url>) entries, one per line that starts with http(s)://.
for (key in names(inputParameters)) {
  value <- inputParameters[[key]]
  print(value)
  # The length check keeps `||` from failing on vector-valued parameters.
  is_url_file <- is.character(value) && length(value) == 1 &&
    (endsWith(value, ".dat") || endsWith(value, ".txt"))
  if (is_url_file) {
    file_lines <- readLines(value)
    # Lines are tested before trimming, so a line with leading whitespace
    # is not taken as a URL.
    hrefs <- trimws(file_lines[is_url(file_lines)])
    inputParameters[[key]] <- lapply(hrefs, function(href) list(href = href))
  }
}

jsonData <- list(
  "inputs" = inputParameters,
  "outputs" = outputs
)

jobID <- executeProcess(server, inputParameters$select_process, jsonData, outputLocation)

retrieveResults(server, jobID, outputLocation)