This time we focus on the following call in main:

void UserInputManager::run() { void InteractionManager::tap() }

We first look at how the tap event is handled:

std::future<bool> AudioInputProcessor::recognize()
=> bool AudioInputProcessor::executeRecognize()

How does this interact with the Context Manager (see "Creating the Context Manager" in main) and with DialogUXStateAggregator?

1) First, bool AudioInputProcessor::executeRecognize() captures the event.

2) The state is propagated to DialogUXStateAggregator:

bool AudioInputProcessor::executeRecognize() // triggered by a key press or a wake word
{
    setState(ObserverInterface::State::RECOGNIZING);
    m_contextManager->getContext(shared_from_this());
}

3) setState(ObserverInterface::State::RECOGNIZING);
When the audio input state becomes AudioInputProcessorObserverInterface::State::RECOGNIZING, setState walks through its observers and pushes the new state to each of them, which triggers an update in the dialog state aggregator:

void DialogUXStateAggregator::onStateChanged(AudioInputProcessorObserverInterface::State state) {
    // The audio input state has changed to RECOGNIZING, so the dialog enters LISTENING.
    case AudioInputProcessorObserverInterface::State::RECOGNIZING:
        onActivityStarted();
        setState(DialogUXStateObserverInterface::DialogUXState::LISTENING);
        return;
}

4) m_contextManager->getContext(shared_from_this());
This call unblocks the main loop of the Context Manager thread (see the note at the end of this article):

void ContextManager::updateStatesLoop() {
    while (true) {
        m_contextLoopNotifier.wait(); // blocks on a condition variable
        ..........................................................
        sendContextToRequesters();
    }
}
sendContextAndClearQueue() => onContextAvailable();

void AudioInputProcessor::onContextAvailable(const std::string& jsonContext) {
    m_executor.submit([this, jsonContext]() { executeOnContextAvailable(jsonContext); });
}

Next, the channel and the dialogRequestId are obtained and the request is sent:

m_directiveSequencer->setDialogRequestId(dialogRequestId);
=> setDialogRequestIdLocked()
=> scrubDialogRequestIdLocked()
=> m_wakeProcessingLoop.notify_one();

auto msgIdAndJsonEvent = buildJsonEventString("Recognize", dialogRequestId, m_payload, jsonContext);
// Assemble the MessageRequest. It will be sent by executeOnFocusChanged when we acquire the channel.
m_request->addObserver(shared_from_this());
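To make the role of buildJsonEventString more concrete, here is a minimal sketch of how a Recognize event could be assembled from a header, a payload and the context. The helper buildEventSketch is hypothetical and is not the SDK's implementation; it assumes that the message ID is supplied by the caller, that contextJson is already a valid JSON array, and that no string escaping is needed.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical helper, NOT the SDK's buildJsonEventString. It only shows the
// general shape of an event envelope: context + event{header, payload}.
// Assumptions: inputs are already valid JSON fragments or plain identifiers,
// so no escaping is performed.
std::pair<std::string, std::string> buildEventSketch(
    const std::string& eventNamespace,
    const std::string& eventName,
    const std::string& messageId,
    const std::string& dialogRequestId,
    const std::string& payloadJson,
    const std::string& contextJson) {
    assert(!payloadJson.empty() && !contextJson.empty());

    std::string json;
    json += "{\"context\":" + contextJson;
    json += ",\"event\":{\"header\":{";
    json += "\"namespace\":\"" + eventNamespace + "\",";
    json += "\"name\":\"" + eventName + "\",";
    json += "\"messageId\":\"" + messageId + "\",";
    json += "\"dialogRequestId\":\"" + dialogRequestId + "\"";
    json += "},\"payload\":" + payloadJson + "}}";

    // Return the message ID together with the serialized event, mirroring the
    // "msgIdAndJsonEvent" pair used in the excerpt above.
    return std::make_pair(messageId, json);
}
```

Returning the message ID alongside the JSON lets the caller correlate later responses with the request without parsing the string again.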
bool HTTP2Stream::setCommonOptions(const std::string& url, const std::string& authToken) {
    // Abbreviated: the return value checks around these calls are omitted.
    m_transfer.setWriteCallback(&HTTP2Stream::writeCallback, this);
    m_transfer.setHeaderCallback(&HTTP2Stream::headerCallback, this);
    setopt(CURLOPT_TCP_KEEPALIVE, "CURLOPT_TCP_KEEPALIVE", 1);
}

Analysis of writeCallback, i.e. what happens when an HTTP message is received:

size_t HTTP2Stream::writeCallback(char* data, size_t size, size_t nmemb, void* user)
=> MimeParser::DataParsedStatus MimeParser::feed(char* data, size_t length)
=> m_multipartReader.feed(data, length);
=> size_t feed(const char *buffer, size_t len) { return parser.feed(buffer, len); }
=> size_t feed(const char *buffer, size_t len) {
       callback(onPartEnd);
       callback(onPartBegin);
       callback(onHeaderEnd);
       callback(onPartBegin);
   }
=> void MimeParser::partEndCallback(void* userData)
=> void MessageRouter::consumeMessage(const std::string& contextId, const std::string& message)
=> void MessageRouter::notifyObserverOnReceive(const std::string& contextId, const std::string& message)
=> void MessageInterpreter::receive(const std::string& contextId, const std::string& message);
=> m_directiveSequencer->onDirective(avsDirective);
   {
       auto handled = m_directiveRouter->preHandleDirective();
       ============================
       m_receivingQueue.push_back(directive);
       m_wakeReceivingLoop.notify_one();
   }
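The chain above turns raw network chunks into complete messages by feeding each chunk to a parser that fires a callback when a part ends. The following is a minimal sketch of that idea only. PartSplitter and splitChunks are hypothetical names, and the splitter merely cuts the stream on a fixed boundary string; it is not a real MIME multipart parser and does not reproduce the SDK's MimeParser.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: buffers incoming chunks and invokes a "part end"
// callback each time a complete part (terminated by the boundary) is available.
class PartSplitter {
public:
    using PartCallback = std::function<void(const std::string&)>;

    PartSplitter(std::string boundary, PartCallback onPartEnd)
        : m_boundary(std::move(boundary)), m_onPartEnd(std::move(onPartEnd)) {
        assert(!m_boundary.empty());  // an empty boundary would never advance
        assert(m_onPartEnd);
    }

    // Takes a raw buffer plus its length, like the data handed to a write
    // callback. A boundary may be split across two calls, so data is
    // accumulated before searching.
    std::size_t feed(const char* data, std::size_t length) {
        m_buffer.append(data, length);
        std::size_t pos = 0;
        while ((pos = m_buffer.find(m_boundary)) != std::string::npos) {
            std::string part = m_buffer.substr(0, pos);
            m_buffer.erase(0, pos + m_boundary.size());
            if (!part.empty()) {
                m_onPartEnd(part);
            }
        }
        return length;  // report that every byte was consumed
    }

private:
    std::string m_boundary;
    PartCallback m_onPartEnd;
    std::string m_buffer;
};

// Convenience wrapper: feeds the chunks in order and collects the parts.
std::vector<std::string> splitChunks(const std::vector<std::string>& chunks,
                                     const std::string& boundary) {
    std::vector<std::string> parts;
    PartSplitter splitter(boundary, [&parts](const std::string& p) { parts.push_back(p); });
    for (const auto& chunk : chunks) {
        splitter.feed(chunk.data(), chunk.size());
    }
    return parts;
}
```

For example, feeding the chunks "first--", "B--sec" and "ond--B--" with the boundary "--B--" yields the two parts "first" and "second", even though the first boundary arrives split across two chunks.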
bool DirectiveRouter::preHandleDirective() {
    handlerAndPolicy.handler->preHandleDirective(directive, std::move(result));
}
=> void CapabilityAgent::preHandleDirective() { }
=> bool DirectiveRouter::handleDirective()
=> void AudioInputProcessor::handleDirective(std::shared_ptr<DirectiveInfo> info) { handleStopCaptureDirective(info); }
=> executeStopCapture(stopImmediately, info);
=> setState(ObserverInterface::State::BUSY);
=> observer->onStateChanged(m_state);
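The setState => onStateChanged step that ends this chain is the same observer notification seen in step 3 earlier, where RECOGNIZING leads the dialog into LISTENING. A minimal sketch of that propagation follows. MiniAudioInputProcessor and MiniDialogUXAggregator are hypothetical stand-ins, and only the RECOGNIZING to LISTENING mapping quoted in this article is implemented; whatever the SDK does for other states is deliberately left out.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

enum class AipState { IDLE, RECOGNIZING, BUSY };
enum class DialogUXState { IDLE, LISTENING };

// Hypothetical stand-in for the dialog state aggregator.
class MiniDialogUXAggregator {
public:
    using Observer = std::function<void(DialogUXState)>;

    void addObserver(Observer observer) {
        assert(observer);
        m_observers.push_back(std::move(observer));
    }

    // Called by the audio input side whenever its state changes.
    void onStateChanged(AipState state) {
        switch (state) {
            case AipState::RECOGNIZING:
                setState(DialogUXState::LISTENING);
                return;
            default:
                return;  // other mappings are intentionally omitted in this sketch
        }
    }

    DialogUXState state() const { return m_state; }

private:
    void setState(DialogUXState newState) {
        if (newState == m_state) return;  // notify only on real changes
        m_state = newState;
        for (auto& observer : m_observers) observer(m_state);
    }

    DialogUXState m_state = DialogUXState::IDLE;
    std::vector<Observer> m_observers;
};

// Hypothetical stand-in for the audio input processor's state handling.
class MiniAudioInputProcessor {
public:
    using Observer = std::function<void(AipState)>;

    void addObserver(Observer observer) {
        assert(observer);
        m_observers.push_back(std::move(observer));
    }

    void setState(AipState newState) {
        if (newState == m_state) return;
        m_state = newState;
        for (auto& observer : m_observers) observer(m_state);
    }

private:
    AipState m_state = AipState::IDLE;
    std::vector<Observer> m_observers;
};
```

Wiring the two together takes one line, for instance aip.addObserver([&agg](AipState s) { agg.onStateChanged(s); }); after that, aip.setState(AipState::RECOGNIZING) moves the aggregator to LISTENING.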
Another branch:

void SpeechSynthesizer::handleDirective()
=> void SpeechSynthesizer::executeHandle(std::shared_ptr<DirectiveInfo> info)
=> void SpeechSynthesizer::addToDirectiveQueue(std::shared_ptr<SpeakDirectiveInfo> speakInfo)
=> m_speakInfoQueue.push_back(speakInfo);
   executeHandleAfterValidation(speakInfo);
=> bool FocusManager::acquireChannel()
=> void FocusManager::acquireChannelHelper()
=> channelToAcquire->setFocus(FocusState::FOREGROUND);
=> m_observer->onFocusChanged(m_focusState);
=> void SpeechSynthesizer::onFocusChanged(FocusState newFocus)
=> 1) void SpeechSynthesizer::setCurrentStateLocked()
   2) executeStateChange();
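The acquireChannel => setFocus => onFocusChanged part of this chain can be illustrated with a small sketch. MiniFocusManager is a hypothetical class, not the SDK's FocusManager. It assumes that a lower priority number means higher priority, that only one channel can be in the foreground, and that a displaced channel moves to the background; the channel names and priority values are illustrative only.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

enum class FocusState { FOREGROUND, BACKGROUND, NONE };

// Hypothetical sketch of channel arbitration with observer notification.
class MiniFocusManager {
public:
    using Observer = std::function<void(FocusState)>;

    void addChannel(const std::string& name, int priority) {
        assert(!name.empty());
        m_channels[name].priority = priority;
    }

    // Returns false for unknown channels; otherwise assigns focus and notifies.
    bool acquireChannel(const std::string& name, Observer observer) {
        auto it = m_channels.find(name);
        if (it == m_channels.end()) return false;
        Channel& requested = it->second;
        requested.observer = std::move(observer);

        // Find the channel that currently holds the foreground, if any.
        Channel* foreground = nullptr;
        for (auto& entry : m_channels) {
            if (entry.second.focus == FocusState::FOREGROUND) foreground = &entry.second;
        }

        if (foreground == nullptr || foreground == &requested) {
            requested.setFocus(FocusState::FOREGROUND);
        } else if (requested.priority < foreground->priority) {
            // Assumption: a higher-priority request displaces the current holder.
            foreground->setFocus(FocusState::BACKGROUND);
            requested.setFocus(FocusState::FOREGROUND);
        } else {
            requested.setFocus(FocusState::BACKGROUND);
        }
        return true;
    }

    FocusState focusOf(const std::string& name) const {
        auto it = m_channels.find(name);
        return it == m_channels.end() ? FocusState::NONE : it->second.focus;
    }

private:
    struct Channel {
        int priority = 0;
        FocusState focus = FocusState::NONE;
        Observer observer;

        // Mirrors the setFocus => onFocusChanged step: notify only on change.
        void setFocus(FocusState newFocus) {
            if (newFocus == focus) return;
            focus = newFocus;
            if (observer) observer(focus);
        }
    };

    std::map<std::string, Channel> m_channels;
};
```

In this sketch, acquiring a "Content" channel first and a higher-priority "Dialog" channel afterwards notifies the content observer twice: once with FOREGROUND and once with BACKGROUND.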
1) void SpeechSynthesizer::setCurrentStateLocked() is analyzed as follows:

observer->onStateChanged(m_currentState);
=> void AudioInputProcessor::onFocusChanged(avsCommon::avs::FocusState newFocus)
=> void AudioInputProcessor::executeOnFocusChanged(avsCommon::avs::FocusState newFocus)
=> void AudioInputProcessor::executeResetState()
=> void AudioInputProcessor::setState(ObserverInterface::State state)

2) executeStateChange() is analyzed as follows:
=> startPlaying()
{
    m_speechPlayer->setSource(std::move(m_currentInfo->attachmentReader)); // a lot of setup happens here
    m_speechPlayer->play(m_mediaSourceId);
}
=> bool MediaPlayer::play(MediaPlayer::SourceId id);
=> void MediaPlayer::handlePlay(SourceId id, std::promise<bool>* promise)
{
    m_busWatchId = gst_bus_add_watch(bus, &MediaPlayer::onBusMessage, this);
}
=> (mediaPlayer)->handleBusMessage(message);
=> gboolean MediaPlayer::handleBusMessage(GstMessage* message);

Here it is enough to look at the GST_MESSAGE_STATE_CHANGED branch:

void MediaPlayer::sendPlaybackStarted()
At this point we need to go back and look at the initialization in main.

Creating the Directive Sequencer: it manages AVS directives in sequence and pushes each one to the matching component module according to its namespace.

m_directiveSequencer = adsl::DirectiveSequencer::create(exceptionSender);
{
    return std::unique_ptr<DirectiveSequencerInterface>(new DirectiveSequencer(exceptionSender));
}

The constructor creates a thread:

DirectiveSequencer::DirectiveSequencer() {
    m_receivingThread = std::thread(&DirectiveSequencer::receivingLoop, this);
}

void DirectiveSequencer::receivingLoop() {
    auto wake = [this]() { return !m_receivingQueue.empty() || m_isShuttingDown; };
    std::unique_lock<std::mutex> lock(m_mutex);
    while (true) {
        m_wakeReceivingLoop.wait(lock, wake);
        if (m_isShuttingDown) { break; }
        receiveDirectiveLocked(lock);
    }
}
void DirectiveSequencer::receiveDirectiveLocked(std::unique_lock<std::mutex>& lock) {
    ==================================
    auto directive = m_receivingQueue.front();
    m_receivingQueue.pop_front();
    ==================================
    handled = m_directiveRouter.handleDirectiveImmediately(directive);
    // or
    handled = m_directiveRouter.handleDirectiveWithPolicyHandleImmediately(directive);
}

In fact, the SDK creates many threads during initialization, and they all block while waiting for state updates. The most important one here is the HTTP2Stream thread.
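The blocking receive loop described above (a worker thread that sleeps on a condition variable until a directive is queued or shutdown is requested) can be sketched as follows. MiniSequencer and handleAll are hypothetical names; the sketch reuses the wait-with-predicate pattern from the excerpt but hands directives to a plain callback instead of a directive router, and it releases the lock while the callback runs, which is a design choice of this sketch rather than a statement about the SDK.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of a queue drained by a dedicated receiving thread.
class MiniSequencer {
public:
    using Handler = std::function<void(const std::string&)>;

    explicit MiniSequencer(Handler handler) : m_handler(std::move(handler)) {
        assert(m_handler);
        // Started in the body so that every member is initialized first.
        m_receivingThread = std::thread(&MiniSequencer::receivingLoop, this);
    }

    ~MiniSequencer() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_isShuttingDown = true;
        }
        m_wakeReceivingLoop.notify_one();
        m_receivingThread.join();
    }

    // Producer side: enqueue and wake the receiving thread.
    void onDirective(std::string directive) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_receivingQueue.push_back(std::move(directive));
        }
        m_wakeReceivingLoop.notify_one();
    }

private:
    void receivingLoop() {
        auto wake = [this]() { return !m_receivingQueue.empty() || m_isShuttingDown; };
        std::unique_lock<std::mutex> lock(m_mutex);
        while (true) {
            m_wakeReceivingLoop.wait(lock, wake);  // sleeps until there is work
            if (m_isShuttingDown) break;
            std::string directive = std::move(m_receivingQueue.front());
            m_receivingQueue.pop_front();
            lock.unlock();  // do not hold the queue lock while handling
            m_handler(directive);
            lock.lock();
        }
    }

    Handler m_handler;
    std::mutex m_mutex;
    std::condition_variable m_wakeReceivingLoop;
    std::deque<std::string> m_receivingQueue;
    bool m_isShuttingDown = false;
    std::thread m_receivingThread;
};

// Demo helper: queues all directives and waits until each one was handled.
std::vector<std::string> handleAll(const std::vector<std::string>& directives) {
    std::mutex doneMutex;
    std::condition_variable doneCv;
    std::vector<std::string> handled;
    {
        MiniSequencer sequencer([&](const std::string& d) {
            std::lock_guard<std::mutex> lock(doneMutex);
            handled.push_back(d);
            doneCv.notify_one();
        });
        for (const auto& d : directives) sequencer.onDirective(d);
        std::unique_lock<std::mutex> lock(doneMutex);
        doneCv.wait(lock, [&] { return handled.size() == directives.size(); });
    }  // lock is released first, then the sequencer is shut down and joined
    return handled;
}
```

Because a single thread drains the queue, directives are handled in the order in which they were queued. Note that, as in the excerpt, a shutdown request ends the loop even if directives are still waiting, which is why the helper waits for completion before the sequencer is destroyed.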