本文翻譯自 StreamNative 部落格《Spring into Pulsar》,作者 Tim Spann,StreamNative 布道師。
譯者簡介
姜吉甯,開源愛好者、終生學習者、健身愛好者。Github @jjnnzb[1]。
本文我們來探讨如何在 Java 架構——Spring 中整合 Apache Pulsar。文章闡述如何在 Java 中建構基于 Spring 的微服務。在正文内容開始前,我們先介紹 Spring。Spring 是 Java 生态中鼎鼎有名的技術架構,自誕生已有近 20 年曆史。Spring 提供了極為友善的裝配與控制機制,極大地降低了建構應用的難度。有了 Spring,開發者無需堆砌非業務相關的重複模闆代碼。基于 Spring,開發者可以如魚得水般快速開發微服務應用,包括各類 REST API、Web 應用程式、控制台應用程式等。推薦大家深入研究 Spring。
如果你想基于 Spring 來開發自己的第一個應用,推薦打開官方提供的 Spring Starter 起步連結[2]。借助這個連結,可以不費吹灰之力搭建好腳手架,并在此基礎之上編碼,實作自己的業務邏輯。
在本文示例中,将展示如何基于 Spring Boot 提供的依賴注入機制,為應用程式接入執行個體化和已配置的 Apache Pulsar 來生産與消費消息。此外,我還會通過使用 AMQP、Kafka 和 MQTT 發送和接收消息來展示 Apache Pulsar 與其他消息傳遞協定內建的靈活性。
最後,本文将淺析 Reactive Pulsar。強大的響應式架構 Reactive 是想建構 Spring 響應式應用的開發者們的不二之選。
基于 Spring 和 Pulsar 建構空氣品質應用
下圖展示的是該應用的架構圖。如圖所示,Apache Pulsar 是該架構圖的核心。Pulsar 在其中充當了路由、網關、消息總線和資料分發通道的角色。

