天天看点

基于activemq的分布式事务---本地事件表 + 消息队列一、用户服务积分服务:

目前常见的解决分布式 事务问题的方案有: 两阶段提交 C2PC)、补偿事务 CTCC)、本地事件表加消息队列、 MQ 事务 消息等

以新增用户并为该用户设置新增用户积分的例子,讲解:基于activemq的分布式事务---本地事件表 + 消息队列

技术栈:springboot2.x + mybatis + activemq

此案例有2个微服务:用户服务、积分服务

一、用户服务

dao层:

@Repository
@Component
public interface IEventDao {

    Integer insert(@Param("reqDto") Event event);

    Integer  upate(@Param("reqDto") Event event);

    List<Event> getEvents(@Param("reqDto") Event event);

}      
@Repository
@Component
public interface IPointDao {

    Integer insert(@Param("reqDto")Point point);
}      

model:

public class Event implements Serializable {

    private static final long serialVersionUID = 7424354435546253870L;

    private Integer id;

    private String type;

    private String process;

    private String content;

    private Date createTime;

    private Date updateTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getProcess() {
        return process;
    }

    public void setProcess(String process) {
        this.process = process;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
      
public class Point implements Serializable {

    private static final long serialVersionUID = -6462063974099294937L;

    private String id;

    private String userId;

    private Integer amount;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Integer getAmount() {
        return amount;
    }

    public void setAmount(Integer amount) {
        this.amount = amount;
    }
}
      
public class User implements Serializable {

    private static final long serialVersionUID = 8243391387720968609L;

    private String id;

    private String userName;

    public User(String id, String userName) {
        this.id = id;
        this.userName = userName;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}      

service层:

public class UserEventService {

    @Autowired
    private IEventDao iEventDao;

    @Resource
    private JmsTemplate jmsTemplate;



    public int newEvent(Event event) {
        if (event != null) {
            return iEventDao.insert(event);
        } else {
            throw new BusinessException("入参不能为空!");
        }
    }

    public List<Event> getNewEventList() {
        Event event = new Event();
        event.setProcess(EventProcess.NEW.getValue());
        return iEventDao.getEvents(event);
    }

    public void executeEvent(Event event) {
        if (event != null) {
            String eventProcess = event.getProcess();
            if ((EventProcess.NEW.getValue().equals(eventProcess))
                    && (EventType.NEW_USER.getValue().equals(event.getType()))) {
                jmsTemplate.convertAndSend("sms",event.getContent());

                event.setProcess(EventProcess.PUBLISHED.getValue());
                iEventDao.upate(event);
            }
        }
    }

}
      
@Service
public class UserService {

    @Resource
    private IUserDao userDao;

    @Resource
    private UserEventService userEventService;

    @Transactional
    public void newUser(String userName, Integer pointAmount) {
       String id =  UUID.randomUUID().toString().replaceAll("-","");
        User user = new User(id,userName);
        // 1.保存用户
         userDao.insert(user);

        // 2.新增事件
        Event event = new Event();
        event.setType(EventType.NEW_USER.getValue());
        event.setProcess(EventProcess.NEW.getValue());

        Point point = new Point();
        point.setUserId(id);
        point.setAmount(pointAmount);
        // 将对象转成 json 字符串存到事件的内容字段中
        event.setContent(JSON.toJSONString(point));
        userEventService.newEvent(event);
    }

}
定时任务:有新增用户,就向队列塞增加积分数据      
@Component
public class UserScheduled {

    @Autowired
    private UserEventService userEventService;

    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() {
        List<Event> eventList = userEventService.getNewEventList();
        if (!CollectionUtils.isEmpty(eventList)) {
            System.out.println("新建用户的事件记录总数:" + eventList.size());

            for (Event event : eventList) {
                userEventService.executeEvent(event);
            }
        } else {
            System.out.println("待处理的事件总数:0");
        }

    }
}      

测试:

@EnableJms
@MapperScan("com.allen.activemq.user.dao")
@EnableAsync
@EnableScheduling
@SpringBootApplication
public class UserApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(UserApplication.class, args);
        UserService userService = (UserService) applicationContext.getBean("userService");
        userService.newUser("测试", 1500);
    }
}      

配置类:

server.port=8080
spring.datasource.url=jdbc:mysql://localhost:3306/activemq-db1?Unicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root1111

#activemq
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.packages.trust-all=true
spring.jms.pub-sub-domain=true
spring.activemq.pool.max-connections=100


#mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml      

积分服务:

dao层:

@Repository
public interface IEventDao {

