在 MRCP 開始語音識別後,會調用 recog_channel_start() 函數:
/**
 * Send an MRCP RECOGNIZE request on a speech channel and wait for it to start.
 *
 * While holding schannel->mutex this function:
 *   1. resets the previous recognition result state,
 *   2. walks the enabled grammars to decide between a single grammar and a
 *      list of URI grammars joined with CRLF,
 *   3. builds the RECOGNIZE message (Content-Type, Content-ID for non-URI
 *      grammars, Cancel-If-Queue for MRCPv2, channel params, body),
 *   4. clears the audio queue and sends the message,
 *   5. blocks on schannel->cond until the channel leaves SPEECH_CHANNEL_READY.
 *
 * @param schannel the speech channel; must be in SPEECH_CHANNEL_READY and have
 *                 non-NULL data, otherwise the call fails immediately
 * @return SWITCH_STATUS_SUCCESS if the channel ended up in
 *         SPEECH_CHANNEL_PROCESSING, SWITCH_STATUS_FALSE on any failure
 */
static switch_status_t recog_channel_start(speech_channel_t *schannel)
{
	switch_status_t status = SWITCH_STATUS_SUCCESS;
	switch_hash_index_t *egk;
	mrcp_message_t *mrcp_message;
	mrcp_recog_header_t *recog_header;
	mrcp_generic_header_t *generic_header;
	recognizer_data_t *r;
	char *start_input_timers;
	const char *mime_type;
	char *key = NULL;
	switch_size_t len;
	grammar_t *grammar = NULL;
	switch_size_t grammar_uri_count = 0;
	switch_size_t grammar_uri_list_len = 0;
	char *grammar_uri_list = NULL;
	int warned = 0;

	switch_mutex_lock(schannel->mutex);
	if (schannel->state != SPEECH_CHANNEL_READY) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

	if (schannel->data == NULL) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

	/* forget whatever the previous recognition left behind */
	r = (recognizer_data_t *) schannel->data;
	r->result = NULL;
	if (r->result_headers) {
		switch_event_destroy(&r->result_headers);
	}
	r->start_of_input = 0;

	/* input timers are started by default unless the start-input-timers=false param is set */
	start_input_timers = (char *) switch_core_hash_find(schannel->params, "start-input-timers");
	r->timers_started = zstr(start_input_timers) || strcasecmp(start_input_timers, "false");

	/* count enabled grammars and size the buffer needed for a URI list */
	for (egk = switch_core_hash_first(r->enabled_grammars); egk; egk = switch_core_hash_next(&egk)) {
		/* NOTE: This postponed type check is necessary to allow a non-URI-list grammar to execute alone.
		 * Here `grammar` still refers to the entry read by the previous iteration: reaching a second
		 * entry while the first one was not a URI grammar is an error. */
		if (grammar_uri_count == 1 && grammar->type != GRAMMAR_TYPE_URI)
			goto no_grammar_alone;
		++grammar_uri_count;
		switch_core_hash_this(egk, (void *) &key, NULL, (void *) &grammar);
		if (grammar->type != GRAMMAR_TYPE_URI && grammar_uri_count != 1) {
		      no_grammar_alone:
			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(schannel->session_uuid), SWITCH_LOG_ERROR, "(%s) Grammar '%s' can only be used alone (not a URI list)\n", schannel->name, key);
			status = SWITCH_STATUS_FALSE;
			/* the iteration is abandoned early, so the hash index is released by hand */
			switch_safe_free(egk);
			goto done;
		}
		len = strlen(grammar->data);
		if (!len)
			continue;
		grammar_uri_list_len += len;
		/* reserve room for the "\r\n" separator appended when the entry lacks a trailing newline */
		if (grammar->data[len - 1] != '\n')
			grammar_uri_list_len += 2;
	}

	switch (grammar_uri_count) {
	case 0:
		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(schannel->session_uuid), SWITCH_LOG_ERROR, "(%s) No grammar specified\n", schannel->name);
		status = SWITCH_STATUS_FALSE;
		goto done;
	case 1:
		/* grammar should already be the unique grammar */
		break;
	default:
		/* get the enabled grammars list (+1 for the terminating NUL) */
		grammar_uri_list = switch_core_alloc(schannel->memory_pool, grammar_uri_list_len + 1);
		grammar_uri_list_len = 0;
		for (egk = switch_core_hash_first(r->enabled_grammars); egk; egk = switch_core_hash_next(&egk)) {
			switch_core_hash_this(egk, (void *) &key, NULL, (void *) &grammar);
			len = strlen(grammar->data);
			if (!len)
				continue;
			memcpy(&(grammar_uri_list[grammar_uri_list_len]), grammar->data, len);
			grammar_uri_list_len += len;
			if (grammar_uri_list[grammar_uri_list_len - 1] != '\n') {
				grammar_uri_list_len += 2;
				grammar_uri_list[grammar_uri_list_len - 2] = '\r';
				grammar_uri_list[grammar_uri_list_len - 1] = '\n';
			}
		}
		grammar_uri_list[grammar_uri_list_len++] = '\0';
		/* NULL grammar tells the code below to use the URI list instead of a single grammar */
		grammar = NULL;
	}

	/* create MRCP message */
	mrcp_message = mrcp_application_message_create(schannel->unimrcp_session, schannel->unimrcp_channel, RECOGNIZER_RECOGNIZE);
	if (mrcp_message == NULL) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

	/* allocate generic header */
	generic_header = (mrcp_generic_header_t *) mrcp_generic_header_prepare(mrcp_message);
	if (generic_header == NULL) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

	/* set Content-Type */
	mime_type = grammar_type_to_mime(grammar ? grammar->type : GRAMMAR_TYPE_URI, schannel->profile);
	if (zstr(mime_type)) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}
	apt_string_assign(&generic_header->content_type, mime_type, mrcp_message->pool);
	mrcp_generic_header_property_add(mrcp_message, GENERIC_HEADER_CONTENT_TYPE);

	/* set Content-ID for inline grammars */
	if (grammar && grammar->type != GRAMMAR_TYPE_URI) {
		apt_string_assign(&generic_header->content_id, grammar->name, mrcp_message->pool);
		mrcp_generic_header_property_add(mrcp_message, GENERIC_HEADER_CONTENT_ID);
	}

	/* allocate recognizer-specific header */
	recog_header = (mrcp_recog_header_t *) mrcp_resource_header_prepare(mrcp_message);
	if (recog_header == NULL) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

	/* set Cancel-If-Queue */
	if (mrcp_message->start_line.version == MRCP_VERSION_2) {
		recog_header->cancel_if_queue = FALSE;
		mrcp_resource_header_property_add(mrcp_message, RECOGNIZER_HEADER_CANCEL_IF_QUEUE);
	}

	/* set parameters */
	recog_channel_set_params(schannel, mrcp_message, generic_header, recog_header);

	/* set message body */
	apt_string_assign(&mrcp_message->body, grammar ? grammar->data : grammar_uri_list, mrcp_message->pool);

	/* Empty audio queue and send RECOGNIZE to MRCP server */
	audio_queue_clear(schannel->audio_queue);
	if (mrcp_application_message_send(schannel->unimrcp_session, schannel->unimrcp_channel, mrcp_message) == FALSE) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

	/* wait for IN-PROGRESS; the timeout only triggers a one-time warning, the wait itself continues */
	while (schannel->state == SPEECH_CHANNEL_READY) {
		if (switch_thread_cond_timedwait(schannel->cond, schannel->mutex, SPEECH_CHANNEL_TIMEOUT_USEC) == SWITCH_STATUS_TIMEOUT && !warned) {
			warned = 1;
			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(schannel->session_uuid), SWITCH_LOG_WARNING, "(%s) IN-PROGRESS not received for RECOGNIZE after %d ms.\n", schannel->name, SPEECH_CHANNEL_TIMEOUT_USEC / (1000));
		}
	}

	if (schannel->state != SPEECH_CHANNEL_PROCESSING) {
		status = SWITCH_STATUS_FALSE;
		goto done;
	}

  done:

	switch_mutex_unlock(schannel->mutex);
	return status;
}
這裡做了如下幾件事情:
1.檢查 grammar 的設定情況,然後決定真正要使用的 grammar(單一 grammar,或多個 URI grammar 串成的清單)。
2.調用mrcp_application_message_create()建立一個mrcp_message_t結構體。
3.調用recog_channel_set_params()設定MRCP的參數,也就是消息的頭部資訊,包括系統參數和廠商參數。
4.調用apt_string_assign()設定MRCP的body,從代碼可以看到body的值就是 grammar 的內容。
5.調用mrcp_application_message_send()將資料發送給MRCP的Server。
下面我們重點分析mrcp_application_message_send()的實作
/**
 * Queue an MRCP control message for sending on behalf of the application.
 *
 * @return FALSE if any of the three arguments is NULL; otherwise whatever
 *         mrcp_app_control_task_msg_signal() returns.
 */
