目前常見的解決分布式 事務問題的方案有: 兩階段送出 C2PC)、補償事務 CTCC)、本地事件表加消息隊列、 MQ 事務 消息等
以新增使用者并為該使用者設定新增使用者積分的例子,講解:基于activemq的分布式事務---本地事件表 + 消息隊列
技術棧:springboot2.x + mybatis + activemq
此案例有2個微服務:使用者服務、積分服務
一、使用者服務
dao層:
@Repository
@Component
public interface IEventDao {
Integer insert(@Param("reqDto") Event event);
Integer upate(@Param("reqDto") Event event);
List<Event> getEvents(@Param("reqDto") Event event);
}
@Repository
@Component
public interface IPointDao {
Integer insert(@Param("reqDto")Point point);
}
model:
public class Event implements Serializable {
private static final long serialVersionUID = 7424354435546253870L;
private Integer id;
private String type;
private String process;
private String content;
private Date createTime;
private Date updateTime;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getProcess() {
return process;
}
public void setProcess(String process) {
this.process = process;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
public class Point implements Serializable {
private static final long serialVersionUID = -6462063974099294937L;
private String id;
private String userId;
private Integer amount;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public Integer getAmount() {
return amount;
}
public void setAmount(Integer amount) {
this.amount = amount;
}
}
public class User implements Serializable {
private static final long serialVersionUID = 8243391387720968609L;
private String id;
private String userName;
public User(String id, String userName) {
this.id = id;
this.userName = userName;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
service層:
public class UserEventService {
@Autowired
private IEventDao iEventDao;
@Resource
private JmsTemplate jmsTemplate;
public int newEvent(Event event) {
if (event != null) {
return iEventDao.insert(event);
} else {
throw new BusinessException("入參不能為空!");
}
}
public List<Event> getNewEventList() {
Event event = new Event();
event.setProcess(EventProcess.NEW.getValue());
return iEventDao.getEvents(event);
}
public void executeEvent(Event event) {
if (event != null) {
String eventProcess = event.getProcess();
if ((EventProcess.NEW.getValue().equals(eventProcess))
&& (EventType.NEW_USER.getValue().equals(event.getType()))) {
jmsTemplate.convertAndSend("sms",event.getContent());
event.setProcess(EventProcess.PUBLISHED.getValue());
iEventDao.upate(event);
}
}
}
}
@Service
public class UserService {
@Resource
private IUserDao userDao;
@Resource
private UserEventService userEventService;
@Transactional
public void newUser(String userName, Integer pointAmount) {
String id = UUID.randomUUID().toString().replaceAll("-","");
User user = new User(id,userName);
// 1.儲存使用者
userDao.insert(user);
// 2.新增事件
Event event = new Event();
event.setType(EventType.NEW_USER.getValue());
event.setProcess(EventProcess.NEW.getValue());
Point point = new Point();
point.setUserId(id);
point.setAmount(pointAmount);
// 将對象轉成 json 字元串存到事件的内容字段中
event.setContent(JSON.toJSONString(point));
userEventService.newEvent(event);
}
}
定時任務:有新增使用者,就向隊列塞增加積分資料
@Component
public class UserScheduled {
@Autowired
private UserEventService userEventService;
@Scheduled(cron = "*/5 * * * * *")
public void executeEvent() {
List<Event> eventList = userEventService.getNewEventList();
if (!CollectionUtils.isEmpty(eventList)) {
System.out.println("建立使用者的事件記錄總數:" + eventList.size());
for (Event event : eventList) {
userEventService.executeEvent(event);
}
} else {
System.out.println("待處理的事件總數:0");
}
}
}
測試:
@EnableJms
@MapperScan("com.allen.activemq.user.dao")
@EnableAsync
@EnableScheduling
@SpringBootApplication
public class UserApplication {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(UserApplication.class, args);
UserService userService = (UserService) applicationContext.getBean("userService");
userService.newUser("測試", 1500);
}
}
配置類:
server.port=8080
spring.datasource.url=jdbc:mysql://localhost:3306/activemq-db1?Unicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root1111
#activemq
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.packages.trust-all=true
spring.jms.pub-sub-domain=true
spring.activemq.pool.max-connections=100
#mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
積分服務:
dao層:
@Repository
public interface IEventDao {
Integer insert(@Param("reqDto") Event event);
Integer update(@Param("reqDto") Event event);
List<Event> getEvents(@Param("reqDto") Event event);
}
@Repository
@Component
public interface IPointDao {
Integer insert(@Param("reqDto")Point point);
}
監聽:sms 消息隊列有人塞資料,就會進入onMessage,然後入庫t_event表
import com.allen.activemq.pointer.enums.EventProcess;
import com.allen.activemq.pointer.enums.EventType;
import com.allen.activemq.pointer.model.Event;
import com.allen.activemq.pointer.service.PointEventService;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
@Component
public class PointMessageListener{
@Resource
private PointEventService pointEventService;
@JmsListener(destination = "sms")
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String eventContent = txtMsg.getText();
System.out.println("隊列監聽器接收到文本消息:" + eventContent);
if (!StringUtils.isEmpty(eventContent)) {
// 新增事件
Event event = new Event();
event.setType(EventType.NEW_POINT.getValue());
event.setProcess(EventProcess.PUBLISHED.getValue());
event.setContent(eventContent);
pointEventService.newEvent(event);
}
} catch (JMSException e) {
//業務補償消息
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("隻支援 TextMessage 類型消息!");
}
}
}
服務層:
@Service
public class PointEventService {
@Resource
private IEventDao iEventDao;
@Resource
private PointService pointService;
public int newEvent(Event event) {
if (event != null) {
return iEventDao.insert(event);
} else {
throw new BusinessException("入參不能為空!");
}
}
public List<Event> getPublishedEventList() {
Event event = new Event();
event.setProcess(EventProcess.PUBLISHED.getValue());
return iEventDao.getEvents(event);
}
public void executeEvent(Event event) {
if (event != null) {
String eventProcess = event.getProcess();
if ((EventProcess.PUBLISHED.getValue().equals(eventProcess))
&& (EventType.NEW_POINT.getValue().equals(event.getType()))) {
Point point = JSON.parseObject(event.getContent(), Point.class);
pointService.newPoint(point);
event.setProcess(EventProcess.PROCESSED.getValue());
iEventDao.update(event);
}
}
}
}
@Service
public class PointService {
@Resource
private IPointDao iPointDao;
public String newPoint(Point point){
if(null != point){
iPointDao.insert(point);
}else {
throw new BusinessException("入參不能為空!");
}
return point.getId();
}
}
定時任務:定時檢測t_event是否有NEW類型的資料,有為其增加積分
@Component
public class PointScheduled {
@Autowired
private PointEventService pointEventService;
/**
*
*/
@Scheduled(cron = "*/5 * * * * *")
public void executeEvent() throws InterruptedException {
List<Event> eventList = pointEventService.getPublishedEventList();
if (!CollectionUtils.isEmpty(eventList)) {
System.out.println("已釋出的積分事件記錄總數:" + eventList.size());
for (Event event : eventList) {
pointEventService.executeEvent(event);
}
} else {
System.out.println("待處理的事件總數:0" + new Date());
}
//測試上個任務未結束是否開始新的任務:結果上個任務未結束,不會開啟新的任務
TimeUnit.SECONDS.sleep(5);
}
}
源碼:
https://download.csdn.net/download/zhuhaoyu6666/12027904