0
|
1 /*
|
|
2 Copyright (c) 2013 Genome Research Ltd.
|
|
3 Author: James Bonfield <jkb@sanger.ac.uk>
|
|
4
|
|
5 Redistribution and use in source and binary forms, with or without
|
|
6 modification, are permitted provided that the following conditions are met:
|
|
7
|
|
8 1. Redistributions of source code must retain the above copyright notice,
|
|
9 this list of conditions and the following disclaimer.
|
|
10
|
|
11 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
12 this list of conditions and the following disclaimer in the documentation
|
|
13 and/or other materials provided with the distribution.
|
|
14
|
|
15 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
|
|
16 Institute nor the names of its contributors may be used to endorse or promote
|
|
17 products derived from this software without specific prior written permission.
|
|
18
|
|
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
|
|
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
|
|
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
|
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
29 */
|
|
30
|
|
31 #include <stdlib.h>
|
|
32
|
|
33 #include <signal.h>
|
|
34 #include <errno.h>
|
|
35 #include <stdio.h>
|
|
36 #include <string.h>
|
|
37 #include <sys/time.h>
|
|
38 #include <assert.h>
|
|
39
|
|
40 #include "cram/thread_pool.h"
|
|
41
|
|
42 //#define DEBUG
|
|
43 //#define DEBUG_TIME
|
|
44
|
|
45 #define IN_ORDER
|
|
46
|
|
47 #ifdef DEBUG
|
|
48 static int worker_id(t_pool *p) {
|
|
49 int i;
|
|
50 pthread_t s = pthread_self();
|
|
51 for (i = 0; i < p->tsize; i++) {
|
|
52 if (pthread_equal(s, p->t[i].tid))
|
|
53 return i;
|
|
54 }
|
|
55 return -1;
|
|
56 }
|
|
57 #endif
|
|
58
|
|
59 /* ----------------------------------------------------------------------------
|
|
60 * A queue to hold results from the thread pool.
|
|
61 *
|
|
62 * Each thread pool may have jobs of multiple types being queued up and
|
|
63 * interleaved, so we allow several results queue per pool.
|
|
64 *
|
|
65 * The jobs themselves are expected to push their results onto their
|
|
66 * appropriate results queue.
|
|
67 */
|
|
68
|
|
69 /*
|
|
70 * Adds a result to the end of the result queue.
|
|
71 *
|
|
72 * Returns 0 on success;
|
|
73 * -1 on failure
|
|
74 */
|
|
75 static int t_pool_add_result(t_pool_job *j, void *data) {
|
|
76 t_results_queue *q = j->q;
|
|
77 t_pool_result *r;
|
|
78
|
|
79 #ifdef DEBUG
|
|
80 fprintf(stderr, "%d: Adding resulting to queue %p, serial %d\n",
|
|
81 worker_id(j->p), q, j->serial);
|
|
82 #endif
|
|
83
|
|
84 /* No results queue is fine if we don't want any results back */
|
|
85 if (!q)
|
|
86 return 0;
|
|
87
|
|
88 if (!(r = malloc(sizeof(*r))))
|
|
89 return -1;
|
|
90
|
|
91 r->next = NULL;
|
|
92 r->data = data;
|
|
93 r->serial = j->serial;
|
|
94
|
|
95 pthread_mutex_lock(&q->result_m);
|
|
96 if (q->result_tail) {
|
|
97 q->result_tail->next = r;
|
|
98 q->result_tail = r;
|
|
99 } else {
|
|
100 q->result_head = q->result_tail = r;
|
|
101 }
|
|
102 q->queue_len++;
|
|
103 q->pending--;
|
|
104
|
|
105 #ifdef DEBUG
|
|
106 fprintf(stderr, "%d: Broadcasting result_avail (id %d)\n",
|
|
107 worker_id(j->p), r->serial);
|
|
108 #endif
|
|
109 pthread_cond_signal(&q->result_avail_c);
|
|
110 #ifdef DEBUG
|
|
111 fprintf(stderr, "%d: Broadcast complete\n", worker_id(j->p));
|
|
112 #endif
|
|
113
|
|
114 pthread_mutex_unlock(&q->result_m);
|
|
115
|
|
116 return 0;
|
|
117 }
|
|
118
|
|
119 /* Core of t_pool_next_result() */
|
|
120 static t_pool_result *t_pool_next_result_locked(t_results_queue *q) {
|
|
121 t_pool_result *r, *last;
|
|
122
|
|
123 for (last = NULL, r = q->result_head; r; last = r, r = r->next) {
|
|
124 if (r->serial == q->next_serial)
|
|
125 break;
|
|
126 }
|
|
127
|
|
128 if (r) {
|
|
129 if (q->result_head == r)
|
|
130 q->result_head = r->next;
|
|
131 else
|
|
132 last->next = r->next;
|
|
133
|
|
134 if (q->result_tail == r)
|
|
135 q->result_tail = last;
|
|
136
|
|
137 if (!q->result_head)
|
|
138 q->result_tail = NULL;
|
|
139
|
|
140 q->next_serial++;
|
|
141 q->queue_len--;
|
|
142 }
|
|
143
|
|
144 return r;
|
|
145 }
|
|
146
|
|
147 /*
|
|
148 * Pulls a result off the head of the result queue. Caller should
|
|
149 * free it (and any internals as appropriate) after use. This doesn't
|
|
150 * wait for a result to be present.
|
|
151 *
|
|
152 * Results will be returned in strict order.
|
|
153 *
|
|
154 * Returns t_pool_result pointer if a result is ready.
|
|
155 * NULL if not.
|
|
156 */
|
|
157 t_pool_result *t_pool_next_result(t_results_queue *q) {
|
|
158 t_pool_result *r;
|
|
159
|
|
160 #ifdef DEBUG
|
|
161 fprintf(stderr, "Requesting next result on queue %p\n", q);
|
|
162 #endif
|
|
163
|
|
164 pthread_mutex_lock(&q->result_m);
|
|
165 r = t_pool_next_result_locked(q);
|
|
166 pthread_mutex_unlock(&q->result_m);
|
|
167
|
|
168 #ifdef DEBUG
|
|
169 fprintf(stderr, "(q=%p) Found %p\n", q, r);
|
|
170 #endif
|
|
171
|
|
172 return r;
|
|
173 }
|
|
174
|
|
175 t_pool_result *t_pool_next_result_wait(t_results_queue *q) {
|
|
176 t_pool_result *r;
|
|
177
|
|
178 #ifdef DEBUG
|
|
179 fprintf(stderr, "Waiting for result %d...\n", q->next_serial);
|
|
180 #endif
|
|
181
|
|
182 pthread_mutex_lock(&q->result_m);
|
|
183 while (!(r = t_pool_next_result_locked(q))) {
|
|
184 /* Possible race here now avoided via _locked() call, but incase... */
|
|
185 struct timeval now;
|
|
186 struct timespec timeout;
|
|
187
|
|
188 gettimeofday(&now, NULL);
|
|
189 timeout.tv_sec = now.tv_sec + 10;
|
|
190 timeout.tv_nsec = now.tv_usec * 1000;
|
|
191
|
|
192 pthread_cond_timedwait(&q->result_avail_c, &q->result_m, &timeout);
|
|
193 }
|
|
194 pthread_mutex_unlock(&q->result_m);
|
|
195
|
|
196 return r;
|
|
197 }
|
|
198
|
|
199 /*
|
|
200 * Returns true if there are no items on the finished results queue and
|
|
201 * also none still pending.
|
|
202 */
|
|
203 int t_pool_results_queue_empty(t_results_queue *q) {
|
|
204 int empty;
|
|
205
|
|
206 pthread_mutex_lock(&q->result_m);
|
|
207 empty = q->queue_len == 0 && q->pending == 0;
|
|
208 pthread_mutex_unlock(&q->result_m);
|
|
209
|
|
210 return empty;
|
|
211 }
|
|
212
|
|
213
|
|
214 /*
|
|
215 * Returns the number of completed jobs on the results queue.
|
|
216 */
|
|
217 int t_pool_results_queue_len(t_results_queue *q) {
|
|
218 int len;
|
|
219
|
|
220 pthread_mutex_lock(&q->result_m);
|
|
221 len = q->queue_len;
|
|
222 pthread_mutex_unlock(&q->result_m);
|
|
223
|
|
224 return len;
|
|
225 }
|
|
226
|
|
227 int t_pool_results_queue_sz(t_results_queue *q) {
|
|
228 int len;
|
|
229
|
|
230 pthread_mutex_lock(&q->result_m);
|
|
231 len = q->queue_len + q->pending;
|
|
232 pthread_mutex_unlock(&q->result_m);
|
|
233
|
|
234 return len;
|
|
235 }
|
|
236
|
|
237 /*
|
|
238 * Frees a result 'r' and if free_data is true also frees
|
|
239 * the internal r->data result too.
|
|
240 */
|
|
241 void t_pool_delete_result(t_pool_result *r, int free_data) {
|
|
242 if (!r)
|
|
243 return;
|
|
244
|
|
245 if (free_data && r->data)
|
|
246 free(r->data);
|
|
247
|
|
248 free(r);
|
|
249 }
|
|
250
|
|
251 /*
|
|
252 * Initialises a results queue.
|
|
253 *
|
|
254 * Results queue pointer on success;
|
|
255 * NULL on failure
|
|
256 */
|
|
257 t_results_queue *t_results_queue_init(void) {
|
|
258 t_results_queue *q = malloc(sizeof(*q));
|
|
259
|
|
260 pthread_mutex_init(&q->result_m, NULL);
|
|
261 pthread_cond_init(&q->result_avail_c, NULL);
|
|
262
|
|
263 q->result_head = NULL;
|
|
264 q->result_tail = NULL;
|
|
265 q->next_serial = 0;
|
|
266 q->curr_serial = 0;
|
|
267 q->queue_len = 0;
|
|
268 q->pending = 0;
|
|
269
|
|
270 return q;
|
|
271 }
|
|
272
|
|
273 /* Deallocates memory for a results queue */
|
|
274 void t_results_queue_destroy(t_results_queue *q) {
|
|
275 #ifdef DEBUG
|
|
276 fprintf(stderr, "Destroying results queue %p\n", q);
|
|
277 #endif
|
|
278
|
|
279 if (!q)
|
|
280 return;
|
|
281
|
|
282 pthread_mutex_destroy(&q->result_m);
|
|
283 pthread_cond_destroy(&q->result_avail_c);
|
|
284
|
|
285 memset(q, 0xbb, sizeof(*q));
|
|
286 free(q);
|
|
287
|
|
288 #ifdef DEBUG
|
|
289 fprintf(stderr, "Destroyed results queue %p\n", q);
|
|
290 #endif
|
|
291 }
|
|
292
|
|
293 /* ----------------------------------------------------------------------------
|
|
294 * The thread pool.
|
|
295 */
|
|
296
|
|
297 #define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
|
|
298
|
|
299 /*
|
|
300 * A worker thread.
|
|
301 *
|
|
302 * Each thread waits for the pool to be non-empty.
|
|
303 * As soon as this applies, one of them succeeds in getting the lock
|
|
304 * and then executes the job.
|
|
305 */
|
|
/*
 * A worker thread.
 *
 * Loops forever: waits for a job on p->head, pops it, runs it, and pushes
 * the return value onto the job's results queue.  Exits via pthread_exit()
 * when p->shutdown is set.  Under IN_ORDER each worker has a private
 * condition variable (w->pending_c) and registers itself on p->t_stack so
 * the dispatcher can wake the lowest-indexed idle thread first.
 */
