Mercurial > repos > geco-team > gmql_queries_composer
comparison gmql_rest_queries.py @ 0:a80c93182db3 draft default tip
planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
| author | geco-team |
|---|---|
| date | Tue, 26 Jun 2018 09:08:06 -0400 |
| parents | |
| children | |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:a80c93182db3 |
|---|---|
| 1 # Galaxy plugin to REST access to the GMQL services | |
| 2 # (Queries) | |
| 3 # ---------------------------------------------------------------------------- | |
| 4 # Luana Brancato, luana.brancato@mail.polimi.it | |
| 5 # ---------------------------------------------------------------------------- | |
| 6 | |
| 7 import argparse | |
| 8 from time import sleep | |
| 9 | |
| 10 from gmql_rest_datasets import list_datasets, import_samples | |
| 11 from utilities import * | |
| 12 | |
| 13 module_execution = 'query_exec' | |
| 14 module_monitor = 'query_monitor' | |
| 15 | |
| 16 | |
def check_input(query):
    """Return *query* with Galaxy escape sequences decoded back to characters.

    Galaxy substitutes special characters in tool parameters with placeholder
    tokens; this undoes that substitution before the query is sent to GMQL.
    """

    # (placeholder, actual character) pairs, applied in order.
    escapes = (
        ('__dq__', '"'),
        ('__sq__', "'"),
        ('__gt__', '>'),
        ('__lt__', '<'),
        ('__cn__', '\n'),
    )

    for token, char in escapes:
        query = query.replace(token, char)

    return query
| 29 | |
| 30 | |
def compile_query(user, filename, query, log_file):
    """Compile the given query.

    :param user: GMQL user on whose behalf the request is made
    :param filename: query name (kept for interface compatibility; the
        compile endpoint does not use it — TODO confirm against callers)
    :param query: path of the file holding the query text
    :param log_file: path where the compilation outcome is written
    """

    call = 'compile'

    # Read query from file
    with open(query, 'r') as f_in:
        query_text = f_in.read()

    # Clean the input from Galaxy escape characters
    query_cl = check_input(query_text)

    # Then ask it to be compiled
    url = compose_url(module_execution, call)

    outcome = post(url, query_cl, user=user, content_type='text')

    status = outcome['status']
    message = outcome['message']
    target_ds = outcome['id']

    # The two statuses are mutually exclusive, so use elif; note that the
    # explicit f.close() calls of the original were redundant inside `with`.
    if status == 'COMPILE_SUCCESS':
        # On success the log carries the name of the target dataset.
        with open(log_file, 'w') as f:
            f.write("{status}\n{dataset}".format(status=status, dataset=target_ds))
    elif status == 'COMPILE_FAILED':
        with open(log_file, 'w') as f:
            f.write("{status}\n{message}".format(status=status, message=message))
        stop_err("Compilation failed.\nSee log for details.")
| 62 | |
| 63 | |
def run_query(user, filename, query, log_file, rs_format, importResult=True):
    """Run the given query. It returns an execution log and the resulting dataset.

    :param user: GMQL user on whose behalf the request is made
    :param filename: name to give the query/job on the server
    :param query: path of the file holding the query text
    :param log_file: path where the execution log is written
    :param rs_format: output format requested for the result dataset
    :param importResult: whether to import the final result into Galaxy;
        may arrive as a bool or as a command-line string ("true"/"false")
    """

    call = 'run'

    # Read query from file
    with open(query, 'r') as f_in:
        query_text = f_in.read()

    # Clean the input from Galaxy escape characters
    query_cl = check_input(query_text)

    # Then ask it to be executed
    url = compose_url(module_execution, call)
    url = url.format(name=filename, output=rs_format)

    outcome = post(url, query_cl, user=user, content_type='text')
    jobid = outcome['id']

    # Poll the job status until it reaches a terminal state.
    terminal_states = ("SUCCESS", "EXEC_FAILED", "DS_CREATION_FAILED")
    status = "NEW"
    while status not in terminal_states:
        log = read_status(user, jobid)
        status = log['status']
        sleep(5)

    message = log['message']
    time = log['executionTime']

    if status == "EXEC_FAILED" or status == "DS_CREATION_FAILED":
        with open(log_file, 'w') as f:
            f.write("{status}\n{message}\n{execTime}".format(status=status, message=message, execTime=time))
        stop_err("Execution failed.\nSee log for details")
        # BUG FIX: do not fall through to the import step after a failure.
        return

    # status == "SUCCESS": write the extended log.
    ext_log = read_complete_log(user, jobid)
    jobs = ""
    for j in ext_log['log']:
        jobs = "{j_list}{j}\n".format(j_list=jobs, j=j)

    with open(log_file, 'w') as f:
        f.write("{status}\n"
                "{message}\n"
                "{execTime}\n"
                "\n"
                "{jobs}\n".format(status=status, message=message, execTime=time, jobs=jobs))

    # BUG FIX: importResult arrives from the command line as a string, and
    # bool() on any non-empty string is True (so "false" meant True).
    # Parse it explicitly instead; real booleans still round-trip correctly.
    importResult = str(importResult).lower() in ('true', '1', 'yes')

    if importResult:
        # For now, it gets only the final result (it's easier to deal later
        # with simple collections than with nested ones)
        ds = log['datasets'][-1]
        ds_name = ds.get('name')
        import_samples(user, ds_name)
