天天看點

OpenHarmony——EventHandler源碼解析

作者:成飛

EventHandler是用于處理線程間通信的一種機制,可以通過EventRunner建立新線程,将耗時的操作放到新線程上執行。這樣既不阻塞原來的線程,任務又可以得到合理的處理。比如:主線程使用EventHandler建立子線程,子線程做耗時的下載下傳圖檔操作,下載下傳完成後,子線程通過EventHandler通知主線程,主線程再更新UI。

基本概念

EventRunner是一種事件循環器,循環處理從該EventRunner建立的新線程的事件隊列中擷取InnerEvent事件。InnerEvent是EventHandler投遞的事件。

EventHandler是一種使用者在目前線程上投遞InnerEvent事件到異步線程上處理的機制。每一個EventHandler和指定的EventRunner所建立的新線程綁定,并且該新線程内部有一個事件隊列。EventHandler可以投遞指定的InnerEvent事件到這個事件隊列。EventRunner從事件隊列裡循環地取出事件,并在EventRunner所線上程執行processEvent回調。一般,EventHandler有兩個主要作用:

  • 在不同線程間分發和處理InnerEvent事件。
  • 延遲處理InnerEvent事件。

運作機制

OpenHarmony——EventHandler源碼解析

使用EventHandler實作線程間通信的主要流程:

  1. EventHandler投遞具體的InnerEvent事件到EventRunner所建立的線程的事件隊列。
  2. EventRunner循環從事件隊列中擷取InnerEvent事件。
  3. 新線程上處理該事件:觸發InnerEvent的回調方法并觸發EventHandler的處理方法。

接口說明

ohos.events.emitter(Emitter)

接口名 描述
on(event: InnerEvent, callback: Callback<EventData>) 持續訂閱某個事件以及接收事件的回調處理。
once(event: InnerEvent, callback: Callback<EventData>) 單次訂閱某個事件以及接收事件的回調處理,接收到回調處理後自動取消訂閱。
off(eventId: number) 取消訂閱某個事件。
emit(event: InnerEvent, data?: EventData) 發送一個事件到事件隊列。

InnerEvent

程序内的事件。

名稱 參數類型 可讀 可寫 說明
eventId number 事件的ID,由開發者定義用來辨識事件。
priority EventPriority 事件被投遞的優先級。

EventData

發送事件時傳遞的資料。

名稱 參數類型 可讀 可寫 說明
data [key:string]:any 發送事件時傳遞的資料,資料類型支援字元串,整形和布爾型。

EventPriority

用于表示事件被投遞的優先級

屬性 描述
Priority.IMMEDIATE 表示事件被立即投遞
Priority.HIGH 表示事件先于LOW優先級投遞
Priority.LOW 表示事件優于IDLE優先級投遞,事件的預設優先級是LOW
Priority.IDLE 表示在沒有其他事件的情況下,才投遞該事件

代碼目錄

foundation/
└──foundation/appexecfwk/standard
   ├── interfaces
   │    ├── innerkits
   │    |    └── libeventhandler	   # 内部接口
   │    └──  napi
   │         └── eventhandler          # NAPI接口 
   └──— libs/libeventhandler           # 機制實作代碼			      
           

類圖

OpenHarmony——EventHandler源碼解析

EventRunner:

EventRunner的create方法在建立Runner線程的同時會執行個體化一個EventQueue,并指派給類内部的成員變量queue_ 。 EventRunner的Runner線程輪詢queue_,取出隊列中的InnerEvent事件,并處理該事件。

EventHandler:

EventHandler的各Send方法通過調用EventQueue的Insert方法将InnerEvent插入EventQueue的subEventQueues_中 或 idleEvents__ 中。

當調用Post方法,投遞某個回調函數時,該回調函數會指派給InnerEvent的taskCallback_,并通過Send方法将将擁有回調函數的InnerEvent插入隊列。

其中,subEventQueues_是size為3的SubEventQueue的清單,3個SubEventQueue分别用于存儲三種不同優先級(IMMEDIATE/HIGH/LOW)的InnerEvent。

idleEvents_用于存儲優先級為IDLE的InnerEvent。

同理,Remove方法即從上述subEventQueue_ 或 idleEvent_ 中删除某事件。

EventQueue

通過成員變量 ioWaiter_ 取出定時或延時的InnerEvent事件。同時實作對檔案描述符監聽事件的回調。

ioWaiter_ 成員的類型預設使用NoneIoWaiter , 此時不支援檔案描述符的監聽。隊列中的延時或定時事件的取出,通過NoneIoWaiter調用 std::condition_variable機制實作。

