天天看點

Hikaricp源碼解讀(3)——ConcurrentBag介紹

3、ConcurrentBag介紹

本文以v2.7.2源碼為主進行分析

HikariCP連接配接池是基于自主實作的ConcurrentBag完成的資料連接配接的多線程共享互動,是HikariCP連接配接管理快速的其中一個關鍵點。

ConcurrentBag是一個專門的并發包裹,在連接配接池(多線程資料互動)的實作上具有比LinkedBlockingQueue和LinkedTransferQueue更優越的性能。

ConcurrentBag通過拆分

CopyOnWriteArrayList、ThreadLocal和SynchronousQueue

進行并發資料互動。

- CopyOnWriteArrayList:負責存放ConcurrentBag中全部用于出借的資源

- ThreadLocal:用于加速線程本地化資源通路

- SynchronousQueue:用于存在資源等待線程時的第一手資源交接

private final CopyOnWriteArrayList<T> sharedList;
private final ThreadLocal<List<Object>> threadList;
private final SynchronousQueue<T> handoffQueue;
           

ConcurrentBag中全部的資源均隻能通過add方法進行添加,隻能通過remove方法進行移出。

public void add(final T bagEntry)
{
   if (closed) {
      LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }

   sharedList.add(bagEntry); //新添加的資源優先放入CopyOnWriteArrayList

   // 當有等待資源的線程時,将資源交到某個等待線程後才傳回(SynchronousQueue)
   while (waiters.get() >  && !handoffQueue.offer(bagEntry)) {
      yield();
   }
}

public boolean remove(final T bagEntry)
{
   // 如果資源正在使用且無法進行狀态切換,則傳回失敗
   if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
      return false;
   }

   final boolean removed = sharedList.remove(bagEntry); // 從CopyOnWriteArrayList中移出
   if (!removed && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
   }

   return removed;
}
           

ConcurrentBag中通過borrow方法進行資料資源借用,通過requite方法進行資源回收,注意其中borrow方法隻提供對象引用,不移除對象,是以使用時通過borrow取出的對象必須通過requite方法進行放回,否則容易導緻記憶體洩露!

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // 優先檢視有沒有可用的本地化的資源
   final List<Object> list = threadList.get();
   for (int i = list.size() - ; i >= ; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }

   final int waiting = waiters.incrementAndGet();
   try {
      // 當無可用本地化資源時,周遊全部資源,檢視是否存在可用資源
      // 是以被一個線程本地化的資源也可能被另一個線程“搶走”
      for (T bagEntry : sharedList) {
         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            if (waiting > ) {
                // 因為可能“搶走”了其他線程的資源,是以提醒包裹進行資源添加
               listener.addBagItem(waiting - );
            }
            return bagEntry;
         }
      }

      listener.addBagItem(waiting);

      timeout = timeUnit.toNanos(timeout);
      do {
         final long start = currentTime();
         // 當現有全部資源全部在使用中,等待一個被釋放的資源或者一個新資源
         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }

         timeout -= elapsedNanos(start);
      } while (timeout > _000);

      return null;
   }
   finally {
      waiters.decrementAndGet();
   }
}

public void requite(final T bagEntry)
{
   // 将狀态轉為未在使用
   bagEntry.setState(STATE_NOT_IN_USE);

   // 判斷是否存在等待線程,若存在,則直接轉手資源
   for (int i = ; waiters.get() > ; i++) {
      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
         return;
      }
      else if ((i & ) == ) {
         parkNanos(MICROSECONDS.toNanos());
      }
      else {
         yield();
      }
   }

   // 否則,進行資源本地化
   final List<Object> threadLocalList = threadList.get();
   threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
           

上述代碼中的

weakThreadLocals

是用來判斷是否使用弱引用,通過下述方法初始化:

private boolean useWeakThreadLocals()
{
   try {
      // 人工指定是否使用弱引用,但是官方不推薦進行自主設定。
      if (System.getProperty("com.dareway.concurrent.useWeakReferences") != null) { 
         return Boolean.getBoolean("com.dareway.concurrent.useWeakReferences");
      }

      // 預設通過判斷初始化的ClassLoader是否是系統的ClassLoader來确定
      return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
   }
   catch (SecurityException se) {
      return true;
   }
}
           

繼續閱讀