00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "asterisk.h"
00033
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 370535 $");
00035
00036 #include <corosync/cpg.h>
00037 #include <corosync/cfg.h>
00038
00039 #include "asterisk/module.h"
00040 #include "asterisk/logger.h"
00041 #include "asterisk/poll-compat.h"
00042 #include "asterisk/config.h"
00043 #include "asterisk/event.h"
00044 #include "asterisk/cli.h"
00045 #include "asterisk/devicestate.h"
00046
00047 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
00048
00049 static struct {
00050 const char *name;
00051 struct ast_event_sub *sub;
00052 unsigned char publish;
00053 unsigned char publish_default;
00054 unsigned char subscribe;
00055 unsigned char subscribe_default;
00056 } event_types[] = {
00057 [AST_EVENT_MWI] = { .name = "mwi", },
00058 [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
00059 [AST_EVENT_PING] = { .name = "ping", .publish_default = 1, .subscribe_default = 1 },
00060 };
00061
/*! \brief Bookkeeping for the thread that runs dispatch_thread_handler(). */
static struct {
	/*! Thread id; AST_PTHREADT_NULL until a thread has been created */
	pthread_t id;
	/*! Pipe: [0] is polled by the thread, [1] is written to wake it up */
	int alert_pipe[2];
	/*! Set to request that the thread leave its loop */
	unsigned int stop:1;
} dispatch_thread = {
	.alert_pipe = { -1, -1 },
	.id = AST_PTHREADT_NULL,
};
00070
00071 static cpg_handle_t cpg_handle;
00072 static corosync_cfg_handle_t cfg_handle;
00073
00074 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00075 static void cfg_state_track_cb(
00076 corosync_cfg_state_notification_buffer_t *notification_buffer,
00077 cs_error_t error);
00078 #endif
00079
00080 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
00081 corosync_cfg_shutdown_flags_t flags);
00082
00083 static corosync_cfg_callbacks_t cfg_callbacks = {
00084 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00085 .corosync_cfg_state_track_callback = cfg_state_track_cb,
00086 #endif
00087 .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
00088 };
00089
00090 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00091 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
00092
00093 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00094 const struct cpg_address *member_list, size_t member_list_entries,
00095 const struct cpg_address *left_list, size_t left_list_entries,
00096 const struct cpg_address *joined_list, size_t joined_list_entries);
00097
00098 static cpg_callbacks_t cpg_callbacks = {
00099 .cpg_deliver_fn = cpg_deliver_cb,
00100 .cpg_confchg_fn = cpg_confchg_cb,
00101 };
00102
00103 static void ast_event_cb(const struct ast_event *event, void *data);
00104
#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
/*!
 * \brief CFG state tracking callback.
 *
 * Registered in cfg_callbacks; both arguments are ignored.
 */
