1.1 什么是状态
先来解释什么是“状态”（ State ）。现实事物是有不同状态的，例如一个自动门，就有 open 和 closed 两种状态。我们通常所说的状态机是有限状态机，也就是被描述的事物的状态的数量是有限个，例如自动门的状态就是两个 open 和 closed 。
状态机，也就是 State Machine ，不是指一台实际机器，而是指一个数学模型。说白了，一般就是指一张状态转换图。例如，根据自动门的运行规则，我们可以抽象出下面这么一个图。
自动门有两个状态，open 和 closed 。closed 状态下，如果读取开门信号，那么状态就会切换为 open 。open 状态下如果读取关门信号，状态就会切换为 closed 。
状态机的全称是有限状态自动机，“自动”两个字也是包含重要含义的。给定一个状态机，同时给定它的当前状态以及输入，那么输出状态是可以明确地运算出来的。例如对于自动门，给定初始状态 closed ，给定输入“开门”，那么下一个状态是可以运算出来的。
这样状态机的基本定义我们就介绍完毕了。重复一下，状态机是有限状态自动机的简称，是现实事物运行规则抽象而成的一个数学模型。
1.2 四大概念
下面来给出状态机的四大概念。
- 第一个是 State ，状态。一个状态机至少要包含两个状态。例如上面自动门的例子，有 open 和 closed 两个状态。
- 第二个是 Event ，事件。事件就是执行某个操作的触发条件或者口令。对于自动门，“按下开门按钮”就是一个事件。
- 第三个是 Action ，动作。事件发生以后要执行动作。例如事件是“按开门按钮”，动作是“开门”。编程的时候，一个 Action 一般就对应一个函数。
- 第四个是 Transition ，变换。也就是从一个状态变化为另一个状态。例如“开门过程”就是一个变换。
1.3 状态机
有限状态机（Finite-state machine, FSM），又称有限状态自动机，简称状态机，是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模型。
FSM是一种算法思想，简单而言，有限状态机由一组状态、一个初始状态、输入和根据输入及现有状态转换为下一个状态的转换函数组成。
其作用主要是描述对象在它的生命周期内所经历的状态序列，以及如何响应来自外界的各种事件。
2、状态机图
åéæ±æ¶ï¼éè¦äºè§£ä»¥ä¸å ç§å ç´ ï¼èµ·å§ãç»æ¢ãç°æã次æï¼ç®æ ç¶æï¼ãå¨ä½ãæ¡ä»¶ï¼æ们就å¯ä»¥å®æä¸ä¸ªç¶ææºå¾äºï¼
以订å为ä¾ï¼ä»¥ä»å¾ æ¯ä»ç¶æ转æ¢ä¸ºå¾ åè´§ç¶æ为ä¾
- â ç°æï¼æ¯æå½åæå¤çç¶æãå¾ æ¯ä»
- â¡æ¡ä»¶ï¼å称为âäºä»¶âï¼å½ä¸ä¸ªæ¡ä»¶è¢«æ»¡è¶³ï¼å°ä¼è§¦åä¸ä¸ªå¨ä½ï¼æè æ§è¡ä¸æ¬¡ç¶æçè¿ç§»ãæ¯ä»äºä»¶
- â¢å¨ä½ï¼æ¡ä»¶æ»¡è¶³åæ§è¡çå¨ä½ãå¨ä½æ§è¡å®æ¯åï¼å¯ä»¥è¿ç§»å°æ°çç¶æï¼ä¹å¯ä»¥ä»æ§ä¿æåç¶æãå¨ä½ä¸æ¯å¿ éçï¼å½æ¡ä»¶æ»¡è¶³åï¼ä¹å¯ä»¥ä¸æ§è¡ä»»ä½å¨ä½ï¼ç´æ¥è¿ç§»å°æ°ç¶æãç¶æ转æ¢ä¸ºå¾ åè´§
- ④次态：条件满足后要迁往的新状态。“次态”是相对于“现态”而言的，“次态”一旦被激活，就转变成新的“现态”了。待发货