static void *t_pool_worker(void *arg) {
    t_pool_worker_t *w = (t_pool_worker_t *)arg;
    t_pool *p = w->p;
    t_pool_job *j;
#ifdef DEBUG_TIME
    struct timeval t1, t2, t3;
#endif

    for (;;) {
        // Pop an item off the pool queue
#ifdef DEBUG_TIME
        gettimeofday(&t1, NULL);
#endif

        pthread_mutex_lock(&p->pool_m);

#ifdef DEBUG_TIME
        // Time spent contending for the pool lock.
        gettimeofday(&t2, NULL);
        p->wait_time += TDIFF(t2,t1);
        w->wait_time += TDIFF(t2,t1);
#endif

        // If there is something on the job list and a higher priority
        // thread waiting, let it handle this instead.
//	while (p->head && p->t_stack_top != -1 && p->t_stack_top < w->idx) {
//	    pthread_mutex_unlock(&p->pool_m);
//	    pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
//	    pthread_mutex_lock(&p->pool_m);
//	}

        // Sleep until a job arrives or a shutdown is requested.
        while (!p->head && !p->shutdown) {
            p->nwaiting++;

            // Tell t_pool_flush() the pool has gone quiet.
            if (p->njobs == 0)
                pthread_cond_signal(&p->empty_c);
#ifdef DEBUG_TIME
            gettimeofday(&t2, NULL);
#endif

#ifdef IN_ORDER
            // Push this thread to the top of the waiting stack
            if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
                p->t_stack_top = w->idx;

            p->t_stack[w->idx] = 1;
            pthread_cond_wait(&w->pending_c, &p->pool_m);
            p->t_stack[w->idx] = 0;

            /* Find new t_stack_top */
            {
                int i;
                p->t_stack_top = -1;
                for (i = 0; i < p->tsize; i++) {
                    if (p->t_stack[i]) {
                        p->t_stack_top = i;
                        break;
                    }
                }
            }
#else
            pthread_cond_wait(&p->pending_c, &p->pool_m);
#endif

#ifdef DEBUG_TIME
            // Time spent idle, waiting for work.
            gettimeofday(&t3, NULL);
            p->wait_time += TDIFF(t3,t2);
            w->wait_time += TDIFF(t3,t2);
#endif
            p->nwaiting--;
        }

        if (p->shutdown) {
#ifdef DEBUG_TIME
            p->total_time += TDIFF(t3,t1);
#endif
#ifdef DEBUG
            fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
            pthread_mutex_unlock(&p->pool_m);
            pthread_exit(NULL);
        }

        // Dequeue the head job while still holding the pool lock.
        j = p->head;
        if (!(p->head = j->next))
            p->tail = NULL;

        // Wake a dispatcher blocked on a full input queue.
        // NOTE(review): this tests njobs BEFORE the decrement; with njobs
        // capped at qsize the signal still fires when the queue was full,
        // but confirm the intent if qsize invariants ever change.
        if (p->njobs-- >= p->qsize)
            pthread_cond_signal(&p->full_c);

        if (p->njobs == 0)
            pthread_cond_signal(&p->empty_c);

        pthread_mutex_unlock(&p->pool_m);

        // We have job 'j' - now execute it.
        t_pool_add_result(j, j->func(j->arg));
#ifdef DEBUG_TIME
        pthread_mutex_lock(&p->pool_m);
        gettimeofday(&t3, NULL);
        p->total_time += TDIFF(t3,t1);
        pthread_mutex_unlock(&p->pool_m);
#endif
        /* Poison freed job memory to catch use-after-free */
        memset(j, 0xbb, sizeof(*j));
        free(j);
    }

    // Unreachable: the loop only exits via pthread_exit() above.
    return NULL;
}
|
|
414
|
|
415 /*
|
|
416 * Creates a worker pool of length qsize with tsize worker threads.
|
|
417 *
|
|
418 * Returns pool pointer on success;
|
|
419 * NULL on failure
|
|
420 */
|
|
421 t_pool *t_pool_init(int qsize, int tsize) {
|
|
422 int i;
|
|
423 t_pool *p = malloc(sizeof(*p));
|
|
424 p->qsize = qsize;
|
|
425 p->tsize = tsize;
|
|
426 p->njobs = 0;
|
|
427 p->nwaiting = 0;
|
|
428 p->shutdown = 0;
|
|
429 p->head = p->tail = NULL;
|
|
430 p->t_stack = NULL;
|
|
431 #ifdef DEBUG_TIME
|
|
432 p->total_time = p->wait_time = 0;
|
|
433 #endif
|
|
434
|
|
435 p->t = malloc(tsize * sizeof(p->t[0]));
|
|
436
|
|
437 pthread_mutex_init(&p->pool_m, NULL);
|
|
438 pthread_cond_init(&p->empty_c, NULL);
|
|
439 pthread_cond_init(&p->full_c, NULL);
|
|
440
|
|
441 pthread_mutex_lock(&p->pool_m);
|
|
442
|
|
443 #ifdef IN_ORDER
|
|
444 if (!(p->t_stack = malloc(tsize * sizeof(*p->t_stack))))
|
|
445 return NULL;
|
|
446 p->t_stack_top = -1;
|
|
447
|
|
448 for (i = 0; i < tsize; i++) {
|
|
449 t_pool_worker_t *w = &p->t[i];
|
|
450 p->t_stack[i] = 0;
|
|
451 w->p = p;
|
|
452 w->idx = i;
|
|
453 w->wait_time = 0;
|
|
454 pthread_cond_init(&w->pending_c, NULL);
|
|
455 if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
|
|
456 return NULL;
|
|
457 }
|
|
458 #else
|
|
459 pthread_cond_init(&p->pending_c, NULL);
|
|
460
|
|
461 for (i = 0; i < tsize; i++) {
|
|
462 t_pool_worker_t *w = &p->t[i];
|
|
463 w->p = p;
|
|
464 w->idx = i;
|
|
465 pthread_cond_init(&w->pending_c, NULL);
|
|
466 if (0 != pthread_create(&w->tid, NULL, t_pool_worker, w))
|
|
467 return NULL;
|
|
468 }
|
|
469 #endif
|
|
470
|
|
471 pthread_mutex_unlock(&p->pool_m);
|
|
472
|
|
473 return p;
|
|
474 }
|
|
475
|
|
476 /*
|
|
477 * Adds an item to the work pool.
|
|
478 *
|
|
479 * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
|
|
480 * result returned. Ie rather than blocking on full queue we're permitted
|
|
481 * to return early on "result available" event too.
|
|
482 * Caller would then have a while loop around t_pool_dispatch.
|
|
483 * Or, return -1 and set errno to EAGAIN to indicate job not yet submitted.
|
|
484 *
|
|
485 * Returns 0 on success
|
|
486 * -1 on failure
|
|
487 */
|
|
int t_pool_dispatch(t_pool *p, t_results_queue *q,
                    void *(*func)(void *arg), void *arg) {
    // Build the job record first, outside the pool lock.
    t_pool_job *j = malloc(sizeof(*j));

    if (!j)
        return -1;
    j->func = func;
    j->arg = arg;
    j->next = NULL;
    j->p = p;
    j->q = q;
    if (q) {
        // Assign this job's serial and count it as pending under the
        // results-queue lock, so completion order can be reconstructed.
        pthread_mutex_lock(&q->result_m);
        j->serial = q->curr_serial++;
        q->pending++;
        pthread_mutex_unlock(&q->result_m);
    } else {
        j->serial = 0;  // no results queue: serial is unused
    }

#ifdef DEBUG
    fprintf(stderr, "Dispatching job %p for queue %p, serial %d\n", j, q, j->serial);
#endif

    pthread_mutex_lock(&p->pool_m);

    // Check if queue is full
    while (p->njobs >= p->qsize)
        pthread_cond_wait(&p->full_c, &p->pool_m);

    p->njobs++;

    // Append to the input job list.
    if (p->tail) {
        p->tail->next = j;
        p->tail = j;
    } else {
        p->head = p->tail = j;
    }

    // Let a worker know we have data.
#ifdef IN_ORDER
    // Only wake an extra thread when the backlog exceeds the number of
    // currently-running workers; prefer the lowest-indexed idle thread.
    if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
#else
    pthread_cond_signal(&p->pending_c);
#endif
    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    // NOTE(review): a worker may already have executed and freed 'j' by
    // this point, so reading j->serial here is a use-after-free in DEBUG
    // builds — consider caching the serial in a local before unlocking.
    fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
#endif

    return 0;
}
|
|
542
|
|
543 /*
|
|
544 * As above but optional non-block flag.
|
|
545 *
|
|
546 * nonblock 0 => block if input queue is full
|
|
547 * nonblock +1 => don't block if input queue is full, but do not add task
|
|
548 * nonblock -1 => add task regardless of whether queue is full (over-size)
|
|
549 */
|
|
550 int t_pool_dispatch2(t_pool *p, t_results_queue *q,
|
|
551 void *(*func)(void *arg), void *arg, int nonblock) {
|
|
552 t_pool_job *j;
|
|
553
|
|
554 #ifdef DEBUG
|
|
555 fprintf(stderr, "Dispatching job for queue %p, serial %d\n", q, q->curr_serial);
|
|
556 #endif
|
|
557
|
|
558 pthread_mutex_lock(&p->pool_m);
|
|
559
|
|
560 if (p->njobs >= p->qsize && nonblock == 1) {
|
|
561 pthread_mutex_unlock(&p->pool_m);
|
|
562 errno = EAGAIN;
|
|
563 return -1;
|
|
564 }
|
|
565
|
|
566 if (!(j = malloc(sizeof(*j))))
|
|
567 return -1;
|
|
568 j->func = func;
|
|
569 j->arg = arg;
|
|
570 j->next = NULL;
|
|
571 j->p = p;
|
|
572 j->q = q;
|
|
573 if (q) {
|
|
574 pthread_mutex_lock(&q->result_m);
|
|
575 j->serial = q->curr_serial;
|
|
576 pthread_mutex_unlock(&q->result_m);
|
|
577 } else {
|
|
578 j->serial = 0;
|
|
579 }
|
|
580
|
|
581 if (q) {
|
|
582 pthread_mutex_lock(&q->result_m);
|
|
583 q->curr_serial++;
|
|
584 q->pending++;
|
|
585 pthread_mutex_unlock(&q->result_m);
|
|
586 }
|
|
587
|
|
588 // Check if queue is full
|
|
589 if (nonblock == 0)
|
|
590 while (p->njobs >= p->qsize)
|
|
591 pthread_cond_wait(&p->full_c, &p->pool_m);
|
|
592
|
|
593 p->njobs++;
|
|
594
|
|
595 // if (q->curr_serial % 100 == 0)
|
|
596 // fprintf(stderr, "p->njobs = %d p->qsize = %d\n", p->njobs, p->qsize);
|
|
597
|
|
598 if (p->tail) {
|
|
599 p->tail->next = j;
|
|
600 p->tail = j;
|
|
601 } else {
|
|
602 p->head = p->tail = j;
|
|
603 }
|
|
604
|
|
605 #ifdef DEBUG
|
|
606 fprintf(stderr, "Dispatched (serial %d)\n", j->serial);
|
|
607 #endif
|
|
608
|
|
609 // Let a worker know we have data.
|
|
610 #ifdef IN_ORDER
|
|
611 // Keep incoming queue at 1 per running thread, so there is always
|
|
612 // something waiting when they end their current task. If we go above
|
|
613 // this signal to start more threads (if available). This has the effect
|
|
614 // of concentrating jobs to fewer cores when we are I/O bound, which in
|
|
615 // turn benefits systems with auto CPU frequency scaling.
|
|
616 if (p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting)
|
|
617 pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
|
|
618 #else
|
|
619 pthread_cond_signal(&p->pending_c);
|
|
620 #endif
|
|
621
|
|
622 pthread_mutex_unlock(&p->pool_m);
|
|
623
|
|
624 return 0;
|
|
625 }
|
|
626
|
|
627 /*
|
|
628 * Flushes the pool, but doesn't exit. This simply drains the queue and
|
|
629 * ensures all worker threads have finished their current task.
|
|
630 *
|
|
631 * Returns 0 on success;
|
|
632 * -1 on failure
|
|
633 */
|
|
int t_pool_flush(t_pool *p) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Flushing pool %p\n", p);
#endif

    // Drains the queue
    pthread_mutex_lock(&p->pool_m);

    // Wake up everything for the final sprint!
    // NOTE(review): p->t_stack is only allocated under IN_ORDER; if
    // IN_ORDER were ever undefined this loop would dereference NULL —
    // confirm the macro is unconditionally defined (it is in this file).
    for (i = 0; i < p->tsize; i++)
        if (p->t_stack[i])
            pthread_cond_signal(&p->t[i].pending_c);

    // Wait until the input queue is empty AND every worker is idle;
    // workers signal empty_c as they drain the last jobs.
    while (p->njobs || p->nwaiting != p->tsize)
        pthread_cond_wait(&p->empty_c, &p->pool_m);

    pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
    // Reads njobs/nwaiting without the lock: a stale value here only
    // affects the trace output, not correctness.
    fprintf(stderr, "Flushed complete for pool %p, njobs=%d, nwaiting=%d\n",
            p, p->njobs, p->nwaiting);
