view util/subtools.py @ 0:ce4f91831680 draft default tip

planemo upload for repository https://github.com/Yating-L/suite_gonramp_apollo.git commit 5367a00befb467f162d1870edb91f9face72e894
author yating-l
date Fri, 16 Feb 2018 10:57:13 -0500
parents
children
line wrap: on
line source

#!/usr/bin/env python

"""
This file include common used functions for converting file format to gff3
"""
from collections import OrderedDict
import json
import subprocess
import os
import sys
import tempfile
import string
import logging

class PopenError(Exception):
    def __init__(self, cmd, error, return_code):
        self.cmd = cmd
        self.error = error
        self.return_code = return_code

    def __str__(self):
        message = "The subprocess {0} has returned the error: {1}.".format(
            self.cmd, self.return_code)
        message = ','.join(
            (message, "Its error message is: {0}".format(self.error)))
        return repr(message)


def _handleExceptionAndCheckCall(array_call, **kwargs):
    """
    This class handle exceptions and call the tool.
    It maps the signature of subprocess.check_call:
    See https://docs.python.org/2/library/subprocess.html#subprocess.check_call
    """
    stdout = kwargs.get('stdout', subprocess.PIPE)
    stderr = kwargs.get('stderr', subprocess.PIPE)
    shell = kwargs.get('shell', False)
    stdin = kwargs.get('stdin', None)

    cmd = array_call[0]

    output = None
    error = None

    # TODO: Check the value of array_call and <=[0]
    logging.debug("Calling {0}:".format(cmd))
    logging.debug("%s", array_call)
    logging.debug("---------")

    # TODO: Use universal_newlines option from Popen?
    try:
        p = subprocess.Popen(array_call, stdout=stdout,
                             stderr=stderr, shell=shell, stdin=stdin)

        # TODO: Change this because of possible memory issues => https://docs.python.org/2/library/subprocess.html#subprocess.Popen.communicate

        output, error = p.communicate()

        if stdout == subprocess.PIPE:
            logging.debug("\t{0}".format(output))
        else:
            logging.debug("\tOutput in file {0}".format(stdout.name))
        # If we detect an error from the subprocess, then we raise an exception
        # TODO: Manage if we raise an exception for everything, or use CRITICAL etc... but not stop process
        # TODO: The responsability of returning a sys.exit() should not be there, but up in the app.
        if p.returncode:
            if stderr == subprocess.PIPE:
                raise PopenError(cmd, error, p.returncode)
            else:
                # TODO: To Handle properly with a design behind, if we received a option as a file for the error
                raise Exception("Error when calling {0}. Error as been logged in your file {1}. Error code: {2}".format(cmd, stderr.name, p.returncode))

    except OSError as e:
        message = "The subprocess {0} has encountered an OSError: {1}".format(
            cmd, e.strerror)
        if e.filename:
            message = '\n'.join(
                (message, ", against this file: {0}".format(e.filename)))
        logging.error(message)
        sys.exit(-1)
    except PopenError as p:
        message = "The subprocess {0} has returned the error: {1}.".format(
            p.cmd, p.return_code)
        message = '\n'.join(
            (message, "Its error message is: {0}".format(p.error)))

        logging.exception(message)

        sys.exit(p.return_code)
    except Exception as e:
        message = "The subprocess {0} has encountered an unknown error: {1}".format(
            cmd, e)
        logging.exception(message)

        sys.exit(-1)
    return output

def arrow_add_organism(organism_name, organism_dir, public=False):
    array_call = ['arrow', 'organisms', 'add_organism', organism_name, organism_dir]
    if public:
        array_call.append('--public')
    p = _handleExceptionAndCheckCall(array_call)
    #p = subprocess.check_output(array_call)
    return p

def arrow_create_user(user_email, firstname, lastname, password, admin=False):
    """ 
    Create a new user of Apollo, the default user_role is "user" 
    """
    array_call = ['arrow', 'users', 'create_user', user_email, firstname, lastname, password]
    if admin:
        array_call += ['--role', 'admin']
    p = _handleExceptionAndCheckCall(array_call)
    j = json.loads(p)
    if "userId" in j:
        return j['userId']
    elif "error" in j:
        logging.error("Got error message: %s", j['error'])
        exit(-1)
        
        
