天天看點

基于activemq的分布式事務---本地事件表 + 消息隊列(一、使用者服務;二、積分服務)

目前常見的解決分布式事務問題的方案有:兩階段送出(2PC)、補償事務(TCC)、本地事件表加消息隊列、MQ 事務消息等

以新增使用者并為該使用者設定新增使用者積分的例子,講解:基于activemq的分布式事務---本地事件表 + 消息隊列

技術棧:springboot2.x + mybatis + activemq

此案例有2個微服務:使用者服務、積分服務

一、使用者服務

dao層:

/**
 * Data-access interface for the user service's event records.
 *
 * <p>Each method binds its argument under the parameter name {@code reqDto}.
 * Only {@code @Repository} is declared: it is itself a {@code @Component}
 * stereotype, so an additional {@code @Component} is redundant.
 */
@Repository
public interface IEventDao {

    /**
     * Inserts the given event.
     *
     * @param event the event to store
     * @return presumably the number of affected rows — confirm against the mapper
     */
    Integer insert(@Param("reqDto") Event event);

    /**
     * Updates the given event.
     *
     * <p>NOTE(review): the name looks like a typo for {@code update}; it is kept
     * because existing callers reference this exact name.
     *
     * @param event the event carrying the new values
     * @return presumably the number of affected rows — confirm against the mapper
     */
    Integer  upate(@Param("reqDto") Event event);

    /**
     * Queries events using the given event as the query argument.
     *
     * @param event query argument; callers in this service set only its process field
     * @return the matching events
     */
    List<Event> getEvents(@Param("reqDto") Event event);

}
/**
 * Data-access interface for point records.
 *
 * <p>Only {@code @Repository} is declared: it is itself a {@code @Component}
 * stereotype, so an additional {@code @Component} is redundant.
 */
@Repository
public interface IPointDao {

    /**
     * Inserts the given point record, bound under the parameter name {@code reqDto}.
     *
     * @param point the point record to store
     * @return presumably the number of affected rows — confirm against the mapper
     */
    Integer insert(@Param("reqDto")Point point);
}

model:

/**
 * Serializable bean describing one event record: an identifier, a type,
 * a processing state, a free-form content string and two timestamps.
 * All properties are plain read/write accessors with no validation.
 */
public class Event implements Serializable {

    private static final long serialVersionUID = 7424354435546253870L;

    // ---- state -----------------------------------------------------------
    private Integer id;
    private String type;
    private String process;
    private String content;
    private Date createTime;
    private Date updateTime;

    // ---- id --------------------------------------------------------------

    /** @return the event identifier, or {@code null} if not set */
    public Integer getId() {
        return this.id;
    }

    /** @param id the event identifier */
    public void setId(Integer id) {
        this.id = id;
    }

    // ---- type ------------------------------------------------------------

    /** @return the event type, or {@code null} if not set */
    public String getType() {
        return this.type;
    }

    /** @param type the event type */
    public void setType(String type) {
        this.type = type;
    }

    // ---- process ---------------------------------------------------------

    /** @return the processing state, or {@code null} if not set */
    public String getProcess() {
        return this.process;
    }

    /** @param process the processing state */
    public void setProcess(String process) {
        this.process = process;
    }

    // ---- content ---------------------------------------------------------

    /** @return the event content, or {@code null} if not set */
    public String getContent() {
        return this.content;
    }

    /** @param content the event content */
    public void setContent(String content) {
        this.content = content;
    }

    // ---- timestamps ------------------------------------------------------

    /** @return the creation timestamp (same instance that was set), or {@code null} */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @param createTime the creation timestamp; stored by reference */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the last-update timestamp (same instance that was set), or {@code null} */
    public Date getUpdateTime() {
        return this.updateTime;
    }

    /** @param updateTime the last-update timestamp; stored by reference */
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
      
/**
 * Serializable bean describing a point record: its own identifier, the
 * identifier of the owning user and an amount. Accessors perform no validation.
 */
public class Point implements Serializable {

    private static final long serialVersionUID = -6462063974099294937L;

    private String id;
    private String userId;
    private Integer amount;

    /** @return the record identifier, or {@code null} if not set */
    public String getId() {
        return this.id;
    }

    /** @param id the record identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the owning user's identifier, or {@code null} if not set */
    public String getUserId() {
        return this.userId;
    }

    /** @param userId the owning user's identifier */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @return the amount, or {@code null} if not set */
    public Integer getAmount() {
        return this.amount;
    }

