首先推薦一個關于Kafka的中文網站:http://orchome.com/kafka/index
部分翻譯直接參考此網站内容,但是網站目前的API版本為0.10.0.1,是以在學習過程中,自行翻譯了一下0.11.0的API文檔。在翻譯過程中有些地方也不是太了解,感覺翻譯的不太準确,有問題的地方望讀者指出。
原文位址:http://kafka.apache.org/0110/javadoc/index.html? org/apache/kafka/clients/producer/KafkaProducer.html
public class KafkaProducer<K,V>
extends Object
implements Producer<K,V>
Kafka用戶端釋出消息至kafka叢集。
生産者是線程安全的,線上程之間共享單個生産者執行個體通常比持有多個執行個體更快。
下面是一個簡單的例子,它使用生産者将包含有序數字的字元串消息作為鍵/值對發送。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),
Integer.toString(i)));
producer.close();
生産者的緩沖空間池保留尚未發送到伺服器的消息,背景I/O線程負責将這些消息轉換成請求發送到叢集。如果使用後不關閉生産者,則會洩露這些資源。
send()方法是異步的,添加消息到緩沖區等待發送,并立即傳回。這允許生産者将單個的消息批量在一起發送來提高效率。
ack是判别請求是否為完整的條件。我們指定了“all”将會阻塞消息,這種設定使性能最低,但是是最可靠的。
retries,如果請求失敗,生産者會自動重試,我們指定是0次即不啟動重試,如果啟用重試,則會有重複消息的可能性。
batch.size ,producer為每個分區未發送的消息保持一個緩沖區。緩存的大小是通過 batch.size 配置指定的。值較大的話将會産生更大的批處理。并需要更多的記憶體(因為通常我們會為每個“活躍”的分區都設定1個緩沖區)。
linger.ms預設情況,即便緩沖空間還沒有滿,緩沖也可立即發送,但是,如果想減少請求的數量,可以設定linger.ms大于0。這将訓示生産者發送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似于TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,因為我們設定了linger(逗留)時間為1毫秒,然後,如果我們沒有填滿緩沖區,這個設定将增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,即使是 linger.ms=0,相近的時間一般也會組成批。在不處于高負載的情況下,如果設定比0大,将以少量的延遲代價換取更少的,更有效的請求。
buffer.memory 控制生産者可用的緩存總量,如果消息發送速度比它們能夠傳輸到伺服器的速度快,将會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用将被阻塞,阻塞時間的門檻值通過max.block.ms設定,之後它将抛出一個TimeoutException。
key.serializer和value.serializer示例,将使用者提供的key和value對象ProducerRecord轉換成位元組,你可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
從kafka 0.11開始,KafkaProducer 支援兩種額外的模式:idempotent producer和transactional producer。
idempotent producer(幂等性生産者)增強Kafka的投遞語義,從至少一次投遞變為完全的一次投遞,特别是消息重試将不再重複提出。
transactional producer(事務性生産者)允許應用程式将消息發送給多個原子分區(和topics)。
使用idempotent,enable.idempotence配置項必須設定為true,如果設定為true,重試配置項(retries )将預設設定為 Integer.MAX_VALUE,max.inflight.requests.per.connection 将預設設定為 1,asks config(确認配置)将預設設定為all 。idempotent producer的API并沒有改變,是以現有的應用程式應用此特性時不需要做修改。
為了充分利用idempotent producer,必須避免應用級别的重試發送,因為這樣不能de-duplicated(去耦合/去重複:此處不是太了解,不知道如何翻譯)。是以,如果應用程式啟用了idempotence,建議取消retries (重試)配置,因為它将被預設為Integer.MAX_VALUE。此外,如果send(ProducerRecord)即使有無限重試還是傳回了一個錯誤(例如,如果消息在發送之前在緩沖區中過期),則建議關閉producer 并檢查最後生成的消息的内容,以確定它不是重複的。最後,producer 隻能保證在單個會話中發送消息的idempotent 特性。
使用transactional producer和與它相關的API,則必須設定transactional.id配置屬性,如果transactional.id被設定,idempotence 會随着其所依賴的producer的配置被自動啟用,此外,transactions 中包含的topics應該配置為持久性的。特别是,replication.factor(複制因子)至少應該為3,這些topics的 min.insync.replicas應該設為2。最後,為了保證transactional 端到端的實作,consumers必須配置為隻讀取送出的資訊。
transactional.id的目的是在單個生産者執行個體的多個會話中啟用事務恢複(transaction recovery )。它通常由分區、狀态和應用程式中的shard辨別符派生而來。是以,對于在分區應用程式中運作的每個生産者執行個體來說,它應該是惟一的。
所有新的transactional api都是阻塞的,并且在故障時抛出異常。下面的示例示範了如何使用新的api。它與上面的示例類似,隻是所有100個消息都是單個事務的一部分。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new
StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i),
Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and
exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
正如在示例中所暗示的那樣,每個生産者隻能有一個打開的事務。beginTransaction()和commitTransaction()之間發送的所有消息都将是單個事務的一部分。當transactional.id被指定,producer發送的所有消息必須是事務的一部分。
transactional producer使用異常來傳達錯誤狀态。具體地說,不需要為producer . send()或調用. get()指定回調函數:如果任何一個producer.send()或事務性調用在事務中遇到不可恢複的錯誤,則将抛出KafkaException。檢視send(ProducerRecord)文檔,了解從事務發送中探知錯誤的更多細節。
在接收一個KafkaException時,通過調用producer.abortTransaction()我們可以確定任何成功的寫操作标記為失敗(中止),是以保持事務保證。
這個用戶端可以與0.10.0版本或更新的brokers進行通信。舊的或新的brokers可能不支援某些用戶端特性。例如,事務api需要broker 0.11.0版本或更高。當調用的API在運作broker版本中不可用,你将收到一個UnsupportedVersionException。
構造函數總結:
public KafkaProducer(Map<String,Object> configs)
producer通過提供一組鍵值對作為配置來執行個體化。有效配置字元串都記錄在這裡。值可以是字元串或适當類型的對象(例如,數字配置可以接受字元串“42”或整數42)。
public KafkaProducer(Map<String,Object> configs,Serializer<K> keySerializer,Serializer<V> valueSerializer)
producer通過提供一組鍵值對作為配置、一個鍵和一個值序列化器來執行個體化。有效配置字元串都記錄在這裡。值可以是字元串或适當類型的對象(例如,數字配置可以接受字元串“42”或整數42)。
public KafkaProducer(Properties properties)
public KafkaProducer(Properties properties,Serializer<K> keySerializer,Serializer<V> valueSerializer)
同上。
方法總結:
public void initTransactions()
當在配置中設定了transactional.id,該方法需要在任何其他方法之前被調用,該方法執行以下步驟:1、確定由producer先前執行個體發起的任何事物都已經完成。如果前一個執行個體的事務在程序中失敗,它将被終止。如果最後一個事務已經開始完成,但還沒有完成,該方法将等待它完成。2、擷取producer的内部ID和epoch(紀元?此處不清楚準确的翻譯),用于 producer.釋出的所有未來的事務消息。
Specified by:
initTransactions in interface Producer<K,V>
Throws:
IllegalStateException - 如果配置中沒有設定producer的transactional.id則抛出此異常。
public void beginTransaction()
需要在每個新事務開始之前調用。
Specified by:
beginTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 如果另一個活躍的producer有相同的transactional.id則抛出此異常。
public void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets,String consumerGroupId)
向consumer組協調器發送被消耗的偏移量清單,并将這些偏移量标記為目前事務的一部分。隻有當事務成功送出時,這些偏移量才會被視為消費掉的。當您需要将消費和生成的消息一起批量處理時,應該使用此方法,通常在consume-transform-produce 模式中使用。
Specified by:
sendOffsetsToTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 如果另一個活躍的producer有相同的transactional.id則抛出此異常。
public void commitTransaction()
送出正在進行中的事務。此方法将在實際送出事務之前flush任何未發送的消息。此外,如何事務包含部分的任何send(ProducerRecord) 調用觸發不可恢複的錯誤,那麼該方法将立即抛出最後一個接收到的異常,而事務将不會被送出。是以,在事務中對send(ProducerRecord)的調用必須成功,以便此方法成功。
Specified by:
commitTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 如果另一個活躍的producer有相同的transactional.id則抛出此異常。
public void abortTransaction()
中止正在進行中的事務。當此調用完成時,任何未flush的生成消息将中止。如果任何先前的send(ProducerRecord)調用有ProducerFencedException或ProducerFencedException 導緻的調用失敗,此調用将立即抛出異常。
Specified by:
abortTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 如果另一個活躍的producer有相同的transactional.id則抛出此異常。
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
異步發送消息到topic,并在發送的消息被确認的時候調用所提供的回調。
send是異步的,并且一旦消息被儲存在等待發送的消息緩沖區中,此方法就立即傳回。這樣并行發送多條消息而不阻塞去等待每一條消息的響應。
發送的結果是一個RecordMetadata,它指定了消息發送的分區,配置設定的offset和消息的時間戳。如果topic使用的是CreateTime,則使用使用者提供的時間戳或發送的時間(如果使用者沒有指定消息的時間戳時使用發送的時間)如果topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。
因為send 調用是異步的,它将為配置設定給消息的RecordMetadata傳回一個Future。如果future調用get(),則将阻塞,直到相關請求完成并傳回該消息的metadata,或抛出發送異常。
如果你想模拟一個簡單的阻塞調用,您可以立即調用get()方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
producer.send(record).get();
完全非阻塞的使用可以利用回調參數提供一個回調,當請求完成時将被調用。
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " +
metadata.offset());
}
});
發送到同一個分區的消息回調保證按一定的順序執行,也就是說,在下面的例子中 callback1 保證執行 callback2 之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
當send作為事務的一部分使用時,不需要定義回調或者檢查future的結果來檢查send的錯誤。如果任何send因為一個不可恢複的錯誤而調用失敗,則最終的commitTransaction() 調用将失敗,并且在最後的發送失敗時抛出異常。當發生這種情況的時候,應用程式應該調用abortTransaction()來重置狀态并繼續發送資料。
有些事務發送錯誤無法通過調用abortTransaction()來解決。特别是,如果一個事務發送完成時伴随着一個ProducerFencedException,OutOfOrderSequenceException,
UnsupportedVersionException,或一個AuthorizationException等異常,那麼唯一的選擇就是調用close()。重大的錯誤導緻生産者進入無效狀态,在這種狀态下,future的API調用将同樣的底層錯誤包裝在新的KafkaException中抛出。
當啟用idempotence(幂等性),但沒有配置transactional.id 的時候,是一個類似的場景。在這種情況下,UnsupportedVersionException和AuthorizationException被視為重大錯誤。但是,ProducerFencedException不需要被處理。此外,它有可能在收到一個OutOfOrderSequenceException之後繼續發送資訊,但是這樣做可能導緻等待中的消息的無序投遞,為了確定正确的順序,你應該關閉生産者并建立一個新的執行個體。
如果目标topic的消息格式不更新到0.11.0.0,idempotent(幂等性)和transactional(事務性)的生産請求将失敗并伴随一個UnsupportedForMessageFormatException的錯誤。如果在事務中遇到這種情況,則可以中止并繼續執行。但是需要注意的是,之後發送到同一topic将會繼續收到相同的異常,直到topic被更新。
注意:callback一般在生産者的I/O線程中執行,是以是相當的快的,否則将延遲其他的線程的消息發送。如果你想要執行阻塞或計算代價高昂的回調,建議在callback主體中使用自己的Executor來并行處理。
Specified by:
send in interface Producer<K,V>
Parameters:
record - 發送的消息
callback - 當消息被伺服器确認的時候執行使用者提供的回調 (null 表示沒有回調)
Throws:
IllegalStateException -如果配置了transactional.id 但沒有事務啟動。
InterruptException - 如果線程在阻塞時被中斷
SerializationException - 如果序列化配置了無效的鍵值對象
TimeoutException - 如果擷取metadata或為消息配置設定記憶體所消耗的時間超過了 max.block.ms設定的值。
KafkaException -如果出現了不屬于公共API異常的Kafka相關的錯誤。
public void flush()
調用此方法可以使所有的緩沖消息立即可以發送(即使linger.ms配置的值大于0)并且阻塞與這些資訊相關聯的請求的完成。flush()後置條件是任何先前發送的記錄已經完成(舉例來說就是,Future.isDone() == true)。一個請求根據你指定的确認配置被成功确認之後則被認為是完成的,否則會導緻錯誤。
當一個線程被阻塞等待一個flush()調用完成時,其它線程可以繼續發送消息,但是不能保證關于flush調用開始之後發送的消息的完成。
這個方法可以用于從一些輸入系統消費消息并生産至kafka中。flush()調用提供了一種友善的方法來確定所有以前發送的消息實際上已經完成。
這個示例展示了如何從一個Kafka topic中消費,并生成至另一個Kafka topic:
for(ConsumerRecord<String, String> record: consumer.poll(100))
producer.send(new ProducerRecord("my-topic", record.key(), record.value());
producer.flush();
consumer.commit();
需要注意的是,上述示例可能在生産(produce)請求失敗的時候删除消息。如果要確定這種情況不會發生,需要在配置中設定retries=<large_number>。
應用程式不需要為事務性生産者調用此方法,因為commitTransaction()将在執行送出之前flush所有緩沖消息。這将確定在送出之前先前的beginTransaction()之後的所有send(ProducerRecord)調用都已經完成。
Specified by:
flush in interface Producer<K,V>
Throws:
InterruptException - 如果線程在阻塞時被中斷
public List<PartitionInfo> partitionsFor(String topic)
擷取給定topic的分區metadata ,這可以用于自定義分區。
Specified by:
partitionsFor in interface Producer<K,V>
Throws:
InterruptException - 如果線程在阻塞時被中斷
public Map<MetricName,? extends Metric> metrics()
獲得生産者維護的完整的内部度量集。
Specified by:
metrics in interface Producer<K,V>
public void close()
關閉這個生産者,此方法阻塞直到所有以前的發送請求完成。該方法等效于close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)。
如果從回調中調用close(),日志将記錄一條警告消息并以close(0, TimeUnit.MILLISECONDS)調用替代。我們這樣做是因為發送方線程将嘗試連接配接自己并永遠阻塞。
Specified by:
close in interface Closeable
Specified by:
close in interface AutoCloseable
Specified by:
close in interface Producer<K,V>
Throws:
InterruptException - 如果線程在阻塞時被中斷
public void close(long timeout,TimeUnit timeUnit)
此方法等待生産者完成所有未完成請求的發送直到逾時。如果逾時之前生産者不能完成所有請求,此方法将立刻丢棄任何的未發送和未确認的消息。
如果從一個Callback中調用此方法,此方法将不會阻塞,等同于close(0, TimeUnit.MILLISECONDS).這樣做是因為在阻塞生産者的I/O線程時不會發生進一步的發送。
Specified by:
close in interface Producer<K,V>
Parameters:
timeout - 等待生産者完成任何正要發生的請求的最大時間。這個值應該是非負的。指定逾時為0意味着不等待正要發生的請求的完成。
timeUnit - 逾時時間的機關
Throws:
InterruptException - 如果線程在阻塞時被中斷
IllegalArgumentException - 如果逾時時間的值是負的
版權聲明:本文為CSDN部落客「weixin_34249367」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_34249367/article/details/92303790