背景
部門内有一些億級别核心業務表增速非常快,增量日均100W,但線上業務隻依賴近一周的資料。随着資料量的迅速增長,慢SQL頻發,資料庫性能下降,系統穩定性受到嚴重影響。本篇文章,将分享如何使用MyBatis攔截器低成本的提升資料庫穩定性。
業界常見方案
針對冷資料多的大表,常用的政策有以2種:
1. 删除/歸檔舊資料。
2. 分表。
歸檔/删除舊資料
定期将冷資料移動到歸檔表或者冷存儲中,或定期對表進行删除,以減少表的大小。此政策邏輯簡單,隻需要編寫一個JOB定期執行SQL删除資料。我們開始也是用這種方案,但此方案也有一些副作用:
1.資料删除會影響資料庫性能,引發慢sql,多張表并行删除,資料庫壓力會更大。
2.頻繁删除資料,會産生資料庫碎片,影響資料庫性能,引發慢SQL。
綜上,此方案有一定風險,為了規避這種風險,我們決定采用另一種方案:分表。
分表
我們決定按日期對表進行橫向拆分,實作讓系統每周生成一張周期表,表内隻存近一周的資料,規避單表過大帶來的風險。
分表方案選型
經調研,考慮2種分表方案:Sharding-JDBC、利用Mybatis自帶的攔截器特性。
經過對比後,決定采用Mybatis攔截器來實作分表,原因如下:
1.JAVA生态中很常用的分表架構是Sharding-JDBC,雖然功能強大,但需要一定的接入成本,并且很多功能暫時用不上。
2.系統本身已經在使用Mybatis了,隻需要添加一個mybaits攔截器,把SQL表名替換為新的周期表就可以了,沒有接入新架構的成本,開發成本也不高。
分表具體實作代碼
分表配置對象
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ShardingProperty {
// 分表周期天數,配置7,就是一周一分
private Integer days;
// 分表開始日期,需要用這個日期計算周期表名
private Date beginDate;
// 需要分表的表名
private String tableName;
}
分表配置類
import java.util.concurrent.ConcurrentHashMap;
public class ShardingPropertyConfig {
public static final ConcurrentHashMap<String, ShardingProperty> SHARDING_TABLE = new ConcurrentHashMap<>();
static {
ShardingProperty orderInfoShardingConfig = new ShardingProperty(15, DateUtils.string2Date("20231117"), "order_info");
ShardingProperty userInfoShardingConfig = new ShardingProperty(7, DateUtils.string2Date("20231117"), "user_info");
SHARDING_TABLE.put(orderInfoShardingConfig.getTableName(), orderInfoShardingConfig);
SHARDING_TABLE.put(userInfoShardingConfig.getTableName(), userInfoShardingConfig);
}
}
攔截器
import lombok.extern.slf4j.Slf4j;
import o2o.aspect.platform.function.template.service.TemplateMatchService;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.ReflectorFactory;
import org.apache.ibatis.reflection.factory.DefaultObjectFactory;
import org.apache.ibatis.reflection.factory.ObjectFactory;
import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory;
import org.apache.ibatis.reflection.wrapper.ObjectWrapperFactory;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Properties;
@Slf4j
@Component
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class ShardingTableInterceptor implements Interceptor {
private static final ObjectFactory DEFAULT_OBJECT_FACTORY = new DefaultObjectFactory();
private static final ObjectWrapperFactory DEFAULT_OBJECT_WRAPPER_FACTORY = new DefaultObjectWrapperFactory();
private static final ReflectorFactory DEFAULT_REFLECTOR_FACTORY = new DefaultReflectorFactory();
private static final String MAPPED_STATEMENT = "delegate.mappedStatement";
private static final String BOUND_SQL = "delegate.boundSql";
private static final String ORIGIN_BOUND_SQL = "delegate.boundSql.sql";
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final String SHARDING_MAPPER = "com.jd.o2o.inviter.promote.mapper.ShardingMapper";
private ConfigUtils configUtils = SpringContextHolder.getBean(ConfigUtils.class);
@Override
public Object intercept(Invocation invocation) throws Throwable {
boolean shardingSwitch = configUtils.getBool("sharding_switch", false);
// 沒開啟分表 直接傳回老資料
if (!shardingSwitch) {
return invocation.proceed();
}
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
MetaObject metaStatementHandler = MetaObject.forObject(statementHandler, DEFAULT_OBJECT_FACTORY, DEFAULT_OBJECT_WRAPPER_FACTORY, DEFAULT_REFLECTOR_FACTORY);
MappedStatement mappedStatement = (MappedStatement) metaStatementHandler.getValue(MAPPED_STATEMENT);
BoundSql boundSql = (BoundSql) metaStatementHandler.getValue(BOUND_SQL);
String originSql = (String) metaStatementHandler.getValue(ORIGIN_BOUND_SQL);
if (StringUtils.isBlank(originSql)) {
return invocation.proceed();
}
// 擷取表名
String tableName = TemplateMatchService.matchTableName(boundSql.getSql().trim());
ShardingProperty shardingProperty = ShardingPropertyConfig.SHARDING_TABLE.get(tableName);
if (shardingProperty == null) {
return invocation.proceed();
}
// 新表
String shardingTable = getCurrentShardingTable(shardingProperty, new Date());
String rebuildSql = boundSql.getSql().replace(shardingProperty.getTableName(), shardingTable);
metaStatementHandler.setValue(ORIGIN_BOUND_SQL, rebuildSql);
if (log.isDebugEnabled()) {
log.info("rebuildSQL -> {}", rebuildSql);
}
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
if (target instanceof StatementHandler) {
return Plugin.wrap(target, this);
}
return target;
}
@Override
public void setProperties(Properties properties) {}
public static String getCurrentShardingTable(ShardingProperty shardingProperty, Date createTime) {
String tableName = shardingProperty.getTableName();
Integer days = shardingProperty.getDays();
Date beginDate = shardingProperty.getBeginDate();
Date date;
if (createTime == null) {
date = new Date();
} else {
date = createTime;
}
if (date.before(beginDate)) {
return null;
}
LocalDateTime targetDate = SimpleDateFormatUtils.convertDateToLocalDateTime(date);
LocalDateTime startDate = SimpleDateFormatUtils.convertDateToLocalDateTime(beginDate);
LocalDateTime intervalStartDate = DateIntervalChecker.getIntervalStartDate(targetDate, startDate, days);
LocalDateTime intervalEndDate = intervalStartDate.plusDays(days - 1);
return tableName + "_" + intervalStartDate.format(FORMATTER) + "_" + intervalEndDate.format(FORMATTER);
}
}
臨界點資料不連續問題
分表方案有1個難點需要解決:周期臨界點資料不連續。舉例:假設要對operate_log(記錄檔表)大表進行橫向分表,每周一張表,分表明細可看下面表格。
第一周(operate_log_20240107_20240108) | 第二周(operate_log_20240108_20240114) | 第三周(operate_log_20240115_20240121) |
1月1号 ~ 1月7号的資料 | 1月8号 ~ 1月14号的資料 | 1月15号 ~ 1月21号的資料 |
1月8号就是分表臨界點,8号需要切換到第二周的表,但8号0點剛切換的時候,表内沒有任何資料,這時如果業務需要查近一周的記錄檔是查不到的,這樣就會引發線上問題。
我決定采用資料備援的方式來解決這個痛點。每個周期表都備援一份上個周期的資料,用雙倍資料量實作資料滑動的效果,效果見下面表格。
第一周(operate_log_20240107_20240108) | 第二周(operate_log_20240108_20240114) | 第三周(operate_log_20240115_20240121) |
12月25号 ~ 12月31号的資料 | 1月1号 ~ 1月7号的資料 | 1月8号 ~ 1月14号的資料 |
1月1号 ~ 1月7号的資料 | 1月8号 ~ 1月14号的資料 | 1月15号 ~ 1月21号的資料 |
注:表格内第一行資料就是備援的上個周期表的資料。
思路有了,接下來就要考慮怎麼實作雙寫(資料備援到下個周期表),有2種方案:
1.在SQL執行完成傳回結果前添加邏輯(可以用AspectJ 或 mybatis攔截器),如果SQL内的表名是目前周期表,就把表名替換為下個周期表,然後再次執行SQL。此方案對業務影響大,相當于串行執行了2次SQL,有性能損耗。
2.監聽增量binlog,京東内部有現成的資料訂閱中間件DRC,讀者也可以使用cannal等開源中間件來代替DRC,原理大同小異,此方案對業務無影響。
方案對比後,選擇了對業務性能損耗小的方案二。
監聽binlog并雙寫流程圖
監聽binlog資料雙寫注意點
1.提前上線監聽程式,提前把老表資料同步到新的周期表。分表前隻監聽老表binlog就可以,分表前隻需要把老表資料同步到新表。
2.切換到新表的臨界點,為了避免丢失積壓的老表binlog,需要同時處理新表binlog和老表binlog,這樣會出現死循環同步的問題,因為老表需要同步新表,新表又需要雙寫老表。為了打破循環,需要先把雙寫老表消費堵上讓消息暫時積壓,切換新表成功後,再打開雙寫消費。
監聽binlog資料雙寫代碼
注:下面代碼不能直接用,隻提供基本思路
/**
* 監聽binlog ,分表雙寫,解決資料臨界問題
*/
@Slf4j
@Component
public class BinLogConsumer implements MessageListener {
private MessageDeserialize deserialize = new JMQMessageDeserialize();
private static final String TABLE_PLACEHOLDER = "%TABLE%";
@Value("${mq.doubleWriteTopic.topic}")
private String doubleWriteTopic;
@Autowired
private JmqProducerService jmqProducerService;
@Override
public void onMessage(List<Message> messages) throws Exception {
if (messages == null || messages.isEmpty()) {
return;
}
List<EntryMessage> entryMessages = deserialize.deserialize(messages);
for (EntryMessage entryMessage : entryMessages) {
try {
syncData(entryMessage);
} catch (Exception e) {
log.error("sharding sync data error", e);
throw e;
}
}
}
private void syncData(EntryMessage entryMessage) throws JMQException {
// 根據binlog内的表名,擷取需要同步的表
// 3種情況:
// 1、老表:需要同步目前周期表,和下個周期表。
// 2、目前周期表:需要同步下個周期表,和老表。
// 3、下個周期表:不需要同步。
List<String> syncTables = getSyncTables(entryMessage.tableName, entryMessage.createTime);
if (CollectionUtils.isEmpty(syncTables)) {
log.info("table {} is not need sync", tableName);
return;
}
if (entryMessage.getHeader().getEventType() == WaveEntry.EventType.INSERT) {
String insertTableSqlTemplate = parseSqlForInsert(rowData);
for (String syncTable : syncTables) {
String insertSql = insertTableSqlTemplate.replaceAll(TABLE_PLACEHOLDER, syncTable);
// 雙寫老表發Q,為了避免出現同步死循環問題
if (ShardingPropertyConfig.SHARDING_TABLE.containsKey(syncTable)) {
Long primaryKey = getPrimaryKey(rowData.getAfterColumnsList());
sendDoubleWriteMsg(insertSql, primaryKey);
continue;
}
mysqlConnection.executeSql(insertSql);
}
continue;
}
}
資料對比
為了保證新表和老表資料一緻,需要編寫對比程式,在上線前進行資料對比,保證binlog同步無問題。
具體實作代碼不做展示,思路:新表查詢一定量級資料,老表查詢相同量級資料,都轉換成JSON,equals對比。
作者:京東零售 張均傑
來源:京東雲開發者社群 轉載請注明來源