采用maven 管理的 增加依賴項
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
util代碼
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
public class kafkaShell {
//設定消費者根目錄
private static String _zkconsumers="/consumers";
//存儲消費者組 按應用設定變量
private static List<String> grouplist=null;
//設定curator 用戶端參數 具體參數 操作參考 http://macrochen.iteye.com/blog/1366136
static RetryPolicy retryPolicy=new RetryNTimes(3, 1000);
static CuratorFramework client=CuratorFrameworkFactory.newClient("192.168.50.129:2181,192.168.50.169:2181", 10000, 10000, retryPolicy);
public static String ec(String command) throws InterruptedException {
String returnString = "";
Process pro = null;
Runtime runTime = Runtime.getRuntime();
if (runTime == null) {
System.err.println("Create runtime false!");
}
try {
pro = runTime.exec(command);
BufferedReader input = new BufferedReader(new InputStreamReader(
pro.getInputStream()));
PrintWriter output = new PrintWriter(new OutputStreamWriter(
pro.getOutputStream()));
String line;
while ((line = input.readLine()) != null) {
// System.out.println(line);
returnString = returnString + line + "\n";
}
input.close();
output.close();
pro.destroy();
} catch (IOException ex) {
Logger.getLogger(kafkaShell.class.getName()).log(Level.SEVERE, null, ex);
}
return returnString;
}
public static void main(String[] args) {
}
public void PrintMessage(){
try {
client.start();
//擷取目前所有消費者組
grouplist=client.getChildren().forPath(_zkconsumers);
if(grouplist!=null&&grouplist.size()>0){
for (String groupstr : grouplist) {
//列印消費者組
System.err.println("group---------->"+groupstr);
//擷取消費者組下的topic 的 offsets集合
List<String> listtopics=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets");
if(listtopics!=null&&listtopics.size()>0){
//周遊集合 擷取分區offset值
for (String topicstr : listtopics) {
System.err.println("topicpath:"+_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);
List<String> listpartition=client.getChildren().forPath(_zkconsumers+"/"+groupstr+"/offsets/"+topicstr);
for (String partition : listpartition) {
String partitionpath=_zkconsumers+"/"+groupstr+"/offsets/"+topicstr+"/"+partition;
System.err.println("partitionpath:"+partitionpath);
byte[] newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":offset:--------------------------->"+new String(newoffset,"UTF-8"));
}
}
}
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void UpdateOffsetValue(String partitionpath,String offsetvalue){
try {
client.start();
byte[] newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":oldoffset:--------------------------->"+new String(newoffset,"UTF-8"));
client.setData().forPath(partitionpath, offsetvalue.getBytes());
newoffset= client.getData().forPath(partitionpath);
System.err.println(partitionpath+":newoffset:--------------------------->"+new String(newoffset,"UTF-8"));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
client.close();
}
}
public static void PrintIds(CuratorFramework client,String path){
try {
List<String> consumerids=client.getChildren().forPath(path);
for (String consumerid : consumerids) {
System.err.println(path+"=consumerid---------------------------------->"+consumerid);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}