天天看點

Kafka監控架構介紹

CMAK

CMAK(原Kafka Manager) 是雅虎公司于 2015 年開源的一個 Kafka 監控架構。這個架構用 Scala 語言開發而成,主要用于管理和監控 Kafka 叢集。github位址:

https://github.com/yahoo/CMAK

,安裝前提要求Java版本在11以上。

在其 Github 官網上下載下傳 tar.gz 包之後,我們執行解壓縮,可以得到CMAK目錄。

之後,我們需要運作 sbt 工具來編譯CMAK。sbt 是專門用于建構 Scala 項目的編譯建構工具,類似于我們熟知的 Maven 和 Gradle。CMAK 自帶了 sbt 指令,我們直接運作它建構項目就可以了:

./sbt clean dist      

建構完成後,到target/universal 目錄下找到生成的 zip 檔案,把它解壓,然後修改裡面的 conf/application.conf 檔案中的 kafka-manager.zkhosts 項,讓它指向你環境中的 ZooKeeper 位址,比如:

cmak.zkhosts="kafka1:2181,kafka2:2181,kafka3:2181"      

之後進入target/universal/cmak-3.0.0.5/bin目錄,運作以下指令啟動 CMAK:

bin/cmak      

然後通路http://ip:9000就可以通路CMAK了。

點選Add Cluster,填寫Zookeeper的位址;選擇Kafka版本;勾選上 Enable JMX Polling,這樣你才能監控 Kafka 的各種 JMX 名額,其餘參數可以保持預設。

Kafka監控架構介紹

從這張圖中,我們可以發現,CMAK 清晰地列出了目前監控的 Kafka 叢集的主題數量、Broker 數量等資訊。你可以點選頂部菜單欄的各個條目去檢視或者設定具體功能。

Kafka監控架構介紹
Kafka監控架構介紹

JMX

JMX的全稱為Java Management Extensions。顧名思義,是管理Java的一種擴充。這種機制可以友善的管理、監控正在運作中的Java程式。常用于管理線程,記憶體,日志Level,服務重新開機,系統環境等。

Kafka系統預設是沒有開啟JMX端口的,設定該端口在$KAFKA_HOME/bin/kafka-server-start.sh腳本中,設定内容如下:

#vim bin/kafka-server-start.sh:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    # 添加下面這行,這裡的端口不一定非要設定成9999,端口隻要可用,均可。
    export JMX_PORT="9999" 
fi      

JConsole監控kafka

通過Jconsole連接配接:

Kafka監控架構介紹
Kafka監控架構介紹
Kafka監控架構介紹

Java API監控kafka

Kafka Mbean定義參考:

