天天看點

Kafka封裝之——方法參數解析器,用起來真香!

作者:戰神互遊

大家好,我是Jensen。一個想和大家一起打怪更新的程式員朋友。

咱們在寫Kafka消費者的時候,有沒有發現一個很麻煩的事:每次都要手動解析Kafka消息,轉換成自己想要的類型,再進行業務操作,比如:

/**
 * 訂單支付成功通知
 * @Author 公衆号:架構師修行錄
 */
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(ConsumerRecord<String, String> consumerRecord) {
  	// 解析Kafka消息
		OrderPaySuccessEvent event = JSON.parseObject(consumerRecord.value(), OrderPaySuccessEvent.class);
  	// TODO 完成解析成功後的業務操作
}           

對代碼有潔癖的同志就比較難受了,每次解析的操作都差不多,但這又不是業務代碼……于是你就想:有沒有一個辦法讓系統能夠自動解析成自己想要的參數對象呢?

其實是有的,早在一年前,我寫過一個元件,對kafka消費做了一層淺封裝,也一直在工程中沿用到現在:Kafka消費者這樣寫,一年節省10,000行代碼

Kafka封裝之——方法參數解析器,用起來真香!

不過也發現了一些難以解決的問題,比如:共用了一個groupId,使用Java線程池來管理,這樣工程會存在性能瓶頸。

專業的事還是交給專業的“人”來做吧,用Kafka元件本身來管理不同分組消費會更靠譜。

Spring官方也發現這個問題,并且對此提出了解決方案——spring-messaging包下的HandlerMethodArgumentResolver接口。

Kafka封裝之——方法參數解析器,用起來真香!

Spring官方說明

我們從官方文檔中看到,Spring-kafka在2.4.2版本支援了對kafka消息進行方法參數級别轉換。

其實,spring-web包下也有這個同名接口,用于自動解析Controller方法上的參數,這塊網上資料比較多我就不詳細展開了,而spring-messaging包下面這個接口的非官方資料比較少,我這裡給大家總結一下用法。

HandlerMethodArgumentResolver方法參數處理器接口很簡單,隻有兩個方法:

public interface HandlerMethodArgumentResolver {
    boolean supportsParameter(MethodParameter var1);

    @Nullable
    Object resolveArgument(MethodParameter var1, Message<?> var2) throws Exception;
}           

supportsParameter方法傳回是否支援參數自動解析,resolveArgument方法就是具體的解析邏輯,把MQ傳遞參數轉換為具體類型參數。

我們來看一下實際案例。

首先實作HandlerMethodArgumentResolver接口,定義為一個Spring的Component:

@Component
public class KafkaListenerMethodArgumentResolver implements HandlerMethodArgumentResolver {

    @Override
    public boolean supportsParameter(@NonNull MethodParameter parameter) {
      	// 預設以com.xxx開頭的類,這樣可以不用在在參數前加@Payload注解
        return parameter.getParameterType().getName().startsWith("com.xxx") || parameter.hasParameterAnnotation(Payload.class);
    }

    @Override
    public Object resolveArgument(@NonNull MethodParameter parameter, @NonNull Message<?> message) {
        Class<?> parameterType = parameter.getParameterType();
        String messageContent = (String) message.getPayload();

        Object body;
        try {
          	// 這裡定義自己的解析方法
            body = JsonUtils.fromJson(messageContent, parameterType);
            Objects.requireNonNull(body);
        } catch (Throwable cause) {
            throw new KafkaException("kafka 消息解析失敗: 非法JSON字元串", cause);
        }
				// 可選,定義解析後的參數校驗邏輯
        validate(parameter, body);
        return body;
    }

    private void validate(MethodParameter parameter, Object target) {
        for (Annotation ann : parameter.getParameterAnnotations()) {
            Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
            if (Objects.nonNull(validatedAnn) || ann.annotationType().getSimpleName().startsWith("Valid")) {
                ValidationUtils.valid(target);
            }
        }
    }
}           

到這裡,一個類就把參數自動解析搞定了,接下來咱們看看怎麼用。

/**
 * 訂單支付成功通知
 * @Author 公衆号:架構師修行錄
 */
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(@Payload OrderPaySuccessEvent orderPaySuccessEvent) {
  	// 已經自動解析Kafka消息為orderPaySuccessEvent參數
	
  	// TODO 完成解析成功後的業務操作
}           

怎麼樣,代碼是不是比原來簡潔多了,香不香!

如果你對技術有追求,不想一直寫業務代碼,不妨把項目中所有需要手動解析參數的地方,替換成自定義方法參數解析器來實作~

本文作者:Jensen

八年Java老兵,曾涉獵航空、電信、IoT、垂直電商、直播互動遊戲産品研發,現就職于廣州某知名電商企業,财務資金團隊負責人。

小米主題設計師、手機輸入法設計師、雙鍵五筆創始人、ProcessOn特邀講師。

技術公衆号【架構師修行錄】号主,專注于分享程式員日常/架構技術/職場幹貨,關注回複“職場”馬上升職加薪。