MRCP_DECLARE(apt_bool_t) mrcp_application_message_send(mrcp_session_t *session, mrcp_channel_t *channel, mrcp_message_t *message)
{
	if(session && channel && message) {
		/* all arguments present: hand the message over to the client task */
		return mrcp_app_control_task_msg_signal(session,channel,message);
	}
	return FALSE;
}
是以其實調用mrcp_app_control_task_msg_signal()來處理。
/**
 * Wrap an MRCP control message into an application task message and signal it
 * to the client's consumer task.
 *
 * @return the result of apt_task_msg_signal(), or FALSE when no task message
 *         could be acquired from the application's message pool.
 */
apt_bool_t mrcp_app_control_task_msg_signal(mrcp_session_t *session, mrcp_channel_t *channel, mrcp_message_t *message)
{
	mrcp_client_session_t *client_session = (mrcp_client_session_t*)session;
	mrcp_application_t *owner_app = client_session->application;
	apt_task_t *consumer = apt_consumer_task_base_get(owner_app->client->task);
	apt_task_msg_t *envelope = apt_task_msg_acquire(owner_app->msg_pool);
	mrcp_app_message_t *payload;

	if(!envelope) {
		return FALSE;
	}

	envelope->type = MRCP_CLIENT_APPLICATION_TASK_MSG;

	/* the payload lives in the session pool; the envelope only carries a pointer to it */
	payload = mrcp_client_app_control_message_create(session->pool);
	payload->application = owner_app;
	payload->session = session;
	payload->channel = channel;
	payload->control_message = message;
	*((mrcp_app_message_t**)envelope->data) = payload;

	return apt_task_msg_signal(consumer,envelope);
}
這裡用到兩個對象:apt_task_t與apt_task_msg_t(注意這裡的apt_task_t是apt_consumer_task類型對象的base屬性)。代碼先取得目標任務apt_task_t,再從消息池取得一個apt_task_msg_t,把它的type設定為:MRCP_CLIENT_APPLICATION_TASK_MSG,並讓它的data指向一個mrcp_app_message_t結構體;這個結構體由mrcp_client_app_control_message_create()建立,其message_type為:MRCP_APP_MESSAGE_TYPE_CONTROL。
如此之後就調用apt_task_msg_signal将這個消息任務分發出去。
這個函數的實作我們在前面的兩篇文章中已經做了介紹:
freeswitch mrcp 源碼分析–資料接收(上)
freeswitch mrcp 源碼分析–資料接收(下)
是以,從這裡也可以看出,不管是mrcp的消息發送還是接收,都是采用同樣的模型:建立一個消息任務,然後加到隊列裡面,而另一個線程就去處理消費這些消息,并調用對應的函數處理。
與MRCP資料的接收一樣,最後會調用mrcp_client_msg_process()這個函數來處理消息任務:
/**
 * Message handler of the MRCP client task: routes a task message to the
 * matching processing routine based on msg->type (and msg->sub_type for
 * signaling and connection messages).
 *
 * @return FALSE when no client object is attached to the task, TRUE otherwise
 *         (including for unknown message types, which are only logged).
 */
