天天看點

sarama的消費者組分析、使用

kafka的go用戶端,使用最多的應該是sarama,但以前老的sarama版本不支援消費者組的消費方式,是以大多數人都用sarama-cluster。

後來sarama支援了消費者組的消費方式,sarama-cluster也停止維護了,但網上關于sarama的消費者組的解析很少,且官方的樣例很簡單,是以這裡分析一下。

一、官方樣例

官方樣例比較簡單:

1、通過

sarama.NewConfig

建立一個配置

2、通過

NewConsumerGroup

建立一個消費者組

3、通過

Consume

建立消費者組的會話,該函數的第三個參數即為該會話三個階段的回調:

Setup

Cleanup

ConsumeClaim

,分别在建立會話之前、會話結束之後 和 會話生存中(主要就是在此階段進行消息讀取)進行調用。

二、問題

1、當指定的topic在kafka中不存的時候,kafka會建立該topic,如果隻想讓使用者消費已存在的topic,那麼該如何擷取kafka中已經存在的topic?

2、

setup

Cleanup

的調用流程是怎樣的?會在哪些情況下被調用?

3、既然是消費者組,那如何檢視組裡某個消費者擁有哪些topic和partition?

4、如何使用指定的 offset 來消費某個 topic ?

5、如何實作消費的 Exactly-once?

注:以上測試使用的示例代碼是自己寫的樣例代碼的部分内容,完整的樣例代碼見文章最後

三、分析

1、在 sarama 中,擷取 topic 的接口在

Client interface

中,是以需要先通過

NewClient

接口建立一個 client,然後就可以通過該 client 的

Topics

接口擷取到 kafka 中所有的 topic。但消費者組使用的類型是

ConsumerGroup

,那該如何擷取該類型呢?sarama 中提供

NewConsumerGroupFromClient

接口,可以從一個現存的 client 建立一個

ConsumerGroup

,是以,修改後的流程,由原先的

NewConsumerGroup

直接建立,變成:

a、使用

NewClient

建立一個 client

b、使用

NewConsumerGroupFromClient

建立

ConsumerGroup

具體代碼實作如下:

// 建立client
newClient, err := sarama.NewClient(brokers, config)
if err != nil {
	log.Fatal(err)
}

// 擷取所有的topic
topics, err := newClient.Topics()
if err != nil {
	log.Fatal(err)
}
log.Info("topics: ", topics)

// 根據client建立consumerGroup
client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
if err != nil {
	log.Fatalf("Error creating consumer group client: %v", err)
}
           

這麼做的好處就是:可以使用 client 的接口,擷取一些資訊,例如 kafka 的目前配置有哪些,controller 有哪些,brokers 有哪些,topic 總共有哪些,特定的 topic 有哪些 partitions,partition 目前的 offset 是多少 等等,具體功能可檢視

Client interface

type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker
  ......
}
           

setup

Cleanup

ConsumeClaim

是 s.handler.ConsumeClaim 的三個接口,需要使用者自己實作。可以簡單了解為:當需要建立一個會話時,先運作

setup

,然後在

ConsumerClaim

中處理消息,最後運作

Cleanup

setup

會在一個新會話開始之前運作,且也在

ConsumerClaim

接口之前運作。調用流程為:

Consume

—>

newSession

newConsumerGroupSession

handler.Setup

在調用了

Setup

之後,後面會建立一個協程,該協程裡面其實調用的就是

ConsumeClaim

接口,是以我們實作的

ConsumerClaim

其實是一個單獨的協程,其調用流程為:

Consume

newSession

newConsumerGroupSession

consume

s.handler.ConsumeClaim

Cleanup

會在一個會話結束的時候運作。調用流程為:

Consume

release

s.handler.Cleanup

了解了調用流程之後,哪些情況又會調用到他們呢?—> 1、建立consumeGroup的時候。2、發生rebalance的時候。

我們可以在setup和cleanup中加一個列印:

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	log.Info("setup")
  close(k.ready)
	return nil
}

func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
	log.Info("cleanup")
	return nil
}
           

然後啟動一個consumer,可以觀察到列印:

INFO[0000] setup
           

然後按 Ctrl + C 關閉 consumer,可以觀察到列印:

INFO[0101] cleanup
           

說明建立consumer然後退出時,會調用 setup 和 cleanup。

我們再試一下發生rebalance的情況:先啟動一個consumer,然後再啟動一個同一組的consumer,可以看到列印為:

第一個啟動的 consumer 列印為:
INFO[0000] setup
INFO[0006] cleanup
INFO[0006] setup

第二個啟動的 consumer 列印為:
INFO[0002] setup
           

說明在發生 reblance 的時候,會先關閉原先的會話,并調用 cleanup,然後再調用 setup,最後生成一個新的會話。

3、在

ConsumerGroupSession

接口中,有一個

Claims

接口,可以用來檢視目前 consumer 被配置設定到哪些 topic 和 partition。我們可以在

Setup

接口中進行列印:

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	log.Info("setup")
	log.Info(session.Claims())
	close(k.ready)
	return nil
}
           

這裡使用 range 分區政策,訂閱的topic為t1p4和t2p4,每個topic都有4個分區,然後建立3個consumer,産生的列印為:

consumer1:
INFO[0000] setup
INFO[0000] map[t1p4:[0 1 2 3] t2p4:[0 1 2 3]]
INFO[0009] cleanup
INFO[0009] setup
INFO[0009] map[t1p4:[0 1] t2p4:[2 3]]
INFO[0015] cleanup
INFO[0015] setup
INFO[0015] map[t1p4:[0] t2p4:[3]]

