Sat Apr 26 2014 22:01:38

Asterisk developer's documentation


message.c
Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2010, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Out-of-call text message support
00022  *
00023  * \author Russell Bryant <russell@digium.com>
00024  */
00025 
00026 /*** MODULEINFO
00027    <support_level>core</support_level>
00028  ***/
00029 
00030 #include "asterisk.h"
00031 
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 411314 $")
00033 
00034 #include "asterisk/_private.h"
00035 
00036 #include "asterisk/module.h"
00037 #include "asterisk/datastore.h"
00038 #include "asterisk/pbx.h"
00039 #include "asterisk/manager.h"
00040 #include "asterisk/strings.h"
00041 #include "asterisk/astobj2.h"
00042 #include "asterisk/app.h"
00043 #include "asterisk/taskprocessor.h"
00044 #include "asterisk/message.h"
00045 
00046 /*** DOCUMENTATION
00047    <function name="MESSAGE" language="en_US">
00048       <synopsis>
00049          Create a message or read fields from a message.
00050       </synopsis>
00051       <syntax argsep="/">
00052          <parameter name="argument" required="true">
00053          <para>Field of the message to get or set.</para>
00054          <enumlist>
00055             <enum name="to">
00056                <para>Read-only.  The destination of the message.  When processing an
00057                incoming message, this will be set to the destination listed as
00058                the recipient of the message that was received by Asterisk.</para>
00059             </enum>
00060             <enum name="from">
00061                <para>Read-only.  The source of the message.  When processing an
00062                incoming message, this will be set to the source of the message.</para>
00063             </enum>
00064             <enum name="custom_data">
00065                <para>Write-only.  Mark or unmark all message headers for an outgoing
00066                message.  The following values can be set:</para>
00067                <enumlist>
00068                   <enum name="mark_all_outbound">
00069                      <para>Mark all headers for an outgoing message.</para>
00070                   </enum>
00071                   <enum name="clear_all_outbound">
00072                      <para>Unmark all headers for an outgoing message.</para>
00073                   </enum>
00074                </enumlist>
00075             </enum>
00076             <enum name="body">
00077                <para>Read/Write.  The message body.  When processing an incoming
00078                message, this includes the body of the message that Asterisk
00079                received.  When MessageSend() is executed, the contents of this
00080                field are used as the body of the outgoing message.  The body
00081                will always be UTF-8.</para>
00082             </enum>
00083          </enumlist>
00084          </parameter>
00085       </syntax>
00086       <description>
00087          <para>This function will read from or write a value to a text message.
00088          It is used both to read the data out of an incoming message, as well as
00089          modify or create a message that will be sent outbound.</para>
00090       </description>
00091       <see-also>
00092          <ref type="application">MessageSend</ref>
00093       </see-also>
00094    </function>
00095    <function name="MESSAGE_DATA" language="en_US">
00096       <synopsis>
00097          Read or write custom data attached to a message.
00098       </synopsis>
00099       <syntax argsep="/">
00100          <parameter name="argument" required="true">
00101          <para>Field of the message to get or set.</para>
00102          </parameter>
00103       </syntax>
00104       <description>
00105          <para>This function will read from or write a value to a text message.
00106          It is used both to read the data out of an incoming message, as well as
00107          modify a message that will be sent outbound.</para>
00108          <note>
00109             <para>If you want to set an outbound message to carry data in the
00110             current message, do
00111             Set(MESSAGE_DATA(<replaceable>key</replaceable>)=${MESSAGE_DATA(<replaceable>key</replaceable>)}).</para>
00112          </note>
00113       </description>
00114       <see-also>
00115          <ref type="application">MessageSend</ref>
00116       </see-also>
00117    </function>
00118    <application name="MessageSend" language="en_US">
00119       <synopsis>
00120          Send a text message.
00121       </synopsis>
00122       <syntax>
00123          <parameter name="to" required="true">
00124             <para>A To URI for the message.</para>
00125             <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
00126             <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
00127          </parameter>
00128          <parameter name="from" required="false">
00129             <para>A From URI for the message if needed for the
00130             message technology being used to send this message.</para>
00131             <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
00132             <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageFromInfo'])" />
00133          </parameter>
00134       </syntax>
00135       <description>
00136          <para>Send a text message.  The body of the message that will be
00137          sent is what is currently set to <literal>MESSAGE(body)</literal>.
00138            The technology chosen for sending the message is determined
00139          based on a prefix to the <literal>to</literal> parameter.</para>
00140          <para>This application sets the following channel variables:</para>
00141          <variablelist>
00142             <variable name="MESSAGE_SEND_STATUS">
00143                <para>This is the message delivery status returned by this application.</para>
00144                <value name="INVALID_PROTOCOL">
00145                   No handler for the technology part of the URI was found.
00146                </value>
00147                <value name="INVALID_URI">
00148                   The protocol handler reported that the URI was not valid.
00149                </value>
00150                <value name="SUCCESS">
00151                   Successfully passed on to the protocol handler, but delivery has not necessarily been guaranteed.
00152                </value>
00153                <value name="FAILURE">
00154                   The protocol handler reported that it was unable to deliver the message for some reason.
00155                </value>
00156             </variable>
00157          </variablelist>
00158       </description>
00159    </application>
00160    <manager name="MessageSend" language="en_US">
00161       <synopsis>
00162          Send an out of call message to an endpoint.
00163       </synopsis>
00164       <syntax>
00165          <xi:include xpointer="xpointer(/docs/manager[@name='Login']/syntax/parameter[@name='ActionID'])" />
00166          <parameter name="To" required="true">
00167             <para>The URI the message is to be sent to.</para>
00168             <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageToInfo'])" />
00169             <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageToInfo'])" />
00170          </parameter>
00171          <parameter name="From">
00172             <para>A From URI for the message if needed for the
00173             message technology being used to send this message.</para>
00174             <xi:include xpointer="xpointer(/docs/info[@name='SIPMessageFromInfo'])" />
00175             <xi:include xpointer="xpointer(/docs/info[@name='XMPPMessageFromInfo'])" />
00176          </parameter>
00177          <parameter name="Body">
00178             <para>The message body text.  This must not contain any newlines as that
00179             conflicts with the AMI protocol.</para>
00180          </parameter>
00181          <parameter name="Base64Body">
00182             <para>Text bodies requiring the use of newlines have to be base64 encoded
00183             in this field.  Base64Body will be decoded before being sent out.
00184             Base64Body takes precedence over Body.</para>
00185          </parameter>
00186          <parameter name="Variable">
00187             <para>Message variable to set, multiple Variable: headers are
00188             allowed.  The header value is a comma separated list of
00189             name=value pairs.</para>
00190          </parameter>
00191       </syntax>
00192    </manager>
00193  ***/
00194 
/*!
 * \brief One named variable attached to a message.
 *
 * Instances are ao2 objects stored in the \c vars container of an ast_msg
 * and are hashed/compared case-insensitively by \c name.
 */
