Mercurial > repos > shellac > sam_consensus_v3
annotate env/lib/python3.9/site-packages/cwltool/task_queue.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac | 
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 | 
| parents | |
| children | 
| rev | line source | 
|---|---|
| 0 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 1 # Copyright (C) The Arvados Authors. All rights reserved. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 2 # | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 3 # SPDX-License-Identifier: Apache-2.0 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 4 """TaskQueue.""" | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 5 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 6 import queue | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 7 import threading | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 8 from typing import Callable, Optional | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 9 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 10 from .loghandler import _logger | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 11 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 12 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 13 class TaskQueue: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 14 """A TaskQueue class. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 15 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 16 Uses a first-in, first-out queue of tasks executed on a fixed number of | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 17 threads. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 18 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 19 New tasks enter the queue and are started in the order received, | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 20 as worker threads become available. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 21 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 22 If thread_count == 0 then tasks will be synchronously executed | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 23 when add() is called (this makes the actual task queue behavior a | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 24 no-op, but may be a useful configuration knob). | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 25 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 26 The thread_count is also used as the maximum size of the queue. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 27 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 28 The threads are created during TaskQueue initialization. Call | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 29 join() when you're done with the TaskQueue and want the threads to | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 30 stop. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 31 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 32 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 33 Attributes | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 34 ---------- | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 35 in_flight | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 36 the number of tasks in the queue | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 37 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 38 """ | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 39 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 40 def __init__(self, lock: threading.Lock, thread_count: int): | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 41 """Create a new task queue using the specified lock and number of threads.""" | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 42 self.thread_count = thread_count | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 43 self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue( | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 44 maxsize=self.thread_count | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 45 ) | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 46 self.task_queue_threads = [] | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 47 self.lock = lock | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 48 self.in_flight = 0 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 49 self.error: Optional[BaseException] = None | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 50 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 51 for _r in range(0, self.thread_count): | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 52 t = threading.Thread(target=self._task_queue_func) | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 53 self.task_queue_threads.append(t) | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 54 t.start() | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 55 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 56 def _task_queue_func(self) -> None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 57 while True: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 58 task = self.task_queue.get() | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 59 if task is None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 60 return | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 61 try: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 62 task() | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 63 except BaseException as e: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 64 _logger.exception("Unhandled exception running task") | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 65 self.error = e | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 66 finally: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 67 with self.lock: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 68 self.in_flight -= 1 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 69 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 70 def add( | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 71 self, | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 72 task: Callable[[], None], | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 73 unlock: Optional[threading.Condition] = None, | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 74 check_done: Optional[threading.Event] = None, | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 75 ) -> None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 76 """ | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 77 Add your task to the queue. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 78 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 79 The optional unlock will be released prior to attempting to add the | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 80 task to the queue. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 81 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 82 If the optional "check_done" threading.Event's flag is set, then we | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 83 will skip adding this task to the queue. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 84 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 85 If the TaskQueue was created with thread_count == 0 then your task will | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 86 be synchronously executed. | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 87 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 88 """ | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 89 if self.thread_count == 0: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 90 task() | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 91 return | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 92 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 93 with self.lock: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 94 self.in_flight += 1 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 95 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 96 while True: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 97 try: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 98 if unlock is not None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 99 unlock.release() | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 100 if check_done is not None and check_done.is_set(): | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 101 with self.lock: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 102 self.in_flight -= 1 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 103 return | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 104 self.task_queue.put(task, block=True, timeout=3) | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 105 return | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 106 except queue.Full: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 107 pass | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 108 finally: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 109 if unlock is not None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 110 unlock.acquire() | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 111 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 112 def drain(self) -> None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 113 """Drain the queue.""" | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 114 try: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 115 while not self.task_queue.empty(): | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 116 self.task_queue.get(True, 0.1) | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 117 except queue.Empty: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 118 pass | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 119 | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 120 def join(self) -> None: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 121 """Wait for all threads to complete.""" | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 122 for _t in self.task_queue_threads: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 123 self.task_queue.put(None) | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 124 for t in self.task_queue_threads: | 
| 
4f3585e2f14b
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
 shellac parents: diff
changeset | 125 t.join() | 
