diff toolfactory/rgToolFactory2.py @ 75:25dd9364a32a draft

Uploaded
author fubar
date Sat, 14 Nov 2020 23:35:27 +0000
parents 3d4324df911b
children 46db8143adb7
line wrap: on
line diff
--- a/toolfactory/rgToolFactory2.py	Sat Nov 14 21:52:39 2020 +0000
+++ b/toolfactory/rgToolFactory2.py	Sat Nov 14 23:35:27 2020 +0000
@@ -24,7 +24,6 @@
 
 
 import argparse
-import copy
 import logging
 import os
 import re
@@ -35,7 +34,7 @@
 import tempfile
 import time
 
-from bioblend import galaxy
+
 from bioblend import toolshed
 
 import galaxyxml.tool as gxt
@@ -92,8 +91,7 @@
 
 
 def timenow():
-    """return current time as a string
-    """
+    """return current time as a string"""
     return time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(time.time()))
 
 
@@ -126,8 +124,7 @@
 
 
 def parse_citations(citations_text):
-    """
-    """
+    """"""
     citations = [c for c in citations_text.split("**ENTRY**") if c.strip()]
     citation_tuples = []
     for citation in citations:
@@ -291,9 +288,11 @@
         """ positional parameters are complicated by their numeric ordinal"""
         for i, p in enumerate(self.infiles):
             if self.args.parampass == "positional":
