天天看点

Day05 Druid 小结

小结

Druid 连接池是阿里巴巴开源的数据库连接池项目。Druid 连接池为监控而生,内置强大的监控功能。

1. 初始化数据库连接池

1.1 使用 Druid

Spring Boot 项目中配置数据库连接池为 Druid 并启动项目,Spring 上下文初始化完成后会初始化 DruidDataSource, 如下图:

Day05 Druid 小结

核心类:

  • DruidDataSourceAutoConfigure

    负责初始化

    DruidDataSource

  • com.alibaba.druid.pool.DruidDataSource

    负责创建高效可管理的数据库连接池

1.2 druid-spring-boot-starter 源码

DruidDataSourceAutoConfigure

druid-spring-boot-starter

下的一个配置类。此 starter 中

spring.factories

文件中定义了自动加载 DruidDataSourceAutoConfigure 配置类:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
           

DruidDataSourceAutoConfigure

配置类,源码如下,这里初始化了 Bean :

DruidDataSourceWrapper(继承了 DruidDataSource类)

,初始化了

DruidDataSourc#init()

方法。

@Configuration
// 判断当前classpath下是否存在 DruidDataSource.class 类,若是则将当前的配置装载入spring 容器
@ConditionalOnClass(DruidDataSource.class)
// DruidDataSourceAutoConfigure 类 在 DataSourceAutoConfiguration 类之前加载
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
// 把使用 @ConfigurationProperties 的类 DruidStatProperties 和 DataSourceProperties 进行了一次注入。
@EnableConfigurationProperties({DruidStatProperties.class, DataSourceProperties.class})
// 把类注入到 IoC 容器
@Import({DruidSpringAopConfiguration.class,
    DruidStatViewServletConfiguration.class,
    DruidWebStatFilterConfiguration.class,
    DruidFilterConfiguration.class})
public class DruidDataSourceAutoConfigure {

    private static final Logger LOGGER = LoggerFactory.getLogger(DruidDataSourceAutoConfigure.class);

    // 初始化 Bean: DruidDataSourceWrapper
    @Bean(initMethod = "init")
    @ConditionalOnMissingBean
    public DataSource dataSource() {
        LOGGER.info("Init DruidDataSource");
        return new DruidDataSourceWrapper();
    }
}
           

所以,下面重点看

DruidDataSource#init()

方法的源码:

1.3

DruidDataSource#init()

方法

DruidDataSource#init()

方法里主要做的事情:

  1. 初始化 jdbcUrl
  2. 初始化过滤器
  3. 根据配置的 jdbcUrl 判断数据库的类型
  4. 通过 SPI ServiceLoader ,让类加载器加载 过滤器
  5. 加载数据库驱动

    resolveDriver();

  6. 初始化检查,检查数据库类型和驱动是否支持、初始化:

    initExceptionSorter()

  7. 初始化存放连接的数组 connections = new DruidConnectionHolder[maxActive]; 并初始化 initialSize 个连接

    CreateConnectionThread,动态在connections中添加连接

  8. 创建这几个线程:日志分析线程LogStatsThread、连接创建线程CreateConnectionThread、连接销毁线程DestroyConnectionThread
  9. 向线程池提交创建连接的任务:

    this.createSchedulerFuture = createScheduler.submit(task);

1.4 其中对于并发以及安全的考虑有:

  1. 整个初始化过程使用可重入锁来保证互斥性:

    final ReentrantLock lock = this.lock;

  2. 使用 volatile 变量来保证只会初始化一次
  3. DruidDriver 使用静态常量来保证单例
  4. 使用

    AtomicLongFieldUpdater

    类来原子更新

    DruidAbstractDataSource

    volatile long

    修饰的变量
  5. 使用 CountDownLatch 来确保 「连接池创建线程」、「连接池销毁线程」 一定会创建
  6. 使用线程池来优化数据库连接的创建

2. 连接的相关处理

2.1 Druid 如何初始化创建数据库连接

在执行

DruidDataSource#init()

方法时,会初始化存放连接的数组conections,然后启动两个线程

(1)CreateConnectionThread,动态在 connections 数组中添加连接

(2)DestroyConnectionThread,动态移除activeConnections很久不用的连接

根据

initialSize

参数创建初始数量的连接:

while (poolingCount < initialSize) {
  try {
      PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
      DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
      connections[poolingCount++] = holder;
  } catch (SQLException ex) {
      LOG.error("init datasource error, url: " + this.getUrl(), ex);
      if (initExceptionThrow) {
          connectError = ex;
          break;
      } else {
          Thread.sleep(3000);
      }
  }
}
           

