comparison ezBAMQC/src/htslib/cram/thread_pool.c @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:dfa3745e5fd8
1 /*
2 Copyright (c) 2013 Genome Research Ltd.
3 Author: James Bonfield <jkb@sanger.ac.uk>
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10
11 2. Redistributions in binary form must reproduce the above copyright notice,
12 this list of conditions and the following disclaimer in the documentation
13 and/or other materials provided with the distribution.
14
15 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
16 Institute nor the names of its contributors may be used to endorse or promote
17 products derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include <stdlib.h>
32
33 #include <signal.h>
34 #include <errno.h>
35 #include <stdio.h>
36 #include <string.h>
37 #include <sys/time.h>
38 #include <assert.h>
39
40 #include "cram/thread_pool.h"
41
42 //#define DEBUG
43 //#define DEBUG_TIME
44
45 #define IN_ORDER
46
47 #ifdef DEBUG
48 static int worker_id(t_pool *p) {
49 int i;
50 pthread_t s = pthread_self();
51 for (i = 0; i < p->tsize; i++) {
52 if (pthread_equal(s, p->t[i].tid))
53 return i;
54 }
55 return -1;
56 }
57 #endif
58
59 /* ----------------------------------------------------------------------------
60 * A queue to hold results from the thread pool.
61 *
62 * Each thread pool may have jobs of multiple types being queued up and
63 * interleaved, so we allow several results queues per pool.
64 *
65 * The jobs themselves are expected to push their results onto their
66 * appropriate results queue.
67 */
68
69 /*
70 * Adds a result to the end of the result queue.
71 *
72 * Returns 0 on success;
73 * -1 on failure
74 */
75 static int t_pool_add_result(t_pool_job *j, void *data) {
76 t_results_queue *q = j->q;
77 t_pool_result *r;
78
79 #ifdef DEBUG
80 fprintf(stderr, "%d: Adding resulting to queue %p, serial %d\n",
81 worker_id(j->p), q, j->serial);
82 #endif
83
84 /* No results queue is fine if we don't want any results back */
85 if (!q)
86 return 0;
87
88 if (!(r = malloc(sizeof(*r))))
89 return -1;
90
91 r->next = NULL;
92 r->data = data;
93 r->serial = j->serial;
94
95 pthread_mutex_lock(&q->result_m);
96 if (q->result_tail) {
97 q->result_tail->next = r;
98 q->result_tail = r;
99 } else {
100 q->result_head = q->result_tail = r;
101 }
102 q->queue_len++;
103 q->pending--;
104
105 #ifdef DEBUG
106 fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
107 worker_id(j->p), r->serial);
108 #endif
109 pthread_cond_signal(&q->result_avail_c);
110 #ifdef DEBUG
111 fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
112 #endif
113
114 pthread_mutex_unlock(&q->result_m);
115
116 return 0;
117 }
118
119 /* Core of t_pool_next_result() */
120 static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
121 t_pool_result *r, *last;
122
123 for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
124 if (r->serial == q->next_serial)
125 break;
126 }
127
128 if (r) {
129 if (q->result_head == r)
130 q->result_head = r->next;
131 else
132 last->next = r->next;
133
134 if (q->result_tail == r)
135 q->result_tail = last;
136
137 if (!q->result_head)
138 q->result_tail = NULL;
139
140 q->next_serial++;
141 q->queue_len--;
142 }
143
144 return r;
145 }
146
147 /*
148 * Pulls a result off the head of the result queue. Caller should
149 * free it (and any internals as appropriate) after use. This doesn't
150 * wait for a result to be present.
151 *
152 * Results will be returned in strict order.
153 *
154 * Returns t_pool_result pointer if a result is ready.
155 * NULL if not.
156 */
157 t_pool_result *t_pool_next_result(t_results_queue *q) {
158 t_pool_result *r;
159
160 #ifdef DEBUG
161 fprintf(stderr, "Requesting next result on queue %p\n", q);
162 #endif
163
164 pthread_mutex_lock(&q->result_m);
165 r = t_pool_next_result_locked(q);
166 pthread_mutex_unlock(&q->result_m);
167
168 #ifdef DEBUG
169 fprintf(stderr, "(q=%p) Found %p\n", q, r);
170 #endif
171
172 return r;
173 }
174
175 t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
176 t_pool_result *r;
177
178 #ifdef DEBUG
179 fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
180 #endif
181
182 pthread_mutex_lock(&q->result_m);
183 while (!(r = t_pool_next_result_locked(q))) {
184 /* Possible race here now avoided via _locked() call, but incase... */
185 struct timeval now;
186 struct timespec timeout;
187
188 gettimeofday(&now, NULL);
189 timeout.tv_sec = now.tv_sec + 10;
190 timeout.tv_nsec = now.tv_usec * 1000;
191
192 pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
193 }
194 pthread_mutex_unlock(&q->result_m);
195
196 return r;
197 }
198
199 /*
200 * Returns true if there are no items on the finished results queue and
201 * also none still pending.
202 */
203 int t_pool_results_queue_empty(t_results_queue *q) {
204 int empty;
205
206 pthread_mutex_lock(&q->result_m);
207 empty = q->queue_len == 0 && q->pending == 0;
208 pthread_mutex_unlock(&q->result_m);
209
210 return empty;
211 }
212
213
214 /*
215 * Returns the number of completed jobs on the results queue.
216 */
217 int t_pool_results_queue_len(t_results_queue *q) {
218 int len;
219
220 pthread_mutex_lock(&q->result_m);
221 len = q->queue_len;
222 pthread_mutex_unlock(&q->result_m);
223
224 return len;
225 }
226
227 int t_pool_results_queue_sz(t_results_queue *q) {
228 int len;
229
230 pthread_mutex_lock(&q->result_m);
231 len = q->queue_len + q->pending;
232 pthread_mutex_unlock(&q->result_m);
233
234 return len;
235 }
236
237 /*
238 * Frees a result 'r' and if free_data is true also frees
239 * the internal r->data result too.
240 */
241 void t_pool_delete_result(t_pool_result *r, int free_data) {
242 if (!r)
243 return;
244
245 if (free_data && r->data)
246 free(r->data);
247
248 free(r);
249 }
250
251 /*
252 * Initialises a results queue.
253 *
254 * Results queue pointer on success;
255 * NULL on failure
256 */
257 t_results_queue *t_results_queue_init(void) {
258 t_results_queue *q = malloc(sizeof(*q));
259
260 pthread_mutex_init(&q->result_m, NULL);
261 pthread_cond_init(&q->result_avail_c, NULL);
262
263 q->result_head = NULL;
264 q->result_tail = NULL;
265 q->next_serial = 0;
266 q->curr_serial = 0;
267 q->queue_len = 0;
268 q->pending = 0;
269
270 return q;
271 }
272
273 /* Deallocates memory for a results queue */
274 void t_results_queue_destroy(t_results_queue *q) {
275 #ifdef DEBUG
276 fprintf(stderr, "Destroying results queue %p\n", q);
277 #endif
278
279 if (!q)
280 return;
281
282 pthread_mutex_destroy(&q->result_m);
283 pthread_cond_destroy(&q->result_avail_c);
284
285 memset(q, 0xbb, sizeof(*q));
286 free(q);
287
288 #ifdef DEBUG
289 fprintf(stderr, "Destroyed results queue %p\n", q);
290 #endif
291 }
292
293 /* ----------------------------------------------------------------------------
294 * The thread pool.
295 */
296
/*
 * Difference t2 - t1 between two 'struct timeval' values, in microseconds.
 * Arguments are parenthesised so that expressions such as *ptr can be
 * passed safely.
 */
