comparison toolfactory/toolfactory.py @ 1:0183cad9d13b draft

planemo upload
author fubar
date Thu, 22 Feb 2024 10:48:01 +0000
parents
children
comparison
equal deleted inserted replaced
0:2beaae16651e 1:0183cad9d13b
1 # see https://github.com/fubar2/toolfactory
2 #
3 # copyright ross lazarus (ross stop lazarus at gmail stop com) May 2012
4 #
5 # all rights reserved
6 # Licensed under the LGPL
7 # suggestions for improvement and bug fixes welcome at
8 # https://github.com/fubar2/toolfactory
9 #
10 # February 2023: Refactored to use galaxy-tool-test script in galaxyutil
11 # planemo not needed if tool is already installed.
12 # sqlite does not seem to work - switch to postgresql in the installation script
13 #
14 # march 2022: Refactored into two tools - generate and test/install
15 # as part of GTN tutorial development and biocontainer adoption
16 # The tester runs planemo on a non-tested archive, creates the test outputs
17 # and returns a new proper tool with test.
18
19
20 import argparse
21 import copy
22 import json
23 import logging
24 import os
25 import re
26 import shlex
27 import shutil
28 import subprocess
29 import sys
30 import tarfile
31 import tempfile
32 import time
33
34 from bioblend import galaxy
35 from bioblend import ConnectionError
36
37 import galaxyxml.tool as gxt
38 import galaxyxml.tool.parameters as gxtp
39
40 import lxml.etree as ET
41
42 import yaml
43
44
45 logger = logging.getLogger(__name__)
46
47
48 class Tool_Factory:
49 """Wrapper for an arbitrary script
50 uses galaxyxml
51 """
52
53 def __init__(self, args=None): # noqa
54 """
55 prepare command line cl for running the tool here
56 and prepare elements needed for galaxyxml tool generation
57 """
58 assert args.parampass in [
59 "0",
60 "embed",
61 "argparse",
62 "positional",
63 "embednfmod",
64 ], (
65 "args.parampass %s not 0,positional, embed, embednfmod or argparse"
66 % args.parampass
67 )
68 # sed will update these settings during tfsetup.py first run
69 self.GALAXY_ADMIN_KEY = "1718977735397126400"
70 self.GALAXY_URL = "http://localhost:8080"
71 self.profile = "22.05"
72 self.not_iuc = True
73 self.args = args
74 self.tool_version = self.args.tool_version
75 self.myversion = "V3.0 February 2023"
76 self.verbose = True
77 self.debug = True
78 self.toolFactoryURL = "https://github.com/fubar2/galaxy_tf_overlay"
79 self.logger = logging.getLogger(__name__)
80 self.nfcoremod = False
81 if args.parampass == "embednfmod":
82 self.nfcoremod = True
83 self.script_in_help = False # IUC recommendation
84 self.tool_name = re.sub("[^a-zA-Z0-9_]+", "", args.tool_name)
85 self.tool_id = self.tool_name
86 self.local_tools = os.path.realpath(
87 os.path.join(args.galaxy_root, "local_tools")
88 )
89 self.repdir = os.path.realpath(args.tfcollection)
90 self.testdir = os.path.join(self.repdir, self.tool_name)
91 self.toold = os.path.join(self.local_tools, self.tool_name)
92 self.tooltestd = os.path.join(self.toold, "test-data")
93 if self.nfcoremod:
94 self.local_tools = os.path.join(args.tfcollection, "tools")
95 self.repdir = os.path.join(args.tfcollection, "TFouts", self.tool_name)
96 self.toold = os.path.join(self.local_tools, self.tool_name)
97 self.tooltestd = os.path.join(self.toold, "test-data")
98 os.makedirs(self.repdir, exist_ok=True)
99 os.makedirs(self.toold, exist_ok=True)
100 os.makedirs(self.tooltestd, exist_ok=True)
101 os.makedirs(self.local_tools, exist_ok=True)
102 self.local_tool_conf = os.path.join(self.local_tools, "local_tool_conf.xml")
103 self.ourcwd = os.getcwd()
104 self.collections = []
105 if len(args.collection) > 0:
106 try:
107 self.collections = [
108 json.loads(x) for x in args.collection if len(x.strip()) > 1
109 ]
110 except Exception:
111 self.logger.error(
112 f"--collections parameter {str(args.collection)} is malformed - should be a dictionary"
113 )
114 self.infiles = []
115 try:
116 self.infiles = [
117 json.loads(x) for x in args.input_files if len(x.strip()) > 1
118 ]
119 except Exception:
120 self.logger.error(
121 f"--input_files parameter {str(args.input_files)} is malformed - should be a dictionary"
122 )
123 self.extra_files = []
124 if len(args.xtra_files) > 0:
125 try:
126 self.extra_files = [
127 json.loads(x) for x in args.xtra_files if len(x.strip()) > 1
128 ]
129 except Exception:
130 self.logger.error(
131 f"--xtra_files parameter {str(args.xtra_files)} is malformed - should be a dictionary"
132 )
133 self.outfiles = []
134 try:
135 self.outfiles = [
136 json.loads(x) for x in args.output_files if len(x.strip()) > 1
137 ]
138 except Exception:
139 self.logger.error(
140 f"--output_files parameter {args.output_files} is malformed - should be a dictionary"
141 )
142 assert (
143 len(self.outfiles) + len(self.collections)
144 ) > 0, "No outfiles or output collections specified. The Galaxy job runner will fail without an output of some sort"
145 self.addpar = []
146 try:
147 self.addpar = [
148 json.loads(x) for x in args.additional_parameters if len(x.strip()) > 1
149 ]
150 except Exception:
151 self.logger.error(
152 f"--additional_parameters {args.additional_parameters} is malformed - should be a dictionary"
153 )
154 self.selpar = []
155 try:
156 self.selpar = [
157 json.loads(x) for x in args.selecttext_parameters if len(x.strip()) > 1
158 ]
159 except Exception:
160 self.logger.error(
161 f"--selecttext_parameters {args.selecttext_parameters} is malformed - should be a dictionary"
162 )
163 self.selfagpar = []
164 try:
165 self.selflagpar = [
166 json.loads(x) for x in args.selectflag_parameters if len(x.strip()) > 1
167 ]
168 except Exception:
169 self.logger.error(
170 f"--selectflag_parameters {args.selecttext_parameters} is malformed - should be a dictionary"
171 )
172 self.cleanuppar()
173 self.lastxclredirect = None
174 self.xmlcl = []
175 self.is_positional = self.args.parampass == "positional"
176 self.is_embedded = self.args.parampass == "embedded"
177 if self.args.sysexe:
178 if " " in self.args.sysexe:
179 self.executeme = shlex.split(self.args.sysexe)
180 else:
181 self.executeme = [
182 self.args.sysexe,
183 ]
184 else:
185 if self.args.packages:
186 self.executeme = [
187 self.args.packages.split(",")[0].split(":")[0].strip(),
188 ]
189 else:
190 self.executeme = []
191 aXCL = self.xmlcl.append
192 self.newtarpath = args.tested_tool_out
193 self.tinputs = gxtp.Inputs()
194 self.toutputs = gxtp.Outputs()
195 self.testparam = []
196 if self.args.script_path:
197 self.prepScript()
198 else:
199 self.script = None
200 if self.args.cl_override != None:
201 scos = open(self.args.cl_override, "r").readlines()
202 self.cl_override = [x.rstrip() for x in scos]
203 else:
204 self.cl_override = None
205 if self.args.test_override != None:
206 stos = open(self.args.test_override, "r").readlines()
207 self.test_override = [x.rstrip() for x in stos]
208 else:
209 self.test_override = None
210 if self.args.cl_prefix != None:
211 scos = open(self.args.cl_prefix, "r").readlines()
212 self.cl_prefix = [x.rstrip() for x in scos]
213 else:
214 self.cl_prefix = None
215 if self.args.cl_suffix != None:
216 stos = open(self.args.cl_suffix, "r").readlines()
217 self.cl_suffix = [x.rstrip() for x in stos]
218 else:
219 self.cl_suffix = None
220 if self.args.script_path:
221 for ex in self.executeme:
222 if ex:
223 aXCL(ex)
224 aXCL("'$runme'")
225 else:
226 for ex in self.executeme:
227 aXCL(ex)
228 if self.args.parampass == "0":
229 self.clsimple()
230 elif self.args.parampass == "positional":
231 self.prepclpos()
232 self.clpositional()
233 elif self.args.parampass == "argparse":
234 self.prepargp()
235 self.clargparse()
236 elif self.args.parampass.startswith("embed"):
237 self.prepembed()
238 else:
239 logging.error(
240 "Parampass value %s not in 0, positional, argparse, embed or embednfmod"
241 % self.args.parampass
242 )
243 logging.shutdown()
244 sys.exit(6)
245
246 def clsimple(self):
247 """no parameters or repeats - uses < and > for i/o"""
248 aXCL = self.xmlcl.append
249 if len(self.infiles) > 0:
250 aXCL("<")
251 aXCL("'$%s'" % self.infiles[0]["infilename"])
252 if len(self.outfiles) > 0:
253 aXCL(">")
254 aXCL("'$%s'" % self.outfiles[0]["name"])
255
256 def prepembed(self):
257 """fix self.script"""
258 scrip = self.script
259 if self.nfcoremod:
260 self.script = (
261 '#set prefix = "%s"\n#set task_process = "%s"\n'
262 % (self.tool_name, self.tool_name)
263 + scrip
264 )
265 self.xmlcl = [] # wipe anything there
266 aX = self.xmlcl.append
267 aX("")
268 if self.nfcoremod:
269 aX('#set prefix = "%s"' % self.tool_name)
270 aX('#set task_process = "%s"' % self.tool_name)
271 for p in self.collections:
272 aX("mkdir -p %s &&" % p["name"])
273 aX("%s '$runme'" % self.args.sysexe)
274
275 def prepargp(self):
276 xclsuffix = []
277 for i, p in enumerate(self.infiles):
278 rep = p["required"] in ["optional1", "required1"]
279 req = p["required"] in ["required", "required1"]
280 nam = p["infilename"]
281 flag = p["CL"]
282 if p["origCL"].strip().upper() == "STDIN":
283 xappendme = [
284 nam,
285 nam,
286 "< '$%s'" % nam,
287 ]
288 else:
289 xappendme = [p["CL"], "'$%s'" % p["CL"], ""]
290 xclsuffix.append(xappendme)
291 for i, p in enumerate(self.outfiles):
292 if p["origCL"].strip().upper() == "STDOUT":
293 self.lastxclredirect = [">", "'$%s'" % p["name"]]
294 else:
295 xclsuffix.append([p["name"], "'$%s'" % p["name"], ""])
296 for p in self.addpar:
297 nam = p["name"]
298 val = p["value"]
299 flag = p["CL"]
300 rep = p.get("repeat", 0) == "1"
301 if rep:
302 over = f'#for $rep in $R_{nam}:\n--{nam} "$rep.{nam}"\n#end for'
303 else:
304 over = p.get("override", "")
305 if p["type"] == "clflag":
306 over = f'#if ${nam} == "set"\n --{flag}\n#end if'
307 xclsuffix.append([p["CL"], "'$%s'" % nam, over])
308 for p in self.selpar:
309 xclsuffix.append([p["CL"], "'$%s'" % p["name"], p.get("override", "")])
310 for p in self.selflagpar:
311 xclsuffix.append(["", "'$%s'" % p["name"], ""])
312 for p in self.collections:
313 newname = p["name"]
314 xclsuffix.append([newname, "'%s'" % newname, ""])
315 self.xclsuffix = xclsuffix
316
317 def prepclpos(self):
318 xclsuffix = []
319 for i, p in enumerate(self.infiles):
320 if p["origCL"].strip().upper() == "STDIN":
321 xappendme = [
322 "999",
323 p["infilename"],
324 "< '$%s'" % p["infilename"],
325 ]
326 else:
327 xappendme = [p["CL"], "'$%s'" % p["infilename"], ""]
328 xclsuffix.append(xappendme)
329 for i, p in enumerate(self.outfiles):
330 if p["origCL"].strip().upper() == "STDOUT":
331 self.lastxclredirect = [">", "'$%s'" % p["name"]]
332 else:
333 xclsuffix.append([p["CL"], "'$%s'" % p["name"], ""])
334 for p in self.addpar:
335 nam = p["name"]
336 rep = p.get("repeat", "0") == "1" # repeats make NO sense
337 if rep:
338 logger.warning(
339 f"### warning. Repeats for {nam} ignored - not permitted in positional parameter command lines!"
340 )
341 over = p.get("override", "")
342 xclsuffix.append([p["CL"], "'$%s'" % nam, over])
343 for p in self.selpar:
344 xclsuffix.append([p["CL"], "'$%s'" % p["name"], p.get("override", "")])
345 for p in self.selflagpar:
346 xclsuffix.append(["", "'$%s'" % p["name"], ""])
347 for p in self.collections:
348 newname = p["name"]
349 xclsuffix.append([newname, "'$%s'" % newname, ""])
350 xclsuffix.sort()
351 self.xclsuffix = xclsuffix
352
353 def prepScript(self):
354 s = open(self.args.script_path, "r").read()
355 ss = s.split("\n")
356 rxcheck = [x for x in ss if x.strip() > ""]
357 assert len(rxcheck) > 0, "Supplied script is empty. Cannot run"
358 if self.args.sysexe and self.args.parampass != "embed":
359 rxcheck.insert(0, "#raw")
360 rxcheck.append("#end raw")
361 self.script = "\n".join(rxcheck)
362 if len(self.executeme) > 0:
363 self.sfile = os.path.join(
364 self.repdir, "%s.%s.txt" % (self.tool_name, self.executeme[0])
365 )
366 else:
367 self.sfile = os.path.join(
368 self.repdir, "%s.script.txt" % (self.tool_name)
369 )
370 tscript = open(self.sfile, "w")
371 tscript.write(self.script)
372 tscript.write("\n")
373 tscript.close()
374 self.spacedScript = [
375 f" {x.replace('${','$ {')}" for x in ss if x.strip() > ""
376 ]
377 self.escapedScript = rxcheck
378
379 def cleanuppar(self):
380 """positional parameters are complicated by their numeric ordinal"""
381 if self.args.parampass == "positional":
382 for i, p in enumerate(self.infiles):
383 assert (
384 p["CL"].isdigit() or p["CL"].strip().upper() == "STDIN"
385 ), "Positional parameters must be ordinal integers - got %s for %s" % (
386 p["CL"],
387 p["label"],
388 )
389 for i, p in enumerate(self.outfiles):
390 assert (
391 p["CL"].isdigit() or p["CL"].strip().upper() == "STDOUT"
392 ), "Positional parameters must be ordinal integers - got %s for %s" % (
393 p["CL"],
394 p["name"],
395 )
396 for i, p in enumerate(self.addpar):
397 assert p[
398 "CL"
399 ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (
400 p["CL"],
401 p["name"],
402 )
403 for i, p in enumerate(self.infiles):
404 infp = copy.copy(p)
405 infp["origCL"] = infp["CL"]
406 if self.args.parampass in ["positional", "0"]:
407 infp["infilename"] = infp["label"].replace(" ", "_")
408 else:
409 infp["infilename"] = infp["CL"]
410 self.infiles[i] = infp
411 for i, p in enumerate(self.outfiles):
412 outfp = copy.copy(p)
413 outfp["origCL"] = outfp["CL"] # keep copy
414 if outfp.get("label", None) == None:
415 outfp["label"] = ""
416 self.outfiles[i] = outfp
417 for i, p in enumerate(self.addpar):
418 addp = copy.copy(p)
419 addp["origCL"] = addp["CL"]
420 self.addpar[i] = addp
421 for i, p in enumerate(self.collections):
422 addp = copy.copy(p)
423 addp["CL"] = addp["name"]
424 self.collections[i] = addp
425
426 def clpositional(self):
427 # inputs in order then params
428 aXCL = self.xmlcl.append
429 for (k, v, koverride) in self.xclsuffix:
430 aXCL(v)
431 if self.lastxclredirect:
432 for cl in self.lastxclredirect:
433 aXCL(cl)
434
435 def clargparse(self):
436 """argparse style"""
437 aXCL = self.xmlcl.append
438 # inputs then params in argparse named form
439 for (k, v, koverride) in self.xclsuffix:
440 if koverride > "":
441 k = koverride
442 aXCL(k)
443 else:
444 kl = len(k.strip())
445 if kl == 0:
446 k = " "
447 elif kl == 1:
448 k = "-%s" % k
449 else:
450 k = "--%s" % k
451 aXCL(k)
452 aXCL(v)
453 if self.lastxclredirect:
454 for cl in self.lastxclredirect:
455 aXCL(cl)
456
457 def getNdash(self, newname):
458 if self.is_positional:
459 ndash = 0
460 else:
461 ndash = 2
462 if len(newname) < 2:
463 ndash = 1
464 return ndash
465
466 def doXMLparam(self): # noqa
467 """Add all needed elements to tool"""
468 for p in self.outfiles:
469 newname = p["name"]
470 newfmt = p["format"]
471 newcl = p["CL"]
472 test = p["test"]
473 oldcl = p["origCL"]
474 test = test.strip()
475 filta = p.get("when", [])
476 lab = p.get("label", "")
477 if len(lab.strip()) == 0:
478 lab = newname
479 ndash = self.getNdash(newcl)
480 aparm = gxtp.OutputData(
481 name=newname, format=newfmt, num_dashes=ndash, label=lab
482 )
483 if len(filta) > 0:
484 ofilta = gxtp.ChangeFormat()
485 for (
486 whens
487 ) in filta: # when input=|image_type| value=|large_png| format=|png|
488 whenss = whens.replace("|", '"').replace("when ", "")
489 clauses = whenss.split()
490 for c in clauses:
491 if c.startswith("value"):
492 v = c.split("=")[1]
493 elif c.startswith("format"):
494 f = c.split("=")[1]
495 elif c.startswith("input"):
496 i = c.split("=")[1]
497 else:
498 print(
499 "bad when - need value=, format= and input=, got", whens
500 )
501 owhen = gxtp.ChangeFormatWhen(format=f, input=i, value=v)
502 ofilta.append(owhen)
503 aparm.append(ofilta)
504 aparm.positional = self.is_positional
505 if self.is_positional:
506 if oldcl.upper() == "STDOUT":
507 aparm.positional = 9999999
508 aparm.command_line_override = "> '$%s'" % newname
509 else:
510 aparm.positional = int(oldcl)
511 aparm.command_line_override = "'$%s'" % newname
512 self.toutputs.append(aparm)
513 ld = None
514 if test.strip() > "":
515 if test.strip().startswith("diff"):
516 c = "diff"
517 ld = 0
518 if test.split(":")[1].isdigit:
519 ld = int(test.split(":")[1])
520 tp = gxtp.TestOutput(
521 name=newname,
522 value="%s_sample" % newname,
523 compare=c,
524 lines_diff=ld,
525 )
526 elif test.startswith("sim_size"):
527 c = "sim_size"
528 tn = test.split(":")[1].strip()
529 if tn > "":
530 if "." in tn:
531 delta = None
532 delta_frac = min(1.0, float(tn))
533 else:
534 delta = int(tn)
535 delta_frac = None
536 tp = gxtp.TestOutput(
537 name=newname,
538 value="%s_sample" % newname,
539 compare=c,
540 delta=delta,
541 delta_frac=delta_frac,
542 )
543 else:
544 c = test
545 tp = gxtp.TestOutput(
546 name=newname,
547 value="%s_sample" % newname,
548 compare=c,
549 )
550 self.testparam.append(tp)
551 for p in self.infiles:
552 newname = p["infilename"]
553 newfmt = p["format"]
554 ndash = self.getNdash(newname)
555 reps = p.get("required", "") in ["optional1", "required1"]
556 isoptional = p.get("required", "") in ["optional", "optional1"]
557 if not len(p["label"]) > 0:
558 alab = p["CL"]
559 else:
560 alab = p["label"]
561 aninput = gxtp.DataParam(
562 newname,
563 optional=isoptional,
564 label=alab,
565 help=p["help"],
566 format=newfmt,
567 multiple=reps,
568 num_dashes=ndash,
569 )
570 aninput.positional = self.is_positional
571 if self.is_positional:
572 if p["origCL"].upper() == "STDIN":
573 aninput.positional = 9999998
574 aninput.command_line_override = "< '$%s'" % newname
575 else:
576 aninput.positional = int(p["origCL"])
577 aninput.command_line_override = "'$%s'" % newname
578 self.tinputs.append(aninput)
579 tparm = gxtp.TestParam(newname, value="%s_sample" % newname)
580 self.testparam.append(tparm)
581 for p in self.addpar:
582 newname = p["name"]
583 newval = p.get("value", "")
584 newlabel = p["label"]
585 newhelp = p.get("help", "")
586 newtype = p.get("type", "?")
587 newcl = p["CL"]
588 oldcl = p["origCL"]
589 reps = p.get("repeat", "0") == "1"
590 if not len(newlabel) > 0:
591 newlabel = newname
592 ndash = self.getNdash(newname)
593 if newtype == "text":
594 aparm = gxtp.TextParam(
595 newname,
596 label=newlabel,
597 help=newhelp,
598 value=newval,
599 num_dashes=ndash,
600 )
601 elif newtype == "integer":
602 aparm = gxtp.IntegerParam(
603 newname,
604 label=newlabel,
605 help=newhelp,
606 value=int(newval.replace("'", "").replace('"', "")),
607 num_dashes=ndash,
608 )
609 elif newtype == "float":
610 aparm = gxtp.FloatParam(
611 newname,
612 label=newlabel,
613 help=newhelp,
614 value=float(newval.replace("'", "").replace('"', "")),
615 num_dashes=ndash,
616 )
617 elif newtype == "boolean":
618 aparm = gxtp.BooleanParam(
619 newname,
620 label=newlabel,
621 help=newhelp,
622 value=newval,
623 num_dashes=ndash,
624 )
625 elif newtype == "clflag":
626 initval = newval
627 aparm = gxtp.SelectParam(
628 newname,
629 label=newlabel,
630 help=newhelp,
631 num_dashes=ndash,
632 display="radio",
633 )
634 anoptt = gxtp.SelectOption(
635 value="set",
636 text="Set this flag",
637 )
638 anoptf = gxtp.SelectOption(
639 value="notset",
640 text="Do not set this flag",
641 )
642 if p["value"] == "set": # make default same as form
643 aparm.append(anoptt)
644 aparm.append(anoptf)
645 else:
646 aparm.append(anoptf)
647 aparm.append(anoptt)
648 elif newtype == "datacolumn":
649 aparm = gxtp.TextParam(
650 newname,
651 type="data_column",
652 data_ref=p["dataref"],
653 multiple=(p["multiple"] == "1"),
654 label=newlabel,
655 help=newhelp,
656 value=newval,
657 num_dashes=ndash,
658 )
659 else:
660 raise ValueError(
661 'Unrecognised parameter type "%s" for \
662 additional parameter %s in makeXML'
663 % (newtype, newname)
664 )
665 aparm.positional = self.is_positional
666 if self.is_positional:
667 aparm.positional = int(oldcl)
668 if reps:
669 repe = gxtp.Repeat(
670 name=f"R_{newname}",
671 title=f"Any number of {newlabel} repeats are allowed",
672 )
673 repe.append(aparm)
674 self.tinputs.append(repe)
675 tparm = gxtp.TestRepeat(name=f"R_{newname}")
676 tparm2 = gxtp.TestParam(newname, value=newval)
677 tparm.append(tparm2)
678 self.testparam.append(tparm)
679 else:
680 self.tinputs.append(aparm)
681 tparm = gxtp.TestParam(newname, value=newval)
682 self.testparam.append(tparm)
683 for p in self.selpar:
684 newname = p["name"]
685 newval = p.get("value", "")
686 newlabel = p["label"]
687 newhelp = p["help"]
688 newtype = p["type"]
689 newcl = p["CL"]
690 if not len(newlabel) > 0:
691 newlabel = newname
692 ndash = self.getNdash(newname)
693 if newtype == "selecttext":
694 newtext = p["texts"]
695 aparm = gxtp.SelectParam(
696 newname,
697 label=newlabel,
698 help=newhelp,
699 num_dashes=ndash,
700 )
701 for i in range(len(newval)):
702 anopt = gxtp.SelectOption(
703 value=newval[i],
704 text=newtext[i],
705 )
706 aparm.append(anopt)
707 aparm.positional = self.is_positional
708 if self.is_positional:
709 aparm.positional = int(newcl)
710 self.tinputs.append(aparm)
711 tparm = gxtp.TestParam(newname, value=newval[0])
712 self.testparam.append(tparm)
713 else:
714 raise ValueError(
715 'Unrecognised parameter type "%s" for\
716 selecttext parameter %s in makeXML'
717 % (newtype, newname)
718 )
719 for p in self.selflagpar:
720 newname = p["name"]
721 newval = p["value"]
722 newlabel = p["label"]
723 newhelp = p["help"]
724 newtype = p["type"]
725 newtext = p["texts"]
726 newcl = p["CL"]
727 if not len(newlabel) > 0:
728 newlabel = newname
729 aparm = gxtp.SelectParam(
730 newname,
731 label=newlabel,
732 help=newhelp,
733 num_dashes=0,
734 )
735 for i in range(len(newval)):
736 anopt = gxtp.SelectOption(
737 value=newval[i],
738 text=newtext[i],
739 )
740 aparm.append(anopt)
741 aparm.positional = self.is_positional
742 if self.is_positional:
743 aparm.positional = int(newcl)
744 self.tinputs.append(aparm)
745 tparm = gxtp.TestParam(newname, value=newval[0])
746 self.testparam.append(tparm)
747
748 def doNoXMLparam(self):
749 """filter style package - stdin to stdout"""
750 if len(self.infiles) > 0:
751 alab = self.infiles[0]["label"]
752 if len(alab) == 0:
753 alab = self.infiles[0]["infilename"]
754 max1s = (
755 "Maximum one input if parampass is 0 but multiple input files supplied - %s"
756 % str(self.infiles)
757 )
758 assert len(self.infiles) == 1, max1s
759 newname = self.infiles[0]["infilename"]
760 aninput = gxtp.DataParam(
761 newname,
762 optional=False,
763 label=alab,
764 help=self.infiles[0]["help"],
765 format=self.infiles[0]["format"],
766 multiple=False,
767 num_dashes=0,
768 )
769 aninput.command_line_override = "< $%s" % newname
770 aninput.positional = True
771 self.tinputs.append(aninput)
772 tp = gxtp.TestParam(name=newname, value="%s_sample" % newname)
773 self.testparam.append(tp)
774 if len(self.outfiles) > 0:
775 newname = self.outfiles[0]["name"]
776 newfmt = self.outfiles[0]["format"]
777 anout = gxtp.OutputData(newname, format=newfmt, num_dashes=0)
778 anout.command_line_override = "> $%s" % newname
779 anout.positional = self.is_positional
780 self.toutputs.append(anout)
781 tp = gxtp.TestOutput(name=newname, value="%s_sample" % newname)
782 self.testparam.append(tp)
783
784 def makeXML(self): # noqa
785 """
786 Create a Galaxy xml tool wrapper for the new script
787 Uses galaxyhtml
788 Hmmm. How to get the command line into correct order...
789 """
790 requirements = gxtp.Requirements()
791 self.condaenv = []
792 if self.args.packages:
793 try:
794 for d in self.args.packages.split(","):
795 ver = None
796 packg = None
797 d = d.replace("==", ":")
798 d = d.replace("=", ":")
799 if ":" in d:
800 packg, ver = d.split(":")[:2]
801 ver = ver.strip()
802 packg = packg.strip()
803 self.tool_version = ver
804 else:
805 packg = d.strip()
806 ver = None
807 if ver == "":
808 ver = None
809 if packg:
810 requirements.append(
811 gxtp.Requirement("package", packg.strip(), ver)
812 )
813 self.condaenv.append(d)
814 except Exception:
815 self.logger.error(
816 "### malformed packages string supplied - cannot parse = %s"
817 % self.args.packages
818 )
819 sys.exit(2)
820 elif self.args.container:
821 requirements.append(gxtp.Requirement("container", self.args.container))
822 self.newtool = gxt.Tool(
823 self.tool_name,
824 self.tool_id,
825 self.tool_version,
826 self.args.tool_desc,
827 "",
828 profile=self.profile,
829 )
830 self.newtool.requirements = requirements
831 iXCL = self.xmlcl.insert
832 aXCL = self.xmlcl.append
833 if self.args.cl_prefix: # DIY CL start
834 self.xmlcl = self.cl_prefix + self.xmlcl
835 if self.args.cl_suffix: # DIY CL end
836 self.xmlcl += self.cl_suffix
837 if self.cl_override:
838 self.newtool.command_override = self.cl_override # config file
839 else:
840 self.newtool.command_override = self.xmlcl
841 self.cites = self.parse_citations()
842 cite = gxtp.Citations()
843 if self.cites and len(self.cites) > 0:
844 for c in self.cites:
845 acite = gxtp.Citation(type=c[0], value=c[1])
846 cite.append(acite)
847 acite = gxtp.Citation(type="doi", value="10.1093/bioinformatics/bts573")
848 cite.append(acite)
849 self.newtool.citations = cite
850 safertext = ""
851 if self.args.help_text:
852 self.helptext = open(self.args.help_text, "r").readlines()
853 safertext = "\n".join([self.cheetah_escape(x) for x in self.helptext])
854 if len(safertext.strip()) == 0:
855 safertext = (
856 "Ask the tool author (%s) to rebuild with help text please\n"
857 % (self.args.user_email)
858 )
859 if self.script_in_help and self.args.script_path:
860 if len(safertext) > 0:
861 safertext = safertext + "\n\n------\n" # transition allowed!
862 scr = [x for x in self.spacedScript if x.strip() > ""]
863 scr.insert(0, "\n\nScript::\n")
864 if len(scr) > 300:
865 scr = (
866 scr[:100]
867 + [" >300 lines - stuff deleted", " ......"]
868 + scr[-100:]
869 )
870 scr.append("\n")
871 safertext = safertext + "\n".join(scr)
872 self.newtool.help = " ".join(self.helptext)
873 for p in self.collections:
874 newkind = p["kind"]
875 newname = p["name"]
876 newlabel = p["label"]
877 newdisc = p["discover"]
878 collect = gxtp.OutputCollection(newname, label=newlabel, type=newkind)
879 disc = gxtp.DiscoverDatasets(
880 pattern=newdisc, directory=f"{newname}", visible="false"
881 )
882 collect.append(disc)
883 self.toutputs.append(collect)
884 try:
885 tparm = gxtp.TestOutputCollection(newname) # broken until PR merged.
886 self.testparam.append(tparm)
887 except Exception:
888 logging.error(
889 "WARNING: Galaxyxml version does not have the PR merged yet - tests for collections must be over-ridden until then!"
890 )
891 self.newtool.version_command = f'echo "{self.tool_version}"'
892 if self.args.parampass == "0":
893 self.doNoXMLparam()
894 else:
895 self.doXMLparam()
896 self.newtool.outputs = self.toutputs
897 self.newtool.inputs = self.tinputs
898 if self.args.script_path:
899 configfiles = gxtp.Configfiles()
900 configfiles.append(gxtp.Configfile(name="runme", text=self.script))
901 self.newtool.configfiles = configfiles
902 tests = gxtp.Tests()
903 test_a = gxtp.Test()
904 for tp in self.testparam:
905 test_a.append(tp)
906 tests.append(test_a)
907 self.newtool.tests = tests
908 self.newtool.add_comment(
909 "Created by %s at %s using the Galaxy Tool Factory."
910 % (self.args.user_email, self.timenow())
911 )
912 self.newtool.add_comment("Source in git at: %s" % (self.toolFactoryURL))
913 exml = self.newtool.export()
914 if (
915 self.test_override
916 ): # cannot do this inside galaxyxml as it expects lxml objects for tests
917 part1 = exml.split("<tests>")[0]
918 part2 = exml.split("</tests>")[1]
919 fixed = "%s\n%s\n%s" % (part1, "\n".join(self.test_override), part2)
920 exml = fixed
921 with open(os.path.join(self.toold, "%s.xml" % self.tool_name), "w") as xf:
922 xf.write(exml)
923 xf.write("\n")
924 with open(os.path.join(self.repdir, "%s_xml.xml" % self.tool_name), "w") as xf:
925 xf.write(exml)
926 xf.write("\n")
927
928 def writeShedyml(self):
929 """for planemo"""
930 yuser = self.args.user_email.split("@")[0]
931 yfname = os.path.join(self.toold, ".shed.yml")
932 yamlf = open(yfname, "w")
933 odict = {
934 "name": self.tool_name,
935 "owner": "fubar2",
936 "type": "unrestricted",
937 "description": "ToolFactory autogenerated tool",
938 "synopsis": self.args.tool_desc,
939 "category": "ToolFactory generated Tools",
940 }
941 yaml.dump(odict, yamlf, allow_unicode=True)
942 yamlf.close()
943
944 def writeTFyml(self):
945 """for posterity"""
946 adict = {}
947 rargs = [
948 "input_files",
949 "output_files",
950 "additional_parameters",
951 "selecttext_parameters",
952 "selectflag_parameters",
953 "xtra_files",
954 ]
955 args = vars(self.args)
956 for k in args.keys():
957 if k not in rargs:
958 adict[k] = args.get(k, None)
959 else:
960 if adict.get(k, None):
961 adict[k].append(adict[k])
962 else:
963 adict[k] = [args.get(k, None)]
964 adict["script"] = self.script
965 adict["help"] = self.helptext
966 yfname = os.path.join(self.repdir, "%s_ToolFactory.yml" % self.tool_name)
967 yf = open(yfname, "w")
968 yaml.dump(adict, yf)
969 yf.close()
970
971 def saveTestdata(self, pname, testDataURL):
972 """
973 may need to be ungzipped and in test folder
974 """
975 res = 0
976 localpath = os.path.join(self.tooltestd, "%s_sample" % pname)
977 print("#### save", testDataURL, "for", pname, "to", localpath)
978 if not os.path.exists(localpath):
979 cl = [
980 "wget",
981 "--timeout",
982 "5",
983 "--tries",
984 "2",
985 "-O",
986 localpath,
987 testDataURL,
988 ]
989 if testDataURL.endswith(".gz"): # major kludge as usual...
990 gzlocalpath = "%s.gz" % localpath
991 cl = [
992 "wget",
993 "-q",
994 "--timeout",
995 "5",
996 "--tries",
997 "2",
998 "-O",
999 gzlocalpath,
1000 testDataURL,
1001 "&&",
1002 "rm",
1003 "-f",
1004 localpath,
1005 "&&",
1006 "gunzip",
1007 gzlocalpath,
1008 ]
1009 p = subprocess.run(" ".join(cl), shell=True)
1010 if p.returncode:
1011 print("Got", p.returncode, "from executing", " ".join(cl))
1012 else:
1013 print("Not re-downloading", localpath)
1014 return res
1015
1016 def makeTool(self):
1017 """write xmls and input samples into place"""
1018 if self.args.parampass == 0:
1019 self.doNoXMLparam()
1020 else:
1021 self.makeXML()
1022 if self.args.script_path and self.not_iuc:
1023 stname = os.path.join(self.toold, os.path.split(self.sfile)[1])
1024 if not os.path.exists(stname):
1025 shutil.copyfile(self.sfile, stname)
1026 logger.info("Copied %s to %s" % (self.sfile, stname))
1027 for p in self.infiles:
1028 paths = p["name"]
1029 pname = p["CL"]
1030 pathss = paths.split(",")
1031 np = len(pathss)
1032 if p.get("URL", None):
1033 res = self.saveTestdata(pname, p["URL"])
1034 for i, pth in enumerate(pathss):
1035 if os.path.exists(pth):
1036 if np > 1:
1037 dest = os.path.join(
1038 self.tooltestd, "%s_%d_sample" % (p["infilename"], i + 1)
1039 )
1040 else:
1041 dest = os.path.join(
1042 self.tooltestd, "%s_sample" % p["infilename"]
1043 )
1044 shutil.copyfile(pth, dest)
1045 logger.info("Copied %s to %s" % (pth, dest))
1046 else:
1047 logger.info(
1048 "Optional input path %s does not exist - not copied" % pth
1049 )
1050 if self.extra_files and len(self.extra_files) > 0:
1051 for xtra in self.extra_files:
1052 fpath = xtra["fpath"]
1053 dest = os.path.join(self.toold, xtra["fname"])
1054 shutil.copyfile(fpath, dest)
1055 logger.info("Copied xtra file %s to %s" % (fpath, dest))
1056 shutil.copytree(self.toold, self.testdir, dirs_exist_ok=True)
1057
1058 def makeToolTar(self, test_retcode=0):
1059 """move outputs into test-data and prepare the tarball"""
1060 excludeme = "tool_test_output"
1061
1062 def exclude_function(tarinfo):
1063 filename = tarinfo.name
1064 return None if filename.startswith(excludeme) else tarinfo
1065
1066 logger.info("makeToolTar starting with tool test retcode=%d\n" % test_retcode)
1067 td = os.listdir(self.toold)
1068 for f in td:
1069 if f.startswith("tool_test_output"):
1070 os.unlink(os.path.join(self.toold, f))
1071 if self.newtarpath:
1072 tf = tarfile.open(self.newtarpath, "w:gz")
1073 tf.add(
1074 name=self.toold,
1075 arcname=self.tool_name,
1076 # filter=exclude_function,
1077 )
1078
1079 def planemo_local_test(self):
1080 """
1081 weird legacyversion error popping up again from package version upgrade in conda_util.py in the venv.
1082 Seems ok if run as a shell script using the Galaxy installed planemo august 1st 2023
1083 """
1084 shutil.copytree(self.toold, self.testdir, dirs_exist_ok=True)
1085 x = "%s.xml" % self.tool_name
1086 xout = os.path.abspath(os.path.join(self.testdir, x))
1087 cl = [
1088 "planemo",
1089 "test",
1090 "--galaxy_admin_key",
1091 self.GALAXY_ADMIN_KEY,
1092 "--engine",
1093 "external_galaxy",
1094 "--update_test_data",
1095 "--galaxy_url",
1096 self.GALAXY_URL,
1097 xout,
1098 ]
1099 clx = [
1100 "planemo",
1101 "test",
1102 "--galaxy_admin_key",
1103 "[GALAXY_ADMIN_KEY]",
1104 "--engine",
1105 "external_galaxy",
1106 "--update_test_data",
1107 "--galaxy_url",
1108 self.GALAXY_URL,
1109 xout,
1110 ]
1111 logger.info("planemo_local_test executing: %s" % " ".join(clx))
1112 p = subprocess.run(
1113 " ".join(cl),
1114 timeout=90,
1115 shell=True,
1116 cwd=self.testdir,
1117 capture_output=True,
1118 check=True,
1119 text=True,
1120 )
1121 for errline in p.stderr.splitlines():
1122 logger.info("planemo: %s" % errline)
1123 for errline in p.stdout.splitlines():
1124 logger.info("planemo: %s" % errline)
1125 shutil.copytree(self.testdir, self.toold)
1126 dest = self.repdir
1127 src = self.tooltestd
1128 logger.info("copying to %s to %s test_outs" % (src, dest))
1129 shutil.copytree(src, dest, dirs_exist_ok=True)
1130 return p.returncode
1131
1132 def fast_local_test(self):
1133 """
1134 galaxy-tool-test -u http://localhost:8080 -a 1613612977827175424 -t tacrev -o local --publish-history
1135 Seems to have a race condition when multiple jobs running. Works well - 15 secs or so if only onejob at a time! so job_conf fixed.
1136 Failure will eventually get stuck. Might need a timeout in the script
1137 """
1138 scrpt = os.path.join(self.args.toolfactory_dir, "toolfactory_fast_test.sh")
1139 extrapaths = self.tooltestd
1140 cl = ["/usr/bin/bash", scrpt, self.tool_name, extrapaths, extrapaths]
1141 logger.info("fast_local_test executing %s \n" % (" ".join(cl)))
1142 p = subprocess.run(
1143 " ".join(cl),
1144 shell=True,
1145 cwd=self.testdir,
1146 capture_output=True,
1147 check=True,
1148 text=True,
1149 )
1150 for errline in p.stderr.splitlines():
1151 logger.info("ephemeris: %s" % errline)
1152 for errline in p.stdout.splitlines():
1153 logger.info("ephemeris: %s" % errline)
1154 shutil.copytree(self.testdir, self.toold, dirs_exist_ok=True)
1155 dest = self.repdir
1156 src = self.tooltestd
1157 shutil.copytree(src, dest, dirs_exist_ok=True)
1158 return p.returncode
1159
1160 def update_toolconf(self, remove=False):
1161 """tempting to recreate it from the local_tools directory each time
1162 currently adds new tools if not there.
1163 """
1164
1165 def sortchildrenby(parent, attr):
1166 parent[:] = sorted(parent, key=lambda child: child.get(attr))
1167
1168 logger.info("Updating tool conf files for %s\n" % (self.tool_name))
1169 tcpath = self.local_tool_conf
1170 xmlfile = os.path.join(self.tool_name, "%s.xml" % self.tool_name)
1171 try:
1172 parser = ET.XMLParser(remove_blank_text=True)
1173 tree = ET.parse(tcpath, parser)
1174 except ET.XMLSyntaxError:
1175 logger.error(
1176 "### Tool configuration update access error - %s cannot be parsed as xml by element tree\n"
1177 % tcpath
1178 )
1179 sys.exit(4)
1180 root = tree.getroot()
1181 hasTF = False
1182 e = root.findall("section")
1183 if len(e) > 0:
1184 hasTF = True
1185 TFsection = e[0]
1186 if not hasTF:
1187 TFsection = ET.Element(
1188 "section", {"id": "localtools", "name": "Local Tools"}
1189 )
1190 root.insert(0, TFsection) # at the top!
1191 our_tools = TFsection.findall("tool")
1192 conf_tools = [x.attrib["file"] for x in our_tools]
1193 if not remove:
1194 if xmlfile not in conf_tools: # new
1195 ET.SubElement(TFsection, "tool", {"file": xmlfile})
1196 sortchildrenby(TFsection, "file")
1197 tree.write(tcpath, pretty_print=True)
1198 gi = galaxy.GalaxyInstance(url=self.GALAXY_URL, key=self.GALAXY_ADMIN_KEY)
1199 toolready = False
1200 now = time.time()
1201 nloop = 5
1202 while nloop >= 0 and not toolready:
1203 try:
1204 res = gi.tools.show_tool(tool_id=self.tool_name)
1205 toolready = True
1206 logger.info(
1207 "Tool %s ready after %f seconds - %s\n"
1208 % (self.tool_name, time.time() - now, res)
1209 )
1210 except ConnectionError:
1211 nloop -= 1
1212 time.sleep(2)
1213 logger.info("Connection error - waiting 2 seconds.\n")
1214 if nloop < 1:
1215 logger.error(
1216 "Tool %s still not ready after %f seconds - please check the form and the generated xml for errors? \n"
1217 % (self.tool_name, time.time() - now)
1218 )
1219 return 2
1220 else:
1221 return 0
1222 else:
1223 if xmlfile in conf_tools: # remove
1224 for rem in our_tools:
1225 if rem.attrib["file"] == xmlfile:
1226 rem.getparent().remove(rem)
1227 self.logger.info(
1228 "###=============== removed tool %s from %s"
1229 % (xmlfile, tcpath)
1230 )
1231 sortchildrenby(TFsection, "file")
1232 tree.write(tcpath, pretty_print=True)
1233
1234 def install_deps(self):
1235 """
1236 use script to install new tool dependencies
1237 """
1238 cll = [
1239 "sh",
1240 "%s/install_tf_deps.sh" % self.args.toolfactory_dir,
1241 self.tool_name,
1242 ]
1243 self.logger.info("Running %s\n" % " ".join(cll))
1244 try:
1245 p = subprocess.run(
1246 " ".join(cll), shell=True, capture_output=True, check=True, text=True
1247 )
1248 for errline in p.stderr.splitlines():
1249 self.logger.info(errline)
1250 return p.returncode
1251 except:
1252 return 1
1253
1254 def timenow(self):
1255 """return current time as a string"""
1256 return time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(time.time()))
1257
1258 def cheetah_escape(self, text):
1259 """Produce entities within text."""
1260 cheetah_escape_table = {"$": "\\$", "#": "\\#"}
1261 return "".join([cheetah_escape_table.get(c, c) for c in text])
1262
1263 def parse_citations(self):
1264 """"""
1265 if self.args.citations:
1266 ct = open(self.args.citations, "r").read()
1267 citations = [c.strip() for c in ct.split("**ENTRY**") if c.strip()]
1268 citation_tuples = []
1269 for citation in citations:
1270 if citation.startswith("doi"):
1271 citation_tuples.append(("doi", citation[len("doi") :].strip()))
1272 else:
1273 citation_tuples.append(
1274 ("bibtex", citation[len("bibtex") :].strip())
1275 )
1276 return citation_tuples
1277 else:
1278 return None
1279
1280
1281 def main():
1282 """
1283 This is a Galaxy wrapper.
1284 It expects to be called by a special purpose tool.xml
1285
1286 """
1287 parser = argparse.ArgumentParser()
1288 a = parser.add_argument
1289 a("--nftest", action="store_true", default=False)
1290 a("--script_path", default=None)
1291 a("--sysexe", default=None)
1292 a("--packages", default=None)
1293 a("--tool_name", default="newtool")
1294 a("--input_files", default=[], action="append")
1295 a("--output_files", default=[], action="append")
1296 a("--user_email", default="Unknown")
1297 a("--bad_user", default=None)
1298 a("--help_text", default=None)
1299 a("--tool_desc", default=None)
1300 a("--toolfactory_dir", default=None)
1301 a("--tool_version", default="0.01")
1302 a("--citations", default=None)
1303 a("--cl_suffix", default=None)
1304 a("--cl_prefix", default=None)
1305 a("--cl_override", default=None)
1306 a("--test_override", default=None)
1307 a("--additional_parameters", action="append", default=[])
1308 a("--selecttext_parameters", action="append", default=[])
1309 a("--selectflag_parameters", action="append", default=[])
1310 a("--edit_additional_parameters", action="store_true", default=False)
1311 a("--parampass", default="positional")
1312 a("--tfcollection", default="toolgen")
1313 a("--galaxy_root", default="/galaxy-central")
1314 a("--collection", action="append", default=[])
1315 a("--include_tests", default=False, action="store_true")
1316 a("--install_flag", action="store_true", default=False)
1317 a("--admin_only", default=True, action="store_true")
1318 a("--tested_tool_out", default=None)
1319 a("--container", default=None, required=False)
1320 a("--tool_conf_path", default="config/tool_conf.xml") # relative to $__root_dir__
1321 a(
1322 "--xtra_files",
1323 default=[],
1324 action="append",
1325 ) # history data items to add to the tool base directory
1326 tfcl = sys.argv[1:]
1327 args = parser.parse_args()
1328 if args.admin_only:
1329 assert not args.bad_user, (
1330 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the galaxy.yml Galaxy configuration file'
1331 % (args.bad_user, args.bad_user)
1332 )
1333 assert (
1334 args.tool_name
1335 ), "## This ToolFactory cannot build a tool without a tool name. Please supply one."
1336 os.makedirs(args.tfcollection, exist_ok=True)
1337 logfilename = os.path.join(
1338 args.tfcollection, "ToolFactory_make_%s_log.txt" % args.tool_name
1339 )
1340 logger.setLevel(logging.INFO)
1341 fh = logging.FileHandler(logfilename, mode="w")
1342 fformatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
1343 fh.setFormatter(fformatter)
1344 logger.addHandler(fh)
1345 tf = Tool_Factory(args)
1346 tf.makeTool()
1347 tf.writeShedyml()
1348 # tf.writeTFyml()tf.writeTFyml()
1349 tf.update_toolconf()
1350 time.sleep(5)
1351 if tf.condaenv and len(tf.condaenv) > 0:
1352 res = tf.install_deps()
1353 if res > 0:
1354 logger.debug("Toolfactory installed deps failed")
1355 logging.shutdown()
1356 sys.exit(6)
1357 time.sleep(2)
1358 testret = tf.fast_local_test() # planemo_local_test()
1359 if False and int(testret) > 0:
1360 logger.error("ToolFactory tool build and test failed. :(")
1361 logger.info(
1362 "This is usually because the supplied script or dependency did not run correctly with the test inputs and parameter settings"
1363 )
1364 logger.info("when tested with galaxy_tool_test. Error code:%d" % int(testret))
1365 logger.info(
1366 "The 'i' (information) option shows how the ToolFactory was called, stderr and stdout, and what the command line was."
1367 )
1368 logger.info(
1369 "Expand (click on) any of the broken (red) history output titles to see that 'i' button and click it"
1370 )
1371 logger.info(
1372 "Make sure it is the same as your working test command line and double check that data files are coming from and going to where they should"
1373 )
1374 logger.info(
1375 "In the output collection, the tool xml <command> element must be the equivalent of your working command line for the test to work"
1376 )
1377 logging.shutdown()
1378 sys.exit(5)
1379 else:
1380 tf.makeToolTar(testret)
1381 jcl = sys.argv[1:]
1382 with open(
1383 os.path.join(
1384 args.tfcollection, "ToolFactory_%s_commandline.json" % args.tool_name
1385 ),
1386 "w",
1387 ) as fout:
1388 fout.write(" ".join(jcl))
1389 logging.shutdown()
1390
1391
1392 if __name__ == "__main__":
1393 main()