天天看點

分布式消息與資料庫事務共享模式

事務管理器org.springframework.jdbc.datasource.DataSourceTransactionManager(AbstractPlatformTransactionManager).getTransaction建立ConnectionHolder并用DriverManagerDataSource對象作鍵值儲存在TransactionSynchronizationManager.resources.使TransactionSynchronizationManager.isSynchronizationActive=true表示可以執行TransactionSynchronizationManager.registerSynchronization儲存事務對應的同步器到synchronizations如JmsResourceSynchronization。在事務使用JmsTemplate方法首先會建立JmsResourceHolder并用ActiveMQConnectionFactor對象作鍵值儲存在TransactionSynchronizationManager.resources,同一個事務内共用session和connection。

ActiveMQSession.createConsumer()方法建立連接配接MQ伺服器的消費者對象,MQ伺服器會把消費者要操作的消息隊列鎖住,其它消費者對象不能操作這個消息隊列,其它生産者對象可進行寫操作,執行MessageConsumer.close()才釋放鎖。然後JmsTemplate.doReceive()方法是讀取MQ伺服器的消息,讀取消息隻是被MQ伺服器隔離,執行session.commit()後MQ伺服器才最終删除讀取消息。不執行session.commit()消息可以復原。代碼片段下圖,表示如果session由DataSourceTransactionManager管理可以忽略執行session.commit(),等到執行事務方法DataSourceTransactionManager.commit()再執行。同步器JmsResourceSynchronization已注冊到TransactionSynchronizationManager,DataSourceTransactionManager.commit()會觸發TransactionSynchronizationManager.synchronizations裡所有同步器的afterCommit()方法,如JmsResourceSynchronization.afterCommit(),最終執行session.commit()。

if (session.getTransacted()) {
                // Commit necessary - but avoid commit call within a JTA transaction.
                if (isSessionLocallyTransacted(session)) {
                    // Transacted session created by this template -> commit.
                    JmsUtils.commitIfNecessary(session);
                }
            }
      
分布式消息與資料庫事務共享模式