static apt_bool_t mrcp_client_msg_process(apt_task_t *task, apt_task_msg_t *msg)
{
	apt_consumer_task_t *owner_task = apt_task_object_get(task);
	mrcp_client_t *client = apt_consumer_task_object_get(owner_task);
	if(!client) {
		return FALSE;
	}

	if(msg->type == MRCP_CLIENT_SIGNALING_TASK_MSG) {
		/* responses and events coming from the signaling agent */
		const sig_agent_task_msg_data_t *sig = (const sig_agent_task_msg_data_t*)msg->data;
		switch(msg->sub_type) {
			case SIG_AGENT_TASK_MSG_ANSWER:
				mrcp_client_session_answer_process(sig->session,sig->descriptor);
				break;
			case SIG_AGENT_TASK_MSG_TERMINATE_RESPONSE:
				mrcp_client_session_terminate_response_process(sig->session);
				break;
			case SIG_AGENT_TASK_MSG_CONTROL_RESPONSE:
				mrcp_client_session_control_response_process(sig->session,sig->message);
				break;
			case SIG_AGENT_TASK_MSG_DISCOVER_RESPONSE:
				mrcp_client_session_discover_response_process(sig->session,sig->descriptor);
				break;
			case SIG_AGENT_TASK_MSG_TERMINATE_EVENT:
				mrcp_client_session_terminate_event_process(sig->session);
				break;
			default:
				break;
		}
	}
	else if(msg->type == MRCP_CLIENT_CONNECTION_TASK_MSG) {
		/* notifications coming from the connection agent */
		const connection_agent_task_msg_data_t *conn = (const connection_agent_task_msg_data_t*)msg->data;
		switch(msg->sub_type) {
			case CONNECTION_AGENT_TASK_MSG_ADD_CHANNEL:
				mrcp_client_on_channel_add(conn->channel,conn->descriptor,conn->status);
				break;
			case CONNECTION_AGENT_TASK_MSG_MODIFY_CHANNEL:
				mrcp_client_on_channel_modify(conn->channel,conn->descriptor,conn->status);
				break;
			case CONNECTION_AGENT_TASK_MSG_REMOVE_CHANNEL:
				mrcp_client_on_channel_remove(conn->channel,conn->status);
				break;
			case CONNECTION_AGENT_TASK_MSG_RECEIVE_MESSAGE:
				mrcp_client_on_message_receive(conn->channel,conn->message);
				break;
			case CONNECTION_AGENT_TASK_MSG_DISCONNECT:
				mrcp_client_on_disconnect(conn->channel);
				break;
			default:
				break;
		}
	}
	else if(msg->type == MRCP_CLIENT_MEDIA_TASK_MSG) {
		mpf_message_container_t *media_msgs = (mpf_message_container_t*) msg->data;
		mrcp_client_mpf_message_process(media_msgs);
	}
	else if(msg->type == MRCP_CLIENT_APPLICATION_TASK_MSG) {
		/* msg->data holds a pointer to the application message, not the message itself */
		mrcp_app_message_t **slot = (mrcp_app_message_t**) msg->data;
		mrcp_client_app_message_process(*slot);
	}
	else {
		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Unknown Task Message Received [%d;%d]", msg->type,msg->sub_type);
	}
	return TRUE;
}
可以看到這裡會依據不同的任務類型,調用不同的函數來處理,這裡type為:MRCP_CLIENT_APPLICATION_TASK_MSG,是以會調用mrcp_client_app_message_process()處理。
這個函數裡面主要調用:
mrcp_app_request_dispatch()将消息分發給各個對應的函數處理。
/*
 * Dispatch one application request for a client session.
 *
 * Order of checks:
 *   1. A session in SESSION_STATE_TERMINATING rejects every request (returns FALSE).
 *   2. A disconnected session cancels everything except a terminate request.
 *   3. A session that is not yet registered is set up first (agents, media
 *      engine, RTP factory, signaling-side session) and then registered.
 *   4. The request is routed by app_message->message_type: signaling commands
 *      by command_id, control messages to mrcp_client_message_send().
 *
 * Returns FALSE only for case 1; failure paths in 2 and 3 return whatever the
 * raise helper returns; otherwise TRUE.
 */
