/*
Copyright (c) 2013 Genome Research Ltd.
Author: James Bonfield <jkb@sanger.ac.uk>

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

   1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

   2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

   3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
Institute nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdlib.h>

#include <signal.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <assert.h>

#include "cram/thread_pool.h"

//#define DEBUG
//#define DEBUG_TIME

#define IN_ORDER

#ifdef DEBUG
static int worker_id(t_pool *p) {
    int i;
    pthread_t s = pthread_self();
    for (i = 0; i < p->tsize; i++) {
        if (pthread_equal(s, p->t[i].tid))
            return i;
    }
    return -1;
}
#endif

/* ----------------------------------------------------------------------------
 * A queue to hold results from the thread pool.
 *
 * Each thread pool may have jobs of multiple types being queued up and
 * interleaved, so we allow several results queues per pool.
 *
 * The jobs themselves are expected to push their results onto their
 * appropriate results queue.
 */
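
/*
 * Illustrative sketch of the design above: one pool can serve several
 * independent results queues, e.g. one per job type.  Only the t_pool_*
 * and t_results_queue_* calls are part of this file's API; the job
 * functions and arguments named below are hypothetical.
 *
 *     t_pool *pool = t_pool_init(16, 8);
 *     t_results_queue *q_compress = t_results_queue_init();
 *     t_results_queue *q_index    = t_results_queue_init();
 *
 *     // Jobs dispatched to different queues may interleave freely;
 *     // each queue preserves its own serial ordering.
 *     t_pool_dispatch(pool, q_compress, compress_func, compress_arg);
 *     t_pool_dispatch(pool, q_index,    index_func,    index_arg);
 */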

/*
 * Adds a result to the end of the result queue.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
static int t_pool_add_result(t_pool_job *j, void *data) {
    t_results_queue *q = j->q;
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "%d: Adding result to queue %p, serial %d\n",
            worker_id(j->p), q, j->serial);
#endif

    /* No results queue is fine if we don't want any results back */
    if (!q)
        return 0;

    if (!(r = malloc(sizeof(*r))))
        return -1;

    r->next = NULL;
    r->data = data;
    r->serial = j->serial;

    pthread_mutex_lock(&q->result_m);
    if (q->result_tail) {
        q->result_tail->next = r;
        q->result_tail = r;
    } else {
        q->result_head = q->result_tail = r;
    }
    q->queue_len++;
    q->pending--;

#ifdef DEBUG
    fprintf(stderr, "%d: Signalling result_avail (id %d)\n",
            worker_id(j->p), r->serial);
#endif
    pthread_cond_signal(&q->result_avail_c);
#ifdef DEBUG
    fprintf(stderr, "%d: Signal complete\n", worker_id(j->p));
#endif

    pthread_mutex_unlock(&q->result_m);

    return 0;
}

/* Core of t_pool_next_result() */
static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
    t_pool_result *r, *last;

    for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
        if (r->serial == q->next_serial)
            break;
    }

    if (r) {
        if (q->result_head == r)
            q->result_head = r->next;
        else
            last->next = r->next;

        if (q->result_tail == r)
            q->result_tail = last;

        if (!q->result_head)
            q->result_tail = NULL;

        q->next_serial++;
        q->queue_len--;
    }

    return r;
}

/*
 * Pulls a result off the head of the result queue. The caller should
 * free it (and any internals as appropriate) after use. This does not
 * wait for a result to be present.
 *
 * Results will be returned in strict serial order.
 *
 * Returns a t_pool_result pointer if a result is ready;
 *         NULL if not.
 */
t_pool_result *t_pool_next_result(t_results_queue *q) {
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "Requesting next result on queue %p\n", q);
#endif

    pthread_mutex_lock(&q->result_m);
    r = t_pool_next_result_locked(q);
    pthread_mutex_unlock(&q->result_m);

#ifdef DEBUG
    fprintf(stderr, "(q=%p) Found %p\n", q, r);
#endif

    return r;
}

/*
 * As t_pool_next_result(), but blocks until the next in-order result
 * becomes available.
 */
t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
#endif

    pthread_mutex_lock(&q->result_m);
    while (!(r = t_pool_next_result_locked(q))) {
        /* Possible race here now avoided via the _locked() call, but in case... */
        struct timeval now;
        struct timespec timeout;

        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 10;
        timeout.tv_nsec = now.tv_usec * 1000;

        pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
    }
    pthread_mutex_unlock(&q->result_m);

    return r;
}
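
/*
 * Illustrative sketch of the blocking drain pattern, assuming jobs have
 * already been dispatched against 'q'; consume() is a hypothetical helper.
 *
 *     while (!t_pool_results_queue_empty(q)) {
 *         t_pool_result *r = t_pool_next_result_wait(q);
 *         consume(r->data);               // process one in-order result
 *         t_pool_delete_result(r, 1);     // also frees r->data
 *     }
 */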

/*
 * Returns true if there are no items on the finished results queue and
 * also none still pending.
 */
int t_pool_results_queue_empty(t_results_queue *q) {
    int empty;

    pthread_mutex_lock(&q->result_m);
    empty = q->queue_len == 0 && q->pending == 0;
    pthread_mutex_unlock(&q->result_m);

    return empty;
}


/*
 * Returns the number of completed jobs on the results queue.
 */
int t_pool_results_queue_len(t_results_queue *q) {
    int len;

    pthread_mutex_lock(&q->result_m);
    len = q->queue_len;
    pthread_mutex_unlock(&q->result_m);

    return len;
}

/*
 * Returns the total number of jobs associated with this results queue:
 * completed results still queued plus jobs dispatched but not yet finished.
 */
int t_pool_results_queue_sz(t_results_queue *q) {
    int len;

    pthread_mutex_lock(&q->result_m);
    len = q->queue_len + q->pending;
    pthread_mutex_unlock(&q->result_m);

    return len;
}

/*
 * Frees a result 'r' and, if free_data is true, also frees
 * the internal r->data result.
 */
void t_pool_delete_result(t_pool_result *r, int free_data) {
    if (!r)
        return;

    if (free_data && r->data)
        free(r->data);

    free(r);
}

/*
 * Initialises a results queue.
 *
 * Returns results queue pointer on success;
 *         NULL on failure
 */
t_results_queue *t_results_queue_init(void) {
    t_results_queue *q = malloc(sizeof(*q));

    if (!q)
        return NULL;

    pthread_mutex_init(&q->result_m, NULL);
    pthread_cond_init(&q->result_avail_c, NULL);

    q->result_head = NULL;
    q->result_tail = NULL;
    q->next_serial = 0;
    q->curr_serial = 0;
    q->queue_len   = 0;
    q->pending     = 0;

    return q;
}

/* Deallocates memory for a results queue */
void t_results_queue_destroy(t_results_queue *q) {
#ifdef DEBUG
    fprintf(stderr, "Destroying results queue %p\n", q);
#endif

    if (!q)
        return;

    pthread_mutex_destroy(&q->result_m);
    pthread_cond_destroy(&q->result_avail_c);

    memset(q, 0xbb, sizeof(*q));
    free(q);

#ifdef DEBUG
    fprintf(stderr, "Destroyed results queue %p\n", q);
#endif
}

/* ----------------------------------------------------------------------------
 * The thread pool.
 */

#define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)

/*
 * A worker thread.
 *
 * Each thread waits for the pool to be non-empty.
 * As soon as this applies, one of them succeeds in getting the lock
 * and then executes the job.
 */
static void *t_pool_worker(void *arg) {
    t_pool_worker_t *w = (t_pool_worker_t *)arg;
    t_pool *p = w->p;
    t_pool_job *j;
#ifdef DEBUG_TIME
    struct timeval t1, t2, t3;
#endif

    for (;;) {
        // Pop an item off the pool queue
#ifdef DEBUG_TIME
        gettimeofday(&t1, NULL);
#endif

        pthread_mutex_lock(&p->pool_m);

#ifdef DEBUG_TIME
        gettimeofday(&t2, NULL);
        p->wait_time += TDIFF(t2,t1);
        w->wait_time += TDIFF(t2,t1);
#endif

        // If there is something on the job list and a higher priority
        // thread waiting, let it handle this instead.
//      while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
//          pthread_mutex_unlock(&p->pool_m);
//          pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
//          pthread_mutex_lock(&p->pool_m);
//      }

        while (!p->head && !p->shutdown) {
            p->nwaiting++;

            if (p->njobs == 0)
                pthread_cond_signal(&p->empty_c);
#ifdef DEBUG_TIME
            gettimeofday(&t2, NULL);
#endif

#ifdef IN_ORDER
            // Push this thread to the top of the waiting stack
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
                p->t_stack_top = w->idx;

            p->t_stack[w->idx] = 1;
            pthread_cond_wait(&w->pending_c, &p->pool_m);
            p->t_stack[w->idx] = 0;

            /* Find new t_stack_top */
            {
                int i;
                p->t_stack_top = -1;
                for (i = 0; i < p->tsize; i++) {
                    if (p->t_stack[i]) {
                        p->t_stack_top = i;
                        break;
                    }
                }
            }
#else
            pthread_cond_wait(&p->pending_c, &p->pool_m);
#endif

#ifdef DEBUG_TIME
            gettimeofday(&t3, NULL);
            p->wait_time += TDIFF(t3,t2);
            w->wait_time += TDIFF(t3,t2);
#endif
            p->nwaiting--;
        }

        if (p->shutdown) {
#ifdef DEBUG_TIME
            p->total_time += TDIFF(t3,t1);
#endif
#ifdef DEBUG
            fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
            pthread_mutex_unlock(&p->pool_m);
            pthread_exit(NULL);
        }

        j = p->head;
        if (!(p->head = j->next))
            p->tail = NULL;

        if (p->njobs-- >= p->qsize)
            pthread_cond_signal(&p->full_c);

        if (p->njobs == 0)
            pthread_cond_signal(&p->empty_c);

        pthread_mutex_unlock(&p->pool_m);

        // We have job 'j' - now execute it.
        t_pool_add_result(j, j->func(j->arg));
#ifdef DEBUG_TIME
        pthread_mutex_lock(&p->pool_m);
        gettimeofday(&t3, NULL);
        p->total_time += TDIFF(t3,t1);
        pthread_mutex_unlock(&p->pool_m);
#endif
        memset(j, 0xbb, sizeof(*j));
        free(j);
    }

    return NULL;
}

/*
 * Creates a worker pool with a job queue of length qsize and tsize worker
 * threads.
 *
 * Returns pool pointer on success;
 *         NULL on failure
 */
t_pool *t_pool_init(int qsize, int tsize) {
    int i;
    t_pool *p = malloc(sizeof(*p));

    if (!p)
        return NULL;

    p->qsize = qsize;
    p->tsize = tsize;
    p->njobs = 0;
    p->nwaiting = 0;
    p->shutdown = 0;
    p->head = p->tail = NULL;
    p->t_stack = NULL;
#ifdef DEBUG_TIME
    p->total_time = p->wait_time = 0;
#endif

    if (!(p->t = malloc(tsize * sizeof(p->t[0])))) {
        free(p);
        return NULL;
    }

    pthread_mutex_init(&p->pool_m, NULL);
    pthread_cond_init(&p->empty_c, NULL);
    pthread_cond_init(&p->full_c, NULL);

    pthread_mutex_lock(&p->pool_m);

#ifdef IN_ORDER
    if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack)))) {
        pthread_mutex_unlock(&p->pool_m);
        return NULL;
    }
    p->t_stack_top = -1;

    for (i = 0; i < tsize; i++) {
        t_pool_worker_t *w = &p->t[i];
        p->t_stack[i] = 0;
        w->p = p;
        w->idx = i;
        w->wait_time = 0;
        pthread_cond_init(&w->pending_c, NULL);
        if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w)) {
            pthread_mutex_unlock(&p->pool_m);
            return NULL;
        }
    }
#else
    pthread_cond_init(&p->pending_c, NULL);

    for (i = 0; i < tsize; i++) {
        t_pool_worker_t *w = &p->t[i];
        w->p = p;
        w->idx = i;
        pthread_cond_init(&w->pending_c, NULL);
        if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w)) {
            pthread_mutex_unlock(&p->pool_m);
            return NULL;
        }
    }
#endif

    pthread_mutex_unlock(&p->pool_m);

    return p;
}
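
/*
 * Illustrative sketch of creating a pool and its results queue (values are
 * arbitrary; error handling shortened):
 *
 *     t_pool *pool = t_pool_init(64, 8);        // job queue of 64, 8 workers
 *     t_results_queue *q = t_results_queue_init();
 *     if (!pool || !q)
 *         return -1;
 *
 * See t_pool_dispatch() below for feeding the pool and t_pool_destroy()
 * for the matching teardown.
 */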

/*
 * Adds an item to the work pool.
 *
 * FIXME: Maybe return 1,0,-1 and distinguish between job dispatched vs
 * result returned. I.e. rather than blocking on a full queue we're permitted
 * to return early on a "result available" event too.
 * The caller would then have a while loop around t_pool_dispatch.
 * Or, return -1 and set errno to EAGAIN to indicate the job was not yet
 * submitted.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
int t_pool_dispatch(t_pool *p, t_results_queue *q,
                    void *(*func)(void *arg), void *arg) {
    t_pool_job *j = malloc(sizeof(*j));

    if (!j)
        return -1;
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    // Check if the queue is full
    while (p->njobs >= p->qsize)
        pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

    // Let a worker know we have data.
#ifdef IN_ORDER
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif
    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    return 0;
}
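
/*
 * Illustrative sketch of the dispatch-and-poll pattern: t_pool_dispatch()
 * blocks only when the job queue is full, so the caller drains finished
 * results opportunistically between dispatches.  job_func, make_job_arg and
 * consume are hypothetical helpers.
 *
 *     for (i = 0; i < njobs; i++) {
 *         t_pool_dispatch(pool, q, job_func, make_job_arg(i));
 *
 *         t_pool_result *r;
 *         while ((r = t_pool_next_result(q))) {   // non-blocking
 *             consume(r->data);
 *             t_pool_delete_result(r, 1);
 *         }
 *     }
 */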

/*
 * As above, but with an optional non-blocking flag.
 *
 * nonblock  0 => block if the input queue is full
 * nonblock +1 => don't block if the input queue is full, but do not add the task
 * nonblock -1 => add the task regardless of whether the queue is full (over-size)
 */
int t_pool_dispatch2(t_pool *p, t_results_queue *q,
                     void *(*func)(void *arg), void *arg, int nonblock) {
    t_pool_job *j;

#ifdef DEBUG
    fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, q->curr_serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    if (p->njobs >= p->qsize && nonblock == 1) {
        pthread_mutex_unlock(&p->pool_m);
        errno = EAGAIN;
        return -1;
    }

    if (!(j = malloc(sizeof(*j)))) {
        pthread_mutex_unlock(&p->pool_m);
        return -1;
    }
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;
    }

    // Check if the queue is full
    if (nonblock == 0)
        while (p->njobs >= p->qsize)
            pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

//    if (q->curr_serial % 100 == 0)
//        fprintf(stderr, "p->njobs = %d    p->qsize = %d\n", p->njobs, p->qsize);

    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    // Let a worker know we have data.
#ifdef IN_ORDER
    // Keep the incoming queue at 1 job per running thread, so there is always
    // something waiting when they end their current task.  If we go above
    // this, signal to start more threads (if available). This has the effect
    // of concentrating jobs on fewer cores when we are I/O bound, which in
    // turn benefits systems with automatic CPU frequency scaling.
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif

    pthread_mutex_unlock(&p->pool_m);

    return 0;
}
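
/*
 * Illustrative sketch of non-blocking dispatch (nonblock == +1): on EAGAIN
 * the caller does something useful before retrying, here draining finished
 * results.  This assumes all outstanding jobs feed queue 'q'; job_func, arg
 * and consume are hypothetical.
 *
 *     while (t_pool_dispatch2(pool, q, job_func, arg, 1) == -1 &&
 *            errno == EAGAIN) {
 *         t_pool_result *r = t_pool_next_result_wait(q);
 *         consume(r->data);
 *         t_pool_delete_result(r, 1);
 *     }
 */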

/*
 * Flushes the pool, but doesn't exit. This simply drains the queue and
 * ensures all worker threads have finished their current tasks.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
int t_pool_flush(t_pool *p) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Flushing pool %p\n", p);
#endif

    // Drain the queue
    pthread_mutex_lock(&p->pool_m);

    // Wake up everything for the final sprint!
    for (i = 0; i < p->tsize; i++)
        if (p->t_stack[i])
            pthread_cond_signal(&p->t[i].pending_c);

    while (p->njobs || p->nwaiting != p->tsize)
        pthread_cond_wait(&p->empty_c, &p->pool_m);

    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    fprintf(stderr, "Flush complete for pool %p, njobs=%d, nwaiting=%d\n",
            p, p->njobs, p->nwaiting);
#endif

    return 0;
}
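
/*
 * Illustrative sketch: t_pool_flush() can act as a barrier between batches
 * of work without tearing the pool down.  job_func and args are hypothetical.
 *
 *     for (b = 0; b < nbatches; b++) {
 *         for (i = 0; i < batch_size; i++)
 *             t_pool_dispatch(pool, q, job_func, args[b][i]);
 *
 *         t_pool_flush(pool);   // all workers idle; every job has run
 *
 *         // All results for this batch are now on the results queue
 *         // (or were already consumed).
 *     }
 */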

/*
 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
 * otherwise they are joined so that they can finish their current work load
 * first.
 *
 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown, or
 * t_pool_destroy(p,1) to quickly exit after a fatal error.
 */
void t_pool_destroy(t_pool *p, int kill) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
#endif

    /* Send shutdown message to worker threads */
    if (!kill) {
        pthread_mutex_lock(&p->pool_m);
        p->shutdown = 1;

#ifdef DEBUG
        fprintf(stderr, "Sending shutdown request\n");
#endif

#ifdef IN_ORDER
        for (i = 0; i < p->tsize; i++)
            pthread_cond_signal(&p->t[i].pending_c);
#else
        pthread_cond_broadcast(&p->pending_c);
#endif
        pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
        fprintf(stderr, "Shutdown complete\n");
#endif
        for (i = 0; i < p->tsize; i++)
            pthread_join(p->t[i].tid, NULL);
    } else {
        for (i = 0; i < p->tsize; i++)
            pthread_kill(p->t[i].tid, SIGINT);
    }

    pthread_mutex_destroy(&p->pool_m);
    pthread_cond_destroy(&p->empty_c);
    pthread_cond_destroy(&p->full_c);
#ifdef IN_ORDER
    for (i = 0; i < p->tsize; i++)
        pthread_cond_destroy(&p->t[i].pending_c);
#else
    pthread_cond_destroy(&p->pending_c);
#endif

#ifdef DEBUG_TIME
    fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
    fprintf(stderr, "Wait  time=%f\n", p->wait_time  / 1000000.0);
    fprintf(stderr, "%d%% utilisation\n",
            (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
    for (i = 0; i < p->tsize; i++)
        fprintf(stderr, "%d: Wait time=%f\n", i,
                p->t[i].wait_time / 1000000.0);
#endif

    if (p->t_stack)
        free(p->t_stack);

    free(p->t);
    free(p);

#ifdef DEBUG
    fprintf(stderr, "Destroyed pool %p\n", p);
#endif
}
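
/*
 * Illustrative sketch of the normal shutdown order described above
 * (flush first, then destroy without kill, then free the results queue):
 *
 *     t_pool_flush(pool);
 *     t_pool_destroy(pool, 0);        // joins all worker threads
 *     t_results_queue_destroy(q);
 *
 * On a fatal error, t_pool_destroy(pool, 1) signals the workers instead of
 * joining them.
 */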


/*-----------------------------------------------------------------------------
 * Test app.
 */

#ifdef TEST_MAIN

#include <stdio.h>
#include <math.h>
#include <unistd.h>   // for usleep()

void *doit(void *arg) {
    int i, k, x = 0;
    int job = *(int *)arg;
    int *res;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order
    if (0) {
        for (k = 0; k < 100; k++) {
            for (i = 0; i < 100000; i++) {
                x++;
                x += x * sin(i);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    res = malloc(sizeof(*res));
    *res = x;

    return res;
}

#define NTHREADS 8

int main(int argc, char **argv) {
    t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
    t_results_queue *q = t_results_queue_init();
    int i;
    t_pool_result *r;

    // Dispatch jobs
    for (i = 0; i < 20; i++) {
        int *ip = malloc(sizeof(*ip));
        *ip = i;
        printf("Submitting %d\n", i);
        t_pool_dispatch(p, q, doit, ip);

        // Check for results
        if ((r = t_pool_next_result(q))) {
            printf("RESULT: %d\n", *(int *)r->data);
            t_pool_delete_result(r, 1);
        }
    }

    t_pool_flush(p);

    while ((r = t_pool_next_result(q))) {
        printf("RESULT: %d\n", *(int *)r->data);
        t_pool_delete_result(r, 1);
    }

    t_pool_destroy(p, 0);
    t_results_queue_destroy(q);

    return 0;
}
#endif