2.2 CreateConnectionThread 线程动态创建连接

CreateConnectionThread 是一个守护线程,当有线程等待时,便会通过 JDBC 创建不超过 maxActive 数量的数据库连接,然后将创建的连接放入 connections 数组。

public class CreateConnectionThread extends Thread {

        public CreateConnectionThread(String name){
            super(name);
            this.setDaemon(true);
        }

        public void run() {
        	for (;;) {
                ...
                ...
                // 通过 JDBC 创建一个数据库连接
                connection = createPhysicalConnection();
                ...
                ...
                // 将创建的连接放入 connections 数组
                boolean result = put(connection);
            }
        }
        
}
           

2.3 DestroyConnectionThread 线程动态缩容(释放连接)

DestroyConnectionThread 也是一个守护线程

public class DestroyConnectionThread extends Thread {

        public DestroyConnectionThread(String name){
            super(name);
            this.setDaemon(true);
        }

        public void run() {
            initedLatch.countDown();

            for (;;) {
                // 从前面开始删除
                try {
                    if (closed || closing) {
                        break;
                    }

                    if (timeBetweenEvictionRunsMillis > 0) {
                        Thread.sleep(timeBetweenEvictionRunsMillis);
                    } else {
                        Thread.sleep(1000); //
                    }

                    if (Thread.interrupted()) {
                        break;
                    }
					// 释放连接
                    destroyTask.run();
                } catch (InterruptedException e) {
                    break;
                }
            }
        }

    }
           

DestroyTask :

public class DestroyTask implements Runnable {
        public DestroyTask() {}
        @Override
        public void run() {
            // 关闭连接池 connnections[] 中空闲时间超过一个阈值的连接
            shrink(true, keepAlive);

            if (isRemoveAbandoned()) {
                // 移除 activeConnections[] 中运行超 removeAbandonedTimeoutMillis 的连接,并通过 JDBC 来 close 掉对应的数据库连接
                removeAbandoned();
            }
        }

    }
           

2.4 从 Druid 连接池获取数据库连接

/**
     * 获取从 Druid 连接池获取数据库连接
     */
    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        // 初始化工作
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            // 走过滤器拿数据库连接,详看FilterChainImpl#dataSource_connect
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            // 直接从 connections[] 获取连接,详见 holder = takeLast();
            return getConnectionDirect(maxWaitMillis);
        }
    }
           

getConnectionDirect():

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (;;) {
            // handle notFullTimeoutRetry
            DruidPooledConnection poolableConnection;
            try {
                // 从connections[]数组中获取连接,maxWaitMillis————拿连接的最大等待时间
                poolableConnection = getConnectionInternal(maxWaitMillis);
            } catch (GetConnectionTimeoutException ex) {
                if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                    notFullTimeoutRetryCnt++;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                    }
                    continue;
                }
                throw ex;
            }

            if (testOnBorrow) {
                // 用获取的连接执行一个测试sql来校验是否生效
                boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                ...
            }
            ...
        }
    ...
}
           

2.5 回收连接

连接池中每一个连接,用完之后必须调用close()。在close中通过 recycle() 方法把连接从 activeConections 集合中重新移动到connections[],达到重复利用连接的目的。

// 存放的是:从连接池获取连接时,从这个数组中获取
  private volatile DruidConnectionHolder[] connections;
  
  // 存放的是:正在使用的连接connection
  protected final Map<DruidPooledConnection, Object> activeConnections
           

2.6 异常处理

分析一下 Druid 中操作连接时发生异常情况的一些处理。

2.6.1 Druid 中自定义了哪些异常?

pool 包下比较关键的一些异常有:

  • DataSourceClosedException
  • DataSourceDisableException
  • DataSourceNotAvailableException
  • GetConnectionTimeoutException

当获取连接时,如果数据源是处于关闭状态,那么就会抛

DataSourceClosedException

异常,当数据源不可用时,就会抛

DataSourceDisableException

异常

private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
        if (closed) { // 数据源是关闭状态
            connectErrorCountUpdater.incrementAndGet(this);
            // 抛 DataSourceClosedException
            throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
        }

        if (!enable) { // 数据源不可用
            connectErrorCountUpdater.incrementAndGet(this);

            if (disableException != null) {
                throw disableException;
            }
			// 抛 DataSourceDisableException
            throw new DataSourceDisableException();
        }
        ...
    }
           

从 connections[]数组中获取连接 时,连续失败,便会抛出

