天天看點

Java微服務-基于Spring事件驅動模式實作業務解耦最佳實戰

作者:架構淺水灣

目錄

事件驅動模式

  舉個例子

  事件驅動模式原理介紹

代碼實作

  1. 定義基本元素

  2. 其他元件

  3. 監聽器實作

  4. 測試類

  項目代碼結構

  調用接口

事件驅動模式

舉個例子

大部分軟體或者APP都有會有會員系統,當我們注冊為會員時,商家一般會把我們拉入會員群、給我們發優惠券、推送歡迎語什麼的。

Java微服務-基于Spring事件驅動模式實作業務解耦最佳實戰

值得注意的是:

注冊成功後才會産生後面的這些動作;注冊成功後的這些動作沒有先後執行順序之分;注冊成功後的這些動作的執行結果不能互相影響;

傳統寫法

public Boolean doRegisterVip(){
	//1、注冊會員
	registerVip();
	//2、入會員群
	joinMembershipGroup();
	//3、發優惠券
	issueCoupons();
	//4、推送消息
	sendWelcomeMsg();
}
           

這樣的寫法将所有的動作都耦合在doRegisterVip方法中,首先執行效率低下,其次耦合度太高,最後不好擴充。那麼如何優化這種邏輯呢?

事件驅動模式原理介紹

Spring的事件驅動模型由三部分組成:

事件:使用者可自定義事件類和相關屬性及行為來表述事件特征,Spring4.2之後定義事件不需要再顯式繼承ApplicationEvent類,直接定義一個bean即可,Spring會自動通過PayloadApplicationEvent來包裝事件。

事件釋出者:在Spring中可通過ApplicationEventPublisher把事件釋出出去,這樣事件内容就可以被監聽者消費處理。

事件監聽者:ApplicationListener,監聽釋出事件,處理事件發生之後的後續操作。

原理圖如下:

Java微服務-基于Spring事件驅動模式實作業務解耦最佳實戰

代碼實作

1. 定義基本元素

事件釋出者:EventEngine.java、EventEngineImpl.java

package com.example.event.config;

/**
 * 事件引擎
 */
public interface EventEngine {

    /**
     * 發送事件
     *
     * @param event 事件
     */
    void publishEvent(BizEvent event);
}
           
package com.example.event.config;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import org.springframework.util.CollectionUtils;

/**
 * 事件引擎實作類
 */
public class EventEngineImpl implements EventEngine {

    /**
     * 異步執行器。也系統需要自行定義線程池
     */
    private Executor bizListenerExecutor;

    /**
     * 是否異步,預設為false
     */
    private boolean async;

    /**
     * 訂閱端 KEY是TOPIC,VALUES是監聽器集合
     */
    private Map<String, List<BizEventListener>> bizSubscribers = new HashMap<>(16);

    @Override
    public void publishEvent(BizEvent event) {
        List<BizEventListener> listeners = bizSubscribers.get(event.getTopic());
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }
        for (BizEventListener bizEventListener : listeners) {
            if (bizEventListener.decide(event)) {
                //異步執行的話,放入線程池
                if (async) {
                    bizListenerExecutor.execute(new EventSubscriber(bizEventListener, event));
                } else {
                    bizEventListener.onEvent(event);
                }

            }
        }
    }

    /**
     * Setter method for property <tt>bizListenerExecutor</tt>.
     *
     * @param bizListenerExecutor value to be assigned to property bizListenerExecutor
     */
    public void setBizListenerExecutor(Executor bizListenerExecutor) {
        this.bizListenerExecutor = bizListenerExecutor;
    }

    /**
     * Setter method for property <tt>bizSubscribers</tt>.
     *
     * @param bizSubscribers value to be assigned to property bizSubscribers
     */
    public void setBizSubscribers(Map<String, List<BizEventListener>> bizSubscribers) {
        this.bizSubscribers = bizSubscribers;
    }

    /**
     * Setter method for property <tt>async</tt>.
     *
     * @param async value to be assigned to property async
     */
    public void setAsync(boolean async) {
        this.async = async;
    }
}
           