struct msg_data {
   AST_DECLARE_STRING_FIELDS(
      AST_STRING_FIELD(name);
      AST_STRING_FIELD(value);
   );
   unsigned int send:1; /* Whether to send out on outbound messages */
};
00202 
/*
 * Lock-free list head type for msg_data entries.
 * NOTE(review): no user of 'outhead' is visible in this part of the file --
 * confirm it is still needed.
 */
AST_LIST_HEAD_NOLOCK(outhead, msg_data);
00204 
/*!
 * \brief A message.
 *
 * Allocated as an ao2 object by ast_msg_alloc(); every member is released
 * by the object's destructor.
 *
 * \todo Consider whether stringfields would be an appropriate optimization here.
 */
struct ast_msg {
   /*! Destination of the message */
   struct ast_str *to;
   /*! Source of the message */
   struct ast_str *from;
   /*! Message body */
   struct ast_str *body;
   /*! Dialplan context used when routing; initialised to "default" */
   struct ast_str *context;
   /*! Dialplan extension used when routing; "s" is used when empty */
   struct ast_str *exten;
   /*! Container of msg_data variables attached to this message */
   struct ao2_container *vars;
};
00218 
/*!
 * \brief Wrapper object around a registered message technology pointer.
 */
struct ast_msg_tech_holder {
   /*! The wrapped technology; may be NULL (readers check before use) */
   const struct ast_msg_tech *tech;
   /*!
    * \brief A rwlock for this object
    *
    * a read/write lock must be used to protect the wrapper instead
    * of the ao2 lock. A rdlock must be held to read tech_holder->tech.
    */
   ast_rwlock_t tech_lock;
};
00229 
/* Container of message technologies -- presumably ast_msg_tech_holder objects; TODO confirm. */
static struct ao2_container *msg_techs;

/* Task processor that ast_msg_queue() pushes incoming messages onto. */
static struct ast_taskprocessor *msg_q_tp;

/* Name of the dialplan application documented above. */
static const char app_msg_send[] = "MessageSend";
00235 
static void msg_ds_destroy(void *data);

/*
 * Datastore type used to hang an ast_msg off a channel.  The destroy
 * callback drops the datastore's reference on the message.
 */
static const struct ast_datastore_info msg_datastore = {
   .type = "message",
   .destroy = msg_ds_destroy,
};
00242 
static int msg_func_read(struct ast_channel *chan, const char *function,
      char *data, char *buf, size_t len);
static int msg_func_write(struct ast_channel *chan, const char *function,
      char *data, const char *value);

/* Dialplan function MESSAGE(): access to the to/from/body fields of a message. */
static struct ast_custom_function msg_function = {
   .name = "MESSAGE",
   .read = msg_func_read,
   .write = msg_func_write,
};
00253 
static int msg_data_func_read(struct ast_channel *chan, const char *function,
      char *data, char *buf, size_t len);
static int msg_data_func_write(struct ast_channel *chan, const char *function,
      char *data, const char *value);

/* Dialplan function MESSAGE_DATA(): access to the custom variables of a message. */
static struct ast_custom_function msg_data_function = {
   .name = "MESSAGE_DATA",
   .read = msg_data_func_read,
   .write = msg_data_func_write,
};
00264 
/* Forward declarations of the stub channel callbacks wired into msg_chan_tech_hack. */
static struct ast_frame *chan_msg_read(struct ast_channel *chan);
static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr);
static int chan_msg_indicate(struct ast_channel *chan, int condition,
      const void *data, size_t datalen);
static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit);
static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
      unsigned int duration);

/*!
 * \internal
 * \brief A bare minimum channel technology
 *
 * This will not be registered as we never want anything to try
 * to create Message channels other than internally in this file.
 *
 * Every callback below is a stub that succeeds without doing any work.
 */
static const struct ast_channel_tech msg_chan_tech_hack = {
   .type             = "Message",
   .description      = "Internal Text Message Processing",
   .read             = chan_msg_read,
   .write            = chan_msg_write,
   .indicate         = chan_msg_indicate,
   .send_digit_begin = chan_msg_send_digit_begin,
   .send_digit_end   = chan_msg_send_digit_end,
};
00289 
00290 /*!
00291  * \internal
00292  * \brief ast_channel_tech read callback
00293  *
00294  * This should never be called.  However, we say that about chan_iax2's
00295  * read callback, too, and it seems to randomly get called for some
00296  * reason.  If it does, a simple NULL frame will suffice.
00297  */
00298 static struct ast_frame *chan_msg_read(struct ast_channel *chan)
00299 {
00300    return &ast_null_frame;
00301 }
00302 
/*!
 * \internal
 * \brief Write callback of the internal Message channel technology
 *
 * Frames written to a Message channel are silently discarded.
 *
 * \retval 0 always
 */