static apt_bool_t mrcp_app_request_dispatch(mrcp_client_session_t *session, const mrcp_app_message_t *app_message)
{
if(session->state == SESSION_STATE_TERMINATING) {
/* no more requests are allowed, as session is being terminated!
just return, it is horribly wrong and can crash anytime here */
apt_obj_log(APT_LOG_MARK,APT_PRIO_ERROR,session->base.log_obj,"Inappropriate Application Request "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session),
app_message->sig_message.command_id);
return FALSE;
}
if(session->disconnected == TRUE) {
/* cancel all the requests besides session termination one */
if(!IS_SESSION_TERMINATE(app_message)) {
apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,session->base.log_obj,"Cancel App Request "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session), app_message->sig_message.command_id);
session->status = MRCP_SIG_STATUS_CODE_CANCEL;
return mrcp_app_failure_message_raise(session);
}
}
/* first request on this session: select the agents/engine and create the signaling-side session */
if(session->registered == FALSE) {
if(IS_SESSION_TERMINATE(app_message)) {
/* if session is not registered, nothing to terminate, just respond with success */
session->status = MRCP_SIG_STATUS_CODE_SUCCESS;
return mrcp_app_sig_response_raise(session,FALSE);
}
/* select signaling agent */
/* a failed selection below only records MRCP_SIG_STATUS_CODE_FAILURE; the status is checked once after all selections */
session->base.signaling_agent = mrcp_sa_factory_agent_select(session->profile->sa_factory);
if(!session->base.signaling_agent) {
apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,session->base.log_obj,"Failed to Select Signaling Agent "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session),
app_message->sig_message.command_id);
session->status = MRCP_SIG_STATUS_CODE_FAILURE;
}
if(session->profile->mrcp_version == MRCP_VERSION_2) {
/* select connection agent */
session->base.connection_agent = mrcp_ca_factory_agent_select(session->profile->ca_factory);
if(!session->base.connection_agent) {
apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,session->base.log_obj,"Failed to Select Connection Agent "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session),
app_message->sig_message.command_id);
session->status = MRCP_SIG_STATUS_CODE_FAILURE;
}
}
if(session->profile->mpf_factory) {
/* select media engine */
session->base.media_engine = mpf_engine_factory_engine_select(session->profile->mpf_factory);
if(!session->base.media_engine) {
apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,session->base.log_obj,"Failed to Select Media Engine "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session),
app_message->sig_message.command_id);
session->status = MRCP_SIG_STATUS_CODE_FAILURE;
}
}
/* set rtp termination factory */
session->base.rtp_factory = session->profile->rtp_termination_factory;
/* this check must stay ahead of the signaling_agent dereference below: a NULL agent has set the failure status */
if(session->status == MRCP_SIG_STATUS_CODE_FAILURE) {
/* raise app response in case of failure */
return mrcp_app_failure_message_raise(session);
}
if(session->base.signaling_agent->create_client_session(&session->base,session->profile->signaling_settings) != TRUE) {
/* raise app response */
apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,session->base.log_obj,"Failed to Create Session "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session),
app_message->sig_message.command_id);
session->status = MRCP_SIG_STATUS_CODE_FAILURE;
return mrcp_app_failure_message_raise(session);
}
mrcp_client_session_add(session->application->client,session);
session->registered = TRUE;
}
/* session is usable: reset the status and route the request by message type */
session->status = MRCP_SIG_STATUS_CODE_SUCCESS;
switch(app_message->message_type) {
case MRCP_APP_MESSAGE_TYPE_SIGNALING:
{
apt_obj_log(APT_LOG_MARK,APT_PRIO_DEBUG,session->base.log_obj,"Dispatch App Request "APT_NAMESID_FMT" [%d]",
MRCP_SESSION_NAMESID(session),
app_message->sig_message.command_id);
switch(app_message->sig_message.command_id) {
case MRCP_SIG_COMMAND_SESSION_UPDATE:
mrcp_client_session_update(session);
break;
case MRCP_SIG_COMMAND_SESSION_TERMINATE:
mrcp_client_session_terminate(session);
break;
case MRCP_SIG_COMMAND_CHANNEL_ADD:
mrcp_client_channel_add(session,app_message->channel);
break;
case MRCP_SIG_COMMAND_CHANNEL_REMOVE:
/* NOTE(review): removal goes through mrcp_client_channel_modify() with FALSE, presumably an "enable" flag; confirm against its definition */
mrcp_client_channel_modify(session,app_message->channel,FALSE);
break;
case MRCP_SIG_COMMAND_RESOURCE_DISCOVER:
mrcp_client_resource_discover(session);
break;
default:
break;
}
break;
}
case MRCP_APP_MESSAGE_TYPE_CONTROL:
{
/* MRCP control message (e.g. a request built by the application): send it on the channel */
mrcp_client_message_send(session,app_message->channel,app_message->control_message);
break;
}
}
return TRUE;
}
前面說了message_type為:MRCP_APP_MESSAGE_TYPE_CONTROL,
是以處理函數是:mrcp_client_message_send()
/**
 * Send an MRCP request on a channel of the given session.
 *
 * If the session has no id yet (session->base.id.length is zero), nothing is
 * sent: a METHOD_FAILED response is created and raised to the application.
 * Otherwise the message is stamped with the session id and the next request
 * id, then sent over the control channel when one exists (MRCPv2) or through
 * the signaling session (MRCPv1).
 *
 * @return always TRUE
 */
