comparison ogc_api_processes_wrapper.R @ 0:f6288dd4b77a draft

planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 4089d69de9c54df0930f55572e95bf3d22bf5e70
author ecology
date Sat, 04 May 2024 08:42:42 +0000
parents
children bcb6009d5af7
comparison
equal deleted inserted replaced
-1:000000000000 0:f6288dd4b77a
1 library("httr2")
2 library("jsonlite")
3 library("getopt")
4
5 cat("start generic wrapper service \n")
6
7 getParameters <- function(){
8 args <- commandArgs(trailingOnly = TRUE)
9 con <- file("inputs.json", "r")
10 line <- readLines(con, n = 1)
11 json <- fromJSON(line)
12 close(con)
13 return(json$conditional_process)
14 }
15
16 parseResponseBody <- function(body) {
17 hex <- c(body)
18 intValues <- as.integer(hex)
19 rawVector <- as.raw(intValues)
20 readableOutput <- rawToChar(rawVector)
21 jsonObject <- jsonlite::fromJSON(readableOutput)
22 return(jsonObject)
23 }
24
25 getOutputs <- function(inputs, output, server) {
26 url <- paste(paste(server, "/processes/", sep = ""), inputs$select_process, sep = "")
27 request <- request(url)
28 response <- req_perform(request)
29 responseBody <- parseResponseBody(response$body)
30 outputs <- list()
31
32 for (x in 1:length(responseBody$outputs)) {
33 outputformatName <- paste(names(responseBody$outputs[x]), "_outformat", sep="")
34 output_item <- list()
35
36 for (p in names(inputs)) {
37 if(p == outputformatName){
38 format <- list("mediaType" = inputs[[outputformatName]])
39 output_item$format <- format
40 }
41 }
42 output_item$transmissionMode <- "reference"
43 outputs[[x]] <- output_item
44 }
45
46 names(outputs) <- names(responseBody$outputs)
47 return(outputs)
48 }
49
50 executeProcess <- function(url, process, requestBodyData, output) {
51 url <- paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "")
52 response <- request(url) %>%
53 req_headers(
54 "accept" = "/*",
55 "Prefer" = "respond-async;return=representation",
56 "Content-Type" = "application/json"
57 ) %>%
58 req_body_json(requestBodyData) %>%
59 req_perform()
60
61 cat("\n Process executed")
62 cat("\n status: ", response$status_code)
63 cat("\n jobID: ", parseResponseBody(response$body)$jobID, "\n")
64
65 jobID <- parseResponseBody(response$body)$jobID
66
67 return(jobID)
68 }
69
70 checkJobStatus <- function(server, jobID) {
71 response <- request(paste0(server, "jobs/", jobID)) %>%
72 req_headers(
73 'accept' = 'application/json'
74 ) %>%
75 req_perform()
76 jobStatus <- parseResponseBody(response$body)$status
77 jobProgress <- parseResponseBody(response$body)$progress
78 cat(paste0("\n status: ", jobStatus, ", progress: ", jobProgress))
79 return(jobStatus)
80 }
81
82 getStatusCode <- function(server, jobID) {
83 url <- paste0(server, "jobs/", jobID)
84 response <- request(url) %>%
85 req_headers(
86 'accept' = 'application/json'
87 ) %>%
88 req_perform()
89 return(response$status_code)
90 }
91
92 getResult <- function (server, jobID) {
93 response <- request(paste0(server, "jobs/", jobID, "/results")) %>%
94 req_headers(
95 'accept' = 'application/json'
96 ) %>%
97 req_perform()
98 return(response)
99 }
100
101 retrieveResults <- function(server, jobID, outputData) {
102 status_code <- getStatusCode(server, jobID)
103 if(status_code == 200){
104 status <- "running"
105 cat(status)
106 while(status == "running"){
107 jobStatus <- checkJobStatus(server, jobID)
108 if (jobStatus == "successful") {
109 status <- jobStatus
110 result <- getResult(server, jobID)
111 if (result$status_code == 200) {
112 resultBody <- parseResponseBody(result$body)
113 urls <- unname(unlist(lapply(resultBody, function(x) x$href)))
114 urls_with_newline <- paste(urls, collapse = "\n")
115 sink(paste0(outputData, "_result_urls.txt"))
116 cat(urls_with_newline, "\n")
117 sink()
118 }
119 } else if (jobStatus == "failed") {
120 status <- jobStatus
121 }
122 Sys.sleep(3)
123 }
124 cat("\n done \n")
125 } else if (status_code1 == 400) {
126 print("A query parameter has an invalid value.")
127 } else if (status_code1 == 404) {
128 print("The requested URI was not found.")
129 } else if (status_code1 == 500) {
130 print("The requested URI was not found.")
131 } else {
132 print(paste("HTTP", status_code1, "Error:", resp1$status_message))
133 }
134 }
135
136 is_url <- function(x) {
137 grepl("^https?://", x)
138 }
139
140 server <- "https://ospd.geolabs.fr:8300/ogc-api/"
141
142 inputParameters <- getParameters()
143
144 outputLocation <- inputParameters$outputData
145
146 outputs <- getOutputs(inputParameters, outputLocation, server)
147
148 for (key in names(inputParameters)) {
149 print(inputParameters[[key]])
150 if (is.character(inputParameters[[key]]) && (endsWith(inputParameters[[key]], ".dat") || endsWith(inputParameters[[key]], ".txt"))) {
151 con <- file(inputParameters[[key]], "r")
152 url_list <- list()
153 while (length(line <- readLines(con, n = 1)) > 0) {
154 if (is_url(line)) {
155 url_list <- c(url_list, list(list(href = trimws(line))))
156 }
157 }
158 close(con)
159 inputParameters[[key]] <- url_list
160 }
161 }
162
163 jsonData <- list(
164 "inputs" = inputParameters,
165 "outputs" = outputs
166 )
167
168 jobID <- executeProcess(server, inputParameters$select_process, jsonData, outputLocation)
169
170 retrieveResults(server, jobID, outputLocation)