天天看點

使用 Curator 操作 ZooKeeper 擷取 Kafka 資訊的工具類(util)

專案採用 Maven 管理,需在 pom.xml 中增加以下依賴項:

<dependencies>

    <dependency>

      <groupId>org.scala-lang</groupId>

      <artifactId>scala-library</artifactId>

      <version>2.9.2</version>

    </dependency>

    <dependency>

      <groupId>com.yammer.metrics</groupId>

      <artifactId>metrics-core</artifactId>

      <version>2.2.0</version>

    </dependency>

    <dependency>

      <groupId>com.101tec</groupId>

      <artifactId>zkclient</artifactId>

      <version>0.3</version>

    </dependency>

    <dependency>  

    <groupId>org.apache.curator</groupId>

    <artifactId>curator-recipes</artifactId>

    <version>2.3.0</version>

    </dependency>

</dependencies>

util代碼 

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.OutputStreamWriter;

import java.io.PrintWriter;

import java.util.List;

import java.util.logging.Level;

import java.util.logging.Logger;

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.retry.RetryNTimes;

public class kafkaShell {

//Root znode path under which the consumer groups are listed

private static  String _zkconsumers="/consumers";

//Holds the consumer group names read from ZooKeeper (original note: set this variable per application)

private static List<String> grouplist=null;

//Curator client settings; for parameter details see  http://macrochen.iteye.com/blog/1366136
//NOTE(review): RetryNTimes(3, 1000) is presumably 3 retries with 1000 ms between them, and the two
//10000 values presumably the session and connection timeouts in ms -- confirm against the Curator API.

static RetryPolicy retryPolicy=new RetryNTimes(3, 1000);

//Single client instance shared by every method of this class; the ZooKeeper connect string is hard-coded here.

static CuratorFramework client=CuratorFrameworkFactory.newClient("192.168.50.129:2181,192.168.50.169:2181", 10000, 10000, retryPolicy);

/**
 * Runs an external command and returns everything it wrote to standard output.
 *
 * <p>Each output line is terminated with {@code "\n"} in the returned string, regardless of
 * the line separator the command itself used. Standard error is not captured. If an
 * {@link IOException} occurs while starting the command or reading its output, the error is
 * logged and whatever was collected up to that point is returned (possibly the empty string).
 *
 * @param command the command line passed to {@link Runtime#exec(String)}
 * @return the command's standard output, one {@code "\n"}-terminated entry per line
 * @throws InterruptedException never thrown by this implementation; kept in the signature so
 *     existing callers that catch it keep compiling
 */
public static String ec(String command) throws InterruptedException {
    StringBuilder collected = new StringBuilder();
    Process pro = null;
    BufferedReader input = null;
    try {
        pro = Runtime.getRuntime().exec(command);

        // Nothing is ever written to the child, so close its stdin right away: a command
        // that reads stdin then sees EOF instead of blocking until we stop reading.
        try {
            pro.getOutputStream().close();
        } catch (IOException ignored) {
            // The child may already have exited; its output can still be read below.
        }

        input = new BufferedReader(new InputStreamReader(pro.getInputStream()));
        String line;
        while ((line = input.readLine()) != null) {
            collected.append(line).append("\n");
        }
    } catch (IOException ex) {
        Logger.getLogger(kafkaShell.class.getName()).log(Level.SEVERE, null, ex);
    } finally {
        // Release the pipe and the child process even when reading failed part-way.
        if (input != null) {
            try {
                input.close();
            } catch (IOException ex) {
                Logger.getLogger(kafkaShell.class.getName()).log(Level.WARNING, null, ex);
            }
        }
        if (pro != null) {
            pro.destroy();
        }
    }
    return collected.toString();
}

// Program entry point. The body is empty, so running the class as-is does nothing.
public static void main(String[] args) {

}

public void PrintMessage(){

try {

client.start();

//擷取目前所有消費者組

grouplist=client.getChildren().forPath(_zkconsumers);

if(grouplist!=null&&grouplist.size()>0){

for (String groupstr : grouplist) {

//列印消費者組

System.err.println("group---------->"+groupstr);

//擷取消費者組下的topic 的 offsets集合

List<String> listtopics=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets");

if(listtopics!=null&&listtopics.size()>0){

//周遊集合 擷取分區offset值

for (String topicstr : listtopics) {

System.err.println("topicpath:"+_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);

List<String> listpartition=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);

for (String partition : listpartition) {

String partitionpath=_zkconsumers+"/"+groupstr+"/offsets/"+topicstr+"/"+partition;

System.err.println("partitionpath:"+partitionpath);

byte[]  newoffset= client.getData().forPath(partitionpath);

System.err.println(partitionpath+":offset:--------------------------->"+new String(newoffset,"UTF-8"));

}

}

}

}

}

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

public  void UpdateOffsetValue(String partitionpath,String offsetvalue){

try {

client.start();

byte[]  newoffset= client.getData().forPath(partitionpath);

System.err.println(partitionpath+":oldoffset:--------------------------->"+new String(newoffset,"UTF-8"));

client.setData().forPath(partitionpath, offsetvalue.getBytes());

 newoffset= client.getData().forPath(partitionpath);

System.err.println(partitionpath+":newoffset:--------------------------->"+new String(newoffset,"UTF-8"));

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}finally{

client.close();

}

}

/**
 * Prints the name of every child node found directly under {@code path} to standard error,
 * one line per child, each line prefixed with the path itself.
 *
 * <p>Any exception raised while reading the children is printed as a stack trace and not
 * propagated.
 *
 * @param client client used to read the children; this parameter is used instead of the
 *     static client field of the class
 * @param path znode path whose children are printed
 */
public static void PrintIds(CuratorFramework client,String path){
    try {
        List<String> children = client.getChildren().forPath(path);
        String linePrefix = path + "=consumerid---------------------------------->";
        for (int index = 0; index < children.size(); index++) {
            System.err.println(linePrefix + children.get(index));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}