00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "asterisk.h"
00032
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 369013 $")
00034
/*
 * DEBUG(a) wraps a scheduler debugging statement.
 *
 * With DEBUG_SCHEDULER defined, the statement is handed to DEBUG_M() only
 * when option_debug is non-zero.  Without it, DEBUG(a) expands to nothing,
 * so the argument is never evaluated.
 */
#if defined(DEBUG_SCHEDULER)
#define DEBUG(a) do { \
	if (option_debug) \
		DEBUG_M(a) \
	} while (0)
#else
#define DEBUG(a)
#endif
00043
00044 #include <sys/time.h>
00045
00046 #include "asterisk/sched.h"
00047 #include "asterisk/channel.h"
00048 #include "asterisk/lock.h"
00049 #include "asterisk/utils.h"
00050 #include "asterisk/linkedlists.h"
00051 #include "asterisk/dlinkedlists.h"
00052 #include "asterisk/hashtab.h"
00053 #include "asterisk/heap.h"
00054 #include "asterisk/threadstorage.h"
00055
00056
00057
00058
00059
00060
00061
00062
00063
/*!
 * Upper bound on the number of released entries a context keeps on its
 * free list (schedc) for reuse.  The cache code is compiled only while
 * this macro is defined.
 */
#define SCHED_MAX_CACHE 128

