comparison gmql_rest_queries.py @ 0:c74a1c7121ec draft default tip

planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author geco-team
date Tue, 26 Jun 2018 08:59:49 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c74a1c7121ec
1 # Galaxy plugin to REST access to the GMQL services
2 # (Queries)
3 # ----------------------------------------------------------------------------
4 # Luana Brancato, luana.brancato@mail.polimi.it
5 # ----------------------------------------------------------------------------
6
7 import argparse
8 from time import sleep
9
10 from gmql_rest_datasets import list_datasets, import_samples
11 from utilities import *
12
13 module_execution = 'query_exec'
14 module_monitor = 'query_monitor'
15
16
17 def check_input(query):
18
19 # Clean the input from Galaxy escape characters.
20
21 query = query.replace('__dq__', '"')
22 query = query.replace('__sq__', "'")
23 query = query.replace('__gt__', ">")
24 query = query.replace('__lt__', "<")
25 query = query.replace('__cn__', '\n')
26
27
28 return query
29
30
31 def compile_query(user, filename, query, log_file):
32 """Compile the given query"""
33
34 call = 'compile'
35
36 #Read query from file
37 with open(query, 'r') as f_in:
38 query_text = f_in.read()
39
40 #Check the input
41 query_cl = check_input(query_text)
42
43
44 # Then ask it to be compiled
45 url = compose_url(module_execution, call)
46
47 outcome = post(url, query_cl, user=user, content_type='text')
48
49 status = outcome['status']
50 message = outcome['message']
51 target_ds = outcome['id']
52
53 if status == 'COMPILE_SUCCESS':
54 with open(log_file, 'w') as f:
55 f.write("{status}\n{dataset}".format(status=status, dataset=target_ds))
56 f.close()
57 if status == 'COMPILE_FAILED':
58 with open(log_file, 'w') as f:
59 f.write("{status}\n{message}".format(status=status, message=message))
60 f.close()
61 stop_err("Compilation failed.\nSee log for details.")
62
63
64 def run_query(user, filename, query, log_file, rs_format, importResult=True):
65 """Run the given query. It returns an execution log and the resulting dataset."""
66
67
68 call = 'run'
69
70 # Read query from file
71 with open(query, 'r') as f_in:
72 query_text = f_in.read()
73
74 # Check the input
75 query_cl = check_input(query_text)
76
77 # Then ask it to be executed
78
79 status = "NEW"
80
81 url = compose_url(module_execution, call)
82 url = url.format(name=filename,output=rs_format)
83
84 outcome = post(url, query_cl, user=user, content_type='text')
85
86 jobid = outcome['id']
87
88 while status != "SUCCESS" and status != "EXEC_FAILED" and status != "DS_CREATION_FAILED":
89 log = read_status(user, jobid)
90 status = log['status']
91 sleep(5)
92
93 message = log['message']
94 time = log['executionTime']
95
96 if status == "EXEC_FAILED" or status == "DS_CREATION_FAILED":
97 with open(log_file, 'w') as f:
98 f.write("{status}\n{message}\n{execTime}".format(status=status, message=message, execTime=time))
99 f.close()
100 stop_err("Execution failed.\nSee log for details")
101
102 if status == "SUCCESS":
103 ext_log = read_complete_log(user, jobid)
104 job_list = ext_log['log']
105 jobs = ""
106 for j in job_list:
107 jobs = "{j_list}{j}\n".format(j_list=jobs, j=j)
108
109 with open(log_file, 'w') as f:
110 f.write("{status}\n"
111 "{message}\n"
112 "{execTime}\n"
113 "\n"
114 "{jobs}\n".format(status=status, message=message, execTime=time, jobs=jobs))
115 f.close()
116
117 importResult = bool(importResult)
118
119 if importResult:
120 # For now, it gets only the final result (it's easier to deal later with simple collections
121 # than a nested ones)
122
123 ds = log['datasets'][-1]
124 ds_name = ds.get('name')
125 import_samples(user, ds_name)
126
127
128 def read_status(user, jobid):
129 """Given the job id, it retrieves the status of the current operation
130 (as a JSON file)"""
131
132 call = 'status'
133
134 url = compose_url(module_monitor, call)
135 url = url.format(jobid=jobid)
136
137 status = get(url, user=user, response_type='json')
138
139 return status
140
141
142
143 def read_complete_log(user, jobid):
144 """Given the jobid, it retrieves the complete log of the latest operation
145 (as a JSON file)"""
146
147 call = 'log'
148
149 url = compose_url(module_monitor, call)
150 url = url.format(jobid=jobid)
151
152 log = get(url, user=user, response_type='json')
153
154 return log
155
156
157 def show_jobs(user, output):
158 """Retrieve the list of the user's jobs"""
159
160 call = 'jobs'
161
162 url = compose_url(module_monitor, call)
163
164 jobs = get(url, user=user, response_type='json')
165
166 jobs_list = jobs['jobs']
167 jobs_out = list()
168
169 # For each job in the list retrieve the relative status info
170 for j in jobs_list:
171 job = dict()
172 j_id = j['id']
173 job.update(id=j_id)
174 trace = read_status(user, j_id)
175
176 status = trace['status']
177 if status == 'SUCCESS' :
178 job.update(message=trace['message'],
179 status=status,
180 ds=trace['datasets'][0]['name'],
181 time=trace['executionTime'])
182 else :
183 job.update(message=trace['message'],
184 status=status,
185 ds=trace['datasets'][0]['name'])
186
187 jobs_out.append(job)
188
189 with open(output, 'w') as f:
190 for j in jobs_out:
191 f.write("{jobid}\t"
192 "{status}\t"
193 "{message}\t"
194 "{ds}\t"
195 "{time}\n".format(jobid=j.get('id'), status=j.get('status'), message=j.get('message'),
196 ds=j.get('ds'),time=j.get('time')))
197 f.close()
198
199 def stop_query(user,jobid,output) :
200 """Stop the execution of the given job"""
201
202 call = 'stop'
203
204 url = compose_url(module_monitor, call)
205 url = url.format(jobid=jobid)
206
207 outcome = get(url, user=user, response_type='json')
208
209 with open(output,'w') as f_out :
210 json.dump(outcome, f_out)
211
212
213
214 def stop_err(msg):
215 sys.stderr.write("%s\n" % msg)
216
217
218 def __main__():
219 parser = argparse.ArgumentParser()
220 parser.add_argument("-user")
221 parser.add_argument("-cmd")
222 parser.add_argument("-name")
223 parser.add_argument("-query")
224 parser.add_argument("-queryNew")
225 parser.add_argument("-queryLocal")
226 parser.add_argument("-log")
227 parser.add_argument("-job")
228 parser.add_argument("-format")
229 parser.add_argument("-importFlag")
230 parser.add_argument("-add_output")
231
232
233 args = parser.parse_args()
234
235 if args.cmd == 'compile':
236 compile_query(args.user, args.name, args.query, args.log)
237 if args.cmd == 'execute':
238 run_query(args.user, args.name, args.query, args.log, args.format, args.importFlag)
239 list_datasets(args.user,args.add_output)
240 if args.cmd == 'jobs':
241 show_jobs(args.user, args.log)
242 if args.cmd == 'stop' :
243 stop_query(args.user, args.job, args.log)
244
245
246 if __name__ == "__main__":
247 __main__()