    /** @param amount the amount */
    public void setAmount(Integer amount) {
        this.amount = amount;
    }
}
      
/**
 * Serializable bean describing a user by identifier and user name.
 * Instances are created through the two-argument constructor; both
 * properties remain mutable afterwards.
 */
public class User implements Serializable {

    private static final long serialVersionUID = 8243391387720968609L;

    private String id;
    private String userName;

    /**
     * Creates a user with the given identifier and name.
     *
     * @param id       the user identifier
     * @param userName the user name
     */
    public User(String id, String userName) {
        this.userName = userName;
        this.id = id;
    }

    /** @return the user identifier */
    public String getId() {
        return this.id;
    }

    /** @param id the user identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the user name */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }
}

service層:

/**
 * Service around the user service's event records: stores new events, lists
 * events still in the NEW state and publishes them to the "sms" destination.
 *
 * <p>Declared as a {@code @Service} so it can be injected into the classes
 * that reference it via {@code @Resource} / {@code @Autowired}.
 */
@Service
public class UserEventService {

    @Autowired
    private IEventDao iEventDao;

    @Resource
    private JmsTemplate jmsTemplate;

    /**
     * Stores a new event.
     *
     * @param event the event to insert; must not be {@code null}
     * @return the value returned by the DAO insert
     * @throws BusinessException if {@code event} is {@code null}
     */
    public int newEvent(Event event) {
        if (event == null) {
            throw new BusinessException("入參不能為空!");
        }
        return iEventDao.insert(event);
    }

    /**
     * Lists events by querying the DAO with an event whose process field is
     * set to the NEW state value.
     *
     * @return the events returned by the DAO
     */
    public List<Event> getNewEventList() {
        Event event = new Event();
        event.setProcess(EventProcess.NEW.getValue());
        return iEventDao.getEvents(event);
    }

    /**
     * Publishes one event: when it is a NEW event of type NEW_USER, its content
     * is sent to the "sms" destination and the event is then marked PUBLISHED.
     * Any other event (including {@code null}) is ignored.
     *
     * <p>The send happens before the status update, so if the update fails the
     * event stays NEW and will be sent again by a later call; consumers should
     * therefore tolerate duplicates.
     *
     * @param event the event to publish; may be {@code null}
     */
    public void executeEvent(Event event) {
        if (event == null) {
            return;
        }
        boolean isNew = EventProcess.NEW.getValue().equals(event.getProcess());
        boolean isNewUser = EventType.NEW_USER.getValue().equals(event.getType());
        if (!isNew || !isNewUser) {
            return;
        }

        jmsTemplate.convertAndSend("sms",event.getContent());

        event.setProcess(EventProcess.PUBLISHED.getValue());
        iEventDao.upate(event);
    }

}
      
/**
 * User-facing service: creates a user and, in the same transaction, records
 * the event that carries the points to be granted to that user.
 */
@Service
public class UserService {

    @Resource
    private IUserDao userDao;

    @Resource
    private UserEventService userEventService;

    /**
     * Creates a user with a generated id and stores a NEW / NEW_USER event whose
     * content is the JSON form of a {@code Point} holding the user id and the
     * requested amount.
     *
     * @param userName    name of the user to create
     * @param pointAmount amount to place in the event's point payload
     */
    @Transactional
    public void newUser(String userName, Integer pointAmount) {
        // Dash-free UUID used as the user id.
        final String userId = UUID.randomUUID().toString().replace("-", "");

        // Step 1: persist the user.
        userDao.insert(new User(userId, userName));

        // Step 2: describe the points to grant ...
        final Point payload = new Point();
        payload.setUserId(userId);
        payload.setAmount(pointAmount);

        // ... and persist them as the JSON content of a new event.
        final Event newUserEvent = new Event();
        newUserEvent.setType(EventType.NEW_USER.getValue());
        newUserEvent.setProcess(EventProcess.NEW.getValue());
        newUserEvent.setContent(JSON.toJSONString(payload));
        userEventService.newEvent(newUserEvent);
    }

}
定時任務:有新增使用者,就向隊列塞增加積分資料      
/**
 * Scheduled job that fetches the events returned by
 * {@code getNewEventList()} and hands each one to
 * {@code UserEventService.executeEvent}.
 */
@Component
public class UserScheduled {

    @Autowired
    private UserEventService userEventService;