/*! Thread storage used by the delete routine to remember the last id it reported as missing. */
AST_THREADSTORAGE(last_del_id);
00067
00068 struct sched {
00069 AST_LIST_ENTRY(sched) list;
00070 int id;
00071 struct timeval when;
00072 int resched;
00073 int variable;
00074 const void *data;
00075 ast_sched_cb callback;
00076 ssize_t __heap_index;
00077 };
00078
00079 struct sched_thread {
00080 pthread_t thread;
00081 ast_cond_t cond;
00082 unsigned int stop:1;
00083 };
00084
00085 struct ast_sched_context {
00086 ast_mutex_t lock;
00087 unsigned int eventcnt;
00088 unsigned int schedcnt;
00089 unsigned int highwater;
00090 struct ast_hashtab *schedq_ht;
00091 struct ast_heap *sched_heap;
00092 struct sched_thread *sched_thread;
00093
00094 #ifdef SCHED_MAX_CACHE
00095 AST_LIST_HEAD_NOLOCK(, sched) schedc;
00096 unsigned int schedccnt;
00097 #endif
00098 };
00099
00100 static void *sched_run(void *data)
00101 {
00102 struct ast_sched_context *con = data;
00103
00104 while (!con->sched_thread->stop) {
00105 int ms;
00106 struct timespec ts = {
00107 .tv_sec = 0,
00108 };
00109
00110 ast_mutex_lock(&con->lock);
00111
00112 if (con->sched_thread->stop) {
00113 ast_mutex_unlock(&con->lock);
00114 return NULL;
00115 }
00116
00117 ms = ast_sched_wait(con);
00118
00119 if (ms == -1) {
00120 ast_cond_wait(&con->sched_thread->cond, &con->lock);
00121 } else {
00122 struct timeval tv;
00123 tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00124 ts.tv_sec = tv.tv_sec;
00125 ts.tv_nsec = tv.tv_usec * 1000;
00126 ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
00127 }
00128
00129 ast_mutex_unlock(&con->lock);
00130
00131 if (con->sched_thread->stop) {
00132 return NULL;
00133 }
00134
00135 ast_sched_runq(con);
00136 }
00137
00138 return NULL;
00139 }
00140
00141 static void sched_thread_destroy(struct ast_sched_context *con)
00142 {
00143 if (!con->sched_thread) {
00144 return;
00145 }
00146
00147 if (con->sched_thread->thread != AST_PTHREADT_NULL) {
00148 ast_mutex_lock(&con->lock);
00149 con->sched_thread->stop = 1;
00150 ast_cond_signal(&con->sched_thread->cond);
00151 ast_mutex_unlock(&con->lock);
00152 pthread_join(con->sched_thread->thread, NULL);
00153 con->sched_thread->thread = AST_PTHREADT_NULL;
00154 }
00155
00156 ast_cond_destroy(&con->sched_thread->cond);
00157
00158 ast_free(con->sched_thread);
00159
00160 con->sched_thread = NULL;
00161 }
00162
00163 int ast_sched_start_thread(struct ast_sched_context *con)
00164 {
00165 struct sched_thread *st;
00166
00167 if (con->sched_thread) {
00168 ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
00169 return -1;
00170 }
00171
00172 if (!(st = ast_calloc(1, sizeof(*st)))) {
00173 return -1;
00174 }
00175
00176 ast_cond_init(&st->cond, NULL);
00177
00178 st->thread = AST_PTHREADT_NULL;
00179
00180 con->sched_thread = st;
00181
00182 if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
00183 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00184 sched_thread_destroy(con);
00185 return -1;
00186 }
00187
00188 return 0;
00189 }
00190
00191 static int sched_cmp(const void *a, const void *b)
00192 {
00193 const struct sched *as = a;
00194 const struct sched *bs = b;
00195 return as->id != bs->id;
00196 }
00197
00198 static unsigned int sched_hash(const void *obj)
00199 {
00200 const struct sched *s = obj;
00201 unsigned int h = s->id;
00202 return h;
00203 }
00204
00205 static int sched_time_cmp(void *a, void *b)
00206 {
00207 return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00208 }
00209
00210 struct ast_sched_context *ast_sched_context_create(void)
00211 {
00212 struct ast_sched_context *tmp;
00213
00214 if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
00215 return NULL;
00216 }
00217
00218 ast_mutex_init(&tmp->lock);
00219 tmp->eventcnt = 1;
00220
00221 tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
00222
00223 if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00224 offsetof(struct sched, __heap_index)))) {
00225 ast_sched_context_destroy(tmp);
00226 return NULL;
00227 }
00228
00229 return tmp;
00230 }
00231
00232 void ast_sched_context_destroy(struct ast_sched_context *con)
00233 {
00234 struct sched *s;
00235
00236 sched_thread_destroy(con);
00237 con->sched_thread = NULL;
00238
00239 ast_mutex_lock(&con->lock);
00240
00241 #ifdef SCHED_MAX_CACHE
00242 while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
00243 ast_free(s);
00244 }
00245 #endif
00246
00247 if (con->sched_heap) {
00248 while ((s = ast_heap_pop(con->sched_heap))) {
00249 ast_free(s);
00250 }
00251 ast_heap_destroy(con->sched_heap);
00252 con->sched_heap = NULL;
00253 }
00254
00255 ast_hashtab_destroy(con->schedq_ht, NULL);
00256 con->schedq_ht = NULL;
00257
00258 ast_mutex_unlock(&con->lock);
00259 ast_mutex_destroy(&con->lock);
00260
00261 ast_free(con);
00262 }
00263
00264 static struct sched *sched_alloc(struct ast_sched_context *con)
00265 {
00266 struct sched *tmp;
00267
00268
00269
00270
00271
00272 #ifdef SCHED_MAX_CACHE
00273 if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00274 con->schedccnt--;
00275 else
00276 #endif
00277 tmp = ast_calloc(1, sizeof(*tmp));
00278
00279 return tmp;
00280 }
00281
/*!
 * \brief Give an entry back: cache it while there is room, free it otherwise.
 *
 * \param con Scheduler context owning the cache.
 * \param tmp Entry that is no longer queued.
 */
