
Obtaining monitoring metrics for Kafka producer, consumer, Connect, and Streams

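1. Monitoring Kafka producer, consumer, Connect, and Streams

       Kafka exposes monitoring metrics for producers, consumers, Connect, and Streams.

       See the official Kafka monitoring documentation.

       These metrics are reported by each client instance itself, so they cannot be obtained from the Kafka cluster (the brokers). To read them, open a JMX port on each producer, consumer, Connect, or Streams instance.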
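Because the MBeans live inside the client JVM, they can be listed from within that process. The following is a minimal sketch rather than part of the original write-up; the class name `LocalKafkaMBeans` is hypothetical, and it assumes the Kafka client registers its MBeans with the platform MBean server.

```java
import java.lang.management.ManagementFactory;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper: lists MBeans registered in the current JVM. */
final class LocalKafkaMBeans {

    /** Returns the sorted canonical names of all MBeans in the given JMX domain. */
    static SortedSet<String> list(String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern;
        try {
            pattern = new ObjectName(domain + ":*"); // e.g. "kafka.producer:*"
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid JMX domain: " + domain, e);
        }
        SortedSet<String> names = new TreeSet<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }
}
```

Calling `LocalKafkaMBeans.list("kafka.producer")` inside an application that has created a producer should show that producer's MBeans; in a JVM without any Kafka client the result is empty.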


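2. Enabling the JMX port

     Plenty of material on this is available online, so only a brief summary follows.

     For a Java program, add these JVM options: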

    -Dcom.sun.management.jmxremote.port=8888

     -Dcom.sun.management.jmxremote.ssl=false

     -Dcom.sun.management.jmxremote.authenticate=false
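To confirm at startup that these options actually reached the JVM, the program can inspect its own input arguments. This is only a sketch under that assumption; the class name `JmxFlags` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical startup check: which remote-JMX options did this JVM receive? */
final class JmxFlags {
    private static final String PREFIX = "-Dcom.sun.management.jmxremote";

    /** Keeps only the arguments that configure remote JMX. */
    static List<String> filter(List<String> jvmArgs) {
        List<String> out = new ArrayList<>();
        for (String arg : jvmArgs) {
            if (arg.startsWith(PREFIX)) {
                out.add(arg);
            }
        }
        return out;
    }

    /** Remote-JMX options found among the current JVM's input arguments. */
    static List<String> current() {
        return filter(ManagementFactory.getRuntimeMXBean().getInputArguments());
    }
}
```

Logging `JmxFlags.current()` once at startup makes a missing port option visible before a monitoring tool fails to connect.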

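    For Connect in distributed mode:

    Enable it the same way as for Kafka itself (for example, by setting the JMX_PORT environment variable before running the startup script).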

3. Reading specific metrics

     Initialize the connection:

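import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Connects to the JMX endpoint of the client JVM; ipAndPort is "host:port".
public boolean init(){

		jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
		log.info("init jmx, jmxUrl: " + jmxURL + " and begin to connect it");
		try {
			JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
			// Note: the connector is only a local variable, so it cannot be closed later.
			JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
			conn = connector.getMBeanServerConnection();
			if(conn == null){
				log.error("get connection return null!");
				return false;
			}
		} catch (MalformedURLException e) {
			// Must come first: MalformedURLException is a subclass of IOException.
			log.error("invalid jmx url: " + jmxURL, e);
			return false;
		} catch (IOException e) {
			log.error("cannot connect to " + jmxURL, e);
			return false;
		}
		return true;
	}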
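
The `init()` method above keeps the `MBeanServerConnection` but not the `JMXConnector`, so the connection can never be closed. One possible alternative, offered only as a sketch, is a small holder that owns both and implements `AutoCloseable`. The class name `JmxSession` and its method names are hypothetical.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical holder that keeps the JMXConnector so it can be closed. */
final class JmxSession implements AutoCloseable {
    private final JMXConnector connector;
    private final MBeanServerConnection connection;

    private JmxSession(JMXConnector connector, MBeanServerConnection connection) {
        this.connector = connector;
        this.connection = connection;
    }

    /** Builds the RMI service URL for a "host:port" pair, as init() does. */
    static JMXServiceURL urlFor(String ipAndPort) {
        try {
            return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("bad JMX address: " + ipAndPort, e);
        }
    }

    /** Connects to the remote JVM; the caller is responsible for close(). */
    static JmxSession open(String ipAndPort) throws IOException {
        JMXConnector connector = JMXConnectorFactory.connect(urlFor(ipAndPort), null);
        try {
            return new JmxSession(connector, connector.getMBeanServerConnection());
        } catch (IOException e) {
            connector.close(); // do not leak the connector if the second step fails
            throw e;
        }
    }

    MBeanServerConnection connection() {
        return connection;
    }

    @Override
    public void close() throws IOException {
        connector.close();
    }
}
```

With this shape, `try (JmxSession s = JmxSession.open("localhost:8888")) { ... }` releases the connection when the block ends; the address here is only an example matching the port used earlier.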
           

Reading producer and consumer metrics:

// type is the MBean attribute name, e.g. "record-send-rate" or "byte-rate".
public double getProducerRate(String clientId, String topic, String type) {

		String objectName = "kafka.producer:type=producer-topic-metrics,client-id="+clientId+",topic="+topic;
		Object val = getAttribute(objectName, type);
		log.info(" getProducerRate, " + type +", result: " + val);
		// instanceof also rejects null and avoids a ClassCastException on non-Double values.
		if(val instanceof Number){
			return ((Number)val).doubleValue();
		}
		// Missing MBean or attribute: fall back to 0.
		return 0;
	}


// type is the MBean attribute name, e.g. "records-consumed-rate" or "bytes-consumed-rate".
public double getConsumerRate(String clientId, String topic, String type) {

		String objectName = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="+clientId+",topic="+topic;
		Object val = getAttribute(objectName, type);
		log.info(" getConsumerRate, " + type +", result: " + val);
		if(val instanceof Number){
			return ((Number)val).doubleValue();
		}
		// Missing MBean or attribute: fall back to 0.
		return 0;
	}


// Reads one attribute of the MBean named objName; returns null on any failure.
// Needs javax.management.ObjectName and javax.management.JMException.
private Object getAttribute(String objName, String objAttr){
		if(conn == null)
		{
			log.error("jmx connection is null");
			return null;
		}

		try {
			// The callers build the name as a String, so it is parsed here.
			return conn.getAttribute(new ObjectName(objName), objAttr);
		} catch (JMException | IOException e) {
			// JMException covers malformed names and missing MBeans or attributes.
			log.error("failed to read " + objAttr + " from " + objName, e);
			return null;
		}
	}
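
The getters above need the client-id and topic in advance. When they are not known, a pattern query can discover the matching MBeans first. The sketch below assumes the `type`, `client-id`, `topic` naming used above; the class name `TopicMetricFinder` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper: discovers per-topic client MBeans with a pattern query. */
final class TopicMetricFinder {

    /**
     * Finds MBeans named domain:type=TYPE,client-id=...,topic=... and returns
     * them keyed by "client-id/topic".
     */
    static SortedMap<String, ObjectName> find(MBeanServerConnection conn, String domain, String type) {
        SortedMap<String, ObjectName> found = new TreeMap<>();
        try {
            // "*" as a property value matches any value of that key.
            ObjectName pattern = new ObjectName(domain + ":type=" + type + ",client-id=*,topic=*");
            for (ObjectName name : conn.queryNames(pattern, null)) {
                String key = name.getKeyProperty("client-id") + "/" + name.getKeyProperty("topic");
                found.put(key, name);
            }
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid domain or type: " + domain + ", " + type, e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return found;
    }
}
```

For example, `TopicMetricFinder.find(conn, "kafka.producer", "producer-topic-metrics")` lists every producer and topic pair visible on that connection. If a value was registered in quoted form, `getKeyProperty` returns it with the quotes, and `ObjectName.unquote` can strip them.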
           

4. Getting Connect metrics

     Connect is a special case: the client-ids of its producers and consumers are set in the Connect source code.


Connect client-id naming scheme:

consumer:connector-consumer-"connector_name"-"taskId"

producer:connector-producer-"connector_name"-"taskId"
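
The naming scheme above can be turned into a small helper for building the MBean names to query. This is a sketch that assumes the default naming shown above with no client-id override in the Connect configuration; the class name `ConnectClientIds` is hypothetical.

```java
/** Hypothetical helper: builds Connect client-ids from the naming scheme above. */
final class ConnectClientIds {

    /** client-id of the consumer used by a sink task. */
    static String consumer(String connectorName, int taskId) {
        return "connector-consumer-" + connectorName + "-" + taskId;
    }

    /** client-id of the producer used by a source task. */
    static String producer(String connectorName, int taskId) {
        return "connector-producer-" + connectorName + "-" + taskId;
    }

    /** MBean name of the per-topic producer metrics for one source task. */
    static String producerTopicMBean(String connectorName, int taskId, String topic) {
        return "kafka.producer:type=producer-topic-metrics,client-id="
                + producer(connectorName, taskId) + ",topic=" + topic;
    }
}
```

The resulting client-id can be passed as the `clientId` argument of `getProducerRate` or `getConsumerRate`, connecting to the JMX port of the Connect worker that runs the task.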

For reference only.