大家好,我是Jensen。一個想和大家一起打怪更新的程式員朋友。
咱們在寫Kafka消費者的時候,有沒有發現一個很麻煩的事:每次都要手動解析Kafka消息,轉換成自己想要的類型,再進行業務操作,比如:
/**
* 訂單支付成功通知
* @Author 公衆号:架構師修行錄
*/
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(ConsumerRecord<String, String> consumerRecord) {
// 解析Kafka消息
OrderPaySuccessEvent event = JSON.parseObject(consumerRecord.value(), OrderPaySuccessEvent.class);
// TODO 完成解析成功後的業務操作
}
對代碼有潔癖的同志就比較難受了,每次解析的操作都差不多,但這又不是業務代碼……于是你就想:有沒有一個辦法讓系統能夠自動解析成自己想要的參數對象呢?
其實是有的,早在一年前,我寫過一個元件,對kafka消費做了一層淺封裝,也一直在工程中沿用到現在:Kafka消費者這樣寫,一年節省10,000行代碼
不過也發現了一些難以解決的問題,比如:共用了一個groupId,使用Java線程池來管理,這樣工程會存在性能瓶頸。
專業的事還是交給專業的“人”來做吧,用Kafka元件本身來管理不同分組消費會更靠譜。
Spring官方也發現這個問題,并且對此提出了解決方案——spring-messaging包下的HandlerMethodArgumentResolver接口。
Spring官方說明
我們從官方文檔中看到,Spring-kafka在2.4.2版本支援了對kafka消息進行方法參數級别轉換。
其實,spring-web包下也有這個同名接口,用于自動解析Controller方法上的參數,這塊網上資料比較多我就不詳細展開了,而spring-messaging包下面這個接口的非官方資料比較少,我這裡給大家總結一下用法。
HandlerMethodArgumentResolver方法參數處理器接口很簡單,隻有兩個方法:
public interface HandlerMethodArgumentResolver {
boolean supportsParameter(MethodParameter var1);
@Nullable
Object resolveArgument(MethodParameter var1, Message<?> var2) throws Exception;
}
supportsParameter方法傳回是否支援參數自動解析,resolveArgument方法就是具體的解析邏輯,把MQ傳遞參數轉換為具體類型參數。
我們來看一下實際案例。
首先實作HandlerMethodArgumentResolver接口,定義為一個Spring的Component:
@Component
public class KafkaListenerMethodArgumentResolver implements HandlerMethodArgumentResolver {
@Override
public boolean supportsParameter(@NonNull MethodParameter parameter) {
// 預設以com.xxx開頭的類,這樣可以不用在在參數前加@Payload注解
return parameter.getParameterType().getName().startsWith("com.xxx") || parameter.hasParameterAnnotation(Payload.class);
}
@Override
public Object resolveArgument(@NonNull MethodParameter parameter, @NonNull Message<?> message) {
Class<?> parameterType = parameter.getParameterType();
String messageContent = (String) message.getPayload();
Object body;
try {
// 這裡定義自己的解析方法
body = JsonUtils.fromJson(messageContent, parameterType);
Objects.requireNonNull(body);
} catch (Throwable cause) {
throw new KafkaException("kafka 消息解析失敗: 非法JSON字元串", cause);
}
// 可選,定義解析後的參數校驗邏輯
validate(parameter, body);
return body;
}
private void validate(MethodParameter parameter, Object target) {
for (Annotation ann : parameter.getParameterAnnotations()) {
Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
if (Objects.nonNull(validatedAnn) || ann.annotationType().getSimpleName().startsWith("Valid")) {
ValidationUtils.valid(target);
}
}
}
}
到這裡,一個類就把參數自動解析搞定了,接下來咱們看看怎麼用。
/**
* 訂單支付成功通知
* @Author 公衆号:架構師修行錄
*/
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(@Payload OrderPaySuccessEvent orderPaySuccessEvent) {
// 已經自動解析Kafka消息為orderPaySuccessEvent參數
// TODO 完成解析成功後的業務操作
}
怎麼樣,代碼是不是比原來簡潔多了,香不香!
如果你對技術有追求,不想一直寫業務代碼,不妨把項目中所有需要手動解析參數的地方,替換成自定義方法參數解析器來實作~
本文作者:Jensen
八年Java老兵,曾涉獵航空、電信、IoT、垂直電商、直播互動遊戲産品研發,現就職于廣州某知名電商企業,财務資金團隊負責人。
小米主題設計師、手機輸入法設計師、雙鍵五筆創始人、ProcessOn特邀講師。
技術公衆号【架構師修行錄】号主,專注于分享程式員日常/架構技術/職場幹貨,關注回複“職場”馬上升職加薪。