Druid 多資料場景下如何進行選擇?
druid的多個資料源場景下,提供了三種資料源選擇器,分别是按照名字、随機、粘性随機資料源選擇器。
資料源選擇器DataSourceSelector
// 資料源選擇器DataSourceSelector
public interface DataSourceSelector {
/**
* Return a DataSource according to the implemention.
* 傳回資料源
*/
DataSource get();
/**
* Set the target DataSource name to return.
* Wether to use this or not, it's decided by the implemention.
* 設定targer為資料源的名稱,具體是否使用該屬性由實作層去決定
*/
void setTarget(String name);
/**
* Return the name of this DataSourceSelector.
* e.g. byName
* 傳回資料源選擇器的名稱
*/
String getName();
/**
* Init the DataSourceSelector before use it.
* 初始化資料源選擇器
*/
void init();
/**
* Destroy the DataSourceSelector, maybe interrupt the Thread.
* 銷毀資料源選擇器
*/
void destroy();
}
NamedDataSourceSelector
// 根據名稱選擇資料源
// 類說明:使用方通過ThreadLocal傳遞需要使用的資料源的name來擷取對應的DataSource。如果使用方有傳遞這個名稱,則按照這個名來從資料源map(這個是構造時入參HighAvailableDataSource自帶的)中取出來。如果沒有這個名稱,則會取預設的“default”,然後再取資料源。
// 适用場景:主從資料源。通過提前設定資料源名稱(高可用資料源設定方法),做到資料源的切換。或者固定的讀寫分離資料源,或者指定不同的業務适用不同的資料源
public class NamedDataSourceSelector implements DataSourceSelector {
// 名稱選擇器資料源的構造方法。傳入 HighAvailableDataSource
public NamedDataSourceSelector(HighAvailableDataSource highAvailableDataSource) {
this.highAvailableDataSource = highAvailableDataSource;
}
@Override
public void init() {
}
@Override
public void destroy() {
}
......
//擷取資料源
@Override
public DataSource get() {
// highAvailableDataSource包含多個資料源對象的資料源類
// highAvailableDataSource屬性private Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<String, DataSource>();封裝了多個資料源
if (highAvailableDataSource == null) {
return null;
}
// 擷取highAvailableDataSource資料源集合
Map<String, DataSource> dataSourceMap = highAvailableDataSource.getAvailableDataSourceMap();
if (dataSourceMap == null || dataSourceMap.isEmpty()) {
return null;
}
if (dataSourceMap.size() == 1) {
// 隻有一個資料源直接傳回
for (DataSource v : dataSourceMap.values()) {
return v;
}
}
//擷取資料源的name
String name = getTarget();
if (name == null) {
// 名稱為空,取預設資料源名稱,根據名稱擷取資料源
if (dataSourceMap.get(getDefaultName()) != null) {
return dataSourceMap.get(getDefaultName());
}
} else {
// 根據名稱擷取資料源
return dataSourceMap.get(name);
}
return null;
}
@Override
public void setTarget(String name) {
targetDataSourceName.set(name);
}
......
}
HighAvailableDataSource
//包含多個資料源對象的資料源類
//類說明:三個Selector的構造入參。它實作了javax.sql.DataSource。它的執行個體化工作在getConnection()方法,調用的時候,會初始化一個資料源集合(初始化邏輯通過DataSourceCreator),然後初始化資料源選取器,可以設定選取器類型,如果沒有設定,預設使用随機類型。初始化完畢後,使用資料源選取器擷取資料源,然後傳回該資料源的一個連結(這個是正常的jdbc擷取連結的思路)。
public class HighAvailableDataSource extends WrapperAdapter implements DataSource {
......
// init操作
public void init() {
if (inited) {
return;
}
synchronized (this) {
// 同步代碼塊
if (inited) {
// double check
return;
}
// 資料源map為空的時候
if (dataSourceMap == null || dataSourceMap.isEmpty()) {
poolUpdater.setIntervalSeconds(poolPurgeIntervalSeconds);
poolUpdater.setAllowEmptyPool(allowEmptyPoolWhenUpdate);
// 池更新器PoolUpdater初始化 具體代碼解析見下面
poolUpdater.init();
// 節點集合 具體裡面的實作采用的是觀察者模式 增加observer
createNodeMap();
}
if (selector == null) {
// 資料源選擇器為空的場景下 預設是随機的資料源選擇器
setSelector(DataSourceSelectorEnum.RANDOM.getName());
}
if (dataSourceMap == null || dataSourceMap.isEmpty()) {
LOG.warn("There is NO DataSource available!!! Please check your configuration.");
}
// 初始化完成
inited = true;
}
}
public void close() {
destroy();
}
// 銷毀資料源選擇器
public void destroy() {
if (nodeListener != null) {
// 節點監聽都登出 銷毀監聽的方式支援 FileNodeListener檔案節點、ZookeeperNodeListener節點
nodeListener.destroy();
}
if (poolUpdater != null) {
// 池更新器銷毀
poolUpdater.destroy();
}
if (selector != null) {
// 選擇器銷毀
selector.destroy();
}
if (dataSourceMap == null || dataSourceMap.isEmpty()) {
return;
}
for (DataSource dataSource : dataSourceMap.values()) {
if (dataSource instanceof DruidDataSource) {
// 關閉所有的資料源
((DruidDataSource) dataSource).close();
}
}
}
......
// 擷取連接配接
@Override
public Connection getConnection() throws SQLException {
// 初始化
init();
// 從選擇器中擷取資料源
DataSource dataSource = selector.get();
if (dataSource == null) {
LOG.warn("Can NOT obtain DataSource, return null.");
return null;
}
// 擷取資料源連接配接
return dataSource.getConnection();
}
......
private void createNodeMap() {
if (nodeListener == null) {
// 相容老的版本
// 建立一個FileNodeListener監聽資料源的檔案.
FileNodeListener listener = new FileNodeListener();
listener.setFile(dataSourceFile);
listener.setPrefix(propertyPrefix);
nodeListener = listener;
}
nodeListener.setObserver(poolUpdater);
nodeListener.init();
nodeListener.update(); // Do update in the current Thread at the startup
}
......
}
PoolUpdater
// 資料源連接配接池更新器的初始化操作
public void init() {
if (inited) {
return;
}
synchronized (this) {
// double check
if (inited) {
return;
}
if (intervalSeconds < 10) {
//間隔秒時間小于10s 異常提示
LOG.warn("CAUTION: Purge interval has been set to " + intervalSeconds
+ ". This value should NOT be too small.");
}
if (intervalSeconds <= 0) {
// 設定預設值 60s
intervalSeconds = DEFAULT_INTERVAL;
}
// 建立一個線程池
executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
LOG.debug("Purging the DataSource Pool every " + intervalSeconds + "s.");
try {
//異步線程執行删除資料源
removeDataSources();
} catch (Exception e) {
LOG.error("Exception occurred while removing DataSources.", e);
}
}
}, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
}
}
// 删除不使用的資料源
public void removeDataSources() {
if (nodesToDel == null || nodesToDel.isEmpty()) {
return;
}
try {
//加鎖
lock.lock();
//擷取資料源組
Map<String, DataSource> map = highAvailableDataSource.getDataSourceMap();
//需要删除的節點
Set<String> copySet = new HashSet<String>(nodesToDel);
for (String nodeName : copySet) {
LOG.info("Start removing Node " + nodeName + ".");
if (!map.containsKey(nodeName)) {
LOG.info("Node " + nodeName + " is NOT existed in the map.");
// map中不包含需要删除的節點。則nodesToDel中删除節點,并從highAvailableDataSource.BlackList删除該節點
cancelBlacklistNode(nodeName);
continue;
}
//擷取資料源
DataSource ds = map.get(nodeName);
if (ds instanceof DruidDataSource) {
//如果是 druid的資料源
DruidDataSource dds = (DruidDataSource) ds;
// 擷取目前資料源的激活數量
int activeCount = dds.getActiveCount(); // CAUTION, activeCount MAYBE changed!
if (activeCount > 0) {
//如果有激活數量不删除
LOG.warn("Node " + nodeName + " is still running [activeCount=" + activeCount
+ "], try next time.");
continue;
} else {
LOG.info("Close Node " + nodeName + " and remove it.");
try {
//目前資料源中沒有使用的數量。關閉資料源
dds.close();
} catch (Exception e) {
LOG.error("Exception occurred while closing Node " + nodeName
+ ", just remove it.", e);
}
}
}
//不是DruidDataSource的話 從map節點中删除,從highAvailableDataSource.BlackList删除該節點
map.remove(nodeName); // Remove the node directly if it is NOT a DruidDataSource.
cancelBlacklistNode(nodeName);
}
} catch (Exception e) {
LOG.error("Exception occurred while removing DataSources.", e);
} finally {
//解鎖
lock.unlock();
}
}
DataSourceCreator
// 動态建立DruidDataSource 的工具類。
// 上面有提到過,HighAvailableDataSource的底層初始化資料源集合就是使用該類。通過構造進來配置檔案,以及配置參數的字首,做解析,緊接着建立資料源集合。具體建立過程就是拿解析到的name、url、username、psw,直接建構DruidDataSource,除了這幾個基本參數外,其他參數使用高可用資料源執行個體自己的預設值(外面也可以設定)
public class DataSourceCreator {
//建立資料源
public static DruidDataSource create(String name, String url, String username, String password,
HighAvailableDataSource haDataSource) throws SQLException {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setName(name + "-" + System.identityHashCode(dataSource));
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
//設定屬性
......
//初始化
dataSource.init();
return dataSource;
}
}
RandomDataSourceSelector
// 随機的選擇資料源 RandomDataSourceSelector
// 類說明:使用随機數随機選取資料源。從資料源map(同上解釋)中,根據總大小使用随機函數(Random.nextInt())選取一個資料源傳回。
// 适用場景:多個從資料源,之前無差别
public class RandomDataSourceSelector implements DataSourceSelector {
.......
// init
@Override
public void init() {
if (highAvailableDataSource == null) {
LOG.warn("highAvailableDataSource is NULL!");
return;
}
if (!highAvailableDataSource.isTestOnBorrow() && !highAvailableDataSource.isTestOnReturn()) {
// 加載配置。初始化線程s
loadProperties();
initThreads();
} else {
LOG.info("testOnBorrow or testOnReturn has been set to true, ignore validateThread");
}
}
// 銷毀
@Override
public void destroy() {
if (runningValidateThread != null) {
// 中斷正在運作的線程
runningValidateThread.interrupt();
validateThread.setSelector(null);
}
if (runningRecoverThread != null) {
// 中斷正在運作的線程
runningRecoverThread.interrupt();
recoverThread.setSelector(null);
}
}
// 擷取資料源
@Override
public DataSource get() {
Map<String, DataSource> dataSourceMap = getDataSourceMap();
if (dataSourceMap == null || dataSourceMap.isEmpty()) {
return null;
}
// 删除黑名資料源
Collection<DataSource> targetDataSourceSet = removeBlackList(dataSourceMap);
// 移除忙綠的資料源
removeBusyDataSource(targetDataSourceSet);
// 随機擷取資料源
DataSource dataSource = getRandomDataSource(targetDataSourceSet);
return dataSource;
}
......
// 擷取所有的資料源
public Map<String, DataSource> getFullDataSourceMap() {
if (highAvailableDataSource != null) {
return highAvailableDataSource.getDataSourceMap();
}
return new HashMap<String, DataSource>();
}
// 擷取所有可用資料源
public Map<String, DataSource> getDataSourceMap() {
if (highAvailableDataSource != null) {
return highAvailableDataSource.getAvailableDataSourceMap();
}
return new HashMap<String, DataSource>();
}
// 擷取黑名單
public List<DataSource> getBlacklist() {
return blacklist;
}
//是否包含在黑名單中
public boolean containInBlacklist(DataSource dataSource) {
return dataSource != null && blacklist.contains(dataSource);
}
// 加入黑名單
public void addBlacklist(DataSource dataSource) {
if (dataSource != null && !blacklist.contains(dataSource)) {
// 資料源不為空,且黑名單中沒有該資料源
blacklist.add(dataSource);
if (dataSource instanceof DruidDataSource) {
// 測試屬性
((DruidDataSource) dataSource).setTestOnReturn(true);
}
}
}
// 移除黑名單
public void removeBlacklist(DataSource dataSource) {
if (containInBlacklist(dataSource)) {
//在黑名單中移除
blacklist.remove(dataSource);
if (dataSource instanceof DruidDataSource) {
((DruidDataSource) dataSource).setTestOnReturn(highAvailableDataSource.isTestOnReturn());
}
}
}
// 加載配置
private void loadProperties() {
//檢查間隔
checkingIntervalSeconds = loadInteger(PROP_CHECKING_INTERVAL, checkingIntervalSeconds);
//恢複間隔
recoveryIntervalSeconds = loadInteger(PROP_RECOVERY_INTERVAL, recoveryIntervalSeconds);
// 校驗休眠時間
validationSleepSeconds = loadInteger(PROP_VALIDATION_SLEEP, validationSleepSeconds);
// 黑名單門檻值
blacklistThreshold = loadInteger(PROP_BLACKLIST_THRESHOLD, blacklistThreshold);
}
......
// 初始化線程
private void initThreads() {
if (validateThread == null) {
// 校驗線程不為空
validateThread = new RandomDataSourceValidateThread(this);
validateThread.setCheckingIntervalSeconds(checkingIntervalSeconds);
validateThread.setValidationSleepSeconds(validationSleepSeconds);
validateThread.setBlacklistThreshold(blacklistThreshold);
} else {
validateThread.setSelector(this);
}
if (runningValidateThread != null) {
// 中斷正在進行的校驗線程
runningValidateThread.interrupt();
}
// 随機校驗線程
runningValidateThread = new Thread(validateThread, "RandomDataSourceSelector-validate-thread");
runningValidateThread.start();
//恢複線程
if (recoverThread == null) {
recoverThread = new RandomDataSourceRecoverThread(this);
recoverThread.setRecoverIntervalSeconds(recoveryIntervalSeconds);
recoverThread.setValidationSleepSeconds(validationSleepSeconds);
} else {
recoverThread.setSelector(this);
}
if (runningRecoverThread != null) {
//中斷正在運作的恢複線程
runningRecoverThread.interrupt();
}
//恢複線程
runningRecoverThread = new Thread(recoverThread, "RandomDataSourceSelector-recover-thread");
runningRecoverThread.start();
}
// 删除黑名單
private Collection<DataSource> removeBlackList(Map<String, DataSource> dataSourceMap) {
Collection<DataSource> dataSourceSet;
if (blacklist == null || blacklist.isEmpty() || blacklist.size() >= dataSourceMap.size()) {
dataSourceSet = dataSourceMap.values();
} else {
dataSourceSet = new HashSet<DataSource>(dataSourceMap.values());
for (DataSource b : blacklist) {
dataSourceSet.remove(b);
}
LOG.info(blacklist.size() + " Blacklist DataSource removed, return "
+ dataSourceSet.size() + " DataSource(s).");
}
return dataSourceSet;
}
// 擷取高可用的資料源。(代碼命名誤人子弟)
private void removeBusyDataSource(Collection<DataSource> dataSourceSet) {
// 高可用線程
Collection<DataSource> busyDataSourceSet = new HashSet<DataSource>();
for (DataSource ds : dataSourceSet) {
if (ds instanceof DruidDataSource && ((DruidDataSource) ds).getPoolingCount() <= 0) {
// 資料源的池計數 < =0的時候說明該資料源已經異常了,源碼中好多判斷PoolingCount為0的時候會發送emptySignal給CreateThread信号量去建立線程
busyDataSourceSet.add(ds);
}
}
if (!busyDataSourceSet.isEmpty() && busyDataSourceSet.size() < dataSourceSet.size()) {
LOG.info("Busy DataSouces: " + busyDataSourceSet.size() + "/" + dataSourceSet.size());
for (DataSource ds : busyDataSourceSet) {
//資料源集合中剔除忙綠的資料源
dataSourceSet.remove(ds);
}
}
}
// 随機擷取資料源
private DataSource getRandomDataSource(Collection<DataSource> dataSourceSet) {
DataSource[] dataSources = dataSourceSet.toArray(new DataSource[] {});
if (dataSources != null && dataSources.length > 0) {
// 随機數擷取
return dataSources[random.nextInt(dataSourceSet.size())];
}
return null;
}
......
}
StickyRandomDataSourceSelector
// 粘性随機選擇資料源 StickyRandomDataSourceSelector。基于 RandomDataSourceSelector 的擴充選擇器,它可以在一段時間内将資料源粘貼到線程。
// 類說明:粘性随機選取資料源。這是一個比較特殊的選取器。它繼承随機選取資料源,也就是說如果是第一次進來,就是走的随機選取。在選取後,它會把這個資料源跟綁定到目前線程(ThreadLocal),會持續一段時間(這個時間可以配置)
// 适用場景:避免同一個線程内多個資料庫操作在不停的切換資料源
public class StickyRandomDataSourceSelector extends RandomDataSourceSelector {
......
private ThreadLocal<StickyDataSourceHolder> holders = new ThreadLocal<StickyDataSourceHolder>();
//過期時間
private int expireSeconds = 5;
......
@Override
public DataSource get() {
StickyDataSourceHolder holder = holders.get();
// 有有效的資料源持有者
if (holder != null && isAvailable(holder)) {
// 擷取資料源
LOG.debug("Return the sticky DataSource " + holder.getDataSource().toString() + " directly.");
return holder.getDataSource();
}
LOG.debug("Return a random DataSource.");
// 随機擷取一個
DataSource dataSource = super.get();
holder = new StickyDataSourceHolder(dataSource);
holders.remove();
// 目前的holder放在目前線程内部變量中
holders.set(holder);
return dataSource;
}
private boolean isAvailable(StickyDataSourceHolder holder) {
// 通過校驗并且是有效的持有者
boolean flag = isValid(holder) && !isExpired(holder);
if (flag && holder.getDataSource() instanceof DruidDataSource) {
// 目前資料源持有者的PoolingCount數量是否>0
flag = ((DruidDataSource) holder.getDataSource()).getPoolingCount() > 0;
}
return flag;
}
private boolean isValid(StickyDataSourceHolder holder) {
// 目前持有者是有效的 并且目前資料源不在黑名單中
boolean flag = holder.isValid() && !getBlacklist().contains(holder.getDataSource());
if (!(holder.getDataSource() instanceof DruidDataSource) || !flag) {
return flag;
}
// 擷取資料源
DruidDataSource dataSource = (DruidDataSource) holder.getDataSource();
// 目前資料源的激活數量小于最大數量
return flag && dataSource.getActiveCount() < dataSource.getMaxActive();
}
// 是否過期
private boolean isExpired(StickyDataSourceHolder holder) {
return System.currentTimeMillis() - holder.getRetrievingTime() > expireSeconds * 1000;
}
......
}
總結
今天針對druid的資料源在多個資料源場景下如何進行選擇的源代碼學習,目前druid資料源選擇器提供三種方案,按照名稱、随機、粘性随機的方案。其中遺留的疑惑點關于随機選擇器中的删除忙碌的資料源的代碼中的判斷“資料源的池計數 < =0的時候,加入忙碌的資料源中”這塊業務不太能了解,放個TODO,後面補充。
已解決:随機選擇器中的removeBusyDataSource是擷取高可用資料源,資料源的池計數 < =0的時候說明該資料源已經異常了,源碼中好多判斷PoolingCount為0的時候會發送emptySignal給CreateThread信号量去建立線程