diff ezBAMQC/src/htslib/cram/thread_pool.h @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ezBAMQC/src/htslib/cram/thread_pool.h	Thu Mar 24 17:12:52 2016 -0400
@@ -0,0 +1,210 @@
+/*
+Copyright (c) 2013 Genome Research Ltd.
+Author: James Bonfield <jkb@sanger.ac.uk>
+
+Redistribution and use in source and binary forms, with or without 
+modification, are permitted provided that the following conditions are met:
+
+   1. Redistributions of source code must retain the above copyright notice, 
+this list of conditions and the following disclaimer.
+
+   2. Redistributions in binary form must reproduce the above copyright notice, 
+this list of conditions and the following disclaimer in the documentation 
+and/or other materials provided with the distribution.
+
+   3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
+Institute nor the names of its contributors may be used to endorse or promote
+products derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND 
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
+DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+/*
+ * This file implements a thread pool for multi-threading applications.
+ * It consists of two distinct interfaces: thread pools and results queues.
+ *
+ * The pool of threads is given a function pointer and void* data to pass in.
+ * This means the pool can run jobs of multiple types, albeit first come
+ * first served with no job scheduling.
+ *
+ * Upon completion, the return value from the function pointer is added to
+ * a results queue. We may have multiple queues in use for the one pool.
+ *
+ * An example: reading from BAM and writing to CRAM with 10 threads. We'll
+ * have a pool of 10 threads and two results queues holding decoded BAM blocks
+ * and encoded CRAM blocks respectively.
+ */
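+
+/*
+ * A minimal usage sketch of the above pipeline (illustrative only: the
+ * worker function decode_bam(), nblocks and blocks[] are hypothetical and
+ * not part of this API):
+ *
+ *     t_pool *p = t_pool_init(20, 10);            // 20-slot queue, 10 threads
+ *     t_results_queue *q = t_results_queue_init();
+ *     t_pool_result *r;
+ *
+ *     for (int i = 0; i < nblocks; i++) {
+ *         t_pool_dispatch(p, q, decode_bam, &blocks[i]);
+ *         while ((r = t_pool_next_result(q)))     // drain finished results
+ *             t_pool_delete_result(r, 1);
+ *     }
+ *
+ *     t_pool_flush(p);                            // wait for remaining jobs
+ *     while ((r = t_pool_next_result(q)))
+ *         t_pool_delete_result(r, 1);
+ *
+ *     t_pool_destroy(p, 0);
+ *     t_results_queue_destroy(q);
+ */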
+
+#ifndef _THREAD_POOL_H_
+#define _THREAD_POOL_H_
+
+#include <pthread.h>
+
+struct t_pool;
+struct t_results_queue;
+
+typedef struct t_pool_job {
+    void *(*func)(void *arg);
+    void *arg;
+    struct t_pool_job *next;
+
+    struct t_pool *p;
+    struct t_results_queue *q;
+    int serial;
+} t_pool_job;
+
+typedef struct t_res {
+    struct t_res *next;
+    int serial; // sequential number for ordering
+    void *data; // result itself
+} t_pool_result;
+
+struct t_pool;
+
+typedef struct {
+    struct t_pool *p;
+    int idx;
+    pthread_t tid;
+    pthread_cond_t  pending_c;
+    long long wait_time;
+} t_pool_worker_t;
+
+typedef struct t_pool {
+    int qsize;    // size of queue
+    int njobs;    // pending job count
+    int nwaiting; // how many workers waiting for new jobs
+    int shutdown; // true if pool is being destroyed
+
+    // queue of pending jobs
+    t_pool_job *head, *tail;
+
+    // threads
+    int tsize;    // number of worker threads
+    t_pool_worker_t *t;
+
+    // Mutexes
+    pthread_mutex_t pool_m; // used when updating head/tail
+
+    pthread_cond_t  empty_c;
+    pthread_cond_t  pending_c; // not empty
+    pthread_cond_t  full_c;
+
+    // array of worker IDs free
+    int *t_stack, t_stack_top;
+
+    // Debugging to check wait time
+    long long total_time, wait_time;
+} t_pool;
+
+typedef struct t_results_queue {
+    t_pool_result *result_head;
+    t_pool_result *result_tail;
+    int next_serial;
+    int curr_serial;
+    int queue_len;  // number of items in queue
+    int pending;    // number of pending items (in progress or in pool list)
+    pthread_mutex_t result_m;
+    pthread_cond_t result_avail_c;
+} t_results_queue;
+
+
+/*
+ * Creates a worker pool with a job queue of length qsize and tsize worker threads.
+ *
+ * Returns pool pointer on success;
+ *         NULL on failure
+ */
+t_pool *t_pool_init(int qsize, int tsize);
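+
+/*
+ * For example (sizes chosen purely for illustration):
+ *
+ *     t_pool *p = t_pool_init(16, 8);   // 16-slot job queue, 8 worker threads
+ *     if (!p)
+ *         return -1;                    // handle failure
+ */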
+
+/*
+ * Adds an item to the work pool.
+ *
+ * FIXME: Maybe return 1,0,-1 and distinguish between job dispatched vs
+ * result returned. I.e. rather than blocking on a full queue we're permitted
+ * to return early on a "result available" event too.
+ * Caller would then have a while loop around t_pool_dispatch.
+ * Or, return -1 and set errno to EAGAIN to indicate the job was not yet submitted.
+ *
+ * Returns 0 on success
+ *        -1 on failure
+ */
+int t_pool_dispatch(t_pool *p, t_results_queue *q,
+		    void *(*func)(void *arg), void *arg);
+int t_pool_dispatch2(t_pool *p, t_results_queue *q,
+		     void *(*func)(void *arg), void *arg, int nonblock);
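+
+/*
+ * Sketch of the job function contract (my_job, my_arg_t and process() are
+ * hypothetical): the void *arg supplied at dispatch time is handed to func,
+ * and func's return value becomes the data field of a t_pool_result on q.
+ *
+ *     void *my_job(void *arg) {
+ *         my_arg_t *a = (my_arg_t *)arg;
+ *         return process(a);            // ends up in t_pool_result.data
+ *     }
+ *
+ *     // Blocking dispatch; returns -1 on failure.
+ *     t_pool_dispatch(p, q, my_job, a);
+ *
+ *     // Non-blocking variant: with nonblock set, dispatch is expected to
+ *     // return -1 rather than wait when the job queue is full.
+ *     t_pool_dispatch2(p, q, my_job, a, 1);
+ */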
+
+/*
+ * Flushes the pool, but doesn't exit. This simply drains the queue and
+ * ensures all worker threads have finished their current task.
+ *
+ * Returns 0 on success;
+ *        -1 on failure
+ */
+int t_pool_flush(t_pool *p);
+
+/*
+ * Destroys a thread pool. If 'kill' is true the threads are terminated now,
+ * otherwise they are joined into the main thread so they will finish their
+ * current work load.
+ *
+ * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
+ * t_pool_destroy(p,1) to quickly exit after a fatal error.
+ */
+void t_pool_destroy(t_pool *p, int kill);
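+
+/*
+ * Typical shutdown sequences (sketch):
+ *
+ *     // Normal exit: let queued and in-flight work complete first.
+ *     t_pool_flush(p);
+ *     t_pool_destroy(p, 0);
+ *
+ *     // Fatal error: tear the threads down immediately.
+ *     t_pool_destroy(p, 1);
+ */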
+
+/*
+ * Pulls a result off the head of the result queue. Caller should
+ * free it (and any internals as appropriate) after use. This doesn't
+ * wait for a result to be present.
+ *
+ * Results will be returned in strict order.
+ * 
+ * Returns t_pool_result pointer if a result is ready.
+ *         NULL if not.
+ */
+t_pool_result *t_pool_next_result(t_results_queue *q);
+t_pool_result *t_pool_next_result_wait(t_results_queue *q);
+
+/*
+ * Frees a result 'r' and if free_data is true also frees
+ * the internal r->data result too.
+ */
+void t_pool_delete_result(t_pool_result *r, int free_data);
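+
+/*
+ * Example consumption of results (sketch; consume() is hypothetical):
+ *
+ *     t_pool_result *r;
+ *
+ *     // Non-blocking: NULL simply means no result is ready yet.
+ *     if ((r = t_pool_next_result(q)))
+ *         t_pool_delete_result(r, 1);    // frees r and r->data
+ *
+ *     // Blocking: waits for the next in-order result.
+ *     r = t_pool_next_result_wait(q);
+ *     consume(r->data);
+ *     t_pool_delete_result(r, 0);        // caller still owns r->data
+ */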
+
+/*
+ * Initialises a results queue.
+ *
+ * Returns results queue pointer on success;
+ *         NULL on failure
+ */
+t_results_queue *t_results_queue_init(void);
+
+/* Deallocates memory for a results queue */
+void t_results_queue_destroy(t_results_queue *q);
+
+/*
+ * Returns true if there are no items on the finished results queue and
+ * also none still pending.
+ */
+int t_pool_results_queue_empty(t_results_queue *q);
+
+/*
+ * Returns the number of completed jobs on the results queue.
+ */
+int t_pool_results_queue_len(t_results_queue *q);
+
+/*
+ * Returns the number of completed jobs plus the number queued up to run.
+ */
+int t_pool_results_queue_sz(t_results_queue *q);
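+
+/*
+ * For instance, assuming no further jobs are being dispatched concurrently,
+ * a consumer loop could drain everything with (sketch):
+ *
+ *     while (!t_pool_results_queue_empty(q)) {
+ *         t_pool_result *r = t_pool_next_result_wait(q);
+ *         t_pool_delete_result(r, 1);
+ *     }
+ */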
+
+#endif /* _THREAD_POOL_H_ */