當EventHandler中的AddFileDescriptorListener被調用時,ioWaiter_成員的類型自動轉換成EpollIoWaiter類型。EpollIoWaiter支援檔案描述符的監聽事件。此時,隊列中的延時或定時事件的取出,通過系統的epoll機制實作。

關鍵代碼摘錄

EventRunner

// Start event looper.
        for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) {   //監聽輪詢queue_
            std::shared_ptr<EventHandler> handler = event->GetOwner();
            // Make sure owner of the event exists.
            if (handler) {
                std::shared_ptr<Logger> logging = logger_;
                std::stringstream address;
                address << handler.get();
                if (logging != nullptr) {
                    if (!event->HasTask()) {
                        logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
                    } else {
                        logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
                    }
                }
                handler->DistributeEvent(event); //執行aInnerEvent事件

                if (logging != nullptr) {
                    logging->Log("Finished to handler(0x" + address.str() + ")");
                }
            }
            // Release event manually, otherwise event will be released until next event coming.
            event.reset();
        }
           

EventQueue

void EventQueue::Insert(InnerEvent::Pointer &event, Priority priority) //事件或任務的插入
{
    if (!event) {
        HILOGE("Insert: Could not insert an invalid event");
        return;
    }

    std::lock_guard<std::mutex> lock(queueLock_);
    bool needNotify = false;
    switch (priority) { //根據Event的優先級将該Event插入不同的事件隊列
        case Priority::IMMEDIATE:
        case Priority::HIGH:
        case Priority::LOW: {
            needNotify = (event->GetHandleTime() < wakeUpTime_);
            InsertEventsLocked(subEventQueues_[static_cast<uint32_t>(priority)].queue, event);
            break;
        }
        case Priority::IDLE: {
            // Never wake up thread if insert an idle event.
            InsertEventsLocked(idleEvents_, event);
            break;
        }
        default:
            break;
    }

    if (needNotify) {
        ioWaiter_->NotifyOne();
    }
}
           
InnerEvent::Pointer EventQueue::GetEvent()  //事件或任務的取出
{
    std::unique_lock<std::mutex> lock(queueLock_);
    while (!finished_) {
        InnerEvent::TimePoint nextWakeUpTime = InnerEvent::TimePoint::max();
        InnerEvent::Pointer event = GetExpiredEventLocked(nextWakeUpTime);
        if (event) {
            return event;
        }
        WaitUntilLocked(nextWakeUpTime, lock);
    }

    HILOGD("GetEvent: Break out");
    return InnerEvent::Pointer(nullptr, nullptr);
}
           

EventHandler

void EventHandler::DistributeEvent(const InnerEvent::Pointer &event) //執行InnerEvent事件
{
    if (!event) {
        HILOGE("DistributeEvent: Could not distribute an invalid event");
        return;
    }

    // Save old event handler.
    std::weak_ptr<EventHandler> oldHandler = currentEventHandler;
    // Save current event handler into thread local data.
    currentEventHandler = shared_from_this();

    auto spanId = event->GetTraceId();
    auto traceId = HiTrace::GetId();
    bool allowTraceOutPut = AllowHiTraceOutPut(spanId, event->HasWaiter());
    if (allowTraceOutPut) {
        HiTrace::SetId(*spanId);
        HiTracePointerOutPut(spanId, event, "Receive", HiTraceTracepointType::HITRACE_TP_SR);
    }

    InnerEvent::TimePoint nowStart = InnerEvent::Clock::now();
    DeliveryTimeAction(event, nowStart);

    if (event->HasTask()) {  //如果有回調函數
        // Call task callback directly if contains a task.
        (event->GetTaskCallback())(); //執行回調函數
    } else {
        // Otherwise let developers to handle it.
        ProcessEvent(event); //執行ProcessEvent
    }

    DistributeTimeAction(event, nowStart);

    if (allowTraceOutPut) {
        HiTrace::Tracepoint(HiTraceTracepointType::HITRACE_TP_SS, *spanId, "Event Distribute over");
        HiTrace::ClearId();
        if (traceId.IsValid()) {
            HiTrace::SetId(traceId);
        }
    }

    // Restore current event handler.
    if (oldHandler.expired()) {
        currentEventHandler = nullptr;
    } else {
        currentEventHandler = oldHandler;
    }
}

           

更多原創内容請關注:深開鴻技術團隊

入門到精通、技巧到案例,系統化分享HarmonyOS開發技術,歡迎投稿和訂閱,讓我們一起攜手前行共建鴻蒙生态。

繼續閱讀