-                assert p[ICLPOS].isdigit(), (
-                    "Positional parameters must be ordinal integers - got %s for %s"
-                    % (p[ICLPOS], p[ILABPOS])
+                assert p[
+                    ICLPOS
+                ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (
+                    p[ICLPOS],
+                    p[ILABPOS],
                 )
             p.append(p[ICLPOS])
             if p[ICLPOS].isdigit() or self.args.parampass == "0":
@@ -304,9 +303,11 @@
             self.outfiles
         ):  # trying to automagically gather using extensions
             if self.args.parampass == "positional" and p[OCLPOS] != "STDOUT":
-                assert p[OCLPOS].isdigit(), (
-                    "Positional parameters must be ordinal integers - got %s for %s"
-                    % (p[OCLPOS], p[ONAMEPOS])
+                assert p[
+                    OCLPOS
+                ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (
+                    p[OCLPOS],
+                    p[ONAMEPOS],
                 )
             p.append(p[OCLPOS])
             if p[OCLPOS].isdigit() or p[OCLPOS] == "STDOUT":
@@ -315,9 +316,11 @@
             self.outfiles[i] = p
         for i, p in enumerate(self.addpar):
             if self.args.parampass == "positional":
-                assert p[ACLPOS].isdigit(), (
-                    "Positional parameters must be ordinal integers - got %s for %s"
-                    % (p[ACLPOS], p[ANAMEPOS])
+                assert p[
+                    ACLPOS
+                ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (
+                    p[ACLPOS],
+                    p[ANAMEPOS],
                 )
             p.append(p[ACLPOS])
             if p[ACLPOS].isdigit():
@@ -326,8 +329,7 @@
             self.addpar[i] = p
 
     def clsimple(self):
-        """ no parameters - uses < and > for i/o
-        """
+        """no parameters - uses < and > for i/o"""
         aCL = self.cl.append
         aXCL = self.xmlcl.append
 
@@ -358,8 +360,7 @@
             aXCL(self.lastxclredirect[1])
 
     def clargparse(self):
-        """ argparse style
-        """
+        """argparse style"""
         aCL = self.cl.append
         aXCL = self.xmlcl.append
         # inputs then params in argparse named form
@@ -520,7 +521,9 @@
             anout.command_line_override = "> $%s" % newname
             anout.positional = self.is_positional
             self.toutputs.append(anout)
-            tp = gxtp.TestOutput(name=newname, value="%s_sample" % newname, format=newfmt)
+            tp = gxtp.TestOutput(
+                name=newname, value="%s_sample" % newname, format=newfmt
+            )
             self.testparam.append(tp)
 
     def makeXML(self):
@@ -537,11 +540,14 @@
             helptext = open(self.args.help_text, "r").readlines()
             safertext = [html_escape(x) for x in helptext]
             if False and self.args.script_path:
-                scrp = self.script.split('\n')
-                scrpt = ['   %s' % x for x in scrp] # try to stop templating
-                scrpt.insert(0,"```\n")
+                scrp = self.script.split("\n")
+                scrpt = ["   %s" % x for x in scrp]  # try to stop templating
+                scrpt.insert(0, "```\n")
                 if len(scrpt) > 300:
-                    safertext = safertext + scrpt[:100] + ['>500 lines - stuff deleted','......'] + scrpt[-100:]
+                    safertext = (
+                        safertext + scrpt[:100] + \
+                        [">500 lines - stuff deleted", "......"] + scrpt[-100:]
+                    )
                 else:
                     safertext = safertext + scrpt
                 safertext.append("\n```")
@@ -628,7 +634,7 @@
                 else:
                     sto = open(self.tlog, "w")
                 sto.write(
-                       "## Executing Toolfactory generated command line = %s\n" % scl
+                    "## Executing Toolfactory generated command line = %s\n" % scl
                 )
             sto.flush()
             p = subprocess.run(self.cl, shell=False, stdout=sto, stderr=ste)
@@ -645,9 +651,7 @@
             else:
                 sto = sys.stdout
             p = subprocess.run(self.cl, shell=False, stdout=sto, stdin=sti)
-            sto.write(
-              "## Executing Toolfactory generated command line = %s\n" % scl
-            )
+            sto.write("## Executing Toolfactory generated command line = %s\n" % scl)
             retval = p.returncode
             sto.close()
             sti.close()
@@ -660,7 +664,6 @@
         logging.debug("run done")
         return retval
 
-
     def shedLoad(self):
         """
         {'deleted': False,
@@ -677,36 +680,44 @@
         else:
             sto = open(self.tlog, "w")
 
-        ts = toolshed.ToolShedInstance(url=self.args.toolshed_url,key=self.args.toolshed_api_key,verify=False)
+        ts = toolshed.ToolShedInstance(
+            url=self.args.toolshed_url, key=self.args.toolshed_api_key, verify=False
+        )
         repos = ts.repositories.get_repositories()
-        rnames = [x.get('name','?') for x in repos]
-        rids = [x.get('id','?') for x in repos]
-        sto.write(f'############names={rnames} rids={rids}')
-        cat = 'ToolFactory generated tools'
+        rnames = [x.get("name", "?") for x in repos]
+        rids = [x.get("id", "?") for x in repos]
+        sto.write(f"############names={rnames} rids={rids}")
+        cat = "ToolFactory generated tools"
         if self.args.tool_name not in rnames:
             tscat = ts.categories.get_categories()
-            cnames = [x.get('name','?') for x in tscat]
-            cids = [x.get('id','?') for x in tscat]
+            cnames = [x.get("name", "?") for x in tscat]
+            cids = [x.get("id", "?") for x in tscat]
             catID = None
             if cat in cnames:
                 ci = cnames.index(cat)
                 catID = cids[ci]
-            res = ts.repositories.create_repository(name=self.args.tool_name, synopsis='Synopsis:%s' % self.args.tool_desc, description=self.args.tool_desc,
-                          type='unrestricted', remote_repository_url=self.args.toolshed_url,
-                          homepage_url=None, category_ids=catID)
-            tid = res.get('id',None)
-            sto.write(f'##########create res={res}')
+            res = ts.repositories.create_repository(
+                name=self.args.tool_name,
+                synopsis="Synopsis:%s" % self.args.tool_desc,
+                description=self.args.tool_desc,
+                type="unrestricted",
+                remote_repository_url=self.args.toolshed_url,
+                homepage_url=None,
+                category_ids=catID,
+            )
+            tid = res.get("id", None)
+            sto.write(f"##########create res={res}")
         else:
-             i = rnames.index(self.args.tool_name)
-             tid = rids[i]
-        res = ts.repositories.update_repository(id=tid, tar_ball_path=self.newtarpath, commit_message=None)
-        sto.write(f'#####update res={res}')
+            i = rnames.index(self.args.tool_name)
+            tid = rids[i]
+        res = ts.repositories.update_repository(
+            id=tid, tar_ball_path=self.newtarpath, commit_message=None
+        )
+        sto.write(f"#####update res={res}")
         sto.close()
 
-
     def eph_galaxy_load(self):
-        """load the new tool from the local toolshed after planemo uploads it
-        """
+        """load the new tool from the local toolshed after planemo uploads it"""
         if os.path.exists(self.tlog):
             tout = open(self.tlog, "a")
         else:
@@ -725,17 +736,17 @@
             "fubar",
             "--toolshed",
             self.args.toolshed_url,
-           "--section_label",
+            "--section_label",
             "ToolFactory",
-
         ]
         tout.write("running\n%s\n" % " ".join(cll))
         p = subprocess.run(cll, shell=False, stderr=tout, stdout=tout)
-        tout.write("installed %s - got retcode %d\n" % (self.args.tool_name,p.returncode))
+        tout.write(
+            "installed %s - got retcode %d\n" % (self.args.tool_name, p.returncode)
+        )
         tout.close()
         return p.returncode
 
-
     def planemo_shedload(self):
         """
         planemo shed_create --shed_target testtoolshed
@@ -754,17 +765,27 @@
             tout = open(self.tlog, "a")
         else:
             tout = open(self.tlog, "w")
-        ts = toolshed.ToolShedInstance(url=self.args.toolshed_url,key=self.args.toolshed_api_key,verify=False)
+        ts = toolshed.ToolShedInstance(
+            url=self.args.toolshed_url, key=self.args.toolshed_api_key, verify=False
+        )
         repos = ts.repositories.get_repositories()
-        rnames = [x.get('name','?') for x in repos]
-        rids = [x.get('id','?') for x in repos]
-        tout.write(f'############names={rnames} rids={rids}')
-        cat = 'ToolFactory generated tools'
+        rnames = [x.get("name", "?") for x in repos]
+        rids = [x.get("id", "?") for x in repos]
+        tout.write(f"############names={rnames} rids={rids}")
+        #cat = "ToolFactory generated tools"
         if self.args.tool_name not in rnames:
-            cll = ["planemo", "shed_create", "--shed_target", "local",
-            "--owner","fubar","--name",
-            self.args.tool_name,"--shed_key",
-            self.args.toolshed_api_key,]
+            cll = [
+                "planemo",
+                "shed_create",
+                "--shed_target",
+                "local",
+                "--owner",
+                "fubar",
+                "--name",
+                self.args.tool_name,
+                "--shed_key",
+                self.args.toolshed_api_key,
+            ]
             try:
                 p = subprocess.run(
                     cll, shell=False, cwd=self.tooloutdir, stdout=tout, stderr=tout
@@ -772,7 +793,7 @@
             except:
                 pass
             if p.returncode != 0:
-               tout.write("Repository %s exists" % self.args.tool_name)
+                tout.write("Repository %s exists" % self.args.tool_name)
             else:
                 tout.write("initiated %s" % self.args.tool_name)
         cll = [
@@ -790,36 +811,34 @@
             self.newtarpath,
         ]
         p = subprocess.run(cll, shell=False, stdout=tout, stderr=tout)
-        tout.write("Ran %s got %d" %  (" ".join(cll), p.returncode))
+        tout.write("Ran %s got %d" % (" ".join(cll), p.returncode))
         tout.close()
         return p.returncode
 
-
     def eph_test(self):
-            """
-            """
-            if os.path.exists(self.tlog):
-                tout = open(self.tlog, "a")
-            else:
-                tout = open(self.tlog, "w")
-            cll = [
-                    "shed-tools",
-                    "test",
-                 "-g",
-                self.args.galaxy_url,
-                "-a",
-                self.args.galaxy_api_key,
-                "--name",
-                self.args.tool_name,
-                "--owner",
-                "fubar",
-                   ]
-            p = subprocess.run(
-                    cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout
-                )
-            tout.write("eph_test Ran %s got %d" % (" ".join(cll), p.returncode))
-            tout.close()
-            return p.returncode
+        """"""
+        if os.path.exists(self.tlog):
+            tout = open(self.tlog, "a")
+        else:
+            tout = open(self.tlog, "w")
+        cll = [
+            "shed-tools",
+            "test",
+            "-g",
+            self.args.galaxy_url,
+            "-a",
+            self.args.galaxy_api_key,
+            "--name",
+            self.args.tool_name,
+            "--owner",
+            "fubar",
+        ]
+        p = subprocess.run(
+            cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout
+        )
+        tout.write("eph_test Ran %s got %d" % (" ".join(cll), p.returncode))
+        tout.close()
+        return p.returncode
 
     def planemo_test(self, genoutputs=True):
         """planemo is a requirement so is available for testing
@@ -833,7 +852,7 @@
         else:
             tout = open(self.tlog, "w")
         if genoutputs:
-            dummy,tfile = tempfile.mkstemp()
+            dummy, tfile = tempfile.mkstemp()
             cll = [
                 "planemo",
                 "test",
@@ -843,23 +862,28 @@
                 xreal,
             ]
             p = subprocess.run(
-                    cll, shell=False, cwd=self.tooloutdir, stderr=dummy, stdout=dummy,
-                )
+                cll,
+                shell=False,
+                cwd=self.tooloutdir,
+                stderr=dummy,
+                stdout=dummy,
+            )
         else:
-            cll = ["planemo", "test", "--galaxy_root",
+            cll = [
+                "planemo",
+                "test",
+                "--galaxy_root",
                 self.args.galaxy_root,
-                xreal,]
+                xreal,
+            ]
             p = subprocess.run(
-                    cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout
-                )
+                cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout
+            )
         tout.close()
         return p.returncode
 
-
-
     def writeShedyml(self):
-        """for planemo
-        """
+        """for planemo"""
         yuser = self.args.user_email.split("@")[0]
         yfname = os.path.join(self.tooloutdir, ".shed.yml")
         yamlf = open(yfname, "w")
@@ -875,8 +899,7 @@
         yamlf.close()
 
     def makeTool(self):
-        """write xmls and input samples into place
-        """
+        """write xmls and input samples into place"""
         self.makeXML()
         if self.args.script_path:
             stname = os.path.join(self.tooloutdir, "%s" % (self.sfile))
@@ -893,12 +916,17 @@
             shutil.copyfile(pth, dest)
 
     def makeToolTar(self):
-        """ move outputs into test-data and prepare the tarball
-        """
+        """move outputs into test-data and prepare the tarball"""
         excludeme = "tool_test_output"
+
         def exclude_function(tarinfo):
-           filename = tarinfo.name
-           return None if filename.startswith(excludeme)  or os.path.splitext(filename)[1].startswith(excludeme) else tarinfo
+            filename = tarinfo.name
+            return (
+                None
+                if filename.startswith(excludeme) or \
+                os.path.splitext(filename)[1].startswith(excludeme)
+                else tarinfo
+            )
 
         for p in self.outfiles:
             src = p[ONAMEPOS]
@@ -919,8 +947,7 @@
         shutil.copyfile(self.newtarpath, self.args.new_tool)
 
     def moveRunOutputs(self):
-        """need to move planemo or run outputs into toolfactory collection
-        """
+        """need to move planemo or run outputs into toolfactory collection"""
         with os.scandir(self.tooloutdir) as outs:
             for entry in outs:
                 if not entry.is_file() or entry.name.startswith("."):
@@ -968,15 +995,16 @@
     a("--tfout", default="./tfout")
     a("--new_tool", default="new_tool")
     a("--galaxy_url", default="http://localhost:8080")
-    a("--toolshed_url", default="http://localhost:9009") # make sure this is NOT 127.0.0.1 - it won't work if tool_sheds_conf.xml has localhost
-    #a("--galaxy_api_key", default="1e62ddad74fe9bf112859f4e9efea48b")
-    #a("--toolshed_api_key", default="9154c91f2a162bf12fda15764f43846c")
+    a(
+        "--toolshed_url", default="http://localhost:9009"
+    )  # make sure this is NOT 127.0.0.1 - it won't work if tool_sheds_conf.xml has localhost
+    # a("--galaxy_api_key", default="1e62ddad74fe9bf112859f4e9efea48b")
+    # a("--toolshed_api_key", default="9154c91f2a162bf12fda15764f43846c")
 
     a("--toolshed_api_key", default="fakekey")
     a("--galaxy_api_key", default="fakekey")
     a("--galaxy_root", default="/galaxy-central")
 
-
     args = parser.parse_args()
     assert not args.bad_user, (
         'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the Galaxy configuration file'
@@ -1004,11 +1032,10 @@
         retcode = r.planemo_test(genoutputs=False)
         r.moveRunOutputs()
         if args.make_Tool == "gentestinstall":
-            retcode = r.planemo_shedload() #r.shedLoad()
-            print(f'planemo_shedload returned {retcode}')
+            retcode = r.planemo_shedload()  # r.shedLoad()
+            print(f"planemo_shedload returned {retcode}")
             r.eph_galaxy_load()
 
 
-
 if __name__ == "__main__":
     main()