    Integer insert(@Param("reqDto") Event event);

    Integer update(@Param("reqDto") Event event);

    List<Event> getEvents(@Param("reqDto") Event event);

}      
@Repository
@Component
public interface IPointDao {

    Integer insert(@Param("reqDto")Point point);
}      

监听:sms 消息队列有人塞数据,就会进入onMessage,然后入库t_event表

import com.allen.activemq.pointer.enums.EventProcess;
import com.allen.activemq.pointer.enums.EventType;
import com.allen.activemq.pointer.model.Event;
import com.allen.activemq.pointer.service.PointEventService;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

@Component
public class PointMessageListener{

    @Resource
    private PointEventService pointEventService;

    @JmsListener(destination = "sms")
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String eventContent = txtMsg.getText();
                System.out.println("队列监听器接收到文本消息:" + eventContent);

                if (!StringUtils.isEmpty(eventContent)) {
                    // 新增事件
                    Event event = new Event();
                    event.setType(EventType.NEW_POINT.getValue());
                    event.setProcess(EventProcess.PUBLISHED.getValue());
                    event.setContent(eventContent);
                    pointEventService.newEvent(event);
                }
            } catch (JMSException e) {
                //业务补偿消息
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}      

服务层:

@Service
public class PointEventService {

    @Resource
    private IEventDao iEventDao;

    @Resource
    private PointService pointService;

    public int newEvent(Event event) {
        if (event != null) {
            return iEventDao.insert(event);
        } else {
            throw new BusinessException("入参不能为空!");
        }
    }

    public List<Event> getPublishedEventList() {
        Event event = new Event();
        event.setProcess(EventProcess.PUBLISHED.getValue());
        return iEventDao.getEvents(event);
    }

    public void executeEvent(Event event) {
        if (event != null) {
            String eventProcess = event.getProcess();
            if ((EventProcess.PUBLISHED.getValue().equals(eventProcess))
                    && (EventType.NEW_POINT.getValue().equals(event.getType()))) {
                Point point = JSON.parseObject(event.getContent(), Point.class);
                pointService.newPoint(point);

                event.setProcess(EventProcess.PROCESSED.getValue());

                iEventDao.update(event);
            }
        }
    }
}      
@Service
public class PointService {

    @Resource
    private IPointDao iPointDao;

    public String newPoint(Point point){
        if(null != point){
            iPointDao.insert(point);
        }else {
            throw new BusinessException("入参不能为空!");
        }
        return point.getId();

    }
}      

定时任务:定时检测t_event是否有NEW类型的数据,有为其增加积分

@Component
public class PointScheduled {

    @Autowired
    private PointEventService pointEventService;

    /**
     *
     */

    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() throws InterruptedException {
        List<Event> eventList = pointEventService.getPublishedEventList();
        if (!CollectionUtils.isEmpty(eventList)) {
            System.out.println("已发布的积分事件记录总数:" + eventList.size());

            for (Event event : eventList) {
                pointEventService.executeEvent(event);
            }
        } else {
            System.out.println("待处理的事件总数:0" + new Date());
        }
        //测试上个任务未结束是否开始新的任务:结果上个任务未结束,不会开启新的任务
        TimeUnit.SECONDS.sleep(5);

    }
}      

源码:

https://download.csdn.net/download/zhuhaoyu6666/12027904

继续阅读