static int chan_msg_write(struct ast_channel *chan, struct ast_frame *fr)
{
   (void) chan;
   (void) fr;

   return 0;
}
00313 
/*!
 * \internal
 * \brief Indicate callback of the internal Message channel technology
 *
 * Exists only to report success, so that callers of ast_indicate() do not
 * see a failure and ast_indicate() does not fall back to generating
 * inband tones on its own.
 *
 * \retval 0 always
 */
static int chan_msg_indicate(struct ast_channel *chan, int condition,
      const void *data, size_t datalen)
{
   (void) chan;
   (void) condition;
   (void) data;
   (void) datalen;

   return 0;
}
00329 
/*!
 * \internal
 * \brief send_digit_begin callback of the internal Message channel technology
 *
 * Swallows the digit so the core does not spend resources generating
 * inband DTMF audio for a message channel.
 *
 * \retval 0 always
 */
static int chan_msg_send_digit_begin(struct ast_channel *chan, char digit)
{
   (void) chan;
   (void) digit;

   return 0;
}
00342 
/*!
 * \internal
 * \brief send_digit_end callback of the internal Message channel technology
 *
 * Swallows the digit so the core does not spend resources generating
 * inband DTMF audio for a message channel.
 *
 * \retval 0 always
 */
static int chan_msg_send_digit_end(struct ast_channel *chan, char digit,
      unsigned int duration)
{
   (void) chan;
   (void) digit;
   (void) duration;

   return 0;
}
00356 
/*!
 * \internal
 * \brief Datastore destroy callback: drop the datastore's message reference
 *
 * \param data the ast_msg stored in the datastore
 */
static void msg_ds_destroy(void *data)
{
   struct ast_msg *held = data;

   ao2_ref(held, -1);
}
00363 
00364 static int msg_data_hash_fn(const void *obj, const int flags)
00365 {
00366    const struct msg_data *data = obj;
00367    return ast_str_case_hash(data->name);
00368 }
00369 
00370 static int msg_data_cmp_fn(void *obj, void *arg, int flags)
00371 {
00372    const struct msg_data *one = obj, *two = arg;
00373    return !strcasecmp(one->name, two->name) ? CMP_MATCH | CMP_STOP : 0;
00374 }
00375 
/*!
 * \internal
 * \brief ao2 destructor for msg_data: release the string field storage
 */
static void msg_data_destructor(void *obj)
{
   struct msg_data *var = obj;

   ast_string_field_free_memory(var);
}
00381 
00382 static void msg_destructor(void *obj)
00383 {
00384    struct ast_msg *msg = obj;
00385 
00386    ast_free(msg->to);
00387    msg->to = NULL;
00388 
00389    ast_free(msg->from);
00390    msg->from = NULL;
00391 
00392    ast_free(msg->body);
00393    msg->body = NULL;
00394 
00395    ast_free(msg->context);
00396    msg->context = NULL;
00397 
00398    ast_free(msg->exten);
00399    msg->exten = NULL;
00400 
00401    ao2_ref(msg->vars, -1);
00402 }
00403 
00404 struct ast_msg *ast_msg_alloc(void)
00405 {
00406    struct ast_msg *msg;
00407 
00408    if (!(msg = ao2_alloc(sizeof(*msg), msg_destructor))) {
00409       return NULL;
00410    }
00411 
00412    if (!(msg->to = ast_str_create(32))) {
00413       ao2_ref(msg, -1);
00414       return NULL;
00415    }
00416 
00417    if (!(msg->from = ast_str_create(32))) {
00418       ao2_ref(msg, -1);
00419       return NULL;
00420    }
00421 
00422    if (!(msg->body = ast_str_create(128))) {
00423       ao2_ref(msg, -1);
00424       return NULL;
00425    }
00426 
00427    if (!(msg->context = ast_str_create(16))) {
00428       ao2_ref(msg, -1);
00429       return NULL;
00430    }
00431 
00432    if (!(msg->exten = ast_str_create(16))) {
00433       ao2_ref(msg, -1);
00434       return NULL;
00435    }
00436 
00437    if (!(msg->vars = ao2_container_alloc(1, msg_data_hash_fn, msg_data_cmp_fn))) {
00438       ao2_ref(msg, -1);
00439       return NULL;
00440    }
00441 
00442    ast_str_set(&msg->context, 0, "default");
00443 
00444    return msg;
00445 }
00446 
/*!
 * \brief Take an additional reference on a message
 *
 * \return the same \a msg pointer, for call chaining
 */
struct ast_msg *ast_msg_ref(struct ast_msg *msg)
{
   struct ast_msg *same = msg;

   ao2_ref(same, +1);

   return same;
}
00452 
/*!
 * \brief Drop one reference on a message
 *
 * \return always NULL, so callers can clear their pointer in one statement
 */
struct ast_msg *ast_msg_destroy(struct ast_msg *msg)
{
   struct ast_msg *cleared = NULL;

   ao2_ref(msg, -1);

