Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/galaxy/util/heartbeat.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import os | |
2 import sys | |
3 import threading | |
4 import time | |
5 import traceback | |
6 | |
7 | |
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module keyed by thread_id. Note that not all interpreter threads
    have a thread object, only the main thread and any created via the
    'threading' module. Threads created via the low level '_thread' module
    will not be in the returned dictionary.

    Threads that have been started but have not yet been assigned an
    identifier are omitted, since they cannot be matched against the ids
    reported by ``sys._current_frames()``.

    HACK: This mucks with the internals of the threading module since that
    module does not expose any way to match 'Thread' objects with
    interpreter thread identifiers (though it should).
    """
    rval = dict()
    # Hold the lock while copying so 'active' and 'limbo' form a consistent
    # snapshot; the 'with' statement releases it even if copying raises.
    with threading._active_limbo_lock:
        rval.update(threading._active)
        # CPython keys '_limbo' by the Thread object itself rather than by
        # thread id, so re-key those entries by ident (skipping threads that
        # do not have one yet) to keep the whole result keyed by thread id.
        for thread in threading._limbo.values():
            if thread.ident is not None:
                rval[thread.ident] = thread
    return rval
28 | |
29 | |
class Heartbeat(threading.Thread):
    """
    Thread that periodically dumps the state of all threads to a file.

    Every ``period`` seconds (when ``period`` > 0) the thread appends a stack
    trace of every interpreter thread to ``fname``. It also appends a summary
    of the threads that do not appear to be sleeping to a sibling
    ``<name>.nonsleeping<ext>`` file.
    """

    def __init__(self, config, name="Heartbeat Thread", period=20, fname="heartbeat.log"):
        threading.Thread.__init__(self, name=name)
        # Only ``config.server_name`` is read, to fill in ``fname`` in run().
        self.config = config
        self.should_stop = False
        # Seconds between dumps; a value <= 0 disables periodic dumps.
        self.period = period
        # May contain ``{server_name}`` and ``{pid}`` placeholders, formatted in run().
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = None
        self.file_nonsleeping = None
        self.pid = None
        # thread id -> number of consecutive dumps in which that thread was
        # seen not sleeping.
        self.nonsleeping_heartbeats = {}
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run(self):
        """
        Thread body: resolve the log file names, then dump thread states every
        ``period`` seconds until ``shutdown()`` is called.
        """
        self.pid = os.getpid()
        self.fname = self.fname.format(
            server_name=self.config.server_name,
            pid=self.pid
        )
        fname, ext = os.path.splitext(self.fname)
        self.fname_nonsleeping = fname + '.nonsleeping' + ext
        # With dumping disabled, still wake up every 60s to re-check
        # should_stop; shutdown() also sets wait_event, waking us immediately.
        wait = self.period if self.period > 0 else 60
        while not self.should_stop:
            if self.period > 0:
                self.dump()
            self.wait_event.wait(wait)

    def open_logs(self):
        """Open both log files in append mode (unless already open) and write a start banner."""
        if self.file is None or self.file.closed:
            self.file = open(self.fname, "a")
            self.file_nonsleeping = open(self.fname_nonsleeping, "a")
            self.file.write("Heartbeat for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))

    def close_logs(self):
        """Write a stop banner to both log files and close them, if they are open."""
        if self.file is not None and not self.file.closed:
            self.file.write("Heartbeat for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file.close()
            self.file_nonsleeping.close()

    @staticmethod
    def _thread_repr(threads, thread_id):
        """Return repr() of the Thread object for ``thread_id``, or a placeholder if there is none."""
        if thread_id in threads:
            return repr(threads[thread_id])
        return "<No Thread object>"

    def dump(self):
        """
        Append a stack trace of every interpreter thread to the heartbeat log,
        then append the non-sleeping summary to the non-sleeping log.

        Exceptions raised while dumping are written to the heartbeat log
        instead of being propagated.
        """
        self.open_logs()
        try:
            # Print separator with timestamp
            self.file.write("Traceback dump for all threads at %s:\n\n" % time.asctime())
            # Print the thread states
            threads = get_current_thread_object_dict()
            for thread_id, frame in sys._current_frames().items():
                thread_repr = self._thread_repr(threads, thread_id)
                self.file.write(f"Thread {thread_id}, {thread_repr}:\n\n")
                traceback.print_stack(frame, file=self.file)
                self.file.write("\n")
            self.file.write("End dump\n\n")
            self.file.flush()
            self.print_nonsleeping(threads)
        except Exception:
            self.file.write("Caught exception attempting to dump thread states:")
            traceback.print_exc(None, self.file)
            self.file.write("\n")

    def shutdown(self):
        """
        Stop the heartbeat thread, wait for it to finish, then close the logs.

        The logs are closed only after the thread has exited. Closing them
        first could close a file that an in-progress dump() is still writing
        to; dump() only re-opens closed logs at its start.
        """
        self.should_stop = True
        self.wait_event.set()
        # join() raises RuntimeError for a thread that was never started or
        # when called from the heartbeat thread itself; skip it in those cases.
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        self.close_logs()

    def thread_is_sleeping(self, last_stack_frame):
        """
        Return True if ``last_stack_frame`` is one of a set of known
        sleeping/blocking calls.

        ``last_stack_frame`` is a thread's innermost frame, given as a
        ``(filename, line, function-name, text)`` sequence such as a
        ``traceback.FrameSummary``.

        This is a heuristic based on function names and source text; threads
        blocked anywhere else are reported as not sleeping.
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        # The source text may be None (or empty) when it is unavailable.
        _text = last_stack_frame[3] or ""
        # Ugly hack to tell if a thread is supposedly sleeping or not.
        # These are the most common sleeping functions found so far.
        # Is there a better way? (python interpreter internals?)
        # threading.Condition.wait: untimed form, plus Python 3's timed form.
        if _funcname == "wait" and _text in ("waiter.acquire()", "gotit = waiter.acquire(True, timeout)"):
            return True
        if _funcname == "wait" and _text == "_sleep(delay)":
            return True
        if _funcname == "accept" and _text[-14:] == "_sock.accept()":
            return True
        if _funcname in ("monitor", "__monitor", "app_loop", "check") \
                and _text.startswith("time.sleep(") and _text.endswith(")"):
            return True
        if _funcname == "drain_events" and _text == "sleep(polling_interval)":
            return True
        # Always treat this module's own frames (the heartbeat thread) as
        # sleeping. Match both a source checkout (.../lib/galaxy/util/heartbeat.py)
        # and an installed package (.../site-packages/galaxy/util/heartbeat.py).
        if "/galaxy/util/heartbeat.py" in _filename:
            return True
        # By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame(self, stack_frames):
        """
        Scan the given backtrace stack frames and return a single quadruple of
        [filename, line, function-name, text] for the single, deepest, most
        interesting frame.

        Interesting being::

            inside the galaxy source code ("/lib/galaxy"),
            preferably not an egg.

        The filename of a matching frame is trimmed to start at "/lib/galaxy/".
        If no frame matches, the innermost frame is returned unchanged.
        """
        for _filename, _line, _funcname, _text in reversed(stack_frames):
            idx = _filename.find("/lib/galaxy/")
            if idx != -1:
                relative_filename = _filename[idx:]
                return (relative_filename, _line, _funcname, _text)
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping(self, threads_object_dict=None):
        """
        Append a summary of the threads that do not appear to be sleeping to
        the non-sleeping log, and update ``nonsleeping_heartbeats``.

        :param threads_object_dict: mapping of thread id -> ``Thread`` object,
            as returned by ``get_current_thread_object_dict()``; fetched fresh
            when ``None``.
        """
        if threads_object_dict is None:
            threads_object_dict = get_current_thread_object_dict()
        self.file_nonsleeping.write("Non-Sleeping threads at %s:\n\n" % time.asctime())
        all_threads_are_sleeping = True
        # The thread performing the dump (the heartbeat thread, or whichever
        # thread runs dump_signal_handler) is never reported.
        dumping_thread_id = threading.get_ident()
        frames = sys._current_frames()
        for thread_id, frame in frames.items():
            if thread_id == dumping_thread_id:
                continue
            tb = traceback.extract_stack(frame)
            if self.thread_is_sleeping(tb[-1]):
                # A sleeping thread resets its consecutive non-sleeping count.
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue

            # Count non-sleeping thread heartbeats
            self.nonsleeping_heartbeats[thread_id] = self.nonsleeping_heartbeats.get(thread_id, 0) + 1

            good_frame = self.get_interesting_stack_frame(tb)
            thread_repr = self._thread_repr(threads_object_dict, thread_id)
            self.file_nonsleeping.write("Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" %
                                        (thread_id, thread_repr, self.nonsleeping_heartbeats[thread_id], good_frame[0], good_frame[1], good_frame[2], good_frame[3]))
            all_threads_are_sleeping = False

        # Forget threads that no longer exist. This keeps the dict from growing
        # without bound and stops a reused thread id from inheriting a stale count.
        for stale_id in set(self.nonsleeping_heartbeats).difference(frames):
            del self.nonsleeping_heartbeats[stale_id]

        if all_threads_are_sleeping:
            self.file_nonsleeping.write("All threads are sleeping.\n")
        self.file_nonsleeping.write("\n")
        self.file_nonsleeping.flush()

    def dump_signal_handler(self, signum, frame):
        """
        Dump thread states immediately; ``signum`` and ``frame`` are ignored.

        The signature matches a ``signal.signal`` handler, presumably so this
        can be registered as one (confirm against callers).
        """
        self.dump()