注意事项
1ãé¿å ææ个âç¨åºå¨ä½âå½ä½æ¯ä¸ç§âç¶æâæ¥å¤çãé£ä¹å¦ä½åºåâå¨ä½âåâç¶æâï¼âå¨ä½âæ¯ä¸ç¨³å®çï¼å³ä½¿æ²¡ææ¡ä»¶ç触åï¼âå¨ä½âä¸æ¦æ§è¡å®æ¯å°±ç»æäºï¼èâç¶æâæ¯ç¸å¯¹ç¨³å®çï¼å¦æ没æå¤é¨æ¡ä»¶ç触åï¼ä¸ä¸ªç¶æä¼ä¸ç´æç»ä¸å»ã
2ãç¶æååæ¶æ¼æä¸äºç¶æï¼å¯¼è´è·³è½¬é»è¾ä¸å®æ´ãæ以å¨è®¾è®¡ç¶ææºæ¶ï¼æ们éè¦åå¤çæ¥ç设计çç¶æå¾æè ç¶æ表ï¼æç»è¾¾å°ä¸ç§ç¢ä¸å¯ç ´ç设计æ¹æ¡ã
3ãspring statemachine
3.1 ç¶ææºspring statemachine æ¦è¿°
Spring Statemachineæ¯åºç¨ç¨åºå¼å人åå¨Springåºç¨ç¨åºä¸ä½¿ç¨ç¶ææºæ¦å¿µçæ¡æ¶
Spring Statemachineæ¨å¨æä¾ä»¥ä¸åè½ï¼
- æäºä½¿ç¨çæå¹³å级ç¶ææºï¼ç¨äºç®åç使ç¨æ¡ä¾ã
- åå±ç¶ææºç»æï¼ä»¥ç®åå¤æçç¶æé ç½®ã
- ç¶ææºåºåæä¾æ´å¤æçç¶æé ç½®ã
- 使ç¨è§¦åå¨ï¼è½¬æ¢ï¼è¦å«åæä½ã
- é®å ¥å®å ¨é ç½®éé å¨ã
- çæå¨æ¨¡å¼ï¼ç¨äºå¨Spring Applicationä¸ä¸æä¹å¤ä½¿ç¨çç®åå®ä¾åé常ç¨ä¾çé£è°±
- åºäºZookeeperçåå¸å¼ç¶ææº
- ç¶ææºäºä»¶çå¬å¨ã
- UML Eclipse Papyrus建模ã
- å°è®¡ç®æºé ç½®åå¨å¨æ°¸ä¹ åå¨ä¸ã
- Spring IOCéæå°beanä¸ç¶ææºå ³èèµ·æ¥ã
ç¶ææºåè½å¼ºå¤§ï¼å 为è¡ä¸ºå§ç»ä¿è¯ä¸è´ï¼ä½¿è°è¯ç¸å¯¹å®¹æãè¿æ¯å 为æä½è§åæ¯å¨æºå¨å¯å¨æ¶åæçãè¿ä¸ªæ³æ³æ¯ä½ çåºç¨ç¨åºå¯è½åå¨äºæéæ°éçç¶æä¸ï¼æäºé¢å®ä¹ç触åå¨å¯ä»¥å°ä½ çåºç¨ç¨åºä»ä¸ä¸ªç¶æ转移å°å¦ä¸ä¸ªç¶æãæ¤ç±»è§¦åå¨å¯ä»¥åºäºäºä»¶æ计æ¶å¨ã
å¨åºç¨ç¨åºä¹å¤å®ä¹é«çº§é»è¾ç¶åä¾é ç¶ææºæ¥ç®¡çç¶æè¦å®¹æå¾å¤ãæ¨å¯ä»¥éè¿åéäºä»¶ï¼ä¾¦å¬æ´æ¹æä» è¯·æ±å½åç¶ææ¥ä¸ç¶ææºè¿è¡äº¤äºã
3.2 å¿«éå¼å§
以订åç¶ææ转çä¾å为ä¾ï¼
表ç»æ设计å¦ä¸ï¼
-- Order table used by the order state-machine demo.
-- `status` holds the numeric key of the application's OrderStatus enum
-- (1 = waiting for payment, 2 = waiting for delivery, 3 = waiting for receipt, 4 = finished).
CREATE TABLE `tb_order` (
    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `order_code` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '订单编码',
    `status` smallint(3) DEFAULT NULL COMMENT '订单状态',
    `name` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '订单名称',
    `price` decimal(12,2) DEFAULT NULL COMMENT '价格',
    `delete_flag` tinyint(2) NOT NULL DEFAULT '0' COMMENT '删除标记，0未删除 1已删除',
    -- Set once when the row is inserted; it must NOT change on UPDATE
    -- (the original definition carried ON UPDATE here by mistake).
    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    -- Refreshed by the server on every UPDATE. The former zero-date default
    -- ('0000-00-00 00:00:00') is rejected when NO_ZERO_DATE / strict mode is on.
    `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    `create_user_code` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '创建人',
    `update_user_code` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '更新人',
    `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
    `remark` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '备注',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='订单表';
-- Seed data for `tb_order`: four orders, all in status 1 (waiting for payment).
-- `price` is a DECIMAL column, so it is given as a numeric literal rather than
-- a quoted string to avoid relying on implicit string-to-number coercion.
INSERT INTO `tb_order` (
    `id`, `order_code`, `status`, `name`, `price`, `delete_flag`,
    `create_time`, `update_time`, `create_user_code`, `update_user_code`,
    `version`, `remark`
) VALUES
    (2, 'A111', 1, 'A', 22.00, 0, '2022-10-15 16:14:11', '2022-10-02 21:29:14', 'zhangsan', 'zhangsan', 0, NULL),
    (3, 'A111', 1, '订单A', 22.00, 0, '2022-10-02 21:53:13', '2022-10-02 21:29:14', 'zhangsan', 'zhangsan', 0, NULL),
    (4, 'A111', 1, '订单A', 22.00, 0, '2022-10-02 21:53:13', '2022-10-02 21:29:14', 'zhangsan', 'zhangsan', 0, NULL),
    (5, 'A111', 1, '订单A', 22.00, 0, '2022-10-03 09:08:30', '2022-10-02 21:29:14', 'zhangsan', 'zhangsan', 0, NULL);
1ï¼å¼å ¥ä¾èµ
<!-- Redis-backed persistence for the state machine.
     Version aligned with the starter below: mixing a 1.2.x module with the
     2.0.x core puts two incompatible Spring Statemachine lines on the classpath. -->
<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-redis</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
<!-- Spring Statemachine Spring Boot starter -->
<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-starter</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
2ï¼å®ä¹ç¶ææºç¶æåäºä»¶
ç¶ææ举ï¼
/**
 * Order life-cycle states. Each constant carries the numeric key that is
 * stored for the order and a human-readable description.
 */
public enum OrderStatus {
    /** Waiting for payment. */
    WAIT_PAYMENT(1, "待支付"),
    /** Waiting for delivery. */
    WAIT_DELIVER(2, "待发货"),
    /** Waiting for receipt. */
    WAIT_RECEIVE(3, "待收货"),
    /** Finished. */
    FINISH(4, "已完成");

    // Both fields are assigned once in the constructor, so they are final.
    private final Integer key;
    private final String desc;

    OrderStatus(Integer key, String desc) {
        this.key = key;
        this.desc = desc;
    }

    /** @return the numeric key of this status */
    public Integer getKey() {
        return key;
    }

    /** @return the human-readable description of this status */
    public String getDesc() {
        return desc;
    }

    /**
     * Looks up a status by its numeric key.
     *
     * @param key numeric key; {@code null} never matches
     * @return the status whose key equals {@code key}
     * @throws RuntimeException (an {@link IllegalArgumentException}) when no status has that key
     */
    public static OrderStatus getByKey(Integer key) {
        for (OrderStatus status : values()) {
            if (status.key.equals(key)) {
                return status;
            }
        }
        // Include the offending key so the failure can be diagnosed from the log alone.
        throw new IllegalArgumentException("enum not exists. key=" + key);
    }
}
äºä»¶ï¼
/**
 * Events that drive an order from one {@code OrderStatus} to the next.
 */
public enum OrderStatusChangeEvent {
    /** Payment. */
    PAYED,
    /** Delivery. */
    DELIVERY,
    /** Receipt confirmed. */
    RECEIVED
}
3ï¼å®ä¹ç¶ææºè§ååé ç½®ç¶ææº
@Configuration
@EnableStateMachine(name = "orderStateMachine")
public class OrderStateMachineConfig extends StateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {
/**
* é
ç½®ç¶æ
*
* @param states
* @throws Exception
*/
public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
states
.withStates()
.initial(OrderStatus.WAIT_PAYMENT)
.states(EnumSet.allOf(OrderStatus.class));
}
/**
* é
ç½®ç¶æ转æ¢äºä»¶å
³ç³»
*
* @param transitions
* @throws Exception
*/
public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
transitions
//æ¯ä»äºä»¶:å¾
æ¯ä»-ãå¾
åè´§
.withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderStatusChangeEvent.PAYED)
.and()
//åè´§äºä»¶:å¾
åè´§-ãå¾
æ¶è´§
.withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderStatusChangeEvent.DELIVERY)
.and()
//æ¶è´§äºä»¶:å¾
æ¶è´§-ãå·²å®æ
.withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderStatusChangeEvent.RECEIVED);
}
}
é ç½®æä¹ åï¼
@Configuration
@Slf4j
public class Persist<E, S> {
/**
* æä¹
åå°å
åmapä¸
*
* @return
*/
@Bean(name = "stateMachineMemPersister")
public static StateMachinePersister getPersister() {
return new DefaultStateMachinePersister(new StateMachinePersist() {
@Override
public void write(StateMachineContext context, Object contextObj) throws Exception {
log.info("æä¹
åç¶ææº,context:{},contextObj:{}", JSON.toJSONString(context), JSON.toJSONString(contextObj));
map.put(contextObj, context);
}
@Override
public StateMachineContext read(Object contextObj) throws Exception {
log.info("è·åç¶ææº,contextObj:{}", JSON.toJSONString(contextObj));
StateMachineContext stateMachineContext = (StateMachineContext) map.get(contextObj);
log.info("è·åç¶ææºç»æ,stateMachineContext:{}", JSON.toJSONString(stateMachineContext));
return stateMachineContext;
}
private Map map = new HashMap();
});
}
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
* æä¹
åå°redisä¸ï¼å¨åå¸å¼ç³»ç»ä¸ä½¿ç¨
*
* @return
*/
@Bean(name = "stateMachineRedisPersister")
public RedisStateMachinePersister<E, S> getRedisPersister() {
RedisStateMachineContextRepository<E, S> repository = new RedisStateMachineContextRepository<>(redisConnectionFactory);
RepositoryStateMachinePersist p = new RepositoryStateMachinePersist<>(repository);
return new RedisStateMachinePersister<>(p);
}
}
4ï¼ä¸å¡ç³»ç»
controllerï¼
/**
 * REST endpoints under {@code /order} that delegate to {@link OrderService}.
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private OrderService orderService;

    /**
     * Looks up an order by id.
     *
     * @param id order id
     * @return whatever {@code orderService.getById} returns for that id
     */
    @RequestMapping("/getById")
    public Order getById(@RequestParam("id") Long id) {
        return orderService.getById(id);
    }

