Two channel bridging module which groups bridges into batches of threads. More...
#include "asterisk.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/channel.h"
#include "asterisk/bridging.h"
#include "asterisk/bridging_technology.h"
#include "asterisk/frame.h"
#include "asterisk/astobj2.h"
Go to the source code of this file.
Data Structures | |
| struct | multiplexed_thread |
| Structure which represents a single thread handling multiple 2 channel bridges. More... | |
Defines | |
| #define | MULTIPLEXED_BUCKETS 53 |
| Number of buckets our multiplexed thread container can have. | |
| #define | MULTIPLEXED_MAX_CHANNELS 8 |
| Number of channels we handle in a single thread. | |
Functions | |
| static void | __reg_module (void) |
| static void | __unreg_module (void) |
| static void | destroy_multiplexed_thread (void *obj) |
| Destroy callback for a multiplexed thread structure. | |
| static int | find_multiplexed_thread (void *obj, void *arg, int flags) |
| Callback function for finding a free multiplexed thread. | |
| static int | load_module (void) |
| static void | multiplexed_add_or_remove (struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add) |
| Helper function which adds or removes a channel and nudges the thread. | |
| static int | multiplexed_bridge_create (struct ast_bridge *bridge) |
| Create function which finds/reserves/references a multiplexed thread structure. | |
| static int | multiplexed_bridge_destroy (struct ast_bridge *bridge) |
| Destroy function which unreserves/unreferences/removes a multiplexed thread structure. | |
| static int | multiplexed_bridge_join (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
| Join function which actually adds the channel into the array to be monitored. | |
| static int | multiplexed_bridge_leave (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
| Leave function which actually removes the channel from the array. | |
| static void | multiplexed_bridge_suspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
| Suspend function which means control of the channel is going elsewhere. | |
| static void | multiplexed_bridge_unsuspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
| Unsuspend function which means control of the channel is coming back to us. | |
| static enum ast_bridge_write_result | multiplexed_bridge_write (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame) |
| Write function for writing frames into the bridge. | |
| static void | multiplexed_nudge (struct multiplexed_thread *multiplexed_thread) |
| Internal function which nudges the thread. | |
| static void * | multiplexed_thread_function (void *data) |
| Thread function that executes for multiplexed threads. | |
| static int | unload_module (void) |
Variables | |
| static struct ast_module_info | __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } |
| static struct ast_module_info * | ast_module_info = &__mod_info |
| static struct ast_bridge_technology | multiplexed_bridge |
| static struct ao2_container * | multiplexed_threads |
| Container of all operating multiplexed threads. | |
Two channel bridging module which groups bridges into batches of threads.
Definition in file bridge_multiplexed.c.
| #define MULTIPLEXED_BUCKETS 53 |
Number of buckets our multiplexed thread container can have.
Definition at line 51 of file bridge_multiplexed.c.
Referenced by load_module().
| #define MULTIPLEXED_MAX_CHANNELS 8 |
Number of channels we handle in a single thread.
Definition at line 54 of file bridge_multiplexed.c.
Referenced by find_multiplexed_thread(), and multiplexed_add_or_remove().
| static void __reg_module | ( | void | ) | [static] |
Definition at line 432 of file bridge_multiplexed.c.
| static void __unreg_module | ( | void | ) | [static] |
Definition at line 432 of file bridge_multiplexed.c.
| static void destroy_multiplexed_thread | ( | void * | obj | ) | [static] |
Destroy callback for a multiplexed thread structure.
Definition at line 83 of file bridge_multiplexed.c.
References multiplexed_thread::pipe.
Referenced by multiplexed_bridge_create().
{
	/*
	 * Destructor handed to ao2_alloc() for multiplexed thread structures.
	 * Closes each end of the nudge pipe that holds a valid descriptor;
	 * an end still set to -1 was never opened and is skipped.
	 */
	struct multiplexed_thread *doomed = obj;
	int end;

	for (end = 0; end < 2; end++) {
		if (doomed->pipe[end] >= 0) {
			close(doomed->pipe[end]);
		}
	}
}
| static int find_multiplexed_thread | ( | void * | obj, |
| void * | arg, | ||
| int | flags | ||
| ) | [static] |
Callback function for finding a free multiplexed thread.
Definition at line 76 of file bridge_multiplexed.c.
References CMP_MATCH, CMP_STOP, multiplexed_thread::count, and MULTIPLEXED_MAX_CHANNELS.
Referenced by multiplexed_bridge_create().
{
	/*
	 * ao2_callback() matcher used when a bridge is created: accept the
	 * first thread structure whose reserved channel count still leaves
	 * room for two more channels. arg and flags are not used.
	 */
	struct multiplexed_thread *candidate = obj;

	if (candidate->count > (MULTIPLEXED_MAX_CHANNELS - 2)) {
		/* Not enough room left for another pair of channels */
		return 0;
	}

	return CMP_MATCH | CMP_STOP;
}
| static int load_module | ( | void | ) | [static] |
Definition at line 418 of file bridge_multiplexed.c.
References ao2_container_alloc, ast_bridge_technology_register, ast_format_cap_add_all_by_type(), ast_format_cap_alloc(), AST_FORMAT_TYPE_AUDIO, AST_FORMAT_TYPE_TEXT, AST_FORMAT_TYPE_VIDEO, AST_MODULE_LOAD_DECLINE, ast_bridge_technology::format_capabilities, and MULTIPLEXED_BUCKETS.
{
	/*
	 * Module load function: allocates the container of multiplexed
	 * threads, builds the format capabilities of the bridge technology
	 * (all audio, video and text formats) and registers the technology.
	 *
	 * Returns AST_MODULE_LOAD_DECLINE when an allocation fails, otherwise
	 * the result of ast_bridge_technology_register(). Anything allocated
	 * here is released again on every failure path.
	 */
	int res;

	if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
		return AST_MODULE_LOAD_DECLINE;
	}

	if (!(multiplexed_bridge.format_capabilities = ast_format_cap_alloc())) {
		/* Do not leak the container we just allocated */
		ao2_ref(multiplexed_threads, -1);
		multiplexed_threads = NULL;
		return AST_MODULE_LOAD_DECLINE;
	}

	ast_format_cap_add_all_by_type(multiplexed_bridge.format_capabilities, AST_FORMAT_TYPE_AUDIO);
	ast_format_cap_add_all_by_type(multiplexed_bridge.format_capabilities, AST_FORMAT_TYPE_VIDEO);
	ast_format_cap_add_all_by_type(multiplexed_bridge.format_capabilities, AST_FORMAT_TYPE_TEXT);

	res = ast_bridge_technology_register(&multiplexed_bridge);
	if (res) {
		/* Registration failed: give back everything set up above */
		multiplexed_bridge.format_capabilities = ast_format_cap_destroy(multiplexed_bridge.format_capabilities);
		ao2_ref(multiplexed_threads, -1);
		multiplexed_threads = NULL;
	}

	return res;
}
| static void multiplexed_add_or_remove | ( | struct multiplexed_thread * | multiplexed_thread, |
| struct ast_channel * | chan, | ||
| int | add | ||
| ) | [static] |
Helper function which adds or removes a channel and nudges the thread.
Definition at line 269 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_pthread_create, AST_PTHREADT_NULL, AST_PTHREADT_STOP, multiplexed_thread::chans, MULTIPLEXED_MAX_CHANNELS, multiplexed_nudge(), multiplexed_thread_function(), multiplexed_thread::service_count, multiplexed_thread::thread, and thread.
Referenced by multiplexed_bridge_join(), multiplexed_bridge_leave(), multiplexed_bridge_suspend(), and multiplexed_bridge_unsuspend().
{
	/*
	 * Adds chan to (add != 0) or removes chan from (add == 0) the array of
	 * channels serviced by multiplexed_thread, then starts the servicing
	 * thread if there is now work but no thread, or stops and joins it if
	 * the last serviced channel was removed.
	 *
	 * Adding a channel that is already present, or removing one that is
	 * not present, leaves the array untouched.
	 */
	int i, removed = 0;
	pthread_t thread = AST_PTHREADT_NULL;

	ao2_lock(multiplexed_thread);

	/* Get the thread out of its wait before the channel array is edited */
	multiplexed_nudge(multiplexed_thread);

	for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
		if (multiplexed_thread->chans[i] == chan) {
			if (!add) {
				multiplexed_thread->chans[i] = NULL;
				multiplexed_thread->service_count--;
				removed = 1;
			}
			break;
		} else if (!multiplexed_thread->chans[i] && add) {
			multiplexed_thread->chans[i] = chan;
			multiplexed_thread->service_count++;
			break;
		}
	}

	if (removed) {
		/*
		 * Close the gap so the first service_count entries stay contiguous
		 * no matter which thread state branch runs below, and clear the
		 * vacated tail slot so it does not keep a stale duplicate pointer
		 * that would hide a free slot from later additions.
		 */
		memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
		multiplexed_thread->chans[MULTIPLEXED_MAX_CHANNELS - 1] = NULL;
	}

	if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
		/* The thread owns a reference for as long as it runs */
		ao2_ref(multiplexed_thread, +1);
		if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
			/* The thread id is unspecified after a failed create; reset it so the next call retries */
			multiplexed_thread->thread = AST_PTHREADT_NULL;
			ao2_ref(multiplexed_thread, -1);
			ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
		}
	} else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
		/* Nothing left to service: ask the thread to stop and join it once unlocked */
		thread = multiplexed_thread->thread;
		multiplexed_thread->thread = AST_PTHREADT_STOP;
	}

	ao2_unlock(multiplexed_thread);

	/* Join outside the lock; the thread needs the lock to notice the stop request */
	if (thread != AST_PTHREADT_NULL) {
		pthread_join(thread, NULL);
	}

	return;
}
| static int multiplexed_bridge_create | ( | struct ast_bridge * | bridge | ) | [static] |
Create function which finds/reserves/references a multiplexed thread structure.
Definition at line 98 of file bridge_multiplexed.c.
References ao2_alloc, ao2_callback, ao2_link, ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_log(), AST_PTHREADT_NULL, ast_bridge::bridge_pvt, multiplexed_thread::count, destroy_multiplexed_thread(), errno, find_multiplexed_thread(), LOG_WARNING, multiplexed_thread::pipe, and multiplexed_thread::thread.
{
	/*
	 * Create function: attaches the bridge to a multiplexed thread
	 * structure, reusing one that find_multiplexed_thread() accepts or
	 * building a new one with a non-blocking nudge pipe. The structure's
	 * count is raised by two for the channels about to join and the
	 * structure is stored in bridge->bridge_pvt.
	 *
	 * Returns 0 on success, -1 if a new structure could not be set up.
	 */
	struct multiplexed_thread *mt;
	int fd_flags;

	ao2_lock(multiplexed_threads);

	/* Try to find an existing thread to handle our additional channels */
	mt = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL);
	if (mt) {
		ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", mt, bridge);
	} else {
		/* Nothing suitable exists, so build a new one from scratch */
		mt = ao2_alloc(sizeof(*mt), destroy_multiplexed_thread);
		if (!mt) {
			ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
			ao2_unlock(multiplexed_threads);
			return -1;
		}

		/* Mark both ends unopened so the destructor skips them if pipe() fails */
		mt->pipe[0] = mt->pipe[1] = -1;

		/* The pipe is how other threads poke the servicing thread */
		if (pipe(mt->pipe)) {
			ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
			goto setup_failed;
		}

		/* Both ends of the pipe operate non-blocking */
		fd_flags = fcntl(mt->pipe[0], F_GETFL);
		if (fcntl(mt->pipe[0], F_SETFL, fd_flags | O_NONBLOCK) < 0) {
			ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
			goto setup_failed;
		}

		fd_flags = fcntl(mt->pipe[1], F_GETFL);
		if (fcntl(mt->pipe[1], F_SETFL, fd_flags | O_NONBLOCK) < 0) {
			ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
			goto setup_failed;
		}

		/* No servicing thread is running yet */
		mt->thread = AST_PTHREADT_NULL;

		/* Publish the structure so later bridges can find it */
		ao2_link(multiplexed_threads, mt);
		ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", mt, bridge);
	}

	/* Reserve room for the two channels of this bridge, which will be joining shortly */
	mt->count += 2;

	ao2_unlock(multiplexed_threads);

	bridge->bridge_pvt = mt;

	return 0;

