天天看點

Kafka 如何實作更高效消費者-批量讀取(C++用戶端實作)

作為一個c++程式,我相信很多人都會使用c++的librdkafkacpp的接口,這些接口通過C++封裝,使用起來友善,不需要去了解底層的c接口的封裝及實作,是最理想的使用方式。

那為啥還要使用C接口呢?

這就涉及到C++的ABI二進制相容問題。

當我們編譯一個C++動态庫,受限于不同的編譯器版本,無論是gcc還是VC,都存在着相容問題。對于一個生産版本,如果在統一了編譯工具鍊的情景下,是可以使用C++版本,但如果考慮到更好的ABI相容,是以就選擇了C庫。其無論在VC還是gcc,都表現得很好。

C庫kafka的消費模式

 三種消費模式的異同

librdKafka 提供了三種不同的消費模式,下面将講解這三種模式的差異及如何使用。

// 一次讀取一條資料
rd_kafka_consumer_poll();
rd_kafka_consume();

// 批量讀取,一次可以讀取多條資料
// 通過回調函數的批量讀取,将廢棄
rd_kafka_consume_callback();
rd_kafka_consume_batch();

// 通過Queue路由道特定的處理函數
rd_kafka_consume_callback_queue
rd_kafka_consume_batch_queue
           

1、這種模式,直接使用rd_kafka_consumer_poll()接口,其可以實作1次讀取一條,這種場景對于資料量不大,可以很好的使用,而且其用戶端API也實作了相關的消息确認和送出,大多數采用這種方式,這種方式的例程,我在前面的博文中有講到,有興趣的可以去看看

2、這種模式是批量讀取的模式,這種模式下有兩種接口,一種基于api的回調

rd_kafka_consume_callback,這個接口根據官方文檔是即将被廢棄的接口,是以不建議使用該接口,建議使用rd_kafka_consume_batch(),後面會給出這種模式下的使用例子。

3、使用Queue的方式,該方式也提供了兩種模式,一種回調,一種直接消費,回調的方式應該在性能上是最高的。這種方式與第二種方式的不同是,他能讓不同topic,不同patition的資料路由到一個隊列,也就是對于一個需要處理多種資料的消費者。

使用批量讀取的注意:

1、消息最好程式中自己做确認送出,這樣在kafka的服務,才不會看到滞後的消息消費

2、注意消息的銷毀,kafka在銷毀對象時會持有引用技術,消費完的消息,要destoy

3、對于topic+partition的消費模式,最好不要在運作熱新增分區或主題,否則用戶端處理起來比較麻煩。如果必要這麼做,要做好測試。保證資料不重複不丢消息。

代碼實作

KafkaConsumer.h

#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>

#include "rdkafka.h"

using namespace std;

struct TKafkaCfgInfo
{
    string m_strBrokers;
    string m_strGroupID;
    string m_strTopics;
    int m_nPartition;

    int m_nBatchReadSize;
    int m_nReadTimeout;
    bool m_bIsLogKafka;

    TKafkaCfgInfo()
        : m_nPartition(-1)
        , m_nBatchReadSize(-1)
        , m_nReadTimeout(-1)
        , m_bIsLogKafka(false)
    {}
};

class CKafkaConsumer
{
public:
    CKafkaConsumer();
    virtual ~CKafkaConsumer();

    // 初?始º?化¡¥Kafka配?置?
    int InitCfg(const TKafkaCfgInfo& tCfg);
    const TKafkaCfgInfo& GetCfg() { return m_tCfg; }

    // 拉¤-取¨?Kafka消?息¡é
    void pullMessage(rd_kafka_message_t** p, ssize_t& nCnt);

    void SetConsumerOk(rd_kafka_message_t* pMsg);

protected:
    rd_kafka_t* m_pkafka;
    rd_kafka_topic_t* m_pKafkaTopic;

    TKafkaCfgInfo m_tCfg;
};

#endif // KAFKACONSUMER_H
           

// KafkaConsumer.cpp

#include "KafkaConsumer.h"

#include <iostream>

using namespace std;

static void RebalanceCb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque)
{
     測a試º?代䨲碼?
#if 0
    if (partitions)
    {
        for (int i = 0; i < partitions->cnt; i++)
        {
            partitions->elems[i].offset = 0;
        }
    }
    printf("RebalanceCb \r\n");
#endif
    printf("RebalanceCb \n");
    rd_kafka_assign(rk, partitions);
}

static void  EventErrorCb(rd_kafka_t* rk, int err,
    const char* reason,
    void* opaque)
{
    printf("Kafka EventErrorCb(%d): %s\n", err, reason);
}

