天天看點

詳解協定緩存區的概念與實踐

作者:FennJava

讓我們試想一種場景:團隊中幾個說不同語言的人見面了。為了互相了解,他們需要使用一種每個人都能聽得懂的語言進行交流。為此,他們都應該在自己的母語以及該通用語言之間,執行資訊的轉換。同理,如果我們使用協定緩存區(Protocol Buffers)消息語言,則能夠讓整個團隊都使用自己特定的程式設計語言去建立消息,接着被翻譯成通用語言的形式。正如 ​Wikipedia解釋的那樣​ ​:“Google正廣泛地使用協定緩沖區,來存儲和交換各種結構化的資訊。該方法作為自定義的遠端過程調用(Remote procedure call,RPC)系統的基礎,可用于幾乎所有與Google互動的機器間通信。”

由于我經常需要通過參與各個行業的開發項目,與定制軟體打交道,而且專注于在嵌入式系統中使用Modern C++、以及Qt去建構應用程式,是以下面我将與您分享自己在記憶體受限的嵌入式系統上,使用協定緩沖區的經驗。

協定緩沖區消息語言

正如Google所言,“協定緩沖區能夠讓您以.proto檔案的形式,定義需要的資料結構,以便您使用特定生成的源代碼,從不同的資料流,使用不同的語言,去輕松地讀寫自己的結構化資料。”我們可以通常下圖了解其基本原理:

詳解協定緩存區的概念與實踐

根據協定緩存區語言指南,我們首先來讨論一個非常簡單的例子。假設您準備定義一個Person消息格式,由于每個人都有一個名字、年齡和電子郵件等屬性,是以被用于定義消息類型的.proto檔案的内容可設定為如下形式:

ProtoBuf
// person.proto

syntax = "proto3";

message Person {
  string name = 1;
  int32 age = 2;
  string email = 3;
}           

其中第一行便指定了使用檔案的proto3文法。接着,Person的消息定義指定了三個字段(即,名稱/值對)。每一個都可以被用于您需要包含在此類消息的資料塊中。而且每個字段包含了名稱、類型和字段數的資訊。

擁有了.proto檔案,您便可以為特定的語言生成源代碼語言。例如,C++會使用一種特殊的被稱為協定編譯器(protocol compiler,又名 protoc)的編譯器。請參見下圖:

詳解協定緩存區的概念與實踐

在此,我們把能夠生成包含原生語言(language-native)結構,以操控消息的接口稱為API。API可以為您提供所有需要set和retrieve資料的類和方法,以及對位元組流的serialization to和parsing from方法。

在C++中,各種生成的檔案都包含了Person類和所有必要的方法,以處理底層的資料。例如:

C++
void clear_name();
const ::std::string &name() const;
void set_name(const ::std::string &value);
void set_name(const char *value);           

此外,Person類通過繼承來自google::protobuf::Message的方法,對資料流進行序列化或反序列化(解析)。例如:

C++
// Serialization:
bool SerializeToOstream(std::ostream* output) const;
bool SerializePartialToOstream(std::ostream* output) const;
// Deserialization:
bool ParseFromIstream(std::istream* input);
bool ParsePartialFromIstream(std::istream* input);           

在Protobuf-C中使用自定義的靜态配置設定

如果您正在編寫一個完整的靜态配置設定系統,那麼可能需要用到的是C而不是C++。下面,我們來讨論如何使用靜态配置設定的緩沖區,而不是動态配置設定的記憶體,去編寫一個定制的配置設定器。

預設情況下,Protobuf-C會通過malloc(),來調用動态的配置設定記憶體。Protobuf-C會向您提供一種定制配置設定器(custom allocator)的能力,以替代malloc()和free()函數。下圖展示了malloc()和serial_alloc()在處理上的不同,我将對serial_alloc()進行後續讨論。

詳解協定緩存區的概念與實踐

在本例中,我将實作自定義的malloc()和free()函數,并将其使用到自定義配置設定--serial_allocator中。它會通過Protobuf-C庫将資料轉換成一個連續的、靜态配置設定的記憶體塊。下面兩張圖分别展示了malloc()和serial_alloc()的具體差異:

