天天看點

pulsar建立topic

因為之前寫了一個遊标復原的部落格,牽扯到分區topic與非分區topic的差別,樓主總結了一些經驗,有不對的地方歡迎指出

** 1. 分區topic的建立**

private static PulsarAdmin admin = null;

public void createTopic(){

topicName = “persistent://zhiwang3/whds9/admin3”;

numPartitions = 2;

try {

System.out.println( "開始建立topic : " + topicName );

admin.topics().createPartitionedTopic(topicName,numPartitions);

System.out.println( "topic建立成功 : " + topicName );

} catch (PulsarAdminException e) {

System.out.println( "topic已存在 : " + topicName );

}

// 設定權限

try {

String role1 = “*.role”;

Set actions1 = Sets.newHashSet(AuthAction.produce, AuthAction.consume);

admin.topics().grantPermission( topicName , role1, actions1);

System.out.println( “設定使用者權限成功” );

} catch (PulsarAdminException e) {

}

}

現在的api隻提供了分區topic的建立,但是功能支援還是非分區topic支援的更多。靜等pulsar更加完善。

** 2. 分區topic的建立**

在建立producer和consumer的時候系統會預設建立topic,此時建立的就是非分區topic,例如系統中原先并沒有topic persistent://zhiwang3/whds7/3,此時直接調用建立producer,就會建立非分區topicpersistent://zhiwang3/whds7/3。

private static Producer<byte[]> producer = null;

private static PulsarClient client = null;

public void createProcduce(){

try {

System.out.println(“begin create produce”);

// 構造Pulsar Client

this.client = PulsarClient.builder()

.serviceUrl(judgeValue(“serviceURL”))

.enableTcpNoDelay(true)

.build();

// 構造生産者

this.producer = client.newProducer(Schema.BYTES)

.producerName(judgeValue(“producerName”))

.topic(“persistent://zhiwang3/whds7/3”)

.create();

System.out.println(“create produce success”);

} catch (PulsarClientException e) {

e.printStackTrace();

}

}

還一種辦法是,調用admin建立分區topic,admin.topics().createPartitionedTopic(topicName,numPartitions);

此時分區topic是persistent://zhiwang3/whds9/admin3。但是直接調用此topic來進行生産,會發現非分區topic多了兩個。numPartitions=2;persistent://zhiwang3/whds9/admin3-partition-0和persistent://zhiwang3/whds9/admin3-partition-1。我們可以直接使用這兩個非分區topic進行生産和消費。但是删除的時候還是要按照分區topic來進行删除。

繼續閱讀