consumer2:
INFO[0002] setup
INFO[0002] map[t1p4:[2 3] t2p4:[0 1]]
INFO[0009] cleanup
INFO[0009] setup
INFO[0009] map[t1p4:[1 2] t2p4:[0]]

consumer3:
INFO[0000] setup
INFO[0000] map[t1p4:[3] t2p4:[1 2]]
           

當隻有consumer1的時候,它被配置設定到所有的分區:

t1p4:[0 1 2 3] t2p4:[0 1 2 3]

當consumer2加入的時候,consumer1被配置設定的是:

t1p4:[0 1] t2p4:[2 3]

, consumer2被配置設定的是:

t1p4:[2 3] t2p4:[0 1]

當consumer3加入的時候,consumert1被配置設定的是:

t1p4:[0] t2p4:[3]

,consumer2被配置設定的是:

t1p4:[1 2] t2p4:[0]

, consumer3被配置設定的是:

t1p4:[3] t2p4:[1 2]

有興趣的可以再依次删除consumer1,consumer2。

4、kafka的config配置中,指定消費的offset隻有兩個:

OffsetNewest

OffsetOldest

,如果想指定 offset 進行消費,該怎麼做呢?

前面說過,

Setup

是運作在會話最一開始的地方,且這個時候已經能夠擷取到所有的 topic 和 partition,是以這裡可以使用

ConsumerGroupSession

ResetOffset

接口進行設定,具體實作如下:(這裡使用的主題:t2p4 已存在,且0分區中的offset已經到18)

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	log.Info("setup")
	session.ResetOffset("t2p4", 0, 13, "")
	log.Info(session.Claims())
	close(k.ready)
	return nil
}

func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Infof("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
			message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
		session.MarkMessage(message, "")
	}
	return nil
}
           

此時,無論運作多少次,都可以消費 13 到 18 之間的消息:

INFO[0000] setup
INFO[0000] map[t1p4:[0 1 2 3] t2p4:[0 1 2 3]]
INFO[0000] [topic:t2p4] [partiton:0] [offset:13] [value:a] [time:2021-10-12 23:02:35.058 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:14] [value:b] [time:2021-10-12 23:02:35.087 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:15] [value:c] [time:2021-10-12 23:02:35.092 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:16] [value:d] [time:2021-10-12 23:03:18.882 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:17] [value:e] [time:2021-10-12 23:03:18.898 -0400 EDT]
INFO[0000] [topic:t2p4] [partiton:0] [offset:18] [value:f] [time:2021-10-12 23:03:18.903 -0400 EDT]
           

5、前面已經分析了

Setup

的調用流程,以及可以在

Setup

中可以做的事情,那麼就可以手動記錄 topic 的 offset 到磁盤中(比如文本、資料庫等),在

Setup

的接口中,讀取之前記錄的 offset ,通過

ResetOffset

接口進行重新設定即可。當然,更新 offset 與 消息處理這部分的一緻性,需要業務自己保證(例如使用資料庫的事務來實作)。

四、完整樣例代碼

package main

import (
	"context"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
	log "github.com/sirupsen/logrus"
)

type Kafka struct {
	brokers           []string
	topics            []string
	startOffset       int64
	version           string
	ready             chan bool
	group             string
	channelBufferSize int
	assignor          string
}

var brokers = []string{"192.168.1.101:9092"}
var topics = []string{"t1p4", "t2p4"}
var group = "grp1"
var assignor = "range"

func NewKafka() *Kafka {
	return &Kafka{
		brokers:           brokers,
		topics:            topics,
		group:             group,
		channelBufferSize: 1000,
		ready:             make(chan bool),
		version:           "2.8.0",
		assignor:          assignor,
	}
}

func (k *Kafka) Connect() func() {
	log.Infoln("kafka init...")

	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		log.Fatalf("Error parsing Kafka version: %v", err)
	}

	config := sarama.NewConfig()
	config.Version = version
	// 分區配置設定政策
	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.ChannelBufferSize = k.channelBufferSize // channel長度

	// 建立client
	newClient, err := sarama.NewClient(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	// 擷取所有的topic
	topics, err := newClient.Topics()
	if err != nil {
		log.Fatal(err)
	}
	log.Info("topics: ", topics)

	// 根據client建立consumerGroup
	client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, k.topics, k); err != nil {
				// 當setup失敗的時候,error會傳回到這裡
				log.Errorf("Error from consumer: %v", err)
				return
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()
	<-k.ready
	log.Infoln("Sarama consumer up and running!...")
	// 保證在系統退出時,通道裡面的消息被消費
	return func() {
		log.Info("kafka close")
		cancel()
		wg.Wait()
		if err = client.Close(); err != nil {
			log.Errorf("Error closing client: %v", err)
		}
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
	log.Info("setup")
	session.ResetOffset("t2p4", 0, 13, "")
	log.Info(session.Claims())
	// Mark the consumer as ready
	close(k.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
	log.Info("cleanup")
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	// 具體消費消息
	for message := range claim.Messages() {
		log.Infof("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
			message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
		// 更新位移
		session.MarkMessage(message, "")
	}
	return nil
}

func main() {
	k := NewKafka()
	c := k.Connect()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigterm:
		log.Warnln("terminating: via signal")
	}
	c()
}