comparison env/lib/python3.9/site-packages/cwltool/task_queue.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """TaskQueue."""
5
6 import queue
7 import threading
8 from typing import Callable, Optional
9
10 from .loghandler import _logger
11
12
class TaskQueue:
    """Bounded FIFO work queue serviced by a fixed pool of worker threads.

    Tasks are zero-argument callables.  They are handed to worker threads
    in the order they were queued, each one starting as soon as a worker
    is free to pick it up.

    A ``thread_count`` of 0 disables the pool entirely: add() then runs
    the task immediately in the calling thread, so the queue machinery is
    never exercised (handy as a configuration switch).

    ``thread_count`` doubles as the capacity of the underlying queue.

    All worker threads are started by the constructor.  Call join() once
    no more work will be submitted so that the workers shut down.


    Attributes
    ----------
    in_flight
        count of tasks accepted by add() that have not finished yet
    error
        the most recent exception raised by a task on a worker thread,
        or None if no task has raised

    """

    def __init__(self, lock: threading.Lock, thread_count: int):
        """Set up the queue and start ``thread_count`` worker threads.

        ``lock`` guards every update of ``in_flight``.
        """
        self.thread_count = thread_count
        self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue(
            maxsize=self.thread_count
        )
        self.task_queue_threads = []
        self.lock = lock
        self.in_flight = 0
        self.error: Optional[BaseException] = None

        # Workers are launched last so every attribute they read exists.
        for _ in range(self.thread_count):
            worker = threading.Thread(target=self._task_queue_func)
            self.task_queue_threads.append(worker)
            worker.start()

    def _task_queue_func(self) -> None:
        """Worker loop: run queued tasks until a ``None`` sentinel arrives."""
        while True:
            job = self.task_queue.get()
            if job is None:
                # Shutdown marker placed on the queue by join().
                break
            try:
                job()
            except BaseException as exc:
                # Keep the worker alive; record the failure on the instance.
                _logger.exception("Unhandled exception running task")
                self.error = exc
            finally:
                with self.lock:
                    self.in_flight -= 1

    def add(
        self,
        task: Callable[[], None],
        unlock: Optional[threading.Condition] = None,
        check_done: Optional[threading.Event] = None,
    ) -> None:
        """
        Submit a task for execution.

        When ``unlock`` is given it is released before each attempt to
        enqueue the task and acquired again after the attempt, whether or
        not the attempt succeeded.

        When ``check_done`` is given and its flag is set, the task is
        dropped instead of being queued.

        With thread_count == 0 the task simply runs synchronously in the
        caller's thread and none of the above applies.

        """
        if self.thread_count == 0:
            task()
            return

        with self.lock:
            self.in_flight += 1

        while True:
            try:
                if unlock is not None:
                    unlock.release()
                skip = check_done is not None and check_done.is_set()
                if skip:
                    # Undo the reservation made above; nothing will run.
                    with self.lock:
                        self.in_flight -= 1
                else:
                    self.task_queue.put(task, block=True, timeout=3)
                return
            except queue.Full:
                # Queue still at capacity after the timeout: try again,
                # re-checking check_done on the next pass.
                continue
            finally:
                if unlock is not None:
                    unlock.acquire()

    def drain(self) -> None:
        """Discard everything currently waiting in the queue.

        Removed items are not executed and ``in_flight`` is left untouched.
        """
        pending = self.task_queue
        while not pending.empty():
            try:
                pending.get(block=True, timeout=0.1)
            except queue.Empty:
                # Another consumer emptied the queue first; nothing left.
                return

    def join(self) -> None:
        """Stop the workers and block until every one of them has exited."""
        workers = self.task_queue_threads
        # One sentinel per worker: each worker consumes exactly one and exits.
        for _ in workers:
            self.task_queue.put(None)
        for worker in workers:
            worker.join()