   return cleared;
}
00459 
00460 int ast_msg_set_to(struct ast_msg *msg, const char *fmt, ...)
00461 {
00462    va_list ap;
00463    int res;
00464 
00465    va_start(ap, fmt);
00466    res = ast_str_set_va(&msg->to, 0, fmt, ap);
00467    va_end(ap);
00468 
00469    return res < 0 ? -1 : 0;
00470 }
00471 
00472 int ast_msg_set_from(struct ast_msg *msg, const char *fmt, ...)
00473 {
00474    va_list ap;
00475    int res;
00476 
00477    va_start(ap, fmt);
00478    res = ast_str_set_va(&msg->from, 0, fmt, ap);
00479    va_end(ap);
00480 
00481    return res < 0 ? -1 : 0;
00482 }
00483 
00484 int ast_msg_set_body(struct ast_msg *msg, const char *fmt, ...)
00485 {
00486    va_list ap;
00487    int res;
00488 
00489    va_start(ap, fmt);
00490    res = ast_str_set_va(&msg->body, 0, fmt, ap);
00491    va_end(ap);
00492 
00493    return res < 0 ? -1 : 0;
00494 }
00495 
00496 int ast_msg_set_context(struct ast_msg *msg, const char *fmt, ...)
00497 {
00498    va_list ap;
00499    int res;
00500 
00501    va_start(ap, fmt);
00502    res = ast_str_set_va(&msg->context, 0, fmt, ap);
00503    va_end(ap);
00504 
00505    return res < 0 ? -1 : 0;
00506 }
00507 
00508 int ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...)
00509 {
00510    va_list ap;
00511    int res;
00512 
00513    va_start(ap, fmt);
00514    res = ast_str_set_va(&msg->exten, 0, fmt, ap);
00515    va_end(ap);
00516 
00517    return res < 0 ? -1 : 0;
00518 }
00519 
00520 const char *ast_msg_get_body(const struct ast_msg *msg)
00521 {
00522    return ast_str_buffer(msg->body);
00523 }
00524 
00525 static struct msg_data *msg_data_alloc(void)
00526 {
00527    struct msg_data *data;
00528 
00529    if (!(data = ao2_alloc(sizeof(*data), msg_data_destructor))) {
00530       return NULL;
00531    }
00532 
00533    if (ast_string_field_init(data, 32)) {
00534       ao2_ref(data, -1);
00535       return NULL;
00536    }
00537 
00538    return data;
00539 }
00540 
00541 static struct msg_data *msg_data_find(struct ao2_container *vars, const char *name)
00542 {
00543    struct msg_data tmp = {
00544       .name = name,
00545    };
00546    return ao2_find(vars, &tmp, OBJ_POINTER);
00547 }
00548 
00549 static int msg_set_var_full(struct ast_msg *msg, const char *name, const char *value, unsigned int outbound)
00550 {
00551    struct msg_data *data;
00552 
00553    if (!(data = msg_data_find(msg->vars, name))) {
00554       if (ast_strlen_zero(value)) {
00555          return 0;
00556       }
00557       if (!(data = msg_data_alloc())) {
00558          return -1;
00559       };
00560 
00561       ast_string_field_set(data, name, name);
00562       ast_string_field_set(data, value, value);
00563       data->send = outbound;
00564       ao2_link(msg->vars, data);
00565    } else {
00566       if (ast_strlen_zero(value)) {
00567          ao2_unlink(msg->vars, data);
00568       } else {
00569          ast_string_field_set(data, value, value);
00570          data->send = outbound;
00571       }
00572    }
00573 
00574    ao2_ref(data, -1);
00575 
00576    return 0;
00577 }
00578 
/*!
 * \brief Set a message variable and mark it for sending on outbound messages
 *
 * \retval 0 success
 * \retval -1 failure
 */
int ast_msg_set_var_outbound(struct ast_msg *msg, const char *name, const char *value)
{
   const unsigned int send_outbound = 1;

   return msg_set_var_full(msg, name, value, send_outbound);
}
00583 
/*!
 * \brief Set a message variable without marking it for outbound sending
 *
 * \retval 0 success
 * \retval -1 failure
 */
int ast_msg_set_var(struct ast_msg *msg, const char *name, const char *value)
{
   const unsigned int send_outbound = 0;

   return msg_set_var_full(msg, name, value, send_outbound);
}
00588 
00589 const char *ast_msg_get_var(struct ast_msg *msg, const char *name)
00590 {
00591    struct msg_data *data;
00592    const char *val = NULL;
00593 
00594    if (!(data = msg_data_find(msg->vars, name))) {
00595       return NULL;
00596    }
00597 
00598    /* Yep, this definitely looks like val would be a dangling pointer
00599     * after the ref count is decremented.  As long as the message structure
00600     * is used in a thread safe manner, this will not be the case though.
00601     * The ast_msg holds a reference to this object in the msg->vars container. */
00602    val = data->value;
00603    ao2_ref(data, -1);
00604 
00605    return val;
00606 }
00607 
/*!
 * \brief Iterator over the variables of a message that are marked for sending
 */
