天天看点

快速消息队列 (FMQ)

HIDL 的远程过程调用 (RPC) 基础架构使用 Binder 机制,这意味着调用涉及开销、需要内核操作,并且可以触发调度程序操作。不过,对于必须在开销较小且无内核参与的进程之间传输数据的情况,则使用快速消息队列 (FMQ) 系统。

FMQ 会创建具有所需属性的消息队列。MQDescriptorSync 或 MQDescriptorUnsync 对象可通过 HIDL RPC 调用发送,并可供接收进程用于访问消息队列。

仅 C++ 支持快速消息队列。

MessageQueue 类型

Android 支持两种队列类型(称为“风格”):

  • 未同步队列:可以溢出,并且可以有多个读取器;每个读取器都必须及时读取数据,否则数据将会丢失。
  • 已同步队列:不能溢出,并且只能有一个读取器。

这两种队列都不能下溢(从空队列进行读取将会失败),并且只能有一个写入器。

未同步

未同步队列只有一个写入器,但可以有任意多个读取器。此类队列有一个写入位置;不过,每个读取器都会跟踪各自的独立读取位置。

对此类队列执行写入操作一定会成功(不会检查是否出现溢出情况),但前提是写入的内容不超出配置的队列容量(如果写入的内容超出队列容量,则操作会立即失败)。由于各个读取器的读取位置可能不同,因此每当新的写入操作需要空间时,系统都允许数据离开队列,而无需等待每个读取器读取每条数据。

读取操作负责在数据离开队列末尾之前对其进行检索。如果读取操作尝试读取的数据超出可用数据量,则该操作要么立即失败(如果非阻塞),要么等到有足够多的可用数据时(如果阻塞)。如果读取操作尝试读取的数据超出队列容量,则读取一定会立即失败。

如果某个读取器的读取速度无法跟上写入器的写入速度,则写入的数据量和该读取器尚未读取的数据量加在一起会超出队列容量,这会导致下一次读取不会返回数据;相反,该读取操作会将读取器的读取位置重置为等于最新的写入位置,然后返回失败。如果在发生溢出后但在下一次读取之前,系统查看可供读取的数据,则会显示可供读取的数据超出了队列容量,这表示发生了溢出。(如果队列溢出发生在系统查看可用数据和尝试读取这些数据之间,则溢出的唯一表征就是读取操作失败。)

已同步

已同步队列有一个写入器和一个读取器,其中写入器有一个写入位置,读取器有一个读取位置。写入的数据量不可能超出队列可提供的空间;读取的数据量不可能超出队列当前存在的数据量。如果尝试写入的数据量超出可用空间或尝试读取的数据量超出现有数据量,则会立即返回失败,或会阻塞到可以完成所需操作为止,具体取决于调用的是阻塞还是非阻塞写入或读取函数。如果尝试读取或尝试写入的数据量超出队列容量,则读取或写入操作一定会立即失败。

设置 FMQ

一个消息队列需要多个 MessageQueue 对象:一个对象用作数据写入目标位置,以及一个或多个对象用作数据读取来源。没有关于哪些对象用于写入数据或读取数据的显式配置;用户需负责确保没有对象既用于读取数据又用于写入数据,也就是说最多只有一个写入器,并且对于已同步队列,最多只有一个读取器。

创建第一个 MessageQueue 对象

通过单个调用创建并配置消息队列:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
           
  • MessageQueue<T,flavor>(numElements) M e s s a g e Q u e u e < T , f l a v o r > ( n u m E l e m e n t s ) 初始化程序负责创建并初始化支持消息队列功能的对象。
  • MessageQueue<T,flavor>(numElements,configureEventFlagWord) M e s s a g e Q u e u e < T , f l a v o r > ( n u m E l e m e n t s , c o n f i g u r e E v e n t F l a g W o r d ) 初始化程序负责创建并初始化支持消息队列功能和阻塞的对象。
  • flavor 可以是 kSynchronizedReadWrite(对于已同步队列)或 kUnsynchronizedWrite(对于未同步队列)。
  • uint16_t(在本示例中)可以是任意不涉及嵌套式缓冲区(无 string 或 vec 类型)、句柄或接口的 HIDL 定义的类型。
  • kNumElementsInQueue 表示队列的大小(以条目数表示);它用于确定将为队列分配的共享内存缓冲区的大小。

创建第二个 MessageQueue 对象

