diff toolfactory/toolwatcher.py @ 6:efefe43f23c8 draft default tip

Uploaded
author fubar
date Fri, 30 Apr 2021 02:10:32 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/toolfactory/toolwatcher.py	Fri Apr 30 02:10:32 2021 +0000
@@ -0,0 +1,123 @@
+#!/usr/bin/python
+from datetime import datetime, timedelta
+from io import BytesIO as BIO
+import logging
+import os
+import subprocess
+import tarfile
+import time
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+from watchdog.events import PatternMatchingEventHandler
+
+class ToolHandler(PatternMatchingEventHandler):
+
+    def __init__(self, watchme):
+        PatternMatchingEventHandler.__init__(self, patterns=['*.xml'],
+                ignore_directories=False, case_sensitive=False)
+        self.last_modified = datetime.now()
+        self.tool_dir = watchme
+        self.work_dir = os.getcwd()
+        self.galaxy_root = os.path.split(watchme)[0]
+        logging.info(self.galaxy_root)
+        self.tar_dir = os.path.join(self.galaxy_root, 'tooltardir')
+        if not os.path.exists(self.tar_dir):
+                os.mkdir(self.tar_dir)
+
+    def on_created(self, event):
+        self.on_modified(event)
+
+    def on_modified(self, event):
+        if datetime.now() - self.last_modified < timedelta(seconds=1):
+            return
+        else:
+            if os.path.exists(event.src_path):
+                self.last_modified = datetime.now()
+                logging.info(f"{event.src_path} was {event.event_type}")
+                p = self.planemo_test(event.src_path)
+                if p:
+                    if p.returncode == 0:
+                        newtarpath = self.makeToolTar(event.src_path)
+                        logging.info('### Tested toolshed tarball %s written' % newtarpath)
+                    else:
+                        logging.debug('### planemo stdout:')
+                        logging.debug(p.stdout)
+                        logging.debug('### planemo stderr:')
+                        logging.debug(p.stderr)
+                        logging.info('### Planemo call return code =' % p.returncode)
+            else:
+                logging.info('Directory %s deleted' % event.src_path)
+
+    def planemo_test(self, xml_path):
+        toolpath, toolfile = os.path.split(xml_path)
+        dirlist = os.listdir(toolpath)
+        toolname = os.path.basename(toolpath)
+        logging.info('### test dirlist %s, path %s toolname %s' % (dirlist, xml_path, toolname))
+        xmls = [x for x in dirlist if os.path.splitext(x)[1] == '.xml']
+        if not len(xmls) > 0:
+            logging.warning('Found no xml files after change to %s' % xml_path)
+            return None
+        tool_test_output = os.path.join(toolpath, f"{toolname}_planemo_test_report.html")
+        cll = [
+            "planemo",
+            "test",
+            "--test_output",
+            tool_test_output,
+            "--galaxy_root",
+            self.galaxy_root,
+            "--update_test_data",
+            xml_path,
+        ]
+        logging.info('### calling %s' % ' '.join(cll))
+        p = subprocess.run(
+            cll,
+            cwd = toolpath,
+            shell=False,
+            capture_output=True,
+            encoding='utf8',
+        )
+        return p
+
+    def makeToolTar(self, xml_path):
+        """move outputs into test-data and prepare the tarball"""
+        excludeme = "_planemo_test_report.html"
+
+        def exclude_function(tarinfo):
+            filename = tarinfo.name
+            return None if filename.endswith(excludeme) else tarinfo
+
+        tooldir, xml_file = os.path.split(xml_path)
+        os.chdir(self.tool_dir)
+        toolname = os.path.splitext(xml_file)[0]
+        newtarpath = os.path.join(self.tar_dir, '%s_toolshed.gz' % toolname)
+        tf = tarfile.open(newtarpath, "w:gz")
+        tf.add(
+            name=toolname,
+            arcname=toolname,
+            filter=exclude_function,
+        )
+        tf.close()
+        os.chdir(self.work_dir)
+        return newtarpath
+
+
+if __name__ == "__main__":
+    watchme = '/home/ross/gal21/tools'
+    logging.basicConfig(level=logging.INFO,
+                    #filename = os.path.join(watchme,"toolwatcher.log")
+                    #filemode = "w",
+                    format='%(asctime)s - %(message)s',
+                    datefmt='%Y-%m-%d %H:%M:%S')
+    event_handler = ToolHandler(watchme=watchme)
+    observer = Observer()
+    observer.schedule(event_handler, path=watchme, recursive=True)
+    observer.start()
+    try:
+        while True:
+            time.sleep(1)
+    except KeyboardInterrupt:
+        observer.stop()
+    observer.join()
+
+
+