OpenDNSSEC-signer 1.3.0rc3
/build/buildd2-opendnssec_1.3.0~rc3-1-mips-lpJjcT/opendnssec-1.3.0~rc3/signer/src/daemon/worker.c
Go to the documentation of this file.
00001 /*
00002  * $Id: worker.c 5227 2011-06-12 08:51:24Z jakob $
00003  *
00004  * Copyright (c) 2009 NLNet Labs. All rights reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  * 1. Redistributions of source code must retain the above copyright
00010  *    notice, this list of conditions and the following disclaimer.
00011  * 2. Redistributions in binary form must reproduce the above copyright
00012  *    notice, this list of conditions and the following disclaimer in the
00013  *    documentation and/or other materials provided with the distribution.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
00016  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00017  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
00019  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00020  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
00021  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00023  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
00024  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
00025  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  *
00027  */
00028 
00034 #include "adapter/adapi.h"
00035 #include "daemon/engine.h"
00036 #include "daemon/worker.h"
00037 #include "shared/allocator.h"
00038 #include "scheduler/schedule.h"
00039 #include "scheduler/task.h"
00040 #include "shared/locks.h"
00041 #include "shared/log.h"
00042 #include "shared/status.h"
00043 #include "shared/util.h"
00044 #include "signer/tools.h"
00045 #include "signer/zone.h"
00046 #include "signer/zonedata.h"
00047 
00048 #include <time.h> /* time() */
00049 
00050 ods_lookup_table worker_str[] = {
00051     { WORKER_WORKER, "worker" },
00052     { WORKER_DRUDGER, "drudger" },
00053     { 0, NULL }
00054 };
00055 
00056 
00061 worker_type*
00062 worker_create(allocator_type* allocator, int num, worker_id type)
00063 {
00064     worker_type* worker;
00065 
00066     if (!allocator) {
00067         return NULL;
00068     }
00069     ods_log_assert(allocator);
00070 
00071     worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
00072     if (!worker) {
00073         return NULL;
00074     }
00075 
00076     ods_log_debug("create worker[%i]", num +1);
00077     lock_basic_init(&worker->worker_lock);
00078     lock_basic_set(&worker->worker_alarm);
00079     lock_basic_lock(&worker->worker_lock);
00080     worker->allocator = allocator;
00081     worker->thread_num = num +1;
00082     worker->engine = NULL;
00083     worker->task = NULL;
00084     worker->working_with = TASK_NONE;
00085     worker->need_to_exit = 0;
00086     worker->type = type;
00087     worker->clock_in = 0;
00088     worker->jobs_appointed = 0;
00089     worker->jobs_completed = 0;
00090     worker->jobs_failed = 0;
00091     worker->sleeping = 0;
00092     worker->waiting = 0;
00093     lock_basic_unlock(&worker->worker_lock);
00094     return worker;
00095 }
00096 
00097 
00102 static const char*
00103 worker2str(worker_id type)
00104 {
00105     ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
00106     if (lt) {
00107         return lt->name;
00108     }
00109     return NULL;
00110 }
00111 
00112 
00117 static int
00118 worker_fulfilled(worker_type* worker)
00119 {
00120     return (worker->jobs_completed + worker->jobs_failed) ==
00121         worker->jobs_appointed;
00122 }
00123 
00124 
00129 static void
00130 worker_perform_task(worker_type* worker)
00131 {
00132     engine_type* engine = NULL;
00133     zone_type* zone = NULL;
00134     task_type* task = NULL;
00135     task_id what = TASK_NONE;
00136     time_t when = 0;
00137     time_t never = (3600*24*365);
00138     ods_status status = ODS_STATUS_OK;
00139     int fallthrough = 0;
00140     int backup = 0;
00141     char* working_dir = NULL;
00142     char* cfg_filename = NULL;
00143     uint32_t tmpserial = 0;
00144     time_t start = 0;
00145     time_t end = 0;
00146 
00147     if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
00148         return;
00149     }
00150     ods_log_assert(worker);
00151     ods_log_assert(worker->task);
00152     ods_log_assert(worker->task->zone);
00153 
00154     engine = (engine_type*) worker->engine;
00155     task = (task_type*) worker->task;
00156     zone = (zone_type*) worker->task->zone;
00157     ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
00158        worker2str(worker->type), worker->thread_num, task_what2str(task->what),
00159        task_who2str(task->who), (uint32_t) worker->clock_in);
00160 
00161     switch (task->what) {
00162         case TASK_SIGNCONF:
00163             worker->working_with = TASK_SIGNCONF;
00164             /* perform 'load signconf' task */
00165             ods_log_verbose("[%s[%i]] load signconf for zone %s",
00166                 worker2str(worker->type), worker->thread_num,
00167                 task_who2str(task->who));
00168             status = zone_load_signconf(zone, &what);
00169 
00170             /* what to do next */
00171             when = time_now();
00172             if (status == ODS_STATUS_UNCHANGED) {
00173                 if (task->halted != TASK_NONE) {
00174                     goto task_perform_continue;
00175                 } else {
00176                     status = ODS_STATUS_OK;
00177                 }
00178             }
00179 
00180             if (status == ODS_STATUS_OK) {
00181                 status = zone_publish_dnskeys(zone, 0);
00182             }
00183             if (status == ODS_STATUS_OK) {
00184                 status = zone_prepare_nsec3(zone, 0);
00185             }
00186             if (status == ODS_STATUS_OK) {
00187                 status = zonedata_commit(zone->zonedata);
00188             }
00189 
00190             if (status == ODS_STATUS_OK) {
00191                 task->interrupt = TASK_NONE;
00192                 task->halted = TASK_NONE;
00193             } else {
00194                 if (task->halted == TASK_NONE) {
00195                     goto task_perform_fail;
00196                 }
00197                 goto task_perform_continue;
00198             }
00199             fallthrough = 0;
00200             break;
00201         case TASK_READ:
00202             worker->working_with = TASK_READ;
00203             /* perform 'read input adapter' task */
00204             ods_log_verbose("[%s[%i]] read zone %s",
00205                 worker2str(worker->type), worker->thread_num,
00206                 task_who2str(task->who));
00207             status = tools_input(zone);
00208 
00209             /* what to do next */
00210             what = TASK_NSECIFY;
00211             when = time_now();
00212             if (status != ODS_STATUS_OK) {
00213                 if (task->halted == TASK_NONE) {
00214                     goto task_perform_fail;
00215                 }
00216                 goto task_perform_continue;
00217             }
00218             fallthrough = 1;
00219         case TASK_NSECIFY:
00220             worker->working_with = TASK_NSECIFY;
00221             ods_log_verbose("[%s[%i]] nsecify zone %s",
00222                 worker2str(worker->type), worker->thread_num,
00223                 task_who2str(task->who));
00224             status = tools_nsecify(zone);
00225 
00226             /* what to do next */
00227             what = TASK_SIGN;
00228             when = time_now();
00229             if (status == ODS_STATUS_OK) {
00230                 if (task->interrupt > TASK_SIGNCONF) {
00231                     task->interrupt = TASK_NONE;
00232                     task->halted = TASK_NONE;
00233                 }
00234             } else {
00235                 if (task->halted == TASK_NONE) {
00236                     goto task_perform_fail;
00237                 }
00238                 goto task_perform_continue;
00239             }
00240             fallthrough = 1;
00241         case TASK_SIGN:
00242             worker->working_with = TASK_SIGN;
00243             ods_log_verbose("[%s[%i]] sign zone %s",
00244                 worker2str(worker->type), worker->thread_num,
00245                 task_who2str(task->who));
00246             tmpserial = zone->zonedata->internal_serial;
00247             status = zone_update_serial(zone);
00248             if (status != ODS_STATUS_OK) {
00249                 ods_log_error("[%s[%i]] unable to sign zone %s: "
00250                     "failed to increment serial",
00251                     worker2str(worker->type), worker->thread_num,
00252                     task_who2str(task->who));
00253             } else {
00254                 /* start timer */
00255                 start = time(NULL);
00256                 if (zone->stats) {
00257                     lock_basic_lock(&zone->stats->stats_lock);
00258                     if (!zone->stats->start_time) {
00259                         zone->stats->start_time = start;
00260                     }
00261                     zone->stats->sig_count = 0;
00262                     zone->stats->sig_soa_count = 0;
00263                     zone->stats->sig_reuse = 0;
00264                     zone->stats->sig_time = 0;
00265                     lock_basic_unlock(&zone->stats->stats_lock);
00266                 }
00267 
00268                 /* queue menial, hard signing work */
00269                 status = zonedata_queue(zone->zonedata, engine->signq, worker);
00270                 ods_log_debug("[%s[%i]] wait until drudgers are finished "
00271                     " signing zone %s", worker2str(worker->type),
00272                     worker->thread_num, task_who2str(task->who));
00273 
00274                 /* sleep until work is done */
00275                 worker_sleep_unless(worker, 0);
00276                 if (worker->jobs_failed > 0) {
00277                     ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00278                         "signatures failed", worker2str(worker->type),
00279                         worker->thread_num, task_who2str(task->who),
00280                         worker->jobs_failed, worker->jobs_appointed);
00281                     status = ODS_STATUS_ERR;
00282                 }
00283                 worker->jobs_appointed = 0;
00284                 worker->jobs_completed = 0;
00285                 worker->jobs_failed = 0;
00286 
00287                 /* stop timer */
00288                 end = time(NULL);
00289                 if (status == ODS_STATUS_OK && zone->stats) {
00290                     lock_basic_lock(&zone->stats->stats_lock);
00291                     zone->stats->sig_time = (end-start);
00292                     lock_basic_unlock(&zone->stats->stats_lock);
00293                 }
00294             }
00295 
00296             /* what to do next */
00297             if (status != ODS_STATUS_OK) {
00298                 /* rollback serial */
00299                 zone->zonedata->internal_serial = tmpserial;
00300                 if (task->halted == TASK_NONE) {
00301                     goto task_perform_fail;
00302                 }
00303                 goto task_perform_continue;
00304             } else {
00305                 if (task->interrupt > TASK_SIGNCONF) {
00306                     task->interrupt = TASK_NONE;
00307                     task->halted = TASK_NONE;
00308                 }
00309             }
00310             what = TASK_AUDIT;
00311             when = time_now();
00312             fallthrough = 1;
00313         case TASK_AUDIT:
00314             worker->working_with = TASK_AUDIT;
00315             if (zone->signconf->audit) {
00316                 ods_log_verbose("[%s[%i]] audit zone %s",
00317                     worker2str(worker->type), worker->thread_num,
00318                     task_who2str(task->who));
00319                 working_dir = strdup(engine->config->working_dir);
00320                 cfg_filename = strdup(engine->config->cfg_filename);
00321                 status = tools_audit(zone, working_dir, cfg_filename);
00322                 if (working_dir)  { free((void*)working_dir); }
00323                 if (cfg_filename) { free((void*)cfg_filename); }
00324                 working_dir = NULL;
00325                 cfg_filename = NULL;
00326             } else {
00327                 status = ODS_STATUS_OK;
00328             }
00329 
00330             /* what to do next */
00331             if (status != ODS_STATUS_OK) {
00332                 if (task->halted == TASK_NONE) {
00333                     goto task_perform_fail;
00334                 }
00335                 goto task_perform_continue;
00336             }
00337             what = TASK_WRITE;
00338             when = time_now();
00339             fallthrough = 1;
00340         case TASK_WRITE:
00341             worker->working_with = TASK_WRITE;
00342             ods_log_verbose("[%s[%i]] write zone %s",
00343                 worker2str(worker->type), worker->thread_num,
00344                 task_who2str(task->who));
00345 
00346             status = tools_output(zone);
00347             zone->processed = 1;
00348 
00349             /* what to do next */
00350             if (status != ODS_STATUS_OK) {
00351                 if (task->halted == TASK_NONE) {
00352                     goto task_perform_fail;
00353                 }
00354                 goto task_perform_continue;
00355             } else {
00356                 if (task->interrupt > TASK_SIGNCONF) {
00357                     task->interrupt = TASK_NONE;
00358                     task->halted = TASK_NONE;
00359                 }
00360             }
00361             if (duration2time(zone->signconf->sig_resign_interval)) {
00362                 what = TASK_SIGN;
00363                 when = time_now() +
00364                     duration2time(zone->signconf->sig_resign_interval);
00365             } else {
00366                 what = TASK_NONE;
00367                 when = time_now() + never;
00368             }
00369             backup = 1;
00370             fallthrough = 0;
00371             break;
00372         case TASK_NONE:
00373             worker->working_with = TASK_NONE;
00374             ods_log_warning("[%s[%i]] none task for zone %s",
00375                 worker2str(worker->type), worker->thread_num,
00376                 task_who2str(task->who));
00377             when = time_now() + never;
00378             fallthrough = 0;
00379             break;
00380         default:
00381             ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
00382                 worker2str(worker->type), worker->thread_num,
00383                 task_who2str(task->who));
00384             what = TASK_SIGNCONF;
00385             when = time_now();
00386             fallthrough = 0;
00387             break;
00388     }
00389 
00390     /* no error, reset backoff */
00391     task->backoff = 0;
00392 
00393     /* set next task */
00394     if (fallthrough == 0 && task->interrupt != TASK_NONE &&
00395         task->interrupt != what) {
00396         ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
00397             worker2str(worker->type), worker->thread_num,
00398             task_what2str(what), task_who2str(task->who));
00399 
00400         task->what = task->interrupt;
00401         task->when = time_now();
00402         task->halted = what;
00403     } else {
00404         ods_log_debug("[%s[%i]] next task %s for zone %s",
00405             worker2str(worker->type), worker->thread_num,
00406             task_what2str(what), task_who2str(task->who));
00407 
00408         task->what = what;
00409         task->when = when;
00410         if (!fallthrough) {
00411             task->interrupt = TASK_NONE;
00412             task->halted = TASK_NONE;
00413         }
00414     }
00415 
00416     /* backup the last successful run */
00417     if (backup) {
00418         status = zone_backup(zone);
00419         if (status != ODS_STATUS_OK) {
00420             ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
00421             worker2str(worker->type), worker->thread_num,
00422             task_who2str(task->who), ods_status2str(status));
00423             /* just a warning */
00424             status = ODS_STATUS_OK;
00425         }
00426         backup = 0;
00427     }
00428     return;
00429 
00430 task_perform_fail:
00431     /* in case of failure, also mark zone processed (for single run usage) */
00432     zone->processed = 1;
00433 
00434     if (task->backoff) {
00435         task->backoff *= 2;
00436         if (task->backoff > ODS_SE_MAX_BACKOFF) {
00437             task->backoff = ODS_SE_MAX_BACKOFF;
00438         }
00439     } else {
00440         task->backoff = 60;
00441     }
00442     ods_log_error("[%s[%i]] backoff task %s for zone %s with %u seconds",
00443         worker2str(worker->type), worker->thread_num,
00444         task_what2str(task->what), task_who2str(task->who), task->backoff);
00445 
00446     task->when = time_now() + task->backoff;
00447     return;
00448 
00449 task_perform_continue:
00450     ods_log_info("[%s[%i]] continue task %s for zone %s",
00451         worker2str(worker->type), worker->thread_num,
00452         task_what2str(task->halted), task_who2str(task->who));
00453 
00454     what = task->halted;
00455     task->what = what;
00456     task->when = time_now();
00457     task->interrupt = TASK_NONE;
00458     task->halted = TASK_NONE;
00459     if (zone->processed) {
00460         task->when += duration2time(zone->signconf->sig_resign_interval);
00461     }
00462     return;
00463 }
00464 
00465 
00470 static void
00471 worker_work(worker_type* worker)
00472 {
00473     time_t now, timeout = 1;
00474     zone_type* zone = NULL;
00475     ods_status status = ODS_STATUS_OK;
00476 
00477     ods_log_assert(worker);
00478     ods_log_assert(worker->type == WORKER_WORKER);
00479 
00480     while (worker->need_to_exit == 0) {
00481         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00482             worker->thread_num);
00483         lock_basic_lock(&worker->engine->taskq->schedule_lock);
00484         /* [LOCK] schedule */
00485         worker->task = schedule_pop_task(worker->engine->taskq);
00486         /* [UNLOCK] schedule */
00487         if (worker->task) {
00488             worker->working_with = worker->task->what;
00489             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00490 
00491             zone = worker->task->zone;
00492             lock_basic_lock(&zone->zone_lock);
00493             /* [LOCK] zone */
00494             ods_log_debug("[%s[%i]] start working on zone %s",
00495                 worker2str(worker->type), worker->thread_num, zone->name);
00496 
00497             worker->clock_in = time(NULL);
00498             worker_perform_task(worker);
00499 
00500             zone->task = worker->task;
00501 
00502             ods_log_debug("[%s[%i]] finished working on zone %s",
00503                 worker2str(worker->type), worker->thread_num, zone->name);
00504             /* [UNLOCK] zone */
00505 
00506             lock_basic_lock(&worker->engine->taskq->schedule_lock);
00507             /* [LOCK] zone, schedule */
00508             worker->task = NULL;
00509             worker->working_with = TASK_NONE;
00510             status = schedule_task(worker->engine->taskq, zone->task, 1);
00511             /* [UNLOCK] zone, schedule */
00512             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00513             lock_basic_unlock(&zone->zone_lock);
00514 
00515             timeout = 1;
00516         } else {
00517             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00518                 worker->thread_num);
00519 
00520             /* [LOCK] schedule */
00521             worker->task = schedule_get_first_task(worker->engine->taskq);
00522             /* [UNLOCK] schedule */
00523             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00524 
00525             now = time_now();
00526             if (worker->task && !worker->engine->taskq->loading) {
00527                 timeout = (worker->task->when - now);
00528             } else {
00529                 timeout *= 2;
00530                 if (timeout > ODS_SE_MAX_BACKOFF) {
00531                     timeout = ODS_SE_MAX_BACKOFF;
00532                 }
00533             }
00534             worker->task = NULL;
00535             worker_sleep(worker, timeout);
00536         }
00537     }
00538     return;
00539 }
00540 
00541 
00546 static void
00547 worker_drudge(worker_type* worker)
00548 {
00549     zone_type* zone = NULL;
00550     task_type* task = NULL;
00551     rrset_type* rrset = NULL;
00552     ods_status status = ODS_STATUS_OK;
00553     worker_type* chief = NULL;
00554     hsm_ctx_t* ctx = NULL;
00555 
00556     ods_log_assert(worker);
00557     ods_log_assert(worker->type == WORKER_DRUDGER);
00558 
00559     ctx = hsm_create_context();
00560     if (ctx == NULL) {
00561         ods_log_error("[%s[%i]] unable to drudge: error "
00562             "creating libhsm context", worker2str(worker->type),
00563             worker->thread_num);
00564     }
00565 
00566     while (worker->need_to_exit == 0) {
00567         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00568             worker->thread_num);
00569 
00570         lock_basic_lock(&worker->engine->signq->q_lock);
00571         /* [LOCK] schedule */
00572         rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
00573         /* [UNLOCK] schedule */
00574         lock_basic_unlock(&worker->engine->signq->q_lock);
00575         if (rrset) {
00576             /* set up the work */
00577             if (chief) {
00578                 task = chief->task;
00579             }
00580             if (task) {
00581                 zone = task->zone;
00582             }
00583             if (!zone) {
00584                 ods_log_error("[%s[%i]] unable to drudge: no zone reference",
00585                     worker2str(worker->type), worker->thread_num);
00586             }
00587             if (zone && ctx) {
00588                 ods_log_assert(rrset);
00589                 ods_log_assert(zone);
00590                 ods_log_assert(zone->dname);
00591                 ods_log_assert(zone->signconf);
00592                 ods_log_assert(ctx);
00593 
00594                 worker->clock_in = time(NULL);
00595                 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
00596                     chief->clock_in, zone->stats);
00597             } else {
00598                 status = ODS_STATUS_ASSERT_ERR;
00599             }
00600 
00601             if (chief) {
00602                 lock_basic_lock(&chief->worker_lock);
00603                 if (status == ODS_STATUS_OK) {
00604                     chief->jobs_completed += 1;
00605                 } else {
00606                     chief->jobs_failed += 1;
00607                     /* destroy context? */
00608                 }
00609                 lock_basic_unlock(&chief->worker_lock);
00610 
00611                 if (worker_fulfilled(chief) && chief->sleeping) {
00612                     worker_wakeup(chief);
00613                 }
00614             }
00615             rrset = NULL;
00616             zone = NULL;
00617             task = NULL;
00618             chief = NULL;
00619         } else {
00620             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00621                 worker->thread_num);
00622             worker_wait(&worker->engine->signq->q_lock,
00623                 &worker->engine->signq->q_threshold);
00624         }
00625     }
00626 
00627     /* cleanup open HSM sessions */
00628     hsm_destroy_context(ctx);
00629     ctx = NULL;
00630     return;
00631 }
00632 
00633 
00638 void
00639 worker_start(worker_type* worker)
00640 {
00641     ods_log_assert(worker);
00642     switch (worker->type) {
00643         case WORKER_DRUDGER:
00644             worker_drudge(worker);
00645             break;
00646         case WORKER_WORKER:
00647             worker_work(worker);
00648             break;
00649         default:
00650             ods_log_error("[worker] illegal worker (id=%i)", worker->type);
00651             return;
00652     }
00653     return;
00654 }
00655 
00656 
00661 void
00662 worker_sleep(worker_type* worker, time_t timeout)
00663 {
00664     ods_log_assert(worker);
00665     lock_basic_lock(&worker->worker_lock);
00666     /* [LOCK] worker */
00667     worker->sleeping = 1;
00668     lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00669         timeout);
00670     /* [UNLOCK] worker */
00671     lock_basic_unlock(&worker->worker_lock);
00672     return;
00673 }
00674 
00675 
00680 void
00681 worker_sleep_unless(worker_type* worker, time_t timeout)
00682 {
00683     ods_log_assert(worker);
00684     lock_basic_lock(&worker->worker_lock);
00685     /* [LOCK] worker */
00686     if (!worker_fulfilled(worker)) {
00687         worker->sleeping = 1;
00688         lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00689             timeout);
00690     }
00691     /* [UNLOCK] worker */
00692     lock_basic_unlock(&worker->worker_lock);
00693     return;
00694 }
00695 
00696 
00701 void
00702 worker_wakeup(worker_type* worker)
00703 {
00704     ods_log_assert(worker);
00705     if (worker && worker->sleeping && !worker->waiting) {
00706         ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
00707            worker->thread_num);
00708         lock_basic_lock(&worker->worker_lock);
00709         /* [LOCK] worker */
00710         lock_basic_alarm(&worker->worker_alarm);
00711         worker->sleeping = 0;
00712         /* [UNLOCK] worker */
00713         lock_basic_unlock(&worker->worker_lock);
00714     }
00715     return;
00716 }
00717 
00718 
00723 void
00724 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
00725 {
00726     lock_basic_lock(lock);
00727     /* [LOCK] worker */
00728     lock_basic_sleep(condition, lock, 0);
00729     /* [UNLOCK] worker */
00730     lock_basic_unlock(lock);
00731     return;
00732 }
00733 
00734 
00739 void
00740 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
00741 {
00742     lock_basic_lock(lock);
00743     /* [LOCK] lock */
00744     lock_basic_alarm(condition);
00745     /* [UNLOCK] lock */
00746     lock_basic_unlock(lock);
00747     return;
00748 }
00749 
00750 
00755 void
00756 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
00757 {
00758     lock_basic_lock(lock);
00759     /* [LOCK] lock */
00760     lock_basic_broadcast(condition);
00761     /* [UNLOCK] lock */
00762     lock_basic_unlock(lock);
00763     return;
00764 }
00765 
00766 
00771 void
00772 worker_cleanup(worker_type* worker)
00773 {
00774     allocator_type* allocator;
00775     cond_basic_type worker_cond;
00776     lock_basic_type worker_lock;
00777 
00778     if (!worker) {
00779         return;
00780     }
00781     allocator = worker->allocator;
00782     worker_cond = worker->worker_alarm;
00783     worker_lock = worker->worker_lock;
00784 
00785     allocator_deallocate(allocator, (void*) worker);
00786     lock_basic_destroy(&worker_lock);
00787     lock_basic_off(&worker_cond);
00788     return;
00789 }