文章目錄
-
- 1. Key和Value
- 2. 失敗重試
- 3. 異步發送
- 4. 線程安全
- 5. Acks
- 6. Batch
- 7. 單個請求的最大值
- 8. OOM
- 9. 分區順序
- 10. 順序保證
- 11. Producer 幂等性
-
- 11.1 Producer 幂等性設定
- 11.2 幂等性原理
- 11.3 原因分析
- 12. Producer 開啟事務
- 12.1 Producer 事務示例
-
- 12.1.2 查找TransactionCoordinator事務實作原理
- 12.1.3 擷取PID
- 12.1.4 開啟事務
- 12.1.5 Consume-Porcess-Produce Loop
- 12.1.6 送出或者中斷事務
1. Key和Value
0.10.2.2版本的Kafka的消息字段隻有兩個:Key和Value。
- Key:消息的辨別。
- Value:消息内容。
為了便于追蹤,請為消息設定一個唯一的Key。您可以通過Key追蹤某消息,列印發送日志和消費日志,了解該消息的發送和消費情況。
2. 失敗重試
分布式環境下,由于網絡等原因偶爾發送失敗是常見的。導緻這種失敗的原因可能是消息已經發送成功,但是Ack失敗,也有可能是确實沒發送成功。
消息隊列Kafka版是VIP網絡架構,會主動斷開空閑連接配接(30秒沒活動),是以,不是一直活躍的用戶端會經常收到 “connection rest by peer” 錯誤,建議重試消息發送。
重試參數
您可以根據業務需求,設定以下重試參數:
-
,重試次數,建議設定為retries
。失敗重試将會導緻資料亂序,如果要保證消息有序,設定3
,這樣就能保證消息有序性,但是會影響生産者吞吐量。是以隻有在對消息的順序有嚴格要求的情況下才能這麼做。max.in.flight.requests.per.connection =1
-
,重試間隔,建議設定為retry.backoff.ms
。1000
3. 異步發送
發送接口是異步的,如果您想得到發送的結果,可以調用使用回調函數:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata == null) {
e.printStackTrace();
}else {
long offset = recordMetadata.offset();
System.out.println("sender success:"+offset);
}
}
});
4. 線程安全
Producer 是線程安全的,且可以往任何 Topic 發送消息。通常情況下,一個應用對應一個 Producer 就足夠了。
5. Acks
acks
配置表示 producer 發送消息到 broker 上以後的确認值。有三個可選項
-
:表示 producer 不需要等待 broker 的消息确認。這個選項時延最小但同
時風險最大(因為當 server 當機時,資料将會丢失)。
-
1
:表示 producer 隻需要獲得 kafka 叢集中的 leader 節點确認即可,這個
選擇時延較小同時確定了 leader 節點确認接收成功。
-
all(-1)
:需要 ISR 中所有的 Replica(副本)給予接收确認,速度最慢,安全性最高,
但是由于 ISR 可能會縮小到僅包含一個 Replica,是以設定參數為 all 時并不能一定避免資料丢失。
一般建議選擇
acks=1
,重要的服務可以設定
acks=all
。
6. Batch
-
生産者發送多個消息到 broker上的同一個分區時,為了減少網絡請求帶來的性能開銷,通過批量的方式來送出消息,可以通過這個參數來控制批量送出的位元組數大小,預設大小是16384byte,也就是16kb,意味着當一批消息大小達到指定的batch.size
的時候會統一發送。消息超過 16kb,将會動态申請記憶體,發送消息後回收動态記憶體。batch.size
-
Producer 預設會把兩次發送時間間隔内收集到的所有 Requests 進行一次聚合然後再發送,以此提高吞吐量,而linger.ms
就是為每次發送到 broker 的請求增加一些延遲,以此來聚合更多的消息請求。linger.ms
batch.size
和
linger.ms
這兩個參數是 kafka 性能優化的關鍵參數,很多同學會發現
batch.size
和
linger.ms
這兩者的作用是一樣的,如果兩個都配置了,那麼怎麼工作的呢?實際上,當二者都配置的時候,隻要滿足其中一個要求,就會發送請求到 broker上。
batch.size
有助于提高吞吐,
linger.ms
有助于控制延遲。您可以根據具體業務需求進行調整。
7. 單個請求的最大值
max.request.size
:設定請求的資料的最大位元組數,為了防止發生較大的資料包影響到吞吐量,預設值為1MB。超過 1MB 将會報錯。
8. OOM
結合 Kafka 的 Batch 設計思路,Kafka 會緩存消息并打包發送,如果緩存太多,則有可能造成OOM(Out of Memory)。
-
: 所有緩存消息的總體大小超過這個數值後,就會觸發把消息發往伺服器。此時會忽略buffer.memory
和batch.size
的限制。linger.ms
-
的預設數值是32 MB,對于單個 Producer 來說,可以保證足夠的性能。 需要注意的是,如果您在同一個JVM中啟動多個 Producer,那麼每個 Producer 都有可能占用 32 MB緩存空間,此時便有可能觸發 OOM。buffer.memory
- 在生産時,一般沒有必要啟動多個 Producer;如果特殊情況需要,則需要考慮
的大小,避免觸發 OOM。buffer.memory
9. 分區順序
單個分區(Partition)内,消息是按照發送順序儲存的,是基本有序的。
預設情況下,消息隊列 Kafka 為了提升可用性,并不保證單個分區内絕對有序,發生消息重試或者當機時,會産生消息亂序(某個分區挂掉後把消息 Failover 到其它分區)。
10. 順序保證
Kafka 可以保證同一個分區裡的消息是有序的。也就是說,如果生産者按照一定的順序發送消息,broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。
在某些情況下,順序是非常重要的。例如,往一個賬戶存入 100 元再取出來,這個與先取錢再存錢是截然不同的。不過,有些場景對順序不是很敏感。
如果把
retries
設為非零整數,同時把
max.in.flight.requests.per.connection
設為比 1 大的數,那麼,如果第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那麼消息是否寫入成功也是很關鍵的,是以不建議把
retries
設為 0。可以把
max.in.flight.requests.per.connection
設為 1,這樣在生産者嘗試發送第一批消息時,就不會有其他的消息發送給 broker。不過這樣會嚴重影響生産者的吞吐量,是以隻有在對消息的順序有嚴格要求的情況下才能這麼做。
11. Producer 幂等性
11.1 Producer 幂等性設定
Producer 的幂等性指的是當發送同一條消息時,資料在 Server 端隻會被持久化一次,資料不丟不重,但是這裡的幂等性是有條件的:
- 隻能保證 Producer 在單個會話内不丟不重,如果 Producer 出現意外挂掉再重新開機是無法保證的(幂等性情況下,是無法擷取之前的狀态資訊,是以是無法做到跨會話級别的不丢不重);
- 幂等性不能跨多個 Topic-Partition,隻能保證單個 partition 内的幂等性,當涉及多個 Topic-Partition 時,這中間的狀态并沒有同步。
如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實作。
使用方式:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
當幂等性開啟的時候acks即為all。如果顯性的将acks設定為0,-1,那麼将會報錯
Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
示例如下:
// 保證 producer 的幂等性使用 這2個參數,該幂等性隻能保證分區内幂等。producer重新開機失效。
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG,"all");
11.2 幂等性原理
幂等性是通過兩個關鍵資訊保證的,PID(Producer ID) 和 sequence numbers。
- PID 用來辨別每個producer client
- sequence numbers 用戶端發送的每條消息都會帶相應的 sequence number,Server 端就是根據這個值來判斷資料是否重複
producer初始化會由server端生成一個PID,然後發送每條資訊都包含該PID和sequence number,在server端,是按照partition同樣存放一個sequence numbers 資訊,通過判斷用戶端發送過來的sequence number與server端number+1內插補點來決定資料是否重複或者漏掉。
通常情況下為了保證資料順序性,我們可以通過
max.in.flight.requests.per.connection=1
來保證,這個也隻是針對單執行個體。在kafka2.0+版本上,隻要開啟幂等性,不用設定這個參數也能保證發送資料的順序性。
11.3 原因分析
為什麼要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5
其實這裡,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 執行個體會緩存每個 PID 在每個 Topic-Partition 上發送的最近 5 個batch 資料(這個 5 是寫死的,至于為什麼是 5,可能跟經驗有關,當不設定幂等性時,當這個設定為 5 時,性能相對來說較高,社群是有一個相關測試文檔),如果超過 5,ProducerStateManager 就會将最舊的 batch 資料清除。
假設應用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設定為 6,假設發送的請求順序是 1、2、3、4、5、6,這時候 server 端隻能緩存 2、3、4、5、6 請求對應的 batch 資料,這時候假設請求 1 發送失敗,需要重試,當重試的請求發送過來後,首先先檢查是否為重複的 batch,這時候檢查的結果是否,之後會開始 check 其 sequence number 值,這時候隻會傳回一個 OutOfOrderSequenceException 異常,client 在收到這個異常後,會再次進行重試,直到超過最大重試次數或者逾時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當于client 狂發錯誤請求)。
12. Producer 開啟事務
12.1 Producer 事務示例
package com.mock.data.stream.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* https://www.cnblogs.com/fnlingnzb-learner/p/13646390.html
* producer 啟用事務
*/
public class ProducerSenderTransactional {
public ProducerSenderTransactional() {
producer = new KafkaProducer<String, String>(producerConfig());
// 初始化事務
producer.initTransactions();
}
public Map<String, Object> producerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop102:9094,hadoop103:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"flink_id");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);// 1M
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// 16kb
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//預設為0
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);// 32M
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 保證 producer 的幂等性使用 這2個參數,該幂等性隻能保證分區内幂等。producer重新開機失效。
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG,"all");
// 事務ID
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId_0");
// 保證消息順序性
props.put("max.in.flight.requests.per.connection",1);
return props;
}
KafkaProducer<String, String> producer;
public void send(String topic, String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key,value);
producer.beginTransaction();
boolean success = true;
try {
RecordMetadata recordMetadata = producer.send(record).get(3, TimeUnit.SECONDS);
long offset = recordMetadata.offset();
System.out.println("發送成功:"+offset);
if (key.contains("1")) {
throw new RuntimeException("發送失敗");
}
}catch (Exception e) {
e.printStackTrace();
success = false;
} finally {
if (success) {
// 送出事務
producer.commitTransaction();
} else {
// 中斷事務
producer.abortTransaction();
}
}
}
public static void main(String[] args) throws Exception{
ProducerSenderTransactional sender2 = new ProducerSenderTransactional();
String topic1 = "topic1";
for (int i = 0; i < 10; i++) {
String key = "key_"+i;
String value = "value"+i;
sender2.send(topic1,key,value);
}
Thread.sleep(600000);
}
}
12.1.2 查找TransactionCoordinator事務實作原理
通過transaction_id 找到TransactionCoordinator,具體算法是
Utils.abs(transaction_id.hashCode %transactionTopicPartitionCount )
,擷取到partition,再找到該partition的leader,即為TransactionCoordinator。
12.1.3 擷取PID
凡是開啟幂等性都是需要生成PID(Producer ID),隻不過未開啟事務的PID可以在任意broker生成,而開啟事務隻能在TransactionCoordinator節點生成。這裡隻講開啟事務的情況,Producer Client的
initTransactions()
方法會向TransactionCoordinator發起InitPidRequest ,這樣就能擷取PID。這裡面還有一些細節問題,這裡不探讨,例如transaction_id 之前的事務狀态什麼的。但需要說明的一點是這裡會将 transaction_id 與相應的 TransactionMetadata 持久化到事務日志(_transaction_state)中。
12.1.4 開啟事務
Producer調用
beginTransaction
開始一個事務狀态,這裡隻是在用戶端将本地事務狀态轉移成 IN_TRANSACTION,隻有在發送第一條資訊後,TransactionCoordinator才會認為該事務已經開啟。
12.1.5 Consume-Porcess-Produce Loop
這裡說的是一個典型的
consume-process-produce
場景:
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
producer.beginTransaction();
//start
for (ConsumerRecord record : records){
producer.send(producerRecord(“outputTopic1”, record));
producer.send(producerRecord(“outputTopic2”, record));
}
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
//end
producer.commitTransaction();
}
AddPartitionsToTxnRequest該階段主要經曆以下幾個步驟:
- ProduceRequest
- AddOffsetsToTxnRequest
- TxnOffsetsCommitRequest
關于這裡的詳細介紹可以檢視官網文檔!
12.1.6 送出或者中斷事務
Producer 調用
commitTransaction()
或者
abortTransaction()
方法來 commit 或者 abort 這個事務操作。
基本上經曆以下三個步驟,才真正結束事務。
- EndTxnRequest
- WriteTxnMarkerRquest
- Writing the Final Commit or Abort Message
其中EndTxnRequest是在Producer發起的請求,其他階段都是在TransactionCoordinator端發起完成的。WriteTxnMarkerRquest是發送請求到partition的leader上寫入事務結果資訊(ControlBatch),第三步主要是在
_transaction_state
中标記事務的結束。