1、緣由(需求):因為測試時不小心,寫了一大堆topics,看監測時一頓揪心。
2、解決方案:kafka是被zookeeper監管的,topics是在zookeeper上的(描述可能不是很準确),我發現kakfa原生api好像沒有批量删除topics的方式,我登陸到zookeeper後,在zookeeper的目錄下發現了topics。于是寫了個遞歸程式将其删除。以下是代碼:
public class App {
static String path = "/brokers";
static ZKConnection zkc = null;
public static void main(String[] args) throws Exception {
zkc = new ZKConnection();
zkc.connect();
get(zkc.zk.getChildren(path, true), path);
}
public static void get(List<String> paths, String path) throws Exception {
for (String p : paths) {
p = path + "/" + p;
if (null == zkc.zk.exists(p, true)) {
System.out.println(p);
//zkc.zk.delete(p, 0);
} else {
get(zkc.zk.getChildren(p, true), p);
int i =zkc.zk.exists(p,true).getVersion();
zkc.zk.delete(p,i);
}
System.out.println(p);
}
}
}
class ZKConnection {
/**
* server清單, 以逗号分割
*/
protected String hosts = "server1:2181,server2:2181,server3:2181";
/**
* 連接配接的逾時時間, 毫秒
*/
private static final int SESSION_TIMEOUT = 5000;
private CountDownLatch connectedSignal = new CountDownLatch(1);
public ZooKeeper zk;
/**
* 連接配接zookeeper server
*/
public void connect() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());
// 等待連接配接完成
connectedSignal.await();
}
public class ConnWatcher implements Watcher {
public void process(WatchedEvent event) {
// 連接配接建立, 回調process接口時, 其event.getState()為KeeperState.SyncConnected
if (event.getState() == KeeperState.SyncConnected) {
// 放開閘門, wait在connect方法上的線程将被喚醒
connectedSignal.countDown();
}
}
}
注:連接配接代碼是網上得的
3.重新開機kafka,完畢。