http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool
分類:
kafka(8)
版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。
Kafka 可以配置使用 JMX 進行運作狀態的監控,既可以通過 JDK 自帶的 JConsole 來觀察結果,也可以通過 Java API 的方式來取得監控資料。
修改 bin/kafka-server-start.sh,添加 JMX_PORT 參數,添加後的內容如下:
# Apply default heap settings and enable remote JMX, but only when the caller
# has not already supplied KAFKA_HEAP_OPTS (both exports sit inside this guard).
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    # JMX port of the broker; the Java client in this article connects to 9999.
    export JMX_PORT="9999"
fi
通過以下方法擷取目標值:
public class kafkadataprovider{
protected final logger logger = loggerfactory.getlogger(getclass());
private static final string message_in_per_sec = "kafka.server:type=brokertopicmetrics,name=messagesinpersec";
private static final string bytes_in_per_sec = "kafka.server:type=brokertopicmetrics,name=bytesinpersec";
private static final string bytes_out_per_sec = "kafka.server:type=brokertopicmetrics,name=bytesoutpersec";
private static final string produce_request_per_sec = "kafka.network:type=requestmetrics,name=requestspersec,request=produce";
private static final string consumer_request_per_sec = "kafka.network:type=requestmetrics,name=requestspersec,request=fetchconsumer";
private static final string flower_request_per_sec = "kafka.network:type=requestmetrics,name=requestspersec,request=fetchfollower";
private static final string active_controller_count = "kafka.controller:type=kafkacontroller,name=activecontrollercount";
private static final string part_count = "kafka.server:type=replicamanager,name=partitioncount";
public string extractmonitordata() {
//todo 通過調用api獲得ip以及參數
kafkaroleinfo monitordatapoint = new kafkaroleinfo();
string jmxurl = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";
try {
mbeanserverconnection jmxconnection = metricdatautils.getmbeanserverconnection(jmxurl);
objectname messagecountobj = new objectname(message_in_per_sec);
objectname bytesinpersecobj = new objectname(bytes_in_per_sec);
objectname bytesoutpersecobj = new objectname(bytes_out_per_sec);
objectname producerequestspersecobj = new objectname(produce_request_per_sec);
objectname consumerrequestspersecobj = new objectname(consumer_request_per_sec);
objectname flowerrequestspersecobj = new objectname(flower_request_per_sec);
objectname activecontrollercountobj = new objectname(active_controller_count);
objectname partcountobj = new objectname(part_count);
long messagesinpersec = (long) jmxconnection.getattribute(messagecountobj, "count");
long bytesinpersec = (long) jmxconnection.getattribute(bytesinpersecobj, "count");
long bytesoutpersec = (long) jmxconnection.getattribute(bytesoutpersecobj, "count");
long producerequestcountpersec = (long) jmxconnection.getattribute(producerequestspersecobj, "count");
long consumerrequestcountpersec = (long) jmxconnection.getattribute(consumerrequestspersecobj, "count");
long flowerrequestcountpersec = (long) jmxconnection.getattribute(flowerrequestspersecobj, "count");
integer activecontrollercount = (integer) jmxconnection.getattribute(activecontrollercountobj, "value");
integer partcount = (integer) jmxconnection.getattribute(partcountobj, "value");
monitordatapoint.setmessagesinpersec(messagesinpersec);
monitordatapoint.setbytesinpersec(bytesinpersec);
monitordatapoint.setbytesoutpersec(bytesoutpersec);
monitordatapoint.setproducerequestcountpersec(producerequestcountpersec);
monitordatapoint.setconsumerrequestcountpersec(consumerrequestcountpersec);
monitordatapoint.setflowerrequestcountpersec(flowerrequestcountpersec);
monitordatapoint.setactivecontrollercount(activecontrollercount);
monitordatapoint.setpartcount(partcount);
} catch (ioexception e) {
e.printstacktrace();
} catch (malformedobjectnameexception e) {
} catch (attributenotfoundexception e) {
} catch (mbeanexception e) {
} catch (reflectionexception e) {
} catch (instancenotfoundexception e) {
}
return monitordatapoint.tostring();
}
public static void main(string[] args) {
system.out.println(new kafkadataprovider().extractmonitordata());
/**
 * Opens a JMX connector to the given service URL and returns its MBean server connection.
 *
 * <p>Declared {@code static} because the caller shown in this article invokes it as
 * {@code MetricDataUtils.getMBeanServerConnection(...)}.
 *
 * <p>NOTE(review): the {@code JMXConnector} created here is not returned, so callers
 * have no handle with which to close it; consider returning the connector instead.
 *
 * @param jmxUrl JMX service URL to connect to
 * @return the MBean server connection obtained from the newly opened connector
 * @throws IOException if the URL cannot be parsed or the connection cannot be established
 */
public static MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
    JMXServiceURL url = new JMXServiceURL(jmxUrl);
    JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
    return jmxc.getMBeanServerConnection();
}
除了自己編寫定制化的監控程式外,也可以使用現成的開源監控工具:
https://github.com/claudemamo/kafka-web-console
部署sbt:
http://www.scala-sbt.org/0.13/tutorial/manual-installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/installing-sbt-on-linux.html
https://github.com/quantifind/kafkaoffsetmonitor/releases/tag/v0.2.0
# Launch the KafkaOffsetMonitor web UI on port 8080 against ZooKeeper at localhost:12181.
# The jar file name and main class name are case-sensitive.
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day