天天看點

快速消息隊列 (FMQ)

HIDL 的遠端過程調用 (RPC) 基礎架構使用 Binder 機制,這意味着調用涉及開銷、需要核心操作,并且可以觸發排程程式操作。不過,對于必須在開銷較小且無核心參與的程序之間傳輸資料的情況,則使用快速消息隊列 (FMQ) 系統。

FMQ 會建立具有所需屬性的消息隊列。MQDescriptorSync 或 MQDescriptorUnsync 對象可通過 HIDL RPC 調用發送,并可供接收程序用于通路消息隊列。

僅 C++ 支援快速消息隊列。

MessageQueue 類型

Android 支援兩種隊列類型(稱為“風格”):

  • 未同步隊列:可以溢出,并且可以有多個讀取器;每個讀取器都必須及時讀取資料,否則資料将會丢失。
  • 已同步隊列:不能溢出,并且隻能有一個讀取器。

這兩種隊列都不能下溢(從空隊列進行讀取将會失敗),并且隻能有一個寫入器。

未同步

未同步隊列隻有一個寫入器,但可以有任意多個讀取器。此類隊列有一個寫入位置;不過,每個讀取器都會跟蹤各自的獨立讀取位置。

對此類隊列執行寫入操作一定會成功(不會檢查是否出現溢出情況),但前提是寫入的内容不超出配置的隊列容量(如果寫入的内容超出隊列容量,則操作會立即失敗)。由于各個讀取器的讀取位置可能不同,是以每當新的寫入操作需要空間時,系統都允許資料離開隊列,而無需等待每個讀取器讀取每條資料。

讀取操作負責在資料離開隊列末尾之前對其進行檢索。如果讀取操作嘗試讀取的資料超出可用資料量,則該操作要麼立即失敗(如果非阻塞),要麼等到有足夠多的可用資料時(如果阻塞)。如果讀取操作嘗試讀取的資料超出隊列容量,則讀取一定會立即失敗。

如果某個讀取器的讀取速度無法跟上寫入器的寫入速度,則寫入的資料量和該讀取器尚未讀取的資料量加在一起會超出隊列容量,這會導緻下一次讀取不會傳回資料;相反,該讀取操作會将讀取器的讀取位置重置為等于最新的寫入位置,然後傳回失敗。如果在發生溢出後但在下一次讀取之前,系統檢視可供讀取的資料,則會顯示可供讀取的資料超出了隊列容量,這表示發生了溢出。(如果隊列溢出發生在系統檢視可用資料和嘗試讀取這些資料之間,則溢出的唯一表征就是讀取操作失敗。)

已同步

已同步隊列有一個寫入器和一個讀取器,其中寫入器有一個寫入位置,讀取器有一個讀取位置。寫入的資料量不可能超出隊列可提供的空間;讀取的資料量不可能超出隊列目前存在的資料量。如果嘗試寫入的資料量超出可用空間或嘗試讀取的資料量超出現有資料量,則會立即傳回失敗,或會阻塞到可以完成所需操作為止,具體取決于調用的是阻塞還是非阻塞寫入或讀取函數。如果嘗試讀取或嘗試寫入的資料量超出隊列容量,則讀取或寫入操作一定會立即失敗。

設定 FMQ

一個消息隊列需要多個 MessageQueue 對象:一個對象用作資料寫入目标位置,以及一個或多個對象用作資料讀取來源。沒有關于哪些對象用于寫入資料或讀取資料的顯式配置;使用者需負責確定沒有對象既用于讀取資料又用于寫入資料,也就是說最多隻有一個寫入器,并且對于已同步隊列,最多隻有一個讀取器。

建立第一個 MessageQueue 對象

通過單個調用建立并配置消息隊列:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
           
  • MessageQueue<T,flavor>(numElements) M e s s a g e Q u e u e < T , f l a v o r > ( n u m E l e m e n t s ) 初始化程式負責建立并初始化支援消息隊列功能的對象。
  • MessageQueue<T,flavor>(numElements,configureEventFlagWord) M e s s a g e Q u e u e < T , f l a v o r > ( n u m E l e m e n t s , c o n f i g u r e E v e n t F l a g W o r d ) 初始化程式負責建立并初始化支援消息隊列功能和阻塞的對象。
  • flavor 可以是 kSynchronizedReadWrite(對于已同步隊列)或 kUnsynchronizedWrite(對于未同步隊列)。
  • uint16_t(在本示例中)可以是任意不涉及嵌套式緩沖區(無 string 或 vec 類型)、句柄或接口的 HIDL 定義的類型。
  • kNumElementsInQueue 表示隊列的大小(以條目數表示);它用于确定将為隊列配置設定的共享記憶體緩沖區的大小。

建立第二個 MessageQueue 對象

