前言
Druid是阿裡開源的資料庫連接配接池,是阿裡監控系統Dragoon的副産品,提供了強大的可監控性和基于Filter-Chain的可擴充性。
本篇文章将對Druid資料庫連接配接池的連接配接建立和銷毀進行分析。分析Druid資料庫連接配接池的源碼前,需要明确幾個概念。
- Druid資料庫連接配接池中可用的連接配接存放在一個數組connections中;
- Druid資料庫連接配接池做并發控制,主要靠一把可重入鎖以及和這把鎖關聯的兩個Condition對象;
public DruidAbstractDataSource(boolean lockFair) {
lock = new ReentrantLock(lockFair);
notEmpty = lock.newCondition();
empty = lock.newCondition();
}
- 連接配接池沒有可用連接配接時,應用線程會在notEmpty上等待,連接配接池已滿時,生産連接配接的線程會在empty上等待;
- 對連接配接保活,就是每間隔一定時間,對達到了保活間隔周期的連接配接進行有效性校驗,可以将無效連接配接銷毀,也可以防止連接配接長時間不與資料庫服務端通信。
Druid版本:1.2.11
正文
一. DruidDataSource連接配接建立
DruidDataSource連接配接的建立由CreateConnectionThread線程完成,其run() 方法如下所示。
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (; ; ) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
// emptyWait為true表示生産連接配接線程需要等待,無需生産連接配接
boolean emptyWait = true;
// 發生了建立錯誤,且池中已無連接配接,且丢棄連接配接的統計沒有改變
// 此時生産連接配接線程需要生産連接配接
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// 池中已有連接配接數大于等于正在等待連接配接的應用線程數
// 且目前是非keepAlive場景
// 且目前是非連續失敗
// 此時生産連接配接的線程在empty上等待
// keepAlive && activeCount + poolingCount < minIdle時會在shrink()方法中觸發emptySingal()來添加連接配接
// isFailContinuous()傳回true表示連續失敗,即多次(預設2次)建立實體連接配接失敗
if (poolingCount >= notEmptyWaitThreadCount
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 防止建立超過maxActive數量的連接配接
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
} catch (InterruptedException e) {
// 省略
} finally {
lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl
+ ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts
&& timeBetweenConnectErrorMillis > 0) {
// 多次建立失敗
setFailContinuous(true);
// 如果配置了快速失敗,就喚醒所有在notEmpty上等待的應用線程
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
continue;
}
// 把連接配接添加到連接配接池
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0;
if (closing || closed) {
break;
}
}
}
CreateConnectionThread的run() 方法整體就是在一個死循環中不斷的等待,被喚醒,然後建立線程。當一個實體連接配接被建立出來後,會調用DruidDataSource#put方法将其放到連接配接池connections中,put() 方法源碼如下所示。
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
// 省略
return false;
}
return put(holder, physicalConnectionInfo.createTaskId, false);
}
private boolean put(DruidConnectionHolder holder,
long createTaskId, boolean checkExists) {
// 涉及到連接配接池中連接配接數量改變的操作,都需要加鎖
lock.lock();
try {
if (this.closing || this.closed) {
return false;
}
// 池中已有連接配接數已經大于等于最大連接配接數,則不再把連接配接加到連接配接池并直接傳回false
if (poolingCount >= maxActive) {
if (createScheduler != null) {
clearCreateTask(createTaskId);
}
return false;
}
// 檢查重複添加
if (checkExists) {
for (int i = 0; i < poolingCount; i++) {
if (connections[i] == holder) {
return false;
}
}
}
// 連接配接放入連接配接池
connections[poolingCount] = holder;
// poolingCount++
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// 喚醒在notEmpty上等待連接配接的應用線程
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
put() 方法會先将實體連接配接從PhysicalConnectionInfo中擷取出來并封裝成一個DruidConnectionHolder,DruidConnectionHolder就是Druid連接配接池中的連接配接。新添加的連接配接會存放在連接配接池數組connections的poolingCount位置,然後poolingCount會加1,也就是poolingCount代表着連接配接池中可以擷取的連接配接的數量。
二. DruidDataSource連接配接銷毀
DruidDataSource連接配接的銷毀由DestroyConnectionThread線程完成,其run() 方法如下所示。
public void run() {
// run()方法隻要執行了,就調用initedLatch#countDown
initedLatch.countDown();
for (; ; ) {
// 每間隔timeBetweenEvictionRunsMillis執行一次DestroyTask的run()方法
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
if (Thread.interrupted()) {
break;
}
// 執行DestroyTask的run()方法來銷毀需要銷毀的連接配接
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
DestroyConnectionThread的run() 方法就是在一個死循環中每間隔timeBetweenEvictionRunsMillis的時間就執行一次DestroyTask的run() 方法。DestroyTask#run方法實作如下所示。
public void run() {
// 根據一系列條件判斷并銷毀連接配接
shrink(true, keepAlive);
// RemoveAbandoned機制
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
在DestroyTask#run方法中會調用DruidDataSource#shrink方法來根據設定的條件來判斷出需要銷毀和保活的連接配接。DruidDataSource#shrink方法如下所示。
// checkTime參數表示在将一個連接配接進行銷毀前,是否需要判斷一下空閑時間
public void shrink(boolean checkTime, boolean keepAlive) {
// 加鎖
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
// needFill = keepAlive && poolingCount + activeCount < minIdle
// needFill為true時,會調用empty.signal()喚醒生産連接配接的線程來生産連接配接
boolean needFill = false;
// evictCount記錄需要銷毀的連接配接數
// keepAliveCount記錄需要保活的連接配接數
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
// checkCount = 池中已有連接配接數 - 最小空閑連接配接數
// 正常情況下,最多能夠将前checkCount個連接配接進行銷毀
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// 正常情況下,需要周遊池中所有連接配接
// 從前往後周遊,i為數組索引
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
// 如果發生了緻命錯誤(onFatalError == true)且緻命錯誤發生時間(lastFatalErrorTimeMillis)在連接配接建立時間之後
// 把連接配接加入到保活連接配接數組中
if ((onFatalError || fatalErrorIncrement > 0)
&& (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
// phyTimeoutMillis表示連接配接的實體存活逾時時間,預設值是-1
if (phyTimeoutMillis > 0) {
// phyConnectTimeMillis表示連接配接的實體存活時間
long phyConnectTimeMillis = currentTimeMillis
- connection.connectTimeMillis;
// 連接配接的實體存活時間大于phyTimeoutMillis,則将這個連接配接放入evictConnections數組
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// idleMillis表示連接配接的空閑時間
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// minEvictableIdleTimeMillis表示連接配接允許的最小空閑時間,預設是30分鐘
// keepAliveBetweenTimeMillis表示保活間隔時間,預設是2分鐘
// 如果連接配接的空閑時間小于minEvictableIdleTimeMillis且還小于keepAliveBetweenTimeMillis
// 則connections數組中目前連接配接之後的連接配接都會滿足空閑時間小于minEvictableIdleTimeMillis且還小于keepAliveBetweenTimeMillis
// 此時跳出周遊,不再檢查其餘的連接配接
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
// 連接配接的空閑時間大于等于允許的最小空閑時間
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
// i < checkCount這個條件的了解如下:
// 每次shrink()方法執行時,connections數組中隻有索引0到checkCount-1的連接配接才允許被銷毀
// 這樣才能保證銷毀完連接配接後,connections數組中至少還有minIdle個連接配接
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 如果空閑時間過久,已經大于了允許的最大空閑時間(預設7小時)
// 那麼無論如何都要銷毀這個連接配接
evictConnections[evictCount++] = connection;
continue;
}
}
// 如果開啟了保活機制,且連接配接空閑時間大于等于了保活間隔時間
// 此時将連接配接加入到保活連接配接數組中
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
// checkTime為false,那麼前checkCount個連接配接直接進行銷毀,不再判斷這些連接配接的空閑時間是否超過門檻值
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
// removeCount = 銷毀連接配接數 + 保活連接配接數
// removeCount表示本次從connections數組中拿掉的連接配接數
// 注:一定是從前往後拿,正常情況下最後minIdle個連接配接是安全的
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
// [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
// [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
// 更新池中連接配接數
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
// 如果池中連接配接數加上活躍連接配接數(借出去的連接配接)小于最小空閑連接配接數
// 則将needFill設為true,後續需要喚醒生産連接配接的線程來生産連接配接
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
// 周遊evictConnections數組,銷毀其中的連接配接
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// 周遊keepAliveConnections數組,對其中的連接配接做可用性校驗
// 校驗通過連接配接就放入connections數組,沒通過連接配接就銷毀
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
// 如果needFill為true則喚醒生産連接配接的線程來生産連接配接
if (needFill) {
lock.lock();
try {
// 計算需要生産連接配接的個數
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
在DruidDataSource#shrink方法中,核心邏輯是周遊connections數組中的連接配接,并判斷這些連接配接是需要銷毀還是需要保活。通常情況下,connections數組中的前checkCount(checkCount = poolingCount - minIdle) 個連接配接是危險的,因為這些連接配接隻要滿足了:空閑時間 >= minEvictableIdleTimeMillis(允許的最小空閑時間),那麼就需要被銷毀,而connections數組中的最後minIdle個連接配接是相對安全的,因為這些連接配接隻有在滿足:空閑時間 > maxEvictableIdleTimeMillis(允許的最大空閑時間) 時,才會被銷毀。這麼判斷的原因,主要就是需要讓連接配接池裡能夠保證至少有minIdle個空閑連接配接可以讓應用線程擷取。
當确定好了需要銷毀和需要保活的連接配接後,此時會先将connections數組清理,隻保留安全的連接配接,這個過程示意圖如下。
最後,會周遊evictConnections數組,銷毀數組中的連接配接,周遊keepAliveConnections數組,對其中的每個連接配接做可用性校驗,如果校驗可用,那麼就重新放回connections數組,否則銷毀。
總結
連接配接的建立由一個叫做CreateConnectionThread的線程完成,整體流程就是在一個死循環中不斷的等待,被喚醒,然後建立連接配接。每一個被建立出來的實體連接配接java.sql.Connection會被封裝為一個DruidConnectionHolder,然後存放到connections數組中。
連接配接的銷毀由一個叫做DestroyConnectionThread的線程完成,核心邏輯是周期性的周遊connections數組中的連接配接,并判斷這些連接配接是需要銷毀還是需要保活,需要銷毀的連接配接最後會被實體銷毀,需要保活的連接配接最後會進行一次可用性校驗,如果校驗不通過,則進行實體銷毀。