天天看點

JMX監控Zookeeper狀态Java API

一、背景

上一篇通過Java自帶的JConsole來擷取zookeeper狀态。主要有幾個不友善的地方,zk叢集一般會部署3或者5台,在多個JConsole視窗中切換比較麻煩,各個zk服務及曆史資料之間,不能直覺比較。一般會做一個WEB管理頁面來展示叢集狀态,設定報警閥值來做報警。

二、JVM平台提供Mbeans

在Java5.0以上版本,有一組API可以讓Java應用程式和允許的工具監視和管理Java虛拟機(JVM)和虛拟機所在的本機作業系統。該組API在 java.lang.management包。可以通過這些API可以監控local端JVM,同時也可以監控遠端JVM。

通過靜态工廠方法擷取MXBean執行個體,從本地通路正在運作的虛拟機的MXBean接口。這些Bean我們從ManagementFactory類中定義的靜态方法擷取;如ManagementFactory.getOperatingSystemMXBean();其中不足就是隻能擷取本地的JVM狀态。無法擷取遠端的虛拟機資料。

  • ClassLoadingMXBean Java虛拟機的類加載系統
  • CompilationMXBean Java虛拟機的編譯系統
  • MemoryMXBean Java虛拟機的記憶體系統
  • RuntimeMXBean Java虛拟機的運作時系統
  • OperatingSystemMXBean Java虛拟機在其上運作的作業系統
  • GarbageCollectorMXBean Java虛拟機中的垃圾回收器
  • MemoryManagerMXBean Java虛拟機中的記憶體管理器
  • MemoryPoolMXBean Java虛拟機中的記憶體池
三、Zookeeper提供出來的Mbeans

構造MXBean代理執行個體,通過代理将方法調用轉發到給定的MBeanServe。JConsole能夠監控的項目,通過API都能擷取到。

具體代碼如下:

import java.io.IOException;
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.CompilationMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.zookeeper.server.ConnectionMXBean;
import org.apache.zookeeper.server.DataTreeMXBean;
import org.apache.zookeeper.server.ZooKeeperServerMXBean;

public class ZkJMXTest {
    static JMXConnector connector;

