Mercurial > repos > fubar > tool_factory_2
diff toolfactory/rgToolFactory2.py @ 75:25dd9364a32a draft
Uploaded
| author | fubar | 
|---|---|
| date | Sat, 14 Nov 2020 23:35:27 +0000 | 
| parents | 3d4324df911b | 
| children | 46db8143adb7 | 
line wrap: on
 line diff
--- a/toolfactory/rgToolFactory2.py Sat Nov 14 21:52:39 2020 +0000 +++ b/toolfactory/rgToolFactory2.py Sat Nov 14 23:35:27 2020 +0000 @@ -24,7 +24,6 @@ import argparse -import copy import logging import os import re @@ -35,7 +34,7 @@ import tempfile import time -from bioblend import galaxy + from bioblend import toolshed import galaxyxml.tool as gxt @@ -92,8 +91,7 @@ def timenow(): - """return current time as a string - """ + """return current time as a string""" return time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(time.time())) @@ -126,8 +124,7 @@ def parse_citations(citations_text): - """ - """ + """""" citations = [c for c in citations_text.split("**ENTRY**") if c.strip()] citation_tuples = [] for citation in citations: @@ -291,9 +288,11 @@ """ positional parameters are complicated by their numeric ordinal""" for i, p in enumerate(self.infiles): if self.args.parampass == "positional": - assert p[ICLPOS].isdigit(), ( - "Positional parameters must be ordinal integers - got %s for %s" - % (p[ICLPOS], p[ILABPOS]) + assert p[ + ICLPOS + ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % ( + p[ICLPOS], + p[ILABPOS], ) p.append(p[ICLPOS]) if p[ICLPOS].isdigit() or self.args.parampass == "0": @@ -304,9 +303,11 @@ self.outfiles ): # trying to automagically gather using extensions if self.args.parampass == "positional" and p[OCLPOS] != "STDOUT": - assert p[OCLPOS].isdigit(), ( - "Positional parameters must be ordinal integers - got %s for %s" - % (p[OCLPOS], p[ONAMEPOS]) + assert p[ + OCLPOS + ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % ( + p[OCLPOS], + p[ONAMEPOS], ) p.append(p[OCLPOS]) if p[OCLPOS].isdigit() or p[OCLPOS] == "STDOUT": @@ -315,9 +316,11 @@ self.outfiles[i] = p for i, p in enumerate(self.addpar): if self.args.parampass == "positional": - assert p[ACLPOS].isdigit(), ( - "Positional parameters must be ordinal integers - got %s for %s" - % (p[ACLPOS], p[ANAMEPOS]) + assert p[ + ACLPOS + ].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % ( + p[ACLPOS], + p[ANAMEPOS], ) p.append(p[ACLPOS]) if p[ACLPOS].isdigit(): @@ -326,8 +329,7 @@ self.addpar[i] = p def clsimple(self): - """ no parameters - uses < and > for i/o - """ + """no parameters - uses < and > for i/o""" aCL = self.cl.append aXCL = self.xmlcl.append @@ -358,8 +360,7 @@ aXCL(self.lastxclredirect[1]) def clargparse(self): - """ argparse style - """ + """argparse style""" aCL = self.cl.append aXCL = self.xmlcl.append # inputs then params in argparse named form @@ -520,7 +521,9 @@ anout.command_line_override = "> $%s" % newname anout.positional = self.is_positional self.toutputs.append(anout) - tp = gxtp.TestOutput(name=newname, value="%s_sample" % newname, format=newfmt) + tp = gxtp.TestOutput( + name=newname, value="%s_sample" % newname, format=newfmt + ) self.testparam.append(tp) def makeXML(self): @@ -537,11 +540,14 @@ helptext = open(self.args.help_text, "r").readlines() safertext = [html_escape(x) for x in helptext] if False and self.args.script_path: - scrp = self.script.split('\n') - scrpt = [' %s' % x for x in scrp] # try to stop templating - scrpt.insert(0,"```\n") + scrp = self.script.split("\n") + scrpt = [" %s" % x for x in scrp] # try to stop templating + scrpt.insert(0, "```\n") if len(scrpt) > 300: - safertext = safertext + scrpt[:100] + ['>500 lines - stuff deleted','......'] + scrpt[-100:] + safertext = ( + safertext + scrpt[:100] + \ + [">500 lines - stuff deleted", "......"] + scrpt[-100:] + ) else: safertext = safertext + scrpt safertext.append("\n```") @@ -628,7 +634,7 @@ else: sto = open(self.tlog, "w") sto.write( - "## Executing Toolfactory generated command line = %s\n" % scl + "## Executing Toolfactory generated command line = %s\n" % scl ) sto.flush() p = subprocess.run(self.cl, shell=False, stdout=sto, stderr=ste) @@ -645,9 +651,7 @@ else: sto = sys.stdout p = subprocess.run(self.cl, shell=False, stdout=sto, stdin=sti) - sto.write( - "## Executing Toolfactory generated command line = %s\n" % scl - ) + sto.write("## Executing Toolfactory generated command line = %s\n" % scl) retval = p.returncode sto.close() sti.close() @@ -660,7 +664,6 @@ logging.debug("run done") return retval - def shedLoad(self): """ {'deleted': False, @@ -677,36 +680,44 @@ else: sto = open(self.tlog, "w") - ts = toolshed.ToolShedInstance(url=self.args.toolshed_url,key=self.args.toolshed_api_key,verify=False) + ts = toolshed.ToolShedInstance( + url=self.args.toolshed_url, key=self.args.toolshed_api_key, verify=False + ) repos = ts.repositories.get_repositories() - rnames = [x.get('name','?') for x in repos] - rids = [x.get('id','?') for x in repos] - sto.write(f'############names={rnames} rids={rids}') - cat = 'ToolFactory generated tools' + rnames = [x.get("name", "?") for x in repos] + rids = [x.get("id", "?") for x in repos] + sto.write(f"############names={rnames} rids={rids}") + cat = "ToolFactory generated tools" if self.args.tool_name not in rnames: tscat = ts.categories.get_categories() - cnames = [x.get('name','?') for x in tscat] - cids = [x.get('id','?') for x in tscat] + cnames = [x.get("name", "?") for x in tscat] + cids = [x.get("id", "?") for x in tscat] catID = None if cat in cnames: ci = cnames.index(cat) catID = cids[ci] - res = ts.repositories.create_repository(name=self.args.tool_name, synopsis='Synopsis:%s' % self.args.tool_desc, description=self.args.tool_desc, - type='unrestricted', remote_repository_url=self.args.toolshed_url, - homepage_url=None, category_ids=catID) - tid = res.get('id',None) - sto.write(f'##########create res={res}') + res = ts.repositories.create_repository( + name=self.args.tool_name, + synopsis="Synopsis:%s" % self.args.tool_desc, + description=self.args.tool_desc, + type="unrestricted", + remote_repository_url=self.args.toolshed_url, + homepage_url=None, + category_ids=catID, + ) + tid = res.get("id", None) + sto.write(f"##########create res={res}") else: - i = rnames.index(self.args.tool_name) - tid = rids[i] - res = ts.repositories.update_repository(id=tid, tar_ball_path=self.newtarpath, commit_message=None) - sto.write(f'#####update res={res}') + i = rnames.index(self.args.tool_name) + tid = rids[i] + res = ts.repositories.update_repository( + id=tid, tar_ball_path=self.newtarpath, commit_message=None + ) + sto.write(f"#####update res={res}") sto.close() - def eph_galaxy_load(self): - """load the new tool from the local toolshed after planemo uploads it - """ + """load the new tool from the local toolshed after planemo uploads it""" if os.path.exists(self.tlog): tout = open(self.tlog, "a") else: @@ -725,17 +736,17 @@ "fubar", "--toolshed", self.args.toolshed_url, - "--section_label", + "--section_label", "ToolFactory", - ] tout.write("running\n%s\n" % " ".join(cll)) p = subprocess.run(cll, shell=False, stderr=tout, stdout=tout) - tout.write("installed %s - got retcode %d\n" % (self.args.tool_name,p.returncode)) + tout.write( + "installed %s - got retcode %d\n" % (self.args.tool_name, p.returncode) + ) tout.close() return p.returncode - def planemo_shedload(self): """ planemo shed_create --shed_target testtoolshed @@ -754,17 +765,27 @@ tout = open(self.tlog, "a") else: tout = open(self.tlog, "w") - ts = toolshed.ToolShedInstance(url=self.args.toolshed_url,key=self.args.toolshed_api_key,verify=False) + ts = toolshed.ToolShedInstance( + url=self.args.toolshed_url, key=self.args.toolshed_api_key, verify=False + ) repos = ts.repositories.get_repositories() - rnames = [x.get('name','?') for x in repos] - rids = [x.get('id','?') for x in repos] - tout.write(f'############names={rnames} rids={rids}') - cat = 'ToolFactory generated tools' + rnames = [x.get("name", "?") for x in repos] + rids = [x.get("id", "?") for x in repos] + tout.write(f"############names={rnames} rids={rids}") + #cat = "ToolFactory generated tools" if self.args.tool_name not in rnames: - cll = ["planemo", "shed_create", "--shed_target", "local", - "--owner","fubar","--name", - self.args.tool_name,"--shed_key", - self.args.toolshed_api_key,] + cll = [ + "planemo", + "shed_create", + "--shed_target", + "local", + "--owner", + "fubar", + "--name", + self.args.tool_name, + "--shed_key", + self.args.toolshed_api_key, + ] try: p = subprocess.run( cll, shell=False, cwd=self.tooloutdir, stdout=tout, stderr=tout @@ -772,7 +793,7 @@ except: pass if p.returncode != 0: - tout.write("Repository %s exists" % self.args.tool_name) + tout.write("Repository %s exists" % self.args.tool_name) else: tout.write("initiated %s" % self.args.tool_name) cll = [ @@ -790,36 +811,34 @@ self.newtarpath, ] p = subprocess.run(cll, shell=False, stdout=tout, stderr=tout) - tout.write("Ran %s got %d" % (" ".join(cll), p.returncode)) + tout.write("Ran %s got %d" % (" ".join(cll), p.returncode)) tout.close() return p.returncode - def eph_test(self): - """ - """ - if os.path.exists(self.tlog): - tout = open(self.tlog, "a") - else: - tout = open(self.tlog, "w") - cll = [ - "shed-tools", - "test", - "-g", - self.args.galaxy_url, - "-a", - self.args.galaxy_api_key, - "--name", - self.args.tool_name, - "--owner", - "fubar", - ] - p = subprocess.run( - cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout - ) - tout.write("eph_test Ran %s got %d" % (" ".join(cll), p.returncode)) - tout.close() - return p.returncode + """""" + if os.path.exists(self.tlog): + tout = open(self.tlog, "a") + else: + tout = open(self.tlog, "w") + cll = [ + "shed-tools", + "test", + "-g", + self.args.galaxy_url, + "-a", + self.args.galaxy_api_key, + "--name", + self.args.tool_name, + "--owner", + "fubar", + ] + p = subprocess.run( + cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout + ) + tout.write("eph_test Ran %s got %d" % (" ".join(cll), p.returncode)) + tout.close() + return p.returncode def planemo_test(self, genoutputs=True): """planemo is a requirement so is available for testing @@ -833,7 +852,7 @@ else: tout = open(self.tlog, "w") if genoutputs: - dummy,tfile = tempfile.mkstemp() + dummy, tfile = tempfile.mkstemp() cll = [ "planemo", "test", @@ -843,23 +862,28 @@ xreal, ] p = subprocess.run( - cll, shell=False, cwd=self.tooloutdir, stderr=dummy, stdout=dummy, - ) + cll, + shell=False, + cwd=self.tooloutdir, + stderr=dummy, + stdout=dummy, + ) else: - cll = ["planemo", "test", "--galaxy_root", + cll = [ + "planemo", + "test", + "--galaxy_root", self.args.galaxy_root, - xreal,] + xreal, + ] p = subprocess.run( - cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout - ) + cll, shell=False, cwd=self.tooloutdir, stderr=tout, stdout=tout + ) tout.close() return p.returncode - - def writeShedyml(self): - """for planemo - """ + """for planemo""" yuser = self.args.user_email.split("@")[0] yfname = os.path.join(self.tooloutdir, ".shed.yml") yamlf = open(yfname, "w") @@ -875,8 +899,7 @@ yamlf.close() def makeTool(self): - """write xmls and input samples into place - """ + """write xmls and input samples into place""" self.makeXML() if self.args.script_path: stname = os.path.join(self.tooloutdir, "%s" % (self.sfile)) @@ -893,12 +916,17 @@ shutil.copyfile(pth, dest) def makeToolTar(self): - """ move outputs into test-data and prepare the tarball - """ + """move outputs into test-data and prepare the tarball""" excludeme = "tool_test_output" + def exclude_function(tarinfo): - filename = tarinfo.name - return None if filename.startswith(excludeme) or os.path.splitext(filename)[1].startswith(excludeme) else tarinfo + filename = tarinfo.name + return ( + None + if filename.startswith(excludeme) or \ + os.path.splitext(filename)[1].startswith(excludeme) + else tarinfo + ) for p in self.outfiles: src = p[ONAMEPOS] @@ -919,8 +947,7 @@ shutil.copyfile(self.newtarpath, self.args.new_tool) def moveRunOutputs(self): - """need to move planemo or run outputs into toolfactory collection - """ + """need to move planemo or run outputs into toolfactory collection""" with os.scandir(self.tooloutdir) as outs: for entry in outs: if not entry.is_file() or entry.name.startswith("."): @@ -968,15 +995,16 @@ a("--tfout", default="./tfout") a("--new_tool", default="new_tool") a("--galaxy_url", default="http://localhost:8080") - a("--toolshed_url", default="http://localhost:9009") # make sure this is NOT 127.0.0.1 - it won't work if tool_sheds_conf.xml has localhost - #a("--galaxy_api_key", default="1e62ddad74fe9bf112859f4e9efea48b") - #a("--toolshed_api_key", default="9154c91f2a162bf12fda15764f43846c") + a( + "--toolshed_url", default="http://localhost:9009" + ) # make sure this is NOT 127.0.0.1 - it won't work if tool_sheds_conf.xml has localhost + # a("--galaxy_api_key", default="1e62ddad74fe9bf112859f4e9efea48b") + # a("--toolshed_api_key", default="9154c91f2a162bf12fda15764f43846c") a("--toolshed_api_key", default="fakekey") a("--galaxy_api_key", default="fakekey") a("--galaxy_root", default="/galaxy-central") - args = parser.parse_args() assert not args.bad_user, ( 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the Galaxy configuration file' @@ -1004,11 +1032,10 @@ retcode = r.planemo_test(genoutputs=False) r.moveRunOutputs() if args.make_Tool == "gentestinstall": - retcode = r.planemo_shedload() #r.shedLoad() - print(f'planemo_shedload returned {retcode}') + retcode = r.planemo_shedload() # r.shedLoad() + print(f"planemo_shedload returned {retcode}") r.eph_galaxy_load() - if __name__ == "__main__": main()
