天天看点

jdk源码分析(4) ThreadLocal源码分析及应用场景

1、ThreadLocal介绍

性质

  • ThreadLoal 变量,线程局部变量,同一个 ThreadLocal 所包含的对象,在不同的 Thread 中有不同的副本。具体实现就是在ThreadLocal中有一个关于线程的map表,这样不同的线程之间获取的数据是不相同的。
  • 当一个线程结束时,它所使用的所有 ThreadLocal 相对的实例副本都可被回收。具体实现是依靠数据的弱引用实现的。

应用场景

实例需要在多个方法中共享,但不希望被多线程共享。例如我们在http服务器中,服务器接收一个用户发来的登入请求,如果服务器针对每一个请求都新建一个新的线程,那么threadLocal变量就完美契合了这种情况。这样服务器就可以在这个线程中保存这个登入用户的相应的信息。(注意在tomcat中并非是一个请求服务器就会启动一个线程,而是服务器每接收一个请求,会先去线程池那边查看是否还有空闲的线程,如果还有空闲的线程,就会调用一个线程来处理这个请求,如果线程池中没有空闲的线程,那么这个请求只能等待。)

jdk源码分析(4) ThreadLocal源码分析及应用场景

例如在一个订票系统中是使用mybatis来操作数据库的,一个用户购买一个订单后,需要去订单模块中创建这个订单,也需要去用户模块中更新这个用户的历史订单和用户积分。在mysql中是支持事务的,但是在mybatis中操作mysql时只有在同一个sqlsession中才能保证事务性,也就是说不同的sqlsession更新数据是不能保证事务的。所以如何在一个线程中获取到同一个sqlsession呢?mybatis有SqlsessionManager类来管理Sqlsession,其中有一个localSqlSession来保存当前线程的Sqlsession,它的属性就是ThreadLocal,也就是在这个线程中,都可以通过

localSqlSession.get()来获取到当前线程一开始连接时放入的Sqlsession。

public class SqlSessionManager implements SqlSessionFactory, SqlSession {

  private final SqlSessionFactory sqlSessionFactory;
  private final SqlSession sqlSessionProxy;

  private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>();

  private SqlSessionManager(SqlSessionFactory sqlSessionFactory) {
    this.sqlSessionFactory = sqlSessionFactory;
    this.sqlSessionProxy = (SqlSession) Proxy.newProxyInstance(
        SqlSessionFactory.class.getClassLoader(),
        new Class[]{SqlSession.class},
        new SqlSessionInterceptor());
  }
           

2、ThreadLocal如何保证线程之间的隔离性

如果一个变量只是对于每一个线程都是不同的,那么我们首先想到的就是在Thread这个类中是否有这个的一个ThreadLocal变量。因为所有的线程都是需要实现Thread这个基类的。我们先来看下ThreadLocal的set函数

public void set(T value) {
      Thread t = Thread.currentThread();
      ThreadLocalMap map = getMap(t);
      if (map != null)
          map.set(this, value);  //然后在这个threadLocalMap对象里放入当前线程对象和value值
      else
          createMap(t, value);   //在当前线程上
  }
  
/*根据Thread变量来获取对应的ThreadLocalMap,说明一个Thread变量中保存有ThreadLocalMap类的变量*/
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
 }
           
jdk源码分析(4) ThreadLocal源码分析及应用场景

我们在利用ThreadLocal变量来set一个值时,实际上这个值并不是存放在ThreadLocal对象里,ThreadLocal只是一个工具方法,在set函数中首先会获取当前线程的对象,然后获取到这个线程对象的ThreadLocal.ThreadLocalMap对象,这个ThreadLocal.ThreadLocalMap类中定义了Entry类,在并在ThreadLocalMap的构造函数中初始化了new Entry[INITIAL_CAPACITY](其中INITIAL_CAPACITY的值为16)。而Entry类只用到两个变量,分别是ThreadLocal和Object。

static class Entry extends WeakReference<ThreadLocal<?>> {
     /** The value associated with this ThreadLocal. */
      Object value;
      Entry(ThreadLocal<?> k, Object v) {
          super(k);
          value = v;
      }
  }
           

