view @ 8:28e96ae21a44 draft

"planemo upload for repository commit 020906fb54bde7fc143c356f41975c378a741315"
author galaxyp
date Wed, 09 Sep 2020 12:53:13 +0000
children 7e2e9379aec7
line wrap: on
line source

import collections
import json
import operator
import os
import re
import subprocess
import sys
from functools import reduce  # forward compatibility for Python 3

from CTDopts.CTDopts import (

def getFromDict(dataDict, mapList):
    return reduce(operator.getitem, mapList, dataDict)

def setInDict(dataDict, mapList, value):
    getFromDict(dataDict, mapList[:-1])[mapList[-1]] = value

def mergeDicts(d, e):
    insert values from the dict e into dict d
    no values of d are overwritten
    for k, v in e.items():
        if (k in d and isinstance(d[k], dict) and isinstance(e[k],
            mergeDicts(d[k], e[k])
        elif k not in d and not isinstance(e[k],
            d[k] = e[k]
            sys.stderr.write(" could not merge key %s for %s in %s" % (k, d, e))

def _json_object_hook_noenvlookup(d):
    return _json_object_hook(d, envlookup=False)

def _json_object_hook(d, envlookup=True):
    wee helper to transform the json written by galaxy
    while loading
    - True/False (bool objects) -> "true"/"false" (lowercase string)
    - data inputs with multiple and optional true give [None] if no file is given -> []
    - None -> "" (empty string)
    - replace bash expressions (if envlookup is True):
      - environment variables (need to consist capital letters and _) by their value
      - expressions
    for k in d.keys():
        # if type(d[k]) is bool:
        #     d[k] = str(d[k]).lower()
        # else
        if type(d[k]) is list and len(d[k]) == 1 and d[k][0] is None:
            d[k] = []
        elif d[k] is None:
            d[k] = ""
        elif envlookup and type(d[k]) is str and d[k].startswith("$"):
            m = re.fullmatch(r"\$([A-Z_]+)", d[k])
            if m:
                d[k] = os.environ.get(, "")
            m = re.fullmatch(r"\$(\{[A-Z_]+):-(.*)\}", d[k])
            if m:
                d[k] = os.environ.get(,

                p ="echo %s" % d[k], shell=True, check=True, stdout=subprocess.PIPE, encoding="utf8")
                d[k] = p.stdout.strip()
            except subprocess.CalledProcessError:
                sys.stderr.write("fill_ctd error: Could not evaluate %s" % d[k])
    return d

def qstring2list(qs):
    transform a space separated string that is quoted by " into a list
    lst = list()
    qs = qs.split(" ")
    quoted = False
    for p in qs:
        if p == "":
        if p.startswith('"') and p.endswith('"'):
        elif p.startswith('"'):
            quoted = True
            lst.append(p[1:] + " ")
        elif p.endswith('"'):
            quoted = False
            lst[-1] += p[:-1]
            if quoted:
                lst[-1] += p + " "
    return lst

def fix_underscores(args):
    if type(args) is dict:
        for k in list(args.keys()):
            v = args[k]
            if type(v) is dict:
            if k.startswith("_"):
                args[k[1:]] = v
                del args[k]
    elif type(args) is list:
        for i, v in enumerate(args):
            if type(v) is dict:

input_ctd = sys.argv[1]

# load user specified parameters from json
with open(sys.argv[2]) as fh:
    args = json.load(fh, object_hook=_json_object_hook_noenvlookup)

# load hardcoded parameters from json
with open(sys.argv[3]) as fh:
    hc_args = json.load(fh, object_hook=_json_object_hook)

# insert the hc_args into the args
mergeDicts(args, hc_args)

if "adv_opts_cond" in args:
    del args["adv_opts_cond"]

# IDMapper has in and spectra:in params, in is used in out as format_source",
# which does not work in Galaxy:"
# therefore hardcoded params change the name of spectra:in to spectra:_in
# which is corrected here again
# TODO remove once PR is in and adapt profile accordingly

model = CTDModel(from_file=input_ctd)

# transform values from json that correspond to
# - old style booleans (string + restrictions) -> transformed to a str
# - unrestricted ITEMLIST which are represented as strings
#   ("=quoted and space separated) in Galaxy -> transform to lists
# - optional data input parameters that have defaults and for which no
#   value is given -> overwritte with the default
for p in model.get_parameters():

    # check if the parameter is in the arguments from the galaxy tool
    # (from the json file(s)), since advanced parameters are absent
    # if the conditional is set to basic parameters
        getFromDict(args, p.get_lineage(name_only=True))
    except KeyError:
        # few tools use dashes in parameters which are automatically replaced
        # by underscores by Galaxy. in these cases the dictionary needs to be
        # updated
        # TODO might be removed later
            lineage = [_.replace("-", "_") for _ in p.get_lineage(name_only=True)]
            val = getFromDict(args, lineage)
        except KeyError:
            setInDict(args, lineage, val)

    if p.type is str and type(p.restrictions) is _Choices and set(p.restrictions.choices) == set(["true", "false"]):
        v = getFromDict(args, p.get_lineage(name_only=True))
        setInDict(args, p.get_lineage(name_only=True), str(v).lower())

    elif p.is_list and (p.restrictions is None or type(p.restrictions) is _NumericRange):
        v = getFromDict(args, p.get_lineage(name_only=True))
        if type(v) is str:
            setInDict(args, p.get_lineage(name_only=True), qstring2list(v))
    elif p.type is _InFile and not (p.default is None or type(p.default) is _Null):
        v = getFromDict(args, p.get_lineage(name_only=True))
        if v in [[], ""]:
            setInDict(args, p.get_lineage(name_only=True), p.default)

model.write_ctd(input_ctd, arg_dict=args)