view utilities.py @ 0:35d52820e7c7 draft default tip

planemo upload for repository https://github.com/lu-brn/gmql-galaxy commit 953ee36ceda5814dc9baa03427bc0eb4ee2e93bd-dirty
author geco-team
date Tue, 26 Jun 2018 09:01:39 -0400
parents
children
line wrap: on
line source

# Helper functions to perform REST calls on the GMQL server.
# ----------------------------------------------------------------------------
# Luana Brancato, luana.brancato@mail.polimi.it
# ----------------------------------------------------------------------------

import sys
import os
import yaml
import requests


def load_parts(module, call):
    """Return the fragments of the url to call for one operation of a module.

    The fragments are read from the 'gmql_rest.yaml' file located in the same
    folder of this file.

    :param module: key of the module section in the configuration
    :param call: key of the operation, under the 'operations' entry of the module
    :return: list with, in order: the 'GMQL_URL' value, the module prefix
        (only if not empty) and the fragments listed for the operation
    """

    y_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'gmql_rest.yaml')

    # safe_load only builds plain Python objects. yaml.load without an explicit
    # Loader is deprecated since PyYAML 5.1 and rejected by PyYAML 6.
    with open(y_path, 'r') as yamlf:
        cfg = yaml.safe_load(yamlf)

    gmql = cfg['GMQL_URL']
    prefix = cfg[module]['prefix']
    op = cfg[module]['operations'][call]

    parts = [gmql]
    if prefix:
        parts.append(prefix)
    parts.extend(op)

    return parts

def compose_url(module, call):
    """Return the url for the given module operation: its fragments joined by '/'"""

    return '/'.join(load_parts(module, call))

def add_url_param(params, module, op, value):
    """Add to the params dict the given value, under the key set for the given module and operation.

    The key is read from the 'gmql_rest.yaml' file located in the same folder
    of this file, at cfg[module]['params'][op].

    :param params: dict of the query parameters; it is modified in place
    :param module: key of the module section in the configuration
    :param op: key of the operation, under the 'params' entry of the module
    :param value: value to store
    :return: the same params dict that was given
    """

    y_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'gmql_rest.yaml')

    # safe_load only builds plain Python objects. yaml.load without an explicit
    # Loader is deprecated since PyYAML 5.1 and rejected by PyYAML 6.
    # The with statement closes the file: no explicit close() is needed.
    with open(y_path, 'r') as yamlf:
        cfg = yaml.safe_load(yamlf)

    key = cfg[module]['params'][op]
    params[key] = value

    return params


def read_token(input):
    """Extract the authToken for the REST calls from the user file.

    The first line of the tabular file is expected to be
        name   authToken   valid_flag
    The program is stopped through stop_err if the line has fewer than three
    fields or if the flag marks the session as no longer valid.

    :param input: path of the tabular file with the information over the user
    :return: the authToken (second field of the line)
    """

    with open(input, 'r') as f_in:
        user = f_in.readline().rstrip('\n').split('\t')

    # The flag is read back as text and expire_user stores it as 'False', which
    # is a non-empty (truthy) string: its content must be checked, not its truthiness.
    expired = len(user) < 3 or user[2].strip().lower() in ('', 'false', '0', 'no', 'none')

    if expired:
        stop_err("This session is no longer valid")

    return user[1]

def expire_user(input):
    """Set the validity flag of a user token to false.

    The user file is rewritten as a single line
        name   authToken   False

    :param input: path of the tabular file with the information over the user.
        If it is empty or None (unauthenticated request) nothing is done.
    """

    # Unauthenticated requests have no user file to invalidate
    if not input:
        return

    with open(input, 'r') as f:
        user = f.readline().rstrip('\n').split('\t')

    # Pad a short line, so that the flag always ends up in the third column
    user += [''] * (3 - len(user))

    with open(input, 'w') as f:
        f.write('{fullName}\t{token}\t{valid}\n'.format(fullName=user[0],
                                                        token=user[1],
                                                        valid=False))


