1、kafka producer consumer connect stream 监控
kafka 提供对producer consumer connect stream的指标监控
kafka monitoring官方文档
producer consumer connect stream 的监控是在各自的实例上,即在kafka集群上是获取不到这些监控指标的,所以要在各自的实例上开启JMX端口以供调用。

2、开启JMX端口
网上的资料很多,简单写一下
java 程序开启
-Dcom.sun.management.jmxremote.port=8888
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
connect distribution方式开启
与kafka一致
3、获取具体的指标项
初始化连接
public boolean init(){
jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
log.info("init jmx, jmxUrl: "+jmxURL+" and begin to connect it");
try {
JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
conn = connector.getMBeanServerConnection();
if(conn == null){
log.error("get connection return null!");
return false;
}
} catch (MalformedURLException e) {
e.printStackTrace();
return false;
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
获取producer、consumer指标
public double getProducerRate(String clientId, String topic, String type) {
String objectName = "kafka.producer:type=producer-topic-metrics,client-id="+clientId+",topic="+topic;
Object val = getAttribute(objectName, type);
log.info(" getProducerRate, " + type +", result: " + val);
if(val !=null){
double dVal = ((Double)val).doubleValue();
return dVal;
}
return 0;
}
public double getConsumerRate(String clientId, String topic, String type) {
String objectName = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="+clientId+",topic="+topic;
Object val = getAttribute(objectName, type);
log.info(" getConsumerRate, " + type +", result: " + val);
if(val !=null){
double dVal = ((Double)val).doubleValue();
return dVal;
}
return 0;
}
private Object getAttribute(ObjectName objName, String objAttr){
if(conn== null)
{
log.error("jmx connection is null");
return null;
}
try {
return conn.getAttribute(objName,objAttr);
} catch (MBeanException e) {
e.printStackTrace();
return null;
} catch (AttributeNotFoundException e) {
e.printStackTrace();
return null;
} catch (InstanceNotFoundException e) {
e.printStackTrace();
return null;
} catch (ReflectionException e) {
e.printStackTrace();
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
4、connect 获取
connect 比较特别,producer、consumer是在源码中指定的
connect client-id命名方式:
consumer:connector-consumer-"connector_name"-"taskId"
producer:connector-producer-"connector_name"-"taskId"