1、kafka producer consumer connect stream 監控
kafka 提供對producer consumer connect stream的名額監控
kafka monitoring官方文檔
producer consumer connect stream 的監控是在各自的執行個體上,即在kafka叢集上是擷取不到這些監控名額的,是以要在各自的執行個體上開啟JMX端口以供調用。

2、開啟JMX端口
網上的資料很多,簡單寫一下
java 程式開啟
-Dcom.sun.management.jmxremote.port=8888
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
connect distribution方式開啟
與kafka一緻
3、擷取具體的名額項
初始化連接配接
public boolean init(){
jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
log.info("init jmx, jmxUrl: "+jmxURL+" and begin to connect it");
try {
JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
conn = connector.getMBeanServerConnection();
if(conn == null){
log.error("get connection return null!");
return false;
}
} catch (MalformedURLException e) {
e.printStackTrace();
return false;
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
擷取producer、consumer名額
public double getProducerRate(String clientId, String topic, String type) {
String objectName = "kafka.producer:type=producer-topic-metrics,client-id="+clientId+",topic="+topic;
Object val = getAttribute(objectName, type);
log.info(" getProducerRate, " + type +", result: " + val);
if(val !=null){
double dVal = ((Double)val).doubleValue();
return dVal;
}
return 0;
}
public double getConsumerRate(String clientId, String topic, String type) {
String objectName = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="+clientId+",topic="+topic;
Object val = getAttribute(objectName, type);
log.info(" getConsumerRate, " + type +", result: " + val);
if(val !=null){
double dVal = ((Double)val).doubleValue();
return dVal;
}
return 0;
}
private Object getAttribute(ObjectName objName, String objAttr){
if(conn== null)
{
log.error("jmx connection is null");
return null;
}
try {
return conn.getAttribute(objName,objAttr);
} catch (MBeanException e) {
e.printStackTrace();
return null;
} catch (AttributeNotFoundException e) {
e.printStackTrace();
return null;
} catch (InstanceNotFoundException e) {
e.printStackTrace();
return null;
} catch (ReflectionException e) {
e.printStackTrace();
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
4、connect 擷取
connect 比較特别,producer、consumer是在源碼中指定的
connect client-id命名方式:
consumer:connector-consumer-"connector_name"-"taskId"
producer:connector-producer-"connector_name"-"taskId"