DataSourceNotAvailableException

异常

if (maxWait > 0) {
	holder = pollLast(nanos);
} else {
// 从connections[]数组中获取连接
	holder = takeLast();
}
...
// takeLast():
if (failFast && isFailContinuous()) {
	throw new DataSourceNotAvailableException(createError);
}
           

当获取连接超过指定等待时间,便会抛出

GetConnectionTimeoutException

异常:

if (createError != null) {
	throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
	throw new GetConnectionTimeoutException(errorMessage);
}
           
2.6.2 ExceptionSorter

ExceptionSorter 的作用是:在数据库服务器重启、网络抖动、连接被服务器关闭等异常情况下,连接发生了不可恢复异常,将连接从连接池中移除,保证连接池在异常发生时情况下正常工作。ExceptionSorter是连接池稳定的关键特性,没有ExceptionSorter 的连接池,不能认为是有稳定性保障的连接池。

ExceptionSorter 接口

/**
 * An interface to allow for exception evaluation.
 */
public interface ExceptionSorter {

    /**
     * 判断异常是否致命
     * Returns true or false whether or not the exception is fatal.
     * 
     * @param e the exception
     * @return true or false if the exception is fatal.
     */
    boolean isExceptionFatal(SQLException e);
    
    void configFromProperties(Properties properties);
}

           

很多异常分拣类都是实现了 ExceptionSorter 这个接口,如图:

Day05 Druid 小结

ExceptionSorter#isExceptionFatal() 方法用来判断异常是否致命,如果致命那么需要进行处理,拿 DB2ExceptionSorter 举例,如果是 isExceptionFatal() 方法判断异常是致命的,那么便会进行处理:

public void handleConnectionException(DruidPooledConnection pooledConnection, Throwable t, String sql) throws SQLException {
        final DruidConnectionHolder holder = pooledConnection.getConnectionHolder();
        if (holder == null) {
            return;
        }

        errorCountUpdater.incrementAndGet(this);
        lastError = t;
        lastErrorTimeMillis = System.currentTimeMillis();

        if (t instanceof SQLException) {
            SQLException sqlEx = (SQLException) t;

            // broadcastConnectionError
            ConnectionEvent event = new ConnectionEvent(pooledConnection, sqlEx);
            for (ConnectionEventListener eventListener : holder.getConnectionEventListeners()) {
                eventListener.connectionErrorOccurred(event);
            }

            // exceptionSorter.isExceptionFatal
            if (exceptionSorter != null && exceptionSorter.isExceptionFatal(sqlEx)) {
                // 异常致命,进行处理
                handleFatalError(pooledConnection, sqlEx, sql);
            }

            throw sqlEx;
        } else {
            throw new SQLException("Error", t);
        }
    }
           

2.7 总结

连接池设计的基本思路:

(1)初始化建立连接池对象(服务启动)

(2)按照事先指定的参数创建初始数量的连接(即:空闲连接数)。

(3)对于一个访问请求,直接从连接池中得到一个连接。如果连接池对象中没有空闲的连接,且连接数没有达到最大(即:最大活跃连接数),创建一个新的连接;如果达到最大,则设定一定的超时时间,来获取连接。

(4)拿到连接访问服务。

(5)访问服务完成后释放连接(此时的释放连接,并非真正关闭,而是将其放入空闲队列中。如实际空闲连接数大于初始空闲连接数则释放连接)。

(6)释放连接池对象(服务停止、维护期间,释放连接池对象,并释放所有连接)。

3. Druid 中过滤器的装载和使用

Druid提供了基于Filter-Chain模式的扩展,使得在JDBC层做扩展非常容易。Druid内置提供了一些常用的Filter,StatFilter用于提供监控数据,Log系列Filter用于提供输出Connection、Statement、ResultSet相关的日志,WallFilter用于防御SQL注入攻击。

StatFilter——采集并提供监控数据

Druid连接池的监控信息主要是通过StatFilter 采集的,采集的信息非常全面,包括SQL执行、并发、慢查、执行时间区间分布等。具体配置可以看这个 https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatFilter

WallFilter —— 基于SQL语义分析来实现防御SQL注入攻击

SQL注入攻击是黑客对数据库进行攻击的常用手段,Druid连接池内置了WallFilter 提供防SQL注入功能,在不影响性能的同时防御SQL注入攻击。

LogFilter ——输出数据库操作的日志

LogFilter 可以输出连接申请/释放,事务提交回滚,Statement的Create/Prepare/Execute/Close,ResultSet的Open/Next/Close,通过LogFilter可以详细诊断一个系统的Jdbc行为。

