天天看點

結合ThreadLocal來看spring事務源碼,感受下清泉般的洗滌!

前言

  在我的部落格spring事務源碼解析中,提到了一個很關鍵的點:将connection綁定到目前線程來保證這個線程中的資料庫操作用的是同一個connection。但是沒有細緻的講到如何綁定,以及為什麼這麼綁定;另外也沒有講到連接配接池的相關問題:如何從連接配接池擷取,如何歸還連接配接到連接配接池等等。那麼下面就請聽我慢慢道來。

  路漫漫其修遠兮,吾将上下而求索!

  github:https://github.com/youzhibing

  碼雲(gitee):https://gitee.com/youzhibing

ThreadLocal

  講spring事務之前,我們先來看看ThreadLocal,它在spring事務中是占據着比較重要的地位;不管你對ThreadLocal熟悉與否,且都靜下心來聽我唐僧般的念叨。

  先強調一點:ThreadLocal不是用來解決共享變量問題的,它與多線程的并發問題沒有任何關系。

  基本介紹

    當使用ThreadLocal維護變量時,ThreadLocal為每個使用該變量的線程提供獨立的變量副本,是以每一個線程都可以獨立地改變自己的副本,而不會影響其它線程所對應的副本,如下例:

public class ThreadLocalTest
{
    ThreadLocal<Long> longLocal = new ThreadLocal<Long>();
    ThreadLocal<String> stringLocal = new ThreadLocal<String>();
    
    public void set()
    {
        longLocal.set(1L);
        stringLocal.set(Thread.currentThread().getName());
    }
    
    public long getLong()
    {
        return longLocal.get();
    }
    
    public String getString()
    {
        return stringLocal.get();
    }
    
    public static void main(String[] args) throws InterruptedException
    {
        final ThreadLocalTest test = new ThreadLocalTest();
        
        test.set();     // 初始化ThreadLocal
        for (int i=0; i<10; i++)
        {
            System.out.println(test.getString() + " : " + test.getLong() + i);
        }
        
        Thread thread1 = new Thread(){
            public void run() {
                test.set();
                for (int i=0; i<10; i++)
                {
                    System.out.println(test.getString() + " : " + test.getLong() + i);
                }
            };
        };
        thread1.start();
        
        Thread thread2 = new Thread(){
            public void run() {
                test.set();
                for (int i=0; i<10; i++)
                {
                    System.out.println(test.getString() + " : " + test.getLong() + i);
                }
            };
        };
        thread2.start();
    }
}      

    執行結果如下

結合ThreadLocal來看spring事務源碼,感受下清泉般的洗滌!

    可以看到,各個線程的longLocal值與stringLocal值是互相獨立的,本線程的累加操作不會影響到其他線程的值,真正達到了線程内部隔離的效果。

  源碼解讀

    這裡我就不進行ThreadLocal的源碼解析,建議大家去看我參考的部落格,個人認為看那兩篇部落格就能對ThreadLocal有個很深地認知了。

    做個重複的強調(引用[Java并發包學習七]解密ThreadLocal中的一段話):

Thread與ThreadLocal對象之間的引用關系圖
 
        
結合ThreadLocal來看spring事務源碼,感受下清泉般的洗滌!
看了ThreadLocal源碼,不知道大家有沒有一個疑惑:為什麼像上圖那麼設計? 如果給你設計,你會怎麼設計?相信大部分人會有這樣的想法,我也是這樣的想法:
  ”每個ThreadLocal類建立一個Map,然後用線程的ID作為Map的key,執行個體對象作為Map的value,這樣就能達到各個線程的值隔離的效果“