其中这个ThreadLocal的对象的键值是WeakReference引用类型的。弱引用类型的值在垃圾回收线程发现这个引用是弱引用时,就会进行标记,在下一个系统gc时会释放掉这个弱引用。具体为什么要这么设计后面会详细讲解。

所以ThreadLocal中实现线程隔离是利用在ThreadLocal中创建了threadLocalMap对象来实现的,因为在每一个线程都是独立的,所以也不要在添加的使用使用同步锁来保证线程安全性。

3、ThreadLocal类的一些思考

为什么需要ThreadLocalMap类

所以我们可以看到大致的结构是thread–>threadLocalMap–>entry–>(threadLocal,value)。那么为什么需要有一个ThreadLocalMap类型,主要是因为一个线程对象中创建的threadLocal对象可能不止一个。例如我在一个类中创建了两个threadLocal对象,然后分别调用threadLocal.set来进行变量的保存,那么在Thread对象中就可以将这两个threadLocal对象分别塞入到threadLocalMap管理的entry数组中。其中插入entry数组的位置由firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1)来决定,这边firstKey表示一个threadLocal对象,这边threadLocalHashCode是为了获取一个threadLocal对象对应的hash值,实现是nextHashCode.getAndAdd(HASH_INCREMENT),主要靠主要是靠一个AtomicInteger类对象调用getAndAdd来得到一个哈希值,HASH_INCREMENT的值为0x61c88647,选这个值主要是为了将隐式顺序线程本地id转换为接近最优扩散的乘法哈希值,用于两倍大小的表。

为什么在ThreadLocalMap中threadLocal键值要为弱引用

假如每个key都强引用指向ThreadLocal的对象,那么这个ThreadLocal对象就会因为和Entry对象存在强引用关联而无法被GC回收,在回顾一下上面的thread–>threadLocalMap–>entry–>(threadLocal,value)链路过程。可以知道如果threadLocal为强引用或者软引用,在强引用下除非线程结束,在软引用下除非线程结束和内存不够用了,不为上诉两种情况下ThreadLocal都不会被回收。这样很有可能会造成内存泄漏。所以这边键值要为弱引用。

修改为弱引用后依然存在的内存泄漏问题

当把ThreadLocal对象的引用置为null后,没有任何强引用指向内存中的ThreadLocal实例,threadLocals的key是它的弱引用,故它将会被GC回收。(注意当有任何强引用的值指向一个ThreadLocal实例时,他都不会被释放掉,我们创建一个变量时,如果没有特殊指定这个变量时什么引用类型的,默认的都是强引用类型。所以不需要担心我们创建的变量还没有使用好,这个ThreadLocal就被释放掉了。)

当把ThreadLocal对象的引用置为null后,没有任何强引用指向内存中的ThreadLocal实例,threadLocals的key是它的弱引用,故它将会被GC回收。但线程中threadLocals里的value却没有被回收,因为存在着一条从当前线程对象连接过来的强引用,且因为无法再通过ThreadLocal对象的get方法获取到这个value,它永远不会被访问到了,所以还存在内存泄漏问题。同样的,只有在当前线程结束后,线程对象的引用不再存在于栈中,强引用断开,内存中的Current Thread、ThreadLocalMap、value才会全部被GC回收。

解决方案

当线程的某个ThreadLocal对象使用完了,马上调用remove方法,删除Entry对象。其实只要这个线程对象及时被GC回收,这个内存泄露问题影响不大,只发生在ThreadLocal对象的引用设为null到线程结束的这段时间内。但在使用线程池的时候,线程结束是不会被销毁的,会再次使用,就可能出现真正的内存泄露。

4、ThreadLocal源码分析

public void set(T value) {
     Thread t = Thread.currentThread();
     ThreadLocalMap map = getMap(t);
     if (map != null)
         map.set(this, value);
     else
         createMap(t, value);
 }
           