struct ast_msg_var_iterator {
   /*! Underlying ao2 iterator over the message's vars container */
   struct ao2_iterator i;
   /*! Variable most recently returned by ast_msg_var_iterator_next(); its
    *  reference is held until ast_msg_var_unref_current() is called */
   struct msg_data *current_used;
};
00612 
00613 struct ast_msg_var_iterator *ast_msg_var_iterator_init(const struct ast_msg *msg)
00614 {
00615    struct ast_msg_var_iterator *i;
00616    if (!(i = ast_calloc(1, sizeof(*i)))) {
00617       return NULL;
00618    }
00619 
00620    i->i = ao2_iterator_init(msg->vars, 0);
00621 
00622    return i;
00623 }
00624 
00625 int ast_msg_var_iterator_next(const struct ast_msg *msg, struct ast_msg_var_iterator *i, const char **name, const char **value)
00626 {
00627    struct msg_data *data;
00628 
00629    /* Skip any that aren't marked for sending out */
00630    while ((data = ao2_iterator_next(&i->i)) && !data->send) {
00631       ao2_ref(data, -1);
00632    }
00633 
00634    if (!data) {
00635       return 0;
00636    }
00637 
00638    if (data->send) {
00639       *name = data->name;
00640       *value = data->value;
00641    }
00642 
00643    /* Leave the refcount to be cleaned up by the caller with
00644     * ast_msg_var_unref_current after they finish with the pointers to the data */
00645    i->current_used = data;
00646 
00647    return 1;
00648 }
00649 
00650 void ast_msg_var_unref_current(struct ast_msg_var_iterator *i) {
00651    if (i->current_used) {
00652       ao2_ref(i->current_used, -1);
00653    }
00654    i->current_used = NULL;
00655 }
00656 
00657 void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *i)
00658 {
00659    ao2_iterator_destroy(&i->i);
00660    ast_free(i);
00661 }
00662 
00663 static struct ast_channel *create_msg_q_chan(void)
00664 {
00665    struct ast_channel *chan;
00666    struct ast_datastore *ds;
00667 
00668    chan = ast_channel_alloc(1, AST_STATE_UP,
00669          NULL, NULL, NULL,
00670          NULL, NULL, NULL, 0,
00671          "%s", "Message/ast_msg_queue");
00672 
00673    if (!chan) {
00674       return NULL;
00675    }
00676 
00677    ast_channel_unlink(chan);
00678 
00679    ast_channel_tech_set(chan, &msg_chan_tech_hack);
00680 
00681    if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
00682       ast_hangup(chan);
00683       return NULL;
00684    }
00685 
00686    ast_channel_lock(chan);
00687    ast_channel_datastore_add(chan, ds);
00688    ast_channel_unlock(chan);
00689 
00690    return chan;
00691 }
00692 
00693 /*!
00694  * \internal
00695  * \brief Run the dialplan for message processing
00696  *
00697  * \pre The message has already been set up on the msg datastore
00698  *      on this channel.
00699  */
00700 static void msg_route(struct ast_channel *chan, struct ast_msg *msg)
00701 {
00702    struct ast_pbx_args pbx_args;
00703 
00704    ast_explicit_goto(chan, ast_str_buffer(msg->context), AS_OR(msg->exten, "s"), 1);
00705 
00706    memset(&pbx_args, 0, sizeof(pbx_args));
00707    pbx_args.no_hangup_chan = 1,
00708    ast_pbx_run_args(chan, &pbx_args);
00709 }
00710 
00711 /*!
00712  * \internal
00713  * \brief Clean up ast_channel after each message
00714  *
00715  * Reset various bits of state after routing each message so the same ast_channel
00716  * can just be reused.
00717  */
00718 static void chan_cleanup(struct ast_channel *chan)
00719 {
00720    struct ast_datastore *msg_ds, *ds;
00721    struct varshead *headp;
00722    struct ast_var_t *vardata;
00723 
00724    ast_channel_lock(chan);
00725 
00726    /*
00727     * Remove the msg datastore.  Free its data but keep around the datastore
00728     * object and just reuse it.
00729     */
00730    if ((msg_ds = ast_channel_datastore_find(chan, &msg_datastore, NULL)) && msg_ds->data) {
00731       ast_channel_datastore_remove(chan, msg_ds);
00732       ao2_ref(msg_ds->data, -1);
00733       msg_ds->data = NULL;
00734    }
00735 
00736    /*
00737     * Destroy all other datastores.
00738     */
00739    while ((ds = AST_LIST_REMOVE_HEAD(ast_channel_datastores(chan), entry))) {
00740       ast_datastore_free(ds);
00741    }
00742 
00743    /*
00744     * Destroy all channel variables.
00745     */
00746    headp = ast_channel_varshead(chan);
00747    while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries))) {
00748       ast_var_delete(vardata);
00749    }
00750 
00751    /*
00752     * Restore msg datastore.
00753     */
00754    if (msg_ds) {
00755       ast_channel_datastore_add(chan, msg_ds);
00756    }
00757    /*
00758     * Clear softhangup flags.
00759     */
00760    ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ALL);
00761 
00762    ast_channel_unlock(chan);
00763 }
00764 
/*!
 * \internal
 * \brief Thread storage cleanup: release the thread's message queue channel
 *
 * \param data pointer to the thread-local channel pointer slot
 */
static void destroy_msg_q_chan(void *data)
{
   struct ast_channel **slot = data;

   if (*slot) {
      ast_channel_release(*slot);
   }
}
00775 
/*
 * Thread-local slot holding the channel that msg_q_cb() reuses for every
 * message it routes; destroy_msg_q_chan() is registered as its cleanup.
 */