事件:BizEvent.java

package com.example.event.config;

import java.util.EventObject;

/**
 * 業務事件
 */
public class BizEvent extends EventObject {

    /**
     * Topic
     */
    private final String topic;

    /**
     * 業務id
     */
    private final String bizId;

    /**
     * 資料
     */
    private final Object data;

    /**
     * @param topic 事件topic,用于區分事件類型
     * @param bizId 業務ID,辨別這一次的調用
     * @param data  事件傳輸對象
     */
    public BizEvent(String topic, String bizId, Object data) {
        super(data);
        this.topic = topic;
        this.bizId = bizId;
        this.data = data;
    }

    /**
     * Getter method for property <tt>topic</tt>.
     *
     * @return property value of topic
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Getter method for property <tt>id</tt>.
     *
     * @return property value of id
     */
    public String getBizId() {
        return bizId;
    }

    /**
     * Getter method for property <tt>data</tt>.
     *
     * @return property value of data
     */
    public Object getData() {
        return data;
    }
}
           

事件監聽者:EventSubscriber.java

package com.example.event.config;

/**
 * 事件監聽者。注意:此時已經沒有線程上下文,如果需要請修改構造函數,顯示複制上下文資訊
 */
public class EventSubscriber implements Runnable {

    /**
     * 業務監聽器
     **/
    private BizEventListener bizEventListener;

    /**
     * 業務事件
     */
    private BizEvent bizEvent;

    /**
     * @param bizEventListener 事件監聽者
     * @param bizEvent         事件
     */
    public EventSubscriber(BizEventListener bizEventListener, BizEvent bizEvent) {
        super();
        this.bizEventListener = bizEventListener;
        this.bizEvent = bizEvent;
    }

    @Override
    public void run() {
        bizEventListener.onEvent(bizEvent);
    }
}
           

2. 其他元件

業務事件監聽器:BizEventListener.java

package com.example.event.config;

import java.util.EventListener;

/**
 * 業務事件監聽器
 *
 */
public interface BizEventListener extends EventListener {

    /**
     * 是否執行事件
     *
     * @param event 事件
     * @return
     */
    public boolean decide(BizEvent event);

    /**
     * 執行事件
     *
     * @param event 事件
     */
    public void onEvent(BizEvent event);
}
           

事件引擎topic:EventEngineTopic.java

package com.example.event.config;

/**
 * 事件引擎topic,用于區分事件類型
 */
public class EventEngineTopic {
    /**
     * 入會員群
     */
    public static final String JOIN_MEMBERSHIP_GROUP = "joinMembershipGroup";

    /**
     * 發優惠券
     */
    public static final String ISSUE_COUPONS = "issueCoupons";

    /**
     * 推送消息
     */
    public static final String SEND_WELCOME_MSG = "sendWelcomeMsg";

}
           

3. 監聽器實作

優惠券處理器:CouponsHandlerListener.java

package com.example.event.listener;

import com.example.event.config.BizEvent;
import com.example.event.config.BizEventListener;
import org.springframework.stereotype.Component;

/**
 * 優惠券處理器
 */
@Component
public class CouponsHandlerListener implements BizEventListener {

    @Override
    public boolean decide(BizEvent event) {
        return true;
    }

    @Override
    public void onEvent(BizEvent event) {
        System.out.println("優惠券處理器:十折優惠券已發放");
    }
}
           

會員群處理器:MembershipHandlerListener.java

package com.example.event.listener;

import com.example.event.config.BizEvent;
import com.example.event.config.BizEventListener;
import org.springframework.stereotype.Component;

/**
 * 會員群處理器
 */
@Component
public class MembershipHandlerListener implements BizEventListener {
    @Override
    public boolean decide(BizEvent event) {
        return true;
    }

