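Common approaches to distributed transactions today include two-phase commit (2PC), compensating transactions (TCC), a local event table combined with a message queue, and MQ transactional messages.
This post uses the example of creating a user and granting that user their sign-up points to walk through an ActiveMQ-based distributed transaction: local event table + message queue.
Tech stack: springboot2.x + mybatis + activemq
The example has two microservices: a user service and a points service.
1. User service
DAO layer: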
@Repository
public interface IEventDao {
    Integer insert(@Param("reqDto") Event event);

    // Note the spelling "upate": it must match the call in UserEventService
    // and the statement id in the mapper XML.
    Integer upate(@Param("reqDto") Event event);

    List<Event> getEvents(@Param("reqDto") Event event);
}

@Repository
public interface IPointDao {
    Integer insert(@Param("reqDto") Point point);
}
Model:
public class Event implements Serializable {
    private static final long serialVersionUID = 7424354435546253870L;

    private Integer id;
    private String type;
    private String process;
    private String content;
    private Date createTime;
    private Date updateTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getProcess() {
        return process;
    }

    public void setProcess(String process) {
        this.process = process;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
public class Point implements Serializable {
    private static final long serialVersionUID = -6462063974099294937L;

    private String id;
    private String userId;
    private Integer amount;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Integer getAmount() {
        return amount;
    }

    public void setAmount(Integer amount) {
        this.amount = amount;
    }
}
public class User implements Serializable {
    private static final long serialVersionUID = 8243391387720968609L;

    private String id;
    private String userName;

    public User(String id, String userName) {
        this.id = id;
        this.userName = userName;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}
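The service code relies on two enums, EventProcess and EventType, that are not listed in this post. A minimal sketch of what they might look like, assuming each constant simply wraps the string stored in the process and type columns. The constant names come from the calls in the service code; the string values are assumptions.

```java
// Hypothetical definitions: constant names are taken from the service code,
// the string values are assumed for illustration.
enum EventProcess {
    NEW("NEW"),             // event recorded locally, message not yet sent
    PUBLISHED("PUBLISHED"), // message has been sent (or received on the consumer side)
    PROCESSED("PROCESSED"); // consumer has applied the business change

    private final String value;

    EventProcess(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}

enum EventType {
    NEW_USER("NEW_USER"),   // written by the user service
    NEW_POINT("NEW_POINT"); // written by the points service

    private final String value;

    EventType(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}
```

With these in place, an event moves through NEW, PUBLISHED and PROCESSED, and each scheduled task only picks up rows in the state it is responsible for.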
Service layer:
@Service
public class UserEventService {
    @Autowired
    private IEventDao iEventDao;
    @Resource
    private JmsTemplate jmsTemplate;

    // Stores a new event row in the local event table.
    public int newEvent(Event event) {
        if (event != null) {
            return iEventDao.insert(event);
        } else {
            throw new BusinessException("The argument must not be null!");
        }
    }

    // Returns all events that are still in the NEW state.
    public List<Event> getNewEventList() {
        Event event = new Event();
        event.setProcess(EventProcess.NEW.getValue());
        return iEventDao.getEvents(event);
    }

    // Sends a NEW "new user" event to the "sms" destination, then marks it PUBLISHED.
    public void executeEvent(Event event) {
        if (event != null) {
            String eventProcess = event.getProcess();
            if ((EventProcess.NEW.getValue().equals(eventProcess))
                    && (EventType.NEW_USER.getValue().equals(event.getType()))) {
                jmsTemplate.convertAndSend("sms", event.getContent());
                event.setProcess(EventProcess.PUBLISHED.getValue());
                iEventDao.upate(event);
            }
        }
    }
}
@Service
public class UserService {
    @Resource
    private IUserDao userDao;
    @Resource
    private UserEventService userEventService;

    // The user row and the event row are written in the same local transaction.
    @Transactional
    public void newUser(String userName, Integer pointAmount) {
        String id = UUID.randomUUID().toString().replaceAll("-", "");
        User user = new User(id, userName);
        // 1. Save the user
        userDao.insert(user);
        // 2. Create the event
        Event event = new Event();
        event.setType(EventType.NEW_USER.getValue());
        event.setProcess(EventProcess.NEW.getValue());
        Point point = new Point();
        point.setUserId(id);
        point.setAmount(pointAmount);
        // Serialize the object to a JSON string and store it in the event's content field
        event.setContent(JSON.toJSONString(point));
        userEventService.newEvent(event);
    }
}
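The point of writing the user and the event inside one @Transactional method is that both rows commit together or not at all, so a user can never exist without its pending event. A minimal in-memory sketch of that property, with no Spring or database involved. LocalDb, OutboxWriteDemo and the failOnEvent flag are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// In-memory stand-in for one database that holds both the user table and the event table.
class LocalDb {
    final List<String> users = new ArrayList<>();
    final List<String> events = new ArrayList<>();

    // Runs the work against copies and publishes both copies only if the work
    // completes, mimicking commit/rollback of a single local transaction.
    void inTransaction(BiConsumer<List<String>, List<String>> work) {
        List<String> userCopy = new ArrayList<>(users);
        List<String> eventCopy = new ArrayList<>(events);
        work.accept(userCopy, eventCopy); // an exception here leaves users/events untouched
        users.clear();
        users.addAll(userCopy);
        events.clear();
        events.addAll(eventCopy);
    }
}

class OutboxWriteDemo {
    // failOnEvent simulates an error between the user insert and the event insert.
    static void newUser(LocalDb db, String userName, boolean failOnEvent) {
        db.inTransaction((users, events) -> {
            users.add(userName);
            if (failOnEvent) {
                throw new IllegalStateException("event insert failed");
            }
            events.add("NEW_USER:" + userName);
        });
    }

    public static void main(String[] args) {
        LocalDb db = new LocalDb();
        newUser(db, "alice", false);
        try {
            newUser(db, "bob", true);
        } catch (IllegalStateException expected) {
            // the whole "transaction" for bob is discarded
        }
        System.out.println(db.users + " " + db.events); // prints "[alice] [NEW_USER:alice]"
    }
}
```

The failed call leaves no trace of "bob" in either list, which is the guarantee the local event table pattern depends on.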
Scheduled task: whenever there are new-user events, push the add-points data to the queue.
@Component
public class UserScheduled {
    @Autowired
    private UserEventService userEventService;

    // Runs every 5 seconds and publishes every event that is still NEW.
    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() {
        List<Event> eventList = userEventService.getNewEventList();
        if (!CollectionUtils.isEmpty(eventList)) {
            System.out.println("Number of new-user events: " + eventList.size());
            for (Event event : eventList) {
                userEventService.executeEvent(event);
            }
        } else {
            System.out.println("Pending events: 0");
        }
    }
}
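One consequence of this polling approach is worth spelling out: executeEvent sends the message first and updates the status second, so if the process dies between the two steps the event stays NEW and is sent again on the next poll. Delivery is therefore at-least-once. A minimal in-memory sketch of that behaviour; OutboxRelay, Row and the crashAfterSend flag are hypothetical and only stand in for the event table and the broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class OutboxRelay {
    // One row of the (simulated) event table.
    static class Row {
        final String content;
        String process = "NEW";

        Row(String content) {
            this.content = content;
        }
    }

    final List<Row> eventTable = new ArrayList<>();
    final Queue<String> broker = new ArrayDeque<>(); // stands in for the "sms" destination

    // One polling pass: send every NEW row, then mark it PUBLISHED.
    // crashAfterSend simulates the process dying between the two steps.
    void poll(boolean crashAfterSend) {
        for (Row row : eventTable) {
            if (!"NEW".equals(row.process)) {
                continue;
            }
            broker.add(row.content);
            if (crashAfterSend) {
                return; // the status update never happens
            }
            row.process = "PUBLISHED";
        }
    }
}
```

Calling poll(true) and then poll(false) on a table with one row leaves two copies of the message in the broker, which is why the receiving side should tolerate duplicates.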
Test:
@EnableJms
@MapperScan("com.allen.activemq.user.dao")
@EnableAsync
@EnableScheduling
@SpringBootApplication
public class UserApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(UserApplication.class, args);
        UserService userService = (UserService) applicationContext.getBean("userService");
        // Create one user with 1500 points right after startup
        userService.newUser("test", 1500);
    }
}
Configuration file:
server.port=8080
spring.datasource.url=jdbc:mysql://localhost:3306/activemq-db1?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root1111
#activemq
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.packages.trust-all=true
# true = publish/subscribe mode, so "sms" is resolved as a topic rather than a queue
spring.jms.pub-sub-domain=true
spring.activemq.pool.max-connections=100
#mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
2. Points service
DAO layer:
@Repository
public interface IEventDao {
    Integer insert(@Param("reqDto") Event event);

    Integer update(@Param("reqDto") Event event);

    List<Event> getEvents(@Param("reqDto") Event event);
}

@Repository
public interface IPointDao {
    Integer insert(@Param("reqDto") Point point);
}
Listener: when a message arrives on the sms destination, onMessage is invoked and the message is stored in the t_event table.
import com.allen.activemq.pointer.enums.EventProcess;
import com.allen.activemq.pointer.enums.EventType;
import com.allen.activemq.pointer.model.Event;
import com.allen.activemq.pointer.service.PointEventService;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

@Component
public class PointMessageListener {
    @Resource
    private PointEventService pointEventService;

    @JmsListener(destination = "sms")
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String eventContent = txtMsg.getText();
                System.out.println("Listener received text message: " + eventContent);
                if (!StringUtils.isEmpty(eventContent)) {
                    // Create the event; it is stored as PUBLISHED so the scheduled task picks it up
                    Event event = new Event();
                    event.setType(EventType.NEW_POINT.getValue());
                    event.setProcess(EventProcess.PUBLISHED.getValue());
                    event.setContent(eventContent);
                    pointEventService.newEvent(event);
                }
            } catch (JMSException e) {
                // Business compensation would be handled here
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("Only TextMessage is supported!");
        }
    }
}
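Because the user service sends the message before it marks its own event as PUBLISHED, the same message can arrive more than once, and this listener would then store a second event and eventually add the points twice. A minimal sketch of an idempotent receiver, assuming each message carries a unique event id. In this post the message body is only the Point JSON, so eventId and the IdempotentReceiver class are hypothetical additions for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical receiver that ignores redelivered messages.
class IdempotentReceiver {
    // In a real service this role could be played by a unique key on the event table.
    private final Set<String> seenEventIds = new HashSet<>();
    private final List<String> storedContents = new ArrayList<>();

    // Returns true if the message was stored, false if it was a duplicate.
    boolean onMessage(String eventId, String content) {
        if (!seenEventIds.add(eventId)) {
            return false; // already handled, drop the duplicate
        }
        storedContents.add(content);
        return true;
    }

    List<String> stored() {
        return storedContents;
    }
}
```

Delivering the same eventId twice stores the content once; the second call returns false and changes nothing.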
Service layer:
@Service
public class PointEventService {
    @Resource
    private IEventDao iEventDao;
    @Resource
    private PointService pointService;

    // Stores a received event in the local event table.
    public int newEvent(Event event) {
        if (event != null) {
            return iEventDao.insert(event);
        } else {
            throw new BusinessException("The argument must not be null!");
        }
    }

    // Returns all events that are in the PUBLISHED state.
    public List<Event> getPublishedEventList() {
        Event event = new Event();
        event.setProcess(EventProcess.PUBLISHED.getValue());
        return iEventDao.getEvents(event);
    }

    // Adds the points for a PUBLISHED "new point" event, then marks it PROCESSED.
    public void executeEvent(Event event) {
        if (event != null) {
            String eventProcess = event.getProcess();
            if ((EventProcess.PUBLISHED.getValue().equals(eventProcess))
                    && (EventType.NEW_POINT.getValue().equals(event.getType()))) {
                Point point = JSON.parseObject(event.getContent(), Point.class);
                pointService.newPoint(point);
                event.setProcess(EventProcess.PROCESSED.getValue());
                iEventDao.update(event);
            }
        }
    }
}

@Service
public class PointService {
    @Resource
    private IPointDao iPointDao;

    public String newPoint(Point point) {
        if (null != point) {
            iPointDao.insert(point);
        } else {
            throw new BusinessException("The argument must not be null!");
        }
        return point.getId();
    }
}
Scheduled task: periodically checks t_event for PUBLISHED events and adds the points for each one.
@Component
public class PointScheduled {
    @Autowired
    private PointEventService pointEventService;

    /**
     * Runs every 5 seconds and processes every event that is in the PUBLISHED state.
     */
    @Scheduled(cron = "*/5 * * * * *")
    public void executeEvent() throws InterruptedException {
        List<Event> eventList = pointEventService.getPublishedEventList();
        if (!CollectionUtils.isEmpty(eventList)) {
            System.out.println("Number of published point events: " + eventList.size());
            for (Event event : eventList) {
                pointEventService.executeEvent(event);
            }
        } else {
            System.out.println("Pending events: 0 at " + new Date());
        }
        // Test whether a new run starts before the previous one has finished.
        // Result: it does not; no new run starts while the previous one is still running.
        TimeUnit.SECONDS.sleep(5);
    }
}
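The sleep at the end of the task checks that runs do not overlap. The JDK's ScheduledExecutorService documents a comparable guarantee for scheduleAtFixedRate: a run that overshoots its period delays the next one, but the two never execute concurrently. The sketch below reproduces that with plain JDK classes. It is an analogy under that assumption, not Spring's scheduler, and NonOverlapDemo with its timings is made up for the illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NonOverlapDemo {
    // Schedules a 50 ms task every 10 ms for about 300 ms and returns the highest
    // number of runs that were ever in progress at the same time.
    static int maxConcurrentRuns() {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        // Several threads are available, yet the same periodic task is never run in parallel.
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
        pool.scheduleAtFixedRate(() -> {
            int now = running.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            try {
                TimeUnit.MILLISECONDS.sleep(50); // deliberately longer than the period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            TimeUnit.MILLISECONDS.sleep(300);
            pool.shutdownNow();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return max.get();
    }
}
```

maxConcurrentRuns() returns 1: even though the task takes five times its period, each run waits for the previous one to finish.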
Source code:
https://download.csdn.net/download/zhuhaoyu6666/12027904