#endif

    return 0;
}
|
|
661
|
|
662 /*
|
|
663 * Destroys a thread pool. If 'kill' is true the threads are terminated now,
|
|
664 * otherwise they are joined into the main thread so they will finish their
|
|
665 * current work load.
|
|
666 *
|
|
667 * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
|
|
668 * t_pool_destroy(p,1) to quickly exit after a fatal error.
|
|
669 */
|
|
void t_pool_destroy(t_pool *p, int kill) {
    int i;

#ifdef DEBUG
    fprintf(stderr, "Destroying pool %p, kill=%d\n", p, kill);
#endif

    /* Send shutdown message to worker threads */
    if (!kill) {
        // Graceful path: raise the shutdown flag, wake every worker so it
        // notices, then join them all.
        pthread_mutex_lock(&p->pool_m);
        p->shutdown = 1;

#ifdef DEBUG
        fprintf(stderr, "Sending shutdown request\n");
#endif

#ifdef IN_ORDER
        for (i = 0; i < p->tsize; i++)
            pthread_cond_signal(&p->t[i].pending_c);
#else
        pthread_cond_broadcast(&p->pending_c);
#endif
        pthread_mutex_unlock(&p->pool_m);

#ifdef DEBUG
        fprintf(stderr, "Shutdown complete\n");
#endif
        for (i = 0; i < p->tsize; i++)
            pthread_join(p->t[i].tid, NULL);
    } else {
        // Fast path after a fatal error: forcibly interrupt the workers.
        // NOTE(review): SIGINT's default disposition terminates the whole
        // process unless a handler is installed — presumably intentional
        // for the fatal-error case; confirm with callers.
        for (i = 0; i < p->tsize; i++)
            pthread_kill(p->t[i].tid, SIGINT);
    }

    pthread_mutex_destroy(&p->pool_m);
    pthread_cond_destroy(&p->empty_c);
    pthread_cond_destroy(&p->full_c);
#ifdef IN_ORDER
    for (i = 0; i < p->tsize; i++)
        pthread_cond_destroy(&p->t[i].pending_c);
#else
    pthread_cond_destroy(&p->pending_c);
#endif

#ifdef DEBUG_TIME
    fprintf(stderr, "Total time=%f\n", p->total_time / 1000000.0);
    fprintf(stderr, "Wait time=%f\n", p->wait_time / 1000000.0);
    fprintf(stderr, "%d%% utilisation\n",
            (int)(100 - ((100.0 * p->wait_time) / p->total_time + 0.5)));
    for (i = 0; i < p->tsize; i++)
        fprintf(stderr, "%d: Wait time=%f\n", i,
                p->t[i].wait_time / 1000000.0);
#endif

    // free(NULL) would be a no-op anyway; guard kept as-is.
    if (p->t_stack)
        free(p->t_stack);

    free(p->t);
    free(p);

#ifdef DEBUG
    // NOTE(review): reads 'p' after free(p) — the pointer value is
    // indeterminate at this point; consider moving this trace above.
    fprintf(stderr, "Destroyed pool %p\n", p);
#endif
}
|
|
734
|
|
735
|
|
736 /*-----------------------------------------------------------------------------
|
|
737 * Test app.
|
|
738 */
|
|
739
|
|
740 #ifdef TEST_MAIN
|
|
741
|
|
742 #include <stdio.h>
|
|
743 #include <math.h>
|
|
744
|
|
/*
 * Test worker: takes a malloc'd int job id (ownership transferred; freed
 * here) and returns a malloc'd int result (caller frees) equal to job*job.
 * Sleeps a random sub-second interval so completions arrive out of order.
 */