    /**
     * Runs on the cron expression below. Each event is processed independently:
     * a runtime failure on one event is reported and the loop continues, so a
     * single bad event cannot keep the remaining events from being published.
     * The failed event is not modified here.
     */
    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() {
        List<Event> eventList = userEventService.getNewEventList();
        if (CollectionUtils.isEmpty(eventList)) {
            System.out.println("待處理的事件總數:0");
            return;
        }

        System.out.println("建立使用者的事件記錄總數:" + eventList.size());
        for (Event event : eventList) {
            try {
                userEventService.executeEvent(event);
            } catch (RuntimeException e) {
                // Isolate the failure to this event and keep going with the rest.
                Object eventId = (event == null) ? null : event.getId();
                System.err.println("Failed to publish event id=" + eventId);
                e.printStackTrace();
            }
        }
    }
}

測試:

/**
 * Entry point of the user service. After the application context has started,
 * it looks up the bean named "userService" and creates one test user.
 */
@EnableJms
@MapperScan("com.allen.activemq.user.dao")
@EnableAsync
@EnableScheduling
@SpringBootApplication
public class UserApplication {

    /**
     * Starts the application and triggers a single {@code newUser} call.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                SpringApplication.run(UserApplication.class, args);

        // Look the service up by bean name, then narrow it to its concrete type.
        Object bean = context.getBean("userService");
        UserService service = (UserService) bean;

        service.newUser("測試", 1500);
    }
}

配置檔案:

# HTTP port of the user service
server.port=8080

# datasource
# The Connector/J parameter is "useUnicode"; a bare "Unicode" is not a recognised option.
spring.datasource.url=jdbc:mysql://localhost:3306/activemq-db1?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root1111

#activemq
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.packages.trust-all=true
# NOTE(review): pub-sub-domain=true makes the default destination type a topic,
# while the article text describes "sms" as a queue. Confirm which is intended.
spring.jms.pub-sub-domain=true
spring.activemq.pool.max-connections=100


#mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml

二、積分服務:

dao層:

/**
 * Data-access interface for the point service's event records.
 * Each method binds its argument under the parameter name {@code reqDto}.
 */
@Repository
public interface IEventDao {

    /**
     * Inserts the given event.
     *
     * @param event the event to store
     * @return presumably the number of affected rows — confirm against the mapper
     */
    Integer insert(@Param("reqDto") Event event);

    /**
     * Updates the given event.
     *
     * @param event the event carrying the new values
     * @return presumably the number of affected rows — confirm against the mapper
     */
    Integer update(@Param("reqDto") Event event);

    /**
     * Queries events using the given event as the query argument.
     *
     * @param event query argument; the caller in this service sets only its process field
     * @return the matching events
     */
    List<Event> getEvents(@Param("reqDto") Event event);

}      
/**
 * Data-access interface for point records in the point service.
 *
 * <p>Only {@code @Repository} is declared: it is itself a {@code @Component}
 * stereotype, so an additional {@code @Component} is redundant.
 */
@Repository
public interface IPointDao {

    /**
     * Inserts the given point record, bound under the parameter name {@code reqDto}.
     *
     * @param point the point record to store
     * @return presumably the number of affected rows — confirm against the mapper
     */
    Integer insert(@Param("reqDto")Point point);
}

監聽:sms 消息隊列有人塞資料,就會進入onMessage,然後入庫t_event表

import com.allen.activemq.pointer.enums.EventProcess;
import com.allen.activemq.pointer.enums.EventType;
import com.allen.activemq.pointer.model.Event;
import com.allen.activemq.pointer.service.PointEventService;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

/**
 * JMS listener on the "sms" destination. Every non-empty text message is
 * stored as a NEW_POINT event in the PUBLISHED state, with the message text
 * as the event content.
 */
@Component
public class PointMessageListener{

    @Resource
    private PointEventService pointEventService;

    /**
     * Handles one incoming message.
     *
     * <p>A failure to read the message text is rethrown (with its cause) rather
     * than printed and ignored, so the message is not silently dropped; what the
     * listener container then does depends on its acknowledge/transaction
     * settings, which are not visible here.
     *
     * <p>NOTE(review): nothing here de-duplicates messages; a redelivered
     * message would be stored as a second event — confirm whether that matters.
     *
     * @param message the received JMS message
     * @throws IllegalArgumentException if the message is not a {@code TextMessage}
     * @throws IllegalStateException    if the message text cannot be read
     */
    @JmsListener(destination = "sms")
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            throw new IllegalArgumentException("隻支援 TextMessage 類型消息!");
        }

        String eventContent;
        try {
            eventContent = ((TextMessage) message).getText();
        } catch (JMSException e) {
            // Surface the failure instead of swallowing it.
            throw new IllegalStateException("Failed to read text of JMS message", e);
        }
        System.out.println("隊列監聽器接收到文本消息:" + eventContent);