詳解協定緩存區的概念與實踐

在heap上的malloc()配置設定

詳解協定緩存區的概念與實踐

靜态緩沖區上的serial_alloc()配置設定

由于malloc()在堆(heap)上配置設定記憶體的“随機性”,會導緻記憶體碎片有待整理。而我們定制的serial_alloc()會在序列中靜态配置設定記憶體,是以無需記憶體碎片整理。

環境設定

下面,我将在Ubuntu 22.04 LTS上,通過如下指令,安裝protoc-c編譯器、以及協定緩存區的C Runtime:

Shell
sudo apt install libprotobuf-c-dev protobuf-c-compiler           

并通過如下指令檢查其是否運作:

Shell
protoc-c --version           

如果一切正常,螢幕上會傳回已安裝的版本号:

Plain Text
protobuf-c 1.3.3
libprotoc 3.12.4           

您可以通過連結-- https://github.com/protobuf-c/protobuf-c,檢視它在GitHub庫中的完整代碼。

消息

下面,我們來檢視在message.proto檔案中建立的簡單Protobuf消息。

ProtoBuf
syntax = "proto3";

message Message
{
    bool flag = 1;
    float value = 2;
}           

然後通過運作如下指令,以生成message.pb-c.h和message.pb-c.c兩個檔案:

Shell
protoc-c -I=. --c_out=. message.proto           

程式編譯

請通過如下指令,使用protobuf-c庫與生成的代碼,去編譯C語言程式:

Shell
gcc -Wall -Wextra -Wpedantic main.c message.pb-c.c -lprotobuf-c -o protobuf-c-custom_allocator           

程式代碼會使用Protobuf-C依次進行序列化、編碼、包裝成為靜态緩沖區--pack_buffer,然後經過反序列化、解碼、拆包到另一個靜态緩沖區—out。下面展示了其完整的代碼:

C
#include "message.pb-c.h"
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

static uint8_t pack_buffer[100];

int main()
{
    Message in;
    message__init(∈);

    in.flag = true;
    in.value = 1.234f;

    // Serialization:
    message__pack(∈, pack_buffer);

    // Deserialization:
    unpacked_message_wrapper out;
    Message* outPtr = unpack_to_message_wrapper_from_buffer(message__get_packed_size(∈), pack_buffer, &out);

    if (NULL != outPtr)
    {
        assert(in.flag == out.message.flag);
        assert(in.value == out.message.value);

        assert(in.flag == outPtr->flag);
        assert(in.value == outPtr->value);
    }
    else
    {
        printf("ERROR: Unpack to serial buffer failed! Maybe MAX_UNPACKED_MESSAGE_LENGTH is to small or requested size is incorrect.\n");
    }

    return 0;
}           

在unpack_to_message_wrapper_from_buffer()中,我們建立了ProtobufCAllocator對象,并将serial_alloc()和serial_free()函數(作為malloc()和free()的替代品)放入其中。然後,我們通過調用message__unpack和傳遞serial_allocator去解包消息(請參見如下代碼):

C
Message* unpack_to_message_wrapper_from_buffer(const size_t packed_message_length, const uint8_t* buffer, unpacked_message_wrapper* wrapper)
{
    wrapper->next_free_index = 0;

    // Here is the trick: We pass `wrapper` (not wrapper.buffer) as `allocator_data`, to track number of allocations in `serial_alloc()`.
    ProtobufCAllocator serial_allocator = {.alloc = serial_alloc, .free = serial_free, .allocator_data = wrapper};

    return message__unpack(&serial_allocator, packed_message_length, buffer);
}           

比較基于malloc()與serial_alloc的方法

下面,我們來比較預設的Protobuf-C行為(基于malloc())和自定義配置設定器的行為。其中使用動态記憶體配置設定的Protobuf-C行為是:

C
static uint8_t buffer[SOME_BIG_ENOUGH_SIZE];

...

