Mon Mar 12 2012 21:26:39

Asterisk developer's documentation


bridge_multiplexed.c File Reference

Two channel bridging module which groups bridges into batches of threads. More...

#include "asterisk.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/channel.h"
#include "asterisk/bridging.h"
#include "asterisk/bridging_technology.h"
#include "asterisk/frame.h"
#include "asterisk/astobj2.h"
Include dependency graph for bridge_multiplexed.c:

Go to the source code of this file.

Data Structures

struct  multiplexed_thread
 Structure which represents a single thread handling multiple 2 channel bridges. More...

Defines

#define MULTIPLEXED_BUCKETS   53
 Number of buckets our multiplexed thread container can have.
#define MULTIPLEXED_MAX_CHANNELS   8
 Number of channels we handle in a single thread.

Functions

static void __reg_module (void)
static void __unreg_module (void)
static void destroy_multiplexed_thread (void *obj)
 Destroy callback for a multiplexed thread structure.
static int find_multiplexed_thread (void *obj, void *arg, int flags)
 Callback function for finding a free multiplexed thread.
static int load_module (void)
static void multiplexed_add_or_remove (struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
 Helper function which adds or removes a channel and nudges the thread.
static int multiplexed_bridge_create (struct ast_bridge *bridge)
 Create function which finds/reserves/references a multiplexed thread structure.
static int multiplexed_bridge_destroy (struct ast_bridge *bridge)
 Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
static int multiplexed_bridge_join (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Join function which actually adds the channel into the array to be monitored.
static int multiplexed_bridge_leave (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Leave function which actually removes the channel from the array.
static void multiplexed_bridge_suspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Suspend function which means control of the channel is going elsewhere.
static void multiplexed_bridge_unsuspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Unsuspend function which means control of the channel is coming back to us.
static enum ast_bridge_write_result multiplexed_bridge_write (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
 Write function for writing frames into the bridge.
static void multiplexed_nudge (struct multiplexed_thread *multiplexed_thread)
 Internal function which nudges the thread.
static void * multiplexed_thread_function (void *data)
 Thread function that executes for multiplexed threads.
static int unload_module (void)

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, }
static struct ast_module_info * ast_module_info = &__mod_info
static struct ast_bridge_technology multiplexed_bridge
static struct ao2_container * multiplexed_threads
 Container of all operating multiplexed threads.

Detailed Description

Two channel bridging module which groups bridges into batches of threads.

Author:
Joshua Colp <jcolp@digium.com>

Definition in file bridge_multiplexed.c.


Define Documentation

#define MULTIPLEXED_BUCKETS   53

Number of buckets our multiplexed thread container can have.

Definition at line 51 of file bridge_multiplexed.c.

Referenced by load_module().

#define MULTIPLEXED_MAX_CHANNELS   8

Number of channels we handle in a single thread.

Definition at line 54 of file bridge_multiplexed.c.

Referenced by find_multiplexed_thread(), and multiplexed_add_or_remove().


Function Documentation

static void __reg_module ( void  ) [static]

Definition at line 425 of file bridge_multiplexed.c.

static void __unreg_module ( void  ) [static]

Definition at line 425 of file bridge_multiplexed.c.

static void destroy_multiplexed_thread ( void *  obj) [static]

Destroy callback for a multiplexed thread structure.

Definition at line 83 of file bridge_multiplexed.c.

References multiplexed_thread::pipe.

Referenced by multiplexed_bridge_create().

{
   /* obj is the multiplexed thread structure being destroyed. */
   struct multiplexed_thread *doomed = obj;
   int end;

   /* Close each end of the nudge pipe that holds a non-negative descriptor. */
   for (end = 0; end < 2; end++) {
      if (doomed->pipe[end] >= 0) {
         close(doomed->pipe[end]);
      }
   }
}
static int find_multiplexed_thread ( void *  obj,
void *  arg,
int  flags 
) [static]

Callback function for finding a free multiplexed thread.

Definition at line 76 of file bridge_multiplexed.c.

References CMP_MATCH, CMP_STOP, multiplexed_thread::count, and MULTIPLEXED_MAX_CHANNELS.

Referenced by multiplexed_bridge_create().

{
   /* Container element under inspection; arg and flags are not consulted. */
   const struct multiplexed_thread *candidate = obj;

   /* Reject the candidate unless at least two channel slots are still unreserved. */
   if (candidate->count > (MULTIPLEXED_MAX_CHANNELS - 2)) {
      return 0;
   }

   /* Accept this one and tell the container walk to stop looking. */
   return CMP_MATCH | CMP_STOP;
}
static void multiplexed_add_or_remove ( struct multiplexed_thread *  multiplexed_thread,
struct ast_channel *  chan,
int  add 
) [static]

Helper function which adds or removes a channel and nudges the thread.

Definition at line 269 of file bridge_multiplexed.c.

References ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_pthread_create, AST_PTHREADT_NULL, AST_PTHREADT_STOP, multiplexed_thread::chans, MULTIPLEXED_MAX_CHANNELS, multiplexed_nudge(), multiplexed_thread_function(), multiplexed_thread::service_count, multiplexed_thread::thread, and thread.

Referenced by multiplexed_bridge_join(), multiplexed_bridge_leave(), multiplexed_bridge_suspend(), and multiplexed_bridge_unsuspend().

{
   /*
    * Adds (add != 0) or removes (add == 0) chan from the array of channels
    * serviced by multiplexed_thread, then starts or stops the servicing
    * thread depending on whether any channels remain.
    *
    * Everything except the final pthread_join() runs with the
    * multiplexed_thread object locked.
    */
   int i, removed = 0;
   pthread_t thread = AST_PTHREADT_NULL;

   ao2_lock(multiplexed_thread);

   /* Nudge the servicing thread before the channel array is modified */
   multiplexed_nudge(multiplexed_thread);

   for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
      if (multiplexed_thread->chans[i] == chan) {
         /* Channel is already in the array: only a removal has work to do */
         if (!add) {
            multiplexed_thread->chans[i] = NULL;
            multiplexed_thread->service_count--;
            removed = 1;
         }
         break;
      } else if (!multiplexed_thread->chans[i] && add) {
         /* First free slot: place the new channel here */
         multiplexed_thread->chans[i] = chan;
         multiplexed_thread->service_count++;
         break;
      }
   }

   if (removed) {
      /*
       * Close the gap left at index i so the first service_count entries
       * stay contiguous, and clear the last slot. memmove() alone leaves
       * the old final entry duplicated there; when the array was full that
       * stale pointer meant no NULL slot remained and a later add (for
       * example an unsuspend) would silently fail to find room.
       */
      memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
      multiplexed_thread->chans[MULTIPLEXED_MAX_CHANNELS - 1] = NULL;
   }

   if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
      /* Channels to service but no thread yet: the new thread owns this extra reference */
      ao2_ref(multiplexed_thread, +1);
      if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
         ao2_ref(multiplexed_thread, -1);
         ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
      }
   } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
      /* Nothing left to service: flag the thread to stop and remember it for joining */
      thread = multiplexed_thread->thread;
      multiplexed_thread->thread = AST_PTHREADT_STOP;
   }

   ao2_unlock(multiplexed_thread);

   /* Join outside the lock; the exiting thread takes the same lock on its way out */
   if (thread != AST_PTHREADT_NULL) {
      pthread_join(thread, NULL);
   }

   return;
}
static int multiplexed_bridge_create ( struct ast_bridge *  bridge) [static]

Create function which finds/reserves/references a multiplexed thread structure.

Definition at line 98 of file bridge_multiplexed.c.

References ao2_alloc, ao2_callback, ao2_link, ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_log(), AST_PTHREADT_NULL, ast_bridge::bridge_pvt, multiplexed_thread::count, destroy_multiplexed_thread(), errno, find_multiplexed_thread(), LOG_WARNING, multiplexed_thread::pipe, and multiplexed_thread::thread.

{
   /*
    * Attaches a multiplexed thread structure to the bridge: reuses one from
    * the multiplexed_threads container when the search callback accepts it,
    * otherwise allocates a new one, sets up its nudge pipe and links it in.
    * Returns 0 on success and -1 on any failure. The container stays locked
    * for the whole operation.
    */
   struct multiplexed_thread *muxed;
   int fd_flags;

   ao2_lock(multiplexed_threads);

   /* Prefer an existing thread structure that can take our channels */
   muxed = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL);
   if (muxed) {
      ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", muxed, bridge);
      goto reserve;
   }

   /* Nothing suitable was found, so build a fresh one */
   muxed = ao2_alloc(sizeof(*muxed), destroy_multiplexed_thread);
   if (!muxed) {
      ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
      goto fail_unlock;
   }

   /* Mark both pipe ends as unopened so the destructor skips them if pipe() fails */
   muxed->pipe[0] = -1;
   muxed->pipe[1] = -1;

   /* The pipe is how other threads poke the servicing thread */
   if (pipe(muxed->pipe)) {
      ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
      goto fail_unref;
   }

   /* Both ends of the pipe are switched to non-blocking mode */
   fd_flags = fcntl(muxed->pipe[0], F_GETFL);
   if (fcntl(muxed->pipe[0], F_SETFL, fd_flags | O_NONBLOCK) < 0) {
      ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
      goto fail_unref;
   }
   fd_flags = fcntl(muxed->pipe[1], F_GETFL);
   if (fcntl(muxed->pipe[1], F_SETFL, fd_flags | O_NONBLOCK) < 0) {
      ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
      goto fail_unref;
   }

   /* No servicing thread exists until a channel is actually added */
   muxed->thread = AST_PTHREADT_NULL;

   /* Publish the new structure so later bridges can find it */
   ao2_link(multiplexed_threads, muxed);
   ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", muxed, bridge);

reserve:
   /* Reserve two slots up front, since both channels of this bridge will be joining shortly */
   muxed->count += 2;

   ao2_unlock(multiplexed_threads);

   bridge->bridge_pvt = muxed;

   return 0;

fail_unref:
   ao2_ref(muxed, -1);
fail_unlock:
   ao2_unlock(multiplexed_threads);
   return -1;
}
static int multiplexed_bridge_destroy ( struct ast_bridge *  bridge) [static]

Destroy function which unreserves/unreferences/removes a multiplexed thread structure.

Definition at line 181 of file bridge_multiplexed.c.

References ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_debug, ast_bridge::bridge_pvt, multiplexed_thread::count, and multiplexed_nudge().

{
   /* The thread structure stored in bridge_pvt when the bridge was created. */
   struct multiplexed_thread *muxed = bridge->bridge_pvt;

   ao2_lock(multiplexed_threads);

   /* Give back the two channel slots this bridge had reserved. */
   muxed->count -= 2;
   if (muxed->count == 0) {
      ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", muxed);
      ao2_unlink(multiplexed_threads, muxed);
   }

   multiplexed_nudge(muxed);

   ao2_unlock(multiplexed_threads);

   /* Drop the reference this bridge was holding on the structure. */
   ao2_ref(muxed, -1);

   return 0;
}
static int multiplexed_bridge_join ( struct ast_bridge *  bridge,
struct ast_bridge_channel *  bridge_channel 
) [static]

Join function which actually adds the channel into the array to be monitored.

Definition at line 316 of file bridge_multiplexed.c.

References ast_channel_make_compatible(), ast_debug, AST_LIST_FIRST, AST_LIST_LAST, ast_bridge::bridge_pvt, ast_bridge_channel::chan, ast_bridge::channels, multiplexed_add_or_remove(), ast_channel::name, ast_channel::nativeformats, ast_channel::readformat, and ast_channel::writeformat.

{
   /* The channels at the head and tail of the bridge's channel list. */
   struct ast_channel *head_chan = AST_LIST_FIRST(&bridge->channels)->chan;
   struct ast_channel *tail_chan = AST_LIST_LAST(&bridge->channels)->chan;
   struct multiplexed_thread *muxed = bridge->bridge_pvt;
   int formats_match;

   ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, muxed);

   /* Hand the joining channel to the servicing thread. */
   multiplexed_add_or_remove(muxed, bridge_channel->chan, 1);

   /* Head and tail being the same channel means the second party has not joined yet. */
   if (head_chan == tail_chan) {
      return 0;
   }

   /* Nothing to negotiate when read/write formats mirror each other and native formats agree. */
   formats_match = (head_chan->writeformat == tail_chan->readformat)
      && (head_chan->readformat == tail_chan->writeformat)
      && (head_chan->nativeformats == tail_chan->nativeformats);
   if (formats_match) {
      return 0;
   }

   return ast_channel_make_compatible(head_chan, tail_chan);
}
static int multiplexed_bridge_leave ( struct ast_bridge *  bridge,
struct ast_bridge_channel *  bridge_channel 
) [static]

Leave function which actually removes the channel from the array.

Definition at line 338 of file bridge_multiplexed.c.

References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.

{
   /* The thread structure attached to this bridge. */
   struct multiplexed_thread *muxed = bridge->bridge_pvt;

   ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, muxed);

   /* Take the departing channel out of the serviced set (add == 0). */
   multiplexed_add_or_remove(muxed, bridge_channel->chan, 0);

   return 0;
}
static void multiplexed_bridge_suspend ( struct ast_bridge *  bridge,
struct ast_bridge_channel *  bridge_channel 
) [static]

Suspend function which means control of the channel is going elsewhere.

Definition at line 350 of file bridge_multiplexed.c.

References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.

{
   /* The thread structure attached to this bridge. */
   struct multiplexed_thread *muxed = bridge->bridge_pvt;

   ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, muxed);

   /* Suspension is implemented as a removal from the serviced set (add == 0). */
   multiplexed_add_or_remove(muxed, bridge_channel->chan, 0);
}
static void multiplexed_bridge_unsuspend ( struct ast_bridge *  bridge,
struct ast_bridge_channel *  bridge_channel 
) [static]

Unsuspend function which means control of the channel is coming back to us.

Definition at line 362 of file bridge_multiplexed.c.

References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.

{
   /* The thread structure attached to this bridge. */
   struct multiplexed_thread *muxed = bridge->bridge_pvt;

   ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, muxed);

   /* Unsuspension is implemented as an add back into the serviced set (add == 1). */
   multiplexed_add_or_remove(muxed, bridge_channel->chan, 1);
}
static enum ast_bridge_write_result multiplexed_bridge_write ( struct ast_bridge *  bridge,
struct ast_bridge_channel *  bridge_channel,
struct ast_frame *  frame 
) [static]

Write function for writing frames into the bridge.

Definition at line 374 of file bridge_multiplexed.c.

References AST_BRIDGE_CHANNEL_STATE_WAIT, AST_BRIDGE_WRITE_FAILED, AST_BRIDGE_WRITE_SUCCESS, AST_LIST_FIRST, AST_LIST_LAST, ast_write(), ast_bridge_channel::chan, ast_bridge::channels, and ast_bridge_channel::state.

{
   /* Head and tail entries of the bridge's channel list. */
   struct ast_bridge_channel *head = AST_LIST_FIRST(&bridge->channels);
   struct ast_bridge_channel *tail = AST_LIST_LAST(&bridge->channels);
   struct ast_bridge_channel *peer;

   /* With head and tail identical there is no second party to write to. */
   if (head == tail) {
      return AST_BRIDGE_WRITE_FAILED;
   }

   /* The peer is whichever end of the list is not the writer. */
   peer = (head == bridge_channel) ? tail : head;
   if (!peer) {
      return AST_BRIDGE_WRITE_FAILED;
   }

   /* The frame is only passed on while the peer is in the WAIT state. */
   if (peer->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
      ast_write(peer->chan, frame);
   }

   return AST_BRIDGE_WRITE_SUCCESS;
}
static void multiplexed_nudge ( struct multiplexed_thread *  multiplexed_thread) [static]

Internal function which nudges the thread.

Definition at line 161 of file bridge_multiplexed.c.

References ast_log(), AST_PTHREADT_NULL, LOG_ERROR, multiplexed_thread::pipe, multiplexed_thread::thread, and multiplexed_thread::waiting.

Referenced by multiplexed_add_or_remove(), and multiplexed_bridge_destroy().

{
   /*
    * Pokes the servicing thread by writing to the write end of its pipe,
    * then spins until the thread's 'waiting' flag is clear.
    * Does nothing when no servicing thread has been created.
    */
   int nudge = 0;

   /* No thread to wake up */
   if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
      return;
   }

   /* A short or failed write is only logged; execution continues to the spin loop below */
   if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
      ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
   }

   /*
    * Busy-wait, yielding the CPU each pass, until the servicing thread has
    * cleared 'waiting' (it does so right after returning from its wait call).
    * NOTE(review): 'waiting' is read here without a visible lock or atomic
    * qualifier in this excerpt -- confirm how the field is declared.
    */
   while (multiplexed_thread->waiting) {
      sched_yield();
   }

   return;
}
static void* multiplexed_thread_function ( void *  data) [static]

Thread function that executes for multiplexed threads.

Definition at line 204 of file bridge_multiplexed.c.

References ao2_lock, ao2_ref, ao2_trylock, ao2_unlock, ast_bridge_handle_trip(), ast_debug, ast_log(), AST_PTHREADT_NULL, AST_PTHREADT_STOP, ast_waitfor_nandfds(), ast_channel::bridge, multiplexed_thread::chans, errno, first, LOG_WARNING, multiplexed_thread::pipe, multiplexed_thread::service_count, stop, multiplexed_thread::thread, and multiplexed_thread::waiting.

Referenced by multiplexed_add_or_remove().

{
   /*
    * Body of the servicing thread. 'data' is the multiplexed_thread object;
    * the reference handed over by the creator is released just before exit.
    * The loop waits on the serviced channels plus the read end of the nudge
    * pipe, and runs until the 'thread' field is set to AST_PTHREADT_STOP.
    */
   struct multiplexed_thread *multiplexed_thread = data;
   /* Read end of the nudge pipe, passed to the wait call as the only extra descriptor */
   int fds = multiplexed_thread->pipe[0];

   ao2_lock(multiplexed_thread);

   ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);

   while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
      struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
      int to = -1, outfd = -1;

      /* Move channels around so not just the first one gets priority */
      /*
       * Rotates the first service_count entries left by one.
       * NOTE(review): if service_count were ever 0 here, (service_count - 1)
       * would yield a bogus size and an out-of-range index; the visible
       * callers only leave the thread running with a non-zero count, but a
       * 'service_count > 1' guard would make that explicit -- confirm.
       */
      memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
      multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;

      /* 'waiting' brackets the unlocked wait; multiplexed_nudge() spins on it */
      multiplexed_thread->waiting = 1;
      ao2_unlock(multiplexed_thread);
      winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
      multiplexed_thread->waiting = 0;
      ao2_lock(multiplexed_thread);
      /* A stop may have been requested while we were waiting */
      if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
         break;
      }

      /* The wait call reported a descriptor: read one nudge value from the pipe */
      if (outfd > -1) {
         int nudge;

         if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
            /* EINTR and EAGAIN are not reported; any other read error is logged */
            if (errno != EINTR && errno != EAGAIN) {
               ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
            }
         }
      }
      if (winner && winner->bridge) {
         struct ast_bridge *bridge = winner->bridge;
         int stop = 0;
         /* Release our own lock while trying to obtain the bridge lock */
         ao2_unlock(multiplexed_thread);
         /*
          * Re-read winner->bridge on every attempt and give up if it becomes
          * NULL or if a stop is requested while the bridge lock is unavailable.
          */
         while ((bridge = winner->bridge) && ao2_trylock(bridge)) {
            sched_yield();
            if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
               stop = 1;
               break;
            }
         }
         /* Bridge lock is held here only when the trylock loop ended successfully */
         if (!stop && bridge) {
            ast_bridge_handle_trip(bridge, NULL, winner, -1);
            ao2_unlock(bridge);
         }
         ao2_lock(multiplexed_thread);
      }
   }

   /* Record that no servicing thread exists any more */
   multiplexed_thread->thread = AST_PTHREADT_NULL;

   ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);

   ao2_unlock(multiplexed_thread);
   /* Release the reference that was taken on our behalf when the thread was created */
   ao2_ref(multiplexed_thread, -1);

   return NULL;
}
static int unload_module ( void  ) [static]

Variable Documentation

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static]

Definition at line 425 of file bridge_multiplexed.c.

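struct ast_module_info * ast_module_info = &__mod_info [static]

Definition at line 425 of file bridge_multiplexed.c.

struct ast_bridge_technology multiplexed_bridge [static]

Definition at line 393 of file bridge_multiplexed.c.

struct ao2_container * multiplexed_threads [static]

Container of all operating multiplexed threads.

Definition at line 73 of file bridge_multiplexed.c.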