geco-team / gmql_queries_editor: gmql_rest_queries.py @ 0:c74a1c7121ec (draft, default, tip)

planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty

author: geco-team
date:   Tue, 26 Jun 2018 08:59:49 -0400
# Galaxy plugin to REST access to the GMQL services
# (Queries)
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import argparse
import json
import sys
from time import sleep

from gmql_rest_datasets import list_datasets, import_samples
from utilities import *

module_execution = 'query_exec'
module_monitor = 'query_monitor'


def check_input(query):
    """Clean the input from Galaxy escape characters."""
    query = query.replace('__dq__', '"')
    query = query.replace('__sq__', "'")
    query = query.replace('__gt__', ">")
    query = query.replace('__lt__', "<")
    query = query.replace('__cn__', '\n')

    return query


def compile_query(user, filename, query, log_file):
    """Compile the given query"""

    call = 'compile'

    # Read the query from file
    with open(query, 'r') as f_in:
        query_text = f_in.read()

    # Clean the input
    query_cl = check_input(query_text)

    # Then ask for it to be compiled
    url = compose_url(module_execution, call)
    outcome = post(url, query_cl, user=user, content_type='text')

    status = outcome['status']
    message = outcome['message']
    target_ds = outcome['id']

    if status == 'COMPILE_SUCCESS':
        with open(log_file, 'w') as f:
            f.write("{status}\n{dataset}".format(status=status, dataset=target_ds))
    if status == 'COMPILE_FAILED':
        with open(log_file, 'w') as f:
            f.write("{status}\n{message}".format(status=status, message=message))
        stop_err("Compilation failed.\nSee log for details.")


def run_query(user, filename, query, log_file, rs_format, importResult=True):
    """Run the given query. It returns an execution log and the resulting dataset."""

    call = 'run'

    # Read the query from file
    with open(query, 'r') as f_in:
        query_text = f_in.read()

    # Clean the input
    query_cl = check_input(query_text)

    # Then ask for it to be executed
    url = compose_url(module_execution, call)
    url = url.format(name=filename, output=rs_format)

    outcome = post(url, query_cl, user=user, content_type='text')
    jobid = outcome['id']

    # Poll the job status until it reaches a terminal state
    status = "NEW"
    while status not in ("SUCCESS", "EXEC_FAILED", "DS_CREATION_FAILED"):
        log = read_status(user, jobid)
        status = log['status']
        sleep(5)

    message = log['message']
    time = log['executionTime']

    if status in ("EXEC_FAILED", "DS_CREATION_FAILED"):
        with open(log_file, 'w') as f:
            f.write("{status}\n{message}\n{execTime}".format(status=status, message=message, execTime=time))
        stop_err("Execution failed.\nSee log for details")

    if status == "SUCCESS":
        ext_log = read_complete_log(user, jobid)
        job_list = ext_log['log']
        jobs = ""
        for j in job_list:
            jobs = "{j_list}{j}\n".format(j_list=jobs, j=j)

        with open(log_file, 'w') as f:
            f.write("{status}\n"
                    "{message}\n"
                    "{execTime}\n"
                    "\n"
                    "{jobs}\n".format(status=status, message=message, execTime=time, jobs=jobs))

        # importResult may arrive as a string from the command line; bool() on a
        # non-empty string such as 'false' would always be True, so parse it explicitly.
        importResult = str(importResult).lower() in ('true', '1', 'yes')
        if importResult:
            # For now, it gets only the final result (it's easier to deal later
            # with simple collections than with nested ones)
            ds = log['datasets'][-1]
            ds_name = ds.get('name')
            import_samples(user, ds_name)


def read_status(user, jobid):
    """Given the job id, retrieve the status of the current operation (as JSON)"""

    call = 'status'
    url = compose_url(module_monitor, call)
    url = url.format(jobid=jobid)
    status = get(url, user=user, response_type='json')

    return status


def read_complete_log(user, jobid):
    """Given the job id, retrieve the complete log of the latest operation (as JSON)"""

    call = 'log'
    url = compose_url(module_monitor, call)
    url = url.format(jobid=jobid)
    log = get(url, user=user, response_type='json')

    return log
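# Note: the monitor endpoints are assumed to return JSON shaped roughly as
# below. The field names are inferred from the accesses in this module
# (status, message, executionTime, datasets); the authoritative schema is
# defined by the GMQL REST services.
#
#   {
#       "status": "SUCCESS",
#       "message": "...",
#       "executionTime": "...",
#       "datasets": [{"name": "..."}]
#   }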
def show_jobs(user, output):
    """Retrieve the list of the user's jobs"""

    call = 'jobs'
    url = compose_url(module_monitor, call)
    jobs = get(url, user=user, response_type='json')
    jobs_list = jobs['jobs']
    jobs_out = list()

    # For each job in the list, retrieve its status info
    for j in jobs_list:
        job = dict()
        j_id = j['id']
        job.update(id=j_id)
        trace = read_status(user, j_id)
        status = trace['status']
        if status == 'SUCCESS':
            job.update(message=trace['message'], status=status,
                       ds=trace['datasets'][0]['name'], time=trace['executionTime'])
        else:
            job.update(message=trace['message'], status=status,
                       ds=trace['datasets'][0]['name'])
        jobs_out.append(job)

    with open(output, 'w') as f:
        for j in jobs_out:
            f.write("{jobid}\t"
                    "{status}\t"
                    "{message}\t"
                    "{ds}\t"
                    "{time}\n".format(jobid=j.get('id'), status=j.get('status'),
                                      message=j.get('message'), ds=j.get('ds'),
                                      time=j.get('time')))


def stop_query(user, jobid, output):
    """Stop the execution of the given job"""

    call = 'stop'
    url = compose_url(module_monitor, call)
    url = url.format(jobid=jobid)

    outcome = get(url, user=user, response_type='json')

    with open(output, 'w') as f_out:
        json.dump(outcome, f_out)


def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    # Exit with a non-zero code so a fatal error actually stops the tool
    sys.exit(1)


def __main__():
    parser = argparse.ArgumentParser()
    parser.add_argument("-user")
    parser.add_argument("-cmd")
    parser.add_argument("-name")
    parser.add_argument("-query")
    parser.add_argument("-queryNew")
    parser.add_argument("-queryLocal")
    parser.add_argument("-log")
    parser.add_argument("-job")
    parser.add_argument("-format")
    parser.add_argument("-importFlag")
    parser.add_argument("-add_output")

    args = parser.parse_args()

    if args.cmd == 'compile':
        compile_query(args.user, args.name, args.query, args.log)
    if args.cmd == 'execute':
        run_query(args.user, args.name, args.query, args.log, args.format, args.importFlag)
        list_datasets(args.user, args.add_output)
    if args.cmd == 'jobs':
        show_jobs(args.user, args.log)
    if args.cmd == 'stop':
        stop_query(args.user, args.job, args.log)


if __name__ == "__main__":
    __main__()
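# Example invocations, as a sketch: the user token, file names, and the
# 'gtf' format value are placeholders; the flags are those defined in
# __main__ above.
#
#   python gmql_rest_queries.py -user $TOKEN -cmd compile \
#       -name my_query -query query.gmql -log compile.log
#
#   python gmql_rest_queries.py -user $TOKEN -cmd execute \
#       -name my_query -query query.gmql -log run.log \
#       -format gtf -importFlag true -add_output datasets.txt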