| 126 | |
| 127 | |
def read_status(user, jobid):
    """Given the job id, it retrieves the status of the current operation
    (as a JSON file)"""

    # Build the monitoring endpoint for this specific job.
    endpoint = compose_url(module_monitor, 'status').format(jobid=jobid)

    return get(endpoint, user=user, response_type='json')
| 140 | |
| 141 | |
| 142 | |
def read_complete_log(user, jobid):
    """Given the jobid, it retrieves the complete log of the latest operation
    (as a JSON file)"""

    # Build the monitoring endpoint for this specific job's full log.
    endpoint = compose_url(module_monitor, 'log').format(jobid=jobid)

    return get(endpoint, user=user, response_type='json')
| 155 | |
| 156 | |
def show_jobs(user, output):
    """Retrieve the list of the user's jobs and write a tab-separated report.

    :param user: GMQL user whose jobs are listed
    :param output: path of the report file (one line per job:
        id, status, message, dataset name, execution time)
    """

    call = 'jobs'

    url = compose_url(module_monitor, call)

    jobs = get(url, user=user, response_type='json')

    jobs_list = jobs['jobs']
    jobs_out = list()

    # For each job in the list retrieve the relative status info.
    # The original duplicated the dict construction across the SUCCESS and
    # non-SUCCESS branches; only `time` actually differs, so build it once.
    for j in jobs_list:
        j_id = j['id']
        trace = read_status(user, j_id)

        job = dict(id=j_id,
                   status=trace['status'],
                   message=trace['message'],
                   ds=trace['datasets'][0]['name'])

        # Execution time is only reported for completed jobs.
        if trace['status'] == 'SUCCESS':
            job.update(time=trace['executionTime'])

        jobs_out.append(job)

    # Redundant f.close() inside `with` removed.
    with open(output, 'w') as f:
        for j in jobs_out:
            f.write("{jobid}\t"
                    "{status}\t"
                    "{message}\t"
                    "{ds}\t"
                    "{time}\n".format(jobid=j.get('id'), status=j.get('status'), message=j.get('message'),
                                      ds=j.get('ds'), time=j.get('time')))
| 198 | |
def stop_query(user, jobid, output):
    """Stop the execution of the given job"""

    # Build the stop endpoint for this job and issue the request.
    endpoint = compose_url(module_monitor, 'stop').format(jobid=jobid)

    result = get(endpoint, user=user, response_type='json')

    # Persist the server's reply as JSON for the user to inspect.
    with open(output, 'w') as f_out:
        json.dump(result, f_out)
| 211 | |
| 212 | |
| 213 | |
def stop_err(msg):
    """Write *msg* to standard error and abort the tool run with a non-zero exit.

    BUG FIX: the original only wrote the message and returned, so callers
    kept executing after a fatal error (e.g. run_query fell through toward
    the import step after "Execution failed"). Exiting here matches the
    conventional Galaxy-tool stop_err behavior.
    """
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)
| 216 | |
| 217 | |
def __main__():
    """Command-line entry point: parse the arguments and dispatch on -cmd."""

    parser = argparse.ArgumentParser()
    # All options are plain string-valued flags; declare them in one pass.
    for flag in ("-user", "-cmd", "-name", "-query", "-queryNew",
                 "-queryLocal", "-log", "-job", "-format",
                 "-importFlag", "-add_output"):
        parser.add_argument(flag)

    args = parser.parse_args()

    if args.cmd == 'compile':
        compile_query(args.user, args.name, args.query, args.log)
    if args.cmd == 'execute':
        run_query(args.user, args.name, args.query, args.log, args.format, args.importFlag)
        list_datasets(args.user, args.add_output)
    if args.cmd == 'jobs':
        show_jobs(args.user, args.log)
    if args.cmd == 'stop':
        stop_query(args.user, args.job, args.log)


if __name__ == "__main__":
    __main__()