#define TDIFF(t2,t1) (((t2).tv_sec - (t1).tv_sec) * 1000000 + \
                      ((t2).tv_usec - (t1).tv_usec))
298
299 /*
300 * A worker thread.
301 *
302 * Each thread waits for the pool to be non-empty.
303 * As soon as this applies, one of them succeeds in getting the lock
304 * and then executes the job.
305 */
306 static void *t_pool_worker(void *arg) {
307 t_pool_worker_t *w = (t_pool_worker_t *)arg;
308 t_pool *p = w->p;
309 t_pool_job *j;
310 #ifdef DEBUG_TIME
311 struct timeval t1, t2, t3;
312 #endif
313
314 for (;;) {
315 // Pop an item off the pool queue
316 #ifdef DEBUG_TIME
317 gettimeofday(&t1, NULL);
318 #endif
319
320 pthread_mutex_lock(&p->pool_m);
321
322 #ifdef DEBUG_TIME
323 gettimeofday(&t2, NULL);
324 p->wait_time += TDIFF(t2,t1);
325 w->wait_time += TDIFF(t2,t1);
326 #endif
327
328 // If there is something on the job list and a higher priority
329 // thread waiting, let it handle this instead.
330 // while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
331 // pthread_mutex_unlock(&p->pool_m);
332 // pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
333 // pthread_mutex_lock(&p->pool_m);
334 // }
335
336 while (!p->head && !p->shutdown) {
337 p->nwaiting++;
338
339 if (p->njobs == 0)
340 pthread_cond_signal(&p->empty_c);
341 #ifdef DEBUG_TIME
342 gettimeofday(&t2, NULL);
343 #endif
344
345 #ifdef IN_ORDER
346 // Push this thread to the top of the waiting stack
347 if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
348 p->t_stack_top = w->idx;
349
350 p->t_stack[w->idx] = 1;
351 pthread_cond_wait(&w->pending_c, &p->pool_m);
352 p->t_stack[w->idx] = 0;
353
354 /* Find new t_stack_top */
355 {
356 int i;
357 p->t_stack_top = -1;
358 for (i = 0; i < p->tsize; i++) {
359 if (p->t_stack[i]) {
360 p->t_stack_top = i;
361 break;
362 }
363 }
364 }
365 #else
366 pthread_cond_wait(&p->pending_c, &p->pool_m);
367 #endif
368
369 #ifdef DEBUG_TIME
370 gettimeofday(&t3, NULL);
371 p->wait_time += TDIFF(t3,t2);
372 w->wait_time += TDIFF(t3,t2);
373 #endif
374 p->nwaiting--;
375 }
376
377 if (p->shutdown) {
378 #ifdef DEBUG_TIME
379 p->total_time += TDIFF(t3,t1);
380 #endif
381 #ifdef DEBUG
382 fprintf(stderr, "%d: Shutting down\n", worker_id(p));
383 #endif
384 pthread_mutex_unlock(&p->pool_m);
385 pthread_exit(NULL);
386 }
387
388 j = p->head;
389 if (!(p->head = j->next))
390 p->tail = NULL;
391
392 if (p->njobs-- >= p->qsize)
393 pthread_cond_signal(&p->full_c);
394
395 if (p->njobs == 0)
396 pthread_cond_signal(&p->empty_c);
397
398 pthread_mutex_unlock(&p->pool_m);
399
400 // We have job 'j' - now execute it.
401 t_pool_add_result(j, j->func(j->arg));
402 #ifdef DEBUG_TIME
403 pthread_mutex_lock(&p->pool_m);
404 gettimeofday(&t3, NULL);
405 p->total_time += TDIFF(t3,t1);
406 pthread_mutex_unlock(&p->pool_m);
407 #endif
408 memset(j, 0xbb, sizeof(*j));
409 free(j);
410 }
411
412 return NULL;
413 }
414
415 /*
416 * Creates a worker pool of length qsize with tsize worker threads.
417 *
418 * Returns pool pointer on success;
419 * NULL on failure
420 */
421 t_pool *t_pool_init(int qsize, int tsize) {
422 int i;
423 t_pool *p = malloc(sizeof(*p));
424 p->qsize = qsize;
425 p->tsize = tsize;
426 p->njobs = 0;
427 p->nwaiting = 0;
428 p->shutdown = 0;
429 p->head = p->tail = NULL;
430 p->t_stack = NULL;
431 #ifdef DEBUG_TIME
432 p->total_time = p->wait_time = 0;
433 #endif
434
435 p->t = malloc(tsize * sizeof(p->t[0]));
436
437 pthread_mutex_init(&p->pool_m, NULL);
438 pthread_cond_init(&p->empty_c, NULL);
439 pthread_cond_init(&p->full_c, NULL);
440
441 pthread_mutex_lock(&p->pool_m);
442
443 #ifdef IN_ORDER
444 if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack))))
445 return NULL;
446 p->t_stack_top = -1;
447
448 for (i = 0; i < tsize; i++) {
449 t_pool_worker_t *w = &p->t[i];
450 p->t_stack[i] = 0;
451 w->p = p;
452 w->idx = i;
453 w->wait_time = 0;
454 pthread_cond_init(&w->pending_c, NULL);
455 if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
456 return NULL;
457 }
458 #else
459 pthread_cond_init(&p->pending_c, NULL);
460
461 for (i = 0; i < tsize; i++) {
462 t_pool_worker_t *w = &p->t[i];
463 w->p = p;
464 w->idx = i;
465 pthread_cond_init(&w->pending_c, NULL);
466 if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
467 return NULL;
468 }
469 #endif
470
471 pthread_mutex_unlock(&p->pool_m);
472
473 return p;
474 }
475
476 /*
477 * Adds an item to the work pool.
478 *
479 * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
480 * result returned. Ie rather than blocking on full queue we're permitted
481 * to return early on "result available" event too.
482 * Caller would then have a while loop around t_pool_dispatch.
483 * Or, return -1 and set errno to EAGAIN to indicate job not yet submitted.
484 *
485 * Returns 0 on success
486 * -1 on failure
487 */
488 int t_pool_dispatch(t_pool *p, t_results_queue *q,
489 void *(*func)(void *arg), void *arg) {
490 t_pool_job *j = malloc(sizeof(*j));
491
492 if (!j)
493 return -1;
494 j->func = func;
495 j->arg = arg;
496 j->next = NULL;
497 j->p = p;
498 j->q = q;
499 if (q) {
500 pthread_mutex_lock(&q->result_m);
501 j->serial = q->curr_serial++;
502 q->pending++;
503 pthread_mutex_unlock(&q->result_m);
504 } else {
505 j->serial = 0;
506 }
507
508 #ifdef DEBUG
509 fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
510 #endif
511
512 pthread_mutex_lock(&p->pool_m);
513
514 // Check if queue is full
515 while (p->njobs >= p->qsize)
516 pthread_cond_wait(&p->full_c, &p->pool_m);
517
518 p->njobs++;
519
520 if (p->tail) {
521 p->tail->next = j;
522 p->tail = j;
523 } else {
524 p->head = p->tail = j;
525 }
526
527 // Let a worker know we have data.
528 #ifdef IN_ORDER
529 if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
530 pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
531 #else
532 pthread_cond_signal(&p->pending_c);
533 #endif
534 pthread_mutex_unlock(&p->pool_m);
535
536 #ifdef DEBUG
537 fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
538 #endif
539
540 return 0;
541 }
542
543 /*
544 * As above but optional non-block flag.
545 *
546 * nonblock 0 => block if input queue is full
547 * nonblock +1 => don't block if input queue is full, but do not add task
548 * nonblock -1 => add task regardless of whether queue is full (over-size)
549 */
550 int t_pool_dispatch2(t_pool *p, t_results_queue *q,
551 void *(*func)(void *arg), void *arg, int nonblock) {
552 t_pool_job *j;
553
554 #ifdef DEBUG
555 fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, q->curr_serial);
556 #endif
557
558 pthread_mutex_lock(&p->pool_m);
559
560 if (p->njobs >= p->qsize && nonblock == 1) {
561 pthread_mutex_unlock(&p->pool_m);
562 errno = EAGAIN;
563 return -1;
564 }
565
566 if (!(j = malloc(sizeof(*j))))
567 return -1;
568 j->func = func;
569 j->arg = arg;
570 j->next = NULL;
571 j->p = p;
572 j->q = q;
573 if (q) {
574 pthread_mutex_lock(&q->result_m);
575 j->serial = q->curr_serial;
576 pthread_mutex_unlock(&q->result_m);
577 } else {
578 j->serial = 0;
579 }
580
581 if (q) {
582 pthread_mutex_lock(&q->result_m);
583 q->curr_serial++;
584 q->pending++;
585 pthread_mutex_unlock(&q->result_m);
586 }
587
588 // Check if queue is full
589 if (nonblock == 0)
590 while (p->njobs >= p->qsize)
591 pthread_cond_wait(&p->full_c, &p->pool_m);
592
593 p->njobs++;
594
595 // if (q->curr_serial % 100 == 0)
596 // fprintf(stderr, "p->njobs = %d p->qsize = %d\n", p->njobs, p->qsize);
597
598 if (p->tail) {
599 p->tail->next = j;
600 p->tail = j;
601 } else {
602 p->head = p->tail = j;
603 }
604
605 #ifdef DEBUG
606 fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
607 #endif
608
609 // Let a worker know we have data.
610 #ifdef IN_ORDER
611 // Keep incoming queue at 1 per running thread, so there is always
612 // something waiting when they end their current task. If we go above
613 // this signal to start more threads (if available). This has the effect
614 // of concentrating jobs to fewer cores when we are I/O bound, which in
615 // turn benefits systems with auto CPU frequency scaling.
616 if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
617 pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
618 #else
619 pthread_cond_signal(&p->pending_c);
620 #endif
621
622 pthread_mutex_unlock(&p->pool_m);
623
624 return 0;
625 }
626
627 /*
628 * Flushes the pool, but doesn't exit. This simply drains the queue and
629 * ensures all worker threads have finished their current task.
630 *
631 * Returns 0 on success;
632 * -1 on failure
633 */
634 int t_pool_flush(t_pool *p) {
635 int i;
636
637 #ifdef DEBUG
638 fprintf(stderr, "Flushing pool %p\n", p);
639 #endif
640
641 // Drains the queue
642 pthread_mutex_lock(&p->pool_m);
643
644 // Wake up everything for the final sprint!
645 for (i = 0; i < p->tsize; i++)
646 if (p->t_stack[i])
647 pthread_cond_signal(&p->t[i].pending_c);
648
649 while (p->njobs || p->nwaiting != p->tsize)
650 pthread_cond_wait(&p->empty_c, &p->pool_m);
651
652 pthread_mutex_unlock(&p->pool_m);
653
654 #ifdef DEBUG
655 fprintf(stderr, "Flushed complete for pool %p, njobs=%d, nwaiting=%d\n",
656 p, p->njobs, p->nwaiting);
657 #endif
658
659 return 0;
660 }
661
662 /*
663 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
664 * otherwise they are joined into the main thread so they will finish their
665 * current work load.
666 *
667 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
668 * t_pool_destroy(p,1) to quickly exit after a fatal error.
669 */
670 void t_pool_destroy(t_pool *p, int kill) {
671 int i;
672
673 #ifdef DEBUG
674 fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
675 #endif
676
677 /* Send shutdown message to worker threads */
678 if (!kill) {
679 pthread_mutex_lock(&p->pool_m);
680 p->shutdown = 1;
681
682 #ifdef DEBUG
683 fprintf(stderr, "Sending shutdown request\n");
684 #endif
685
686 #ifdef IN_ORDER
687 for (i = 0; i < p->tsize; i++)
688 pthread_cond_signal(&p->t[i].pending_c);
689 #else
690 pthread_cond_broadcast(&p->pending_c);
691 #endif
692 pthread_mutex_unlock(&p->pool_m);
693
694 #ifdef DEBUG
695 fprintf(stderr, "Shutdown complete\n");
696 #endif
697 for (i = 0; i < p->tsize; i++)
698 pthread_join(p->t[i].tid, NULL);
699 } else {
700 for (i = 0; i < p->tsize; i++)
701 pthread_kill(p->t[i].tid, SIGINT);
702 }
703
704 pthread_mutex_destroy(&p->pool_m);
705 pthread_cond_destroy(&p->empty_c);
706 pthread_cond_destroy(&p->full_c);
707 #ifdef IN_ORDER
708 for (i = 0; i < p->tsize; i++)
709 pthread_cond_destroy(&p->t[i].pending_c);
710 #else
711 pthread_cond_destroy(&p->pending_c);
712 #endif
713
714 #ifdef DEBUG_TIME
715 fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
716 fprintf(stderr, "Wait time=%f\n", p->wait_time / 1000000.0);
717 fprintf(stderr, "%d%% utilisation\n",
718 (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
719 for (i = 0; i < p->tsize; i++)
720 fprintf(stderr, "%d: Wait time=%f\n", i,
721 p->t[i].wait_time / 1000000.0);
722 #endif
723
724 if (p->t_stack)
725 free(p->t_stack);
726
727 free(p->t);
728 free(p);
729
730 #ifdef DEBUG
731 fprintf(stderr, "Destroyed pool %p\n", p);
732 #endif
733 }
734
735
736 /*-----------------------------------------------------------------------------
737 * Test app.
738 */
739
740 #ifdef TEST_MAIN
741
742 #include <stdio.h>
743 #include <math.h>
744
/*
 * Test job: sleeps for a random time of under one second, then returns a
 * freshly allocated int holding job*job.
 *
 * arg points to a heap-allocated int job number; it is freed here.
 * The caller owns the returned int.
 */
void *doit(void *arg) {
    const int job = *(int *)arg;
    int x = 0;
    int *out;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order
    if (0) {
        /* Disabled CPU-bound alternative to the sleep above */
        int i, k;

        for (k = 0; k < 100; k++) {
            for (i = 0; i < 100000; i++) {
                x++;
                x += x * sin(i);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    out = malloc(sizeof(*out));
    *out = x;

    return out;
}
776
777 #define NTHREADS 8
778
779 int main(int argc, char **argv) {
780 t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
781 t_results_queue *q = t_results_queue_init();
782 int i;
783 t_pool_result *r;
784
785 // Dispatch jobs
786 for (i = 0; i < 20; i++) {
787 int *ip = malloc(sizeof(*ip));
788 *ip = i;
789 printf("Submitting %d\n", i);
790 t_pool_dispatch(p, q, doit, ip);
791
792 // Check for results
793 if ((r = t_pool_next_result(q))) {
794 printf("RESULT: %d\n", *(int *)r->data);
795 t_pool_delete_result(r, 1);
796 }
797 }
798
799 t_pool_flush(p);
800
801 while ((r = t_pool_next_result(q))) {
802 printf("RESULT: %d\n", *(int *)r->data);
803 t_pool_delete_result(r, 1);
804 }
805
806 t_pool_destroy(p, 0);
807 t_results_queue_destroy(q);
808
809 return 0;
810 }
811 #endif