// NULL in this context means -> use malloc():
Message* parsed = message__unpack(NULL, packed_size, bufer);
// dynamic memory allocation occurred above

...

// somewhere below memory must be freed:
free(me)           

使用定制配置設定器(無動态記憶體配置)的是:

C
// statically allocated buffer inside some wrapper around the unpacked proto message:
typedef struct
{
    uint8_t buffer[SOME_BIG_ENOUGH_SIZE];
    ...
} unpacked_message_wrapper;

...

// malloc and free functions replacements:
static void* serial_alloc(void* allocator_data, size_t size) { ... }
static void serial_free(void* allocator_data, void* ignored) { ... }

...

ProtobufCAllocator serial_allocator = { .alloc = serial_alloc,
                                        .free = serial_free,
                                        .allocator_data = wrapper};

// now, instead of NULL we pass serial_allocator:
if (NULL == message__unpack(&serial_allocator, packed_message_length, input_buffer))
{
    printf("Unpack to serial buffer failed!\n");
}           

Proto Message的結構

unpacked_message_wrapper的結構隻是一個簡單的Proto消息包裝器。它的容量足以緩沖解包後的資料存儲,以便next_free_index跟蹤緩沖區裡的已使用空間。請參見如下代碼:

C
#define MAX_UNPACKED_MESSAGE_LENGTH 100

typedef struct
{
    size_t next_free_index;
    union
    {
        uint8_t buffer[MAX_UNPACKED_MESSAGE_LENGTH];
        Message message;   // Replace `Message` with your own type - generated from your own .proto message
    };
} unpacked_message_wrapper;           

其中,雖然Message對象不會改變它的大小,但是Message往往是一個廣泛的.proto。例如,重複性的字段通常會涉及到多個malloc()的調用。是以,您可能需要比Message本身更多的空間。為此,我們可以将buffer和Message聯合起來,并讓MAX_UNPACKED_MESSAGE_LENGTH足夠大。而且,unpacked_message_wrapper的結構,就是要将預定義的記憶體緩沖區與跟蹤緩沖區的配置設定放在一處。

serial_alloc()和serial_free()的實作

serial_alloc()的簽名遵循着ProtobufCAllocator的各項要求,例如:

C
static void* serial_alloc(void* allocator_data, size_t size)           

其中,serial_alloc()可以配置設定被請求的size到allocator_data處,然後增加next_free_index到下個詞的開始邊界處(這是一個優化過的連續資料塊,緊貼着下一個詞的邊界)。而size則來自Protobuf-C内部解析或解碼的資料。請參考如下代碼:

C
static void* serial_alloc(void* allocator_data, size_t size)
{
    void* ptr_to_memory_block = NULL;

    unpacked_message_wrapper* const wrapper = (unpacked_message_wrapper*)allocator_data;

    // Optimization: Align to next word boundary.
    const size_t temp_index = wrapper->next_free_index + ((size + sizeof(int)) & ~(sizeof(int)));

    if ((size > 0) && (temp_index <= MAX_UNPACKED_MESSAGE_LENGTH))
    {
        ptr_to_memory_block = (void*)&wrapper->buffer[wrapper->next_free_index];

        wrapper->next_free_index = temp_index;
    }

    return ptr_to_memory_block;
}           

當serial_alloc()被第一次調用時,程式會将next_free_index設定為已配置設定的大小,并将指針傳回至緩沖區的開始處。下圖展示了其内部邏輯:

詳解協定緩存區的概念與實踐

在第二次調用時,它會重新計算next_free_index的值,并傳回位址給下一塊資料。下圖展示了其對應的邏輯:

詳解協定緩存區的概念與實踐

下圖展示了第三次調用的邏輯:

詳解協定緩存區的概念與實踐

而serial_free()函數會将使用緩沖區空間設定為零。請參見如下代碼:

C
static void serial_free(void* allocator_data, void* ignored)
{
    (void)ignored;

    unpacked_message_wrapper* wrapper = (unpacked_message_wrapper*)allocator_data;
    wrapper->next_free_index = 0;
}           