static apt_bool_t mrcp_client_message_send(mrcp_client_session_t *session, mrcp_channel_t *channel, mrcp_message_t *message)
{
	if(session->base.id.length) {
		/* stamp the outgoing request with session id and a fresh request id */
		message->channel_id.session_id = session->base.id;
		message->start_line.request_id = ++session->base.last_request_id;
		apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,session->base.log_obj,"Send MRCP Request "APT_NAMESIDRES_FMT" [%"MRCP_REQUEST_ID_FMT"]",
						MRCP_SESSION_NAMESID(session),
						channel->resource->name.buf,
						message->start_line.request_id);

		if(!channel->control_channel) {
			/* MRCPv1 */
			mrcp_session_control_request(channel->session,message);
		}
		else {
			/* MRCPv2 */
			mrcp_client_control_message_send(channel->control_channel,message);
		}
	}
	else {
		/* no session id: answer the application with a failure response instead of sending */
		mrcp_message_t *failure = mrcp_response_create(message,message->pool);
		failure->start_line.status_code = MRCP_STATUS_CODE_METHOD_FAILED;
		apt_obj_log(APT_LOG_MARK,APT_PRIO_DEBUG,session->base.log_obj,"Raise App Failure MRCP Response "APT_NAMESID_FMT,
						MRCP_SESSION_NAMESID(session));
		mrcp_app_control_message_raise(session,channel,failure);
	}
	return TRUE;
}
對於MRCPv2(即channel->control_channel不為空的情況),調用的是:
mrcp_client_control_message_send()
/**
 * Send an MRCPv2 message over a control channel.
 *
 * Posts a CONNECTION_TASK_MSG_SEND_MESSAGE task message, with no descriptor,
 * to the connection agent that owns the channel.
 */