static void sched_release(struct ast_sched_context *con, struct sched *tmp)
{
#ifdef SCHED_MAX_CACHE
	if (con->schedccnt < SCHED_MAX_CACHE) {
		AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
		con->schedccnt++;
		return;
	}
#endif

	ast_free(tmp);
}
00297
00298
00299
00300
00301
00302 int ast_sched_wait(struct ast_sched_context *con)
00303 {
00304 int ms;
00305 struct sched *s;
00306
00307 DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00308
00309 ast_mutex_lock(&con->lock);
00310 if ((s = ast_heap_peek(con->sched_heap, 1))) {
00311 ms = ast_tvdiff_ms(s->when, ast_tvnow());
00312 if (ms < 0) {
00313 ms = 0;
00314 }
00315 } else {
00316 ms = -1;
00317 }
00318 ast_mutex_unlock(&con->lock);
00319
00320 return ms;
00321 }
00322
00323
00324
00325
00326
00327
00328
00329 static void schedule(struct ast_sched_context *con, struct sched *s)
00330 {
00331 ast_heap_push(con->sched_heap, s);
00332
00333 if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
00334 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
00335 }
00336
00337 con->schedcnt++;
00338
00339 if (con->schedcnt > con->highwater) {
00340 con->highwater = con->schedcnt;
00341 }
00342 }
00343
00344
00345
00346
00347
00348 static int sched_settime(struct timeval *t, int when)
00349 {
00350 struct timeval now = ast_tvnow();
00351
00352
00353 if (ast_tvzero(*t))
00354 *t = now;
00355 *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00356 if (ast_tvcmp(*t, now) < 0) {
00357 *t = now;
00358 }
00359 return 0;
00360 }
00361
00362 int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00363 {
00364
00365 if (old_id > 0) {
00366 AST_SCHED_DEL(con, old_id);
00367 }
00368 return ast_sched_add_variable(con, when, callback, data, variable);
00369 }
00370
00371
00372
00373
00374 int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00375 {
00376 struct sched *tmp;
00377 int res = -1;
00378
00379 DEBUG(ast_debug(1, "ast_sched_add()\n"));
00380
00381 ast_mutex_lock(&con->lock);
00382 if ((tmp = sched_alloc(con))) {
00383 tmp->id = con->eventcnt++;
00384 tmp->callback = callback;
00385 tmp->data = data;
00386 tmp->resched = when;
00387 tmp->variable = variable;
00388 tmp->when = ast_tv(0, 0);
00389 if (sched_settime(&tmp->when, when)) {
00390 sched_release(con, tmp);
00391 } else {
00392 schedule(con, tmp);
00393 res = tmp->id;
00394 }
00395 }
00396 #ifdef DUMP_SCHEDULER
00397
00398 if (option_debug)
00399 ast_sched_dump(con);
00400 #endif
00401 if (con->sched_thread) {
00402 ast_cond_signal(&con->sched_thread->cond);
00403 }
00404 ast_mutex_unlock(&con->lock);
00405
00406 return res;
00407 }
00408
00409 int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
00410 {
00411 if (old_id > -1) {
00412 AST_SCHED_DEL(con, old_id);
00413 }
00414 return ast_sched_add(con, when, callback, data);
00415 }
00416
00417 int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
00418 {
00419 return ast_sched_add_variable(con, when, callback, data, 0);
00420 }
00421
00422 const void *ast_sched_find_data(struct ast_sched_context *con, int id)
00423 {
00424 struct sched tmp,*res;
00425 tmp.id = id;
00426 res = ast_hashtab_lookup(con->schedq_ht, &tmp);
00427 if (res)
00428 return res->data;
00429 return NULL;
00430 }
00431
00432
00433
00434
00435
00436
00437
00438 #ifndef AST_DEVMODE
00439 int ast_sched_del(struct ast_sched_context *con, int id)
00440 #else
00441 int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
00442 #endif
00443 {
00444 struct sched *s, tmp = {
00445 .id = id,
00446 };
00447 int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
00448
00449 DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00450
00451 if (id < 0) {
00452 return 0;
00453 }
00454
00455 ast_mutex_lock(&con->lock);
00456 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00457 if (s) {
00458 if (!ast_heap_remove(con->sched_heap, s)) {
00459 ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00460 }
00461
00462 if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
00463 ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
00464 }
00465
00466 con->schedcnt--;
00467
00468 sched_release(con, s);
00469 }
00470
00471 #ifdef DUMP_SCHEDULER
00472
00473 if (option_debug)
00474 ast_sched_dump(con);
00475 #endif
00476 if (con->sched_thread) {
00477 ast_cond_signal(&con->sched_thread->cond);
00478 }
00479 ast_mutex_unlock(&con->lock);
00480
00481 if (!s && *last_id != id) {
00482 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00483 #ifndef AST_DEVMODE
00484 ast_assert(s != NULL);
00485 #else
00486 {
00487 char buf[100];
00488 snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
00489 _ast_assert(0, buf, file, line, function);
00490 }
00491 #endif
00492 *last_id = id;
00493 return -1;
00494 } else if (!s) {
00495 return -1;
00496 }
00497
00498 return 0;
00499 }
00500
00501 void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00502 {
00503 int i, x;
00504 struct sched *cur;
00505 int countlist[cbnames->numassocs + 1];
00506 size_t heap_size;
00507
00508 memset(countlist, 0, sizeof(countlist));
00509 ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
00510
00511 ast_mutex_lock(&con->lock);
00512
00513 heap_size = ast_heap_size(con->sched_heap);
00514 for (x = 1; x <= heap_size; x++) {
00515 cur = ast_heap_peek(con->sched_heap, x);
00516
00517 for (i = 0; i < cbnames->numassocs; i++) {
00518 if (cur->callback == cbnames->cblist[i]) {
00519 break;
00520 }
00521 }
00522 if (i < cbnames->numassocs) {
00523 countlist[i]++;
00524 } else {
00525 countlist[cbnames->numassocs]++;
00526 }
00527 }
00528
00529 ast_mutex_unlock(&con->lock);
00530
00531 for (i = 0; i < cbnames->numassocs; i++) {
00532 ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
00533 }
00534
00535 ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
00536 }
00537
00538
00539 void ast_sched_dump(struct ast_sched_context *con)
00540 {
00541 struct sched *q;
00542 struct timeval when = ast_tvnow();
00543 int x;
00544 size_t heap_size;
00545 #ifdef SCHED_MAX_CACHE
00546 ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
00547 #else
00548 ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
00549 #endif
00550
00551 ast_debug(1, "=============================================================\n");
00552 ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
00553 ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00554 ast_mutex_lock(&con->lock);
00555 heap_size = ast_heap_size(con->sched_heap);
00556 for (x = 1; x <= heap_size; x++) {
00557 struct timeval delta;
00558 q = ast_heap_peek(con->sched_heap, x);
00559 delta = ast_tvsub(q->when, when);
00560 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
00561 q->id,
00562 q->callback,
00563 q->data,
00564 (long)delta.tv_sec,
00565 (long int)delta.tv_usec);
00566 }
00567 ast_mutex_unlock(&con->lock);
00568 ast_debug(1, "=============================================================\n");
00569 }
00570
00571
00572
00573
00574 int ast_sched_runq(struct ast_sched_context *con)
00575 {
00576 struct sched *current;
00577 struct timeval when;
00578 int numevents;
00579 int res;
00580
00581 DEBUG(ast_debug(1, "ast_sched_runq()\n"));
00582
00583 ast_mutex_lock(&con->lock);
00584
00585 when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
00586 for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
00587
00588
00589
00590
00591
00592 if (ast_tvcmp(current->when, when) != -1) {
00593 break;
00594 }
00595
00596 current = ast_heap_pop(con->sched_heap);
00597
00598 if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
00599 ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
00600 }
00601
00602 con->schedcnt--;
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613 ast_mutex_unlock(&con->lock);
00614 res = current->callback(current->data);
00615 ast_mutex_lock(&con->lock);
00616
00617 if (res) {
00618
00619
00620
00621
00622 if (sched_settime(¤t->when, current->variable? res : current->resched)) {
00623 sched_release(con, current);
00624 } else {
00625 schedule(con, current);
00626 }
00627 } else {
00628
00629 sched_release(con, current);
00630 }
00631 }
00632
00633 ast_mutex_unlock(&con->lock);
00634
00635 return numevents;
00636 }
00637
00638 long ast_sched_when(struct ast_sched_context *con,int id)
00639 {
00640 struct sched *s, tmp;
00641 long secs = -1;
00642 DEBUG(ast_debug(1, "ast_sched_when()\n"));
00643
00644 ast_mutex_lock(&con->lock);
00645
00646
00647 tmp.id = id;
00648 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00649
00650 if (s) {
00651 struct timeval now = ast_tvnow();
00652 secs = s->when.tv_sec - now.tv_sec;
00653 }
00654 ast_mutex_unlock(&con->lock);
00655
00656 return secs;
00657 }