gmql_rest_queries.py @ 0:c74a1c7121ec (draft, default, tip)
planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author: geco-team
date: Tue, 26 Jun 2018 08:59:49 -0400

# Galaxy plugin to REST access to the GMQL services
# (Queries)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import json
import sys
from time import sleep

from gmql_rest_datasets import list_datasets, import_samples
from utilities import *

module_execution = 'query_exec'
module_monitor = 'query_monitor'


def check_input(query):
    """Clean the input from Galaxy escape characters."""

    query = query.replace('__dq__', '"')
    query = query.replace('__sq__', "'")
    query = query.replace('__gt__', ">")
    query = query.replace('__lt__', "<")
    query = query.replace('__cn__', '\n')

    return query

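# A quick illustration of the de-escaping (hypothetical input string):
#
#   check_input('SELECT(region: chr __lt__ 5) __dq__ds__dq__')
#   # -> 'SELECT(region: chr < 5) "ds"'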

def compile_query(user, filename, query, log_file):
    """Compile the given query."""

    call = 'compile'

    # Read query from file
    with open(query, 'r') as f_in:
        query_text = f_in.read()

    # Check the input
    query_cl = check_input(query_text)

    # Then ask for it to be compiled
    url = compose_url(module_execution, call)

    outcome = post(url, query_cl, user=user, content_type='text')

    status = outcome['status']
    message = outcome['message']
    target_ds = outcome['id']

    if status == 'COMPILE_SUCCESS':
        with open(log_file, 'w') as f:
            f.write("{status}\n{dataset}".format(status=status, dataset=target_ds))
    elif status == 'COMPILE_FAILED':
        with open(log_file, 'w') as f:
            f.write("{status}\n{message}".format(status=status, message=message))
        stop_err("Compilation failed.\nSee log for details.")

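# A sketch of the compile response this function expects, inferred from the
# fields accessed above (the exact payload is defined by the GMQL REST API):
#
#   {"status": "COMPILE_SUCCESS", "message": "...", "id": "<target dataset>"}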

def run_query(user, filename, query, log_file, rs_format, importResult=True):
    """Run the given query. It returns an execution log and the resulting dataset."""

    call = 'run'

    # Read query from file
    with open(query, 'r') as f_in:
        query_text = f_in.read()

    # Check the input
    query_cl = check_input(query_text)

    # Then ask for it to be executed
    status = "NEW"

    url = compose_url(module_execution, call)
    url = url.format(name=filename, output=rs_format)

    outcome = post(url, query_cl, user=user, content_type='text')

    jobid = outcome['id']

    # Poll the job status every 5 seconds until it reaches a terminal state
    while status not in ("SUCCESS", "EXEC_FAILED", "DS_CREATION_FAILED"):
        log = read_status(user, jobid)
        status = log['status']
        sleep(5)

    message = log['message']
    time = log['executionTime']

    if status in ("EXEC_FAILED", "DS_CREATION_FAILED"):
        with open(log_file, 'w') as f:
            f.write("{status}\n{message}\n{execTime}".format(status=status, message=message, execTime=time))
        stop_err("Execution failed.\nSee log for details.")

    if status == "SUCCESS":
        ext_log = read_complete_log(user, jobid)
        job_list = ext_log['log']
        jobs = "".join("{j}\n".format(j=j) for j in job_list)

        with open(log_file, 'w') as f:
            f.write("{status}\n"
                    "{message}\n"
                    "{execTime}\n"
                    "\n"
                    "{jobs}\n".format(status=status, message=message, execTime=time, jobs=jobs))

    # argparse passes importFlag as a string, and bool() of any non-empty string
    # is True; parse the flag value explicitly instead.
    importResult = str(importResult).lower() in ('true', '1', 'yes')

    if importResult:
        # For now, it gets only the final result (it is easier to deal later with
        # simple collections than with nested ones)
        ds = log['datasets'][-1]
        ds_name = ds.get('name')
        import_samples(user, ds_name)

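# On SUCCESS the log file has the layout below (derived from the writes above):
#
#   SUCCESS
#   <message>
#   <executionTime>
#
#   <one line per entry of the complete job log>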

def read_status(user, jobid):
    """Given the job id, retrieve the status of the current operation (as JSON)."""

    call = 'status'

    url = compose_url(module_monitor, call)
    url = url.format(jobid=jobid)

    status = get(url, user=user, response_type='json')

    return status

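# Fields of the status payload that this module relies on (see run_query and
# show_jobs): 'status', 'message', 'executionTime', and 'datasets', a list of
# {'name': ...} entries. A hypothetical example:
#
#   {"status": "SUCCESS", "message": "ok", "executionTime": "12 s",
#    "datasets": [{"name": "results_dataset"}]}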

def read_complete_log(user, jobid):
    """Given the job id, retrieve the complete log of the latest operation (as JSON)."""

    call = 'log'

    url = compose_url(module_monitor, call)
    url = url.format(jobid=jobid)

    log = get(url, user=user, response_type='json')

    return log

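# The complete log payload, as consumed in run_query, carries a 'log' key
# holding a list of printable entries:
#
#   {"log": ["<job step 1>", "<job step 2>", ...]}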

def show_jobs(user, output):
    """Retrieve the list of the user's jobs."""

    call = 'jobs'

    url = compose_url(module_monitor, call)

    jobs = get(url, user=user, response_type='json')

    jobs_list = jobs['jobs']
    jobs_out = list()

    # For each job in the list, retrieve its status info
    for j in jobs_list:
        job = dict()
        j_id = j['id']
        job.update(id=j_id)
        trace = read_status(user, j_id)

        status = trace['status']
        if status == 'SUCCESS':
            job.update(message=trace['message'],
                       status=status,
                       ds=trace['datasets'][0]['name'],
                       time=trace['executionTime'])
        else:
            job.update(message=trace['message'],
                       status=status,
                       ds=trace['datasets'][0]['name'])

        jobs_out.append(job)

    with open(output, 'w') as f:
        for j in jobs_out:
            f.write("{jobid}\t"
                    "{status}\t"
                    "{message}\t"
                    "{ds}\t"
                    "{time}\n".format(jobid=j.get('id'), status=j.get('status'), message=j.get('message'),
                                      ds=j.get('ds'), time=j.get('time')))

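# Each output row is tab-separated: id, status, message, dataset, time. For
# jobs not in SUCCESS state no execution time is recorded, so the last column
# prints as None. A row with hypothetical values:
#
#   job_12345	SUCCESS	ok	results_dataset	12 s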

def stop_query(user, jobid, output):
    """Stop the execution of the given job."""

    call = 'stop'

    url = compose_url(module_monitor, call)
    url = url.format(jobid=jobid)

    outcome = get(url, user=user, response_type='json')

    # Save the service response (JSON) to the output file
    with open(output, 'w') as f_out:
        json.dump(outcome, f_out)

def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    # Exit with a non-zero status so callers do not fall through after a fatal error
    sys.exit(1)

def __main__():
    parser = argparse.ArgumentParser()
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-name")
    parser.add_argument("-query")
    parser.add_argument("-queryNew")
    parser.add_argument("-queryLocal")
    parser.add_argument("-log")
    parser.add_argument("-job")
    parser.add_argument("-format")
    parser.add_argument("-importFlag")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'compile':
        compile_query(args.user, args.name, args.query, args.log)
    elif args.cmd == 'execute':
        run_query(args.user, args.name, args.query, args.log, args.format, args.importFlag)
        list_datasets(args.user, args.add_output)
    elif args.cmd == 'jobs':
        show_jobs(args.user, args.log)
    elif args.cmd == 'stop':
        stop_query(args.user, args.job, args.log)


if __name__ == "__main__":
    __main__()
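
# Example invocation (hypothetical values; in practice they are supplied by the
# Galaxy tool wrapper):
#
#   python gmql_rest_queries.py -user <user_token> -cmd execute \
#       -name my_query -query query.gmql -log exec.log \
#       -format gtf -importFlag true -add_output datasets.txt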