def get(url, user=None, response_type='json'):
    """GET Request
    :param url: url where to fetch the requested resource
    :param user: path of the user file, for authenticated requests; if not provided make an
        unauthenticated request (e.g. for login)
    :param response_type: type of the fetched response (lowercase).
        'json' ( Default ) : the decoded JSON body is returned
        'text' : the body is returned as text
        'zip'  : no Accept header is sent; the whole response object is returned
        'file' : the whole response object is returned
        Any other value sends 'Accept: application/json' and returns the whole response object.
    :return: see response_type. If the server status is not OK the program is stopped through stop_err.
    """

    # Set request headers
    headers = {}

    if user:
        headers['X-AUTH-TOKEN'] = read_token(user)

    if response_type == 'text':
        headers['Accept'] = 'text/plain'
    elif response_type == 'file':
        headers['Accept'] = 'file'
    elif response_type != 'zip':
        headers['Accept'] = 'application/json'

    # Make the request
    response = requests.get(url, headers=headers)

    # Check returned server status
    status_code = response.status_code

    # Read result. If Server OK, read according to response_type. Raise an error otherwise.
    if status_code == requests.codes.ok:
        if response_type == 'json':
            return response.json()
        if response_type == 'text':
            return response.text
        return response

    if status_code == requests.codes.unauthorized:
        # An unauthenticated request has no user file to invalidate
        if user:
            expire_user(user)
        stop_err("You are not authorized to do this. \nPlease login first.")
    elif status_code == requests.codes.not_found:
        stop_err("Resource not found for this user.")
    else:
        # response.text is the decoded body: response.content is bytes on Python 3
        # and would be printed as b'...'
        stop_err("Error {code}: {reason}\n{message}".format(code=status_code,
                                                            reason=response.reason,
                                                            message=response.text))


def post(url, payload, user=None, params=None, content_type='json', response_type='json'):
    """ POST Request
    :param url: url where to post data
    :param payload: payload for the post request. Type is specified by content_type.
    :param user: path of the user file, for authenticated requests; if not provided make an
        unauthenticated request (e.g. for registration)
    :param params: optional query parameters
    :param content_type: how the payload is sent.
        'text' : as the request body, with 'Content-Type: text/plain'
        'multiform' : as the files of the request
        anything else ( Default 'json' ) : as JSON, with 'Content-Type: application/json'
    :param response_type: Default is json. Currently not used: the response is always decoded as JSON.
    :return: the decoded JSON body. If the server status is not OK the program is stopped through stop_err.
    """

    # Set request headers
    headers = {}

    if user:
        headers['X-AUTH-TOKEN'] = read_token(user)

    headers['Accept'] = 'application/json'

    if content_type == 'text':
        headers['Content-Type'] = 'text/plain'
        response = requests.post(url, params=params, headers=headers, data=payload)
    elif content_type == 'multiform':
        response = requests.post(url, params=params, headers=headers, files=payload)
    else:
        headers['Content-Type'] = 'application/json'
        response = requests.post(url, params=params, headers=headers, json=payload)

    # Check returned server status
    status_code = response.status_code

    if status_code == requests.codes.ok:
        return response.json()

    if status_code == requests.codes.unauthorized:
        # Search in the decoded body: response.content is bytes on Python 3,
        # where looking for a str inside it raises a TypeError
        if "The username or password you entered don't match" in response.text:
            stop_err("The username or password you entered don't match")
        else:
            # An unauthenticated request has no user file to invalidate
            if user:
                expire_user(user)
            stop_err("You are not authorized to do this. \nPlease login first.")
    else:
        stop_err("Error {code}: {reason}\n{message}".format(code=status_code,
                                                            reason=response.reason,
                                                            message=response.text))



def delete(url, user=None, response_type='json'):
    """DELETE request
    :param url: url of the resource to delete
    :param user: path of the user file, for authenticated requests; if not provided make an
        unauthenticated request
    :param response_type: Default is json. Currently not used: the response is always decoded as JSON.
    :return: the decoded JSON body. If the server status is not OK the program is stopped through stop_err.
    """

    # Set request headers
    headers = {}

    if user:
        headers['X-AUTH-TOKEN'] = read_token(user)

    headers['Accept'] = 'application/json'

    # Make the request
    response = requests.delete(url, headers=headers)

    # Check returned server status
    status_code = response.status_code

    # If Server OK, read result. Raise an error otherwise.
    if status_code == requests.codes.ok:
        return response.json()

    if status_code == requests.codes.unauthorized:
        # An unauthenticated request has no user file to invalidate
        if user:
            expire_user(user)
        stop_err("You are not authorized to do this. \nPlease login first.")
    elif status_code == requests.codes.not_found:
        stop_err("Resource not found for this user.")
    else:
        stop_err("Error {code}: {reason}".format(code=status_code,
                                                 reason=response.reason))



def stop_err(msg):
    """Write the message on the standard error and stop the program.

    :param msg: message to report
    :raises SystemExit: always, with exit status 1
    """
    sys.stderr.write("%s\n" % msg)
    # Exit with a non-zero status: a bare sys.exit() reports success (status 0)
    sys.exit(1)