天天看點

Pulsar官方文檔翻譯-概念和架構-Pulsar用戶端(Pulsar Clients)Pulsar 用戶端

本篇翻譯轉自Zongyang在crowdin apache pulsar項目中的翻譯,由于已經完成翻譯,我直接轉載,不再翻譯。

------------------------------------------

Pulsar 用戶端

Pulsar向使用者暴露原生的Java和C++的用戶端API。

Puslar用戶端封裝并優化了用戶端-伺服器的通信協定,并暴露出一套簡單直覺的API,提供給應用程式使用。

底層實作上,目前官方版的Pulsar用戶端支援對使用者透明的連接配接重連、故障切換、未ack消息的緩沖、消息重傳。

自定義用戶端庫

如果您想建立自己的用戶端庫, 我們建議參考Pulsar的自定義 二進制協定 的文檔。

用戶端使用步驟

當應用程式要建立生産者/消費者時, Pulsar用戶端庫執行按以下兩個步驟的工作:

  1. 用戶端将嘗試通過向伺服器(Broker)發送 HTTP 查找請求,來确定主題(Topic)所在的伺服器(Broker)。用戶端通過查詢Zookeeper中(緩存)的中繼資料,來确定這條消息的topic在哪個broker上,如果該topic不在任何一個broker上,則把這個topic配置設定在負載最少的broker上。
  2. 當用戶端擷取了broker的位址之後,将會建立一個TCP連接配接(或複用連接配接池中的連接配接)并且進行鑒權。用戶端和broker通過該連接配接交換基于自定義協定的二進制指令。同時,用戶端會向broker發送一條指令用以在broker上建立生産者/消費者,該指令将會在驗證授權政策後生效。

每當 TCP 連接配接中斷時, 用戶端将立即重新啟動此安裝階段, 并将繼續嘗試使用指數退避重建立立生産者或使用者, 直到操作成功為止。

Reader 接口

在Pulsar中, "标準" 消費者接口 涉及使用消費者監聽 主題, 處理傳入消息, 并在處理完這些消息後最終确認它們。 每當使用者連接配接到某個主題時, 它就會自動開始從最早的沒被确認(unacked)的消息處讀取, 因為該主題的遊标是由Pulsar自動管理的。

使用Pulsar的 讀取器接口, 應用程式可以手動管理遊标。 當使用讀取器連接配接到一個主題而非消費者時,在讀取器連接配接到主題的時候就需要指定讀取器從哪個位置開始讀消息。

當連接配接到某個主題時, 讀取器從以下位置開始讀消息:

  • 主題中的 最早的 可用消息
  • 主題中的 最新 可用消息
  • 除最早的和最新的之外的可用消息位點。如果你使用該選項,你需要顯式的提供一個消息ID。你的應用程式需要自己管理消息ID,例如從持久化的資料存儲或緩存中擷取

Pulsar的讀取器接口在流計算場景下,對提供effective-once的語義很有幫助。 Pulsar能夠将主題的消息進行重放,并從重放後的位置開始讀取消息,是滿足流處理的場景的重要基礎。讀取器界面為Pulsar用戶端在主體内提供了一種底層抽象“手動管理的位點”

Pulsar官方文檔翻譯-概念和架構-Pulsar用戶端(Pulsar Clients)Pulsar 用戶端

未分區主題

Pulsar的讀取器目前無法在已分區的主題上使用。

下面是一個Java語言實作的從主題上最早可用消息的位置開始消費的例子:

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

// Create a reader on a topic and for a specific message (and onward)
Reader<byte[]> reader = pulsarClient.newReader()
    .topic("reader-api-test")
    .startMessageId(MessageId.earliest)
    .create();

while (true) {
    Message message = reader.readNext();

    // Process the message
}
           

建立一個從最新可用消息處開始讀取消息的讀取器 

Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(MessageId.latest)
    .create();
           

建立一個從其他位置(非最早可用且非最新可用消息處)讀取消息的讀取器

byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(id)
    .create();
           

繼續閱讀