Mercurial > repos > yating-l > gonramp_apollo_tools
diff util/subtools.py @ 0:ce4f91831680 draft default tip
planemo upload for repository https://github.com/Yating-L/suite_gonramp_apollo.git commit 5367a00befb467f162d1870edb91f9face72e894
author | yating-l |
---|---|
date | Fri, 16 Feb 2018 10:57:13 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/util/subtools.py Fri Feb 16 10:57:13 2018 -0500 @@ -0,0 +1,220 @@ +#!/usr/bin/env python + +""" +This file include common used functions for converting file format to gff3 +""" +from collections import OrderedDict +import json +import subprocess +import os +import sys +import tempfile +import string +import logging + +class PopenError(Exception): + def __init__(self, cmd, error, return_code): + self.cmd = cmd + self.error = error + self.return_code = return_code + + def __str__(self): + message = "The subprocess {0} has returned the error: {1}.".format( + self.cmd, self.return_code) + message = ','.join( + (message, "Its error message is: {0}".format(self.error))) + return repr(message) + + +def _handleExceptionAndCheckCall(array_call, **kwargs): + """ + This class handle exceptions and call the tool. + It maps the signature of subprocess.check_call: + See https://docs.python.org/2/library/subprocess.html#subprocess.check_call + """ + stdout = kwargs.get('stdout', subprocess.PIPE) + stderr = kwargs.get('stderr', subprocess.PIPE) + shell = kwargs.get('shell', False) + stdin = kwargs.get('stdin', None) + + cmd = array_call[0] + + output = None + error = None + + # TODO: Check the value of array_call and <=[0] + logging.debug("Calling {0}:".format(cmd)) + logging.debug("%s", array_call) + logging.debug("---------") + + # TODO: Use universal_newlines option from Popen? + try: + p = subprocess.Popen(array_call, stdout=stdout, + stderr=stderr, shell=shell, stdin=stdin) + + # TODO: Change this because of possible memory issues => https://docs.python.org/2/library/subprocess.html#subprocess.Popen.communicate + + output, error = p.communicate() + + if stdout == subprocess.PIPE: + logging.debug("\t{0}".format(output)) + else: + logging.debug("\tOutput in file {0}".format(stdout.name)) + # If we detect an error from the subprocess, then we raise an exception + # TODO: Manage if we raise an exception for everything, or use CRITICAL etc... but not stop process + # TODO: The responsability of returning a sys.exit() should not be there, but up in the app. + if p.returncode: + if stderr == subprocess.PIPE: + raise PopenError(cmd, error, p.returncode) + else: + # TODO: To Handle properly with a design behind, if we received a option as a file for the error + raise Exception("Error when calling {0}. Error as been logged in your file {1}. Error code: {2}".format(cmd, stderr.name, p.returncode)) + + except OSError as e: + message = "The subprocess {0} has encountered an OSError: {1}".format( + cmd, e.strerror) + if e.filename: + message = '\n'.join( + (message, ", against this file: {0}".format(e.filename))) + logging.error(message) + sys.exit(-1) + except PopenError as p: + message = "The subprocess {0} has returned the error: {1}.".format( + p.cmd, p.return_code) + message = '\n'.join( + (message, "Its error message is: {0}".format(p.error))) + + logging.exception(message) + + sys.exit(p.return_code) + except Exception as e: + message = "The subprocess {0} has encountered an unknown error: {1}".format( + cmd, e) + logging.exception(message) + + sys.exit(-1) + return output + +def arrow_add_organism(organism_name, organism_dir, public=False): + array_call = ['arrow', 'organisms', 'add_organism', organism_name, organism_dir] + if public: + array_call.append('--public') + p = _handleExceptionAndCheckCall(array_call) + #p = subprocess.check_output(array_call) + return p + +def arrow_create_user(user_email, firstname, lastname, password, admin=False): + """ + Create a new user of Apollo, the default user_role is "user" + """ + array_call = ['arrow', 'users', 'create_user', user_email, firstname, lastname, password] + if admin: + array_call += ['--role', 'admin'] + p = _handleExceptionAndCheckCall(array_call) + j = json.loads(p) + if "userId" in j: + return j['userId'] + elif "error" in j: + logging.error("Got error message: %s", j['error']) + exit(-1) + + +def arrow_delete_user(user_email): + array_call = ['arrow', 'users', 'delete_user', user_email] + p = _handleExceptionAndCheckCall(array_call) + j = json.loads(p) + if "error" in j: + logging.error("Got error message: %s", j['error']) + exit(-1) + +def arrow_add_to_group(groupname, user_email): + if not arrow_get_groups(groupname): + arrow_create_group(groupname) + array_call = ['arrow', 'users', 'add_to_group', groupname, user_email] + p = _handleExceptionAndCheckCall(array_call) + j = json.loads(p) + if j != dict(): + logging.error("Error add user %s to group %s. The user doesn't exist", user_email, groupname) + + +def arrow_remove_from_group(groupname, user_email): + if arrow_get_groups(groupname): + array_call = ['arrow', 'users', 'remove_from_group', groupname, user_email] + p = _handleExceptionAndCheckCall(array_call) + j = json.loads(p) + if j != dict(): + logging.error("Error remove user %s to group %s. The user doesn't exist", user_email, groupname) + else: + logging.error("Group %s doesn't exist. Check if you spell the name correctly", groupname) + +def arrow_create_group(groupname): + if arrow_get_groups(groupname): + logging.error("Group %s already exist. Create a group with another name.", groupname) + array_call = ['arrow', 'groups', 'create_group', groupname] + p = _handleExceptionAndCheckCall(array_call) + +def arrow_get_groups(groupname): + array_call = ['arrow', 'groups', 'get_groups'] + p = _handleExceptionAndCheckCall(array_call) + all_groups = json.loads(p) + for g in all_groups: + if g['name'] == groupname: + return True + return False + +def arrow_update_organism_permissions(user_id, organism, **user_permissions): + array_call = ['arrow', 'users', 'update_organism_permissions', str(user_id), str(organism)] + admin = user_permissions.get("admin", False) + write = user_permissions.get("write", False) + read = user_permissions.get("read", False) + export = user_permissions.get("export", False) + if admin: + array_call.append('--administrate') + if write: + array_call.append('--write') + if read: + array_call.append('--read') + if export: + array_call.append('--export') + p = _handleExceptionAndCheckCall(array_call) + return p + +def arrow_get_users(user_email): + array_call = ['arrow', 'users', 'get_users'] + p = _handleExceptionAndCheckCall(array_call) + all_users = json.loads(p) + for d in all_users: + if d['username'] == user_email: + return d['userId'] + logging.error("Cannot find user %s", user_email) + +def arrow_get_organism(organism_name): + array_call= ['arrow', 'organisms', 'get_organisms', '--common_name', organism_name] + p = _handleExceptionAndCheckCall(array_call) + org = json.loads(p) + if 'error' not in org: + return org[0]['id'] + else: + logging.debug("Got error msg %s when look for organism %s.", org['error'], organism_name) + +def arrow_delete_organism(organism_id): + array_call = ['arrow', 'organisms', 'delete_organism', str(organism_id)] + p = _handleExceptionAndCheckCall(array_call) + return p + +def verify_user_login(username, password, apollo_host): + user_info = {'username': username, 'password': password} + array_call = ['curl', + '-b', 'cookies.txt', + '-c', 'cookies.txt', + '-H', 'Content-Type:application/json', + '-d', json.dumps(user_info), + apollo_host + '/Login?operation=login' + ] + p = _handleExceptionAndCheckCall(array_call) + msg = json.loads(p) + if 'error' in msg: + logging.error("The Authentication for user %s failed. Get error message %s", username, msg['error']) + exit(-1) + +