事務消息可以解決分布式事務實作資料最終一緻。二階段送出協定。

public class TransactionListenerImpl implements TransactionListener {
//存儲目前線程對應的事務狀态
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/***
* 發送prepare消息成功後回調該方法用于執行本地事務
* @param msg:回傳的消息,利用transactionId即可擷取到該消息的唯一Id
* @param arg:調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這裡能擷取到
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//擷取線程ID
String transactionId = msg.getTransactionId();
//初始狀态為0
localTrans.put(transactionId,0);
try {
//此處執行本地事務操作
System.out.println("....執行本地事務");
Thread.sleep(70000);
System.out.println("....執行完成本地事務");
} catch (InterruptedException e) {
e.printStackTrace();
//發生異常,則復原消息
localTrans.put(transactionId,2);
return LocalTransactionState.UNKNOW;
}
//修改狀态
localTrans.put(transactionId,1);
System.out.println("executeLocalTransaction------狀态為1");
//本地事務操作如果成功了,則送出該消息,讓該消息可見
return LocalTransactionState.UNKNOW;
}
/***
* 消息回查
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//擷取事務id
String transactionId = msg.getTransactionId();
//通過事務id擷取對應的本地事務執行狀态
Integer status = localTrans.get(transactionId);
System.out.println("消息回查-----"+status);
switch (status){
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
public class TransactionProducer {
//nameserver位址
private static String namesrvaddress="127.0.0.1:9876;";
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
//建立事務消息發送對象
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group_name");
//設定namesrv位址
producer.setNamesrvAddr(namesrvaddress);
//建立監聽器
TransactionListener transactionListener = new TransactionListenerImpl();
//建立線程池
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(
2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
}
);
//設定線程池
producer.setExecutorService(executorService);
//設定監聽器
producer.setTransactionListener(transactionListener);
//啟動producer
producer.start();
//建立消息
Message message = new Message(
"TopicTxt_Demo",
"TagTx",
"KeyTx1",
"hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
//發送事務消息,此時消息不可見
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, "發送消息,回傳所需資料!");
System.out.println(transactionSendResult);
//休眠
Thread.sleep(120000);
//關閉
producer.shutdown();
}
}