天天看點

kafka清空topics

1、緣由(需求):因為測試時不小心,寫了一大堆topics,看監測時一頓揪心。

2、解決方案:kafka是被zookeeper監管的,topics是在zookeeper上的(描述可能不是很準确),我發現kakfa原生api好像沒有批量删除topics的方式,我登陸到zookeeper後,在zookeeper的目錄下發現了topics。于是寫了個遞歸程式将其删除。以下是代碼:

public class App {
	static String path = "/brokers";
	static ZKConnection zkc = null;

	public static void main(String[] args) throws Exception {
		zkc = new ZKConnection();
		zkc.connect();
		get(zkc.zk.getChildren(path, true), path);

	}

	public static void get(List<String> paths, String path) throws Exception {
		for (String p : paths) {
			p = path + "/" + p;
			if (null == zkc.zk.exists(p, true)) {
				System.out.println(p);
				//zkc.zk.delete(p, 0);
			} else {
				get(zkc.zk.getChildren(p, true), p);
				int i =zkc.zk.exists(p,true).getVersion();
				zkc.zk.delete(p,i);
			}

			System.out.println(p);
		}
	}
}

class ZKConnection {
	/**
	 * server清單, 以逗号分割
	 */
	protected String hosts = "server1:2181,server2:2181,server3:2181";
	/**
	 * 連接配接的逾時時間, 毫秒
	 */
	private static final int SESSION_TIMEOUT = 5000;
	private CountDownLatch connectedSignal = new CountDownLatch(1);
	public ZooKeeper zk;

	/**
	 * 連接配接zookeeper server
	 */
	public void connect() throws Exception {
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());
		// 等待連接配接完成
		connectedSignal.await();
	}

	public class ConnWatcher implements Watcher {
		public void process(WatchedEvent event) {
			// 連接配接建立, 回調process接口時, 其event.getState()為KeeperState.SyncConnected
			if (event.getState() == KeeperState.SyncConnected) {
				// 放開閘門, wait在connect方法上的線程将被喚醒
				connectedSignal.countDown();
			}
		}
	}
           

注:連接配接代碼是網上得的

3.重新開機kafka,完畢。