setup_failed:
	/* Dropping the reference runs the destructor, which closes any opened pipe end */
	ao2_ref(mt, -1);
	ao2_unlock(multiplexed_threads);
	return -1;
}
| static int multiplexed_bridge_destroy | ( | struct ast_bridge * | bridge | ) | [static] |
Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
Definition at line 181 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_debug, ast_bridge::bridge_pvt, multiplexed_thread::count, and multiplexed_nudge().
{
	/*
	 * Destroy function: returns the two channel slots this bridge reserved
	 * on its multiplexed thread structure, unlinks the structure from the
	 * container once no bridge uses it, nudges the thread and finally
	 * drops the reference held through bridge->bridge_pvt.
	 *
	 * Always returns 0.
	 */
	struct multiplexed_thread *mt = bridge->bridge_pvt;

	ao2_lock(multiplexed_threads);

	mt->count = mt->count - 2;
	if (mt->count == 0) {
		ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", mt);
		ao2_unlink(multiplexed_threads, mt);
	}

	multiplexed_nudge(mt);

	ao2_unlock(multiplexed_threads);

	ao2_ref(mt, -1);

	return 0;
}
| static int multiplexed_bridge_join | ( | struct ast_bridge * | bridge, |
| struct ast_bridge_channel * | bridge_channel | ||
| ) | [static] |
Join function which actually adds the channel into the array to be monitored.
Definition at line 316 of file bridge_multiplexed.c.
References ast_channel_make_compatible(), ast_channel_name(), ast_channel_nativeformats(), ast_channel_readformat(), ast_channel_writeformat(), ast_debug, ast_format_cap_identical(), ast_format_cmp(), AST_FORMAT_CMP_EQUAL, AST_LIST_FIRST, AST_LIST_LAST, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
{
	/*
	 * Join function: hands the joining channel to the multiplexed thread
	 * for monitoring and, once both channels of the bridge are present,
	 * makes them compatible unless their formats already line up.
	 *
	 * Returns 0 when nothing needs to be done, otherwise the result of
	 * ast_channel_make_compatible().
	 */
	struct ast_channel *chan_a = AST_LIST_FIRST(&bridge->channels)->chan;
	struct ast_channel *chan_b = AST_LIST_LAST(&bridge->channels)->chan;
	struct multiplexed_thread *mt = bridge->bridge_pvt;

	ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", ast_channel_name(bridge_channel->chan), mt);

	multiplexed_add_or_remove(mt, bridge_channel->chan, 1);

	/* First and last entry being the same channel means the second one has not joined yet */
	if (chan_a == chan_b) {
		return 0;
	}

	/* Any mismatch in read/write formats or native formats requires a compatibility pass */
	if ((ast_format_cmp(ast_channel_writeformat(chan_a), ast_channel_readformat(chan_b)) != AST_FORMAT_CMP_EQUAL) ||
		(ast_format_cmp(ast_channel_readformat(chan_a), ast_channel_writeformat(chan_b)) != AST_FORMAT_CMP_EQUAL) ||
		!ast_format_cap_identical(ast_channel_nativeformats(chan_a), ast_channel_nativeformats(chan_b))) {
		return ast_channel_make_compatible(chan_a, chan_b);
	}

	return 0;
}
| static int multiplexed_bridge_leave | ( | struct ast_bridge * | bridge, |
| struct ast_bridge_channel * | bridge_channel | ||
| ) | [static] |
Leave function which actually removes the channel from the array.
Definition at line 340 of file bridge_multiplexed.c.
References ast_channel_name(), ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
{
	/*
	 * Leave function: takes the departing channel out of the array
	 * monitored by the bridge's multiplexed thread. Always returns 0.
	 */
	struct ast_channel *departing = bridge_channel->chan;
	struct multiplexed_thread *mt = bridge->bridge_pvt;

	ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", ast_channel_name(departing), mt);

	multiplexed_add_or_remove(mt, departing, 0);

	return 0;
}
| static void multiplexed_bridge_suspend | ( | struct ast_bridge * | bridge, |
| struct ast_bridge_channel * | bridge_channel | ||
| ) | [static] |
Suspend function which means control of the channel is going elsewhere.
Definition at line 352 of file bridge_multiplexed.c.
References ast_channel_name(), ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
{
	/*
	 * Suspend function: control of the channel is going elsewhere, so it
	 * is removed from the array monitored by the multiplexed thread.
	 */
	struct ast_channel *suspended = bridge_channel->chan;
	struct multiplexed_thread *mt = bridge->bridge_pvt;

	ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", ast_channel_name(suspended), mt);

	multiplexed_add_or_remove(mt, suspended, 0);
}
| static void multiplexed_bridge_unsuspend | ( | struct ast_bridge * | bridge, |
| struct ast_bridge_channel * | bridge_channel | ||
| ) | [static] |
Unsuspend function which means control of the channel is coming back to us.
Definition at line 364 of file bridge_multiplexed.c.
References ast_channel_name(), ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
{
	/*
	 * Unsuspend function: control of the channel is coming back, so it is
	 * put back into the array monitored by the multiplexed thread.
	 */
	struct ast_channel *resumed = bridge_channel->chan;
	struct multiplexed_thread *mt = bridge->bridge_pvt;

	ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", ast_channel_name(resumed), mt);

	multiplexed_add_or_remove(mt, resumed, 1);
}
| static enum ast_bridge_write_result multiplexed_bridge_write | ( | struct ast_bridge * | bridge, |
| struct ast_bridge_channel * | bridge_channel, | ||
| struct ast_frame * | frame | ||
| ) | [static] |
Write function for writing frames into the bridge.
Definition at line 376 of file bridge_multiplexed.c.
References AST_BRIDGE_CHANNEL_STATE_WAIT, AST_BRIDGE_WRITE_FAILED, AST_BRIDGE_WRITE_SUCCESS, AST_LIST_FIRST, AST_LIST_LAST, ast_write(), ast_bridge_channel::chan, and ast_bridge_channel::state.
{
	/*
	 * Write function: passes a frame received from bridge_channel to the
	 * other channel of the two channel bridge.
	 *
	 * Returns AST_BRIDGE_WRITE_FAILED when the bridge does not hold two
	 * distinct channels, AST_BRIDGE_WRITE_SUCCESS otherwise. The frame is
	 * only written out when the peer is in the WAIT state.
	 */
	struct ast_bridge_channel *head = AST_LIST_FIRST(&bridge->channels);
	struct ast_bridge_channel *tail = AST_LIST_LAST(&bridge->channels);
	struct ast_bridge_channel *peer;

	/* A single entry (or none) means there is nobody to write to */
	if (head == tail) {
		return AST_BRIDGE_WRITE_FAILED;
	}

	/* The peer is whichever end of the list is not the writer */
	if (head == bridge_channel) {
		peer = tail;
	} else {
		peer = head;
	}

	if (!peer) {
		return AST_BRIDGE_WRITE_FAILED;
	}

	if (peer->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
		ast_write(peer->chan, frame);
	}

	return AST_BRIDGE_WRITE_SUCCESS;
}
| static void multiplexed_nudge | ( | struct multiplexed_thread * | multiplexed_thread | ) | [static] |
Internal function which nudges the thread.
Definition at line 161 of file bridge_multiplexed.c.
References ast_log(), AST_PTHREADT_NULL, LOG_ERROR, multiplexed_thread::pipe, multiplexed_thread::thread, and multiplexed_thread::waiting.
Referenced by multiplexed_add_or_remove(), and multiplexed_bridge_destroy().
{
	/*
	 * Pokes the servicing thread through the write end of its pipe and
	 * then yields until the thread has cleared its waiting flag. Does
	 * nothing when no thread has been started for this structure.
	 */
	int nudge = 0;

	if (multiplexed_thread->thread != AST_PTHREADT_NULL) {
		if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
			ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
		}

		/* Spin until the thread is out of its wait */
		while (multiplexed_thread->waiting) {
			sched_yield();
		}
	}
}
| static void* multiplexed_thread_function | ( | void * | data | ) | [static] |
Thread function that executes for multiplexed threads.
Definition at line 204 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_trylock, ao2_unlock, ast_bridge_handle_trip(), ast_channel_internal_bridge(), ast_debug, ast_log(), AST_PTHREADT_NULL, AST_PTHREADT_STOP, ast_waitfor_nandfds(), multiplexed_thread::chans, errno, first, LOG_WARNING, multiplexed_thread::pipe, multiplexed_thread::service_count, stop, multiplexed_thread::thread, and multiplexed_thread::waiting.
Referenced by multiplexed_add_or_remove().
{
/*
 * Body of the servicing thread started by multiplexed_add_or_remove().
 * data is the multiplexed_thread structure to service. The loop waits on
 * the serviced channels plus the nudge pipe and hands any channel with
 * activity to its bridge, until the thread field is set to
 * AST_PTHREADT_STOP. On exit the thread field is reset to
 * AST_PTHREADT_NULL and one reference on the structure is dropped (the
 * creator takes one right before starting this thread). Always returns NULL.
 */
struct multiplexed_thread *multiplexed_thread = data;
/* Pipe end this thread reads nudges from; it is waited on together with the channels */
int fds = multiplexed_thread->pipe[0];
ao2_lock(multiplexed_thread);
ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
int to = -1, outfd = -1;
/* Move channels around so not just the first one gets priority */
memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
/* Flag that we are about to wait; multiplexed_nudge() spins until this is cleared again */
multiplexed_thread->waiting = 1;
/* The wait itself runs without the lock held */
ao2_unlock(multiplexed_thread);
winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
multiplexed_thread->waiting = 0;
ao2_lock(multiplexed_thread);
/* A stop may have been requested while we were waiting or blocked on the lock */
if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
break;
}
/* NOTE(review): outfd > -1 presumably means the nudge pipe is readable - confirm against ast_waitfor_nandfds() */
if (outfd > -1) {
int nudge;
/* Consume one nudge; EINTR and EAGAIN are not worth a warning */
if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
if (errno != EINTR && errno != EAGAIN) {
ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
}
}
}
if (winner && ast_channel_internal_bridge(winner)) {
struct ast_bridge *bridge = ast_channel_internal_bridge(winner);
int stop = 0;
/* Our own lock is released while we try to get hold of the bridge */
ao2_unlock(multiplexed_thread);
/* Re-read the channel's bridge on every attempt and give up if a stop is requested meanwhile */
while ((bridge = ast_channel_internal_bridge(winner)) && ao2_trylock(bridge)) {
sched_yield();
if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
stop = 1;
break;
}
}
/* Only reached with the bridge locked when the loop ended without a stop and with a bridge */
if (!stop && bridge) {
ast_bridge_handle_trip(bridge, NULL, winner, -1);
ao2_unlock(bridge);
}
ao2_lock(multiplexed_thread);
}
}
/* Mark that no thread is running any more so a later add can start a new one */
multiplexed_thread->thread = AST_PTHREADT_NULL;
ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
ao2_unlock(multiplexed_thread);
/* Release the reference taken for this thread by its creator */
ao2_ref(multiplexed_thread, -1);
return NULL;
}
| static int unload_module | ( | void | ) | [static] |
Definition at line 408 of file bridge_multiplexed.c.
References ao2_ref, ast_bridge_technology_unregister(), ast_format_cap_destroy(), and ast_bridge_technology::format_capabilities.
{
	/*
	 * Module unload function: unregisters the bridge technology, drops the
	 * module's reference on the thread container and destroys the format
	 * capabilities. Returns the result of the unregistration.
	 */
	int unregister_res;

	unregister_res = ast_bridge_technology_unregister(&multiplexed_bridge);

	ao2_ref(multiplexed_threads, -1);

	multiplexed_bridge.format_capabilities = ast_format_cap_destroy(multiplexed_bridge.format_capabilities);

	return unregister_res;
}
struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static] |
Definition at line 432 of file bridge_multiplexed.c.
struct ast_module_info* ast_module_info = &__mod_info [static] |
Definition at line 432 of file bridge_multiplexed.c.
struct ast_bridge_technology multiplexed_bridge [static] |
Definition at line 395 of file bridge_multiplexed.c.
struct ao2_container* multiplexed_threads [static] |
Container of all operating multiplexed threads.
Definition at line 73 of file bridge_multiplexed.c.