Fri Jul 15 2011 12:02:34

Asterisk developer's documentation


pbx_spool.c File Reference

Full-featured outgoing call spool support. More...

#include "asterisk.h"
#include <sys/stat.h>
#include <time.h>
#include <utime.h>
#include <dirent.h>
#include "asterisk/paths.h"
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"
Include dependency graph for pbx_spool.c:

Go to the source code of this file.

Data Structures

struct  outgoing

Enumerations

enum  { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) }

Functions

static void __reg_module (void)
static void __unreg_module (void)
static int apply_outgoing (struct outgoing *o, char *fn, FILE *f)
static void * attempt_thread (void *data)
static void free_outgoing (struct outgoing *o)
static int init_outgoing (struct outgoing *o)
static void launch_service (struct outgoing *o)
static int load_module (void)
static int remove_from_queue (struct outgoing *o, const char *status)
 Remove a call file from the outgoing queue optionally moving it in the archive dir.
static void safe_append (struct outgoing *o, time_t now, char *s)
static int scan_service (char *fn, time_t now, time_t atime)
static void * scan_thread (void *unused)
static int unload_module (void)

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_DEFAULT , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, }
static struct ast_module_infoast_module_info = &__mod_info
static char qdir [255]
static char qdonedir [255]

Detailed Description

Full-featured outgoing call spool support.

Definition in file pbx_spool.c.


Enumeration Type Documentation

anonymous enum
Enumerator:
SPOOL_FLAG_ALWAYS_DELETE 

Always delete the call file after a call succeeds or the maximum number of retries is exceeded, even if the modification time of the call file is in the future.

SPOOL_FLAG_ARCHIVE 

Definition at line 50 of file pbx_spool.c.

     {
   /*! Always delete the call file after a call succeeds or the
    * maximum number of retries is exceeded, even if the
    * modification time of the call file is in the future.
    */
   SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
   /* Don't unlink the call file after processing, move in qdonedir */
   SPOOL_FLAG_ARCHIVE = (1 << 1)
};

Function Documentation

static void __reg_module ( void  ) [static]

Definition at line 532 of file pbx_spool.c.

static void __unreg_module ( void  ) [static]

Definition at line 532 of file pbx_spool.c.

static int apply_outgoing ( struct outgoing o,
char *  fn,
FILE *  f 
) [static]

Definition at line 110 of file pbx_spool.c.

References outgoing::app, app, ast_callerid_split(), ast_log(), ast_parse_allow_disallow(), ast_set2_flag, ast_string_field_set, ast_strlen_zero(), ast_true(), ast_variable_new(), buf, outgoing::callingpid, cid_name, cid_num, context, outgoing::dest, outgoing::exten, exten, outgoing::format, last, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, ast_variable::next, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, strsep(), outgoing::tech, var, outgoing::vars, and outgoing::waittime.

Referenced by scan_service().