这段代码在上面已经出现过了,接下来主要看下在ThreadLocalMap类中是如何实现set方法的

private void set(ThreadLocal<?> key, Object value) {

       // We don't use a fast path as with get() because it is at
       // least as common to use set() to create new entries as
       // it is to replace existing ones, in which case, a fast
       // path would fail more often than not.

       Entry[] tab = table;
       int len = tab.length;
       int i = key.threadLocalHashCode & (len-1);

		/*从i坐标开始,因为threadLocal中解决冲突的方式是线性探测的方式,所以这边从i坐标开始探测
		  其中如果发现一个节点的key值已经释放掉了,但是value还存在,则会调用replaceStaleEntry函数*/
       for (Entry e = tab[i];
            e != null;
            e = tab[i = nextIndex(i, len)]) {
           ThreadLocal<?> k = e.get();

           if (k == key) {
               e.value = value;
               return;
           }
		   /*在还没有找到k == key前先发现了一个k == null,说明这个节点的threadLocal的值已经被释放掉了
		   所以在replaceStaleEntry函数应该完成的事有
		   1、继续往下查找,如果发现k==key,就需要将这个k==key的节点放到现在这个i节点位置上来,因为是线性探测,所以存放的位置应该是最近的
		   现在的情况就是更好的位置的节点已经被释放了,如果这个节点放在后面,则需要将它挪到前面来,否则就不是很满足线性探测的规律了
		   2、如果没有找到,则也是在当前i的位置新增一个节点
		   3、如果在探测过程中发现有一些节点上的threadLocal已经被释放掉了,则应该进行清理*/
           if (k == null) {
               replaceStaleEntry(key, value, i);
               return;
           }
       }

       tab[i] = new Entry(key, value);
       int sz = ++size;
       if (!cleanSomeSlots(i, sz) && sz >= threshold)
           rehash();
   }
           

根据上面对replaceStaleEntry函数的功能分析,来一起看下这个具体函数的实现

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

      // Back up to check for prior stale entry in current run.
      // We clean out whole runs at a time to avoid continual
      // incremental rehashing due to garbage collector freeing
      // up refs in bunches (i.e., whenever the collector runs).
      int slotToExpunge = staleSlot;
      /*如果前面的接点的threadLocal也被释放掉了,那么这些节点也应该被清理掉,往前更新slotToExpunge值*/
      for (int i = prevIndex(staleSlot, len);
           (e = tab[i]) != null;
           i = prevIndex(i, len))
          if (e.get() == null)
              slotToExpunge = i;

      /*向后遍历,其实我们来查找接下里节点的key是否有等于当前需要set的key值,应该从后面开始遍历判断,毕竟是线性探测嘛
      具体前面一开始时往前探测一下前面是否有需要进行释放的节点,因为后面都是向后遍历在查找的,这边往前也遍历一下更加全面*/
      for (int i = nextIndex(staleSlot, len);
           (e = tab[i]) != null;
           i = nextIndex(i, len)) {
          ThreadLocal<?> k = e.get();

          /*这时候往后遍历发现了key相等的节点,那么需要进行位置调整,毕竟还是需要满足线性探测的嘛*/
          if (k == key) {
              e.value = value;

              tab[i] = tab[staleSlot];
              tab[staleSlot] = e;

              /*这个条件成立说明之前往前探测时并没有移动slotToExpunge,可以说明前面的那个节点要么直接为null,要么threadLocal的值并没有被释放掉*/
              if (slotToExpunge == staleSlot)
                  slotToExpunge = i;   //因为当前的节点和staleSlot位置上的进行了互换,所以需要清理这个位置的
              cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);   //清理节点,具体见下面函数具体分析
              return;
          }

          /*如果在下面遍历是又发现了一个threadLocal已经被释放掉了的节点并且在一开始往前遍历是没有发现已经被释放的节点,更新slotToExpunge值*/
          if (k == null && slotToExpunge == staleSlot)
              slotToExpunge = i;
      }

      /*没有发现有k == key的节点,就需要新增加一个节点*/
      tab[staleSlot].value = null;
      tab[staleSlot] = new Entry(key, value);

      /*如果在往前遍历或者往后遍历是发现有threadLocal对象已经被释放掉的,需要进行清理*/
      if (slotToExpunge != staleSlot)
          cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
  }
           