使用從消息隊列的第一側擷取的 MQDescriptor 對象建立消息隊列的第二側。通過 HIDL RPC 調用将 MQDescriptor 對象發送到将容納消息隊列末端的程序。MQDescriptor 包含該隊列的相關資訊,其中包括:

  • 用于映射緩沖區和寫入指針的資訊。
  • 用于映射讀取指針的資訊(如果隊列已同步)。
  • 用于映射事件标記字詞的資訊(如果隊列是阻塞隊列)。
  • 對象類型 (<T,flavor>) ( < T , f l a v o r > ) ,其中包含 HIDL 定義的隊列元素類型和隊列風格(已同步或未同步)。

MQDescriptor 對象可用于建構 MessageQueue 對象:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)
           

resetPointers 參數表示是否在建立此 MessageQueue 對象時将讀取和寫入位置重置為 0。在未同步隊列中,讀取位置(在未同步隊列中,是每個 MessageQueue 對象的本地位置)在此對象建立過程中始終設為 0。通常,MQDescriptor 是在建立第一個消息隊列對象過程中初始化的。要對共享記憶體進行額外的控制,您可以手動設定 MQDescriptor(MQDescriptor 是在 system/libhidl/base/include/hidl/MQDescriptor.h 中定義的),然後按照本部分所述内容建立每個 MessageQueue 對象。

阻塞隊列和事件标記

預設情況下,隊列不支援阻塞讀取/寫入。有兩種類型的阻塞讀取/寫入調用:

  • 短格式:有三個參數(資料指針、項數、逾時)。支援阻塞針對單個隊列的各個讀取/寫入操作。在使用這種格式時,隊列将在内部處理事件标記和位掩碼,并且第一個消息隊列對象必須初始化為第二個參數為 true。例如:
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
           
  • 長格式:有六個參數(包括事件标記和位掩碼)。支援在多個隊列之間使用共享 EventFlag 對象,并允許指定要使用的通知位掩碼。在這種情況下,必須為每個讀取和寫入調用提供事件标記和位掩碼。

對于長格式,可在每個 readBlocking() 和 writeBlocking() 調用中顯式提供 EventFlag。可以将其中一個隊列初始化為包含一個内部事件标記,如果是這樣,則必須使用 getEventFlagWord() 從相應隊列的 MessageQueue 對象中提取該标記,以用于在每個程序中建立與其他 FMQ 一起使用的 EventFlag 對象。或者,可以将 EventFlag 對象初始化為具有任何合适的共享記憶體。

一般來說,每個隊列都應隻使用以下三項之一:非阻塞、短格式阻塞,或長格式阻塞。混合使用也不算是錯誤;但要獲得理想結果,則需要謹慎地進行程式設計。

使用 MessageQueue

MessageQueue 對象的公共 API 是:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = );
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = );

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = ,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = ,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
           

availableToWrite() 和 availableToRead() 可用于确定在一次操作中可傳輸的資料量。在未同步隊列中:

  • availableToWrite() 始終傳回隊列容量。
  • 每個讀取器都有自己的讀取位置,并會針對 availableToRead() 進行自己的計算。
  • 如果是讀取速度緩慢的讀取器,隊列可以溢出,這可能會導緻 availableToRead() 傳回的值大于隊列的大小。發生溢出後進行的第一次讀取操作将會失敗,并且會導緻相應讀取器的讀取位置被設為等于目前寫入指針,無論是否通過 availableToRead() 報告了溢出都是如此。

    如果所有請求的資料都可以(并已)傳輸到隊列/從隊列傳出,則 read() 和 write() 方法會傳回 true。這些方法不會阻塞;它們要麼成功(并傳回 true),要麼立即傳回失敗 (false)。

readBlocking() 和 writeBlocking() 方法會等到可以完成請求的操作,或等到逾時(timeOutNanos 值為 0 表示永不逾時)。

阻塞操作使用事件标記字詞來實作。預設情況下,每個隊列都會建立并使用自己的标記字詞來支援短格式的 readBlocking() 和 writeBlocking()。多個隊列可以共用一個字詞,這樣一來,程序就可以等待對任何隊列執行寫入或讀取操作。可以通過調用 getEventFlagWord() 獲得指向隊列事件标記字詞的指針,此類指針(或任何指向合适的共享記憶體位置的指針)可用于建立 EventFlag 對象,以傳遞到其他隊列的長格式 readBlocking() 和 writeBlocking()。readNotification 和 writeNotification 參數用于訓示事件标記中的哪些位應該用于針對相應隊列發出讀取和寫入信号。readNotification 和 writeNotification 是 32 位的位掩碼。