static void cfg_state_track_cb(corosync_cfg_state_notification_buffer_t *notification_buffer,
	cs_error_t error)
{
	/* Intentionally empty. */
}
#endif
00112
00113 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
00114 corosync_cfg_shutdown_flags_t flags)
00115 {
00116 }
00117
00118 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00119 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
00120 {
00121 struct ast_event *event;
00122
00123 if (msg_len < ast_event_minimum_length()) {
00124 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00125 (unsigned int) msg_len,
00126 (unsigned int) ast_event_minimum_length());
00127 return;
00128 }
00129
00130 if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
00131
00132 return;
00133 }
00134
00135 ast_rwlock_rdlock(&event_types_lock);
00136 if (!event_types[ast_event_get_type(msg)].subscribe) {
00137
00138 ast_rwlock_unlock(&event_types_lock);
00139 return;
00140 }
00141 ast_rwlock_unlock(&event_types_lock);
00142
00143 if (!(event = ast_malloc(msg_len))) {
00144 return;
00145 }
00146
00147 memcpy(event, msg, msg_len);
00148
00149 if (ast_event_get_type(event) == AST_EVENT_PING) {
00150 const struct ast_eid *eid;
00151 char buf[128] = "";
00152
00153 eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
00154 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
00155 ast_log(LOG_NOTICE, "(cpg_deliver_cb) Got event PING from server with EID: '%s'\n", buf);
00156
00157 ast_event_queue(event);
00158 } else {
00159 ast_event_queue_and_cache(event);
00160 }
00161 }
00162
00163 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00164 const struct cpg_address *member_list, size_t member_list_entries,
00165 const struct cpg_address *left_list, size_t left_list_entries,
00166 const struct cpg_address *joined_list, size_t joined_list_entries)
00167 {
00168 unsigned int i;
00169
00170
00171
00172
00173 if (!joined_list_entries) {
00174 return;
00175 }
00176
00177 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00178 struct ast_event_sub *event_sub;
00179
00180 ast_rwlock_rdlock(&event_types_lock);
00181 if (!event_types[i].publish) {
00182 ast_rwlock_unlock(&event_types_lock);
00183 continue;
00184 }
00185 ast_rwlock_unlock(&event_types_lock);
00186
00187 event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
00188 ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
00189 &ast_eid_default, sizeof(ast_eid_default));
00190 ast_event_dump_cache(event_sub);
00191 ast_event_sub_destroy(event_sub);
00192 }
00193 }
00194
00195 static void *dispatch_thread_handler(void *data)
00196 {
00197 cs_error_t cs_err;
00198 struct pollfd pfd[3] = {
00199 { .events = POLLIN, },
00200 { .events = POLLIN, },
00201 { .events = POLLIN, },
00202 };
00203
00204 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
00205 ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
00206 return NULL;
00207 }
00208
00209 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
00210 ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
00211 return NULL;
00212 }
00213
00214 pfd[2].fd = dispatch_thread.alert_pipe[0];
00215
00216 while (!dispatch_thread.stop) {
00217 int res;
00218
00219 cs_err = CS_OK;
00220
00221 pfd[0].revents = 0;
00222 pfd[1].revents = 0;
00223 pfd[2].revents = 0;
00224
00225 res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
00226 if (res == -1 && errno != EINTR && errno != EAGAIN) {
00227 ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
00228 continue;
00229 }
00230
00231 if (pfd[0].revents & POLLIN) {
00232 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
00233 ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
00234 }
00235 }
00236
00237 if (pfd[1].revents & POLLIN) {
00238 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
00239 ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
00240 }
00241 }
00242
00243 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
00244 struct cpg_name name;
00245
00246
00247
00248 ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
00249
00250 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
00251 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
00252 sleep(5);
00253 continue;
00254 }
00255
00256 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
00257 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
00258 sleep(5);
00259 continue;
00260 }
00261
00262 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
00263 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
00264 sleep(5);
00265 continue;
00266 }
00267
00268 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
00269 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
00270 sleep(5);
00271 continue;
00272 }
00273
00274 ast_copy_string(name.value, "asterisk", sizeof(name.value));
00275 name.length = strlen(name.value);
00276 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
00277 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
00278 sleep(5);
00279 continue;
00280 }
00281
00282 ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
00283 }
00284 }
00285
00286 return NULL;
00287 }
00288
00289 static void ast_event_cb(const struct ast_event *event, void *data)
00290 {
00291 cs_error_t cs_err;
00292 struct iovec iov = {
00293 .iov_base = (void *) event,
00294 .iov_len = ast_event_get_size(event),
00295 };
00296
00297 if (ast_event_get_type(event) == AST_EVENT_PING) {
00298 const struct ast_eid *eid;
00299 char buf[128] = "";
00300
00301 eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
00302 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
00303 ast_log(LOG_NOTICE, "(ast_event_cb) Got event PING from server with EID: '%s'\n", buf);
00304 }
00305
00306 if (ast_eid_cmp(&ast_eid_default,
00307 ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00308
00309 return;
00310 }
00311
00312
00313
00314
00315 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
00316 ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
00317 }
00318 }
00319
00320 static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00321 {
00322 cs_error_t cs_err;
00323 cpg_iteration_handle_t cpg_iter;
00324 struct cpg_iteration_description_t cpg_desc;
00325 unsigned int i;
00326
00327 switch (cmd) {
00328 case CLI_INIT:
00329 e->command = "corosync show members";
00330 e->usage =
00331 "Usage: corosync show members\n"
00332 " Show corosync cluster members\n";
00333 return NULL;
00334
00335 case CLI_GENERATE:
00336 return NULL;
00337 }
00338
00339 if (a->argc != e->args) {
00340 return CLI_SHOWUSAGE;
00341 }
00342
00343 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
00344
00345 if (cs_err != CS_OK) {
00346 ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
00347 return CLI_FAILURE;
00348 }
00349
00350 ast_cli(a->fd, "\n"
00351 "=============================================================\n"
00352 "=== Cluster members =========================================\n"
00353 "=============================================================\n"
00354 "===\n");
00355
00356 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
00357 cs_err == CS_OK;
00358 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
00359 corosync_cfg_node_address_t addrs[8];
00360 int num_addrs = 0;
00361 unsigned int j;
00362
00363 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
00364 ARRAY_LEN(addrs), &num_addrs, addrs);
00365 if (cs_err != CS_OK) {
00366 ast_log(LOG_WARNING, "Failed to get node addresses\n");
00367 continue;
00368 }
00369
00370 ast_cli(a->fd, "=== Node %d\n", i);
00371 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
00372
00373 for (j = 0; j < num_addrs; j++) {
00374 struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
00375 size_t sa_len = (size_t) addrs[j].address_length;
00376 char buf[128];
00377
00378 getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
00379
00380 ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
00381 }
00382
00383 }
00384
00385 ast_cli(a->fd, "===\n"
00386 "=============================================================\n"
00387 "\n");
00388
00389 cpg_iteration_finalize(cpg_iter);
00390
00391 return CLI_SUCCESS;
00392 }
00393
00394 static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00395 {
00396 struct ast_event *event;
00397
00398 switch (cmd) {
00399 case CLI_INIT:
00400 e->command = "corosync ping";
00401 e->usage =
00402 "Usage: corosync ping\n"
00403 " Send a test ping to the cluster.\n"
00404 "A NOTICE will be in the log for every ping received\n"
00405 "on a server.\n If you send a ping, you should see a NOTICE\n"
00406 "in the log for every server in the cluster.\n";
00407 return NULL;
00408
00409 case CLI_GENERATE:
00410 return NULL;
00411 }
00412
00413 if (a->argc != e->args) {
00414 return CLI_SHOWUSAGE;
00415 }
00416
00417 event = ast_event_new(AST_EVENT_PING, AST_EVENT_IE_END);
00418
00419 if (!event) {
00420 return CLI_FAILURE;
00421 }
00422
00423 ast_event_queue(event);
00424
00425 return CLI_SUCCESS;
00426 }
00427
00428 static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00429 {
00430 unsigned int i;
00431
00432 switch (cmd) {
00433 case CLI_INIT:
00434 e->command = "corosync show config";
00435 e->usage =
00436 "Usage: corosync show config\n"
00437 " Show configuration loaded from res_corosync.conf\n";
00438 return NULL;
00439
00440 case CLI_GENERATE:
00441 return NULL;
00442 }
00443
00444 if (a->argc != e->args) {
00445 return CLI_SHOWUSAGE;
00446 }
00447
00448 ast_cli(a->fd, "\n"
00449 "=============================================================\n"
00450 "=== res_corosync config =====================================\n"
00451 "=============================================================\n"
00452 "===\n");
00453
00454 ast_rwlock_rdlock(&event_types_lock);
00455 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00456 if (event_types[i].publish) {
00457 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00458 event_types[i].name);
00459 }
00460 if (event_types[i].subscribe) {
00461 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00462 event_types[i].name);
00463 }
00464 }
00465 ast_rwlock_unlock(&event_types_lock);
00466
00467 ast_cli(a->fd, "===\n"
00468 "=============================================================\n"
00469 "\n");
00470
00471 return CLI_SUCCESS;
00472 }
00473
00474 static struct ast_cli_entry corosync_cli[] = {
00475 AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
00476 AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
00477 AST_CLI_DEFINE(corosync_ping, "Send a test ping to the cluster"),
00478 };
00479
/*! \brief Selects which flag of an event type set_event() turns on. */
enum {
	PUBLISH = 0,   /*!< Turn on the publish flag */
	SUBSCRIBE = 1, /*!< Turn on the subscribe flag */
};
00484
00485 static int set_event(const char *event_type, int pubsub)
00486 {
00487 unsigned int i;
00488
00489 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00490 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
00491 continue;
00492 }
00493
00494 switch (pubsub) {
00495 case PUBLISH:
00496 event_types[i].publish = 1;
00497 break;
00498 case SUBSCRIBE:
00499 event_types[i].subscribe = 1;
00500 break;
00501 }
00502
00503 break;
00504 }
00505
00506 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
00507 }
00508
00509 static int load_general_config(struct ast_config *cfg)
00510 {
00511 struct ast_variable *v;
00512 int res = 0;
00513 unsigned int i;
00514
00515 ast_rwlock_wrlock(&event_types_lock);
00516
00517 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00518 event_types[i].publish = event_types[i].publish_default;
00519 event_types[i].subscribe = event_types[i].subscribe_default;
00520 }
00521
00522 for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
00523 if (!strcasecmp(v->name, "publish_event")) {
00524 res = set_event(v->value, PUBLISH);
00525 } else if (!strcasecmp(v->name, "subscribe_event")) {
00526 res = set_event(v->value, SUBSCRIBE);
00527 } else {
00528 ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
00529 }
00530 }
00531
00532 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00533 if (event_types[i].publish && !event_types[i].sub) {
00534 event_types[i].sub = ast_event_subscribe(i,
00535 ast_event_cb, "Corosync", NULL,
00536 AST_EVENT_IE_END);
00537 } else if (!event_types[i].publish && event_types[i].sub) {
00538 event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
00539 }
00540 }
00541
00542 ast_rwlock_unlock(&event_types_lock);
00543
00544 return res;
00545 }
00546
00547 static int load_config(unsigned int reload)
00548 {
00549 static const char filename[] = "res_corosync.conf";
00550 struct ast_config *cfg;
00551 const char *cat = NULL;
00552 struct ast_flags config_flags = { 0 };
00553 int res = 0;
00554
00555 cfg = ast_config_load(filename, config_flags);
00556
00557 if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
00558 return -1;
00559 }
00560
00561 while ((cat = ast_category_browse(cfg, cat))) {
00562 if (!strcasecmp(cat, "general")) {
00563 res = load_general_config(cfg);
00564 } else {
00565 ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
00566 }
00567 }
00568
00569 ast_config_destroy(cfg);
00570
00571 return res;
00572 }
00573
00574 static void cleanup_module(void)
00575 {
00576 cs_error_t cs_err;
00577 unsigned int i;
00578
00579 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00580 if (event_types[i].sub) {
00581 event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
00582 }
00583 event_types[i].publish = 0;
00584 event_types[i].subscribe = 0;
00585 }
00586
00587 if (dispatch_thread.id != AST_PTHREADT_NULL) {
00588 char meepmeep = 'x';
00589 dispatch_thread.stop = 1;
00590 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
00591 5000) == -1) {
00592 ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
00593 strerror(errno), errno);
00594 }
00595 pthread_join(dispatch_thread.id, NULL);
00596 }
00597
00598 if (dispatch_thread.alert_pipe[0] != -1) {
00599 close(dispatch_thread.alert_pipe[0]);
00600 dispatch_thread.alert_pipe[0] = -1;
00601 }
00602
00603 if (dispatch_thread.alert_pipe[1] != -1) {
00604 close(dispatch_thread.alert_pipe[1]);
00605 dispatch_thread.alert_pipe[1] = -1;
00606 }
00607
00608 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
00609 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
00610 }
00611 cpg_handle = 0;
00612
00613 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
00614 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
00615 }
00616 cfg_handle = 0;
00617 }
00618
00619 static int load_module(void)
00620 {
00621 cs_error_t cs_err;
00622 enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
00623 struct cpg_name name;
00624
00625 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
00626 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
00627 return AST_MODULE_LOAD_DECLINE;
00628 }
00629
00630 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
00631 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
00632 goto failed;
00633 }
00634
00635 ast_copy_string(name.value, "asterisk", sizeof(name.value));
00636 name.length = strlen(name.value);
00637
00638 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
00639 ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
00640 goto failed;
00641 }
00642
00643 if (pipe(dispatch_thread.alert_pipe) == -1) {
00644 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
00645 strerror(errno), errno);
00646 goto failed;
00647 }
00648
00649 if (ast_pthread_create_background(&dispatch_thread.id, NULL,
00650 dispatch_thread_handler, NULL)) {
00651 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
00652 goto failed;
00653 }
00654
00655 if (load_config(0)) {
00656
00657 res = AST_MODULE_LOAD_DECLINE;
00658 goto failed;
00659 }
00660
00661 ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
00662
00663 ast_enable_distributed_devstate();
00664
00665 return AST_MODULE_LOAD_SUCCESS;
00666
00667 failed:
00668 cleanup_module();
00669
00670 return res;
00671 }
00672
00673 static int unload_module(void)
00674 {
00675 ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
00676
00677 cleanup_module();
00678
00679 return 0;
00680 }
00681
00682 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Corosync");