Mercurial > repos > fubar > mashmap
diff toolfactory/toolfactory.py @ 1:0183cad9d13b draft
planemo upload
author | fubar |
---|---|
date | Thu, 22 Feb 2024 10:48:01 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/toolfactory/toolfactory.py Thu Feb 22 10:48:01 2024 +0000 @@ -0,0 +1,1393 @@ +# see https://github.com/fubar2/toolfactory +# +# copyright ross lazarus (ross stop lazarus at gmail stop com) May 2012 +# +# all rights reserved +# Licensed under the LGPL +# suggestions for improvement and bug fixes welcome at +# https://github.com/fubar2/toolfactory +# +# February 2023: Refactored to use galaxy-tool-test script in galaxyutil +# planemo not needed if tool is already installed. +# sqlite does not seem to work - switch to postgresql in the installation script +# +# march 2022: Refactored into two tools - generate and test/install +# as part of GTN tutorial development and biocontainer adoption +# The tester runs planemo on a non-tested archive, creates the test outputs +# and returns a new proper tool with test. + + +import argparse +import copy +import json +import logging +import os +import re +import shlex +import shutil +import subprocess +import sys +import tarfile +import tempfile +import time + +from bioblend import galaxy +from bioblend import ConnectionError + +import galaxyxml.tool as gxt +import galaxyxml.tool.parameters as gxtp + +import lxml.etree as ET + +import yaml + + +logger = logging.getLogger(__name__) + + +class Tool_Factory: + """Wrapper for an arbitrary script + uses galaxyxml + """ + + def __init__(self, args=None): # noqa + """ + prepare command line cl for running the tool here + and prepare elements needed for galaxyxml tool generation + """ + assert args.parampass in [ + "0", + "embed", + "argparse", + "positional", + "embednfmod", + ], ( + "args.parampass %s not 0,positional, embed, embednfmod or argparse" + % args.parampass + ) + # sed will update these settings during tfsetup.py first run + self.GALAXY_ADMIN_KEY = "1718977735397126400" + self.GALAXY_URL = "http://localhost:8080" + self.profile = "22.05" + self.not_iuc = True + self.args = args + self.tool_version = self.args.tool_version + self.myversion = "V3.0 February 2023" + self.verbose = True + self.debug = True + self.toolFactoryURL = "https://github.com/fubar2/galaxy_tf_overlay" + self.logger = logging.getLogger(__name__) + self.nfcoremod = False + if args.parampass == "embednfmod": + self.nfcoremod = True + self.script_in_help = False # IUC recommendation + self.tool_name = re.sub("[^a-zA-Z0-9_]+", "", args.tool_name) + self.tool_id = self.tool_name + self.local_tools = os.path.realpath( + os.path.join(args.galaxy_root, "local_tools") + ) + self.repdir = os.path.realpath(args.tfcollection) + self.testdir = os.path.join(self.repdir, self.tool_name) + self.toold = os.path.join(self.local_tools, self.tool_name) + self.tooltestd = os.path.join(self.toold, "test-data") + if self.nfcoremod: + self.local_tools = os.path.join(args.tfcollection, "tools") + self.repdir = os.path.join(args.tfcollection, "TFouts", self.tool_name) + self.toold = os.path.join(self.local_tools, self.tool_name) + self.tooltestd = os.path.join(self.toold, "test-data") + os.makedirs(self.repdir, exist_ok=True) + os.makedirs(self.toold, exist_ok=True) + os.makedirs(self.tooltestd, exist_ok=True) + os.makedirs(self.local_tools, exist_ok=True) + self.local_tool_conf = os.path.join(self.local_tools, "local_tool_conf.xml") + self.ourcwd = os.getcwd() + self.collections = [] + if len(args.collection) > 0: + try: + self.collections = [ + json.loads(x) for x in args.collection if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--collections parameter {str(args.collection)} is malformed - should be a dictionary" + ) + self.infiles = [] + try: + self.infiles = [ + json.loads(x) for x in args.input_files if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--input_files parameter {str(args.input_files)} is malformed - should be a dictionary" + ) + self.extra_files = [] + if len(args.xtra_files) > 0: + try: + self.extra_files = [ + json.loads(x) for x in args.xtra_files if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--xtra_files parameter {str(args.xtra_files)} is malformed - should be a dictionary" + ) + self.outfiles = [] + try: + self.outfiles = [ + json.loads(x) for x in args.output_files if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--output_files parameter {args.output_files} is malformed - should be a dictionary" + ) + assert ( + len(self.outfiles) + len(self.collections) + ) > 0, "No outfiles or output collections specified. The Galaxy job runner will fail without an output of some sort" + self.addpar = [] + try: + self.addpar = [ + json.loads(x) for x in args.additional_parameters if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--additional_parameters {args.additional_parameters} is malformed - should be a dictionary" + ) + self.selpar = [] + try: + self.selpar = [ + json.loads(x) for x in args.selecttext_parameters if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--selecttext_parameters {args.selecttext_parameters} is malformed - should be a dictionary" + ) + self.selfagpar = [] + try: + self.selflagpar = [ + json.loads(x) for x in args.selectflag_parameters if len(x.strip()) > 1 + ] + except Exception: + self.logger.error( + f"--selectflag_parameters {args.selecttext_parameters} is malformed - should be a dictionary" + ) + self.cleanuppar() + self.lastxclredirect = None + self.xmlcl = [] + self.is_positional = self.args.parampass == "positional" + self.is_embedded = self.args.parampass == "embedded" + if self.args.sysexe: + if " " in self.args.sysexe: + self.executeme = shlex.split(self.args.sysexe) + else: + self.executeme = [ + self.args.sysexe, + ] + else: + if self.args.packages: + self.executeme = [ + self.args.packages.split(",")[0].split(":")[0].strip(), + ] + else: + self.executeme = [] + aXCL = self.xmlcl.append + self.newtarpath = args.tested_tool_out + self.tinputs = gxtp.Inputs() + self.toutputs = gxtp.Outputs() + self.testparam = [] + if self.args.script_path: + self.prepScript() + else: + self.script = None + if self.args.cl_override != None: + scos = open(self.args.cl_override, "r").readlines() + self.cl_override = [x.rstrip() for x in scos] + else: + self.cl_override = None + if self.args.test_override != None: + stos = open(self.args.test_override, "r").readlines() + self.test_override = [x.rstrip() for x in stos] + else: + self.test_override = None + if self.args.cl_prefix != None: + scos = open(self.args.cl_prefix, "r").readlines() + self.cl_prefix = [x.rstrip() for x in scos] + else: + self.cl_prefix = None + if self.args.cl_suffix != None: + stos = open(self.args.cl_suffix, "r").readlines() + self.cl_suffix = [x.rstrip() for x in stos] + else: + self.cl_suffix = None + if self.args.script_path: + for ex in self.executeme: + if ex: + aXCL(ex) + aXCL("'$runme'") + else: + for ex in self.executeme: + aXCL(ex) + if self.args.parampass == "0": + self.clsimple() + elif self.args.parampass == "positional": + self.prepclpos() + self.clpositional() + elif self.args.parampass == "argparse": + self.prepargp() + self.clargparse() + elif self.args.parampass.startswith("embed"): + self.prepembed() + else: + logging.error( + "Parampass value %s not in 0, positional, argparse, embed or embednfmod" + % self.args.parampass + ) + logging.shutdown() + sys.exit(6) + + def clsimple(self): + """no parameters or repeats - uses < and > for i/o""" + aXCL = self.xmlcl.append + if len(self.infiles) > 0: + aXCL("<") + aXCL("'$%s'" % self.infiles[0]["infilename"]) + if len(self.outfiles) > 0: + aXCL(">") + aXCL("'$%s'" % self.outfiles[0]["name"]) + + def prepembed(self): + """fix self.script""" + scrip = self.script + if self.nfcoremod: + self.script = ( + '#set prefix = "%s"\n#set task_process = "%s"\n' + % (self.tool_name, self.tool_name) + + scrip + ) + self.xmlcl = [] # wipe anything there + aX = self.xmlcl.append + aX("") + if self.nfcoremod: + aX('#set prefix = "%s"' % self.tool_name) + aX('#set task_process = "%s"' % self.tool_name) + for p in self.collections: + aX("mkdir -p %s &&" % p["name"]) + aX("%s '$runme'" % self.args.sysexe) + + def prepargp(self): + xclsuffix = [] + for i, p in enumerate(self.infiles): + rep = p["required"] in ["optional1", "required1"] + req = p["required"] in ["required", "required1"] + nam = p["infilename"] + flag = p["CL"] + if p["origCL"].strip().upper() == "STDIN": + xappendme = [ + nam, + nam, + "< '$%s'" % nam, + ] + else: + xappendme = [p["CL"], "'$%s'" % p["CL"], ""] + xclsuffix.append(xappendme) + for i, p in enumerate(self.outfiles): + if p["origCL"].strip().upper() == "STDOUT": + self.lastxclredirect = [">", "'$%s'" % p["name"]] + else: + xclsuffix.append([p["name"], "'$%s'" % p["name"], ""]) + for p in self.addpar: + nam = p["name"] + val = p["value"] + flag = p["CL"] + rep = p.get("repeat", 0) == "1" + if rep: + over = f'#for $rep in $R_{nam}:\n--{nam} "$rep.{nam}"\n#end for' + else: + over = p.get("override", "") + if p["type"] == "clflag": + over = f'#if ${nam} == "set"\n --{flag}\n#end if' + xclsuffix.append([p["CL"], "'$%s'" % nam, over]) + for p in self.selpar: + xclsuffix.append([p["CL"], "'$%s'" % p["name"], p.get("override", "")]) + for p in self.selflagpar: + xclsuffix.append(["", "'$%s'" % p["name"], ""]) + for p in self.collections: + newname = p["name"] + xclsuffix.append([newname, "'%s'" % newname, ""]) + self.xclsuffix = xclsuffix + + def prepclpos(self): + xclsuffix = [] + for i, p in enumerate(self.infiles): + if p["origCL"].strip().upper() == "STDIN": + xappendme = [ + "999", + p["infilename"], + "< '$%s'" % p["infilename"], + ] + else: + xappendme = [p["CL"], "'$%s'" % p["infilename"], ""] + xclsuffix.append(xappendme) + for i, p in enumerate(self.outfiles): + if p["origCL"].strip().upper() == "STDOUT": + self.lastxclredirect = [">", "'$%s'" % p["name"]] + else: + xclsuffix.append([p["CL"], "'$%s'" % p["name"], ""]) + for p in self.addpar: + nam = p["name"] + rep = p.get("repeat", "0") == "1" # repeats make NO sense + if rep: + logger.warning( + f"### warning. Repeats for {nam} ignored - not permitted in positional parameter command lines!" + ) + over = p.get("override", "") + xclsuffix.append([p["CL"], "'$%s'" % nam, over]) + for p in self.selpar: + xclsuffix.append([p["CL"], "'$%s'" % p["name"], p.get("override", "")]) + for p in self.selflagpar: + xclsuffix.append(["", "'$%s'" % p["name"], ""]) + for p in self.collections: + newname = p["name"] + xclsuffix.append([newname, "'$%s'" % newname, ""]) + xclsuffix.sort() + self.xclsuffix = xclsuffix + + def prepScript(self): + s = open(self.args.script_path, "r").read() + ss = s.split("\n") + rxcheck = [x for x in ss if x.strip() > ""] + assert len(rxcheck) > 0, "Supplied script is empty. Cannot run" + if self.args.sysexe and self.args.parampass != "embed": + rxcheck.insert(0, "#raw") + rxcheck.append("#end raw") + self.script = "\n".join(rxcheck) + if len(self.executeme) > 0: + self.sfile = os.path.join( + self.repdir, "%s.%s.txt" % (self.tool_name, self.executeme[0]) + ) + else: + self.sfile = os.path.join( + self.repdir, "%s.script.txt" % (self.tool_name) + ) + tscript = open(self.sfile, "w") + tscript.write(self.script) + tscript.write("\n") + tscript.close() + self.spacedScript = [ + f" {x.replace('${','$ {')}" for x in ss if x.strip() > "" + ] + self.escapedScript = rxcheck + + def cleanuppar(self): + """positional parameters are complicated by their numeric ordinal""" + if self.args.parampass == "positional": + for i, p in enumerate(self.infiles): + assert ( + p["CL"].isdigit() or p["CL"].strip().upper() == "STDIN" + ), "Positional parameters must be ordinal integers - got %s for %s" % ( + p["CL"], + p["label"], + ) + for i, p in enumerate(self.outfiles): + assert ( + p["CL"].isdigit() or p["CL"].strip().upper() == "STDOUT" + ), "Positional parameters must be ordinal integers - got %s for %s" % ( + p["CL"], + p["name"], + ) + for i, p in enumerate(self.addpar): + assert p[ + "CL" + ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % ( + p["CL"], + p["name"], + ) + for i, p in enumerate(self.infiles): + infp = copy.copy(p) + infp["origCL"] = infp["CL"] + if self.args.parampass in ["positional", "0"]: + infp["infilename"] = infp["label"].replace(" ", "_") + else: + infp["infilename"] = infp["CL"] + self.infiles[i] = infp + for i, p in enumerate(self.outfiles): + outfp = copy.copy(p) + outfp["origCL"] = outfp["CL"] # keep copy + if outfp.get("label", None) == None: + outfp["label"] = "" + self.outfiles[i] = outfp + for i, p in enumerate(self.addpar): + addp = copy.copy(p) + addp["origCL"] = addp["CL"] + self.addpar[i] = addp + for i, p in enumerate(self.collections): + addp = copy.copy(p) + addp["CL"] = addp["name"] + self.collections[i] = addp + + def clpositional(self): + # inputs in order then params + aXCL = self.xmlcl.append + for (k, v, koverride) in self.xclsuffix: + aXCL(v) + if self.lastxclredirect: + for cl in self.lastxclredirect: + aXCL(cl) + + def clargparse(self): + """argparse style""" + aXCL = self.xmlcl.append + # inputs then params in argparse named form + for (k, v, koverride) in self.xclsuffix: + if koverride > "": + k = koverride + aXCL(k) + else: + kl = len(k.strip()) + if kl == 0: + k = " " + elif kl == 1: + k = "-%s" % k + else: + k = "--%s" % k + aXCL(k) + aXCL(v) + if self.lastxclredirect: + for cl in self.lastxclredirect: + aXCL(cl) + + def getNdash(self, newname): + if self.is_positional: + ndash = 0 + else: + ndash = 2 + if len(newname) < 2: + ndash = 1 + return ndash + + def doXMLparam(self): # noqa + """Add all needed elements to tool""" + for p in self.outfiles: + newname = p["name"] + newfmt = p["format"] + newcl = p["CL"] + test = p["test"] + oldcl = p["origCL"] + test = test.strip() + filta = p.get("when", []) + lab = p.get("label", "") + if len(lab.strip()) == 0: + lab = newname + ndash = self.getNdash(newcl) + aparm = gxtp.OutputData( + name=newname, format=newfmt, num_dashes=ndash, label=lab + ) + if len(filta) > 0: + ofilta = gxtp.ChangeFormat() + for ( + whens + ) in filta: # when input=|image_type| value=|large_png| format=|png| + whenss = whens.replace("|", '"').replace("when ", "") + clauses = whenss.split() + for c in clauses: + if c.startswith("value"): + v = c.split("=")[1] + elif c.startswith("format"): + f = c.split("=")[1] + elif c.startswith("input"): + i = c.split("=")[1] + else: + print( + "bad when - need value=, format= and input=, got", whens + ) + owhen = gxtp.ChangeFormatWhen(format=f, input=i, value=v) + ofilta.append(owhen) + aparm.append(ofilta) + aparm.positional = self.is_positional + if self.is_positional: + if oldcl.upper() == "STDOUT": + aparm.positional = 9999999 + aparm.command_line_override = "> '$%s'" % newname + else: + aparm.positional = int(oldcl) + aparm.command_line_override = "'$%s'" % newname + self.toutputs.append(aparm) + ld = None + if test.strip() > "": + if test.strip().startswith("diff"): + c = "diff" + ld = 0 + if test.split(":")[1].isdigit: + ld = int(test.split(":")[1]) + tp = gxtp.TestOutput( + name=newname, + value="%s_sample" % newname, + compare=c, + lines_diff=ld, + ) + elif test.startswith("sim_size"): + c = "sim_size" + tn = test.split(":")[1].strip() + if tn > "": + if "." in tn: + delta = None + delta_frac = min(1.0, float(tn)) + else: + delta = int(tn) + delta_frac = None + tp = gxtp.TestOutput( + name=newname, + value="%s_sample" % newname, + compare=c, + delta=delta, + delta_frac=delta_frac, + ) + else: + c = test + tp = gxtp.TestOutput( + name=newname, + value="%s_sample" % newname, + compare=c, + ) + self.testparam.append(tp) + for p in self.infiles: + newname = p["infilename"] + newfmt = p["format"] + ndash = self.getNdash(newname) + reps = p.get("required", "") in ["optional1", "required1"] + isoptional = p.get("required", "") in ["optional", "optional1"] + if not len(p["label"]) > 0: + alab = p["CL"] + else: + alab = p["label"] + aninput = gxtp.DataParam( + newname, + optional=isoptional, + label=alab, + help=p["help"], + format=newfmt, + multiple=reps, + num_dashes=ndash, + ) + aninput.positional = self.is_positional + if self.is_positional: + if p["origCL"].upper() == "STDIN": + aninput.positional = 9999998 + aninput.command_line_override = "< '$%s'" % newname + else: + aninput.positional = int(p["origCL"]) + aninput.command_line_override = "'$%s'" % newname + self.tinputs.append(aninput) + tparm = gxtp.TestParam(newname, value="%s_sample" % newname) + self.testparam.append(tparm) + for p in self.addpar: + newname = p["name"] + newval = p.get("value", "") + newlabel = p["label"] + newhelp = p.get("help", "") + newtype = p.get("type", "?") + newcl = p["CL"] + oldcl = p["origCL"] + reps = p.get("repeat", "0") == "1" + if not len(newlabel) > 0: + newlabel = newname + ndash = self.getNdash(newname) + if newtype == "text": + aparm = gxtp.TextParam( + newname, + label=newlabel, + help=newhelp, + value=newval, + num_dashes=ndash, + ) + elif newtype == "integer": + aparm = gxtp.IntegerParam( + newname, + label=newlabel, + help=newhelp, + value=int(newval.replace("'", "").replace('"', "")), + num_dashes=ndash, + ) + elif newtype == "float": + aparm = gxtp.FloatParam( + newname, + label=newlabel, + help=newhelp, + value=float(newval.replace("'", "").replace('"', "")), + num_dashes=ndash, + ) + elif newtype == "boolean": + aparm = gxtp.BooleanParam( + newname, + label=newlabel, + help=newhelp, + value=newval, + num_dashes=ndash, + ) + elif newtype == "clflag": + initval = newval + aparm = gxtp.SelectParam( + newname, + label=newlabel, + help=newhelp, + num_dashes=ndash, + display="radio", + ) + anoptt = gxtp.SelectOption( + value="set", + text="Set this flag", + ) + anoptf = gxtp.SelectOption( + value="notset", + text="Do not set this flag", + ) + if p["value"] == "set": # make default same as form + aparm.append(anoptt) + aparm.append(anoptf) + else: + aparm.append(anoptf) + aparm.append(anoptt) + elif newtype == "datacolumn": + aparm = gxtp.TextParam( + newname, + type="data_column", + data_ref=p["dataref"], + multiple=(p["multiple"] == "1"), + label=newlabel, + help=newhelp, + value=newval, + num_dashes=ndash, + ) + else: + raise ValueError( + 'Unrecognised parameter type "%s" for \ + additional parameter %s in makeXML' + % (newtype, newname) + ) + aparm.positional = self.is_positional + if self.is_positional: + aparm.positional = int(oldcl) + if reps: + repe = gxtp.Repeat( + name=f"R_{newname}", + title=f"Any number of {newlabel} repeats are allowed", + ) + repe.append(aparm) + self.tinputs.append(repe) + tparm = gxtp.TestRepeat(name=f"R_{newname}") + tparm2 = gxtp.TestParam(newname, value=newval) + tparm.append(tparm2) + self.testparam.append(tparm) + else: + self.tinputs.append(aparm) + tparm = gxtp.TestParam(newname, value=newval) + self.testparam.append(tparm) + for p in self.selpar: + newname = p["name"] + newval = p.get("value", "") + newlabel = p["label"] + newhelp = p["help"] + newtype = p["type"] + newcl = p["CL"] + if not len(newlabel) > 0: + newlabel = newname + ndash = self.getNdash(newname) + if newtype == "selecttext": + newtext = p["texts"] + aparm = gxtp.SelectParam( + newname, + label=newlabel, + help=newhelp, + num_dashes=ndash, + ) + for i in range(len(newval)): + anopt = gxtp.SelectOption( + value=newval[i], + text=newtext[i], + ) + aparm.append(anopt) + aparm.positional = self.is_positional + if self.is_positional: + aparm.positional = int(newcl) + self.tinputs.append(aparm) + tparm = gxtp.TestParam(newname, value=newval[0]) + self.testparam.append(tparm) + else: + raise ValueError( + 'Unrecognised parameter type "%s" for\ + selecttext parameter %s in makeXML' + % (newtype, newname) + ) + for p in self.selflagpar: + newname = p["name"] + newval = p["value"] + newlabel = p["label"] + newhelp = p["help"] + newtype = p["type"] + newtext = p["texts"] + newcl = p["CL"] + if not len(newlabel) > 0: + newlabel = newname + aparm = gxtp.SelectParam( + newname, + label=newlabel, + help=newhelp, + num_dashes=0, + ) + for i in range(len(newval)): + anopt = gxtp.SelectOption( + value=newval[i], + text=newtext[i], + ) + aparm.append(anopt) + aparm.positional = self.is_positional + if self.is_positional: + aparm.positional = int(newcl) + self.tinputs.append(aparm) + tparm = gxtp.TestParam(newname, value=newval[0]) + self.testparam.append(tparm) + + def doNoXMLparam(self): + """filter style package - stdin to stdout""" + if len(self.infiles) > 0: + alab = self.infiles[0]["label"] + if len(alab) == 0: + alab = self.infiles[0]["infilename"] + max1s = ( + "Maximum one input if parampass is 0 but multiple input files supplied - %s" + % str(self.infiles) + ) + assert len(self.infiles) == 1, max1s + newname = self.infiles[0]["infilename"] + aninput = gxtp.DataParam( + newname, + optional=False, + label=alab, + help=self.infiles[0]["help"], + format=self.infiles[0]["format"], + multiple=False, + num_dashes=0, + ) + aninput.command_line_override = "< $%s" % newname + aninput.positional = True + self.tinputs.append(aninput) + tp = gxtp.TestParam(name=newname, value="%s_sample" % newname) + self.testparam.append(tp) + if len(self.outfiles) > 0: + newname = self.outfiles[0]["name"] + newfmt = self.outfiles[0]["format"] + anout = gxtp.OutputData(newname, format=newfmt, num_dashes=0) + anout.command_line_override = "> $%s" % newname + anout.positional = self.is_positional + self.toutputs.append(anout) + tp = gxtp.TestOutput(name=newname, value="%s_sample" % newname) + self.testparam.append(tp) + + def makeXML(self): # noqa + """ + Create a Galaxy xml tool wrapper for the new script + Uses galaxyhtml + Hmmm. How to get the command line into correct order... + """ + requirements = gxtp.Requirements() + self.condaenv = [] + if self.args.packages: + try: + for d in self.args.packages.split(","): + ver = None + packg = None + d = d.replace("==", ":") + d = d.replace("=", ":") + if ":" in d: + packg, ver = d.split(":")[:2] + ver = ver.strip() + packg = packg.strip() + self.tool_version = ver + else: + packg = d.strip() + ver = None + if ver == "": + ver = None + if packg: + requirements.append( + gxtp.Requirement("package", packg.strip(), ver) + ) + self.condaenv.append(d) + except Exception: + self.logger.error( + "### malformed packages string supplied - cannot parse = %s" + % self.args.packages + ) + sys.exit(2) + elif self.args.container: + requirements.append(gxtp.Requirement("container", self.args.container)) + self.newtool = gxt.Tool( + self.tool_name, + self.tool_id, + self.tool_version, + self.args.tool_desc, + "", + profile=self.profile, + ) + self.newtool.requirements = requirements + iXCL = self.xmlcl.insert + aXCL = self.xmlcl.append + if self.args.cl_prefix: # DIY CL start + self.xmlcl = self.cl_prefix + self.xmlcl + if self.args.cl_suffix: # DIY CL end + self.xmlcl += self.cl_suffix + if self.cl_override: + self.newtool.command_override = self.cl_override # config file + else: + self.newtool.command_override = self.xmlcl + self.cites = self.parse_citations() + cite = gxtp.Citations() + if self.cites and len(self.cites) > 0: + for c in self.cites: + acite = gxtp.Citation(type=c[0], value=c[1]) + cite.append(acite) + acite = gxtp.Citation(type="doi", value="10.1093/bioinformatics/bts573") + cite.append(acite) + self.newtool.citations = cite + safertext = "" + if self.args.help_text: + self.helptext = open(self.args.help_text, "r").readlines() + safertext = "\n".join([self.cheetah_escape(x) for x in self.helptext]) + if len(safertext.strip()) == 0: + safertext = ( + "Ask the tool author (%s) to rebuild with help text please\n" + % (self.args.user_email) + ) + if self.script_in_help and self.args.script_path: + if len(safertext) > 0: + safertext = safertext + "\n\n------\n" # transition allowed! + scr = [x for x in self.spacedScript if x.strip() > ""] + scr.insert(0, "\n\nScript::\n") + if len(scr) > 300: + scr = ( + scr[:100] + + [" >300 lines - stuff deleted", " ......"] + + scr[-100:] + ) + scr.append("\n") + safertext = safertext + "\n".join(scr) + self.newtool.help = " ".join(self.helptext) + for p in self.collections: + newkind = p["kind"] + newname = p["name"] + newlabel = p["label"] + newdisc = p["discover"] + collect = gxtp.OutputCollection(newname, label=newlabel, type=newkind) + disc = gxtp.DiscoverDatasets( + pattern=newdisc, directory=f"{newname}", visible="false" + ) + collect.append(disc) + self.toutputs.append(collect) + try: + tparm = gxtp.TestOutputCollection(newname) # broken until PR merged. + self.testparam.append(tparm) + except Exception: + logging.error( + "WARNING: Galaxyxml version does not have the PR merged yet - tests for collections must be over-ridden until then!" + ) + self.newtool.version_command = f'echo "{self.tool_version}"' + if self.args.parampass == "0": + self.doNoXMLparam() + else: + self.doXMLparam() + self.newtool.outputs = self.toutputs + self.newtool.inputs = self.tinputs + if self.args.script_path: + configfiles = gxtp.Configfiles() + configfiles.append(gxtp.Configfile(name="runme", text=self.script)) + self.newtool.configfiles = configfiles + tests = gxtp.Tests() + test_a = gxtp.Test() + for tp in self.testparam: + test_a.append(tp) + tests.append(test_a) + self.newtool.tests = tests + self.newtool.add_comment( + "Created by %s at %s using the Galaxy Tool Factory." + % (self.args.user_email, self.timenow()) + ) + self.newtool.add_comment("Source in git at: %s" % (self.toolFactoryURL)) + exml = self.newtool.export() + if ( + self.test_override + ): # cannot do this inside galaxyxml as it expects lxml objects for tests + part1 = exml.split("<tests>")[0] + part2 = exml.split("</tests>")[1] + fixed = "%s\n%s\n%s" % (part1, "\n".join(self.test_override), part2) + exml = fixed + with open(os.path.join(self.toold, "%s.xml" % self.tool_name), "w") as xf: + xf.write(exml) + xf.write("\n") + with open(os.path.join(self.repdir, "%s_xml.xml" % self.tool_name), "w") as xf: + xf.write(exml) + xf.write("\n") + + def writeShedyml(self): + """for planemo""" + yuser = self.args.user_email.split("@")[0] + yfname = os.path.join(self.toold, ".shed.yml") + yamlf = open(yfname, "w") + odict = { + "name": self.tool_name, + "owner": "fubar2", + "type": "unrestricted", + "description": "ToolFactory autogenerated tool", + "synopsis": self.args.tool_desc, + "category": "ToolFactory generated Tools", + } + yaml.dump(odict, yamlf, allow_unicode=True) + yamlf.close() + + def writeTFyml(self): + """for posterity""" + adict = {} + rargs = [ + "input_files", + "output_files", + "additional_parameters", + "selecttext_parameters", + "selectflag_parameters", + "xtra_files", + ] + args = vars(self.args) + for k in args.keys(): + if k not in rargs: + adict[k] = args.get(k, None) + else: + if adict.get(k, None): + adict[k].append(adict[k]) + else: + adict[k] = [args.get(k, None)] + adict["script"] = self.script + adict["help"] = self.helptext + yfname = os.path.join(self.repdir, "%s_ToolFactory.yml" % self.tool_name) + yf = open(yfname, "w") + yaml.dump(adict, yf) + yf.close() + + def saveTestdata(self, pname, testDataURL): + """ + may need to be ungzipped and in test folder + """ + res = 0 + localpath = os.path.join(self.tooltestd, "%s_sample" % pname) + print("#### save", testDataURL, "for", pname, "to", localpath) + if not os.path.exists(localpath): + cl = [ + "wget", + "--timeout", + "5", + "--tries", + "2", + "-O", + localpath, + testDataURL, + ] + if testDataURL.endswith(".gz"): # major kludge as usual... + gzlocalpath = "%s.gz" % localpath + cl = [ + "wget", + "-q", + "--timeout", + "5", + "--tries", + "2", + "-O", + gzlocalpath, + testDataURL, + "&&", + "rm", + "-f", + localpath, + "&&", + "gunzip", + gzlocalpath, + ] + p = subprocess.run(" ".join(cl), shell=True) + if p.returncode: + print("Got", p.returncode, "from executing", " ".join(cl)) + else: + print("Not re-downloading", localpath) + return res + + def makeTool(self): + """write xmls and input samples into place""" + if self.args.parampass == 0: + self.doNoXMLparam() + else: + self.makeXML() + if self.args.script_path and self.not_iuc: + stname = os.path.join(self.toold, os.path.split(self.sfile)[1]) + if not os.path.exists(stname): + shutil.copyfile(self.sfile, stname) + logger.info("Copied %s to %s" % (self.sfile, stname)) + for p in self.infiles: + paths = p["name"] + pname = p["CL"] + pathss = paths.split(",") + np = len(pathss) + if p.get("URL", None): + res = self.saveTestdata(pname, p["URL"]) + for i, pth in enumerate(pathss): + if os.path.exists(pth): + if np > 1: + dest = os.path.join( + self.tooltestd, "%s_%d_sample" % (p["infilename"], i + 1) + ) + else: + dest = os.path.join( + self.tooltestd, "%s_sample" % p["infilename"] + ) + shutil.copyfile(pth, dest) + logger.info("Copied %s to %s" % (pth, dest)) + else: + logger.info( + "Optional input path %s does not exist - not copied" % pth + ) + if self.extra_files and len(self.extra_files) > 0: + for xtra in self.extra_files: + fpath = xtra["fpath"] + dest = os.path.join(self.toold, xtra["fname"]) + shutil.copyfile(fpath, dest) + logger.info("Copied xtra file %s to %s" % (fpath, dest)) + shutil.copytree(self.toold, self.testdir, dirs_exist_ok=True) + + def makeToolTar(self, test_retcode=0): + """move outputs into test-data and prepare the tarball""" + excludeme = "tool_test_output" + + def exclude_function(tarinfo): + filename = tarinfo.name + return None if filename.startswith(excludeme) else tarinfo + + logger.info("makeToolTar starting with tool test retcode=%d\n" % test_retcode) + td = os.listdir(self.toold) + for f in td: + if f.startswith("tool_test_output"): + os.unlink(os.path.join(self.toold, f)) + if self.newtarpath: + tf = tarfile.open(self.newtarpath, "w:gz") + tf.add( + name=self.toold, + arcname=self.tool_name, + # filter=exclude_function, + ) + + def planemo_local_test(self): + """ + weird legacyversion error popping up again from package version upgrade in conda_util.py in the venv. + Seems ok if run as a shell script using the Galaxy installed planemo august 1st 2023 + """ + shutil.copytree(self.toold, self.testdir, dirs_exist_ok=True) + x = "%s.xml" % self.tool_name + xout = os.path.abspath(os.path.join(self.testdir, x)) + cl = [ + "planemo", + "test", + "--galaxy_admin_key", + self.GALAXY_ADMIN_KEY, + "--engine", + "external_galaxy", + "--update_test_data", + "--galaxy_url", + self.GALAXY_URL, + xout, + ] + clx = [ + "planemo", + "test", + "--galaxy_admin_key", + "[GALAXY_ADMIN_KEY]", + "--engine", + "external_galaxy", + "--update_test_data", + "--galaxy_url", + self.GALAXY_URL, + xout, + ] + logger.info("planemo_local_test executing: %s" % " ".join(clx)) + p = subprocess.run( + " ".join(cl), + timeout=90, + shell=True, + cwd=self.testdir, + capture_output=True, + check=True, + text=True, + ) + for errline in p.stderr.splitlines(): + logger.info("planemo: %s" % errline) + for errline in p.stdout.splitlines(): + logger.info("planemo: %s" % errline) + shutil.copytree(self.testdir, self.toold) + dest = self.repdir + src = self.tooltestd + logger.info("copying to %s to %s test_outs" % (src, dest)) + shutil.copytree(src, dest, dirs_exist_ok=True) + return p.returncode + + def fast_local_test(self): + """ + galaxy-tool-test -u http://localhost:8080 -a 1613612977827175424 -t tacrev -o local --publish-history + Seems to have a race condition when multiple jobs running. Works well - 15 secs or so if only onejob at a time! so job_conf fixed. + Failure will eventually get stuck. Might need a timeout in the script + """ + scrpt = os.path.join(self.args.toolfactory_dir, "toolfactory_fast_test.sh") + extrapaths = self.tooltestd + cl = ["/usr/bin/bash", scrpt, self.tool_name, extrapaths, extrapaths] + logger.info("fast_local_test executing %s \n" % (" ".join(cl))) + p = subprocess.run( + " ".join(cl), + shell=True, + cwd=self.testdir, + capture_output=True, + check=True, + text=True, + ) + for errline in p.stderr.splitlines(): + logger.info("ephemeris: %s" % errline) + for errline in p.stdout.splitlines(): + logger.info("ephemeris: %s" % errline) + shutil.copytree(self.testdir, self.toold, dirs_exist_ok=True) + dest = self.repdir + src = self.tooltestd + shutil.copytree(src, dest, dirs_exist_ok=True) + return p.returncode + + def update_toolconf(self, remove=False): + """tempting to recreate it from the local_tools directory each time + currently adds new tools if not there. + """ + + def sortchildrenby(parent, attr): + parent[:] = sorted(parent, key=lambda child: child.get(attr)) + + logger.info("Updating tool conf files for %s\n" % (self.tool_name)) + tcpath = self.local_tool_conf + xmlfile = os.path.join(self.tool_name, "%s.xml" % self.tool_name) + try: + parser = ET.XMLParser(remove_blank_text=True) + tree = ET.parse(tcpath, parser) + except ET.XMLSyntaxError: + logger.error( + "### Tool configuration update access error - %s cannot be parsed as xml by element tree\n" + % tcpath + ) + sys.exit(4) + root = tree.getroot() + hasTF = False + e = root.findall("section") + if len(e) > 0: + hasTF = True + TFsection = e[0] + if not hasTF: + TFsection = ET.Element( + "section", {"id": "localtools", "name": "Local Tools"} + ) + root.insert(0, TFsection) # at the top! + our_tools = TFsection.findall("tool") + conf_tools = [x.attrib["file"] for x in our_tools] + if not remove: + if xmlfile not in conf_tools: # new + ET.SubElement(TFsection, "tool", {"file": xmlfile}) + sortchildrenby(TFsection, "file") + tree.write(tcpath, pretty_print=True) + gi = galaxy.GalaxyInstance(url=self.GALAXY_URL, key=self.GALAXY_ADMIN_KEY) + toolready = False + now = time.time() + nloop = 5 + while nloop >= 0 and not toolready: + try: + res = gi.tools.show_tool(tool_id=self.tool_name) + toolready = True + logger.info( + "Tool %s ready after %f seconds - %s\n" + % (self.tool_name, time.time() - now, res) + ) + except ConnectionError: + nloop -= 1 + time.sleep(2) + logger.info("Connection error - waiting 2 seconds.\n") + if nloop < 1: + logger.error( + "Tool %s still not ready after %f seconds - please check the form and the generated xml for errors? \n" + % (self.tool_name, time.time() - now) + ) + return 2 + else: + return 0 + else: + if xmlfile in conf_tools: # remove + for rem in our_tools: + if rem.attrib["file"] == xmlfile: + rem.getparent().remove(rem) + self.logger.info( + "###=============== removed tool %s from %s" + % (xmlfile, tcpath) + ) + sortchildrenby(TFsection, "file") + tree.write(tcpath, pretty_print=True) + + def install_deps(self): + """ + use script to install new tool dependencies + """ + cll = [ + "sh", + "%s/install_tf_deps.sh" % self.args.toolfactory_dir, + self.tool_name, + ] + self.logger.info("Running %s\n" % " ".join(cll)) + try: + p = subprocess.run( + " ".join(cll), shell=True, capture_output=True, check=True, text=True + ) + for errline in p.stderr.splitlines(): + self.logger.info(errline) + return p.returncode + except: + return 1 + + def timenow(self): + """return current time as a string""" + return time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(time.time())) + + def cheetah_escape(self, text): + """Produce entities within text.""" + cheetah_escape_table = {"$": "\\$", "#": "\\#"} + return "".join([cheetah_escape_table.get(c, c) for c in text]) + + def parse_citations(self): + """""" + if self.args.citations: + ct = open(self.args.citations, "r").read() + citations = [c.strip() for c in ct.split("**ENTRY**") if c.strip()] + citation_tuples = [] + for citation in citations: + if citation.startswith("doi"): + citation_tuples.append(("doi", citation[len("doi") :].strip())) + else: + citation_tuples.append( + ("bibtex", citation[len("bibtex") :].strip()) + ) + return citation_tuples + else: + return None + + +def main(): + """ + This is a Galaxy wrapper. + It expects to be called by a special purpose tool.xml + + """ + parser = argparse.ArgumentParser() + a = parser.add_argument + a("--nftest", action="store_true", default=False) + a("--script_path", default=None) + a("--sysexe", default=None) + a("--packages", default=None) + a("--tool_name", default="newtool") + a("--input_files", default=[], action="append") + a("--output_files", default=[], action="append") + a("--user_email", default="Unknown") + a("--bad_user", default=None) + a("--help_text", default=None) + a("--tool_desc", default=None) + a("--toolfactory_dir", default=None) + a("--tool_version", default="0.01") + a("--citations", default=None) + a("--cl_suffix", default=None) + a("--cl_prefix", default=None) + a("--cl_override", default=None) + a("--test_override", default=None) + a("--additional_parameters", action="append", default=[]) + a("--selecttext_parameters", action="append", default=[]) + a("--selectflag_parameters", action="append", default=[]) + a("--edit_additional_parameters", action="store_true", default=False) + a("--parampass", default="positional") + a("--tfcollection", default="toolgen") + a("--galaxy_root", default="/galaxy-central") + a("--collection", action="append", default=[]) + a("--include_tests", default=False, action="store_true") + a("--install_flag", action="store_true", default=False) + a("--admin_only", default=True, action="store_true") + a("--tested_tool_out", default=None) + a("--container", default=None, required=False) + a("--tool_conf_path", default="config/tool_conf.xml") # relative to $__root_dir__ + a( + "--xtra_files", + default=[], + action="append", + ) # history data items to add to the tool base directory + tfcl = sys.argv[1:] + args = parser.parse_args() + if args.admin_only: + assert not args.bad_user, ( + 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the galaxy.yml Galaxy configuration file' + % (args.bad_user, args.bad_user) + ) + assert ( + args.tool_name + ), "## This ToolFactory cannot build a tool without a tool name. Please supply one." + os.makedirs(args.tfcollection, exist_ok=True) + logfilename = os.path.join( + args.tfcollection, "ToolFactory_make_%s_log.txt" % args.tool_name + ) + logger.setLevel(logging.INFO) + fh = logging.FileHandler(logfilename, mode="w") + fformatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + fh.setFormatter(fformatter) + logger.addHandler(fh) + tf = Tool_Factory(args) + tf.makeTool() + tf.writeShedyml() + # tf.writeTFyml()tf.writeTFyml() + tf.update_toolconf() + time.sleep(5) + if tf.condaenv and len(tf.condaenv) > 0: + res = tf.install_deps() + if res > 0: + logger.debug("Toolfactory installed deps failed") + logging.shutdown() + sys.exit(6) + time.sleep(2) + testret = tf.fast_local_test() # planemo_local_test() + if False and int(testret) > 0: + logger.error("ToolFactory tool build and test failed. :(") + logger.info( + "This is usually because the supplied script or dependency did not run correctly with the test inputs and parameter settings" + ) + logger.info("when tested with galaxy_tool_test. Error code:%d" % int(testret)) + logger.info( + "The 'i' (information) option shows how the ToolFactory was called, stderr and stdout, and what the command line was." + ) + logger.info( + "Expand (click on) any of the broken (red) history output titles to see that 'i' button and click it" + ) + logger.info( + "Make sure it is the same as your working test command line and double check that data files are coming from and going to where they should" + ) + logger.info( + "In the output collection, the tool xml <command> element must be the equivalent of your working command line for the test to work" + ) + logging.shutdown() + sys.exit(5) + else: + tf.makeToolTar(testret) + jcl = sys.argv[1:] + with open( + os.path.join( + args.tfcollection, "ToolFactory_%s_commandline.json" % args.tool_name + ), + "w", + ) as fout: + fout.write(" ".join(jcl)) + logging.shutdown() + + +if __name__ == "__main__": + main()