天天看點

Kakfa生産者詳解

kafka producer所有實作的接口:

java.io.Closeable, java.lang.AutoCloseable, 

Producer

<K,V>

生産者用于向kafka叢集發送消息。

生産者是線程安全的,通常在多線程中使用一個生産者執行個體比使用多個執行個體更快。

下面是一個簡單的例子:

Properties

props = new Properties();

 props.put("bootstrap.servers",

"localhost:9092");

 props.put("acks",

"all");

 props.put("retries",

0);

 props.put("batch.size",

16384);

 props.put("linger.ms",

1);

 props.put("buffer.memory",

33554432);

 props.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

 props.put("value.serializer",

Producer<String,

String> producer = new KafkaProducer<>(props);

 for (int i = 0; i < 100;

i++)

     producer.send(new

ProducerRecord<String, String>("my-topic", Integer.toString(i),

Integer.toString(i)));

producer.close();

生産者持有一個緩存區,用來緩存尚未發送至服務端的資料,同時啟動一個背景I/O線程用來發送這些消息。使用後沒有成功關閉生産者可能導緻緩存區域的資源洩露。

send()方法是異步的。當被調用時,它将消息添加到待發緩存區并立即傳回。這使得生産者能夠批量的發送消息。

acks配置項控制着發送請求的完成标準。"all" 配置會導緻請求阻塞,,需等待本次請求的所有消息傳回成功确認。這種是最慢的也是最可靠的設定。

如果請求失敗,生産者會自動重試,如果把retries設定為0,就不會重試。若果啟用retries,有可能會導緻資料重複。

生産者為每一個partition分别維護各自的消息緩沖區(buffer)。這些buffer的大小通過batch.size配置項設定。調大這個參數,能夠減少處理批次,但也需要更多的記憶體(因為每個分區通常都有一個buffer)。

不管buffer有沒有剩餘的空間,預設情況下,其中的資料都能立刻發送出去。如果你想要減少發送請求數量,可以通過設定linger.ms 的值大于0。生産者會等待設定的時間後再發送下一次請求,這樣可能使更多的消息填充到同一個batch。這類似于TCP協定中的納格算法。例如上面的代碼片段中,在設定linger.ms=1後,總共有100條消息會在一次請求中發送。這樣會增加1ms的請求等待延時,使buffer未滿的情況下接受更多的消息。注意,如果消息接連過來也會被放在一個batch處理。是以在繁忙的系統中,即便設定了linger.ms=0,也會發生批處理。然而将此設定成大于0的值,可以在繁忙的系統中,以相對較小的時延,使請求數更少并且更高效。

Buffer.memory 控制着生産者所能使用的緩沖區的最大記憶體。如果消息發送比傳輸到伺服器的速度快,記憶體可能會被耗盡。當緩沖區空間被耗盡,send 方法将會被阻塞。阻塞時間由max.block.ms 控制,達到最大阻塞時間,send方法抛出TimeoutException。

key.serializer 和 value.serializer配置表明怎樣把使用者提供給ProducerRecord的key和value對象轉化為位元組。你可以對簡單的string和位元組類型資料使用ByteArraySerializer或StringSerializer進行序列化。

從kafka0.11開始,kafka生産者支援兩種額外的模式:幂等生産者與事務型生産者。幂等生産者使kafka的至少發送一次(at

least once)提升到了精确發送一次(exactly

once)。生産者的retries 設定

将不再造成資料重複。事務型生産者允許往多個分區(和多個topic)中發送資料。

開啟幂等生産者,需将enable.idempotence設定為true。這時retries會被預設為Integer.MAX_VALUE,acks預設設為all。幂等生産者的API并沒有改變,是以現有的應用不用修改即可使用此特性。

為了發揮幂等生産者的優點,避免應用層重複發送消息是很必要的。同樣的,如果應用支援幂等性,推薦不設定retires,使用預設設定即Integer.MAX_VALUE。此外,即便在無限的重試次數下,send方法依然傳回error(比如buffer中的消息在傳輸以前過期),推薦的的做法是關閉生産者,檢查最後一次發送消息的内容以確定沒有重複。最後幂等生産者隻能在一個session中保證消息的幂等性。

如果要使用事務性生産者和其附屬API,你必須正确的設定transactional.id。一旦設定了此配置,幂等性配置也會被自動開啟。此外replication.factor至少設定為3,并且min.insync.replicas應該設定為2。最後為了確定事務端到端的實作,消費者必須設定成隻能消費送出确認的消息。

設定transactional.id的目的是在一個生産者執行個體中回複多個傳輸session。

所有新的事務型API都是阻塞的,可能會抛出失敗。下面的例子示範了使用新版API的方,與上面的代碼類似,除了所有100條消息會在一個事務中處理:

 Properties props = new

Properties();

 props.put("transactional.id",

"my-transactional-id");

 Producer<String, String> producer

= new KafkaProducer<>(props, new StringSerializer(), new

StringSerializer());

producer.initTransactions();

try

{

producer.beginTransaction();

     for (int i = 0; i < 100;

         producer.send(new

ProducerRecord<>("my-topic", Integer.toString(i),

producer.commitTransaction();

 } catch (ProducerFencedException |

OutOfOrderSequenceException | AuthorizationException e) {

     // We can't recover from these

exceptions, so our only option is to close the producer and exit.

     producer.close();

 } catch (KafkaException e) {

     // For all other exceptions, just

abort the transaction and try again.

     producer.abortTransaction();

 }

 producer.close();

如上面的代碼所示,在一個生産者中隻能有一個打開的事務。所有在producer.beginTransaction()與producer.commitTransaction();之間被發送的消息都會在一個事務中。當transactional.id被指定,被這個生産者發送的所有消息必須在一個事務中。

事務型生産者使用異常傳遞錯誤狀态。特别的,沒有必要在producer.send方法中使用回掉函數或在發揮出調用.get()函數。因為如果在producer.send()或者事務調用中出不可恢複的錯誤将會抛出一個異常。

在接收到kafkaexception時,調用producer.abortTransaction();來確定成功寫入的消息被标記為遺棄(abort),以此來保證事務。

在0.10.0或更新的版本中,用戶端可以直接跟broker進行互動。老的broker可能不支援一些新的特性。比如事務型API需要0.11.0或更新的broker版本。當在運作的broker上使用不支援的API時,你會收到UnsupportedVersionException異常。

繼續閱讀