使用从消息队列的第一侧获取的 MQDescriptor 对象创建消息队列的第二侧。通过 HIDL RPC 调用将 MQDescriptor 对象发送到将容纳消息队列末端的进程。MQDescriptor 包含该队列的相关信息,其中包括:

  • 用于映射缓冲区和写入指针的信息。
  • 用于映射读取指针的信息(如果队列已同步)。
  • 用于映射事件标记字词的信息(如果队列是阻塞队列)。
  • 对象类型 (<T,flavor>) ( < T , f l a v o r > ) ,其中包含 HIDL 定义的队列元素类型和队列风格(已同步或未同步)。

MQDescriptor 对象可用于构建 MessageQueue 对象:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)
           

resetPointers 参数表示是否在创建此 MessageQueue 对象时将读取和写入位置重置为 0。在未同步队列中,读取位置(在未同步队列中,是每个 MessageQueue 对象的本地位置)在此对象创建过程中始终设为 0。通常,MQDescriptor 是在创建第一个消息队列对象过程中初始化的。要对共享内存进行额外的控制,您可以手动设置 MQDescriptor(MQDescriptor 是在 system/libhidl/base/include/hidl/MQDescriptor.h 中定义的),然后按照本部分所述内容创建每个 MessageQueue 对象。

阻塞队列和事件标记

默认情况下,队列不支持阻塞读取/写入。有两种类型的阻塞读取/写入调用:

  • 短格式:有三个参数(数据指针、项数、超时)。支持阻塞针对单个队列的各个读取/写入操作。在使用这种格式时,队列将在内部处理事件标记和位掩码,并且第一个消息队列对象必须初始化为第二个参数为 true。例如:
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
           
  • 长格式:有六个参数(包括事件标记和位掩码)。支持在多个队列之间使用共享 EventFlag 对象,并允许指定要使用的通知位掩码。在这种情况下,必须为每个读取和写入调用提供事件标记和位掩码。

对于长格式,可在每个 readBlocking() 和 writeBlocking() 调用中显式提供 EventFlag。可以将其中一个队列初始化为包含一个内部事件标记,如果是这样,则必须使用 getEventFlagWord() 从相应队列的 MessageQueue 对象中提取该标记,以用于在每个进程中创建与其他 FMQ 一起使用的 EventFlag 对象。或者,可以将 EventFlag 对象初始化为具有任何合适的共享内存。

一般来说,每个队列都应只使用以下三项之一:非阻塞、短格式阻塞,或长格式阻塞。混合使用也不算是错误;但要获得理想结果,则需要谨慎地进行编程。

使用 MessageQueue

MessageQueue 对象的公共 API 是:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = );
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = );

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = ,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = ,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
           

availableToWrite() 和 availableToRead() 可用于确定在一次操作中可传输的数据量。在未同步队列中:

  • availableToWrite() 始终返回队列容量。
  • 每个读取器都有自己的读取位置,并会针对 availableToRead() 进行自己的计算。
  • 如果是读取速度缓慢的读取器,队列可以溢出,这可能会导致 availableToRead() 返回的值大于队列的大小。发生溢出后进行的第一次读取操作将会失败,并且会导致相应读取器的读取位置被设为等于当前写入指针,无论是否通过 availableToRead() 报告了溢出都是如此。

    如果所有请求的数据都可以(并已)传输到队列/从队列传出,则 read() 和 write() 方法会返回 true。这些方法不会阻塞;它们要么成功(并返回 true),要么立即返回失败 (false)。

readBlocking() 和 writeBlocking() 方法会等到可以完成请求的操作,或等到超时(timeOutNanos 值为 0 表示永不超时)。

阻塞操作使用事件标记字词来实现。默认情况下,每个队列都会创建并使用自己的标记字词来支持短格式的 readBlocking() 和 writeBlocking()。多个队列可以共用一个字词,这样一来,进程就可以等待对任何队列执行写入或读取操作。可以通过调用 getEventFlagWord() 获得指向队列事件标记字词的指针,此类指针(或任何指向合适的共享内存位置的指针)可用于创建 EventFlag 对象,以传递到其他队列的长格式 readBlocking() 和 writeBlocking()。readNotification 和 writeNotification 参数用于指示事件标记中的哪些位应该用于针对相应队列发出读取和写入信号。readNotification 和 writeNotification 是 32 位的位掩码。

readBlocking() 会等待 writeNotification 位;如果该参数为 0,则调用一定会失败。如果 readNotification 值为 0,则调用不会失败,但成功的读取操作将不会设置任何通知位。在已同步队列中,这意味着相应的 writeBlocking() 调用一定不会唤醒,除非已在其他位置对相应的位进行设置。在未同步队列中,writeBlocking() 将不会等待(它应仍用于设置写入通知位),而且对于读取操作来说,不适合设置任何通知位。同样,如果 readNotification 为 0,writeblocking() 将会失败,并且成功的写入操作会设置指定的 writeNotification 位。

