背景:kafka 用戶端之producer API發送消息以及簡單源碼分析
從Kafka 0.11開始,KafkaProducer又支援兩種模式:幂等生産者和事務生産者。幂等生産者加強了Kafka的傳遞語義,從至少一次傳遞到精确一次傳遞。特别是生産者的重試将不再引入重複。事務性生産者允許應用程式原子地将消息發送到多個分區(和主題)。
幂等性
Kafka在 0.11 版本引入了一項重大特性,幂等性。所謂的幂等性就是指 Producer 不論向 Server 發送多少次重複資料,Server 端都隻會持久化一條。
拿 http 舉例來說,一次或多次請求,得到的響應是一緻的(網絡逾時等問題除外),換句話說,就是執行多次操作與執行一次操作的影響是一樣的。
如果,某個系統是不具備幂等性的,如果使用者重複送出了某個表格,就可能會造成不良影響。例如:使用者在浏覽器上點選了多次送出訂單按鈕,會在背景生成多個一模一樣的訂單。
幂等性基本原理
對于Kafka來說,要解決的是生産者發送消息的幂等問題。在生産者生産消息時,如果出現 retry 時,有可能會一條消息被發送了多次,如果Kafka不具備幂等性的,就有可能會在partition中儲存多條一模一樣的消息。
為了實作生産者的幂等性,Kafka 引入了 Producer ID(PID)和 Sequence Number 的概念。
- PID:每個 Producer 在初始化時,都會配置設定一個唯一的 PID,這個 PID 對使用者來說,是透明的
- Sequence Number:針對每個生産者(對應 PID )發送到指定主題分區的消息都對應一個從 0 開始遞增的 Sequence Number,Server 端就是根據這個值來判斷資料是否重複
producer初始化會由server端生成一個PID,然後發送每條資訊都包含該PID和sequence number,在server端,是按照partition同樣存放一個sequence numbers 資訊,通過判斷用戶端發送過來的sequence number與server端number+1內插補點來決定資料是否重複或者漏掉。
當 Producer 發送消息給 Broker 時,Broker 接收到消息并将其追加到消息流中。此時,Broker 傳回 Ack 信号給 Producer 時,發生異常導緻 Producer 接收 Ack 信号失敗。對于 Producer 來說,會觸發重試機制,将消息再次發送,但是,由于引入了幂等性,在每條消息中附帶了 PID(Producer ID)和Sequence Number。相同的 PID 和 Sequence Number 發送給 Broker,而之前 Broker 緩存過之前發送的相同的消息,那麼在消息流中的消息就隻有一條,不會出現重複發送的情況。
生成PID的流程
//在執行建立事務時
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//會建立一個Sender,并啟動線程,執行如下run方法
Sender{
void run(long now) {
if (transactionManager != null) {
try {
........
if (!transactionManager.isTransactional()) {
// 為idempotent producer生成一個producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
為什麼要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5
通常情況下為了保證資料順序性,我們可以通過MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1來保證,這個也隻是針對單執行個體。在kafka2.0+版本上,隻要開啟幂等性,不用設定這個參數也能保證發送資料的順序性。
其實這裡,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 執行個體會緩存每個 PID 在每個 Topic-Partition 上發送的最近 5 個batch 資料(這個 5 是寫死的,至于為什麼是 5,可能跟經驗有關,當不設定幂等性時,當這個設定為 5 時,性能相對來說較高,社群是有一個相關測試文檔),如果超過 5,ProducerStateManager 就會将最舊的 batch 資料清除。
假設應用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設定為 6,假設發送的請求順序是 1、2、3、4、5、6,這時候 server 端隻能緩存 2、3、4、5、6 請求對應的 batch 資料,這時候假設請求 1 發送失敗,需要重試,當重試的請求發送過來後,首先先檢查是否為重複的 batch,這時候檢查的結果是否,之後會開始 check 其 sequence number 值,這時候隻會傳回一個 OutOfOrderSequenceException 異常,client 在收到這個異常後,會再次進行重試,直到超過最大重試次數或者逾時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當于client 狂發錯誤請求)。
幂等性的注意事項
- 幂等性 Producer 隻能保證單分區上的幂等性:即隻能保證某個主題上的一個分區上不出現重複消息,無法實作多個分區的幂等性
- 幂等性 Producer 隻能實作單會話上的幂等性,不能實作跨會話的幂等性
會話:Producer 程序的一次運作,如果重新開機 Producer 程序,将丢失幂等性保證
配置幂等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
要啟用幂等(idempotence),必須将enable.idempotence配置設定為true。 如果設定,則retries(重試)配置将預設為Integer.MAX_VALUE,acks配置将預設為all,如果顯性的将acks設定為0,-1,那麼将會報錯。
此外,如果send(ProducerRecord)即使在無限次重試的情況下也會傳回錯誤(例如消息在發送前在緩沖區中過期),那麼建議關閉生産者,并檢查最後産生的消息的内容,以確定它不重複。最後,生産者隻能保證單個會話内發送的消息的幂等性。
Exactly Once語義
将伺服器的 ACK 級别設定為-1,可以保證 Producer 到 Server 之間不會丢失資料,即 At Least Once(至少一次) 語義。相對的,将伺服器 ACK 級别設定為 0,可以保證生産者每條消息隻會被發送一次,即At Most Once(最多一次) 語義。
At Least Once 可以保證資料不丢失,但是不能保證資料不重複;相對的,At Most Once 可以保證資料不重複,但是不能保證資料不丢失。但是,對于一些非常重要的資訊,比如說交易資料,下遊資料消費者要求資料既不重複也不丢失,即 Exactly Once(剛好一次) 語義。在 0.11 版本以前的 Kafka,對此是無能為力的,隻能保證資料不丢失,再在下遊消費者對資料做全局去重。對于多個下遊應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大影響。
0.11 版本的 Kafka,引入了一項重大特性:幂等性。幂等性結合 At Least Once 語義,就構成了 Kafka 的Exactly Once 語義。即:
At Least Once + 幂等性 = Exactly Once
Kafka 的幂等性實作其實就是将原來下遊需要做的去重放在了資料上遊。
事務
幂等性并不能跨多個分區運作,而事務可以彌補這個缺陷。Kafka 事務是 2017 年 Kafka 0.11 引入的新特性。類似于資料庫的事務。Kafka 事務指的是在 Exactly Once 語義的基礎上,生産和消費可以跨分區和會話,生産者生産消息以及消費者送出 offset 的操作可以在一個原子操作中,要麼都成功,要麼都失敗。尤其是在生産者、消費者并存時,事務的保障尤其重要。(consumer-transform-producer模式)
消費者送出偏移量導緻重複消費消息的場景:消費者在消費消息完成送出偏移量o2之前挂掉了(假設它最近送出的偏移量是o1),此時執行再均衡時,其它消費者會重複消費消息(o1到o2之間的消息)。
事務的應用情況
在一個原子操作中,根據包含的操作類型,可以分為三種情況:
- 隻有Producer生産消息;
- 消費消息和生産消息并存,這個是事務場景中最常用的情況,就是我們常說的“consume-transform-produce ”模式
- 隻有consumer消費消息
前兩種情況是事務引入的場景,最後一種情況沒有使用價值(跟使用手動送出效果一樣)。
相關屬性配置
使用kafka的事務API 時的一些注意事項:
- 需要消費者的自動模式設定為false,并且不能子再手動的進行執行consumer#commitSync或者consumer#commitAsyc
- 生産者配置transaction.id屬性
- 生産者不需要再配置enable.idempotence,因為如果配置了transaction.id,則此時enable.idempotence會被設定為true
- 消費者需要配置Isolation.level。在consume-trnasform-produce模式下使用事務時,必須設定為READ_COMMITTED。
Producer事務
為了實作跨分區跨會話的事務,需要引入一個全局唯一的 Transaction ID,并将 Producer 獲得的 PID 和 Transaction ID 綁定。這樣當 Producer 重新開機後就可以通過正在進行的Transaction ID 獲得原來的PID。
為了管理 Transaction,Kafka 引入了一個新的元件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 互動獲得 Transaction ID 對應的任務狀态。Transaction Coordinator 還負責将事務所有寫入 Kafka 的一個内部 Topic,這樣即使整個服務重新開機,由于事務狀态得到儲存,進行中的事務狀态可以得到恢複,進而繼續進行。
要使用事務生産者和attendant API,必須設定transactional.id。如果設定了transactional.id,幂等性會和幂等所依賴的生産者配置一起自動啟用。此外,應該對包含在事務中的topic進行耐久性配置。特别是,replication.factor應該至少是3,而且這些topic的min.insync.replicas應該設定為2。最後,為了實作從端到端的事務性保證,消費者也必須配置為隻讀取已送出的消息。
transactional.id的目的是實作單個生産者執行個體的多個會話之間的事務恢複。它通常是由分區、有狀态的應用程式中的分片辨別符派生的。是以,它對于在分區應用程式中運作的每個生産者執行個體來說應該是唯一的。
所有新的事務性API都是阻塞的,并且會在失敗時抛出異常。
Producer接口中有關事務的方法定義
//producer提供的事務方法
/**
* 初始化事務。需要注意的有:
* 1、前提
* 需要保證transation.id屬性被配置。
* 2、這個方法執行邏輯是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();
/**
* 開啟事務
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 為消費者提供的在事務内送出偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;
/**
* 送出事務
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 放棄事務,類似復原事務的操作
*/
public void abortTransaction() throws ProducerFencedException ;
建立生産者
配置transactional.id屬性
public static Producer<String, String> createProducer() {
Properties properties = new Properties();
//配置檔案裡面的變量都是靜态final類型的,并且都有預設的值
//用于建立與 kafka 叢集連接配接的 host/port
//繼承的hashtable,保證了線程安全
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
/**
* producer 将試圖批處理消息記錄,以減少請求次數。這将改善 client 與 server 之間的性能。這項配置控制預設的批量處理消息位元組數。
* 不會試圖處理大于這個位元組數的消息位元組數。發送到 brokers 的請求将包含多個批量處理,其中會包含對每個 partition 的一個請求。
* 較小的批量處理數值比較少用,并且可能降低吞吐量(0 則會僅用批量處理)。較大的批量處理數值将會浪費更多記憶體空間,這樣就需要配置設定特
* 定批量處理數值的記憶體大小
**/
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
/**
* producer 組将會彙總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這隻有在記錄産生速度大于發送速度的時候才
* 能發生。然而,在某些條件下,用戶端将希望降低請求的數量,甚至降低到中等負載一下。這項設定将通過增加小的延遲來完成--即,不是立即
* 發送一條記錄,producer 将會等待給定的延遲時間以允許其他消息記錄發送,這些消息記錄可以批量處理。這可以認為是 TCP 種 Nagle 的算
* 法類似。這項設定設定了批量處理的更高的延遲邊界:一旦我們獲得某個 partition 的batch.size,他将會立即發送而不顧這項設定,
* 然而如果我們獲得消息位元組數比這項設定要小的多,我們需要“linger”特定的時間以擷取更多的消息。 這個設定預設為 0,即沒有延遲。設
* 定 linger.ms=5,例如,将會減少請求數目,但是同時會增加 5ms 的延遲
**/
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
/**
* producer 可以用來緩存資料的記憶體大小。如果資料産生速度大于向 broker 發送的速度,将會耗盡這個緩存空間,producer
* 會阻塞或者抛出異常,以“block.on.buffer.full”來表明。這項設定将和 producer 能夠使用的總記憶體相關,但并不是一個
* 硬性的限制,因為不是producer 使用的所有記憶體都是用于緩存。一些額外的記憶體會用于壓縮(如果引入壓縮機制),同樣還有一些
* 用于維護請求當緩存空間耗盡,其他發送調用将被阻塞,阻塞時間的門檻值通過max.block.ms設定,之後它将抛出一個TimeoutException。
**/
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
/**
* 該配置控制 KafkaProducer's send(),partitionsFor(),inittransaction (),sendOffsetsToTransaction(),commitTransaction() "
* 和abortTransaction()方法将阻塞。對于send(),此逾時限制了擷取中繼資料和配置設定緩沖區的總等待時間"
**/
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");
//将消息發送到kafka server, 是以肯定需要用到序列化的操作 我們這裡發送的消息是string類型的,是以使用string的序列化類
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//設定事務ID 如果配置了transactional.id屬性,則enable.idempotence 會被設定為true.
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transactional-id");
return new KafkaProducer<>(properties);
}
建立消費者
- 将配置中的自動送出屬性(auto.commit)進行關閉
- 而且在代碼裡面也不能使用手動送出commitSync( )或者commitAsync( )
- 設定isolation.level
/**
* 需要:
* 1、關閉自動送出 enable.auto.commit
* 2、isolation.level為 read_committed
* @return
*/
public static Consumer createConsumer() {
Properties properties = new Properties();
// bootstrap.servers是Kafka叢集的IP位址。多個時,使用逗号隔開
properties.put("bootstrap.servers", "IP:9092");
// 消費者群組
properties.put("group.id", "groupxt");
// 設定隔離級别
properties.put("isolation.level","read_committed");
// 關閉自動送出
properties.put("enable.auto.commit", "false");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(properties);
}
隻有Producer生産消息
/*
Producer異步發送帶回調函數(事務) 在一個事務隻有生産消息操作
*/
public static void onlyProduceInTransaction(){
Producer<String,String> producer = ProducerTransaction.createProducer();
// 1.初始化事務
producer.initTransactions();
try {
// 2.開啟事務
producer.beginTransaction();
// 3.kafka寫操作集合
// 3.1 do業務邏輯
// 3.2 發送消息
// 消息對象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
//就是多傳入一個回調執行個體
/**
* 使用者可以實作的回調接口,以允許代碼在請求完成時執行。 這個回調通常會在背景 I/O 線程中執行,是以它應該很快。
**/
producer.send(record, new Callback() {
/**
* 使用者可以實作的回調方法,以提供對請求完成的異步處理。 當發送到伺服器的記錄被确認時,将調用此方法。 當回調中的異常不為空時,中繼資料将包含除 topicPartition 之外的所有字段的特殊 -1 值,這将是有效的。
* 參形:
* metadata – 已發送記錄的中繼資料(即分區和偏移量)。 如果發生錯誤,将傳回除 topicPartition 之外所有其他字段都為-1的 的中繼資料,
* exception – 在處理此記錄期間引發的異常。 如果沒有發生錯誤,則為 Null。
* 可能抛出的異常包括: 不可重試異常(緻命,永遠不會發送消息):
* InvalidTopicException OffsetMetadataTooLargeException RecordBatchTooLargeException RecordTooLargeException UnknownServerException UnknownProducerIdException InvalidProducerEpochException
* 可重試異常(可以通過增加retries來覆寫): CorruptRecordException InvalidMetadataException NotEnoughReplicasAfterAppendException NotEnoughOutReplicasException Offset
**/
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(
"partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}
});
}
// 3.3 do其他業務邏輯,還可以發送其他topic的消息。
// 4.事務送出
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
// 5.放棄事務
producer.abortTransaction();
}finally{
// 所有的通道打開都需要關閉 close方法會會将緩存隊列狀态置為關閉,喚醒io線程将記憶體中的資料發往broker,避免這個程式的程序突然挂掉,然後記憶體裡面的消息丢失,是以這個方法結束的時候,将消息資料都發送出去
producer.close();
}
}
隻有consumer消費消息
/**
* 在一個事務隻有消息操作
*/
public static void onlyConsumeInTransaction() {
// 1.建構上産者
Producer<String,String> producer = ProducerTransaction.createProducer();
// 2.初始化事務(生成productId),對于一個生産者,隻能執行一次初始化事務操作
producer.initTransactions();
// 3.建構消費者和訂閱主題
Consumer consumer = createConsumer();
consumer.subscribe(Arrays.asList("xt"));
while (true) {
// 4.開啟事務
producer.beginTransaction();
// 5.1 接受消息
Duration duration = Duration.ofMillis(500);
ConsumerRecords<String, String> records = consumer.poll(duration);
try {
// 5.2 do業務邏輯;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 處理消息 print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
// 5.2.2 記錄送出偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
}
// 6.送出偏移量
producer.sendOffsetsToTransaction(commits, "groupxt");
// 7.事務送出
producer.commitTransaction();
}catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
// 8.放棄事務
System.out.println(e.getMessage());
producer.abortTransaction();
}finally{
producer.flush();
}
}
}
消費消息和生産消息并存(consume-transform-produce)
在一個事務中,既有生産消息操作又有消費消息操作,即常說的Consume-tansform-produce模式。如下執行個體代碼
/**
* 在一個事務内,即有生産消息又有消費消息
*/
public static void consumeTransferProduce() {
// 1.建構上産者
Producer<String,String> producer = ProducerTransaction.createProducer();
// 2.初始化事務(生成productId),對于一個生産者,隻能執行一次初始化事務操作
producer.initTransactions();
// 3.建構消費者和訂閱主題
Consumer consumer = createConsumer();
consumer.subscribe(Arrays.asList("xt"));
while (true) {
// 4.開啟事務
producer.beginTransaction();
// 5.1 接受消息
Duration duration = Duration.ofMillis(5000);
ConsumerRecords<String, String> records = consumer.poll(duration);
System.out.println(records.count());
try {
// 5.2 do業務邏輯;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 讀取消息,并處理消息。print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
// 5.2.2 記錄送出的偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
// 6.生産新的消息。比如外賣訂單狀态的消息,如果訂單成功,則需要發送跟商家結轉消息或者派送員的提成消息
producer.send(new ProducerRecord<String, String>("xt", "data"));
}
// 7.送出偏移量
producer.sendOffsetsToTransaction(commits, "groupxt");
// 8.事務送出
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
// 7.放棄事務
producer.abortTransaction();
}finally{
producer.flush();
}
}
}
Consumer事務
上述事務機制主要是從 Producer 方面考慮,對于 Consumer 而言,事務的保證就會相對較弱,尤其是無法保證 Commit 的資訊被精确消費。這是由于 Consumer 可以通過 offset 通路任意資訊,而且不同的 Segment File 生命周期不同,同一事務的消息可能會出現重新開機後被删除的情況。
相關配置檔案字段
Broker configs
配置項 | 描述 |
transactional.id.timeout.ms | transaction coordinator沒有從生産者哪裡接收到任何事務狀态更新的等待時間, 過了時間之後會主動地使生産者。 Transactional Id過期預設為604800000(7天)。這允許制作人每周定期工作來維護他們的id |
max.transaction.timeout.ms | 事務允許的最大逾時時間。 如果用戶端請求的事務時間超過這個值,那麼broker将在InitPidRequest中傳回一個InvalidTransactionTimeout錯誤。這可以防止用戶端逾時時間過長,否則會導緻客戶讀取事務中包含的Topic 時出現停頓。 預設為900000(15分鐘)。這是需要發送事務消息的時間的保守上限。 |
transaction.state.log.replication.factor | 事務狀态主題的副本數量。預設為3 |
transaction.state.log.num.partitions | 事務狀态主題的分區數。預設為50 |
transaction.state.log.min.isr | 事務狀态主題的每個分區需要考慮線上的insync副本的最小數量。預設為2 |
transaction.state.log.segment.bytes | 事務狀态主題的段大小。Default: 104857600 bytes. |
Producer configs
配置項 | 描述 |
enable.idempotence | 是否啟用幂等(預設為false)。 如果禁用,生産者将不設定PID字段在生産請求和目前生産者傳遞語義中。注意,為了使用事務,必須啟用幂等性。當幂等性啟用時,我們強制acks=all,retries > 1,并且max. flight.requests.per.connection=1。 如果這些配置沒有這些值,我們就不能保證幂等性。 如果這些設定沒有被應用程式顯式覆寫,當幂等功能啟用時,生産者将設定acks=all, retries=Integer.MAX_VALUE, max.inflight.requests.per.connection=1 |
transaction.timeout.ms | 事務協調器在主動中止正在進行的事務之前,等待事務狀态更新的最長時間,這個配置值将與InitPidRequest一起發送到事務協調器。 如果該值大于代理中設定的max.transaction.timeout.ms,則請求将失敗,并出現InvalidTransactionTimeout錯誤。 預設是60000ms。這使得事務不會阻塞下遊消費超過一分鐘,這在實時應用中通常是允許的。 |
transactional.id | 用于事務傳遞的TransactionalId。 這支援跨多個生産者會話的可靠性語義,因為它允許用戶端保證使用相同TransactionalId的事務在啟動任何新事務之前已經完成。 如果沒有提供TransactionalId,那麼生産者将被限制為幂等傳遞。 如果配置了transactional.id屬性,則enable.idempotence 會被設定為true. |
Consumer configs
配置項 | 描述 |
isolation.level | (default is read_uncommitted) read_uncommitted:按照偏移量順序使用已送出和未送出的消息。 read_committed:隻使用非事務性消息或按偏移順序送出的事務性消息。為了保持偏移順序,這個設定意味着我們必須在消費者中緩沖消息,直到我們看到給定事務中的所有消息。 |
幂等性和事務性的關系
事務屬性實作前提是幂等性,即在配置事務屬性transaction id時,必須還得配置幂等性;但是幂等性是可以獨立使用的,不需要依賴事務屬性。
- 幂等性引入了Porducer ID
- 事務屬性引入了Transaction Id屬性
transactionalId 、producerId 和 producerEpoch
一個app有一個tid,同一個應用的不同執行個體PID是一樣的,隻是epoch的值不同。
對于同一個事務ID,先保證epoch小的producer執行init-transaction和committransaction,然後epoch較大的procuder才能開始執行init-transaction和commit-transaction,如下順序:
有了transactionId後,Kafka可保證:
跨Session的資料幂等發送。當具有相同Transaction ID的新的Producer執行個體被建立且工作時,舊的且擁有相同Transaction ID的Producer将不再工作。kafka保證了關聯同一個事務的所有producer(一個應用有多個執行個體)必須按照順序初始化事務、和送出事務,否則就會有問題,這保證了同一事務ID中消息是有序的(不同執行個體得按順序建立事務和送出事務)。
spring-kafka的事務設定
kafka是跨Session的資料幂等發送,即如果應用部署多個執行個體時常會遇到“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必須保證這些執行個體生成者的送出事務順序和建立順序保持一緻才可以,否則就無法成功。其實,在實踐中,我們更多的是如何實作對應用單執行個體的事務性。可以通過spring-kafaka實作思路來學習,即每次建立生成者都設定一個不同的transactionId的值,如下代碼:
====================================
類名:ProducerFactoryUtils
====================================
/**
* Obtain a Producer that is synchronized with the current transaction, if any.
* @param producerFactory the ConnectionFactory to obtain a Channel for
* @param <K> the key type.
* @param <V> the value type.
* @return the resource holder.
*/
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
final ProducerFactory<K, V> producerFactory) {
Assert.notNull(producerFactory, "ProducerFactory must not be null");
// 1.對于每一個線程會生成一個唯一key,然後根據key去查找resourceHolder
@SuppressWarnings("unchecked")
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
.getResource(producerFactory);
if (resourceHolder == null) {
// 2.建立一個消費者
Producer<K, V> producer = producerFactory.createProducer();
// 3.開啟事務
producer.beginTransaction();
resourceHolder = new KafkaResourceHolder<K, V>(producer);
bindResourceToTransaction(resourceHolder, producerFactory);
}
return resourceHolder;
}
在spring-kafka中,對于一個線程建立一個producer,事務送出之後,還會關閉這個producer并清除,後續同一個線程或者新的線程重新執行事務時,此時就會重新建立producer。
建立消費者代碼
====================================
類名:DefaultKafkaProducerFactory
====================================
protected Producer<K, V> createTransactionalProducer() {
Producer<K, V> producer = this.cache.poll();
if (producer == null) {
Map<String, Object> configs = new HashMap<>(this.configs);
// 對于每一次生成producer時,都設定一個不同的transactionId
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
// 1.初始化話事務。
producer.initTransactions();
return new CloseSafeProducer<K, V>(producer, this.cache);
}
else {
return producer;
}
}
Consume-transform-Produce 的流程圖
流程1 :查找Tranaction Corordinator。
Producer向任意一個brokers發送 FindCoordinatorRequest請求來擷取Transaction Coordinator的位址。
流程2:初始化事務 initTransaction
Producer發送InitpidRequest給事務協調器,擷取一個Pid。InitpidRequest的處理過程是同步阻塞的,一旦該調用正确傳回,Producer就可以開始新的事務。TranactionalId通過InitpidRequest發送給Tranciton Corordinator,然後在Tranaciton Log中記錄這<TranacionalId,pid>的映射關系。除了傳回PID之外,還具有如下功能:
對PID對應的epoch進行遞增,這樣可以保證同一個app的不同執行個體對應的PID是一樣的,但是epoch是不同的。
復原之前的Producer未完成的事務(如果有)
流程3: 開始事務beginTransaction
執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀态為開始狀态。
注意:這個操作并沒有通知Transaction Coordinator。
流程4: Consume-transform-produce loop
流程4.0: 通過Consumtor消費消息,處理業務邏輯
流程4.1: producer向TransactionCordinantro發送AddPartitionsToTxnRequest
在producer執行send操作時,如果是第一次給<topic,partion>發送資料,此時會向Trasaction Corrdinator發送一個AddPartitionsToTxnRequest請求,Transaction Corrdinator會在transaction log中記錄下tranasactionId和<topic,partion>一個映射關系,并将狀态改為begin。AddPartionsToTxnRequest的資料結構如下:
AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
TransactionalId => string
PID => int64
Epoch => int16
Topic => string
Partition => int32
流程4.2: producer#send發送 ProduceRequst
生産者發送資料,雖然沒有還沒有執行commit或者absrot,但是此時消息已經儲存到kafka上,而且即使後面執行abort,消息也不會删除,隻是更改狀态字段辨別消息為abort狀态。
流程4.3: AddOffsetCommitsToTxnRequest
Producer通過KafkaProducer.sendOffsetsToTransaction 向事務協調器器發送一個AddOffesetCommitsToTxnRequests:
AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
TransactionalId => string
PID => int64
Epoch => int16
ConsumerGroupID => string
在執行事務送出時,可以根據ConsumerGroupID來推斷_customer_offsets主題中相應的TopicPartions資訊。這樣在
流程4.4: TxnOffsetCommitRequest
Producer通過KafkaProducer.sendOffsetsToTransaction還會向消費者協調器Cosumer Corrdinator發送一個TxnOffsetCommitRequest,在主題_consumer_offsets中儲存消費者的偏移量資訊。
TxnOffsetCommitRequest => ConsumerGroupID
PID
Epoch
RetentionTime
OffsetAndMetadata
ConsumerGroupID => string
PID => int64
Epoch => int32
RetentionTime => int64
OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
TopicName => string
Partition => int32
Offset => int64
Metadata => string
流程5: 事務送出和事務終結(放棄事務)
通過生産者的commitTransaction或abortTransaction方法來送出事務和終結事務,這兩個操作都會發送一個EndTxnRequest給Transaction Coordinator。
流程5.1:EndTxnRequest。Producer發送一個EndTxnRequest給Transaction Coordinator,然後執行如下操作:
-
Transaction Coordinator會把PREPARE_COMMIT or PREPARE_ABORT
消息寫入到transaction log中記錄
- 執行流程5.2
- 執行流程5.3
流程5.2:WriteTxnMarkerRequest
WriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker [Topic [Partition]]]
CoordinatorEpoch => int32
PID => int64
Epoch => int16
Marker => boolean (false(0) means ABORT, true(1) means COMMIT)
Topic => string
Partition => int32
- 對于Producer生産的消息。Tranaction Coordinator會發送WriteTxnMarkerRequest給目前事務涉及到每個<topic,partion>的leader,leader收到請求後,會寫入一個COMMIT(PID) 或者 ABORT(PID)的控制資訊到data log中
-
對于消費者偏移量資訊,如果在這個事務裡面包含_consumer-offsets主題。Tranaction
Coordinator會發送WriteTxnMarkerRequest給Transaction Coordinartor,Transaction Coordinartor收到請求後,會寫入一個COMMIT(PID) 或者ABORT(PID)的控制資訊到 data log中。
- 隻會保留這個事務對應的PID和timstamp。然後把目前事務其他相關消息删除掉,包括PID和tranactionId的映射關系。
References:
- https://www.orchome.com/303
- http://www.heartthinkdo.com/?p=2040#4