Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/galaxy/util/heartbeat.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler | 
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| 0:d30785e31577 | 1:56ad4e20f292 | 
|---|---|
| 1 import os | |
| 2 import sys | |
| 3 import threading | |
| 4 import time | |
| 5 import traceback | |
| 6 | |
| 7 from six import iteritems | |
| 8 | |
| 9 | |
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects known to the threading module,
    keyed by thread identifier (the same ids used by sys._current_frames()).

    Note that not all interpreter threads have a Thread object: generally
    only the main thread and threads created via the 'threading' module do.
    Threads created via the low level '_thread' module will usually not be
    in the returned dictionary.
    """
    # threading.enumerate() is the public way to list the threads the module
    # tracks. A thread that has no identifier yet cannot be matched against
    # sys._current_frames(), so it is left out.
    return {
        thread.ident: thread
        for thread in threading.enumerate()
        if thread.ident is not None
    }
| 30 | |
| 31 | |
class Heartbeat(threading.Thread):
    """
    Thread that periodically dumps the state of all threads to a file.

    Every ``period`` seconds the stack of every interpreter thread is appended
    to ``fname``. A short report of the threads that do not look like they
    are sleeping is appended to a sibling file whose name has ``.nonsleeping``
    inserted before the extension.
    """

    def __init__(self, config, name="Heartbeat Thread", period=20, fname="heartbeat.log"):
        """
        :param config: object providing ``server_name``, substituted for the
            ``{server_name}`` placeholder in ``fname``.
        :param name: name of the thread.
        :param period: seconds between dumps; a value <= 0 disables the
            periodic dumps (``dump`` can still be called explicitly).
        :param fname: log file name; ``{server_name}`` and ``{pid}``
            placeholders are expanded when the thread starts running.
        """
        threading.Thread.__init__(self, name=name)
        self.config = config
        self.should_stop = False
        self.period = period
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = None
        self.file_nonsleeping = None
        self.pid = None
        # thread_id -> number of consecutive heartbeats the thread was seen
        # not sleeping; the entry is dropped once the thread is seen sleeping.
        self.nonsleeping_heartbeats = {}
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run(self):
        """Expand the log file names, then dump every ``period`` seconds until shut down."""
        self.pid = os.getpid()
        self.fname = self.fname.format(
            server_name=self.config.server_name,
            pid=self.pid
        )
        fname, ext = os.path.splitext(self.fname)
        self.fname_nonsleeping = fname + '.nonsleeping' + ext
        # When periodic dumps are disabled, still wake up every 60 seconds to
        # re-check ``should_stop``.
        wait = self.period if self.period > 0 else 60
        while not self.should_stop:
            if self.period > 0:
                self.dump()
            self.wait_event.wait(wait)

    def open_logs(self):
        """(Re)open both log files in append mode if they are not currently open."""
        if self.file is None or self.file.closed:
            self.file = open(self.fname, "a")
            self.file_nonsleeping = open(self.fname_nonsleeping, "a")
            self.file.write("Heartbeat for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))

    def close_logs(self):
        """Write a 'stopped' marker to each open log file and close it."""
        if self.file is not None and not self.file.closed:
            self.file.write("Heartbeat for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file.close()
        # Checked separately so a partially opened pair of logs is still closed.
        if self.file_nonsleeping is not None and not self.file_nonsleeping.closed:
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.close()

    def dump(self):
        """
        Append a traceback of every interpreter thread to the heartbeat log,
        followed by the non-sleeping report.

        The log files are opened first if needed. Exceptions raised while
        dumping are written to the heartbeat log instead of being propagated.
        """
        self.open_logs()
        try:
            # Print separator with timestamp
            self.file.write("Traceback dump for all threads at %s:\n\n" % time.asctime())
            # Print the thread states
            threads = get_current_thread_object_dict()
            for thread_id, frame in sys._current_frames().items():
                if thread_id in threads:
                    thread_repr = repr(threads[thread_id])
                else:
                    thread_repr = "<No Thread object>"
                self.file.write("Thread %s, %s:\n\n" % (thread_id, thread_repr))
                traceback.print_stack(frame, file=self.file)
                self.file.write("\n")
            self.file.write("End dump\n\n")
            self.file.flush()
            self.print_nonsleeping(threads)
        except Exception:
            self.file.write("Caught exception attempting to dump thread states:")
            traceback.print_exc(None, self.file)
            self.file.write("\n")

    def shutdown(self):
        """Stop the heartbeat loop, wait for the thread to exit, then close the logs."""
        self.should_stop = True
        self.wait_event.set()
        try:
            # Join before closing: a dump that is still in progress would
            # otherwise write to already-closed files and crash the thread.
            self.join()
        finally:
            self.close_logs()

    def thread_is_sleeping(self, last_stack_frame):
        """
        Return True if the given stack frame looks like a known sleeping call.

        :param last_stack_frame: ``(filename, line, function_name, text)`` for
            the innermost frame of a thread, e.g. the last entry of
            ``traceback.extract_stack()``.

        The patterns below are a heuristic, originally collected on Python 2.5.
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        # The source text may be None or empty when the source line is unavailable.
        _text = last_stack_frame[3] or ""
        # Ugly hack to tell if a thread is supposedly sleeping or not.
        # These are the most common sleeping functions found so far.
        if _funcname == "wait" and _text in ("waiter.acquire()", "_sleep(delay)"):
            return True
        if _funcname == "accept" and _text.endswith("_sock.accept()"):
            return True
        if (_funcname in ("monitor", "__monitor", "app_loop", "check")
                and _text.startswith("time.sleep(") and _text.endswith(")")):
            return True
        if _funcname == "drain_events" and _text == "sleep(polling_interval)":
            return True
        # Fallback for frames inside this module when it lives under
        # /lib/galaxy; print_nonsleeping also skips the heartbeat thread by id.
        if "/lib/galaxy/util/heartbeat.py" in _filename:
            return True
        # By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame(self, stack_frames):
        """
        Scan the given stack frames and return a single quadruple of
        (filename, line, function-name, text) for the deepest, most
        interesting frame.

        Interesting means inside the galaxy source code ("/lib/galaxy/"), in
        which case the filename is shortened to start at "/lib/galaxy/".
        If no such frame exists, the innermost frame is returned unchanged.
        ``stack_frames`` must not be empty.
        """
        for _filename, _line, _funcname, _text in reversed(stack_frames):
            idx = _filename.find("/lib/galaxy/")
            if idx != -1:
                return (_filename[idx:], _line, _funcname, _text)
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping(self, threads_object_dict):
        """
        Append a report of threads that do not appear to be sleeping.

        :param threads_object_dict: mapping of thread id -> Thread object used
            to label each thread; when None, a fresh mapping is computed.

        Each reported thread's consecutive non-sleeping heartbeat count is
        updated in ``self.nonsleeping_heartbeats``.
        """
        self.file_nonsleeping.write("Non-Sleeping threads at %s:\n\n" % time.asctime())
        all_threads_are_sleeping = True
        if threads_object_dict is None:
            threads_object_dict = get_current_thread_object_dict()
        frames = sys._current_frames()
        for thread_id, frame in frames.items():
            # Never report the heartbeat thread itself (None until started).
            if thread_id == self.ident:
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue
            tb = traceback.extract_stack(frame)
            if self.thread_is_sleeping(tb[-1]):
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue

            if thread_id in threads_object_dict:
                thread_repr = repr(threads_object_dict[thread_id])
            else:
                thread_repr = "<No Thread object>"

            # Count non-sleeping thread heartbeats
            self.nonsleeping_heartbeats[thread_id] = self.nonsleeping_heartbeats.get(thread_id, 0) + 1

            good_frame = self.get_interesting_stack_frame(tb)
            self.file_nonsleeping.write("Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" %
                                        (thread_id, thread_repr, self.nonsleeping_heartbeats[thread_id], good_frame[0], good_frame[1], good_frame[2], good_frame[3]))
            all_threads_are_sleeping = False

        # Forget threads that have exited: thread ids may be recycled, and a
        # new thread must not inherit a dead thread's count.
        for stale_id in set(self.nonsleeping_heartbeats) - set(frames):
            del self.nonsleeping_heartbeats[stale_id]

        if all_threads_are_sleeping:
            self.file_nonsleeping.write("All threads are sleeping.\n")
        self.file_nonsleeping.write("\n")
        self.file_nonsleeping.flush()

    def dump_signal_handler(self, signum, frame):
        """Signal handler (``signal.signal`` signature) that triggers an immediate dump."""
        self.dump()
