天天看點

使用JMX監控Kafka使用JMX監控Kafka 開啟JMX端口 通過Jconsole測試時候可以連接配接 通過JavaAPI來通路 其他工具

http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool

使用JMX監控Kafka使用JMX監控Kafka 開啟JMX端口 通過Jconsole測試時候可以連接配接 通過JavaAPI來通路 其他工具

 分類:

kafka(8) 

使用JMX監控Kafka使用JMX監控Kafka 開啟JMX端口 通過Jconsole測試時候可以連接配接 通過JavaAPI來通路 其他工具

版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。

kafka可以配置使用jmx進行運作狀态的監控,既可以通過jdk自帶jconsole來觀察結果,也可以通過java api的方式來.

修改bin/kafka-server-start.sh,添加jmx_port參數,添加後樣子如下

if [ "x$kafka_heap_opts" = "x" ]; then  

    export kafka_heap_opts="-xmx1g -xms1g"  

    export jmx_port="9999"  

fi  

使用JMX監控Kafka使用JMX監控Kafka 開啟JMX端口 通過Jconsole測試時候可以連接配接 通過JavaAPI來通路 其他工具
使用JMX監控Kafka使用JMX監控Kafka 開啟JMX端口 通過Jconsole測試時候可以連接配接 通過JavaAPI來通路 其他工具

通過以下方法擷取目标值

使用JMX監控Kafka使用JMX監控Kafka 開啟JMX端口 通過Jconsole測試時候可以連接配接 通過JavaAPI來通路 其他工具

public class kafkadataprovider{  

    protected final logger logger = loggerfactory.getlogger(getclass());  

    private static final string message_in_per_sec = "kafka.server:type=brokertopicmetrics,name=messagesinpersec";  

    private static final string bytes_in_per_sec = "kafka.server:type=brokertopicmetrics,name=bytesinpersec";  

    private static final string bytes_out_per_sec = "kafka.server:type=brokertopicmetrics,name=bytesoutpersec";  

    private static final string produce_request_per_sec = "kafka.network:type=requestmetrics,name=requestspersec,request=produce";  

    private static final string consumer_request_per_sec = "kafka.network:type=requestmetrics,name=requestspersec,request=fetchconsumer";  

    private static final string flower_request_per_sec = "kafka.network:type=requestmetrics,name=requestspersec,request=fetchfollower";  

    private static final string active_controller_count = "kafka.controller:type=kafkacontroller,name=activecontrollercount";  

    private static final string part_count = "kafka.server:type=replicamanager,name=partitioncount";  

    public string extractmonitordata() {  

        //todo 通過調用api獲得ip以及參數  

        kafkaroleinfo monitordatapoint = new kafkaroleinfo();  

        string jmxurl = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";  

        try {  

            mbeanserverconnection jmxconnection = metricdatautils.getmbeanserverconnection(jmxurl);  

            objectname messagecountobj = new objectname(message_in_per_sec);  

            objectname bytesinpersecobj = new objectname(bytes_in_per_sec);  

            objectname bytesoutpersecobj = new objectname(bytes_out_per_sec);  

            objectname producerequestspersecobj = new objectname(produce_request_per_sec);  

            objectname consumerrequestspersecobj = new objectname(consumer_request_per_sec);  

            objectname flowerrequestspersecobj = new objectname(flower_request_per_sec);  

            objectname activecontrollercountobj = new objectname(active_controller_count);  

            objectname partcountobj = new objectname(part_count);  

            long messagesinpersec = (long) jmxconnection.getattribute(messagecountobj, "count");  

            long bytesinpersec = (long) jmxconnection.getattribute(bytesinpersecobj, "count");  

            long bytesoutpersec = (long) jmxconnection.getattribute(bytesoutpersecobj, "count");  

            long producerequestcountpersec = (long) jmxconnection.getattribute(producerequestspersecobj, "count");  

            long consumerrequestcountpersec = (long) jmxconnection.getattribute(consumerrequestspersecobj, "count");  

            long flowerrequestcountpersec = (long) jmxconnection.getattribute(flowerrequestspersecobj, "count");  

            integer activecontrollercount = (integer) jmxconnection.getattribute(activecontrollercountobj, "value");  

            integer partcount = (integer) jmxconnection.getattribute(partcountobj, "value");  

            monitordatapoint.setmessagesinpersec(messagesinpersec);  

            monitordatapoint.setbytesinpersec(bytesinpersec);  

            monitordatapoint.setbytesoutpersec(bytesoutpersec);  

            monitordatapoint.setproducerequestcountpersec(producerequestcountpersec);  

            monitordatapoint.setconsumerrequestcountpersec(consumerrequestcountpersec);  

            monitordatapoint.setflowerrequestcountpersec(flowerrequestcountpersec);  

            monitordatapoint.setactivecontrollercount(activecontrollercount);  

            monitordatapoint.setpartcount(partcount);  

        } catch (ioexception e) {  

            e.printstacktrace();  

        } catch (malformedobjectnameexception e) {  

        } catch (attributenotfoundexception e) {  

        } catch (mbeanexception e) {  

        } catch (reflectionexception e) {  

        } catch (instancenotfoundexception e) {  

        }  

        return monitordatapoint.tostring();  

    }  

    public static void main(string[] args) {  

        system.out.println(new kafkadataprovider().extractmonitordata());  

    /** 

     * 獲得mbeanserver 的連接配接 

     * 

     * @param jmxurl 

     * @return 

     * @throws ioexception 

     */  

    public mbeanserverconnection getmbeanserverconnection(string jmxurl) throws ioexception {  

        jmxserviceurl url = new jmxserviceurl(jmxurl);  

        jmxconnector jmxc = jmxconnectorfactory.connect(url, null);  

        mbeanserverconnection mbsc = jmxc.getmbeanserverconnection();  

        return mbsc;  

}  

除了自己編寫定制化的監控程式外

https://github.com/claudemamo/kafka-web-console

部署sbt:

http://www.scala-sbt.org/0.13/tutorial/manual-installation.html

http://www.scala-sbt.org/release/tutorial/zh-cn/installing-sbt-on-linux.html

https://github.com/quantifind/kafkaoffsetmonitor/releases/tag/v0.2.0

java -cp kafkaoffsetmonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.offsetgetterweb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day