    /**
     * @param args
     * @throws IOException
     * @throws MalformedObjectNameException
     * @throws InstanceNotFoundException
     * @throws ReflectionException
     * @throws IntrospectionException
     */
    public static void main(String[] args) throws IOException, MalformedObjectNameException,
        InstanceNotFoundException, IntrospectionException, ReflectionException {

        OperatingSystemMXBean osbean = ManagementFactory.getOperatingSystemMXBean();
        System.out.println("體系結構:" + osbean.getArch());//作業系統體系結構
        System.out.println("處理器核數:" + osbean.getAvailableProcessors());///核數
        System.out.println("名字:" + osbean.getName());//名字

        System.out.println(osbean.getVersion());//作業系統版本
        ThreadMXBean threadBean=ManagementFactory.getThreadMXBean();
        System.out.println("活動線程:" + threadBean.getThreadCount());//總線程數

        ClassLoadingMXBean classLoadingMXBean = ManagementFactory.getClassLoadingMXBean();
        CompilationMXBean compilationMXBean = ManagementFactory.getCompilationMXBean();
        System.out.println("===========");

        // 通過 MBeanServer間接地通路 MXBean 接口
        MBeanServerConnection mbsc = createMBeanServer("192.168.1.100", "9991", "controlRole", "123456");

        // 作業系統
        ObjectName os = new ObjectName("java.lang:type=OperatingSystem");
        System.out.println("體系結構:" + getAttribute(mbsc, os, "Arch"));//體系結構
        System.out.println("處理器核數:" + getAttribute(mbsc, os, "AvailableProcessors"));//核數
        System.out.println("總實體記憶體:" + getAttribute(mbsc, os, "TotalPhysicalMemorySize"));//總實體記憶體
        System.out.println("空閑實體記憶體:" + getAttribute(mbsc, os, "FreePhysicalMemorySize"));//空閑實體記憶體
        System.out.println("總交換空間:" + getAttribute(mbsc, os, "TotalSwapSpaceSize"));//總交換空間
        System.out.println("空閑交換空間:" + getAttribute(mbsc, os, "FreeSwapSpaceSize"));//空閑交換空間

        System.out.println("作業系統:" + getAttribute(mbsc, os, "Name")+ getAttribute(mbsc, os, "Version"));//作業系統
        System.out.println("送出的虛拟記憶體:" + getAttribute(mbsc, os, "CommittedVirtualMemorySize"));//送出的虛拟記憶體
        System.out.println("系統cpu使用率:" + getAttribute(mbsc, os, "SystemCpuLoad"));//系統cpu使用率
        System.out.println("程序cpu使用率:" + getAttribute(mbsc, os, "ProcessCpuLoad"));//程序cpu使用率

        System.out.println("============");//
        // 線程
        ObjectName Threading = new ObjectName("java.lang:type=Threading");
        System.out.println("活動線程:" + getAttribute(mbsc, Threading, "ThreadCount"));// 活動線程
        System.out.println("守護程式線程:" + getAttribute(mbsc, Threading, "DaemonThreadCount"));// 守護程式線程
        System.out.println("峰值:" + getAttribute(mbsc, Threading, "PeakThreadCount"));// 峰值
        System.out.println("啟動的線程總數:" + getAttribute(mbsc, Threading, "TotalStartedThreadCount"));// 啟動的線程總數
        ThreadMXBean threadBean2 = ManagementFactory.newPlatformMXBeanProxy
                (mbsc, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);
        System.out.println("活動線程:" + threadBean2.getThreadCount());// 活動線程
        ThreadMXBean threadBean3 = ManagementFactory.getThreadMXBean();
        System.out.println("本地活動線程:" + threadBean3.getThreadCount());// 本地活動線程

        System.out.println("============");//
        ObjectName Compilation = new ObjectName("java.lang:type=Compilation");
        System.out.println("總編譯時間 毫秒:" + getAttribute(mbsc, Compilation, "TotalCompilationTime"));// 總編譯時間 毫秒

        System.out.println("============");//
        ObjectName ClassLoading = new ObjectName("java.lang:type=ClassLoading");
        System.out.println("已加載類總數:" + getAttribute(mbsc, ClassLoading, "TotalLoadedClassCount"));// 已加載類總數
        System.out.println("已加裝目前類:" + getAttribute(mbsc, ClassLoading, "LoadedClassCount"));// 已加裝目前類
        System.out.println("已解除安裝類總數:" + getAttribute(mbsc, ClassLoading, "UnloadedClassCount"));// 已解除安裝類總數


        System.out.println("==========================================================");//
        // http://zookeeper.apache.org/doc/r3.4.6/zookeeperJMX.html
        // org.apache.ZooKeeperService:name0=ReplicatedServer_id1,name1=replica.1,name2=Follower
        ObjectName replica = new ObjectName("org.apache.ZooKeeperService:name0=ReplicatedServer_id1,name1=replica.1");
        System.out.println("replica.1運作狀态:" + getAttribute(mbsc, replica, "State"));// 運作狀态

        mbsc = createMBeanServer("192.168.1.100", "9992", "controlRole", "123456");
        System.out.println("==============節點樹對象===========");
        ObjectName dataTreePattern = new ObjectName("org.apache.ZooKeeperService:name0=ReplicatedServer_id?,name1=replica.?,name2=*,name3=InMemoryDataTree");
        Set<ObjectName> dataTreeSets = mbsc.queryNames(dataTreePattern, null);
        Iterator<ObjectName> dataTreeIterator = dataTreeSets.iterator();
        // 隻有一個
        while (dataTreeIterator.hasNext()) {
            ObjectName dataTreeObjectName = dataTreeIterator.next();
            DataTreeMXBean dataTree = JMX.newMBeanProxy(mbsc, dataTreeObjectName, DataTreeMXBean.class);
            System.out.println("節點總數:" + dataTree.getNodeCount());// 節點總數
            System.out.println("Watch總數:" + dataTree.getWatchCount());// Watch總數
            System.out.println("臨時節點總數:" + dataTree.countEphemerals());// Watch總數
            System.out.println("節點名及字元總數:" + dataTree.approximateDataSize());// 節點全路徑和值的總字元數

            Map<String, String> dataTreeMap = dataTreeObjectName.getKeyPropertyList();
            String replicaId = dataTreeMap.get("name1").replace("replica.", "");
            String role = dataTreeMap.get("name2");// Follower,Leader,Observer,Standalone
            String canonicalName = dataTreeObjectName.getCanonicalName();
            int roleEndIndex = canonicalName.indexOf(",name3");

            ObjectName roleObjectName = new ObjectName(canonicalName.substring(0, roleEndIndex));
            System.out.println("==============zk服務狀态===========");
            ZooKeeperServerMXBean ZooKeeperServer = JMX.newMBeanProxy(mbsc, roleObjectName, ZooKeeperServerMXBean.class);
            System.out.println(role + " 的IP和端口:" + ZooKeeperServer.getClientPort());// IP和端口
            System.out.println(role + " 活着的連接配接數:" + ZooKeeperServer.getNumAliveConnections());// 連接配接數
            System.out.println(role + " 未完成請求數:" + ZooKeeperServer.getOutstandingRequests());// 未完成的請求數
            System.out.println(role + " 接收的包:" + ZooKeeperServer.getPacketsReceived());// 收到的包
            System.out.println(role + " 發送的包:" + ZooKeeperServer.getPacketsSent());// 發送的包
            System.out.println(role + " 平均延遲(毫秒):" + ZooKeeperServer.getAvgRequestLatency());
            System.out.println(role + " 最大延遲(毫秒):" + ZooKeeperServer.getMaxRequestLatency());

            System.out.println(role + " 每個用戶端IP允許的最大連接配接數:" + ZooKeeperServer.getMaxClientCnxnsPerHost());
            System.out.println(role + " 最大Session逾時(毫秒):" + ZooKeeperServer.getMaxSessionTimeout());
            System.out.println(role + " 心跳時間(毫秒):" + ZooKeeperServer.getTickTime());
            System.out.println(role + " 版本:" + ZooKeeperServer.getVersion());// 版本
            // 三個重置操作
//            ZooKeeperServer.resetLatency(); //重置min/avg/max latency statistics
//            ZooKeeperServer.resetMaxLatency(); //重置最大延遲統計
//            ZooKeeperServer.resetStatistics(); // 重置包和延遲所有統計


            System.out.println("==============所有用戶端的連接配接資訊===========");
            ObjectName connectionPattern = new ObjectName("org.apache.ZooKeeperService:name0=ReplicatedServer_id?,name1=replica.?,name2=*,name3=Connections,*");
            Set<ObjectName> connectionSets = mbsc.queryNames(connectionPattern, null);
            List<ObjectName> connectionList = new ArrayList<ObjectName>(connectionSets.size());
            connectionList.addAll(connectionSets);
            Collections.sort(connectionList);
            for (ObjectName connectionON : connectionList) {
                System.out.println("=========================");
                ConnectionMXBean connectionBean = JMX.newMBeanProxy(mbsc, connectionON, ConnectionMXBean.class);
                System.out.println(" IP+Port:" + connectionBean.getSourceIP());//
                System.out.println(" SessionId:" + connectionBean.getSessionId());//
                System.out.println(" PacketsReceived:" + connectionBean.getPacketsReceived());// 收到的包
                System.out.println(" PacketsSent:" + connectionBean.getPacketsSent());// 發送的包
                System.out.println(" MinLatency:" + connectionBean.getMinLatency());//
                System.out.println(" AvgLatency:" + connectionBean.getAvgLatency());//
                System.out.println(" MaxLatency:" + connectionBean.getMaxLatency());//
                System.out.println(" StartedTime:" + connectionBean.getStartedTime());//
                System.out.println(" EphemeralNodes:" + connectionBean.getEphemeralNodes().length);//
                System.out.println(" EphemeralNodes:" + Arrays.asList(connectionBean.getEphemeralNodes()));//
                System.out.println(" OutstandingRequests:" + connectionBean.getOutstandingRequests());//
                
                //connectionBean.resetCounters();
                //connectionBean.terminateConnection();
                //connectionBean.terminateSession();
            }
        }
        // close connection
        if (connector != null) {
            connector.close();
        }
    }

