OpenDNSSEC-signer 1.3.0rc3
|
00001 /* 00002 * $Id: worker.c 5227 2011-06-12 08:51:24Z jakob $ 00003 * 00004 * Copyright (c) 2009 NLNet Labs. All rights reserved. 00005 * 00006 * Redistribution and use in source and binary forms, with or without 00007 * modification, are permitted provided that the following conditions 00008 * are met: 00009 * 1. Redistributions of source code must retain the above copyright 00010 * notice, this list of conditions and the following disclaimer. 00011 * 2. Redistributions in binary form must reproduce the above copyright 00012 * notice, this list of conditions and the following disclaimer in the 00013 * documentation and/or other materials provided with the distribution. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 00016 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00017 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 00019 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00020 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 00021 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 00023 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 00024 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN 00025 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00026 * 00027 */ 00028 00034 #include "adapter/adapi.h" 00035 #include "daemon/engine.h" 00036 #include "daemon/worker.h" 00037 #include "shared/allocator.h" 00038 #include "scheduler/schedule.h" 00039 #include "scheduler/task.h" 00040 #include "shared/locks.h" 00041 #include "shared/log.h" 00042 #include "shared/status.h" 00043 #include "shared/util.h" 00044 #include "signer/tools.h" 00045 #include "signer/zone.h" 00046 #include "signer/zonedata.h" 00047 00048 #include <time.h> /* time() */ 00049 00050 ods_lookup_table worker_str[] = { 00051 { WORKER_WORKER, "worker" }, 00052 { WORKER_DRUDGER, "drudger" }, 00053 { 0, NULL } 00054 }; 00055 00056 00061 worker_type* 00062 worker_create(allocator_type* allocator, int num, worker_id type) 00063 { 00064 worker_type* worker; 00065 00066 if (!allocator) { 00067 return NULL; 00068 } 00069 ods_log_assert(allocator); 00070 00071 worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type)); 00072 if (!worker) { 00073 return NULL; 00074 } 00075 00076 ods_log_debug("create worker[%i]", num +1); 00077 lock_basic_init(&worker->worker_lock); 00078 lock_basic_set(&worker->worker_alarm); 00079 lock_basic_lock(&worker->worker_lock); 00080 worker->allocator = allocator; 00081 worker->thread_num = num +1; 00082 worker->engine = NULL; 00083 worker->task = NULL; 00084 worker->working_with = TASK_NONE; 00085 worker->need_to_exit = 0; 00086 worker->type = type; 00087 worker->clock_in = 0; 00088 worker->jobs_appointed = 0; 00089 worker->jobs_completed = 0; 00090 worker->jobs_failed = 0; 00091 worker->sleeping = 0; 00092 worker->waiting = 0; 00093 lock_basic_unlock(&worker->worker_lock); 00094 return worker; 00095 } 00096 00097 00102 static const char* 00103 worker2str(worker_id type) 00104 { 00105 ods_lookup_table *lt = ods_lookup_by_id(worker_str, type); 00106 if (lt) { 00107 return lt->name; 00108 } 00109 return NULL; 00110 } 00111 00112 00117 static int 00118 worker_fulfilled(worker_type* worker) 00119 { 00120 return (worker->jobs_completed + worker->jobs_failed) == 00121 worker->jobs_appointed; 00122 } 00123 00124 00129 static void 00130 worker_perform_task(worker_type* worker) 00131 { 00132 engine_type* engine = NULL; 00133 zone_type* zone = NULL; 00134 task_type* task = NULL; 00135 task_id what = TASK_NONE; 00136 time_t when = 0; 00137 time_t never = (3600*24*365); 00138 ods_status status = ODS_STATUS_OK; 00139 int fallthrough = 0; 00140 int backup = 0; 00141 char* working_dir = NULL; 00142 char* cfg_filename = NULL; 00143 uint32_t tmpserial = 0; 00144 time_t start = 0; 00145 time_t end = 0; 00146 00147 if (!worker || !worker->task || !worker->task->zone || !worker->engine) { 00148 return; 00149 } 00150 ods_log_assert(worker); 00151 ods_log_assert(worker->task); 00152 ods_log_assert(worker->task->zone); 00153 00154 engine = (engine_type*) worker->engine; 00155 task = (task_type*) worker->task; 00156 zone = (zone_type*) worker->task->zone; 00157 ods_log_debug("[%s[%i]] perform task %s for zone %s at %u", 00158 worker2str(worker->type), worker->thread_num, task_what2str(task->what), 00159 task_who2str(task->who), (uint32_t) worker->clock_in); 00160 00161 switch (task->what) { 00162 case TASK_SIGNCONF: 00163 worker->working_with = TASK_SIGNCONF; 00164 /* perform 'load signconf' task */ 00165 ods_log_verbose("[%s[%i]] load signconf for zone %s", 00166 worker2str(worker->type), worker->thread_num, 00167 task_who2str(task->who)); 00168 status = zone_load_signconf(zone, &what); 00169 00170 /* what to do next */ 00171 when = time_now(); 00172 if (status == ODS_STATUS_UNCHANGED) { 00173 if (task->halted != TASK_NONE) { 00174 goto task_perform_continue; 00175 } else { 00176 status = ODS_STATUS_OK; 00177 } 00178 } 00179 00180 if (status == ODS_STATUS_OK) { 00181 status = zone_publish_dnskeys(zone, 0); 00182 } 00183 if (status == ODS_STATUS_OK) { 00184 status = zone_prepare_nsec3(zone, 0); 00185 } 00186 if (status == ODS_STATUS_OK) { 00187 status = zonedata_commit(zone->zonedata); 00188 } 00189 00190 if (status == ODS_STATUS_OK) { 00191 task->interrupt = TASK_NONE; 00192 task->halted = TASK_NONE; 00193 } else { 00194 if (task->halted == TASK_NONE) { 00195 goto task_perform_fail; 00196 } 00197 goto task_perform_continue; 00198 } 00199 fallthrough = 0; 00200 break; 00201 case TASK_READ: 00202 worker->working_with = TASK_READ; 00203 /* perform 'read input adapter' task */ 00204 ods_log_verbose("[%s[%i]] read zone %s", 00205 worker2str(worker->type), worker->thread_num, 00206 task_who2str(task->who)); 00207 status = tools_input(zone); 00208 00209 /* what to do next */ 00210 what = TASK_NSECIFY; 00211 when = time_now(); 00212 if (status != ODS_STATUS_OK) { 00213 if (task->halted == TASK_NONE) { 00214 goto task_perform_fail; 00215 } 00216 goto task_perform_continue; 00217 } 00218 fallthrough = 1; 00219 case TASK_NSECIFY: 00220 worker->working_with = TASK_NSECIFY; 00221 ods_log_verbose("[%s[%i]] nsecify zone %s", 00222 worker2str(worker->type), worker->thread_num, 00223 task_who2str(task->who)); 00224 status = tools_nsecify(zone); 00225 00226 /* what to do next */ 00227 what = TASK_SIGN; 00228 when = time_now(); 00229 if (status == ODS_STATUS_OK) { 00230 if (task->interrupt > TASK_SIGNCONF) { 00231 task->interrupt = TASK_NONE; 00232 task->halted = TASK_NONE; 00233 } 00234 } else { 00235 if (task->halted == TASK_NONE) { 00236 goto task_perform_fail; 00237 } 00238 goto task_perform_continue; 00239 } 00240 fallthrough = 1; 00241 case TASK_SIGN: 00242 worker->working_with = TASK_SIGN; 00243 ods_log_verbose("[%s[%i]] sign zone %s", 00244 worker2str(worker->type), worker->thread_num, 00245 task_who2str(task->who)); 00246 tmpserial = zone->zonedata->internal_serial; 00247 status = zone_update_serial(zone); 00248 if (status != ODS_STATUS_OK) { 00249 ods_log_error("[%s[%i]] unable to sign zone %s: " 00250 "failed to increment serial", 00251 worker2str(worker->type), worker->thread_num, 00252 task_who2str(task->who)); 00253 } else { 00254 /* start timer */ 00255 start = time(NULL); 00256 if (zone->stats) { 00257 lock_basic_lock(&zone->stats->stats_lock); 00258 if (!zone->stats->start_time) { 00259 zone->stats->start_time = start; 00260 } 00261 zone->stats->sig_count = 0; 00262 zone->stats->sig_soa_count = 0; 00263 zone->stats->sig_reuse = 0; 00264 zone->stats->sig_time = 0; 00265 lock_basic_unlock(&zone->stats->stats_lock); 00266 } 00267 00268 /* queue menial, hard signing work */ 00269 status = zonedata_queue(zone->zonedata, engine->signq, worker); 00270 ods_log_debug("[%s[%i]] wait until drudgers are finished " 00271 " signing zone %s", worker2str(worker->type), 00272 worker->thread_num, task_who2str(task->who)); 00273 00274 /* sleep until work is done */ 00275 worker_sleep_unless(worker, 0); 00276 if (worker->jobs_failed > 0) { 00277 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u " 00278 "signatures failed", worker2str(worker->type), 00279 worker->thread_num, task_who2str(task->who), 00280 worker->jobs_failed, worker->jobs_appointed); 00281 status = ODS_STATUS_ERR; 00282 } 00283 worker->jobs_appointed = 0; 00284 worker->jobs_completed = 0; 00285 worker->jobs_failed = 0; 00286 00287 /* stop timer */ 00288 end = time(NULL); 00289 if (status == ODS_STATUS_OK && zone->stats) { 00290 lock_basic_lock(&zone->stats->stats_lock); 00291 zone->stats->sig_time = (end-start); 00292 lock_basic_unlock(&zone->stats->stats_lock); 00293 } 00294 } 00295 00296 /* what to do next */ 00297 if (status != ODS_STATUS_OK) { 00298 /* rollback serial */ 00299 zone->zonedata->internal_serial = tmpserial; 00300 if (task->halted == TASK_NONE) { 00301 goto task_perform_fail; 00302 } 00303 goto task_perform_continue; 00304 } else { 00305 if (task->interrupt > TASK_SIGNCONF) { 00306 task->interrupt = TASK_NONE; 00307 task->halted = TASK_NONE; 00308 } 00309 } 00310 what = TASK_AUDIT; 00311 when = time_now(); 00312 fallthrough = 1; 00313 case TASK_AUDIT: 00314 worker->working_with = TASK_AUDIT; 00315 if (zone->signconf->audit) { 00316 ods_log_verbose("[%s[%i]] audit zone %s", 00317 worker2str(worker->type), worker->thread_num, 00318 task_who2str(task->who)); 00319 working_dir = strdup(engine->config->working_dir); 00320 cfg_filename = strdup(engine->config->cfg_filename); 00321 status = tools_audit(zone, working_dir, cfg_filename); 00322 if (working_dir) { free((void*)working_dir); } 00323 if (cfg_filename) { free((void*)cfg_filename); } 00324 working_dir = NULL; 00325 cfg_filename = NULL; 00326 } else { 00327 status = ODS_STATUS_OK; 00328 } 00329 00330 /* what to do next */ 00331 if (status != ODS_STATUS_OK) { 00332 if (task->halted == TASK_NONE) { 00333 goto task_perform_fail; 00334 } 00335 goto task_perform_continue; 00336 } 00337 what = TASK_WRITE; 00338 when = time_now(); 00339 fallthrough = 1; 00340 case TASK_WRITE: 00341 worker->working_with = TASK_WRITE; 00342 ods_log_verbose("[%s[%i]] write zone %s", 00343 worker2str(worker->type), worker->thread_num, 00344 task_who2str(task->who)); 00345 00346 status = tools_output(zone); 00347 zone->processed = 1; 00348 00349 /* what to do next */ 00350 if (status != ODS_STATUS_OK) { 00351 if (task->halted == TASK_NONE) { 00352 goto task_perform_fail; 00353 } 00354 goto task_perform_continue; 00355 } else { 00356 if (task->interrupt > TASK_SIGNCONF) { 00357 task->interrupt = TASK_NONE; 00358 task->halted = TASK_NONE; 00359 } 00360 } 00361 if (duration2time(zone->signconf->sig_resign_interval)) { 00362 what = TASK_SIGN; 00363 when = time_now() + 00364 duration2time(zone->signconf->sig_resign_interval); 00365 } else { 00366 what = TASK_NONE; 00367 when = time_now() + never; 00368 } 00369 backup = 1; 00370 fallthrough = 0; 00371 break; 00372 case TASK_NONE: 00373 worker->working_with = TASK_NONE; 00374 ods_log_warning("[%s[%i]] none task for zone %s", 00375 worker2str(worker->type), worker->thread_num, 00376 task_who2str(task->who)); 00377 when = time_now() + never; 00378 fallthrough = 0; 00379 break; 00380 default: 00381 ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s", 00382 worker2str(worker->type), worker->thread_num, 00383 task_who2str(task->who)); 00384 what = TASK_SIGNCONF; 00385 when = time_now(); 00386 fallthrough = 0; 00387 break; 00388 } 00389 00390 /* no error, reset backoff */ 00391 task->backoff = 0; 00392 00393 /* set next task */ 00394 if (fallthrough == 0 && task->interrupt != TASK_NONE && 00395 task->interrupt != what) { 00396 ods_log_debug("[%s[%i]] interrupt task %s for zone %s", 00397 worker2str(worker->type), worker->thread_num, 00398 task_what2str(what), task_who2str(task->who)); 00399 00400 task->what = task->interrupt; 00401 task->when = time_now(); 00402 task->halted = what; 00403 } else { 00404 ods_log_debug("[%s[%i]] next task %s for zone %s", 00405 worker2str(worker->type), worker->thread_num, 00406 task_what2str(what), task_who2str(task->who)); 00407 00408 task->what = what; 00409 task->when = when; 00410 if (!fallthrough) { 00411 task->interrupt = TASK_NONE; 00412 task->halted = TASK_NONE; 00413 } 00414 } 00415 00416 /* backup the last successful run */ 00417 if (backup) { 00418 status = zone_backup(zone); 00419 if (status != ODS_STATUS_OK) { 00420 ods_log_warning("[%s[%i]] unable to backup zone %s: %s", 00421 worker2str(worker->type), worker->thread_num, 00422 task_who2str(task->who), ods_status2str(status)); 00423 /* just a warning */ 00424 status = ODS_STATUS_OK; 00425 } 00426 backup = 0; 00427 } 00428 return; 00429 00430 task_perform_fail: 00431 /* in case of failure, also mark zone processed (for single run usage) */ 00432 zone->processed = 1; 00433 00434 if (task->backoff) { 00435 task->backoff *= 2; 00436 if (task->backoff > ODS_SE_MAX_BACKOFF) { 00437 task->backoff = ODS_SE_MAX_BACKOFF; 00438 } 00439 } else { 00440 task->backoff = 60; 00441 } 00442 ods_log_error("[%s[%i]] backoff task %s for zone %s with %u seconds", 00443 worker2str(worker->type), worker->thread_num, 00444 task_what2str(task->what), task_who2str(task->who), task->backoff); 00445 00446 task->when = time_now() + task->backoff; 00447 return; 00448 00449 task_perform_continue: 00450 ods_log_info("[%s[%i]] continue task %s for zone %s", 00451 worker2str(worker->type), worker->thread_num, 00452 task_what2str(task->halted), task_who2str(task->who)); 00453 00454 what = task->halted; 00455 task->what = what; 00456 task->when = time_now(); 00457 task->interrupt = TASK_NONE; 00458 task->halted = TASK_NONE; 00459 if (zone->processed) { 00460 task->when += duration2time(zone->signconf->sig_resign_interval); 00461 } 00462 return; 00463 } 00464 00465 00470 static void 00471 worker_work(worker_type* worker) 00472 { 00473 time_t now, timeout = 1; 00474 zone_type* zone = NULL; 00475 ods_status status = ODS_STATUS_OK; 00476 00477 ods_log_assert(worker); 00478 ods_log_assert(worker->type == WORKER_WORKER); 00479 00480 while (worker->need_to_exit == 0) { 00481 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00482 worker->thread_num); 00483 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00484 /* [LOCK] schedule */ 00485 worker->task = schedule_pop_task(worker->engine->taskq); 00486 /* [UNLOCK] schedule */ 00487 if (worker->task) { 00488 worker->working_with = worker->task->what; 00489 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00490 00491 zone = worker->task->zone; 00492 lock_basic_lock(&zone->zone_lock); 00493 /* [LOCK] zone */ 00494 ods_log_debug("[%s[%i]] start working on zone %s", 00495 worker2str(worker->type), worker->thread_num, zone->name); 00496 00497 worker->clock_in = time(NULL); 00498 worker_perform_task(worker); 00499 00500 zone->task = worker->task; 00501 00502 ods_log_debug("[%s[%i]] finished working on zone %s", 00503 worker2str(worker->type), worker->thread_num, zone->name); 00504 /* [UNLOCK] zone */ 00505 00506 lock_basic_lock(&worker->engine->taskq->schedule_lock); 00507 /* [LOCK] zone, schedule */ 00508 worker->task = NULL; 00509 worker->working_with = TASK_NONE; 00510 status = schedule_task(worker->engine->taskq, zone->task, 1); 00511 /* [UNLOCK] zone, schedule */ 00512 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00513 lock_basic_unlock(&zone->zone_lock); 00514 00515 timeout = 1; 00516 } else { 00517 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00518 worker->thread_num); 00519 00520 /* [LOCK] schedule */ 00521 worker->task = schedule_get_first_task(worker->engine->taskq); 00522 /* [UNLOCK] schedule */ 00523 lock_basic_unlock(&worker->engine->taskq->schedule_lock); 00524 00525 now = time_now(); 00526 if (worker->task && !worker->engine->taskq->loading) { 00527 timeout = (worker->task->when - now); 00528 } else { 00529 timeout *= 2; 00530 if (timeout > ODS_SE_MAX_BACKOFF) { 00531 timeout = ODS_SE_MAX_BACKOFF; 00532 } 00533 } 00534 worker->task = NULL; 00535 worker_sleep(worker, timeout); 00536 } 00537 } 00538 return; 00539 } 00540 00541 00546 static void 00547 worker_drudge(worker_type* worker) 00548 { 00549 zone_type* zone = NULL; 00550 task_type* task = NULL; 00551 rrset_type* rrset = NULL; 00552 ods_status status = ODS_STATUS_OK; 00553 worker_type* chief = NULL; 00554 hsm_ctx_t* ctx = NULL; 00555 00556 ods_log_assert(worker); 00557 ods_log_assert(worker->type == WORKER_DRUDGER); 00558 00559 ctx = hsm_create_context(); 00560 if (ctx == NULL) { 00561 ods_log_error("[%s[%i]] unable to drudge: error " 00562 "creating libhsm context", worker2str(worker->type), 00563 worker->thread_num); 00564 } 00565 00566 while (worker->need_to_exit == 0) { 00567 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type), 00568 worker->thread_num); 00569 00570 lock_basic_lock(&worker->engine->signq->q_lock); 00571 /* [LOCK] schedule */ 00572 rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief); 00573 /* [UNLOCK] schedule */ 00574 lock_basic_unlock(&worker->engine->signq->q_lock); 00575 if (rrset) { 00576 /* set up the work */ 00577 if (chief) { 00578 task = chief->task; 00579 } 00580 if (task) { 00581 zone = task->zone; 00582 } 00583 if (!zone) { 00584 ods_log_error("[%s[%i]] unable to drudge: no zone reference", 00585 worker2str(worker->type), worker->thread_num); 00586 } 00587 if (zone && ctx) { 00588 ods_log_assert(rrset); 00589 ods_log_assert(zone); 00590 ods_log_assert(zone->dname); 00591 ods_log_assert(zone->signconf); 00592 ods_log_assert(ctx); 00593 00594 worker->clock_in = time(NULL); 00595 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf, 00596 chief->clock_in, zone->stats); 00597 } else { 00598 status = ODS_STATUS_ASSERT_ERR; 00599 } 00600 00601 if (chief) { 00602 lock_basic_lock(&chief->worker_lock); 00603 if (status == ODS_STATUS_OK) { 00604 chief->jobs_completed += 1; 00605 } else { 00606 chief->jobs_failed += 1; 00607 /* destroy context? */ 00608 } 00609 lock_basic_unlock(&chief->worker_lock); 00610 00611 if (worker_fulfilled(chief) && chief->sleeping) { 00612 worker_wakeup(chief); 00613 } 00614 } 00615 rrset = NULL; 00616 zone = NULL; 00617 task = NULL; 00618 chief = NULL; 00619 } else { 00620 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type), 00621 worker->thread_num); 00622 worker_wait(&worker->engine->signq->q_lock, 00623 &worker->engine->signq->q_threshold); 00624 } 00625 } 00626 00627 /* cleanup open HSM sessions */ 00628 hsm_destroy_context(ctx); 00629 ctx = NULL; 00630 return; 00631 } 00632 00633 00638 void 00639 worker_start(worker_type* worker) 00640 { 00641 ods_log_assert(worker); 00642 switch (worker->type) { 00643 case WORKER_DRUDGER: 00644 worker_drudge(worker); 00645 break; 00646 case WORKER_WORKER: 00647 worker_work(worker); 00648 break; 00649 default: 00650 ods_log_error("[worker] illegal worker (id=%i)", worker->type); 00651 return; 00652 } 00653 return; 00654 } 00655 00656 00661 void 00662 worker_sleep(worker_type* worker, time_t timeout) 00663 { 00664 ods_log_assert(worker); 00665 lock_basic_lock(&worker->worker_lock); 00666 /* [LOCK] worker */ 00667 worker->sleeping = 1; 00668 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00669 timeout); 00670 /* [UNLOCK] worker */ 00671 lock_basic_unlock(&worker->worker_lock); 00672 return; 00673 } 00674 00675 00680 void 00681 worker_sleep_unless(worker_type* worker, time_t timeout) 00682 { 00683 ods_log_assert(worker); 00684 lock_basic_lock(&worker->worker_lock); 00685 /* [LOCK] worker */ 00686 if (!worker_fulfilled(worker)) { 00687 worker->sleeping = 1; 00688 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock, 00689 timeout); 00690 } 00691 /* [UNLOCK] worker */ 00692 lock_basic_unlock(&worker->worker_lock); 00693 return; 00694 } 00695 00696 00701 void 00702 worker_wakeup(worker_type* worker) 00703 { 00704 ods_log_assert(worker); 00705 if (worker && worker->sleeping && !worker->waiting) { 00706 ods_log_debug("[%s[%i]] wake up", worker2str(worker->type), 00707 worker->thread_num); 00708 lock_basic_lock(&worker->worker_lock); 00709 /* [LOCK] worker */ 00710 lock_basic_alarm(&worker->worker_alarm); 00711 worker->sleeping = 0; 00712 /* [UNLOCK] worker */ 00713 lock_basic_unlock(&worker->worker_lock); 00714 } 00715 return; 00716 } 00717 00718 00723 void 00724 worker_wait(lock_basic_type* lock, cond_basic_type* condition) 00725 { 00726 lock_basic_lock(lock); 00727 /* [LOCK] worker */ 00728 lock_basic_sleep(condition, lock, 0); 00729 /* [UNLOCK] worker */ 00730 lock_basic_unlock(lock); 00731 return; 00732 } 00733 00734 00739 void 00740 worker_notify(lock_basic_type* lock, cond_basic_type* condition) 00741 { 00742 lock_basic_lock(lock); 00743 /* [LOCK] lock */ 00744 lock_basic_alarm(condition); 00745 /* [UNLOCK] lock */ 00746 lock_basic_unlock(lock); 00747 return; 00748 } 00749 00750 00755 void 00756 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition) 00757 { 00758 lock_basic_lock(lock); 00759 /* [LOCK] lock */ 00760 lock_basic_broadcast(condition); 00761 /* [UNLOCK] lock */ 00762 lock_basic_unlock(lock); 00763 return; 00764 } 00765 00766 00771 void 00772 worker_cleanup(worker_type* worker) 00773 { 00774 allocator_type* allocator; 00775 cond_basic_type worker_cond; 00776 lock_basic_type worker_lock; 00777 00778 if (!worker) { 00779 return; 00780 } 00781 allocator = worker->allocator; 00782 worker_cond = worker->worker_alarm; 00783 worker_lock = worker->worker_lock; 00784 00785 allocator_deallocate(allocator, (void*) worker); 00786 lock_basic_destroy(&worker_lock); 00787 lock_basic_off(&worker_cond); 00788 return; 00789 }