選擇 Apache Pulsar 的核心原因之一是它具備極強的可擴充性,Pulsar 用戶端和應用均支援無限擴充,解決了消息存儲與分發在擴充性方面的難題。基于該特性,我們無需做額外的複制便能夠複用資料。該特性對很多應用場景非常友好,包括基于 Spark 做的 ETL 任務和基于 Flink 做的實時持續 SQL 流分析等。Pulsar 還為 Spring 微服務無縫支援其他語言編寫的服務,包括 Go、Python、C#、C++ 和 Node.JS 等。
點選下圖檢視示例應用示範視訊。
有了 Spring Boot Starter 提供的腳手架,我們可以向 Maven build pom 檔案中添加一些依賴,或選擇用 Gradle。
首先,配置好 Pulsar 相關版本依賴。本文的示例選擇 Pulsar 2.10.0[3] 版本及 JDK 11。開發者當下不應堅持 Java 8 版本,因為在不久的未來,JDK 17 将會是官方推薦的标準版本。
11
2.10.0
複制
接着,導入 Pulsar 用戶端依賴。
org.apache.pulsar
pulsar-client
${pulsar.version}
org.apache.pulsar
pulsar-client-admin
${pulsar.version}
org.apache.pulsar
pulsar-client-original
${pulsar.version}
pom
複制
通過以下指令來編譯打包:
mvn package
複制
輸入以下指令,運作應用程式:
mvn spring-boot:run
複制
在配置檔案中(application.resources)填充必要值相關配置,以連接配接到叢集,讀取應用資料。該檔案一般放在
src/main/resources
目錄下。
airnowapi.url=${AIRPORTNOWAPIURL}
topic.name=persistent://public/default/airquality
producer.name=airquality
send.timeout=60
security.mode=off
pulsar.service.url=pulsar://pulsar1:6650
#security.mode=on
#pulsar.service.url=pulsar+ssl://demo.sndemo.snio.cloud:6651
pulsar.oauth2.audience=urn:sn:pulsar:sndemo:demo-cluster
pulsar.oauth2.credentials-url=file:///cr/sndemo-tspann.json
pulsar.oauth2.issuer-url=https://auth.streamnative.cloud/
server.port=8999
#kafka
kafka.bootstrapAddress=pulsar1:9092
kafka.topic.name=airqualitykafka
#mqtt
mqtt.automaticReconnect=true
mqtt.cleanSession=true
mqtt.connectionTimeout=60
mqtt.clientId=airquality-MQTT
mqtt.hostname=pulsar1
mqtt.port=1883
mqtt.topic=airqualitymqtt
#amqp/rabbitmq
amqp.server=pulsar1:5672
amqp.topic=amqp-airquality
複制
如上所示,配置項
security.mode
和
pulsar
.service.url
被注釋掉了。這麼配置的原因是,我可以靈活地在 StreamNative 托管的雲生産環境和本地的開發環境之間切換。同時,我們也可以采用自動化流程或使用環境變量來更好地滿足生産環境的需求。
airnowapi.url
這個變量配置的是用于通路 Air Now REST 資料流的專用令牌,建議配置到環境變量中。如果你也想使用該資料流,請先注冊[4]。
我們現在開始建構應用。第一步,配置連接配接,連接配接上 Apache Pulsar 叢集。
第二步,我們來建立一個 Spring 配置類,來初始化 Pulsar 用戶端。在配置類中,通過 @Value 注解來注入
application.properties
中相關的配置項。
@Configuration
public class PulsarConfig {
@Value("${pulsar.service.url}")
String pulsarUrl;
@Value("${security.mode:off}")
String securityMode;
@Value("${pulsar.oauth2.audience}")
String audience;
@Value("${pulsar.oauth2.credentials-url}")
String credentialsUrl;
@Value("${pulsar.oauth2.issuer-url}")
String issuerUrl;
@Bean
public org.apache.pulsar.client.api.PulsarClient pulsarClient() {
PulsarClient client = null;
if (securityMode.equalsIgnoreCase(OFF)) {
try {
client = PulsarClient.builder().serviceUrl(pulsarUrl).build();
} catch (PulsarClientException e) {
e.printStackTrace();
client = null;
}
} else {
try {
try {
client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.authentication( AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl),
new URL(credentialsUrl),audience)).build();
} catch (MalformedURLException e) {
e.printStackTrace();
}
} catch (PulsarClientException e) {
e.printStackTrace();
client = null;
}
}
return client;
}
}
複制
第三步,在服務中配置生産者,代碼如下:
@Configuration
public class PulsarProducerConfig {
@Value("${producer.name:producername}")
String producerName;
@Value("${topic.name:airquality}")
String topicName;
@Autowired
PulsarClient pulsarClient;
@Bean
public Producer getProducer() {
ProducerBuilder producerBuilder = pulsarClient.newProducer(JSONSchema.of(Observation.class))
.topic(topicName)
.producerName(producerName).sendTimeout(60, TimeUnit.SECONDS);
Producer producer = null;
try {
producer = producerBuilder.create();
} catch (PulsarClientException e1) {
e1.printStackTrace();
}
return producer;
}
}
複制
在上述的配置類代碼中,我們要建構一個 Pulsar 的生産者,該生産者會使用 Observation 類中的 JSON Schema。該 Observation 類中引入了 FasterXML Jackson 相關注解,但該類實際上就是一個 Java bean,其中記錄的是 REST 資料流提供的測量日期、測量時間、狀态碼、經緯度等資訊。
生産者
我們添加上相關的業務邏輯代碼,随即對接消息平台,測試消息發送流程。完整的源代碼在此 Github 倉庫。
@Service
public class PulsarService {
@Autowired
PulsarClient pulsarClient;
@Autowired
ProducerObservation> producer;
public MessageId sendObservation(Observation observation) {
if (observation == null) {
return null;
}
UUID uuidKey = UUID.randomUUID();
MessageId msgID = null;
try {
msgID = producer.newMessage()
.key(uuidKey.toString())
.value(observation)
.send();
} catch (PulsarClientException e) {
e.printStackTrace();
}
return msgID;
}
}
複制
消費者
消息發送完畢之後,我們可以通過 Spring 讀取消息。在本節中,我們會建構一套消費程式測試消費資料。如果要填充一些業務邏輯、做消息路由、将消息轉換到一至多個主題中,建議通過 Pulsar Function 來實作(可通過 Java、Python 或 Go 編寫),而非 Spring Boot 微服務。我在這裡提供了兩種實作。Pulsar Spring Boot 消費者的源碼在可從此 GitHub 倉庫[5]中擷取。
如果通過 Java Pulsar Function 來處理空氣品質資料,可以參考此 GitHub 倉庫[6]中的代碼。如以下架構圖所示,各 Function、微服務、Spark 和 Flink 任務均可作為整個架構中的組成部分,協調處理實時流資料。
我們可以複用生産者中的配置類來連接配接叢集。此外,我們還需要一套消費者的配置代碼,該類需要在
application.properties
檔案中配置消費者名稱、訂閱名稱、主題名稱并注入。在示例代碼中,我們配置的訂閱類型是 Shared(共享訂閱),消費起始點是 Earliest。此外,我們還引入了在 Pulsar 生産者中使用的 Observation 來解析 JSON 資料。
@Configuration
public class PulsarConsumerConfig {
@Autowired
PulsarClient pulsarClient;
@Value("${consumer.name:consumerName}")
String consumerName;
@Value("${topic.name:airquality}")
String topicName;
@Value("${subscription.name:airqualitysubscription}")
String subscriptionName;
@Bean
public Consumer getConsumer() {
Consumer pulsarConsumer = null;
ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer(JSONSchema.of(Observation.class))
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.consumerName(consumerName);
try {
pulsarConsumer = consumerBuilder.subscribe();
} catch (PulsarClientException e) {
e.printStackTrace();
}
return pulsarConsumer;
}
}
複制
顯然,運作消費者程式非常容易。在接收到消息事件之後,進行轉換得到普通 Java 對象(Plain Old Java Object,即 POJO),我們可以對資料做任意處理,包括将 Spring 庫持久化到資料庫、發送到 REST 服務中或存儲到檔案等。
結語
本文中,我們探索了通過多種消息協定來與 Apache Pulsar 叢集通信。由于文章篇幅有限,我們并沒有測試全部 Apache Pulsar 支援的消息協定。事實上,我們還可以通過 RocketMQ、WebSocket 來與 Pulsar 叢集互動,甚至還可以通過 JDBC 連接配接到 Pulsar SQL 層(Presto SQL)。
如果你對建構高速的響應式應用感興趣,推薦試試 Reactive Pulsar 庫。Reactive Pulsar 是一款快速高效的庫,需要另外一篇單獨的文章去介紹,可以點選此連結[7]了解更多資訊。
本文列舉了基于 Apache Pulsar 和 Spring 來建構應用的要點,包括關鍵步驟、實戰細節等。Pulsar Java 用戶端作為 Apache Pulsar 項目中的一等公民,文章借助執行個體展示了它的強大功能和靈活性。落地實戰起來,打造自己的 Pulsar 應用吧!
相關資源
- • 源碼|空氣品質示例代碼[8]
- • 源碼|Pulsar 提供的空氣品質函數[9]
- • 源碼|空氣指令消費者[10]
- • 源碼|FLiPN 空氣品質校驗[11]
- • 源碼|FLiPN 空氣品質 REST[12]
- • GitHub|Spring Boot Starter for Apache Pulsar[13]
- • Github|Reactive Pulsar Adapter[14]
- • 文檔|Pulsar Java 用戶端[15]
引用連結
[1]
jjnnzb: https://github.com/jjnnzb
[2]
起步連結: https://start.spring.io/
[3]
Pulsar 2.10.0: https://pulsar.apache.org/en/release-notes/
[4]
注冊: https://docs.airnowapi.org/
[5]
GitHub 倉庫: https://github.com/tspannhw/airquality
[6]
GitHub 倉庫: https://github.com/tspannhw/pulsar-airquality-function
[7]
連結: https://github.com/lhotari/reactive-iot-backend-ApacheCon2021
[8]
源碼|空氣品質示例代碼: https://github.com/tspannhw/airquality
[9]
源碼|Pulsar 提供的空氣品質函數: https://github.com/tspannhw/pulsar-airquality-function
[10]
源碼|空氣指令消費者: https://github.com/tspannhw/airquality-consumer
[11]
源碼|FLiPN 空氣品質校驗: https://github.com/tspannhw/FLiPN-AirQuality-Checks
[12]
源碼|FLiPN 空氣品質 REST: https://github.com/tspannhw/FLiPN-AirQuality-REST
[13]
GitHub|Spring Boot Starter for Apache Pulsar: https://github.com/majusko/pulsar-java-spring-boot-starter
[14]
Github|Reactive Pulsar Adapter: https://github.com/datastax/reactive-pulsar
[15]
文檔|Pulsar Java 用戶端: https://pulsar.apache.org/docs/en/client-libraries-java/
[16]
如何開發Pulsar Function?: https://pulsar.apache.org/docs/en/functions-develop/