天天看點

【RoceketMQ】事務消息

事務消息可以解決分布式事務實作資料最終一緻。二階段送出協定。

【RoceketMQ】事務消息
public class TransactionListenerImpl implements TransactionListener {

    //存儲目前線程對應的事務狀态
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /***
     * 發送prepare消息成功後回調該方法用于執行本地事務
     * @param msg:回傳的消息,利用transactionId即可擷取到該消息的唯一Id
     * @param arg:調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這裡能擷取到
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //擷取線程ID
        String transactionId = msg.getTransactionId();
        //初始狀态為0
        localTrans.put(transactionId,0);

        try {
            //此處執行本地事務操作
            System.out.println("....執行本地事務");
            Thread.sleep(70000);
            System.out.println("....執行完成本地事務");
        } catch (InterruptedException e) {
            e.printStackTrace();
            //發生異常,則復原消息
            localTrans.put(transactionId,2);
            return LocalTransactionState.UNKNOW;
        }

        //修改狀态
        localTrans.put(transactionId,1);
        System.out.println("executeLocalTransaction------狀态為1");
        //本地事務操作如果成功了,則送出該消息,讓該消息可見
        return LocalTransactionState.UNKNOW;
    }

    /***
     * 消息回查
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        //擷取事務id
        String transactionId = msg.getTransactionId();

        //通過事務id擷取對應的本地事務執行狀态
        Integer status = localTrans.get(transactionId);
        System.out.println("消息回查-----"+status);
        switch (status){
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }
}


public class TransactionProducer {

    //nameserver位址
    private static String namesrvaddress="127.0.0.1:9876;";

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
        //建立事務消息發送對象
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group_name");
        //設定namesrv位址
        producer.setNamesrvAddr(namesrvaddress);
        //建立監聽器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //建立線程池
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(
                        2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable runnable) {
                                Thread thread = new Thread(runnable);
                                thread.setName("client-transaction-msg-check-thread");
                                return thread;
                            }
                        }
                );

        //設定線程池
        producer.setExecutorService(executorService);
        //設定監聽器
        producer.setTransactionListener(transactionListener);
        //啟動producer
        producer.start();

        //建立消息
        Message message = new Message(
                "TopicTxt_Demo",
                "TagTx",
                "KeyTx1",
                "hello".getBytes(RemotingHelper.DEFAULT_CHARSET));

        //發送事務消息,此時消息不可見
        TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, "發送消息,回傳所需資料!");
        System.out.println(transactionSendResult);


        //休眠
        Thread.sleep(120000);
        //關閉
        producer.shutdown();
    }
}