void *doit(void *arg) {
    int i, k, x = 0;
    int job = *(int *)arg;
    int *res;

    printf("Worker: execute job %d\n", job);

    usleep(random() % 1000000); // to coerce job completion out of order
    if (0) {
        // Dead CPU-burn branch, kept for manual benchmarking.
        for (k = 0; k < 100; k++) {
            for (i = 0; i < 100000; i++) {
                x++;
                x += x * sin(i);
                x += x * cos(x);
            }
        }
        x *= 100;
        x += job;
    } else {
        x = job*job;
    }

    printf("Worker: job %d terminating, x=%d\n", job, x);

    free(arg);

    // NOTE(review): malloc result is unchecked; acceptable in a test
    // driver, but would deref NULL on OOM.
    res = malloc(sizeof(*res));
    *res = x;

    return res;
}
|
|
776
|
|
777 #define NTHREADS 8
|
|
778
|
|
/*
 * Test driver: dispatches 20 squaring jobs onto an 8-thread pool,
 * harvesting any ready results as it goes, then flushes and drains
 * the remainder before tearing everything down.
 */
int main(int argc, char **argv) {
    t_pool *p = t_pool_init(NTHREADS*2, NTHREADS);
    t_results_queue *q = t_results_queue_init();
    int i;
    t_pool_result *r;

    // Dispatch jobs
    for (i = 0; i < 20; i++) {
        // Each job owns its heap-allocated argument; doit() frees it.
        int *ip = malloc(sizeof(*ip));
        *ip = i;
        printf("Submitting %d\n", i);
        t_pool_dispatch(p, q, doit, ip);

        // Check for results
        if ((r = t_pool_next_result(q))) {
            printf("RESULT: %d\n", *(int *)r->data);
            t_pool_delete_result(r, 1);
        }
    }

    // Wait for all workers to finish their outstanding jobs.
    t_pool_flush(p);

    // Drain any results not collected during submission.
    while ((r = t_pool_next_result(q))) {
        printf("RESULT: %d\n", *(int *)r->data);
        t_pool_delete_result(r, 1);
    }

    t_pool_destroy(p, 0);
    t_results_queue_destroy(q);

    return 0;
}
|
|
811 #endif
|