背景介紹
在文章【定位頻繁建立對象導緻記憶體溢出風險的思路】中分析了三種【事中】定位的方法,總體思路是能夠攔截對象的建立邏輯,現在對【同一條SQL語句,平時傳回預期内的資料條數,出問題的時候傳回了幾十萬條資料,短時間内建立了大量對象進而導緻非預期的GC】這個場景進行分析。
問題分析
同一條SQL語句,平時傳回預期内的資料條數,出問題的時候傳回了幾十萬條資料,短時間内建立了大量對象進而導緻非預期的GC,嚴重情況下會導緻應用無法提供服務。當這樣情況發生的時候,需要能夠及時發現并進行處理,為了便于定位問題,需要以下資訊:
- 引起問題的sql及sql的參數
- 查詢結果集的條數
- 查詢結果集的位元組大小
- 執行該sql的線程棧資訊
當然也可以根據具體需求,擷取更多的資訊,比如:資料庫連接配接資訊、各種相關配置參數等。
實作方法
采用位元組碼增強技術,當Statement執行execute和executeQuery的時候,攔截方法的傳回并對傳回結果進行分析。具體實作如下:
mysql-connector-java:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
位元組碼增強架構使用的是bytekit:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>bytekit-core</artifactId>
<version>0.0.8</version>
</dependency>
Mysql Statement Query Interceptor :
import com.alibaba.bytekit.a**.binding.Binding;
import com.alibaba.bytekit.a**.interceptor.annotation.AtEnter;
import com.alibaba.bytekit.a**.interceptor.annotation.AtExit;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.SQLException;
public class MysqlStatementQueryInterceptor {
private static ThreadLocal<Long> HOLDER = new ThreadLocal<>();
private static int THRESHOLD_COUNT = 100;
private static int THRESHOLD_SIZE = 1*1024;
private final static int THRESHOLD_ELAPSED = 10 * 1000;
@AtEnter(inline = false)
public static void atEnter() {
HOLDER.set(System.currentTimeMillis());
}
@AtExit(inline = false)
public static void atExit(@Binding.This Object target,
@Binding.Args Object[] args,
@Binding.MethodName String methodName) {
try{
doAtExit(target,args,methodName);
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
public static void doAtExit(Object target,Object[] args, String methodName) throws SQLException, IllegalAccessException, InvocationTargetException {
Field resultsField = field(target.getClass(),"results");
resultsField.setAccessible(true);
Object obj = resultsField.get(target);
Method getUpdateCount = method(obj.getClass(),"getUpdateCount");
getUpdateCount.setAccessible(true);
long updateCount = (long)getUpdateCount.invoke(obj);
Method getBytesSize = method(obj.getClass(),"getBytesSize");
getBytesSize.setAccessible(true);
int byteSize = (int)getBytesSize.invoke(obj);
long elapsed = System.currentTimeMillis() - HOLDER.get();
if(updateCount > THRESHOLD_COUNT || byteSize > THRESHOLD_SIZE || elapsed > THRESHOLD_ELAPSED){
String sql = (args.length >= 1) ? (String) args[0] : "";
Method asSql = method(target.getClass(),"asSql");
if(asSql != null){
asSql.setAccessible(true);
sql = (String) asSql.invoke(target);
}
String ** = target.getClass().getName() + "." + methodName +
"," + sql +
"," + byteSize + " bytes"+
",amount " + updateCount +
",elapsed " + elapsed + " ms";
TooManyResultException e = new TooManyResultException(**);
e.setStackTrace(Thread.currentThread().getStackTrace());
e.printStackTrace();
}
}
private static Field field(Class<?> clazz,String fieldName){
if(clazz == null){
return null;
}
try{
return clazz.getDeclaredField(fieldName);
}catch (NoSuchFieldException exception){
return field(clazz.getSuperclass(),fieldName);
}
}
private static Method method(Class<?> clazz, String methodName){
if(clazz == null){
return null;
}
try{
return clazz.getDeclaredMethod(methodName);
} catch (NoSuchMethodException e) {
return method(clazz.getSuperclass(),methodName);
}
}
}
增強位元組碼:
Instrumentation instrumentation = AgentUtils.install();
DefaultInterceptorClassParser interceptorClassParser = new DefaultInterceptorClassParser();
List<InterceptorProcessor> processors = interceptorClassParser.parse(MysqlStatementQueryInterceptor.class);
String classPattern = "com.mysql.jdbc.StatementImpl";
Set<String> methodNames = new HashSet<>();
methodNames.add("executeQuery");
methodNames.add("execute");
BytekitUtils.reTransformClass(instrumentation,processors,classPattern,methodNames,true);
import com.alibaba.bytekit.a**.MethodProcessor;
import com.alibaba.bytekit.a**.interceptor.InterceptorProcessor;
import com.alibaba.bytekit.utils.AgentUtils;
import com.alibaba.bytekit.utils.A**Utils;
import com.alibaba.deps.org.objectweb.a**.tree.ClassNode;
import com.alibaba.deps.org.objectweb.a**.tree.MethodNode;
import java.lang.instrument.Instrumentation;
import java.util.List;
import java.util.Set;
public class BytekitUtils {
public static void reTransformClass(Instrumentation instrumentation, List<InterceptorProcessor> processors,
String className, Set<String> methodNames, boolean subClass){
Set<Class<?>> classes = SearchUtils.searchClassOnly(instrumentation,className,false);
if(classes.isEmpty()){
return;
}
Set<Class<?>> subClasses = classes;
if(subClass){
subClasses =SearchUtils.searchSubClass(instrumentation,classes);
}
reTransform(processors,subClasses,methodNames);
}
public static void reTransform(List<InterceptorProcessor> processors,Set<Class<?>> classes,Set<String> methodNames) {
for(Class<?> cls : classes) {
ClassNode classNode = null;
try {
classNode = A**Utils.loadClass(cls);
classNode = A**Utils.removeJSRInstructions(classNode);
} catch (Exception e) {
e.printStackTrace();
continue;
}
boolean inited = false;
for (MethodNode methodNode : classNode.methods) {
if (methodNames == null || methodNames.contains(methodNode.name)) {
MethodProcessor methodProcessor = new MethodProcessor(classNode, methodNode);
for (InterceptorProcessor interceptor : processors) {
try {
interceptor.process(methodProcessor);
inited = true;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
if (!inited) {
continue;
}
byte[] bytes = A**Utils.toBytes(classNode);
try {
AgentUtils.reTransform(cls, bytes);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
應用方式
方式一
在應用啟動的時候進行位元組碼增強,可以實時監控每個資料庫查詢,當出現問題的時候,可以進行報警,能夠更快的發現問題;代價是有些額外的開銷。
方式二
按需進行位元組碼增強,即當系統出現問題的時候進行位元組碼增強(比如作為一條command內建進arthas),當再次出現問題的時候,可以抓取到異常資訊進行分析。
總結
通過位元組碼增強技術來攔截Statement的執行,進而擷取執行的sql、結果集的條數、大小及調用的線程棧資訊,當結果集的條數、大小或執行時間超過門檻值的時候,進行報警以便更快的發現和分析定位。