MRCP_DECLARE(apt_bool_t) mrcp_client_control_message_send(mrcp_control_channel_t *channel, mrcp_message_t *message)
{
	mrcp_connection_agent_t *owner_agent = channel->agent;
	return mrcp_client_control_message_signal(
				CONNECTION_TASK_MSG_SEND_MESSAGE,
				owner_agent,
				channel,
				NULL,
				message);
}
直接調用了:mrcp_client_control_message_signal()
/**
 * Signal a connection task message to the agent's poller task.
 *
 * Fills a connection_task_msg_t with the given type, agent, channel,
 * descriptor and MRCP message, and queues it on the task returned by
 * apt_poller_task_base_get(agent->task).
 *
 * @param type       kind of connection request being signalled
 * @param agent      connection agent whose task receives the message
 * @param channel    control channel the request refers to
 * @param descriptor control descriptor, or NULL when the request has none
 * @param message    MRCP message to carry, or NULL when the request has none
 * @return FALSE if no task message could be obtained or signalling it failed,
 *         TRUE once the message has been handed to the task. Failures are
 *         reported rather than masked, so callers can tell a queued request
 *         from one that was never delivered.
 */
static apt_bool_t mrcp_client_control_message_signal(
								connection_task_msg_type_e type,
								mrcp_connection_agent_t *agent,
								mrcp_control_channel_t *channel,
								mrcp_control_descriptor_t *descriptor,
								mrcp_message_t *message)
{
	apt_task_t *task = apt_poller_task_base_get(agent->task);
	apt_task_msg_t *task_msg = apt_task_msg_get(task);
	connection_task_msg_t *msg;

	if(!task_msg) {
		/* nothing was queued, so do not claim success */
		return FALSE;
	}

	msg = (connection_task_msg_t*)task_msg->data;
	msg->type = type;
	msg->agent = agent;
	msg->channel = channel;
	msg->descriptor = descriptor;
	msg->message = message;

	/* propagate the signalling result, as mrcp_app_control_task_msg_signal() does */
	return apt_task_msg_signal(task,task_msg);
}
看起來好像又回到了原點:同樣用到了apt_task_t、apt_task_msg_t兩個對象,然後又將消息任務放到隊列裡面去,但是這裡有兩點需要注意:
1.這次的apt_task_t是:apt_poller_task_base_get(agent->task)的方式擷取的(注意這裡的apt_task_t是 apt_poller_task類型對象的base屬性,與前面的不一樣),它是在mrcp_client_connection_agent_create中定義的:
agent->task = apt_poller_task_create(
max_connection_count,
mrcp_client_poller_signal_process,
agent,
msg_pool,
pool);
if(!agent->task) {
return NULL;
}
task = apt_poller_task_base_get(agent->task);
if(task) {
apt_task_name_set(task,id);
}
vtable = apt_poller_task_vtable_get(agent->task);
if(vtable) {
vtable->process_msg = mrcp_client_agent_msg_process;
}
2.這次的消息任務的類型為:CONNECTION_TASK_MSG_SEND_MESSAGE。
是以調用的是:mrcp_client_agent_msg_process(),然後會調用:mrcp_client_agent_messsage_send(),在這個函數裡面完成真正的工作,組裝待發送的字元串,并調用apr_socket_send()通過socket發送出去。