ezBAMQC/src/htslib/cram/thread_pool.c @ 0:dfa3745e5fd8
commit: Uploaded
author: youngkim
date:   Thu, 24 Mar 2016 17:12:52 -0400

/*
Copyright (c) 2013 Genome Research Ltd.
Author: James Bonfield <jkb@sanger.ac.uk>

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
Institute nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdlib.h>

#include <signal.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <assert.h>

#include "cram/thread_pool.h"

//#define DEBUG
//#define DEBUG_TIME

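/*
 * When IN_ORDER is defined, each worker thread has its own pending_c
 * condition variable and idle workers are tracked on a stack (t_stack),
 * so the lowest-numbered idle thread is woken first and work is
 * concentrated on fewer threads.  Without it a single shared p->pending_c
 * condition is used and any idle worker may be woken.
 */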
#define IN_ORDER

#ifdef DEBUG
static int worker_id(t_pool *p) {
    int i;
    pthread_t s = pthread_self();
    for (i = 0; i < p->tsize; i++) {
        if (pthread_equal(s, p->t[i].tid))
            return i;
    }
    return -1;
}
#endif

/* ----------------------------------------------------------------------------
 * A queue to hold results from the thread pool.
 *
 * Each thread pool may have jobs of multiple types being queued up and
 * interleaved, so we allow several results queues per pool.
 *
 * The jobs themselves are expected to push their results onto their
 * appropriate results queue.
 */

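/*
 * Example (illustrative): after jobs have been dispatched with a non-NULL
 * results queue 'q', a caller can drain completed results in strict
 * dispatch order using only the functions defined below:
 *
 *     t_pool_result *r;
 *     while (!t_pool_results_queue_empty(q)) {
 *         r = t_pool_next_result_wait(q);   // blocks until the next serial is done
 *         // ... use r->data ...
 *         t_pool_delete_result(r, 1);       // 1 => also free r->data
 *     }
 */
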
/*
 * Adds a result to the end of the result queue.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
static int t_pool_add_result(t_pool_job *j, void *data) {
    t_results_queue *q = j->q;
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "%d: Adding result to queue %p, serial %d\n",
            worker_id(j->p), q, j->serial);
#endif

    /* No results queue is fine if we don't want any results back */
    if (!q)
        return 0;

    if (!(r = malloc(sizeof(*r))))
        return -1;

    r->next = NULL;
    r->data = data;
    r->serial = j->serial;

    pthread_mutex_lock(&q->result_m);
    if (q->result_tail) {
        q->result_tail->next = r;
        q->result_tail = r;
    } else {
        q->result_head = q->result_tail = r;
    }
    q->queue_len++;
    q->pending--;

#ifdef DEBUG
    fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
            worker_id(j->p), r->serial);
#endif
    pthread_cond_signal(&q->result_avail_c);
#ifdef DEBUG
    fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
#endif

    pthread_mutex_unlock(&q->result_m);

    return 0;
}

/* Core of t_pool_next_result() */
static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
    t_pool_result *r, *last;

    for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
        if (r->serial == q->next_serial)
            break;
    }

    if (r) {
        if (q->result_head == r)
            q->result_head = r->next;
        else
            last->next = r->next;

        if (q->result_tail == r)
            q->result_tail = last;

        if (!q->result_head)
            q->result_tail = NULL;

        q->next_serial++;
        q->queue_len--;
    }

    return r;
}

/*
 * Pulls a result off the head of the result queue. Caller should
 * free it (and any internals as appropriate) after use. This doesn't
 * wait for a result to be present.
 *
 * Results will be returned in strict order.
 *
 * Returns t_pool_result pointer if a result is ready.
 *         NULL if not.
 */
t_pool_result *t_pool_next_result(t_results_queue *q) {
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "Requesting next result on queue %p\n", q);
#endif

    pthread_mutex_lock(&q->result_m);
    r = t_pool_next_result_locked(q);
    pthread_mutex_unlock(&q->result_m);

#ifdef DEBUG
    fprintf(stderr, "(q=%p) Found %p\n", q, r);
#endif

    return r;
}

t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
    t_pool_result *r;

#ifdef DEBUG
    fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
#endif

    pthread_mutex_lock(&q->result_m);
    while (!(r = t_pool_next_result_locked(q))) {
        /* Possible race here now avoided via the _locked() call, but just in case... */
        struct timeval now;
        struct timespec timeout;

        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 10;
        timeout.tv_nsec = now.tv_usec * 1000;

        pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
    }
    pthread_mutex_unlock(&q->result_m);

    return r;
}

/*
 * Returns true if there are no items on the finished results queue and
 * also none still pending.
 */
int t_pool_results_queue_empty(t_results_queue *q) {
    int empty;

    pthread_mutex_lock(&q->result_m);
    empty = q->queue_len == 0 && q->pending == 0;
    pthread_mutex_unlock(&q->result_m);

    return empty;
}


/*
 * Returns the number of completed jobs on the results queue.
 */
int t_pool_results_queue_len(t_results_queue *q) {
    int len;

    pthread_mutex_lock(&q->result_m);
    len = q->queue_len;
    pthread_mutex_unlock(&q->result_m);

    return len;
}

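/*
 * Returns the number of completed results still on the queue plus the
 * number of jobs dispatched to this queue but not yet finished.
 */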
int t_pool_results_queue_sz(t_results_queue *q) {
    int len;

    pthread_mutex_lock(&q->result_m);
    len = q->queue_len + q->pending;
    pthread_mutex_unlock(&q->result_m);

    return len;
}

/*
 * Frees a result 'r' and if free_data is true also frees
 * the internal r->data result too.
 */
void t_pool_delete_result(t_pool_result *r, int free_data) {
    if (!r)
        return;

    if (free_data && r->data)
        free(r->data);

    free(r);
}

/*
 * Initialises a results queue.
 *
 * Returns results queue pointer on success;
 *         NULL on failure
 */
t_results_queue *t_results_queue_init(void) {
    t_results_queue *q = malloc(sizeof(*q));

    if (!q)
        return NULL;

    pthread_mutex_init(&q->result_m, NULL);
    pthread_cond_init(&q->result_avail_c, NULL);

    q->result_head = NULL;
    q->result_tail = NULL;
    q->next_serial = 0;
    q->curr_serial = 0;
    q->queue_len = 0;
    q->pending = 0;

    return q;
}

/* Deallocates memory for a results queue */
void t_results_queue_destroy(t_results_queue *q) {
#ifdef DEBUG
    fprintf(stderr, "Destroying results queue %p\n", q);
#endif

    if (!q)
        return;

    pthread_mutex_destroy(&q->result_m);
    pthread_cond_destroy(&q->result_avail_c);

    memset(q, 0xbb, sizeof(*q));
    free(q);

#ifdef DEBUG
    fprintf(stderr, "Destroyed results queue %p\n", q);
#endif
}

/* ----------------------------------------------------------------------------
 * The thread pool.
 */

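/* Microsecond difference between two struct timeval values */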
#define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)

/*
 * A worker thread.
 *
 * Each worker waits until the pool's job queue is non-empty; one of the
 * waiting threads then acquires the lock, takes a job off the queue and
 * executes it.
 */
static void *t_pool_worker(void *arg) {
    t_pool_worker_t *w = (t_pool_worker_t *)arg;
    t_pool *p = w->p;
    t_pool_job *j;
#ifdef DEBUG_TIME
    struct timeval t1, t2, t3;
#endif

    for (;;) {
        // Pop an item off the pool queue
#ifdef DEBUG_TIME
        gettimeofday(&t1, NULL);
#endif

        pthread_mutex_lock(&p->pool_m);

#ifdef DEBUG_TIME
        gettimeofday(&t2, NULL);
        p->wait_time += TDIFF(t2,t1);
        w->wait_time += TDIFF(t2,t1);
#endif

        // If there is something on the job list and a higher priority
        // thread waiting, let it handle this instead.
//      while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
//          pthread_mutex_unlock(&p->pool_m);
//          pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
//          pthread_mutex_lock(&p->pool_m);
//      }

        while (!p->head && !p->shutdown) {
            p->nwaiting++;

            if (p->njobs == 0)
                pthread_cond_signal(&p->empty_c);
#ifdef DEBUG_TIME
            gettimeofday(&t2, NULL);
#endif

#ifdef IN_ORDER
            // Push this thread to the top of the waiting stack
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
                p->t_stack_top = w->idx;

            p->t_stack[w->idx] = 1;
            pthread_cond_wait(&w->pending_c, &p->pool_m);
            p->t_stack[w->idx] = 0;

            /* Find new t_stack_top */
            {
                int i;
                p->t_stack_top = -1;
                for (i = 0; i < p->tsize; i++) {
                    if (p->t_stack[i]) {
                        p->t_stack_top = i;
                        break;
                    }
                }
            }
#else
            pthread_cond_wait(&p->pending_c, &p->pool_m);
#endif

#ifdef DEBUG_TIME
            gettimeofday(&t3, NULL);
            p->wait_time += TDIFF(t3,t2);
            w->wait_time += TDIFF(t3,t2);
#endif
            p->nwaiting--;
        }

        if (p->shutdown) {
#ifdef DEBUG_TIME
            p->total_time += TDIFF(t3,t1);
#endif
#ifdef DEBUG
            fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
            pthread_mutex_unlock(&p->pool_m);
            pthread_exit(NULL);
        }

        j = p->head;
        if (!(p->head = j->next))
            p->tail = NULL;

        if (p->njobs-- >= p->qsize)
            pthread_cond_signal(&p->full_c);

        if (p->njobs == 0)
            pthread_cond_signal(&p->empty_c);

        pthread_mutex_unlock(&p->pool_m);

        // We have job 'j' - now execute it.
        t_pool_add_result(j, j->func(j->arg));
#ifdef DEBUG_TIME
        pthread_mutex_lock(&p->pool_m);
        gettimeofday(&t3, NULL);
        p->total_time += TDIFF(t3,t1);
        pthread_mutex_unlock(&p->pool_m);
#endif
        memset(j, 0xbb, sizeof(*j));
        free(j);
    }

    return NULL;
}

/*
 * Creates a worker pool of length qsize with tsize worker threads.
 *
 * Returns pool pointer on success;
 *         NULL on failure
 */
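/*
 * Example (illustrative) of the typical lifecycle, mirroring the TEST_MAIN
 * app at the end of this file ('nthreads' is just an illustrative name):
 *
 *     t_pool *p = t_pool_init(2 * nthreads, nthreads); // queue depth 2x threads
 *     t_results_queue *q = t_results_queue_init();
 *     // ... t_pool_dispatch() jobs, consume results ...
 *     t_pool_flush(p);
 *     t_pool_destroy(p, 0);
 *     t_results_queue_destroy(q);
 */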
t_pool *t_pool_init(int qsize, int tsize) {
    int i;
    t_pool *p = malloc(sizeof(*p));
    if (!p)
        return NULL;
    p->qsize = qsize;
    p->tsize = tsize;
    p->njobs = 0;
    p->nwaiting = 0;
    p->shutdown = 0;
    p->head = p->tail = NULL;
    p->t_stack = NULL;
#ifdef DEBUG_TIME
    p->total_time = p->wait_time = 0;
#endif

    p->t = malloc(tsize * sizeof(p->t[0]));
    if (!p->t) {
        free(p);
        return NULL;
    }

    pthread_mutex_init(&p->pool_m, NULL);
    pthread_cond_init(&p->empty_c, NULL);
    pthread_cond_init(&p->full_c, NULL);

    pthread_mutex_lock(&p->pool_m);

#ifdef IN_ORDER
    if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack)))) {
        pthread_mutex_unlock(&p->pool_m);
        return NULL;
    }
    p->t_stack_top = -1;

    for (i = 0; i < tsize; i++) {
        t_pool_worker_t *w = &p->t[i];
        p->t_stack[i] = 0;
        w->p = p;
        w->idx = i;
        w->wait_time = 0;
        pthread_cond_init(&w->pending_c, NULL);
        if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w)) {
            pthread_mutex_unlock(&p->pool_m);
            return NULL;
        }
    }
#else
    pthread_cond_init(&p->pending_c, NULL);

    for (i = 0; i < tsize; i++) {
        t_pool_worker_t *w = &p->t[i];
        w->p = p;
        w->idx = i;
        pthread_cond_init(&w->pending_c, NULL);
        if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w)) {
            pthread_mutex_unlock(&p->pool_m);
            return NULL;
        }
    }
#endif

    pthread_mutex_unlock(&p->pool_m);

    return p;
}

/*
 * Adds an item to the work pool.
 *
 * FIXME: Maybe return 1,0,-1 and distinguish between job dispatched vs
 * result returned. I.e. rather than blocking on a full queue we're permitted
 * to return early on a "result available" event too.
 * The caller would then have a while loop around t_pool_dispatch.
 * Or, return -1 and set errno to EAGAIN to indicate the job was not yet
 * submitted.
 *
 * Returns 0 on success
 *        -1 on failure
 */
int t_pool_dispatch(t_pool *p, t_results_queue *q,
                    void *(*func)(void *arg), void *arg) {
    t_pool_job *j = malloc(sizeof(*j));

    if (!j)
        return -1;
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    // Check if queue is full
    while (p->njobs >= p->qsize)
        pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

    // Let a worker know we have data.
#ifdef IN_ORDER
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif
    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    return 0;
}

/*
 * As above, but with an optional non-block flag.
 *
 * nonblock  0 => block if input queue is full
 * nonblock +1 => don't block if input queue is full, but do not add task
 * nonblock -1 => add task regardless of whether queue is full (over-size)
 */
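/*
 * Example (illustrative): one way a caller might use the non-blocking mode,
 * assuming every in-flight job feeds the same results queue 'q'.  When the
 * input queue is full, consume a result instead of blocking in the dispatcher:
 *
 *     while (t_pool_dispatch2(p, q, func, arg, 1) == -1 && errno == EAGAIN) {
 *         t_pool_result *r = t_pool_next_result_wait(q);
 *         // ... use r->data ...
 *         t_pool_delete_result(r, 1);
 *     }
 */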
int t_pool_dispatch2(t_pool *p, t_results_queue *q,
                     void *(*func)(void *arg), void *arg, int nonblock) {
    t_pool_job *j;

#ifdef DEBUG
    fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, q->curr_serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    if (p->njobs >= p->qsize && nonblock == 1) {
        pthread_mutex_unlock(&p->pool_m);
        errno = EAGAIN;
        return -1;
    }

    if (!(j = malloc(sizeof(*j)))) {
        pthread_mutex_unlock(&p->pool_m);
        return -1;
    }
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;
    }

    if (q) {
        pthread_mutex_lock(&q->result_m);
        q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    }

    // Check if queue is full
    if (nonblock == 0)
        while (p->njobs >= p->qsize)
            pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

//    if (q->curr_serial % 100 == 0)
//        fprintf(stderr, "p->njobs = %d p->qsize = %d\n", p->njobs, p->qsize);

    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    // Let a worker know we have data.
#ifdef IN_ORDER
    // Keep incoming queue at 1 per running thread, so there is always
    // something waiting when they end their current task. If we go above
    // this signal to start more threads (if available). This has the effect
    // of concentrating jobs to fewer cores when we are I/O bound, which in
    // turn benefits systems with auto CPU frequency scaling.
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif

    pthread_mutex_unlock(&p->pool_m);

    return 0;
}

/*
 * Flushes the pool, but doesn't exit. This simply drains the queue and
 * ensures all worker threads have finished their current task.
 *
 * Returns 0 on success;
 *        -1 on failure
 */
int t_pool_flush(t_pool *p) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Flushing pool %p\n", p);
#endif

    // Drains the queue
    pthread_mutex_lock(&p->pool_m);

    // Wake up everything for the final sprint!
    for (i = 0; i < p->tsize; i++)
        if (p->t_stack[i])
            pthread_cond_signal(&p->t[i].pending_c);

    while (p->njobs || p->nwaiting != p->tsize)
        pthread_cond_wait(&p->empty_c, &p->pool_m);

    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    fprintf(stderr, "Flush complete for pool %p, njobs=%d, nwaiting=%d\n",
            p, p->njobs, p->nwaiting);
#endif

    return 0;
}

/*
 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
 * otherwise they are joined into the main thread so they will finish their
 * current work load.
 *
 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
 * t_pool_destroy(p,1) to quickly exit after a fatal error.
 */
void t_pool_destroy(t_pool *p, int kill) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
#endif

    /* Send shutdown message to worker threads */
    if (!kill) {
        pthread_mutex_lock(&p->pool_m);
        p->shutdown = 1;

#ifdef DEBUG
        fprintf(stderr, "Sending shutdown request\n");
#endif

#ifdef IN_ORDER
        for (i = 0; i < p->tsize; i++)
            pthread_cond_signal(&p->t[i].pending_c);
#else
        pthread_cond_broadcast(&p->pending_c);
#endif
        pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
        fprintf(stderr, "Shutdown complete\n");
#endif
        for (i = 0; i < p->tsize; i++)
            pthread_join(p->t[i].tid, NULL);
    } else {
        for (i = 0; i < p->tsize; i++)
            pthread_kill(p->t[i].tid, SIGINT);
    }

    pthread_mutex_destroy(&p->pool_m);
    pthread_cond_destroy(&p->empty_c);
    pthread_cond_destroy(&p->full_c);
#ifdef IN_ORDER
    for (i = 0; i < p->tsize; i++)
        pthread_cond_destroy(&p->t[i].pending_c);
#else
    pthread_cond_destroy(&p->pending_c);
#endif

#ifdef DEBUG_TIME
    fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
    fprintf(stderr, "Wait time=%f\n", p->wait_time / 1000000.0);
    fprintf(stderr, "%d%% utilisation\n",
            (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
    for (i = 0; i < p->tsize; i++)
        fprintf(stderr, "%d: Wait time=%f\n", i,
                p->t[i].wait_time / 1000000.0);
#endif

    if (p->t_stack)
        free(p->t_stack);

    free(p->t);
    free(p);

#ifdef DEBUG
    fprintf(stderr, "Destroyed pool %p\n", p);
#endif
}


/*-----------------------------------------------------------------------------
 * Test app.
 */

#ifdef TEST_MAIN

#include <stdio.h>
#include <math.h>
#include <unistd.h>

void *doit(void *arg) {
    int i, k, x = 0;
    int job = *(int *)arg;
    int *res;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order
    if (0) {
        for (k = 0; k < 100; k++) {
            for (i = 0; i < 100000; i++) {
                x++;
                x += x * sin(i);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    res = malloc(sizeof(*res));
    *res = x;

    return res;
}

#define NTHREADS 8

int main(int argc, char **argv) {
    t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
    t_results_queue *q = t_results_queue_init();
    int i;
    t_pool_result *r;

    // Dispatch jobs
    for (i = 0; i < 20; i++) {
        int *ip = malloc(sizeof(*ip));
        *ip = i;
        printf("Submitting %d\n", i);
        t_pool_dispatch(p, q, doit, ip);

        // Check for results
        if ((r = t_pool_next_result(q))) {
            printf("RESULT: %d\n", *(int *)r->data);
            t_pool_delete_result(r, 1);
        }
    }

    t_pool_flush(p);

    while ((r = t_pool_next_result(q))) {
        printf("RESULT: %d\n", *(int *)r->data);
        t_pool_delete_result(r, 1);
    }

    t_pool_destroy(p, 0);
    t_results_queue_destroy(q);

    return 0;
}
#endif