CKafkaConsumer::CKafkaConsumer()
    : m_pkafka(nullptr)
    , m_pKafkaTopic(nullptr)
{
}

int CKafkaConsumer::InitCfg(const TKafkaCfgInfo& tCfg)
{
    m_tCfg = tCfg;

    rd_kafka_conf_t* pConf = rd_kafka_conf_new();
    if (!pConf)
    {

        return -1;
    }

    char szErr[512] = { 0 };
    if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_tCfg.m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    if (rd_kafka_conf_set(pConf, "group.id", m_tCfg.m_strGroupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {

        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    // rd_kafka_conf_set_rebalance_cb(pConf, &RebalanceCb);
    rd_kafka_conf_set_error_cb(pConf, EventErrorCb);

    if (rd_kafka_conf_set(pConf, "enable.auto.commit", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    if (rd_kafka_conf_set(pConf, "enable.auto.offset.store", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    // topic配?置?
    /*
    rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
    /*if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "latest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_topic_conf_destroy(pTopicConf);
        return -1;
    }*/

    // 創ä¡ä建¡§kafka實º¦Ì例¤y
    m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
    if (!m_pkafka)
    {
        return -1;
    }

    m_pKafkaTopic = rd_kafka_topic_new(m_pkafka, m_tCfg.m_strTopics.c_str(), nullptr);
    if (!m_pKafkaTopic)
    {
        return -1;
    }

    rd_kafka_consume_start(m_pKafkaTopic, m_tCfg.m_nPartition, RD_KAFKA_OFFSET_STORED);

    return 0;
}

void CKafkaConsumer::pullMessage(rd_kafka_message_t** pRet, ssize_t& nCnt)
{
    nCnt = rd_kafka_consume_batch(m_pKafkaTopic, m_tCfg.m_nPartition, m_tCfg.m_nReadTimeout, pRet, m_tCfg.m_nBatchReadSize);
    // rd_kafka_poll(m_pkafka, 0);
}

void CKafkaConsumer::SetConsumerOk(rd_kafka_message_t* pMsg)
{
    rd_kafka_resp_err_t errCode = rd_kafka_offset_store(m_pKafkaTopic, m_tCfg.m_nPartition, pMsg->offset);
    printf("rd_kafka_offset_store offset = %d\n", pMsg->offset);
    if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        return;
    }

    errCode = rd_kafka_commit_message(m_pkafka, pMsg, 0);
    if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        return;
    }
}

CKafkaConsumer::~CKafkaConsumer()
{
    if (m_pKafkaTopic)
    {
        rd_kafka_consume_stop(m_pKafkaTopic, m_tCfg.m_nPartition);
        rd_kafka_topic_destroy(m_pKafkaTopic);
        m_pKafkaTopic = nullptr;
    }

    if (m_pkafka)
    {
        // rd_kafka_consumer_close(m_pkafka); // rd_kafka_destroy調Ì¡Â用®?會¨¢調Ì¡Â用®?rd_kafka_consumer_close
        rd_kafka_destroy(m_pkafka);
        m_pkafka = nullptr;
    }
}
           

// Test.cpp

#include "stdafx.h"
#include "KafkaConsumer.h"
#include <assert.h>

int _tmain(int argc, _TCHAR* argv[])
{
    string brokers = "192.168.254.129:9092";
    string strTopic = "test";
    string group = "test1";
    int nPartition = 0;
    CKafkaConsumer consumer;
    TKafkaCfgInfo t;
    t.m_strBrokers = brokers;
    t.m_strGroupID = group;
    t.m_strTopics = strTopic;
    t.m_nPartition = nPartition;
    int nRet = consumer.InitCfg(t);
    if (nRet != 0)
    {
        assert(0);
        return 0;
    }

    int nBatchSize = 10;
    rd_kafka_message_t** pRet = new rd_kafka_message_t * [nBatchSize];

    while (1)
    {
        int nTimeout = 1000;
        ssize_t nCnt = 0;
        consumer.pullMessage(pRet, nCnt);

        if (nCnt < 0)
        {
            fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
            continue;
        }

        if (nCnt == 0)
        {
            // donothing
            continue;
        }

        for (int nIndex = 0; nIndex < nCnt; nIndex++)
        {
            rd_kafka_message_t* pMsg = pRet[nIndex];
            printf("%s\n", pMsg->payload);
            consumer.SetConsumerOk(pMsg);
            rd_kafka_message_destroy(pMsg);
        }
    }

    delete[] pRet;
    return 0;
}