        if (StringUtils.isEmpty(eventContent)) {
            return;
        }

        // Record the received payload as a new event.
        Event event = new Event();
        event.setType(EventType.NEW_POINT.getValue());
        event.setProcess(EventProcess.PUBLISHED.getValue());
        event.setContent(eventContent);
        pointEventService.newEvent(event);
    }
}

服務層:

/**
 * Service around the point service's event records: stores received events,
 * lists events in the PUBLISHED state and turns them into point records.
 */
@Service
public class PointEventService {

    @Resource
    private IEventDao iEventDao;

    @Resource
    private PointService pointService;

    /**
     * Stores a new event.
     *
     * @param event the event to insert; must not be {@code null}
     * @return the value returned by the DAO insert
     * @throws BusinessException if {@code event} is {@code null}
     */
    public int newEvent(Event event) {
        if (event == null) {
            throw new BusinessException("入參不能為空!");
        }
        return iEventDao.insert(event);
    }

    /**
     * Lists events by querying the DAO with an event whose process field is
     * set to the PUBLISHED state value.
     *
     * @return the events returned by the DAO
     */
    public List<Event> getPublishedEventList() {
        Event event = new Event();
        event.setProcess(EventProcess.PUBLISHED.getValue());
        return iEventDao.getEvents(event);
    }

    /**
     * Processes one event: when it is a PUBLISHED event of type NEW_POINT, its
     * JSON content is parsed into a {@code Point}, the point is stored and the
     * event is marked PROCESSED. Any other event (including {@code null}) is
     * ignored.
     *
     * <p>The method is transactional so that the point insert and the event
     * status update succeed or fail together; otherwise a failed status update
     * would leave the event PUBLISHED and the point would be inserted again on
     * the next run.
     *
     * @param event the event to process; may be {@code null}
     */
    @Transactional
    public void executeEvent(Event event) {
        if (event == null) {
            return;
        }
        boolean isPublished = EventProcess.PUBLISHED.getValue().equals(event.getProcess());
        boolean isNewPoint = EventType.NEW_POINT.getValue().equals(event.getType());
        if (!isPublished || !isNewPoint) {
            return;
        }

        Point point = JSON.parseObject(event.getContent(), Point.class);
        pointService.newPoint(point);

        event.setProcess(EventProcess.PROCESSED.getValue());
        iEventDao.update(event);
    }
}
/**
 * Service that stores point records through {@code IPointDao}.
 */
@Service
public class PointService {

    @Resource
    private IPointDao iPointDao;

    /**
     * Inserts the given point record.
     *
     * @param point the record to insert; must not be {@code null}
     * @return the id held by {@code point} after the insert
     * @throws BusinessException if {@code point} is {@code null}
     */
    public String newPoint(Point point){
        // Reject a missing argument up front.
        if (point == null) {
            throw new BusinessException("入參不能為空!");
        }

        iPointDao.insert(point);
        return point.getId();
    }
}

定時任務:定時檢測t_event是否有PUBLISHED狀態的資料,有則為其增加積分

/**
 * Scheduled job that fetches the events returned by
 * {@code getPublishedEventList()} and hands each one to
 * {@code PointEventService.executeEvent}.
 */
@Component
public class PointScheduled {

    @Autowired
    private PointEventService pointEventService;

    /**
     * Runs on the cron expression below. Each event is processed independently:
     * a runtime failure on one event is reported and the loop continues, so a
     * single bad event cannot keep the remaining events from being processed.
     * The failed event is not modified here.
     *
     * @throws InterruptedException if the thread is interrupted during the
     *                              trailing sleep
     */
    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() throws InterruptedException {
        List<Event> eventList = pointEventService.getPublishedEventList();
        if (CollectionUtils.isEmpty(eventList)) {
            System.out.println("待處理的事件總數:0" + new Date());
        } else {
            System.out.println("已釋出的積分事件記錄總數:" + eventList.size());

            for (Event event : eventList) {
                try {
                    pointEventService.executeEvent(event);
                } catch (RuntimeException e) {
                    // Isolate the failure to this event and keep going with the rest.
                    Object eventId = (event == null) ? null : event.getId();
                    System.err.println("Failed to process event id=" + eventId);
                    e.printStackTrace();
                }
            }
        }
        // Experiment kept from the original: checks whether a new run starts while
        // the previous one is still busy. Observed result: it does not.
        TimeUnit.SECONDS.sleep(5);

    }
}

源碼:

https://download.csdn.net/download/zhuhaoyu6666/12027904

繼續閱讀