comparison env/lib/python3.9/site-packages/galaxy/util/heartbeat.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import os
2 import sys
3 import threading
4 import time
5 import traceback
6
7
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module, keyed by thread id (the same ids used as keys by
    ``sys._current_frames()`` and returned by ``threading.get_ident()``).

    Note that not all interpreter threads have a Thread object, only the
    main thread and any created via the 'threading' module. Threads created
    via the low level '_thread' module will not be in the returned
    dictionary.

    :returns: dict mapping ``Thread.ident`` -> ``Thread`` object.
    """
    # threading.enumerate() is the public way to read threading's table of
    # active and just-started threads (in CPython it takes the module's
    # internal lock itself). Poking at threading._active/_limbo directly is
    # both private and wrong for limbo entries, which CPython 3 keys by the
    # Thread object rather than by thread id. A thread whose ident has not
    # been assigned yet cannot be matched to a frame, so it is skipped.
    return {
        thread.ident: thread
        for thread in threading.enumerate()
        if thread.ident is not None
    }
28
29
class Heartbeat(threading.Thread):
    """
    Thread that periodically dumps the state of all threads to a file.

    Every ``period`` seconds the stack of every interpreter thread is
    appended to the heartbeat log (``fname``), and a condensed report of the
    threads that do not look idle is appended to a sibling
    ``<name>.nonsleeping<ext>`` log.
    """

    def __init__(self, config, name="Heartbeat Thread", period=20, fname="heartbeat.log"):
        threading.Thread.__init__(self, name=name)
        # Only ``config.server_name`` is read (in run(), to expand fname).
        self.config = config
        self.should_stop = False
        # Seconds between dumps; a value <= 0 disables periodic dumps.
        self.period = period
        # May contain ``{server_name}`` / ``{pid}`` placeholders, expanded in run().
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = None
        self.file_nonsleeping = None
        self.pid = None
        # thread id -> number of consecutive heartbeats the thread looked busy.
        self.nonsleeping_heartbeats = {}
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run(self):
        """Expand the log file names, then dump every ``period`` seconds until shutdown()."""
        self.pid = os.getpid()
        self.fname = self.fname.format(
            server_name=self.config.server_name,
            pid=self.pid
        )
        base, ext = os.path.splitext(self.fname)
        self.fname_nonsleeping = base + '.nonsleeping' + ext
        # With dumping disabled the loop still wakes up periodically to check
        # should_stop (shutdown() also sets wait_event to wake it immediately).
        wait = self.period if self.period > 0 else 60
        while not self.should_stop:
            if self.period > 0:
                self.dump()
            self.wait_event.wait(wait)

    def open_logs(self):
        """Open both log files in append mode unless the heartbeat log is already open."""
        if self.file is None or self.file.closed:
            self.file = open(self.fname, "a")
            self.file_nonsleeping = open(self.fname_nonsleeping, "a")
            self.file.write("Heartbeat for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))

    def close_logs(self):
        """Write a "stopped" marker to each open log file and close it."""
        # Each file is handled on its own so a half-opened pair (e.g. the
        # second open() failed) does not make closing the first one fail.
        if self.file is not None and not self.file.closed:
            self.file.write("Heartbeat for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file.close()
        if self.file_nonsleeping is not None and not self.file_nonsleeping.closed:
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.close()

    def dump(self):
        """
        Append the stack of every thread to the heartbeat log, followed by
        the non-sleeping report. Errors while dumping are written to the
        heartbeat log instead of being raised.
        """
        self.open_logs()
        try:
            # Print separator with timestamp
            self.file.write("Traceback dump for all threads at %s:\n\n" % time.asctime())
            # Print the thread states
            threads = get_current_thread_object_dict()
            for thread_id, frame in sys._current_frames().items():
                if thread_id in threads:
                    thread_repr = repr(threads[thread_id])
                else:
                    thread_repr = "<No Thread object>"
                self.file.write(f"Thread {thread_id}, {thread_repr}:\n\n")
                traceback.print_stack(frame, file=self.file)
                self.file.write("\n")
            self.file.write("End dump\n\n")
            self.file.flush()
            self.print_nonsleeping(threads)
        except Exception:
            # Trailing newline so the traceback header starts on its own line.
            self.file.write("Caught exception attempting to dump thread states:\n")
            traceback.print_exc(file=self.file)
            self.file.write("\n")
            self.file.flush()

    def shutdown(self):
        """Stop the heartbeat thread, wait for it to exit, then close the logs."""
        self.should_stop = True
        self.wait_event.set()
        try:
            # Join *before* closing: a dump that is already running would
            # otherwise write to closed files, or re-open them via open_logs()
            # after they were closed here.
            self.join()
        finally:
            self.close_logs()

    def thread_is_sleeping(self, last_stack_frame):
        """
        Return True if the given stack frame looks like a known sleeper
        function, i.e. the thread appears to be idle rather than working.

        :param last_stack_frame: innermost entry of
            ``traceback.extract_stack()``, indexable as
            (filename, line, function-name, source-text).
        """
        filename = last_stack_frame[0]
        funcname = last_stack_frame[2]
        # The source text can be missing (e.g. source file unavailable).
        text = last_stack_frame[3] or ""
        # Ugly hack to tell if a thread is supposedly sleeping or not, based
        # on the source text of the most common blocking calls.
        # threading.Condition.wait(): "waiter.acquire()" without a timeout;
        # with a timeout (Python 3) e.g. "gotit = waiter.acquire(True, timeout)".
        if funcname == "wait" and "waiter.acquire(" in text:
            return True
        # Python 2 Condition.wait() polled with _sleep() when given a timeout.
        if funcname == "wait" and text == "_sleep(delay)":
            return True
        # Blocking socket accept (Python 2 "..._sock.accept()",
        # Python 3 "fd, addr = self._accept()").
        if funcname == "accept" and text.endswith(("_sock.accept()", "self._accept()")):
            return True
        if (funcname in ("monitor", "__monitor", "app_loop", "check")
                and text.startswith("time.sleep(") and text.endswith(")")):
            return True
        if funcname == "drain_events" and text == "sleep(polling_interval)":
            return True
        # Always skip frames from this module (the heartbeat thread itself).
        # Matching "/galaxy/util/heartbeat.py" recognizes both a source
        # checkout (".../lib/galaxy/util/heartbeat.py") and an installed
        # package (".../site-packages/galaxy/util/heartbeat.py").
        if "/galaxy/util/heartbeat.py" in filename:
            return True
        # By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame(self, stack_frames):
        """
        Scan the given backtrace stack frames and return a single
        quadruple of (filename, line, function-name, text) for the
        deepest, most interesting frame.

        Interesting being::

            inside the galaxy source code ("/lib/galaxy/"), in which case
            the filename is shortened to start at "/lib/galaxy/".

        If no such frame exists the innermost frame is returned unchanged.
        """
        for filename, line, funcname, text in reversed(stack_frames):
            idx = filename.find("/lib/galaxy/")
            if idx != -1:
                return (filename[idx:], line, funcname, text)
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping(self, threads_object_dict=None):
        """
        Append a report of the threads that do not look idle to the
        non-sleeping log, including for how many consecutive heartbeats
        each one has been busy.

        :param threads_object_dict: mapping of thread id -> ``Thread`` as
            returned by get_current_thread_object_dict(); fetched afresh
            when None.
        """
        if threads_object_dict is None:
            threads_object_dict = get_current_thread_object_dict()
        self.file_nonsleeping.write("Non-Sleeping threads at %s:\n\n" % time.asctime())
        current_id = threading.get_ident()
        frames = sys._current_frames()
        # Forget counters of threads that no longer exist, so the dict does
        # not grow forever and a recycled thread id starts from zero.
        for stale_id in set(self.nonsleeping_heartbeats) - set(frames):
            del self.nonsleeping_heartbeats[stale_id]
        all_threads_are_sleeping = True
        for thread_id, frame in frames.items():
            # The calling thread is the one producing this report; never count it.
            if thread_id == current_id:
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue
            tb = traceback.extract_stack(frame)
            if self.thread_is_sleeping(tb[-1]):
                # Sleeping resets the consecutive-busy counter.
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue

            # Count non-sleeping thread heartbeats
            count = self.nonsleeping_heartbeats.get(thread_id, 0) + 1
            self.nonsleeping_heartbeats[thread_id] = count

            if thread_id in threads_object_dict:
                thread_repr = repr(threads_object_dict[thread_id])
            else:
                thread_repr = "<No Thread object>"
            good_frame = self.get_interesting_stack_frame(tb)
            self.file_nonsleeping.write("Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" %
                                        (thread_id, thread_repr, count, good_frame[0], good_frame[1], good_frame[2], good_frame[3]))
            all_threads_are_sleeping = False

        if all_threads_are_sleeping:
            self.file_nonsleeping.write("All threads are sleeping.\n")
        self.file_nonsleeping.write("\n")
        self.file_nonsleeping.flush()

    def dump_signal_handler(self, signum, frame):
        """Signal-handler entry point: perform one dump immediately."""
        self.dump()