可以总结一下,这个在set一个节点时通过线性依次探测,如果在首先发现一个节点的threadLocal被释放了,记录当前的位置为staleSlot,接下来需要进行一系列操作

  • 首先先从这个位置往前遍历看是否有节点的threadLocal对象已经被释放掉了,记录到slotToExpunge值上
  • 然后往后探测如果后面的节点的threadLocal对象是否被释放掉了,并且在上面往前探测的时候没有发现节点,毕竟如果前面也有节点已经被释放掉了,首先应该处理的是前面的节点。如果前面的节点没有被释放掉,那么这边也需要进行记录,记录到slotToExpunge。之后需要清理这些为节点的
  • 然后如果在往后遍历时发现有k == key的,那么就应该将这个节点位置放到staleSlot位置上,然后清理当前节点。
  • 如果往后遍历时没有找到k == key的,这时候需要新加入一个节点,并且进行清理,这里清理的位置就是之前更新的slotToExpunge值。

那具体清理的方式是怎么样的呢?清理的实现方法是cleanSomeSlots函数

private boolean cleanSomeSlots(int i, int n) {
     boolean removed = false;
      Entry[] tab = table;
      int len = tab.length;
      do {
          i = nextIndex(i, len);
          Entry e = tab[i];
          if (e != null && e.get() == null) {
              n = len;
              removed = true;
              i = expungeStaleEntry(i);
          }
      } while ( (n >>>= 1) != 0);
      return removed;
  }
           

可以看到它是从当前i位置开始探测,如果没有发现新的已经被释放threadLocal的节点(我们称为脏节点),那么最多就是查找log2(n)个,通过n >>>= 1) != 0可以发现,如果期间又发现了脏节点,那么就更新n,这样又可以往后遍历log2(n)个。具体清理操作还是在expungeStaleEntry中。

private int expungeStaleEntry(int staleSlot) {
      Entry[] tab = table;
      int len = tab.length;

      // expunge entry at staleSlot
      tab[staleSlot].value = null;
      tab[staleSlot] = null;
      size--;

      // Rehash until we encounter null
      Entry e;
      int i;
      for (i = nextIndex(staleSlot, len);
           (e = tab[i]) != null;
           i = nextIndex(i, len)) {
          ThreadLocal<?> k = e.get();
          /*清理操作,将value也置为null,并且tab[i]也置为null,这样我们就可以往里面添加新的节点*/
          if (k == null) {
              e.value = null;
              tab[i] = null;
              size--;
          } else {
          	  /*发现新的hashcode对应的坐标位置发生了改变,也就是处理rehash的情况*/
              int h = k.threadLocalHashCode & (len - 1);
              if (h != i) {
                  tab[i] = null;

                  // Unlike Knuth 6.4 Algorithm R, we must scan until
                  // null because multiple entries could have been stale.
                  while (tab[h] != null)
                      h = nextIndex(h, len);
                  tab[h] = e;
              }
          }
      }
      return i;
  }
           

可以看到清理过程就是将value值和tab[i]的值都置为null,将value置为null后垃圾回收器就会去回收相应的对象,tab[i]的值置为null后就我们就可以新添加节点。并且清理的过程也不是清理完一个就好了,而是遍历到tab[i]的值为null的节点结束。

再总结一下,为什么threadLocal的set过程会如此复杂,本身它解决冲突的方式是线性探测,但是为了防止threadLocal对象内存泄漏,所以将这个threadLocal对象设置为弱引用,但是这样会有一种情况,就是threadLocal被释放掉了,但其对应的value因为是强引用,还没有被释放掉,所以在set过程中会对一些节点进行探测,如果发现是脏节点(就是threadLocal被释放掉的节点),那么就会进行清理操作,这样就造成了threadLocal的set过程具有一定的复杂性。

继续阅读