    /**
     * 建立連接配接
     *
     * @param ip
     * @param jmxport
     * @return
     */
    public static MBeanServerConnection createMBeanServer(String ip,
            String jmxport, String userName, String password) {
        try {
            String jmxURL = "service:jmx:rmi:///jndi/rmi://" + ip + ":"
                    + jmxport + "/jmxrmi";
            // jmxurl
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);

            Map<String, String[]> map = new HashMap<String, String[]>();
            String[] credentials = new String[] { userName, password };
            map.put("jmx.remote.credentials", credentials);
            connector = JMXConnectorFactory.connect(serviceURL, map);
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            return mbsc;

        } catch (IOException ioe) {
            ioe.printStackTrace();
            System.err.println(ip + ":" + jmxport + " 連接配接建立失敗");
        }
        return null;
    }

    /**
     * 使用MBeanServer擷取對象名為[objName]的MBean的[objAttr]屬性值
     * <p>
     * 靜态代碼: return MBeanServer.getAttribute(ObjectName name, String attribute)
     *
     * @param mbeanServer
     *            - MBeanServer執行個體
     * @param objName
     *            - MBean的對象名
     * @param objAttr
     *            - MBean的某個屬性名
     * @return 屬性值
     */
    private static String getAttribute(MBeanServerConnection mbeanServer,
            ObjectName objName, String objAttr) {
        if (mbeanServer == null || objName == null || objAttr == null)
            throw new IllegalArgumentException();
        try {
            return String.valueOf(mbeanServer.getAttribute(objName, objAttr));
        } catch (Exception e) {
            return null;
        }
    }
}