comparison toolfactory/toolwatcher.py @ 6:efefe43f23c8 draft default tip

Uploaded
author fubar
date Fri, 30 Apr 2021 02:10:32 +0000 (2021-04-30)
parents
children
comparison
equal deleted inserted replaced
5:e2c8c2fa192d 6:efefe43f23c8
1 #!/usr/bin/python
2 from datetime import datetime, timedelta
3 from io import BytesIO as BIO
4 import logging
5 import os
6 import subprocess
7 import tarfile
8 import time
9 from watchdog.observers import Observer
10 from watchdog.events import FileSystemEventHandler
11 from watchdog.events import PatternMatchingEventHandler
12
13 class ToolHandler(PatternMatchingEventHandler):
14
15 def __init__(self, watchme):
16 PatternMatchingEventHandler.__init__(self, patterns=['*.xml'],
17 ignore_directories=False, case_sensitive=False)
18 self.last_modified = datetime.now()
19 self.tool_dir = watchme
20 self.work_dir = os.getcwd()
21 self.galaxy_root = os.path.split(watchme)[0]
22 logging.info(self.galaxy_root)
23 self.tar_dir = os.path.join(self.galaxy_root, 'tooltardir')
24 if not os.path.exists(self.tar_dir):
25 os.mkdir(self.tar_dir)
26
27 def on_created(self, event):
28 self.on_modified(event)
29
30 def on_modified(self, event):
31 if datetime.now() - self.last_modified < timedelta(seconds=1):
32 return
33 else:
34 if os.path.exists(event.src_path):
35 self.last_modified = datetime.now()
36 logging.info(f"{event.src_path} was {event.event_type}")
37 p = self.planemo_test(event.src_path)
38 if p:
39 if p.returncode == 0:
40 newtarpath = self.makeToolTar(event.src_path)
41 logging.info('### Tested toolshed tarball %s written' % newtarpath)
42 else:
43 logging.debug('### planemo stdout:')
44 logging.debug(p.stdout)
45 logging.debug('### planemo stderr:')
46 logging.debug(p.stderr)
47 logging.info('### Planemo call return code =' % p.returncode)
48 else:
49 logging.info('Directory %s deleted' % event.src_path)
50
51 def planemo_test(self, xml_path):
52 toolpath, toolfile = os.path.split(xml_path)
53 dirlist = os.listdir(toolpath)
54 toolname = os.path.basename(toolpath)
55 logging.info('### test dirlist %s, path %s toolname %s' % (dirlist, xml_path, toolname))
56 xmls = [x for x in dirlist if os.path.splitext(x)[1] == '.xml']
57 if not len(xmls) > 0:
58 logging.warning('Found no xml files after change to %s' % xml_path)
59 return None
60 tool_test_output = os.path.join(toolpath, f"{toolname}_planemo_test_report.html")
61 cll = [
62 "planemo",
63 "test",
64 "--test_output",
65 tool_test_output,
66 "--galaxy_root",
67 self.galaxy_root,
68 "--update_test_data",
69 xml_path,
70 ]
71 logging.info('### calling %s' % ' '.join(cll))
72 p = subprocess.run(
73 cll,
74 cwd = toolpath,
75 shell=False,
76 capture_output=True,
77 encoding='utf8',
78 )
79 return p
80
81 def makeToolTar(self, xml_path):
82 """move outputs into test-data and prepare the tarball"""
83 excludeme = "_planemo_test_report.html"
84
85 def exclude_function(tarinfo):
86 filename = tarinfo.name
87 return None if filename.endswith(excludeme) else tarinfo
88
89 tooldir, xml_file = os.path.split(xml_path)
90 os.chdir(self.tool_dir)
91 toolname = os.path.splitext(xml_file)[0]
92 newtarpath = os.path.join(self.tar_dir, '%s_toolshed.gz' % toolname)
93 tf = tarfile.open(newtarpath, "w:gz")
94 tf.add(
95 name=toolname,
96 arcname=toolname,
97 filter=exclude_function,
98 )
99 tf.close()
100 os.chdir(self.work_dir)
101 return newtarpath
102
103
104 if __name__ == "__main__":
105 watchme = '/home/ross/gal21/tools'
106 logging.basicConfig(level=logging.INFO,
107 #filename = os.path.join(watchme,"toolwatcher.log")
108 #filemode = "w",
109 format='%(asctime)s - %(message)s',
110 datefmt='%Y-%m-%d %H:%M:%S')
111 event_handler = ToolHandler(watchme=watchme)
112 observer = Observer()
113 observer.schedule(event_handler, path=watchme, recursive=True)
114 observer.start()
115 try:
116 while True:
117 time.sleep(1)
118 except KeyboardInterrupt:
119 observer.stop()
120 observer.join()
121
122
123