    /**
     * Creates an order from the request body.
     *
     * @param order order to create
     * @return the literal {@code "success"}
     */
    @RequestMapping("/create")
    public String create(@RequestBody Order order) {
        orderService.create(order);
        // Was "sucess"; every other endpoint of this controller answers "success".
        return "success";
    }

    /**
     * Pays an order.
     *
     * @param id order id
     * @return the literal {@code "success"}
     */
    @RequestMapping("/pay")
    public String pay(@RequestParam("id") Long id) {
        orderService.pay(id);
        return "success";
    }

    /**
     * Ships an order.
     *
     * @param id order id
     * @return the literal {@code "success"}
     */
    @RequestMapping("/deliver")
    public String deliver(@RequestParam("id") Long id) {
        orderService.deliver(id);
        return "success";
    }

    /**
     * Confirms receipt of an order.
     *
     * @param id order id
     * @return the literal {@code "success"}
     */
    @RequestMapping("/receive")
    public String receive(@RequestParam("id") Long id) {
        orderService.receive(id);
        return "success";
    }
}
service：
@Service("orderService")
@Slf4j
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Resource
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@Resource
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;
@Resource
private OrderMapper orderMapper;
/**
* å建订å
*
* @param order
* @return
*/
public Order create(Order order) {
order.setStatus(OrderStatus.WAIT_PAYMENT.getKey());
orderMapper.insert(order);
return order;
}
/**
* 对订åè¿è¡æ¯ä»
*
* @param id
* @return
*/
public Order pay(Long id) {
Order order = orderMapper.selectById(id);
log.info("线ç¨å称ï¼{},å°è¯æ¯ä»ï¼è®¢åå·ï¼{}" ,Thread.currentThread().getName() , id);
if (!sendEvent(OrderStatusChangeEvent.PAYED, order)) {
log.error("线ç¨å称ï¼{},æ¯ä»å¤±è´¥, ç¶æå¼å¸¸ï¼è®¢åä¿¡æ¯ï¼{}", Thread.currentThread().getName(), order);
throw new RuntimeException("æ¯ä»å¤±è´¥, 订åç¶æå¼å¸¸");
}
return order;
}
/**
* 对订åè¿è¡åè´§
*
* @param id
* @return
*/
public Order deliver(Long id) {
Order order = orderMapper.selectById(id);
log.info("线ç¨å称ï¼{},å°è¯åè´§ï¼è®¢åå·ï¼{}" ,Thread.currentThread().getName() , id);
if (!sendEvent(OrderStatusChangeEvent.DELIVERY, order)) {
log.error("线ç¨å称ï¼{},å货失败, ç¶æå¼å¸¸ï¼è®¢åä¿¡æ¯ï¼{}", Thread.currentThread().getName(), order);
throw new RuntimeException("å货失败, 订åç¶æå¼å¸¸");
}
return order;
}
/**
* 对订åè¿è¡ç¡®è®¤æ¶è´§
*
* @param id
* @return
*/
public Order receive(Long id) {
Order order = orderMapper.selectById(id);
log.info("线ç¨å称ï¼{},å°è¯æ¶è´§ï¼è®¢åå·ï¼{}" ,Thread.currentThread().getName() , id);
if (!sendEvent(OrderStatusChangeEvent.RECEIVED, order)) {
log.error("线ç¨å称ï¼{},æ¶è´§å¤±è´¥, ç¶æå¼å¸¸ï¼è®¢åä¿¡æ¯ï¼{}", Thread.currentThread().getName(), order);
throw new RuntimeException("æ¶è´§å¤±è´¥, 订åç¶æå¼å¸¸");
}
return order;
}
/**
* åé订åç¶æ转æ¢äºä»¶
* synchronized修饰ä¿è¯è¿ä¸ªæ¹æ³æ¯çº¿ç¨å®å
¨ç
*
* @param changeEvent
* @param order
* @return
*/
private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
boolean result = false;
try {
//å¯å¨ç¶ææº
orderStateMachine.start();
//å°è¯æ¢å¤ç¶ææºç¶æ
stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
result = orderStateMachine.sendEvent(message);
//æä¹
åç¶ææºç¶æ
stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
} catch (Exception e) {
log.error("订åæä½å¤±è´¥:{}", e);
} finally {
orderStateMachine.stop();
}
return result;
}
}
çå¬ç¶æçååï¼
@Component("orderStateListener")
@WithStateMachine(name = "orderStateMachine")
@Slf4j
public class OrderStateListenerImpl {
@Resource
private OrderMapper orderMapper;
@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
public void payTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("æ¯ä»ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
//æ´æ°è®¢å
order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
}
@OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
public void deliverTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("åè´§ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
//æ´æ°è®¢å
order.setStatus(OrderStatus.WAIT_RECEIVE.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
}
@OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
public void receiveTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("确认æ¶è´§ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
//æ´æ°è®¢å
order.setStatus(OrderStatus.FINISH.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
}
}
3.3 æµè¯éªè¯
1ï¼éªè¯ä¸å¡
- æ°å¢ä¸ä¸ªè®¢åhttp://localhost:8084/order/create
- 对订åè¿è¡æ¯ä»http://localhost:8084/order/pay?id=2
- 对订åè¿è¡åè´§http://localhost:8084/order/deliver?id=2
- 对订åè¿è¡ç¡®è®¤æ¶è´§http://localhost:8084/order/receive?id=2
æ£å¸¸æµç¨ç»æãå¦æ对ä¸ä¸ªè®¢åè¿è¡æ¯ä»äºï¼å次è¿è¡æ¯ä»ï¼åä¼æ¥éï¼http://localhost:8084/order/pay?id=2
æ¥éå¦ä¸ï¼
2ï¼éªè¯æä¹ å
å å
使ç¨å åæä¹ åç±»æä¹ åï¼
@Resource
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;
/**
* åé订åç¶æ转æ¢äºä»¶
* synchronized修饰ä¿è¯è¿ä¸ªæ¹æ³æ¯çº¿ç¨å®å
¨ç
*
* @param changeEvent
* @param order
* @return
*/
private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
boolean result = false;
try {
//å¯å¨ç¶ææº
orderStateMachine.start();
//å°è¯æ¢å¤ç¶ææºç¶æ
stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
result = orderStateMachine.sendEvent(message);
//æä¹
åç¶ææºç¶æ
stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
} catch (Exception e) {
log.error("订åæä½å¤±è´¥:{}", e);
} finally {
orderStateMachine.stop();
}
return result;
}
redisæä¹ å
å¼å ¥ä¾èµï¼
<!-- Redis-backed persistence for the state machine.
     Version aligned with spring-statemachine-starter (2.0.1.RELEASE) declared
     earlier in this document; 1.2.9.RELEASE belongs to a different major line. -->
<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-redis</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
é ç½®yamlï¼
# Redis connection settings. Nesting restored: with every key at column 0 the
# document defined unrelated top-level keys instead of spring.redis.*.
spring:
  redis:
    database: 0
    host: localhost
    jedis:
      pool:
        max-active: 8
        max-idle: 8
        # NOTE(review): empty value kept as in the original - confirm the intended wait time.
        max-wait: ''
        min-idle: 0
    password: ''
    port: 6379
    # NOTE(review): 0 kept as in the original - confirm how the client in use interprets it.
    timeout: 0
使ç¨redisæä¹ åç±»æä¹ åï¼
@Resource
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineRedisPersister;
/**
* åé订åç¶æ转æ¢äºä»¶
* synchronized修饰ä¿è¯è¿ä¸ªæ¹æ³æ¯çº¿ç¨å®å
¨ç
*
* @param changeEvent
* @param order
* @return
*/
private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
boolean result = false;
try {
//å¯å¨ç¶ææº
orderStateMachine.start();
//å°è¯æ¢å¤ç¶ææºç¶æ
stateMachineRedisPersister.restore(orderStateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
result = orderStateMachine.sendEvent(message);
//æä¹
åç¶ææºç¶æ
stateMachineRedisPersister.persist(orderStateMachine, String.valueOf(order.getId()));
} catch (Exception e) {
log.error("订åæä½å¤±è´¥:{}", e);
} finally {
orderStateMachine.stop();
}
return result;
}
3.4 ç¶ææºåå¨çé®é¢
1ï¼stateMachineæ æ³æåºå¼å¸¸ï¼å¼å¸¸ä¼è¢«ç¶ææºç»æ¶åæ
é®é¢ç°è±¡
ä»orderStateMachine.sendEvent(message);è·åçç»ææ æ³æç¥å°ãæ 论æ§è¡æ£å¸¸è¿æ¯æåºå¼å¸¸ï¼é½è¿åtrueã
@Resource
private OrderMapper orderMapper;
@Resource
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
@Transactional(rollbackFor = Exception.class)
public void payTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("æ¯ä»ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
try {
//æ´æ°è®¢å
order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
//模æå¼å¸¸
if(Objects.equals(order.getName(),"A")){
throw new RuntimeException("æ§è¡ä¸å¡å¼å¸¸");
}
} catch (Exception e) {
//å¦æåºç°å¼å¸¸ï¼è®°å½å¼å¸¸ä¿¡æ¯ï¼æåºå¼å¸¸ä¿¡æ¯è¿è¡åæ»
log.error("payTransition åºç°å¼å¸¸ï¼{}",e);
throw e;
}
}
çå¬äºä»¶æåºå¼å¸¸ï¼å¨åéäºä»¶ä¸æ æ³æç¥ï¼
private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
boolean result = false;
try {
//å¯å¨ç¶ææº
orderStateMachine.start();
//å°è¯æ¢å¤ç¶ææºç¶æ
stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
//äºä»¶æ§è¡å¼å¸¸äºï¼ä¾ç¶è¿åtrueï¼æ æ³æç¥å¼å¸¸
result = orderStateMachine.sendEvent(message);
if(result){
//æä¹
åç¶ææºç¶æï¼å¦ææ ¹æ®trueæä¹
åï¼åä¼åºç°é®é¢
stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
}
} catch (Exception e) {
log.error("订åæä½å¤±è´¥:{}", e);
} finally {
orderStateMachine.stop();
}
return result;
}
è°è¯åç°ï¼åéäºä»¶åçå¬äºä»¶æ¯ä¸ä¸ªçº¿ç¨ï¼åéäºä»¶çç»ææ¯å¨çå¬æä½æ§è¡å®ä¹åæè¿å
çå¬çº¿ç¨ï¼
解å³æ¹æ¡ï¼èªå·±ä¿åå¼å¸¸å°æ°æ®åºæè å åä¸ï¼è¿è¡å¤æ
ä¹å¯ä»¥éè¿æ¥å£ï¼
org.springframework.statemachine.StateMachine##getExtendedState
æ¹æ³ææ§è¡ç¶ææ¾å ¥è¿ä¸ªåéä¸
// Excerpt of the ExtendedState interface referenced just above in the article.
// NOTE(review): parameter names (var1, var2) look decompiler-generated - confirm against the library source.
public interface ExtendedState {
// Returns the variables map of this extended state.
Map<Object, Object> getVariables();
// Typed lookup: var1 is presumably the variable key and var2 the expected value type - TODO confirm.
<T> T get(Object var1, Class<T> var2);
// Registers a listener for extended-state changes.
void setExtendedStateChangeListener(ExtendedState.ExtendedStateChangeListener var1);
// Callback type accepted by setExtendedStateChangeListener.
public interface ExtendedStateChangeListener {
// Presumably called with the changed key (var1) and its value (var2) - TODO confirm.
void changed(Object var1, Object var2);
}
}
org.springframework.statemachine.support.DefaultExtendedState##getVariables
// Excerpt of DefaultExtendedState: the variables map, the constructor that creates it, and its getter.
private final Map<Object, Object> variables;
// The map is a ConcurrentHashMap wrapped in an ObservableMap that reports to a LocalMapChangeListener.
public DefaultExtendedState() {
this.variables = new ObservableMap(new ConcurrentHashMap(), new DefaultExtendedState.LocalMapChangeListener());
}
// Returns the map itself (not a copy), so callers can put and remove entries directly.
public Map<Object, Object> getVariables() {
return this.variables;
}
æ¹é çå¬ç¶æï¼æä¸å¡çæ§è¡ç»æè¿è¡ä¿åï¼1æåï¼0失败
@Resource
private OrderMapper orderMapper;
@Resource
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
@Transactional(rollbackFor = Exception.class)
public void payTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("æ¯ä»ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
try {
//æ´æ°è®¢å
order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
//模æå¼å¸¸
if(Objects.equals(order.getName(),"A")){
throw new RuntimeException("æ§è¡ä¸å¡å¼å¸¸");
}
//æå å为1
orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(),1);
} catch (Exception e) {
//å¦æåºç°å¼å¸¸ï¼åè¿è¡åæ»
log.error("payTransition åºç°å¼å¸¸ï¼{}",e);
//å°å¼å¸¸ä¿¡æ¯åéä¿¡æ¯ä¸ï¼å¤±è´¥å为0
orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(), 0);
throw e;
}
}
åéäºä»¶æ¹é ï¼å¦æè·åå°ä¸å¡æ§è¡å¼å¸¸ï¼åè¿å失败ï¼ä¸è¿è¡ç¶ææºæä¹ å com.zengqingfa.springboot.state.demo.service.impl.OrderServiceImpl##sendEvent
@Resource
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@Resource
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;
/**
* åé订åç¶æ转æ¢äºä»¶
* synchronized修饰ä¿è¯è¿ä¸ªæ¹æ³æ¯çº¿ç¨å®å
¨ç
*
* @param changeEvent
* @param order
* @return
*/
private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order){
boolean result = false;
try {
//å¯å¨ç¶ææº
orderStateMachine.start();
//å°è¯æ¢å¤ç¶ææºç¶æ
stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
result = orderStateMachine.sendEvent(message);
if(!result){
return false;
}
//è·åå°çå¬çç»æä¿¡æ¯
Integer o = (Integer) orderStateMachine.getExtendedState().getVariables().get(CommonConstants.payTransition + order.getId());
//æä½å®æä¹å,å é¤æ¬æ¬¡å¯¹åºçkeyä¿¡æ¯
orderStateMachine.getExtendedState().getVariables().remove(CommonConstants.payTransition+order.getId());
//å¦æäºå¡æ§è¡æåï¼åæä¹
åç¶ææº
if(Objects.equals(1,Integer.valueOf(o))){
//æä¹
åç¶ææºç¶æ
stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
}else {
//订åæ§è¡ä¸å¡å¼å¸¸
return false;
}
} catch (Exception e) {
log.error("订åæä½å¤±è´¥:{}", e);
} finally {
orderStateMachine.stop();
}
return result;
}
代ç ä¼å
- åéäºä»¶åªé对äºæ¯ä»ï¼å¦ææ¯éæ¯ä»äºä»¶å¢ï¼
//è·åå°çå¬çç»æä¿¡æ¯
Integer o = (Integer) orderStateMachine.getExtendedState().getVariables().get(CommonConstants.payTransition + order.getId());
- çå¬è®¾ç½®ç¶æç代ç æéå¤ä»£ç ï¼éè¦è¿è¡ä¼åï¼å¯ä½¿ç¨aop
try {
//TODO å
¶ä»ä¸å¡
//æå å为1
orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(),1);
} catch (Exception e) {
//å¦æåºç°å¼å¸¸ï¼åè¿è¡åæ»
log.error("payTransition åºç°å¼å¸¸ï¼{}",e);
//å°å¼å¸¸ä¿¡æ¯åéä¿¡æ¯ä¸ï¼å¤±è´¥å为0
orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(), 0);
throw e;
}
常éç±»ï¼
/**
 * String constants shared by the order service, the transition listeners and
 * the result-recording aspect.
 */