當serial_free()被調用時,它會通過設定next_free_index為零,以“釋放”所有記憶體,好讓緩沖區可以被重用。請參見下圖:

詳解協定緩存區的概念與實踐

對實作予以測試

我們可以使用知名的運作時診斷工具—Valgrind,通過如下指令來運作上述程式:

Shell
valgrind ./protobuf-c-custom_allocator           

在下面生成的報告中,您會發現并無任何配置設定的出現:

Plain Text
==3977== Memcheck, a memory error detector
==3977== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==3977== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==3977== Command: ./protobuf-c-custom_allocator
==3977== 
==3977== 
==3977== HEAP SUMMARY:
==3977==     in use at exit: 0 bytes in 0 blocks
==3977==   total heap usage: 0 allocs, 0 frees, 0 bytes allocated
==3977== 
==3977== All heap blocks were freed -- no leaks are possible
==3977== 
==3977== For lists of detected and suppressed errors, rerun with: -s
==3977== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)           

提示和技巧

如果您手頭的項目是一個記憶體受限的系統,那麼您需要事先在serial_alloc中确定MAX_UNPACKED_MESSAGE_LENGTH的大小。請參見如下代碼:

C
static void* serial_alloc(void* allocator_data, size_t size)
{
    static int call_counter = 0;
    static size_t needed_space_counter = 0;
    needed_space_counter += ((size + sizeof(int)) & ~(sizeof(int)));

    printf("serial_alloc() called for: %d time. Needed space for worst case scenario is = %ld\n", ++call_counter, needed_space_counter);

    ...           

在上例中,我們得到了:

Plain Text
serial_alloc called for: 1 time. The needed space for the worst-case scenario is = 32           

而當.proto message變得更加複雜時,我們可以為其添加一個新字段。請參見如下代碼:

ProtoBuf
syntax = "proto3";

message Message
{
    bool flag = 1;
    float value = 2;
    repeated string names = 3; // added field, type repeated means "dynamic array"
}           

然後,我們再為message添加各種新的屬性:

C
int main()
{
    Message in;
    message__init(∈);

    in.flag = true;
    in.value = 1.234f;
    const char name1[] = "Let's";
    const char name2[] = "Solve";
    const char name3[] = "It";
    const char* names[] = {name1, name2, name3};
    in.names = (char**)names;
    in.n_names = 3;

    // Serialization:
    message__pack(∈, pack_buffer);
    
    ...           

下面便是我們能夠看到的輸出:

Plain Text
serial_alloc() called for: 1 time. Needed space for worst case scenario is = 48
serial_alloc() called for: 2 time. Needed space for worst case scenario is = 72
serial_alloc() called for: 3 time. Needed space for worst case scenario is = 82
serial_alloc() called for: 4 time. Needed space for worst case scenario is = 92
serial_alloc() called for: 5 time. Needed space for worst case scenario is = 95           

由結果可知,我們至少需要一個95位元組的緩沖區。而且在真實項目中,您往往需要比95位元組更多的空間。

小結

綜上所述,若要編寫和使用一個定制的配置設定器,您需要:

  1. 确定用來存儲資料的方式。
  2. 編寫一些針對原型消息類型的包裝器,并提供适當的alloc()和free()函數的替換。
  3. 為包裝器建立一個對象。
  4. 使用alloc()和free()的替換來建立ProtobufCAllocator,然後在x_unpack()函數使用配置設定器。

對應地,在上面的例子中,我們實作了:

  • 連續地存儲資料,并靜态地配置設定記憶體塊。
  • 編寫了unpacked_message_wrapper作為原型Message和buffer的包裝器,提供serial_alloc()和serial_free()作為alloc()和free()的替換。
  • 在記憶體棧中,建立了一個unpacked_message_wrapper對象,并命名為out。
  • 在unpack_to_message_wrapper_from_buffer中,我們建立了一個ProtobufCAllocator,并将serial_alloc()和serial_free()放入其中,進而傳遞給message__unpack函數。

繼續閱讀