LogFilter有Log4j、Log4j2、Slf4j、CommsLog等实现,具体配置看这里 https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_LogFilter

3.1 过滤器的装载

Druid 中过滤器的管理工作,主要是在 FilterManager 类中实现。其中 FilterManager#loadFilter() 负责加载 filter 并放入 filter集合中。

public static void loadFilter(List<Filter> filters, String filterName) throws SQLException {
  if (filterName.length() == 0) {
      return;
  }

  // 读取 druid-filter.properties 文件,获取 filterName 对应的过滤器类的名称
  String filterClassNames = getFilter(filterName);

  if (filterClassNames != null) {
      for (String filterClassName : filterClassNames.split(",")) {
          // 过滤器已加载,则跳过
          if (existsFilter(filters, filterClassName)) {
              continue;
          }

          // 加载该过滤器类
          Class<?> filterClass = Utils.loadClass(filterClassName);

          if (filterClass == null) {
              LOG.error("load filter error, filter not found : " + filterClassName);
              continue;
          }

          Filter filter;

          try {
              // 创建该过滤器类的实例
              filter = (Filter) filterClass.newInstance();
          } catch (ClassCastException e) {
              LOG.error("load filter error.", e);
              continue;
          } catch (InstantiationException e) {
              throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
          } catch (IllegalAccessException e) {
              throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
          } catch (RuntimeException e) {
              throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
          }

          // 将该实例添加到 过滤器 集合里
          filters.add(filter);
      }

      return;
  }

  // 已存在 filterName 对应的过滤器,直接返回
  if (existsFilter(filters, filterName)) {
      return;
  }

  // 加载 filterName 对应的过滤器
  Class<?> filterClass = Utils.loadClass(filterName);
  if (filterClass == null) {
      LOG.error("load filter error, filter not found : " + filterName);
      return;
  }

  try {
      Filter filter = (Filter) filterClass.newInstance();
      filters.add(filter);
  } catch (Exception e) {
      throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
  }
}
           

其中 getFilter() 方法 会从初始化好的 ConcurrentHashMap——aliasMap 中获取指定的 filter,而 aliasMap 则根据 druid-filter.properties 文件中配的 filter 来初始化。

Day05 Druid 小结

3.2 过滤器装载的时机

1. 初始化时——initFromSPIServiceLoader();
private void initFromSPIServiceLoader() {
        // 是否跳过装载filter
        if (loadSpifilterSkip) {
            return;
        }

        if (autoFilters == null) {
            List<Filter> filters = new ArrayList<Filter>();
            ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);

            for (Filter filter : autoFilterLoader) {
                // 是否自动装载
                AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
                if (autoLoad != null && autoLoad.value()) {
                    filters.add(filter);
                }
            }
            autoFilters = filters;
        }

        for (Filter filter : autoFilters) {
            if (LOG.isInfoEnabled()) {
                LOG.info("load filter from spi :" + filter.getClass().getName());
            }
            addFilter(filter);
        }
    }
           
2. 获取数据源时
@Override
    public Connection connect(String url, Properties info) throws SQLException {
        if (!acceptsURL(url)) {
            return null;
        }

        connectCount.incrementAndGet();

        // 获取数据源时,会加载 filter: 
        // FilterManager.loadFilter(config.getFilters(), filterItem);
        DataSourceProxyImpl dataSource = getDataSource(url, info);

        return dataSource.connect(info);
    }
           

3.3 filterChain 的使用

从 Druid 连接池获取连接的时候便会使用 nextFilter() 方法从装载 filter 的集合中拿到一个 filter, 并执行这个 filter 的 dataSource_getConnection 方法,将获取到的 DruidPooledConnection 对象依次返回到前面的 filter。这里是一个递归调用,层层下沉,使得可以依次执行 filterchain 的下一个filter,当拿到 DruidPooledConnection 对象,并层层返回时,有一个好处是:返回时每经过一个 filter ,都可以执行 该 filter 的

if (conn != null) {}

里的特殊逻辑。

@Override
    public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
        if (this.pos < filterSize) {
            DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
            return conn;
        }

        return dataSource.getConnectionDirect(maxWaitMillis);
    }
           

比如StatFilter#dataSource_connect():

@Override
    public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
        // 将调用关系传递到filterchain的下一个filter
        DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis);

        if (conn != null) {
            conn.setConnectedTimeNano();

            StatFilterContext.getInstance().pool_connection_open();
        }

        return conn;
    }
           

继续阅读