要一次等待多个队列,请使用 EventFlag 对象的 wait() 方法来等待通知的位掩码。wait() 方法会返回一个状态字词以及导致系统设置唤醒的位。然后,该信息可用于验证相应的队列是否有足够的控件或数据来完成所需的写入/读取操作,并执行非阻塞 write()/read()。要获取操作后通知,请再次调用 EventFlag 的 wake() 方法。有关 EventFlag 抽象的定义,请参阅 system/libfmq/include/fmq/EventFlag.h。

零复制操作

read/write/readBlocking/writeBlocking() API 会将指向输入/输出缓冲区的指针作为参数,并在内部使用 memcpy() 调用,以便在相应缓冲区和 FMQ 环形缓冲区之间复制数据。为了提高性能,Android 8.0 及更高版本包含一组 API,这些 API 可提供对环形缓冲区的直接指针访问,这样便无需使用 memcpy 调用。

使用以下公共 API 执行零复制 FMQ 操作:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
           
  • beginWrite 方法负责提供用于访问 FMQ 环形缓冲区的基址指针。在数据写入之后,使用 commitWrite() 提交数据。

    beginRead/commitRead 方法的运作方式与之相同。

  • beginRead/Write 方法会将要读取/写入的消息条数视为输入,并会返回一个布尔值来指示是否可以执行读取/写入操作。如果可以执行读取或写入操作,则 memTx 结构体中会填入基址指针,这些指针可用于对环形缓冲区共享内存进行直接指针访问。
  • MemRegion 结构体包含有关内存块的详细信息,其中包括基础指针(内存块的基址)和以 T 表示的长度(以 HIDL 定义的消息队列类型表示的内存块长度)。
  • MemTransaction 结构体包含两个 MemRegion 结构体(first 和 second),因为对环形缓冲区执行读取或写入操作时可能需要绕回到队列开头。这意味着,要对 FMQ 环形缓冲区执行数据读取/写入操作,需要两个基址指针。

从 MemRegion 结构体获取基址和长度:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes
           

获取对 MemTransaction 对象内的第一个和第二个 MemRegion 的引用:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion
           

使用零复制 API 写入 FMQ 的示例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}
           

以下辅助方法也是 MemTransaction 的一部分:

  • T* getSlot(size_t idx);

    返回一个指针,该指针指向属于此 MemTransaction 对象一部分的 MemRegions 内的槽位 idx。如果 MemTransaction 对象表示要读取/写入 N 个类型为 T 的项目的内存区域,则 idx 的有效范围在 0 到 N-1 之间。

  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);

    将 nMessages 个类型为 T 的项目写入到该对象描述的内存区域,从索引 startIdx 开始。此方法使用 memcpy(),但并非旨在用于零复制操作。如果 MemTransaction 对象表示要读取/写入 N 个类型为 T 的项目的内存区域,则 idx 的有效范围在 0 到 N-1 之间。

  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);

    一种辅助方法,用于从该对象描述的内存区域读取 nMessages 个类型为 T 的项目,从索引 startIdx 开始。此方法使用 memcpy(),但并非旨在用于零复制操作。

通过 HIDL 发送队列

在创建侧执行的操作:

  1. 创建消息队列对象,如上所述。
  2. 使用 isValid() 验证对象是否有效。
  3. 如果您要通过将 EventFlag 传递到长格式的 readBlocking()/writeBlocking() 来等待多个队列,则可以从经过初始化的 MessageQueue 对象提取事件标记指针(使用 getEventFlagWord())以创建标记,然后使用该标记创建必需的 EventFlag 对象。
  4. 使用 MessageQueue getDesc() 方法获取描述符对象。
  5. 在 .hal 文件中,为某个方法提供一个类型为 fmq_sync 或 fmq_unsync 的参数,其中 T 是 HIDL 定义的一种合适类型。使用此方法将 getDesc() 返回的对象发送到接收进程。

在接收侧执行的操作:

  1. 使用描述符对象创建 MessageQueue 对象。务必使用相同的队列风格和数据类型,否则将无法编译模板。
  2. 如果您已提取事件标记,则在接收进程中从相应的 MessageQueue 对象提取该标记。
  3. 使用 MessageQueue 对象传输数据。