{
   char buf[256];
   char *c, *c2;
   int lineno = 0;
   struct ast_variable *var, *last = o->vars;

   while (last && last->next) {
      last = last->next;
   }

   while(fgets(buf, sizeof(buf), f)) {
      lineno++;
      /* Trim comments */
      c = buf;
      while ((c = strchr(c, '#'))) {
         if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t'))
            *c = '\0';
         else
            c++;
      }

      c = buf;
      while ((c = strchr(c, ';'))) {
         if ((c > buf) && (c[-1] == '\\')) {
            memmove(c - 1, c, strlen(c) + 1);
            c++;
         } else {
            *c = '\0';
            break;
         }
      }

      /* Trim trailing white space */
      while(!ast_strlen_zero(buf) && buf[strlen(buf) - 1] < 33)
         buf[strlen(buf) - 1] = '\0';
      if (!ast_strlen_zero(buf)) {
         c = strchr(buf, ':');
         if (c) {
            *c = '\0';
            c++;
            while ((*c) && (*c < 33))
               c++;
#if 0
            printf("'%s' is '%s' at line %d\n", buf, c, lineno);
#endif
            if (!strcasecmp(buf, "channel")) {
               if ((c2 = strchr(c, '/'))) {
                  *c2 = '\0';
                  c2++;
                  ast_string_field_set(o, tech, c);
                  ast_string_field_set(o, dest, c2);
               } else {
                  ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, fn);
               }
            } else if (!strcasecmp(buf, "callerid")) {
               char cid_name[80] = {0}, cid_num[80] = {0};
               ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num));
               ast_string_field_set(o, cid_num, cid_num);
               ast_string_field_set(o, cid_name, cid_name);
            } else if (!strcasecmp(buf, "application")) {
               ast_string_field_set(o, app, c);
            } else if (!strcasecmp(buf, "data")) {
               ast_string_field_set(o, data, c);
            } else if (!strcasecmp(buf, "maxretries")) {
               if (sscanf(c, "%30d", &o->maxretries) != 1) {
                  ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, fn);
                  o->maxretries = 0;
               }
            } else if (!strcasecmp(buf, "codecs")) {
               ast_parse_allow_disallow(NULL, &o->format, c, 1);
            } else if (!strcasecmp(buf, "context")) {
               ast_string_field_set(o, context, c);
            } else if (!strcasecmp(buf, "extension")) {
               ast_string_field_set(o, exten, c);
            } else if (!strcasecmp(buf, "priority")) {
               if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) {
                  ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, fn);
                  o->priority = 1;
               }
            } else if (!strcasecmp(buf, "retrytime")) {
               if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) {
                  ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn);
                  o->retrytime = 300;
               }
            } else if (!strcasecmp(buf, "waittime")) {
               if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) {
                  ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, fn);
                  o->waittime = 45;
               }
            } else if (!strcasecmp(buf, "retry")) {
               o->retries++;
            } else if (!strcasecmp(buf, "startretry")) {
               if (sscanf(c, "%30ld", &o->callingpid) != 1) {
                  ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n");
                  o->callingpid = 0;
               }
            } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) {
               o->callingpid = 0;
               o->retries++;
            } else if (!strcasecmp(buf, "delayedretry")) {
            } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) {
               c2 = c;
               strsep(&c2, "=");
               if (c2) {
                  var = ast_variable_new(c, c2, fn);
                  if (var) {
                     /* Always insert at the end, because some people want to treat the spool file as a script */
                     if (last) {
                        last->next = var;
                     } else {
                        o->vars = var;
                     }
                     last = var;
                  }
               } else
                  ast_log(LOG_WARNING, "Malformed \"%s\" argument.  Should be \"%s: variable=value\"\n", buf, buf);
            } else if (!strcasecmp(buf, "account")) {
               ast_string_field_set(o, account, c);
            } else if (!strcasecmp(buf, "alwaysdelete")) {
               ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE);
            } else if (!strcasecmp(buf, "archive")) {
               ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE);
            } else {
               ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, fn);
            }
         } else
            ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, fn);
      }
   }
   ast_string_field_set(o, fn, fn);
   if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) {
      ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", fn);
      return -1;
   }
   return 0;
}
static void* attempt_thread ( void *  data) [static]

Definition at line 325 of file pbx_spool.c.

References outgoing::account, outgoing::app, ast_channel_reason2str(), ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verb, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::format, free_outgoing(), LOG_EVENT, LOG_NOTICE, outgoing::maxretries, outgoing::priority, remove_from_queue(), outgoing::retries, safe_append(), outgoing::tech, outgoing::vars, and outgoing::waittime.

Referenced by launch_service().

{
   struct outgoing *o = data;
   int res, reason;
   if (!ast_strlen_zero(o->app)) {
      ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries);
      res = ast_pbx_outgoing_app(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
      o->vars = NULL;
   } else {
      ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries);
      res = ast_pbx_outgoing_exten(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
      o->vars = NULL;
   }
   if (res) {
      ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason));
      if (o->retries >= o->maxretries + 1) {
         /* Max retries exceeded */
         ast_log(LOG_EVENT, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
         remove_from_queue(o, "Expired");
      } else {
         /* Notate that the call is still active */
         safe_append(o, time(NULL), "EndRetry");
      }
   } else {
      ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest);
      ast_log(LOG_EVENT, "Queued call to %s/%s completed\n", o->tech, o->dest);
      remove_from_queue(o, "Completed");
   }
   free_outgoing(o);
   return NULL;
}
static void free_outgoing ( struct outgoing o) [static]
static int init_outgoing ( struct outgoing o) [static]
static void launch_service ( struct outgoing o) [static]