readBlocking() 會等待 writeNotification 位;如果該參數為 0,則調用一定會失敗。如果 readNotification 值為 0,則調用不會失敗,但成功的讀取操作将不會設定任何通知位。在已同步隊列中,這意味着相應的 writeBlocking() 調用一定不會喚醒,除非已在其他位置對相應的位進行設定。在未同步隊列中,writeBlocking() 将不會等待(它應仍用于設定寫入通知位),而且對于讀取操作來說,不适合設定任何通知位。同樣,如果 readNotification 為 0,writeblocking() 将會失敗,并且成功的寫入操作會設定指定的 writeNotification 位。

要一次等待多個隊列,請使用 EventFlag 對象的 wait() 方法來等待通知的位掩碼。wait() 方法會傳回一個狀态字詞以及導緻系統設定喚醒的位。然後,該資訊可用于驗證相應的隊列是否有足夠的控件或資料來完成所需的寫入/讀取操作,并執行非阻塞 write()/read()。要擷取操作後通知,請再次調用 EventFlag 的 wake() 方法。有關 EventFlag 抽象的定義,請參閱 system/libfmq/include/fmq/EventFlag.h。

零複制操作

read/write/readBlocking/writeBlocking() API 會将指向輸入/輸出緩沖區的指針作為參數,并在内部使用 memcpy() 調用,以便在相應緩沖區和 FMQ 環形緩沖區之間複制資料。為了提高性能,Android 8.0 及更高版本包含一組 API,這些 API 可提供對環形緩沖區的直接指針通路,這樣便無需使用 memcpy 調用。

使用以下公共 API 執行零複制 FMQ 操作:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
           
  • beginWrite 方法負責提供用于通路 FMQ 環形緩沖區的基址指針。在資料寫入之後,使用 commitWrite() 送出資料。

    beginRead/commitRead 方法的運作方式與之相同。

  • beginRead/Write 方法會将要讀取/寫入的消息條數視為輸入,并會傳回一個布爾值來訓示是否可以執行讀取/寫入操作。如果可以執行讀取或寫入操作,則 memTx 結構體中會填入基址指針,這些指針可用于對環形緩沖區共享記憶體進行直接指針通路。
  • MemRegion 結構體包含有關記憶體塊的詳細資訊,其中包括基礎指針(記憶體塊的基址)和以 T 表示的長度(以 HIDL 定義的消息隊列類型表示的記憶體塊長度)。
  • MemTransaction 結構體包含兩個 MemRegion 結構體(first 和 second),因為對環形緩沖區執行讀取或寫入操作時可能需要繞回到隊列開頭。這意味着,要對 FMQ 環形緩沖區執行資料讀取/寫入操作,需要兩個基址指針。

從 MemRegion 結構體擷取基址和長度:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes
           

擷取對 MemTransaction 對象内的第一個和第二個 MemRegion 的引用:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion
           

使用零複制 API 寫入 FMQ 的示例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}
           

以下輔助方法也是 MemTransaction 的一部分:

  • T* getSlot(size_t idx);

    傳回一個指針,該指針指向屬于此 MemTransaction 對象一部分的 MemRegions 内的槽位 idx。如果 MemTransaction 對象表示要讀取/寫入 N 個類型為 T 的項目的記憶體區域,則 idx 的有效範圍在 0 到 N-1 之間。

  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);

    将 nMessages 個類型為 T 的項目寫入到該對象描述的記憶體區域,從索引 startIdx 開始。此方法使用 memcpy(),但并非旨在用于零複制操作。如果 MemTransaction 對象表示要讀取/寫入 N 個類型為 T 的項目的記憶體區域,則 idx 的有效範圍在 0 到 N-1 之間。

  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);

    一種輔助方法,用于從該對象描述的記憶體區域讀取 nMessages 個類型為 T 的項目,從索引 startIdx 開始。此方法使用 memcpy(),但并非旨在用于零複制操作。

通過 HIDL 發送隊列

在建立側執行的操作:

  1. 建立消息隊列對象,如上所述。
  2. 使用 isValid() 驗證對象是否有效。
  3. 如果您要通過将 EventFlag 傳遞到長格式的 readBlocking()/writeBlocking() 來等待多個隊列,則可以從經過初始化的 MessageQueue 對象提取事件标記指針(使用 getEventFlagWord())以建立标記,然後使用該标記建立必需的 EventFlag 對象。
  4. 使用 MessageQueue getDesc() 方法擷取描述符對象。
  5. 在 .hal 檔案中,為某個方法提供一個類型為 fmq_sync 或 fmq_unsync 的參數,其中 T 是 HIDL 定義的一種合适類型。使用此方法将 getDesc() 傳回的對象發送到接收程序。

在接收側執行的操作:

  1. 使用描述符對象建立 MessageQueue 對象。務必使用相同的隊列風格和資料類型,否則将無法編譯模闆。
  2. 如果您已提取事件标記,則在接收程序中從相應的 MessageQueue 對象提取該标記。
  3. 使用 MessageQueue 對象傳輸資料。