Mercurial > repos > youngkim > ezbamqc
diff ezBAMQC/src/htslib/cram/thread_pool.h @ 0:dfa3745e5fd8
Uploaded
author | youngkim |
---|---|
date | Thu, 24 Mar 2016 17:12:52 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ezBAMQC/src/htslib/cram/thread_pool.h Thu Mar 24 17:12:52 2016 -0400 @@ -0,0 +1,210 @@ +/* +Copyright (c) 2013 Genome Research Ltd. +Author: James Bonfield <jkb@sanger.ac.uk> + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + + 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger +Institute nor the names of its contributors may be used to endorse or promote +products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +/* + * This file implements a thread pool for multi-threading applications. + * It consists of two distinct interfaces: thread pools an results queues. + * + * The pool of threads is given a function pointer and void* data to pass in. + * This means the pool can run jobs of multiple types, albeit first come + * first served with no job scheduling. + * + * Upon completion, the return value from the function pointer is added to + * a results queue. We may have multiple queues in use for the one pool. + * + * An example: reading from BAM and writing to CRAM with 10 threads. We'll + * have a pool of 10 threads and two results queues holding decoded BAM blocks + * and encoded CRAM blocks respectively. + */ + +#ifndef _THREAD_POOL_H_ +#define _THREAD_POOL_H_ + +#include <pthread.h> + +struct t_pool; +struct t_results_queue; + +typedef struct t_pool_job { + void *(*func)(void *arg); + void *arg; + struct t_pool_job *next; + + struct t_pool *p; + struct t_results_queue *q; + int serial; +} t_pool_job; + +typedef struct t_res { + struct t_res *next; + int serial; // sequential number for ordering + void *data; // result itself +} t_pool_result; + +struct t_pool; + +typedef struct { + struct t_pool *p; + int idx; + pthread_t tid; + pthread_cond_t pending_c; + long long wait_time; +} t_pool_worker_t; + +typedef struct t_pool { + int qsize; // size of queue + int njobs; // pending job count + int nwaiting; // how many workers waiting for new jobs + int shutdown; // true if pool is being destroyed + + // queue of pending jobs + t_pool_job *head, *tail; + + // threads + int tsize; // maximum number of jobs + t_pool_worker_t *t; + + // Mutexes + pthread_mutex_t pool_m; // used when updating head/tail + + pthread_cond_t empty_c; + pthread_cond_t pending_c; // not empty + pthread_cond_t full_c; + + // array of worker IDs free + int *t_stack, t_stack_top; + + // Debugging to check wait time + long long total_time, wait_time; +} t_pool; + +typedef struct t_results_queue { + t_pool_result *result_head; + t_pool_result *result_tail; + int next_serial; + int curr_serial; + int queue_len; // number of items in queue + int pending; // number of pending items (in progress or in pool list) + pthread_mutex_t result_m; + pthread_cond_t result_avail_c; +} t_results_queue; + + +/* + * Creates a worker pool of length qsize with tsize worker threads. + * + * Returns pool pointer on success; + * NULL on failure + */ +t_pool *t_pool_init(int qsize, int tsize); + +/* + * Adds an item to the work pool. + * + * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs + * result returned. Ie rather than blocking on full queue we're permitted + * to return early on "result available" event too. + * Caller would then have a while loop around t_pool_dispatch. + * Or, return -1 and set errno to E_AGAIN to indicate job not yet submitted. + * + * Returns 0 on success + * -1 on failure + */ +int t_pool_dispatch(t_pool *p, t_results_queue *q, + void *(*func)(void *arg), void *arg); +int t_pool_dispatch2(t_pool *p, t_results_queue *q, + void *(*func)(void *arg), void *arg, int nonblock); + +/* + * Flushes the pool, but doesn't exit. This simply drains the queue and + * ensures all worker threads have finished their current task. + * + * Returns 0 on success; + * -1 on failure + */ +int t_pool_flush(t_pool *p); + +/* + * Destroys a thread pool. If 'kill' is true the threads are terminated now, + * otherwise they are joined into the main thread so they will finish their + * current work load. + * + * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or + * t_pool_destroy(p,1) to quickly exit after a fatal error. + */ +void t_pool_destroy(t_pool *p, int kill); + +/* + * Pulls a result off the head of the result queue. Caller should + * free it (and any internals as appropriate) after use. This doesn't + * wait for a result to be present. + * + * Results will be returned in strict order. + * + * Returns t_pool_result pointer if a result is ready. + * NULL if not. + */ +t_pool_result *t_pool_next_result(t_results_queue *q); +t_pool_result *t_pool_next_result_wait(t_results_queue *q); + +/* + * Frees a result 'r' and if free_data is true also frees + * the internal r->data result too. + */ +void t_pool_delete_result(t_pool_result *r, int free_data); + +/* + * Initialises a results queue. + * + * Results queue pointer on success; + * NULL on failure + */ +t_results_queue *t_results_queue_init(void); + +/* Deallocates memory for a results queue */ +void t_results_queue_destroy(t_results_queue *q); + +/* + * Returns true if there are no items on the finished results queue and + * also none still pending. + */ +int t_pool_results_queue_empty(t_results_queue *q); + +/* + * Returns the number of completed jobs on the results queue. + */ +int t_pool_results_queue_len(t_results_queue *q); + +/* + * Returns the number of completed jobs plus the number queued up to run. + */ +int t_pool_results_queue_sz(t_results_queue *q); + +#endif /* _THREAD_POOL_H_ */