
Getting monitoring metrics for Kafka producers, consumers, Connect, and Streams

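1. Monitoring Kafka producers, consumers, Connect, and Streams

       Kafka exposes monitoring metrics for producers, consumers, Connect, and Streams.

       See the Monitoring section of the official Kafka documentation.

       These metrics are reported by each client instance itself, not by the brokers, so they cannot be read from the Kafka cluster. To collect them, open a JMX port on each client instance.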


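2. Opening the JMX port

     There is plenty of material online, so this is only a brief summary.

     For a Java program, add the following JVM options (authentication and SSL are disabled here, which is only appropriate on a trusted network):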

     -Dcom.sun.management.jmxremote.port=8888

     -Dcom.sun.management.jmxremote.ssl=false

     -Dcom.sun.management.jmxremote.authenticate=false

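     For Connect in distributed mode

     Same as for a Kafka broker: for example, set the JMX_PORT environment variable before running the start script.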

3. Reading specific metrics

     Initializing the connection

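// Connects to the JMX endpoint of one client instance.
// ipAndPort has the form "host:port", where port is the jmxremote.port set in section 2.
public boolean init(){

		jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
		log.info("init jmx, jmxUrl: "+jmxURL+" and begin to connect it");
		try {
			JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
			// The connector is local to this method; keep a reference to it if it must be closed later.
			JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
			conn = connector.getMBeanServerConnection();
			if(conn == null){
				log.error("get connection return null!");
				return  false;
			}
		} catch (MalformedURLException e) {
			log.error("invalid jmx url: " + jmxURL + ", " + e);
			return false;
		} catch (IOException e) {
			log.error("failed to connect to " + jmxURL + ", " + e);
			return false;
		}
		return true;
	}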
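
Before reading an attribute, it helps to check which MBeans the client has actually registered, because the object names depend on the client-id and topic. The following is a minimal sketch, assuming any MBeanServerConnection (for example the conn obtained in init() above). JmxMetricLister and listNames are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of the original code.
final class JmxMetricLister {

    // Returns the canonical names of all MBeans matching the pattern, sorted.
    // A pattern such as "kafka.producer:type=producer-topic-metrics,*" matches
    // every client-id and topic registered under that type.
    static Set<String> listNames(MBeanServerConnection conn, String pattern) {
        try {
            Set<String> names = new TreeSet<>();
            for (ObjectName name : conn.queryNames(new ObjectName(pattern), null)) {
                names.add(name.getCanonicalName());
            }
            return names;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad ObjectName pattern: " + pattern, e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling listNames(conn, "kafka.consumer:type=consumer-fetch-manager-metrics,*") against a consumer instance should list one entry per client-id and topic. An empty result usually means the connection points at the wrong JVM or the client has not started yet.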
           

Reading producer and consumer metrics

// type is the attribute name to read, e.g. "record-send-rate" or "byte-rate".
// Returns 0 when the attribute is missing or not numeric.
public double getProducerRate(String clientId, String topic, String type) {
		
		String objectName = "kafka.producer:type=producer-topic-metrics,client-id="+clientId+",topic="+topic;
		Object val = getAttribute(objectName, type);
		log.info(" getProducerRate, " + type +", result: " + val);
		// Check the type before converting, so a non-numeric value cannot cause a ClassCastException.
		if(val instanceof Number){
			return ((Number)val).doubleValue();
		}
		return 0;
	}
	
	
// type is the attribute name to read, e.g. "records-consumed-rate" or "bytes-consumed-rate".
// Returns 0 when the attribute is missing or not numeric.
public double getConsumerRate(String clientId, String topic, String type) {
		
		String objectName = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="+clientId+",topic="+topic;
		Object val = getAttribute(objectName, type);
		log.info(" getConsumerRate, " + type +", result: " + val);
		if(val instanceof Number){
			return ((Number)val).doubleValue();
		}
		return 0;
	}


// Reads one attribute of the MBean named objName; returns null if the read fails.
private Object getAttribute(String objName, String objAttr){
		if(conn== null)
		{
			log.error("jmx connection is null");
			return null;
		}

		try {
			// The callers pass the name as a String, so the ObjectName is built here.
			return conn.getAttribute(new ObjectName(objName),objAttr);
		} catch (JMException e) {
			// Covers a malformed name, a missing MBean or attribute, and MBean or reflection errors.
			log.error("failed to read " + objAttr + " of " + objName + ", " + e);
			return null;
		} catch (IOException e) {
			log.error("jmx communication error, " + e);
			return null;
		}
	}
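
The read path above can also be written as one self-contained helper. This is a sketch under stated assumptions rather than the author's implementation: JmxAttributeReader and readDouble are hypothetical names, and returning NaN instead of 0 is a design choice made here so that a missing metric can be told apart from a rate that is genuinely zero.

```java
import java.io.IOException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper, not part of the original code.
final class JmxAttributeReader {

    // Reads a numeric attribute and converts it to double.
    // Returns Double.NaN if the MBean or attribute does not exist,
    // if the value is not numeric, or if the connection fails.
    static double readDouble(MBeanServerConnection conn, String objectName, String attribute) {
        try {
            Object val = conn.getAttribute(new ObjectName(objectName), attribute);
            return (val instanceof Number) ? ((Number) val).doubleValue() : Double.NaN;
        } catch (JMException | IOException e) {
            // JMException is the common parent of the checked JMX exceptions thrown here.
            return Double.NaN;
        }
    }
}
```

For a producer this could be called as readDouble(conn, "kafka.producer:type=producer-topic-metrics,client-id=" + clientId + ",topic=" + topic, "record-send-rate"), with the caller checking Double.isNaN on the result.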
           

4. Metrics for Connect

     Connect is a special case: the client-ids of its producers and consumers are assigned in the Connect source code.


Connect client-id naming:

consumer: connector-consumer-"connector_name"-"taskId"

producer: connector-producer-"connector_name"-"taskId"
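
The naming pattern above can be turned into object names for the JMX lookup from section 3. This is a minimal sketch, assuming the pattern holds for the Connect version in use. ConnectClientIds and its methods are hypothetical names, and the connector names in the usage note are made up.

```java
// Hypothetical helper, not part of the original code.
final class ConnectClientIds {

    // client-id of the consumer used by a connector task.
    static String consumerClientId(String connectorName, int taskId) {
        return "connector-consumer-" + connectorName + "-" + taskId;
    }

    // client-id of the producer used by a connector task.
    static String producerClientId(String connectorName, int taskId) {
        return "connector-producer-" + connectorName + "-" + taskId;
    }

    // Object name of the per-topic consumer fetch metrics for one task.
    static String consumerFetchMetricsName(String connectorName, int taskId, String topic) {
        return "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="
                + consumerClientId(connectorName, taskId) + ",topic=" + topic;
    }

    // Object name of the per-topic producer metrics for one task.
    static String producerTopicMetricsName(String connectorName, int taskId, String topic) {
        return "kafka.producer:type=producer-topic-metrics,client-id="
                + producerClientId(connectorName, taskId) + ",topic=" + topic;
    }
}
```

For a connector named my-sink, consumerClientId("my-sink", 0) gives connector-consumer-my-sink-0, which can be passed as the clientId argument of getConsumerRate.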

For reference only.