
Alexa SDK Dissected, Part 2: Analyzing the run Function

This installment focuses on void UserInputManager::run() in main, and on the void InteractionManager::tap() call inside it:

    void UserInputManager::run()
    {
        void InteractionManager::tap()
    }

We start with how the tap event is handled:

    std::future<bool> AudioInputProcessor::recognize()
    => bool AudioInputProcessor::executeRecognize()

How does this interact with the Context Manager (see "Creating the Context Manager" in main) and with DialogUXStateAggregator?

1) First, bool AudioInputProcessor::executeRecognize() captures the event.

2) The state change is propagated to DialogUXStateAggregator:

    bool AudioInputProcessor::executeRecognize() // triggered by a tap or by the wake word
    {
        setState(ObserverInterface::State::RECOGNIZING);
        m_contextManager->getContext(shared_from_this());
    }

3) setState(ObserverInterface::State::RECOGNIZING);
When the audio input state becomes AudioInputProcessorObserverInterface::State::RECOGNIZING, setState walks through its observers and pushes the new state to each of them, which triggers the update in the dialog state aggregator:

    void DialogUXStateAggregator::onStateChanged(AudioInputProcessorObserverInterface::State state)
    {
        // The audio input state is now RECOGNIZING, so the dialog enters LISTENING.
        case AudioInputProcessorObserverInterface::State::RECOGNIZING:
            onActivityStarted();
            setState(DialogUXStateObserverInterface::DialogUXState::LISTENING);
            return;
    }

4) m_contextManager->getContext(shared_from_this());
This unblocks the main loop of the Context Manager thread (see the note at the end of this article):

    void ContextManager::updateStatesLoop()
    {
        while (true) {
            m_contextLoopNotifier.wait(); // blocks on a condition variable
            ...
            sendContextToRequesters();
        }
    }
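The state propagation in steps 2) and 3) can be sketched in isolation. This is a minimal illustration, not the SDK code: AipState, AudioInputSketch, DialogAggregatorSketch and simulateTap are hypothetical names, and only the RECOGNIZING to LISTENING mapping quoted above is modeled.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified stand-ins for the SDK state enums.
enum class AipState { IDLE, RECOGNIZING, BUSY };
enum class DialogUXState { IDLE, LISTENING };

// Plays the role of DialogUXStateAggregator: it observes the audio input state.
class DialogAggregatorSketch {
public:
    void onStateChanged(AipState state) {
        switch (state) {
            case AipState::RECOGNIZING:
                // Audio input is recognizing, so the dialog is listening.
                m_state = DialogUXState::LISTENING;
                return;
            default:
                return;  // other transitions are left out of this sketch
        }
    }
    DialogUXState state() const { return m_state; }

private:
    DialogUXState m_state = DialogUXState::IDLE;
};

// Plays the role of AudioInputProcessor: setState notifies every observer.
class AudioInputSketch {
public:
    void addObserver(DialogAggregatorSketch* observer) { m_observers.push_back(observer); }
    void setState(AipState state) {
        m_state = state;
        for (auto* observer : m_observers) {
            observer->onStateChanged(m_state);
        }
    }

private:
    AipState m_state = AipState::IDLE;
    std::vector<DialogAggregatorSketch*> m_observers;
};

// Usage sketch: returns the dialog state seen after a simulated tap.
DialogUXState simulateTap() {
    DialogAggregatorSketch aggregator;
    AudioInputSketch audioInput;
    audioInput.addObserver(&aggregator);
    audioInput.setState(AipState::RECOGNIZING);
    return aggregator.state();
}
```

The point of the pattern is that the audio input side never needs to know what a dialog state is; it only announces its own state and each observer decides how to react.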

    sendContextAndClearQueue()
    => onContextAvailable();

    void AudioInputProcessor::onContextAvailable(const std::string& jsonContext)
    {
        m_executor.submit([this, jsonContext]() { executeOnContextAvailable(jsonContext); });
    }

Next, the channel and the dialogRequestId are obtained again and the request is sent:

    m_directiveSequencer->setDialogRequestId(dialogRequestId);
    => setDialogRequestIdLocked()
    => scrubDialogRequestIdLocked()
    => m_wakeProcessingLoop.notify_one();

    auto msgIdAndJsonEvent = buildJsonEventString("Recognize", dialogRequestId, m_payload, jsonContext);
    // Assemble the MessageRequest. It will be sent by executeOnFocusChanged when we acquire the channel.
    m_request->addObserver(shared_from_this());
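The m_executor.submit(...) call in onContextAvailable hands the work to another thread, so the thread that delivers the context is not held up. A minimal sketch of that hand-off, assuming a single worker thread: ExecutorSketch and RequesterSketch are hypothetical names, and returning a std::future from onContextAvailable is an addition made here only so that a caller can wait for completion.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical single-threaded executor: tasks run in order on one worker.
class ExecutorSketch {
public:
    ExecutorSketch() : m_worker([this] { loop(); }) {}
    ~ExecutorSketch() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_wake.notify_one();
        m_worker.join();
    }
    std::future<void> submit(std::function<void()> task) {
        auto packaged = std::make_shared<std::packaged_task<void()>>(std::move(task));
        std::future<void> done = packaged->get_future();
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push_back([packaged] { (*packaged)(); });
        }
        m_wake.notify_one();
        return done;
    }

private:
    void loop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (true) {
            m_wake.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
            if (m_tasks.empty()) return;  // stopping and nothing left to run
            auto task = std::move(m_tasks.front());
            m_tasks.pop_front();
            lock.unlock();  // do not hold the lock while the task runs
            task();
            lock.lock();
        }
    }
    std::mutex m_mutex;
    std::condition_variable m_wake;
    std::deque<std::function<void()>> m_tasks;
    bool m_stop = false;
    std::thread m_worker;  // declared last: starts after the members above exist
};

// Hypothetical requester mirroring the onContextAvailable hand-off.
class RequesterSketch {
public:
    std::future<void> onContextAvailable(const std::string& jsonContext) {
        return m_executor.submit([this, jsonContext]() { executeOnContextAvailable(jsonContext); });
    }
    const std::string& lastContext() const { return m_lastContext; }
    std::thread::id handledOn() const { return m_handledOn; }

private:
    void executeOnContextAvailable(const std::string& jsonContext) {
        m_lastContext = jsonContext;
        m_handledOn = std::this_thread::get_id();
    }
    std::string m_lastContext;
    std::thread::id m_handledOn;
    ExecutorSketch m_executor;  // declared last: joined before the data above is destroyed
};
```

Capturing jsonContext by value in the lambda matters: the caller's string may be gone by the time the worker thread runs the task.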

    bool HTTP2Stream::setCommonOptions(const std::string& url, const std::string& authToken)
    {
        (!m_transfer.setWriteCallback(&HTTP2Stream::writeCallback, this));
        (!m_transfer.setHeaderCallback(&HTTP2Stream::headerCallback, this));
        setopt(CURLOPT_TCP_KEEPALIVE, "CURLOPT_TCP_KEEPALIVE", 1);
    }

Analysis of writeCallback, that is, the path taken when HTTP data is received:

    size_t HTTP2Stream::writeCallback(char* data, size_t size, size_t nmemb, void* user)
    => MimeParser::DataParsedStatus MimeParser::feed(char* data, size_t length)
    => m_multipartReader.feed(data, length);
    => size_t feed(const char *buffer, size_t len) { return parser.feed(buffer, len); }
    => size_t feed(const char *buffer, size_t len)
       {
           callback(onPartEnd);
           callback(onPartBegin);
           callback(onHeaderEnd);
           callback(onPartBegin);
       }
    => void MimeParser::partEndCallback(void* userData)
    => void MessageRouter::consumeMessage(const std::string& contextId, const std::string& message)
    => void MessageRouter::notifyObserverOnReceive(const std::string& contextId, const std::string& message)
    => void MessageInterpreter::receive(const std::string& contextId, const std::string& message);
    => m_directiveSequencer->onDirective(avsDirective);
       {
           auto handled = m_directiveRouter->preHandleDirective();
           ...
           m_receivingQueue.push_back(directive);
           m_wakeReceivingLoop.notify_one();
       }
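setWriteCallback(&HTTP2Stream::writeCallback, this) registers a static function together with the object pointer, which is the usual way to route a C-style callback back into a C++ object. The sketch below shows only that mechanism and does not link against libcurl: StreamSketch and deliverChunk are hypothetical names, with deliverChunk standing in for the transfer layer that would normally invoke the callback.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Same shape as a libcurl write callback: (data, size, nmemb, user pointer).
using WriteCallback = std::size_t (*)(char* data, std::size_t size, std::size_t nmemb, void* user);

class StreamSketch {
public:
    // Static trampoline: recover the object from the opaque pointer, then forward.
    static std::size_t writeCallback(char* data, std::size_t size, std::size_t nmemb, void* user) {
        auto* stream = static_cast<StreamSketch*>(user);
        const std::size_t length = size * nmemb;
        stream->feed(data, length);
        return length;  // libcurl treats any other return value as a write error
    }
    const std::string& received() const { return m_received; }

private:
    // In the SDK this is where the MIME parser is fed; here we only accumulate bytes.
    void feed(const char* data, std::size_t length) { m_received.append(data, length); }
    std::string m_received;
};

// Hypothetical stand-in for the transfer layer calling the registered callback.
std::size_t deliverChunk(WriteCallback callback, void* user, std::string chunk) {
    return callback(&chunk[0], 1, chunk.size(), user);
}
```

A typical use is to deliver a few chunks with deliverChunk(&StreamSketch::writeCallback, &stream, "...") and then read stream.received(); data may arrive split at arbitrary byte boundaries, which is why the real parser has to be incremental.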

    bool DirectiveRouter::preHandleDirective()
    {
        handlerAndPolicy.handler->preHandleDirective(directive, std::move(result));
    }
    => void CapabilityAgent::preHandleDirective() { }
    => bool DirectiveRouter::handleDirective()
    => void AudioInputProcessor::handleDirective(std::shared_ptr<DirectiveInfo> info)
       {
           handleStopCaptureDirective(info);
       }
    => executeStopCapture(stopImmediately, info);
    => setState(ObserverInterface::State::BUSY);
    => observer->onStateChanged(m_state);
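The router's job in this chain is to pick the handler that matches a directive. A minimal sketch of that lookup, assuming handlers are keyed by the (namespace, name) pair: DirectiveSketch and RouterSketch are hypothetical types, and the handling policy carried by handlerAndPolicy in the SDK is left out.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified directive: just the routing fields and a payload.
struct DirectiveSketch {
    std::string nameSpace;
    std::string name;
    std::string payload;
};

class RouterSketch {
public:
    using Handler = std::function<void(const DirectiveSketch&)>;

    // Returns false if a handler is already registered for this pair.
    bool addHandler(const std::string& nameSpace, const std::string& name, Handler handler) {
        return m_handlers.emplace(std::make_pair(nameSpace, name), std::move(handler)).second;
    }

    // Returns false when no handler matches, so the caller can report the failure.
    bool handleDirective(const DirectiveSketch& directive) const {
        auto it = m_handlers.find(std::make_pair(directive.nameSpace, directive.name));
        if (it == m_handlers.end()) {
            return false;
        }
        it->second(directive);
        return true;
    }

private:
    std::map<std::pair<std::string, std::string>, Handler> m_handlers;
};
```

With this shape, an audio input component would register for ("SpeechRecognizer", "StopCapture") and a speech component for ("SpeechSynthesizer", "Speak"), and neither needs to know about the other.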

The other branch:

    void SpeechSynthesizer::handleDirective()
    => void SpeechSynthesizer::executeHandle(std::shared_ptr<DirectiveInfo> info)
    => void SpeechSynthesizer::addToDirectiveQueue(std::shared_ptr<SpeakDirectiveInfo> speakInfo)
    => m_speakInfoQueue.push_back(speakInfo);
       executeHandleAfterValidation(speakInfo);
    => bool FocusManager::acquireChannel()
    => void FocusManager::acquireChannelHelper()
    => channelToAcquire->setFocus(FocusState::FOREGROUND);
    => m_observer->onFocusChanged(m_focusState);
    => void SpeechSynthesizer::onFocusChanged(FocusState newFocus)
    => 1) void SpeechSynthesizer::setCurrentStateLocked()
       2) executeStateChange();

1) void SpeechSynthesizer::setCurrentStateLocked() breaks down as follows:

    observer->onStateChanged(m_currentState);
    => void AudioInputProcessor::onFocusChanged(avsCommon::avs::FocusState newFocus)
    => void AudioInputProcessor::executeOnFocusChanged(avsCommon::avs::FocusState newFocus)
    => void AudioInputProcessor::executeResetState()
    => void AudioInputProcessor::setState(ObserverInterface::State state)

2) executeStateChange() breaks down as follows:

    => startPlaying()
       {
           m_speechPlayer->setSource(std::move(m_currentInfo->attachmentReader)); // a lot of setup happens here
           m_speechPlayer->play(m_mediaSourceId);
       }
    => bool MediaPlayer::play(MediaPlayer::SourceId id);
    => void MediaPlayer::handlePlay(SourceId id, std::promise<bool>* promise)
       {
           m_busWatchId = gst_bus_add_watch(bus, &MediaPlayer::onBusMessage, this);
       }
    => (mediaPlayer)->handleBusMessage(message);
    => gboolean MediaPlayer::handleBusMessage(GstMessage* message);

Here it is enough to look at the GST_MESSAGE_STATE_CHANGED branch:

    void MediaPlayer::sendPlaybackStarted()

At this point we need to go back to the initialization in main, "Creating the Directive Sequencer". The Directive Sequencer is responsible for sequencing the AVS directives and pushing each one to the corresponding component module according to its namespace.

    m_directiveSequencer = adsl::DirectiveSequencer::create(exceptionSender);
    {
        return std::unique_ptr<DirectiveSequencerInterface>(new DirectiveSequencer(exceptionSender));
    }

The constructor creates a thread:

    DirectiveSequencer::DirectiveSequencer()
    {
        m_receivingThread = std::thread(&DirectiveSequencer::receivingLoop, this);
    }

    void DirectiveSequencer::receivingLoop()
    {
        auto wake = [this]() { return !m_receivingQueue.empty() || m_isShuttingDown; };

        std::unique_lock<std::mutex> lock(m_mutex);
        while (true) {
            m_wakeReceivingLoop.wait(lock, wake);
            if (m_isShuttingDown) {
                break;
            }
            receiveDirectiveLocked(lock);
        }
    }

    void DirectiveSequencer::receiveDirectiveLocked(std::unique_lock<std::mutex>& lock)
    {
        ...
        auto directive = m_receivingQueue.front();
        m_receivingQueue.pop_front();
        ...
        handled = m_directiveRouter.handleDirectiveImmediately(directive);
        // or
        handled = m_directiveRouter.handleDirectiveWithPolicyHandleImmediately(directive);
    }

In fact, the SDK creates many threads during initialization, and they all block while waiting for a state update. The more important one here is the HTTP2Stream thread.
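The receiving loop above is a producer/consumer queue guarded by a mutex and a condition variable, with a shutdown flag folded into the wake predicate. A self-contained sketch of that loop follows; SequencerSketch and receiveOne are hypothetical names, a plain std::string stands in for a directive, and the handler callback stands in for the directive router.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

class SequencerSketch {
public:
    using Handler = std::function<void(const std::string&)>;

    explicit SequencerSketch(Handler handler)
        : m_handler(std::move(handler)), m_receivingThread(&SequencerSketch::receivingLoop, this) {}

    ~SequencerSketch() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_isShuttingDown = true;
        }
        m_wakeReceivingLoop.notify_one();
        m_receivingThread.join();
    }

    // Producer side: enqueue under the lock, then wake the loop.
    void onDirective(std::string directive) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_receivingQueue.push_back(std::move(directive));
        }
        m_wakeReceivingLoop.notify_one();
    }

private:
    // Consumer side: sleep until there is work or a shutdown request.
    void receivingLoop() {
        auto wake = [this]() { return !m_receivingQueue.empty() || m_isShuttingDown; };
        std::unique_lock<std::mutex> lock(m_mutex);
        while (true) {
            m_wakeReceivingLoop.wait(lock, wake);
            if (m_isShuttingDown) {
                break;  // as in the excerpt, queued directives are dropped on shutdown
            }
            std::string directive = std::move(m_receivingQueue.front());
            m_receivingQueue.pop_front();
            lock.unlock();  // a choice made in this sketch: run the handler unlocked
            m_handler(directive);
            lock.lock();
        }
    }

    Handler m_handler;
    std::mutex m_mutex;
    std::condition_variable m_wakeReceivingLoop;
    std::deque<std::string> m_receivingQueue;
    bool m_isShuttingDown = false;
    std::thread m_receivingThread;  // declared last: starts after the members above exist
};

// Usage sketch: push one directive and wait until the loop thread has handled it.
std::string receiveOne(const std::string& directive) {
    std::promise<std::string> handled;
    std::future<std::string> result = handled.get_future();
    SequencerSketch sequencer([&handled](const std::string& d) { handled.set_value(d); });
    sequencer.onDirective(directive);
    return result.get();
}
```

Passing the predicate to wait() is what makes the loop robust: a notification that arrives before the thread starts waiting is not lost, and spurious wakeups simply re-check the queue.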