    @Override
    public void onEvent(BizEvent event) {
        System.out.println("會員群處理器:您已成功加入會員群");
    }
}

           

消息推送處理器:MsgHandlerListener.java

package com.example.event.listener;

import com.example.event.config.BizEvent;
import com.example.event.config.BizEventListener;
import org.springframework.stereotype.Component;

/**
 * 消息推送處理器
 */
@Component
public class MsgHandlerListener implements BizEventListener {

    @Override
    public boolean decide(BizEvent event) {
        return true;
    }

    @Override
    public void onEvent(BizEvent event) {
        System.out.println("消息推送處理器:歡迎成為會員!!!");
    }
}

           

事件驅動引擎配置:EventEngineConfig.java

package com.example.event.listener;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.example.event.config.BizEventListener;
import com.example.event.config.EventEngine;
import com.example.event.config.EventEngineImpl;
import com.example.event.config.EventEngineTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
 * 事件驅動引擎配置
 */
@Configuration
public class EventEngineConfig {
    /**
     * 線程池異步處理事件
     */
    private static final Executor EXECUTOR = new ThreadPoolExecutor(20, 50, 10, TimeUnit.MINUTES,
        new LinkedBlockingQueue(500), new CustomizableThreadFactory("EVENT_ENGINE_POOL"));

    @Bean("eventEngineJob")
    public EventEngine initJobEngine(CouponsHandlerListener couponsHandlerListener,
        MembershipHandlerListener membershipHandlerListener,
        MsgHandlerListener msgHandlerListener) {
        Map<String, List<BizEventListener>> bizEvenListenerMap = new HashMap<>();
        //注冊優惠券事件
        bizEvenListenerMap.put(EventEngineTopic.ISSUE_COUPONS, Arrays.asList(couponsHandlerListener));
        //注冊會員群事件
        bizEvenListenerMap.put(EventEngineTopic.JOIN_MEMBERSHIP_GROUP, Arrays.asList(membershipHandlerListener));
        //注冊消息推送事件
        bizEvenListenerMap.put(EventEngineTopic.SEND_WELCOME_MSG, Arrays.asList(msgHandlerListener));

        EventEngineImpl eventEngine = new EventEngineImpl();
        eventEngine.setBizSubscribers(bizEvenListenerMap);
        eventEngine.setAsync(true);
        eventEngine.setBizListenerExecutor(EXECUTOR);
        return eventEngine;
    }
}
           

4. 測試類

TestController.java

package com.example.event.controller;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.annotation.Resource;

import com.example.event.config.BizEvent;
import com.example.event.config.EventEngine;
import com.example.event.config.EventEngineTopic;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 測試
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Resource(name = "eventEngineJob")
    private EventEngine eventEngine;

    @GetMapping("/doRegisterVip")
    public String doRegisterVip(@RequestParam(required = true) String userName,
        @RequestParam(required = true) Integer age) {
        Map<String, Object> paramMap = new HashMap<>(16);
        paramMap.put("userName", userName);
        paramMap.put("age", age);
        //1、注冊會員,這裡不實作了
        System.out.println("注冊會員成功");
        //2、入會員群
        eventEngine.publishEvent(
            new BizEvent(EventEngineTopic.JOIN_MEMBERSHIP_GROUP, UUID.randomUUID().toString(), paramMap));
        //3、發優惠券
        eventEngine.publishEvent(
            new BizEvent(EventEngineTopic.ISSUE_COUPONS, UUID.randomUUID().toString(), paramMap));
        //4、推送消息
        eventEngine.publishEvent(
            new BizEvent(EventEngineTopic.SEND_WELCOME_MSG, UUID.randomUUID().toString(), paramMap));
        return "注冊會員成功";
    }
}

           

項目代碼結構

Java微服務-基于Spring事件驅動模式實作業務解耦最佳實戰

調用接口

http://localhost:8080/test/doRegisterVip?userName=zhangsan&age=28
Java微服務-基于Spring事件驅動模式實作業務解耦最佳實戰