def arrow_delete_user(user_email):
    array_call = ['arrow', 'users', 'delete_user', user_email]
    p = _handleExceptionAndCheckCall(array_call)
    j = json.loads(p)
    if "error" in j:
        logging.error("Got error message: %s", j['error'])
        exit(-1)

def arrow_add_to_group(groupname, user_email):
    if not arrow_get_groups(groupname):
        arrow_create_group(groupname)
    array_call = ['arrow', 'users', 'add_to_group', groupname, user_email]
    p = _handleExceptionAndCheckCall(array_call)
    j = json.loads(p)
    if j != dict():
        logging.error("Error add user %s to group %s. The user doesn't exist", user_email, groupname)


def arrow_remove_from_group(groupname, user_email):
    if arrow_get_groups(groupname):
        array_call = ['arrow', 'users', 'remove_from_group', groupname, user_email]
        p = _handleExceptionAndCheckCall(array_call)
        j = json.loads(p)
        if j != dict():
            logging.error("Error remove user %s to group %s. The user doesn't exist", user_email, groupname)
    else:
        logging.error("Group %s doesn't exist. Check if you spell the name correctly", groupname)

def arrow_create_group(groupname):
    if arrow_get_groups(groupname):
        logging.error("Group %s already exist. Create a group with another name.", groupname)
    array_call = ['arrow', 'groups', 'create_group', groupname]
    p = _handleExceptionAndCheckCall(array_call)

def arrow_get_groups(groupname):
    array_call = ['arrow', 'groups', 'get_groups']
    p = _handleExceptionAndCheckCall(array_call)
    all_groups = json.loads(p)
    for g in all_groups:
        if g['name'] == groupname:
            return True
    return False

def arrow_update_organism_permissions(user_id, organism, **user_permissions):
    array_call = ['arrow', 'users', 'update_organism_permissions', str(user_id), str(organism)]
    admin = user_permissions.get("admin", False)
    write = user_permissions.get("write", False)
    read = user_permissions.get("read", False)
    export = user_permissions.get("export", False)
    if admin:
        array_call.append('--administrate')
    if write:
        array_call.append('--write')
    if read:
        array_call.append('--read')
    if export:
        array_call.append('--export')
    p = _handleExceptionAndCheckCall(array_call)
    return p

def arrow_get_users(user_email):
    array_call = ['arrow', 'users', 'get_users']
    p = _handleExceptionAndCheckCall(array_call)
    all_users = json.loads(p)
    for d in all_users:
        if d['username'] == user_email:
            return d['userId']
    logging.error("Cannot find user %s", user_email)

def arrow_get_organism(organism_name):
    array_call= ['arrow', 'organisms', 'get_organisms', '--common_name', organism_name]
    p = _handleExceptionAndCheckCall(array_call)
    org = json.loads(p)
    if 'error' not in org:
        return org[0]['id']
    else:
        logging.debug("Got error msg %s when look for organism %s.", org['error'], organism_name)

def arrow_delete_organism(organism_id):
    array_call = ['arrow', 'organisms', 'delete_organism', str(organism_id)]
    p = _handleExceptionAndCheckCall(array_call)
    return p
    
def verify_user_login(username, password, apollo_host):
    user_info = {'username': username, 'password': password}
    array_call = ['curl', 
                  '-b', 'cookies.txt', 
                  '-c', 'cookies.txt', 
                  '-H', 'Content-Type:application/json',
                  '-d', json.dumps(user_info),
                  apollo_host + '/Login?operation=login'
                  ]
    p = _handleExceptionAndCheckCall(array_call)
    msg = json.loads(p)
    if 'error' in msg:
        logging.error("The Authentication for user %s failed. Get error message %s", username, msg['error'])
        exit(-1)