http://kafka.apache.org/documentation.html#monitoring
package monitor;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
public class kafkaMonitor {
    //擷取的Kafka名額資料
    private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
    private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
    private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
    private static final String BYTES_REJECTED_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec";
    private static final String FAILED_FETCH_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec";
    private static final String FAILED_PRODUCE_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec";
    private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=8";
    //JXM連接配接資訊
    private static final String JXM_URL = "service:jmx:rmi:///jndi/rmi://11.8.36.125:9999/jmxrmi";
    public void extractMonitorData() {
        try {
            MBeanServerConnection jmxConnection = this.getMBeanServerConnection(JXM_URL);
            ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);
            ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);
            ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);
            ObjectName bytesRejectedPerSecObj = new ObjectName(BYTES_REJECTED_PER_SEC);
            ObjectName failedFetchRequestsPerSecObj = new ObjectName(FAILED_FETCH_REQUESTS_PER_SEC);
            ObjectName failedProduceRequestsPerSecObj = new ObjectName(FAILED_PRODUCE_REQUESTS_PER_SEC);
            ObjectName produceRequestPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);
            printObjectNameDetails(messageCountObj, "Messages in /sec", jmxConnection);
            printObjectNameDetails(bytesInPerSecObj, "Bytes in /sec", jmxConnection);
            printObjectNameDetails(bytesOutPerSecObj, "Bytes out /sec", jmxConnection);
            printObjectNameDetails(bytesRejectedPerSecObj, "Bytes rejected /sec", jmxConnection);
            printObjectNameDetails(failedFetchRequestsPerSecObj, "Failed fetch request /sec", jmxConnection);
            printObjectNameDetails(failedProduceRequestsPerSecObj, "Failed produce request /sec", jmxConnection);
            printObjectNameDetails(produceRequestPerSecObj, "Produce request in /sec", jmxConnection);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new kafkaMonitor().extractMonitorData();
    }
    /**
     * 獲得 MBeanServer 的連接配接
     *
     * @param jmxUrl
     * @return
     * @throws IOException
     */
    private MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
        JMXServiceURL url = new JMXServiceURL(jmxUrl);
        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
        MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
        return mbsc;
    }
    /**
     * 列印 ObjectName 對象詳細資訊
     * @param objectName
     * @param printTitle
     * @param jmxConnection
     */
    private void printObjectNameDetails(ObjectName objectName, String printTitle, MBeanServerConnection jmxConnection) {
        try {
            System.out.println("----------"+ printTitle +"----------");
            System.out.println("TotalCount: " + (Long) jmxConnection.getAttribute(objectName, "Count"));
            System.out.println("MeanRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "MeanRate")));
            System.out.println("OneMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "OneMinuteRate")));
            System.out.println("FiveMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "FiveMinuteRate")));
            System.out.println("FifteenMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "FifteenMinuteRate")));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}      

運作程式,可以看到列印如下資訊:分别是消息接收情況統計,bytes接收統計,bytes輸出統計,bytes拒絕統計,失敗拉取請求統計,失敗生産消息統計,生産消息統計。

每一項統計均包括總量統計,平均速率,最近一分鐘速率,最近5分鐘速率,最近15分鐘速率。

----------Messages in /sec----------
TotalCount: 1701
MeanRate: 0.00
OneMinuteRate: 0.16
FiveMinuteRate: 0.05
FifteenMinuteRate: 0.02
----------Bytes in /sec----------
TotalCount: 176547
MeanRate: 0.07
OneMinuteRate: 12.90
FiveMinuteRate: 3.98
FifteenMinuteRate: 1.43
----------Bytes out /sec----------
TotalCount: 171300
MeanRate: 0.06
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Bytes rejected /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed fetch request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed produce request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Produce request in /sec----------
TotalCount: 1
MeanRate: 0.03
OneMinuteRate: 0.12
FiveMinuteRate: 0.18
FifteenMinuteRate: 0.19      

JMX_Exporter + Prometheus + Grafana

JMX_Exporter 通過HTTP的方式暴露 metrics 資料, Prometheus 主動抓取 metrics 資料,Grafana對接Promethues的資料進行展示。

confluent公司提供了一個demo示例,通過docker-compose快速搭建JMX_Exporter + Prometheus + Grafana監控架構,github位址:

https://github.com/confluentinc/jmx-monitoring-stacks
Kafka監控架構介紹
Kafka監控架構介紹

Jolokia + Elasticsearch + Kibana

Jolokia也是通過JMX的方式來擷取Kafka運作狀态名額,通過Elasticsearch做資料的存儲,搜尋,Kibana做圖表的展示。

Kafka監控架構介紹
Kafka監控架構介紹
Kafka監控架構介紹

Confluent Control Center

Confluent 公司釋出的 Control Center, 這是目前已知的最強大的Kafka 監控架構。2014 年,Kafka 的 3 個創始人 Jay Kreps、Naha Narkhede 和饒軍離開 LinkedIn 創辦了 Confluent 公司,專注于提供基于 Kafka 的企業級流處了解決方案。

Control Center 不但能夠實時地監控 Kafka 叢集,而且還能夠幫助你操作和搭建基于 Kafka 的實時流處理應用。更棒的是,Control Center 提供了統一式的主題管理功能。你可以在這裡享受到 Kafka 主題和 Schema 的一站式管理服務。

Kafka監控架構介紹
Kafka監控架構介紹
Kafka監控架構介紹

Kafka Eagle

Kafka Eagle是由國人維護的,目前還在積極地演進着。根據 Kafka Eagle 官網的描述,除了提供正常的監控功能之外,還開放了告警功能(Alert),非常值得一試。

github位址:

https://github.com/smartloli/kafka-eagle/
Kafka監控架構介紹
Kafka監控架構介紹