diff env/lib/python3.9/site-packages/cwltool/task_queue.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/task_queue.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,125 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+"""TaskQueue."""
+
+import queue
+import threading
+from typing import Callable, Optional
+
+from .loghandler import _logger
+
+
+class TaskQueue:
+    """A TaskQueue class.
+
+    Uses a first-in, first-out queue of tasks executed on a fixed number of
+    threads.
+
+    New tasks enter the queue and are started in the order received,
+    as worker threads become available.
+
+    If thread_count == 0 then tasks will be synchronously executed
+    when add() is called (this makes the actual task queue behavior a
+    no-op, but may be a useful configuration knob).
+
+    The thread_count is also used as the maximum size of the queue.
+
+    The threads are created during TaskQueue initialization.  Call
+    join() when you're done with the TaskQueue and want the threads to
+    stop.
+
+
+    Attributes
+    ----------
+    in_flight
+        the number of tasks in the queue
+
+    """
+
+    def __init__(self, lock: threading.Lock, thread_count: int):
+        """Create a new task queue using the specified lock and number of threads."""
+        self.thread_count = thread_count
+        self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue(
+            maxsize=self.thread_count
+        )
+        self.task_queue_threads = []
+        self.lock = lock
+        self.in_flight = 0
+        self.error: Optional[BaseException] = None
+
+        for _r in range(0, self.thread_count):
+            t = threading.Thread(target=self._task_queue_func)
+            self.task_queue_threads.append(t)
+            t.start()
+
+    def _task_queue_func(self) -> None:
+        while True:
+            task = self.task_queue.get()
+            if task is None:
+                return
+            try:
+                task()
+            except BaseException as e:
+                _logger.exception("Unhandled exception running task")
+                self.error = e
+            finally:
+                with self.lock:
+                    self.in_flight -= 1
+
+    def add(
+        self,
+        task: Callable[[], None],
+        unlock: Optional[threading.Condition] = None,
+        check_done: Optional[threading.Event] = None,
+    ) -> None:
+        """
+        Add your task to the queue.
+
+        The optional unlock will be released prior to attempting to add the
+        task to the queue.
+
+        If the optional "check_done" threading.Event's flag is set, then we
+        will skip adding this task to the queue.
+
+        If the TaskQueue was created with thread_count == 0 then your task will
+        be synchronously executed.
+
+        """
+        if self.thread_count == 0:
+            task()
+            return
+
+        with self.lock:
+            self.in_flight += 1
+
+        while True:
+            try:
+                if unlock is not None:
+                    unlock.release()
+                if check_done is not None and check_done.is_set():
+                    with self.lock:
+                        self.in_flight -= 1
+                    return
+                self.task_queue.put(task, block=True, timeout=3)
+                return
+            except queue.Full:
+                pass
+            finally:
+                if unlock is not None:
+                    unlock.acquire()
+
+    def drain(self) -> None:
+        """Drain the queue."""
+        try:
+            while not self.task_queue.empty():
+                self.task_queue.get(True, 0.1)
+        except queue.Empty:
+            pass
+
+    def join(self) -> None:
+        """Wait for all threads to complete."""
+        for _t in self.task_queue_threads:
+            self.task_queue.put(None)
+        for t in self.task_queue_threads:
+            t.join()