Definition at line 357 of file pbx_spool.c.

References ast_log(), ast_pthread_create_detached, attempt_thread(), free_outgoing(), and LOG_WARNING.

Referenced by scan_service().

{
   pthread_t t;
   int ret;

   if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) {
      ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
      free_outgoing(o);
   }
}
static int load_module ( void  ) [static]

Definition at line 513 of file pbx_spool.c.

References ast_config_AST_SPOOL_DIR, ast_log(), ast_mkdir(), AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_SUCCESS, ast_pthread_create_detached_background, LOG_WARNING, scan_thread(), and thread.

{
   pthread_t thread;
   int ret;
   snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing");
   if (ast_mkdir(qdir, 0777)) {
      ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir);
      return AST_MODULE_LOAD_DECLINE;
   }
   snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done");

   if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) {
      ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
      return AST_MODULE_LOAD_FAILURE;
   }

   return AST_MODULE_LOAD_SUCCESS;
}
static int remove_from_queue ( struct outgoing o,
const char *  status 
) [static]

Remove a call file from the outgoing queue optionally moving it in the archive dir.

Parameters:
othe pointer to outgoing struct
statusthe exit status of the call. Can be "Completed", "Failed" or "Expired"

Definition at line 276 of file pbx_spool.c.

References ast_log(), ast_mkdir(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.

Referenced by attempt_thread(), and scan_service().

{
   int fd;
   FILE *f;
   char newfn[256];
   const char *bname;

   if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) {
      struct stat current_file_status;

      if (!stat(o->fn, &current_file_status)) {
         if (time(NULL) < current_file_status.st_mtime)
            return 0;
      }
   }

   if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) {
      unlink(o->fn);
      return 0;
   }

   if (ast_mkdir(qdonedir, 0777)) {
      ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir);
      unlink(o->fn);
      return -1;
   }

   if ((fd = open(o->fn, O_WRONLY | O_APPEND))) {
      if ((f = fdopen(fd, "a"))) {
         fprintf(f, "Status: %s\n", status);
         fclose(f);
      } else
         close(fd);
   }

   if (!(bname = strrchr(o->fn, '/')))
      bname = o->fn;
   else
      bname++; 
   snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname);
   /* a existing call file the archive dir is overwritten */
   unlink(newfn);
   if (rename(o->fn, newfn) != 0) {
      unlink(o->fn);
      return -1;
   } else
      return 0;
}
static void safe_append ( struct outgoing o,
time_t  now,
char *  s 
) [static]

Definition at line 248 of file pbx_spool.c.

References ast_log(), ast_mainpid, errno, f, outgoing::fn, LOG_WARNING, outgoing::retries, and outgoing::retrytime.

Referenced by attempt_thread(), and scan_service().

{
   int fd;
   FILE *f;
   struct utimbuf tbuf;

   if ((fd = open(o->fn, O_WRONLY | O_APPEND)) < 0)
      return;

   if ((f = fdopen(fd, "a"))) {
      fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now);
      fclose(f);
   } else
      close(fd);

   /* Update the file time */
   tbuf.actime = now;
   tbuf.modtime = now + o->retrytime;
   if (utime(o->fn, &tbuf))
      ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno));
}
static int scan_service ( char *  fn,
time_t  now,
time_t  atime 
) [static]

Definition at line 368 of file pbx_spool.c.

References apply_outgoing(), ast_calloc, ast_free, ast_log(), ast_mainpid, outgoing::callingpid, outgoing::dest, errno, f, outgoing::fn, free_outgoing(), init_outgoing(), launch_service(), LOG_DEBUG, LOG_EVENT, LOG_WARNING, outgoing::maxretries, remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), and outgoing::tech.

Referenced by scan_thread().