public interface CommonConstants {

    /** Header name "order". */
    public static final String orderHeader = "order";

    /** Key prefix for the outcome of the pay transition. */
    public static final String payTransition = "payTransition";

    /** Key prefix for the outcome of the deliver transition. */
    public static final String deliverTransition = "deliverTransition";

    /** Key prefix for the outcome of the receive transition. */
    public static final String receiveTransition = "receiveTransition";
}
æ¯ä»åéäºä»¶ï¼
com.zengqingfa.springboot.state.demo.service.impl.OrderServiceImpl##pay
@Resource
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@Resource
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;
@Resource
private OrderMapper orderMapper;
/**
 * Pays an order by sending the PAYED event; the listener outcome is read back
 * under the {@code payTransition} key.
 *
 * @param id order id
 * @return the order that was loaded for {@code id}
 * @throws RuntimeException when the order does not exist or the payment is not applied
 */
public Order pay(Long id) {
    Order order = orderMapper.selectById(id);
    if (order == null) {
        // Fail with the real cause instead of a swallowed NullPointerException
        // that is later reported as a "status abnormal" error.
        throw new RuntimeException("订单不存在, 订单号：" + id);
    }
    log.info("线程名称：{},尝试支付，订单号：{}", Thread.currentThread().getName(), id);
    if (!sendEvent(OrderStatusChangeEvent.PAYED, order, CommonConstants.payTransition)) {
        log.error("线程名称：{},支付失败, 状态异常，订单信息：{}", Thread.currentThread().getName(), order);
        throw new RuntimeException("支付失败, 订单状态异常");
    }
    return order;
}
/**
* åé订åç¶æ转æ¢äºä»¶
* synchronized修饰ä¿è¯è¿ä¸ªæ¹æ³æ¯çº¿ç¨å®å
¨ç
*
* @param changeEvent
* @param order
* @return
*/
private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order,String key){
boolean result = false;
try {
//å¯å¨ç¶ææº
orderStateMachine.start();
//å°è¯æ¢å¤ç¶ææºç¶æ
stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
result = orderStateMachine.sendEvent(message);
if(!result){
return false;
}
//è·åå°çå¬çç»æä¿¡æ¯
Integer o = (Integer) orderStateMachine.getExtendedState().getVariables().get(key + order.getId());
//æä½å®æä¹å,å é¤æ¬æ¬¡å¯¹åºçkeyä¿¡æ¯
orderStateMachine.getExtendedState().getVariables().remove(key+order.getId());
//å¦æäºå¡æ§è¡æåï¼åæä¹
åç¶ææº
if(Objects.equals(1,Integer.valueOf(o))){
//æä¹
åç¶ææºç¶æ
stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
}else {
//订åæ§è¡ä¸å¡å¼å¸¸
return false;
}
} catch (Exception e) {
log.error("订åæä½å¤±è´¥:{}", e);
} finally {
orderStateMachine.stop();
}
return result;
}
使ç¨aop对çå¬äºä»¶åé¢ï¼æä¸å¡æ§è¡ç»æå°è£ å°ç¶ææºçåéä¸ï¼æ³¨è§£ï¼
/**
 * Marks a method whose outcome should be recorded under the given business key.
 * Retained at runtime so it can be read reflectively from the annotated method.
 */
