00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "asterisk.h"
00031
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 377839 $")
00033
00034 #include "asterisk/_private.h"
00035 #include "asterisk/module.h"
00036 #include "asterisk/time.h"
00037 #include "asterisk/astobj2.h"
00038 #include "asterisk/cli.h"
00039 #include "asterisk/taskprocessor.h"
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 struct tps_task {
00050
00051 int (*execute)(void *datap);
00052
00053 void *datap;
00054
00055 AST_LIST_ENTRY(tps_task) list;
00056 };
00057
00058
00059 struct tps_taskprocessor_stats {
00060
00061 unsigned long max_qsize;
00062
00063 unsigned long _tasks_processed_count;
00064 };
00065
00066
00067 struct ast_taskprocessor {
00068
00069 const char *name;
00070
00071 ast_cond_t poll_cond;
00072
00073 pthread_t poll_thread;
00074
00075 ast_mutex_t taskprocessor_lock;
00076
00077 unsigned char poll_thread_run;
00078
00079 struct tps_taskprocessor_stats *stats;
00080
00081 long tps_queue_size;
00082
00083 AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
00084
00085 AST_LIST_ENTRY(ast_taskprocessor) list;
00086 };
00087 #define TPS_MAX_BUCKETS 7
00088
00089 static struct ao2_container *tps_singletons;
00090
00091
00092 static ast_cond_t cli_ping_cond;
00093
00094
00095 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
00096
00097
00098 static int tps_hash_cb(const void *obj, const int flags);
00099
00100 static int tps_cmp_cb(void *obj, void *arg, int flags);
00101
00102
00103 static void *tps_processing_function(void *data);
00104
00105
00106 static void tps_taskprocessor_destroy(void *tps);
00107
00108
00109 static int tps_ping_handler(void *datap);
00110
00111
00112 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
00113
00114
00115 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
00116
00117 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00118 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00119
00120 static struct ast_cli_entry taskprocessor_clis[] = {
00121 AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
00122 AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
00123 };
00124
00125
00126
00127
00128
00129 static void tps_shutdown(void)
00130 {
00131 ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00132 ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
00133 tps_singletons = NULL;
00134 }
00135
00136
00137 int ast_tps_init(void)
00138 {
00139 if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
00140 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
00141 return -1;
00142 }
00143
00144 ast_cond_init(&cli_ping_cond, NULL);
00145
00146 ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00147
00148 ast_register_atexit(tps_shutdown);
00149
00150 return 0;
00151 }
00152
00153
00154 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
00155 {
00156 struct tps_task *t;
00157 if ((t = ast_calloc(1, sizeof(*t)))) {
00158 t->execute = task_exe;
00159 t->datap = datap;
00160 }
00161 return t;
00162 }
00163
00164
00165 static void *tps_task_free(struct tps_task *task)
00166 {
00167 if (task) {
00168 ast_free(task);
00169 }
00170 return NULL;
00171 }
00172
00173
00174 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
00175 {
00176 int tklen;
00177 int wordnum = 0;
00178 char *name = NULL;
00179 struct ao2_iterator i;
00180
00181 if (a->pos != 3)
00182 return NULL;
00183
00184 tklen = strlen(a->word);
00185 i = ao2_iterator_init(tps_singletons, 0);
00186 while ((p = ao2_iterator_next(&i))) {
00187 if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
00188 name = ast_strdup(p->name);
00189 ao2_ref(p, -1);
00190 break;
00191 }
00192 ao2_ref(p, -1);
00193 }
00194 ao2_iterator_destroy(&i);
00195 return name;
00196 }
00197
00198
00199 static int tps_ping_handler(void *datap)
00200 {
00201 ast_mutex_lock(&cli_ping_cond_lock);
00202 ast_cond_signal(&cli_ping_cond);
00203 ast_mutex_unlock(&cli_ping_cond_lock);
00204 return 0;
00205 }
00206
00207
00208 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00209 {
00210 struct timeval begin, end, delta;
00211 const char *name;
00212 struct timeval when;
00213 struct timespec ts;
00214 struct ast_taskprocessor *tps = NULL;
00215
00216 switch (cmd) {
00217 case CLI_INIT:
00218 e->command = "core ping taskprocessor";
00219 e->usage =
00220 "Usage: core ping taskprocessor <taskprocessor>\n"
00221 " Displays the time required for a task to be processed\n";
00222 return NULL;
00223 case CLI_GENERATE:
00224 return tps_taskprocessor_tab_complete(tps, a);
00225 }
00226
00227 if (a->argc != 4)
00228 return CLI_SHOWUSAGE;
00229
00230 name = a->argv[3];
00231 if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
00232 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
00233 return CLI_SUCCESS;
00234 }
00235 ast_cli(a->fd, "\npinging %s ...", name);
00236 when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
00237 ts.tv_sec = when.tv_sec;
00238 ts.tv_nsec = when.tv_usec * 1000;
00239 ast_mutex_lock(&cli_ping_cond_lock);
00240 if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
00241 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
00242 ao2_ref(tps, -1);
00243 return CLI_FAILURE;
00244 }
00245 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
00246 ast_mutex_unlock(&cli_ping_cond_lock);
00247 end = ast_tvnow();
00248 delta = ast_tvsub(end, begin);
00249 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
00250 ao2_ref(tps, -1);
00251 return CLI_SUCCESS;
00252 }
00253
00254 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00255 {
00256 char name[256];
00257 int tcount;
00258 unsigned long qsize;
00259 unsigned long maxqsize;
00260 unsigned long processed;
00261 struct ast_taskprocessor *p;
00262 struct ao2_iterator i;
00263
00264 switch (cmd) {
00265 case CLI_INIT:
00266 e->command = "core show taskprocessors";
00267 e->usage =
00268 "Usage: core show taskprocessors\n"
00269 " Shows a list of instantiated task processors and their statistics\n";
00270 return NULL;
00271 case CLI_GENERATE:
00272 return NULL;
00273 }
00274
00275 if (a->argc != e->args)
00276 return CLI_SHOWUSAGE;
00277
00278 ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
00279 i = ao2_iterator_init(tps_singletons, 0);
00280 while ((p = ao2_iterator_next(&i))) {
00281 ast_copy_string(name, p->name, sizeof(name));
00282 qsize = p->tps_queue_size;
00283 maxqsize = p->stats->max_qsize;
00284 processed = p->stats->_tasks_processed_count;
00285 ast_cli(a->fd, "\n%24s %17ld %12ld %12ld", name, processed, qsize, maxqsize);
00286 ao2_ref(p, -1);
00287 }
00288 ao2_iterator_destroy(&i);
00289 tcount = ao2_container_count(tps_singletons);
00290 ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
00291 return CLI_SUCCESS;
00292 }
00293
00294
00295 static void *tps_processing_function(void *data)
00296 {
00297 struct ast_taskprocessor *i = data;
00298 struct tps_task *t;
00299 int size;
00300
00301 if (!i) {
00302 ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
00303 return NULL;
00304 }
00305
00306 while (i->poll_thread_run) {
00307 ast_mutex_lock(&i->taskprocessor_lock);
00308 if (!i->poll_thread_run) {
00309 ast_mutex_unlock(&i->taskprocessor_lock);
00310 break;
00311 }
00312 if (!(size = tps_taskprocessor_depth(i))) {
00313 ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
00314 if (!i->poll_thread_run) {
00315 ast_mutex_unlock(&i->taskprocessor_lock);
00316 break;
00317 }
00318 }
00319 ast_mutex_unlock(&i->taskprocessor_lock);
00320
00321 if (!(t = tps_taskprocessor_pop(i))) {
00322 ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
00323 continue;
00324 }
00325 if (!t->execute) {
00326 ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
00327 tps_task_free(t);
00328 continue;
00329 }
00330 t->execute(t->datap);
00331
00332 ast_mutex_lock(&i->taskprocessor_lock);
00333 if (i->stats) {
00334 i->stats->_tasks_processed_count++;
00335 if (size > i->stats->max_qsize) {
00336 i->stats->max_qsize = size;
00337 }
00338 }
00339 ast_mutex_unlock(&i->taskprocessor_lock);
00340
00341 tps_task_free(t);
00342 }
00343 while ((t = tps_taskprocessor_pop(i))) {
00344 tps_task_free(t);
00345 }
00346 return NULL;
00347 }
00348
00349
00350 static int tps_hash_cb(const void *obj, const int flags)
00351 {
00352 const struct ast_taskprocessor *tps = obj;
00353
00354 return ast_str_case_hash(tps->name);
00355 }
00356
00357
00358 static int tps_cmp_cb(void *obj, void *arg, int flags)
00359 {
00360 struct ast_taskprocessor *lhs = obj, *rhs = arg;
00361
00362 return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
00363 }
00364
00365
00366 static void tps_taskprocessor_destroy(void *tps)
00367 {
00368 struct ast_taskprocessor *t = tps;
00369
00370 if (!tps) {
00371 ast_log(LOG_ERROR, "missing taskprocessor\n");
00372 return;
00373 }
00374 ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
00375
00376 ast_mutex_lock(&t->taskprocessor_lock);
00377 t->poll_thread_run = 0;
00378 ast_cond_signal(&t->poll_cond);
00379 ast_mutex_unlock(&t->taskprocessor_lock);
00380 pthread_join(t->poll_thread, NULL);
00381 t->poll_thread = AST_PTHREADT_NULL;
00382 ast_mutex_destroy(&t->taskprocessor_lock);
00383 ast_cond_destroy(&t->poll_cond);
00384
00385 if (t->stats) {
00386 ast_free(t->stats);
00387 t->stats = NULL;
00388 }
00389 ast_free((char *) t->name);
00390 }
00391
00392
00393 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
00394 {
00395 struct tps_task *task;
00396
00397 if (!tps) {
00398 ast_log(LOG_ERROR, "missing taskprocessor\n");
00399 return NULL;
00400 }
00401 ast_mutex_lock(&tps->taskprocessor_lock);
00402 if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
00403 tps->tps_queue_size--;
00404 }
00405 ast_mutex_unlock(&tps->taskprocessor_lock);
00406 return task;
00407 }
00408
00409 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
00410 {
00411 return (tps) ? tps->tps_queue_size : -1;
00412 }
00413
00414
00415 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
00416 {
00417 if (!tps) {
00418 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
00419 return NULL;
00420 }
00421 return tps->name;
00422 }
00423
00424
00425
00426
00427 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
00428 {
00429 struct ast_taskprocessor *p, tmp_tps = {
00430 .name = name,
00431 };
00432
00433 if (ast_strlen_zero(name)) {
00434 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
00435 return NULL;
00436 }
00437 ao2_lock(tps_singletons);
00438 p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
00439 if (p) {
00440 ao2_unlock(tps_singletons);
00441 return p;
00442 }
00443 if (create & TPS_REF_IF_EXISTS) {
00444
00445 ao2_unlock(tps_singletons);
00446 return NULL;
00447 }
00448
00449 if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
00450 ao2_unlock(tps_singletons);
00451 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
00452 return NULL;
00453 }
00454
00455 ast_cond_init(&p->poll_cond, NULL);
00456 ast_mutex_init(&p->taskprocessor_lock);
00457
00458 if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
00459 ao2_unlock(tps_singletons);
00460 ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
00461 ao2_ref(p, -1);
00462 return NULL;
00463 }
00464 if (!(p->name = ast_strdup(name))) {
00465 ao2_unlock(tps_singletons);
00466 ao2_ref(p, -1);
00467 return NULL;
00468 }
00469 p->poll_thread_run = 1;
00470 p->poll_thread = AST_PTHREADT_NULL;
00471 if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
00472 ao2_unlock(tps_singletons);
00473 ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
00474 ao2_ref(p, -1);
00475 return NULL;
00476 }
00477 if (!(ao2_link(tps_singletons, p))) {
00478 ao2_unlock(tps_singletons);
00479 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
00480 ao2_ref(p, -1);
00481 return NULL;
00482 }
00483 ao2_unlock(tps_singletons);
00484 return p;
00485 }
00486
00487
00488 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
00489 {
00490 if (tps) {
00491 ao2_lock(tps_singletons);
00492 ao2_unlink(tps_singletons, tps);
00493 if (ao2_ref(tps, -1) > 1) {
00494 ao2_link(tps_singletons, tps);
00495 }
00496 ao2_unlock(tps_singletons);
00497 }
00498 return NULL;
00499 }
00500
00501
00502 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
00503 {
00504 struct tps_task *t;
00505
00506 if (!tps || !task_exe) {
00507 ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
00508 return -1;
00509 }
00510 if (!(t = tps_task_alloc(task_exe, datap))) {
00511 ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
00512 return -1;
00513 }
00514 ast_mutex_lock(&tps->taskprocessor_lock);
00515 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
00516 tps->tps_queue_size++;
00517 ast_cond_signal(&tps->poll_cond);
00518 ast_mutex_unlock(&tps->taskprocessor_lock);
00519 return 0;
00520 }
00521