{
   struct outgoing *o = NULL;
   FILE *f;
   int res = 0;

   if (!(o = ast_calloc(1, sizeof(*o)))) {
      ast_log(LOG_WARNING, "Out of memory ;(\n");
      return -1;
   }
   
   if (init_outgoing(o)) {
      /* No need to call free_outgoing here since we know the failure
       * was to allocate string fields and no variables have been allocated
       * yet.
       */
      ast_free(o);
      return -1;
   }

   /* Attempt to open the file */
   if (!(f = fopen(fn, "r+"))) {
      remove_from_queue(o, "Failed");
      free_outgoing(o);
      ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno));
      return -1;
   }

   /* Read in and verify the contents */
   if (apply_outgoing(o, fn, f)) {
      remove_from_queue(o, "Failed");
      free_outgoing(o);
      ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", fn);
      fclose(f);
      return -1;
   }
   
#if 0
   printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries);
#endif
   fclose(f);
   if (o->retries <= o->maxretries) {
      now += o->retrytime;
      if (o->callingpid && (o->callingpid == ast_mainpid)) {
         safe_append(o, time(NULL), "DelayedRetry");
         ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn);
         free_outgoing(o);
      } else {
         /* Increment retries */
         o->retries++;
         /* If someone else was calling, they're presumably gone now
            so abort their retry and continue as we were... */
         if (o->callingpid)
            safe_append(o, time(NULL), "AbortRetry");
         
         safe_append(o, now, "StartRetry");
         launch_service(o);
      }
      res = now;
   } else {
      ast_log(LOG_EVENT, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
      remove_from_queue(o, "Expired");
      free_outgoing(o);
   }

   return res;
}
static void* scan_thread ( void *  unused) [static]

Definition at line 436 of file pbx_spool.c.

References ast_fully_booted, ast_log(), dir, errno, last, LOG_WARNING, and scan_service().

Referenced by load_module().

{
   struct stat st;
   DIR *dir;
   struct dirent *de;
   char fn[256];
   int res;
   time_t last = 0, next = 0, now;
   struct timespec ts = { .tv_sec = 1 };
  
   while (!ast_fully_booted) {
      nanosleep(&ts, NULL);
   }

   for(;;) {
      /* Wait a sec */
      nanosleep(&ts, NULL);
      time(&now);

      if (stat(qdir, &st)) {
         ast_log(LOG_WARNING, "Unable to stat %s\n", qdir);
         continue;
      }

      /* Make sure it is time for us to execute our check */
      if ((st.st_mtime == last) && (next && (next > now)))
         continue;
      
#if 0
      printf("atime: %ld, mtime: %ld, ctime: %ld\n", st.st_atime, st.st_mtime, st.st_ctime);
      printf("Ooh, something changed / timeout\n");
#endif
      next = 0;
      last = st.st_mtime;

      if (!(dir = opendir(qdir))) {
         ast_log(LOG_WARNING, "Unable to open directory %s: %s\n", qdir, strerror(errno));
         continue;
      }

      while ((de = readdir(dir))) {
         snprintf(fn, sizeof(fn), "%s/%s", qdir, de->d_name);
         if (stat(fn, &st)) {
            ast_log(LOG_WARNING, "Unable to stat %s: %s\n", fn, strerror(errno));
            continue;
         }
         if (!S_ISREG(st.st_mode))
            continue;
         if (st.st_mtime <= now) {
            res = scan_service(fn, now, st.st_atime);
            if (res > 0) {
               /* Update next service time */
               if (!next || (res < next)) {
                  next = res;
               }
            } else if (res) {
               ast_log(LOG_WARNING, "Failed to scan service '%s'\n", fn);
            } else if (!next) {
               /* Expired entry: must recheck on the next go-around */
               next = st.st_mtime;
            }
         } else {
            /* Update "next" update if necessary */
            if (!next || (st.st_mtime < next))
               next = st.st_mtime;
         }
      }
      closedir(dir);
   }
   return NULL;
}
static int unload_module ( void  ) [static]

Definition at line 508 of file pbx_spool.c.

{
   return -1;
}

Variable Documentation

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_DEFAULT , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, } [static]

Definition at line 532 of file pbx_spool.c.

Definition at line 532 of file pbx_spool.c.

char qdir[255] [static]

Definition at line 60 of file pbx_spool.c.

char qdonedir[255] [static]

Definition at line 61 of file pbx_spool.c.