@Retention(RetentionPolicy.RUNTIME)
// Restricted to methods, the only place it is used and read. Fully qualified so
// the snippet needs no import beyond those it already relies on.
@java.lang.annotation.Target(java.lang.annotation.ElementType.METHOD)
public @interface LogResult {
    /**
     * Business key under which the method's outcome is recorded.
     *
     * @return the key
     */
    String key();
}
åé¢ï¼
@Component
@Aspect
@Slf4j
public class LogResultAspect {
//æ¦æª LogHistory注解
@Pointcut("@annotation(com.zengqingfa.springboot.state.demo.aop.annotation.LogResult)")
private void logResultPointCut() {
//logResultPointCut æ¥å¿æ³¨è§£åç¹
}
@Resource
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@Around("logResultPointCut()")
public Object logResultAround(ProceedingJoinPoint pjp) throws Throwable {
//è·ååæ°
Object[] args = pjp.getArgs();
log.info("åæ°args:{}", args);
Message message = (Message) args[0];
Order order = (Order) message.getHeaders().get("order");
//è·åæ¹æ³
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
// è·åLogHistory注解
LogResult logResult = method.getAnnotation(LogResult.class);
String key = logResult.key();
Object returnVal = null;
try {
//æ§è¡æ¹æ³
returnVal = pjp.proceed();
//å¦æä¸å¡æ§è¡æ£å¸¸ï¼åä¿åä¿¡æ¯
//æå å为1
orderStateMachine.getExtendedState().getVariables().put(key + order.getId(), 1);
} catch (Throwable e) {
log.error("e:{}", e.getMessage());
//å¦æä¸å¡æ§è¡å¼å¸¸ï¼åä¿åä¿¡æ¯
//å°å¼å¸¸ä¿¡æ¯åéä¿¡æ¯ä¸ï¼å¤±è´¥å为0
orderStateMachine.getExtendedState().getVariables().put(key + order.getId(), 0);
throw e;
}
return returnVal;
}
}
çå¬ç±»ä½¿ç¨æ³¨è§£ï¼
@Component("orderStateListener")
@WithStateMachine(name = "orderStateMachine")
@Slf4j
public class OrderStateListenerImpl {
@Resource
private OrderMapper orderMapper;
@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
@Transactional(rollbackFor = Exception.class)
@LogResult(key = CommonConstants.payTransition)
public void payTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("æ¯ä»ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
//æ´æ°è®¢å
order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
//模æå¼å¸¸
if (Objects.equals(order.getName(), "A")) {
throw new RuntimeException("æ§è¡ä¸å¡å¼å¸¸");
}
}
@OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
@LogResult(key = CommonConstants.deliverTransition)
public void deliverTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("åè´§ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
//æ´æ°è®¢å
order.setStatus(OrderStatus.WAIT_RECEIVE.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
}
@OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
@LogResult(key = CommonConstants.receiveTransition)
public void receiveTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
log.info("确认æ¶è´§ï¼ç¶ææºåé¦ä¿¡æ¯ï¼{}", message.getHeaders().toString());
//æ´æ°è®¢å
order.setStatus(OrderStatus.FINISH.getKey());
orderMapper.updateById(order);
//TODO å
¶ä»ä¸å¡
}
}