JDK最早期的ThreadLocal就是這樣設計的。(不确定是否是1.3)之後ThreadLocal的設計換了一種方式,也就是目前的方式,那有什麼優勢了:
  1、這樣設計之後每個Map的Entry數量變小了:之前是Thread的數量,現在是ThreadLocal的數量,能提高性能,據說性能的提升不是一點兩點(沒有親測)
  2、當Thread銷毀之後對應的ThreadLocalMap也就随之銷毀了,能減少記憶體使用量。      

    總結下改進後的優點

      1、自動釋放, 當 Thread 對象銷毀後,ThreadLocalMap 對象也随之銷毀,JVM 及時回收,避免了記憶體洩漏。如果按我們的想法:定義一個靜态的map,将目前 thread(或 thread 的 ID) 作為key,需要儲存的對象作為 value,put 到 map 中;如果任務完成之後,目前線程銷毀了,這個靜态 map 中該線程的資訊不會自動回收,如果我們不手動去釋放,這個 map 會随着時間的積累越來越大,最後出現記憶體洩漏。而一旦需要進行手動釋放,那很有可能就會有漏網之魚,這就像埋一個定時炸彈,定期爆發,而又不好排查!

      2、性能提升,各線程通路的 ThreadLocalMap 是各自不同的 ThreadLocalMap,是以不需要同步,速度會快很多;而如果把所有線程要用的對象都放到一個靜态 map 中的話,多線程并發通路需要進行同步(有興趣的可以去看下 JDK1.3的實作)

Spring事務中的ThreadLocal

  最常見的ThreadLocal使用場景為 用來解決資料庫連接配接、Session管理等,那麼接下來我們就看看spring事務中ThreadLocal的應用

ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext-jdbc.xml");
DaoImpl daoImpl = (DaoImpl) ac.getBean("daoImpl");
System.out.println(daoImpl.insertUser("yes", 25));      

  隻要某個類的方法、類或者接口上有事務配置,spring就會對該類的執行個體生成代理。是以daoImpl是DaoImpl執行個體的代理執行個體的引用,而不是DaoImpl的執行個體(目标執行個體)的引用;當我們調用目标執行個體的方法時,實際調用的是代理執行個體對應的方法,若目标方法沒有被@Transactional(或aop注解,當然這裡不涉及aop)修飾,那麼代理方法直接反射調用目标方法,若目标方法被@Transactional修飾,那麼代理方法會先執行增強(例如判斷目前線程是否存在connection,不存在則建立并綁定到目前線程等等),然後通過反射執行目标方法,最後回到代理方法執行增強(例如,事務復原或事務送出、connection歸還到連接配接池等等處理)。這裡的綁定connection到目前線程就用到了ThreadLocal,我們來看看源碼

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        
        if (txObject.getConnectionHolder() == null ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 從連接配接池擷取一個connection
            Connection newCon = this.dataSource.getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 包裝newCon,并指派到txObject,并标記是新的ConnectionHolder
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // 若是新的ConnectionHolder,則将它綁定到目前線程中
        // Bind the session holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.dataSource);
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}      
/**
 * Bind the given resource for the given key to the current thread.
 * @param key the key to bind the value to (usually the resource factory)
 * @param value the value to bind (usually the active resource object)
 * @throws IllegalStateException if there is already a value bound to the thread
 * @see ResourceTransactionManager#getResourceFactory()
 */
public static void bindResource(Object key, Object value) throws IllegalStateException {        //key:通常指資源工廠,也就是connection工廠,value:通常指活動的資源,也就是活動的ConnectionHolder
    
    // 必要時unwrap給定的連接配接池; 否則按原樣傳回給定的連接配接池。
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found   如果ThreadLocal Map不存在則建立,并将其設定到resources中
    // private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
  // 這就到了ThreadLocal流程了
    if (map == null) {
        map = new HashMap<Object, Object>();
        resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }
    if (oldValue != null) {
        throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                Thread.currentThread().getName() + "]");
    }
}      

總結

  1、ThreadLocal能解決的問題,那肯定不是共享變量(多線程并發)問題,隻是看起來有些像并發;像火車票、電影票這樣的真正的共享變量的問題用ThreadLocal是解決不了的,同一時間,同一趟車的同一個座位,你敢用ThreadLocal來解決嗎?

  2、每個Thread維護一個ThreadLocalMap映射表,這個映射表的key是ThreadLocal執行個體本身,value是真正需要存儲的Object

  3、druid連接配接池用的是數組來存放的connectionHolder,不是我認為的list,connectionHolder從線程中解綁後,歸還到數組連接配接池中;connectionHolder是connection的封裝

疑問

  private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

  1、 為什麼是ThreadLocal<Map<Object, Object>>,而不是ThreadLocal<ConnectionHolder>

  2、 ThreadLocal<Map<Object, Object>> 中的Map的key是為什麼是DataSource

  望知道的朋友賜教下,評論留言或者私信都可以,謝謝!

參考

  Java并發程式設計:深入剖析ThreadLocal

  正确了解ThreadLocal

繼續閱讀