AST_THREADSTORAGE_CUSTOM(msg_q_chan, NULL, destroy_msg_q_chan);
00777 
00778 /*!
00779  * \internal
00780  * \brief Message queue task processor callback
00781  *
00782  * \retval 0 success
00783  * \retval -1 failure
00784  *
00785  * \note Even though this returns a value, the taskprocessor code ignores the value.
00786  */
00787 static int msg_q_cb(void *data)
00788 {
00789    struct ast_msg *msg = data;
00790    struct ast_channel **chan_p, *chan;
00791    struct ast_datastore *ds;
00792 
00793    if (!(chan_p = ast_threadstorage_get(&msg_q_chan, sizeof(struct ast_channel *)))) {
00794       return -1;
00795    }
00796    if (!*chan_p) {
00797       if (!(*chan_p = create_msg_q_chan())) {
00798          return -1;
00799       }
00800    }
00801    chan = *chan_p;
00802 
00803    ast_channel_lock(chan);
00804    if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
00805       ast_channel_unlock(chan);
00806       return -1;
00807    }
00808    ao2_ref(msg, +1);
00809    ds->data = msg;
00810    ast_channel_unlock(chan);
00811 
00812    msg_route(chan, msg);
00813    chan_cleanup(chan);
00814 
00815    ao2_ref(msg, -1);
00816 
00817    return 0;
00818 }
00819 
00820 int ast_msg_queue(struct ast_msg *msg)
00821 {
00822    int res;
00823 
00824    res = ast_taskprocessor_push(msg_q_tp, msg_q_cb, msg);
00825    if (res == -1) {
00826       ao2_ref(msg, -1);
00827    }
00828 
00829    return res;
00830 }
00831 
00832 /*!
00833  * \internal
00834  * \brief Find or create a message datastore on a channel
00835  *
00836  * \pre chan is locked
00837  *
00838  * \param chan the relevant channel
00839  *
00840  * \return the channel's message datastore, or NULL on error
00841  */
00842 static struct ast_datastore *msg_datastore_find_or_create(struct ast_channel *chan)
00843 {
00844    struct ast_datastore *ds;
00845 
00846    if ((ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
00847       return ds;
00848    }
00849 
00850    if (!(ds = ast_datastore_alloc(&msg_datastore, NULL))) {
00851       return NULL;
00852    }
00853 
00854    if (!(ds->data = ast_msg_alloc())) {
00855       ast_datastore_free(ds);
00856       return NULL;
00857    }
00858 
00859    ast_channel_datastore_add(chan, ds);
00860 
00861    return ds;
00862 }
00863 
00864 static int msg_func_read(struct ast_channel *chan, const char *function,
00865       char *data, char *buf, size_t len)
00866 {
00867    struct ast_datastore *ds;
00868    struct ast_msg *msg;
00869 
00870    if (!chan) {
00871       ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
00872       return -1;
00873    }
00874 
00875    ast_channel_lock(chan);
00876 
00877    if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
00878       ast_channel_unlock(chan);
00879       ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
00880       return -1;
00881    }
00882 
00883    msg = ds->data;
00884    ao2_ref(msg, +1);
00885    ast_channel_unlock(chan);
00886 
00887    ao2_lock(msg);
00888 
00889    if (!strcasecmp(data, "to")) {
00890       ast_copy_string(buf, ast_str_buffer(msg->to), len);
00891    } else if (!strcasecmp(data, "from")) {
00892       ast_copy_string(buf, ast_str_buffer(msg->from), len);
00893    } else if (!strcasecmp(data, "body")) {
00894       ast_copy_string(buf, ast_msg_get_body(msg), len);
00895    } else {
00896       ast_log(LOG_WARNING, "Invalid argument to MESSAGE(): '%s'\n", data);
00897    }
00898 
00899    ao2_unlock(msg);
00900    ao2_ref(msg, -1);
00901 
00902    return 0;
00903 }
00904 
00905 static int msg_func_write(struct ast_channel *chan, const char *function,
00906       char *data, const char *value)
00907 {
00908    struct ast_datastore *ds;
00909    struct ast_msg *msg;
00910 
00911    if (!chan) {
00912       ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
00913       return -1;
00914    }
00915 
00916    ast_channel_lock(chan);
00917 
00918    if (!(ds = msg_datastore_find_or_create(chan))) {
00919       ast_channel_unlock(chan);
00920       return -1;
00921    }
00922 
00923    msg = ds->data;
00924    ao2_ref(msg, +1);
00925    ast_channel_unlock(chan);
00926 
00927    ao2_lock(msg);
00928 
00929    if (!strcasecmp(data, "to")) {
00930       ast_msg_set_to(msg, "%s", value);
00931    } else if (!strcasecmp(data, "from")) {
00932       ast_msg_set_from(msg, "%s", value);
00933    } else if (!strcasecmp(data, "body")) {
00934       ast_msg_set_body(msg, "%s", value);
00935    } else if (!strcasecmp(data, "custom_data")) {
00936       int outbound = -1;
00937       if (!strcasecmp(value, "mark_all_outbound")) {
00938          outbound = 1;
00939       } else if (!strcasecmp(value, "clear_all_outbound")) {
00940          outbound = 0;
00941       } else {
00942          ast_log(LOG_WARNING, "'%s' is not a valid value for custom_data\n", value);
00943       }
00944 
00945       if (outbound != -1) {
00946          struct msg_data *hdr_data;
00947          struct ao2_iterator iter = ao2_iterator_init(msg->vars, 0);
00948 
00949          while ((hdr_data = ao2_iterator_next(&iter))) {
00950             hdr_data->send = outbound;
00951             ao2_ref(hdr_data, -1);
00952          }
00953          ao2_iterator_destroy(&iter);
00954       }
00955    } else {
00956       ast_log(LOG_WARNING, "'%s' is not a valid write argument.\n", data);
00957    }
00958 
00959    ao2_unlock(msg);
00960    ao2_ref(msg, -1);
00961 
00962    return 0;
00963 }
00964 
00965 static int msg_data_func_read(struct ast_channel *chan, const char *function,
00966       char *data, char *buf, size_t len)
00967 {
00968    struct ast_datastore *ds;
00969    struct ast_msg *msg;
00970    const char *val;
00971 
00972    if (!chan) {
00973       ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
00974       return -1;
00975    }
00976 
00977    ast_channel_lock(chan);
00978 
00979    if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
00980       ast_channel_unlock(chan);
00981       ast_log(LOG_ERROR, "No MESSAGE data found on the channel to read.\n");
00982       return -1;
00983    }
00984 
00985    msg = ds->data;
00986    ao2_ref(msg, +1);
00987    ast_channel_unlock(chan);
00988 
00989    ao2_lock(msg);
00990 
00991    if ((val = ast_msg_get_var(msg, data))) {
00992       ast_copy_string(buf, val, len);
00993    }
00994 
00995    ao2_unlock(msg);
00996    ao2_ref(msg, -1);
00997 
00998    return 0;
00999 }
01000 
01001 static int msg_data_func_write(struct ast_channel *chan, const char *function,
01002       char *data, const char *value)
01003 {
01004    struct ast_datastore *ds;
01005    struct ast_msg *msg;
01006 
01007    if (!chan) {
01008       ast_log(LOG_WARNING, "No channel was provided to %s function.\n", function);
01009       return -1;
01010    }
01011 
01012    ast_channel_lock(chan);
01013 
01014    if (!(ds = msg_datastore_find_or_create(chan))) {
01015       ast_channel_unlock(chan);
01016       return -1;
01017    }
01018 
01019    msg = ds->data;
01020    ao2_ref(msg, +1);
01021    ast_channel_unlock(chan);
01022 
01023    ao2_lock(msg);
01024 
01025    ast_msg_set_var_outbound(msg, data, value);
01026 
01027    ao2_unlock(msg);
01028    ao2_ref(msg, -1);
01029 
01030    return 0;
01031 }
01032 static int msg_tech_hash(const void *obj, const int flags)
01033 {
01034    struct ast_msg_tech_holder *tech_holder = (struct ast_msg_tech_holder *) obj;
01035    int res = 0;
01036 
01037    ast_rwlock_rdlock(&tech_holder->tech_lock);
01038    if (tech_holder->tech) {
01039       res = ast_str_case_hash(tech_holder->tech->name);
01040    }
01041    ast_rwlock_unlock(&tech_holder->tech_lock);
01042 
01043    return res;
01044 }
01045 
01046 static int msg_tech_cmp(void *obj, void *arg, int flags)
01047 {
01048    struct ast_msg_tech_holder *tech_holder = obj;
01049    const struct ast_msg_tech_holder *tech_holder2 = arg;
01050    int res = 1;
01051 
01052    ast_rwlock_rdlock(&tech_holder->tech_lock);
01053    /*
01054     * tech_holder2 is a temporary fake tech_holder.
01055     */
01056    if (tech_holder->tech) {
01057       res = strcasecmp(tech_holder->tech->name, tech_holder2->tech->name) ? 0 : CMP_MATCH | CMP_STOP;
01058    }
01059    ast_rwlock_unlock(&tech_holder->tech_lock);
01060 
01061    return res;
01062 }
01063 
01064 static struct ast_msg_tech_holder *msg_find_by_tech(const struct ast_msg_tech *msg_tech, int ao2_flags)
01065 {
01066    struct ast_msg_tech_holder *tech_holder;
01067    struct ast_msg_tech_holder tmp_tech_holder = {
01068       .tech = msg_tech,
01069    };
01070 
01071    ast_rwlock_init(&tmp_tech_holder.tech_lock);
01072    tech_holder = ao2_find(msg_techs, &tmp_tech_holder, ao2_flags);
01073    ast_rwlock_destroy(&tmp_tech_holder.tech_lock);
01074    return tech_holder;
01075 }
01076 
01077 static struct ast_msg_tech_holder *msg_find_by_tech_name(const char *tech_name, int ao2_flags)
01078 {
01079    struct ast_msg_tech tmp_msg_tech = {
01080       .name = tech_name,
01081    };
01082    return msg_find_by_tech(&tmp_msg_tech, ao2_flags);
01083 }
01084 
01085 /*!
01086  * \internal
01087  * \brief MessageSend() application
01088  */
01089 static int msg_send_exec(struct ast_channel *chan, const char *data)
01090 {
01091    struct ast_datastore *ds;
01092    struct ast_msg *msg;
01093    char *tech_name;
01094    struct ast_msg_tech_holder *tech_holder = NULL;
01095    char *parse;
01096    int res = -1;
01097    AST_DECLARE_APP_ARGS(args,
01098       AST_APP_ARG(to);
01099       AST_APP_ARG(from);
01100    );
01101 
01102    if (ast_strlen_zero(data)) {
01103       ast_log(LOG_WARNING, "An argument is required to MessageSend()\n");
01104       pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
01105       return 0;
01106    }
01107 
01108    parse = ast_strdupa(data);
01109    AST_STANDARD_APP_ARGS(args, parse);
01110 
01111    if (ast_strlen_zero(args.to)) {
01112       ast_log(LOG_WARNING, "A 'to' URI is required for MessageSend()\n");
01113       pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_URI");
01114       return 0;
01115    }
01116 
01117    ast_channel_lock(chan);
01118 
01119    if (!(ds = ast_channel_datastore_find(chan, &msg_datastore, NULL))) {
01120       ast_channel_unlock(chan);
01121       ast_log(LOG_WARNING, "No message data found on channel to send.\n");
01122       pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "FAILURE");
01123       return 0;
01124    }
01125 
01126    msg = ds->data;
01127    ao2_ref(msg, +1);
01128    ast_channel_unlock(chan);
01129 
01130    tech_name = ast_strdupa(args.to);
01131    tech_name = strsep(&tech_name, ":");
01132 
01133    tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
01134 
01135    if (!tech_holder) {
01136       ast_log(LOG_WARNING, "No message technology '%s' found.\n", tech_name);
01137       pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_PROTOCOL");
01138       goto exit_cleanup;
01139    }
01140 
01141    /*
01142     * The message lock is held here to safely allow the technology
01143     * implementation to access the message fields without worrying
01144     * that they could change.
01145     */
01146    ao2_lock(msg);
01147    ast_rwlock_rdlock(&tech_holder->tech_lock);
01148    if (tech_holder->tech) {
01149       res = tech_holder->tech->msg_send(msg, S_OR(args.to, ""),
01150                      S_OR(args.from, ""));
01151    }
01152    ast_rwlock_unlock(&tech_holder->tech_lock);
01153    ao2_unlock(msg);
01154 
01155    pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", res ? "FAILURE" : "SUCCESS");
01156 
01157 exit_cleanup:
01158    if (tech_holder) {
01159       ao2_ref(tech_holder, -1);
01160       tech_holder = NULL;
01161    }
01162 
01163    ao2_ref(msg, -1);
01164 
01165    return 0;
01166 }
01167 
01168 static int action_messagesend(struct mansession *s, const struct message *m)
01169 {
01170    const char *to = ast_strdupa(astman_get_header(m, "To"));
01171    const char *from = astman_get_header(m, "From");
01172    const char *body = astman_get_header(m, "Body");
01173    const char *base64body = astman_get_header(m, "Base64Body");
01174    char base64decoded[1301] = { 0, };
01175    char *tech_name = NULL;
01176    struct ast_variable *vars = NULL;
01177    struct ast_variable *data = NULL;
01178    struct ast_msg_tech_holder *tech_holder = NULL;
01179    struct ast_msg *msg;
01180    int res = -1;
01181 
01182    if (ast_strlen_zero(to)) {
01183       astman_send_error(s, m, "No 'To' address specified.");
01184       return -1;
01185    }
01186 
01187    if (!ast_strlen_zero(base64body)) {
01188       ast_base64decode((unsigned char *) base64decoded, base64body, sizeof(base64decoded) - 1);
01189       body = base64decoded;
01190    }
01191 
01192    tech_name = ast_strdupa(to);
01193    tech_name = strsep(&tech_name, ":");
01194 
01195    tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
01196 
01197    if (!tech_holder) {
01198       astman_send_error(s, m, "Message technology not found.");
01199       return -1;
01200    }
01201 
01202    if (!(msg = ast_msg_alloc())) {
01203       ao2_ref(tech_holder, -1);
01204       astman_send_error(s, m, "Internal failure\n");
01205       return -1;
01206    }
01207 
01208    data = astman_get_variables(m);
01209    for (vars = data; vars; vars = vars->next) {
01210       ast_msg_set_var_outbound(msg, vars->name, vars->value);
01211    }
01212 
01213    ast_msg_set_body(msg, "%s", body);
01214 
01215    ast_rwlock_rdlock(&tech_holder->tech_lock);
01216    if (tech_holder->tech) {
01217       res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
01218    }
01219    ast_rwlock_unlock(&tech_holder->tech_lock);
01220 
01221    ast_variables_destroy(vars);
01222    ao2_ref(tech_holder, -1);
01223    ao2_ref(msg, -1);
01224 
01225    if (res) {
01226       astman_send_error(s, m, "Message failed to send.");
01227    } else {
01228       astman_send_ack(s, m, "Message successfully sent");
01229    }
01230    return res;
01231 }
01232 
01233 int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
01234 {
01235    char *tech_name = NULL;
01236    struct ast_msg_tech_holder *tech_holder = NULL;
01237    int res = -1;
01238 
01239    if (ast_strlen_zero(to)) {
01240       ao2_ref(msg, -1);
01241       return -1;
01242    }
01243 
01244    tech_name = ast_strdupa(to);
01245    tech_name = strsep(&tech_name, ":");
01246 
01247    tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
01248 
01249    if (!tech_holder) {
01250       ao2_ref(msg, -1);
01251       return -1;
01252    }
01253 
01254    ast_rwlock_rdlock(&tech_holder->tech_lock);
01255    if (tech_holder->tech) {
01256       res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
01257    }
01258    ast_rwlock_unlock(&tech_holder->tech_lock);
01259 
01260    ao2_ref(tech_holder, -1);
01261    ao2_ref(msg, -1);
01262 
01263    return res;
01264 }
01265 
01266 int ast_msg_tech_register(const struct ast_msg_tech *tech)
01267 {
01268    struct ast_msg_tech_holder *tech_holder;
01269 
01270    if ((tech_holder = msg_find_by_tech(tech, OBJ_POINTER))) {
01271       ao2_ref(tech_holder, -1);
01272       ast_log(LOG_ERROR, "Message technology already registered for '%s'\n",
01273             tech->name);
01274       return -1;
01275    }
01276 
01277    if (!(tech_holder = ao2_alloc(sizeof(*tech_holder), NULL))) {
01278       return -1;
01279    }
01280 
01281    ast_rwlock_init(&tech_holder->tech_lock);
01282    tech_holder->tech = tech;
01283 
01284    ao2_link(msg_techs, tech_holder);
01285 
01286    ao2_ref(tech_holder, -1);
01287    tech_holder = NULL;
01288 
01289    ast_verb(3, "Message technology handler '%s' registered.\n", tech->name);
01290 
01291    return 0;
01292 }
01293 
01294 int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
01295 {
01296    struct ast_msg_tech_holder *tech_holder;
01297 
01298    tech_holder = msg_find_by_tech(tech, OBJ_POINTER | OBJ_UNLINK);
01299 
01300    if (!tech_holder) {
01301       ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
01302       return -1;
01303    }
01304 
01305    ast_rwlock_wrlock(&tech_holder->tech_lock);
01306    tech_holder->tech = NULL;
01307    ast_rwlock_unlock(&tech_holder->tech_lock);
01308 
01309    ao2_ref(tech_holder, -1);
01310    tech_holder = NULL;
01311 
01312    ast_verb(3, "Message technology handler '%s' unregistered.\n", tech->name);
01313 
01314    return 0;
01315 }
01316 
01317 void ast_msg_shutdown(void)
01318 {
01319    if (msg_q_tp) {
01320       msg_q_tp = ast_taskprocessor_unreference(msg_q_tp);
01321    }
01322 }
01323 
01324 /*! \internal \brief Clean up other resources on Asterisk shutdown
01325  * \note This does not include the msg_q_tp object, which must be disposed
01326  * of prior to Asterisk checking for channel destruction in its shutdown
01327  * sequence.  The atexit handlers are executed after this occurs. */
01328 static void message_shutdown(void)
01329 {
01330    ast_custom_function_unregister(&msg_function);
01331    ast_custom_function_unregister(&msg_data_function);
01332    ast_unregister_application(app_msg_send);
01333    ast_manager_unregister("MessageSend");
01334 
01335    if (msg_techs) {
01336       ao2_ref(msg_techs, -1);
01337       msg_techs = NULL;
01338    }
01339 }
01340 
01341 /*
01342  * \internal
01343  * \brief Initialize stuff during Asterisk startup.
01344  *
01345  * Cleanup isn't a big deal in this function.  If we return non-zero,
01346  * Asterisk is going to exit.
01347  *
01348  * \retval 0 success
01349  * \retval non-zero failure
01350  */
01351 int ast_msg_init(void)
01352 {
01353    int res;
01354 
01355    msg_q_tp = ast_taskprocessor_get("ast_msg_queue", TPS_REF_DEFAULT);
01356    if (!msg_q_tp) {
01357       return -1;
01358    }
01359 
01360    msg_techs = ao2_container_alloc(17, msg_tech_hash, msg_tech_cmp);
01361    if (!msg_techs) {
01362       return -1;
01363    }
01364 
01365    res = __ast_custom_function_register(&msg_function, NULL);
01366    res |= __ast_custom_function_register(&msg_data_function, NULL);
01367    res |= ast_register_application2(app_msg_send, msg_send_exec, NULL, NULL, NULL);
01368    res |= ast_manager_register_xml_core("MessageSend", EVENT_FLAG_MESSAGE, action_messagesend);
01369 
01370    ast_register_atexit(message_shutdown);
01371 
01372    return res;
01373 }