一. 核心DruidDataSource类
要找一个类代表druid的话,那么非DruidDataSource这个莫属了,其核心连接的维护,连接的构建,入池,获取,收缩,销毁,以及核心监控数据都在这个类维护,所以在研究之前必须把这个类中的核心成员变量搞清楚含义,不然很难阅读源码。
先来张该类的全局信息:
看图可得知
DruidDataSource继承自
DruidAbstractDataSource和CommonDataSource。说明DruidDataSrouce是一个DataSource,可以直接getConnection获取连接。且拥有两个task实现runnable接口,三个线程继承thread,分别处理销毁任务,创建连接任务,记录统计信息等信息。
二. 核心成员变量
类名
|
描述
|
ExceptionSorter
|
用于判断SQLException对象是否致命异常
|
ValidConnectionChecker
|
用于校验指定连接对象是否有效
|
CreateConnectionThread
|
DruidDataSource的内部类,用于异步创建连接对象
|
notEmpty
|
调用notEmpty.await()时,当前线程进入等待;当连接创建完成或者回收了连接,会调用notEmpty.signal()时,将等待线程唤醒
|
empty
|
调用empty.await()时,CreateConnectionThread进入等待,调用empty.signal()时,CreateConnectionThread被唤醒,并进入创建连接
|
DestroyConnectionThread
|
DruidDataSource的内部类,用于异步检验连接对象,包括校验空闲连接的phyTimeoutMillis,minEvictableIdleTimeMillis,以及校验借出连接的removeAbandonedTimeoutMillis
|
LogStatsThread
|
DruidDataSource的内部类,用于异步记录统计信息
|
connections
|
用于存放所有连接对象
|
evictConnections
|
用于存放需要丢弃的连接对象
|
keepAliveConnections
|
用于存放需要keepAlive的连接对象
|
activeConnections
|
用于存放需要进行removeAbandoned的连接对象
|
poolingCount
|
空闲连接对象的数量
|
activeCount
|
借出连接对象的数量
|
三. 初始化
初始化的过程分为两种情况,一种是在加载druid时初始化,还有就是在获取连接时也会初始话,DruidDataSource的这个初始化时机是可选的,当我们设置init=true时,在createDataSource时就会调用DataSource.init()方法进行初始化,否则,只会在getConnection时再进行初始化,当然这个初始化肯定时只会进行一次的。
public void init() throws SQLException {
if (inited) {
return;
// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
boolean init = false;
1,第一步就是判断是否已经初始化了,如果已经初始化,那么不再进行,保证只初始化一次,inited这个字段是个volatile修饰的布尔类型
2,然后是DruidDriver.getInstance();这一步是为了提前初始化DruidDriver,因为之前有个issue提到这里在多线程下初始化可能存在死锁,所以就给提前显式初始化了
3,加锁,使用一个可重入锁,并使用可处理中断异常的方式获取锁,保证下面的处理只有一个线程能处理
try {
if (inited) {
return;
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
this.id = DruidDriver.createDataSourceId();
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
for (Filter filter : filters) {
filter.init(this);
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
DbType dbType = DbType.of(this.dbTypeName);
if (dbType == DbType.mysql
|| dbType == DbType.mariadb
|| dbType == DbType.oceanbase
|| dbType == DbType.ads) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
1,这段代码就已经在锁里了,再次判断是否已经初始化,同时创建一个datasourceId,这个id是原子类,保证安全
2,如果id>1,那么说明不止一个数据源,if里面的代码猜测可能是保留一个区间段id给每个数据源使用的
3,initFromWrapDriverUrl(),是针对druid自定义的一种url格式,以jdbc:wrap-jdbc:开头,进行解析
4,this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null); 根据url前缀,确定dbType
// 一段对配置参数进行检查的代码
initFromSPIServiceLoader();
resolveDriver();
initCheck();
initExceptionSorter();
initValidConnectionChecker();
validationQueryCheck();
1,采用SPI机制加载过滤器,这部分过滤器除了放入filters,还会放入autoFilters
2,处理驱动,根据我们配置中的连接地址的协议,得到具体的驱动类型
3,initCheck,只是针对oracle和DB2,需要校验validationQuery
4,根据dbType实例化一个具体的MySqlExceptionSorter,用来处理异常,判断异常等
5,根据dbType初始化一个具体的MySqlValidConnectionChecker,并加载配置,该类会在后面起到检测连接是否有效的作用
6,校验testOnBorrow,testOnReturn,testWhileIdle参数的合法性
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
// 创建初始连接数
// 异步创建,createScheduler为null,不进入
if (createScheduler != null && asyncInit) {
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true);
} else if (!asyncInit) {
// init connections
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();
initedLatch.await();
init = true;
initedTime = new Date();
registerMbean();
if (connectError != null && poolingCount == 0) {
throw connectError;
if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) {
for (int i = 0; i < minIdle; ++i) {
submitCreateTask(true);
} else {
this.emptySignal();
}
1,初始化connections,用于存放所有连接对象,evictConnections,用于存放需要丢弃的连接对象,keepAliveConnections,用于存放需要keepAlive的连接对象
2,这里有两种方式创建连接,一种是异步,一种是同步。我这里使用的是同步,createScheduler为null。
3,poolingCount 为空闲连接对象数量,当其小于初始化连接池大小时,不停的调用createPhysicalConnection();创建新连接,并放进去
3.1,
createPhysicalConnection(),的流程大致就是读取配置中的url,驱动以及用户密码,实例化一个ConnectionProxyImpl,再使用上面初始话的validConnectionChecker,根据配置对该连接进行校验,判断是否是可用的连接。
4,启动三个线程
// 启动监控数据记录线程
createAndLogThread();
// 启动连接创建线程
createAndStartCreatorThread();
// 启动连接检测线程
createAndStartDestroyThread();
initedLatch.await();
这里initedLatch为一个countdownlatch对象,保证当createConnectionThread和destroyConnectionThread开始run时再继续执行
5,注册MBean,会去注册DruidDataSourceStatManager和DruidDataSource,用来通过jmx监控
6,如果配置了keepAlive,且是异步创建连接,那么会提交创建任务,创建任务队列使用long数组实现,初始化为8个长度,每个位置为0则代表任务为空,不为零则就是具体的任务id,代表加入任务队列,当队列满了时,会进行扩容,大小为1.5倍,然后通过createScheduler,执行任务。
6.1,
如果配置了
keepAlive,且不是异步创建连接,那么会去调用empty.signal(),会去唤醒处于empty.await()状态的CreateConnectionThread,CreateConnectionThread这个线程只有在需要创建连接时才运行,否则会一直等待。
至此,初始话的流程已经完成,总结下就是初始化驱动实例 -> 加锁 -> 初始化属性 -> 初始化过滤器 -> 校验参数 -> 创建初始化连接并校验后加入池中 ->
创建logStatsThread、createConnectionThread和destroyConnectionThread -> 注册MBean,用于支持JMX -> 如果设置了keepAlive,通知createConnectionThread创建连接对象 -> 解锁
四. 获取连接使用
之前我们说过,DruidDataSource这个核心类其实就是一个DataSource,因为其继承了DataSource,所以它也重写了 getConnection方法,实际获取一个连接就是从这获取的
@Override
public DruidPooledConnection getConnection() throws SQLException {
return getConnection(maxWait);
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
return getConnectionDirect(maxWaitMillis);
}
它调用了本身的一个
getConnection方法,并以配置的maxWait作为参数传递进去。获取连接之前会进行druid连接池的初始化动作,这个上面也是说了,进入初始化流程发现已经初始化过了,就不会再初始化了。
接着如果配置了过滤器,也就是stat,wall,log4j那几个插件,那么会进行插件的初始化,这里使用到了责任链模式,很经典的用法了,在网关中也常用到,之前源码分析soul网关时也讲到这中用法。然后用filterChain调用dataSource_connect方法,进行获取连接。
在filterChain中,定义了当前的数据源以及连接池信息,责任链的列表,长度,以及当前访问的位置.
@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
if (this.pos < filterSize) {
DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
return conn;
return dataSource.getConnectionDirect(maxWaitMillis);
}
如上代码,判断当前位置小于责任链长度,则使用nextFilter()获取下一个实际的过滤器获取连接,其实也就是遍历责任链,用每个过滤器插件执行下dataSource_getConnection(this, dataSource, maxWaitMillis); 这个方法在Filter中就定义了,每个过滤器都实现了具体的方法,举个例子:这是statFilter的实现
@Override
public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource,
long maxWaitMillis) throws SQLException {
DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis);
if (conn != null) {
conn.setConnectedTimeNano();
StatFilterContext.getInstance().pool_connection_open();
return conn;
}
可以看到,很骚的是,他把过滤器本身传进来了,又通过dataSource_connect调用了一次,相当于dfs递归遍历,然后又回到上面的过滤器列表的遍历,只不过这一次pos已经+1了,拿到的就是下一个过滤器。如此这样直到,pos的位置大于插件连的长度。执行这个代码dataSource.getConnectionDirect(maxWaitMillis)
这个方法字面意思就是直连
获取
连接:
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
continue;
throw ex;
}
起一个死循环,在循环里面调用getConnectionInternal(maxWaitMillis);获取连接,关键点就在这:
poolableConnection = getConnectionInternal(maxWaitMillis);
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
throw new DataSourceDisableException();
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false;;) {
if (createDirect) {
createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {
LOG.debug("conn-direct_create ");
boolean discard = false;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
break;
} else {
discard = true;
} finally {
lock.unlock();
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
上面代码逻辑就是,起一个死循环,先判断是否是直接创建,刚进来肯定不是,往下走:
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
if (lastFatalErrorSql != null) {
errorMsg.append(", sql \n")
.append(lastFatalErrorSql);
throw new SQLException(
errorMsg.toString(), lastFatalError);
connectCount++;
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
if (holder != null) {
if (holder.discard) {
continue;
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
先加锁,然后是对异常场景的一顿判断,如果配置了异步创建连接,且池中无可用连接,且借出去的连接小于最大活跃连接,且待创建任务的队列>0,则将createDirect置为true,跳过下面代码,回到循环上面,相当于这次连接池没有可用的连接了,通过DruidDataSource.this.createPhysicalConnection();就直接现场创建一个连接。
如果没有配置异步创建
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
如果没超时,如果无可用连接,则发一个信号创建连接,直到可用连接不为0,然后取走最后一个连接对象,下面的超时的逻辑也很类似,就是多了一个对超时时间的处理。拿到链接后,活跃连接数加1,解锁。
如果拿到的连接为null,则代表出错了,组装下错误日志,抛出异常。
正常情况,则将拿到的连接包装为DruidPooledConnection,返回。
回到最上面,我们拿到连接后,通过testOnBorrow配置,对拿到的连接进行校验,这个校验的逻辑和前面创建的连接校验的逻辑是一样的,执行下配置的select 'x'。校验结果如果不合法,则代表此链接没用,就需要抛弃掉
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
discardConnection(poolableConnection.holder);
continue;
}
抛弃的流程就是,如果连接不为null,就将连接关了,同时将该连接的holer的属性置为抛弃,活跃数量减一,如果此时使用中的连接数量小于配置的最小连接数,则发起emptySignal()信号,进行创建。不合法后,则需要继续循环再从连接池拿一个连接,直到有效。
然后是testWhileIdle配置,该配置意思是申请连接时如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效,逻辑也就这个意思
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
discardConnection(poolableConnection.holder);
continue;
}
然后就成功的拿到连接了,此时代码回到了这里:
@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
if (this.pos < filterSize) {
DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
return conn;
// 此处执行完了
return dataSource.getConnectionDirect(maxWaitMillis);
}
因为这里的过滤器链是dfs递归调用的,所以回去的时候就是一层一层的会去,回到每一层,执行获取连接后的逻辑,比如StatFilter会执行:
// 拿到连接后
if (conn != null) {
conn.setConnectedTimeNano();
StatFilterContext.getInstance().pool_connection_open();
}
层层返回后,这个连接就算成功拿到了,交给spring集成相关的代码,去实际执行sql了。这里使用责任链,通过每个过滤器去获取连接的目的就是,让每个插件在拿到连接的时候都能做一下处理,不想做直接跳过即可,以后也可以很方便拓展插件。
到此获取连接的逻辑结束。
五. 使用连接
连接的上层获取动作,是从DataSourceUtils中的getConnection获取的,调用我们上面分析的DruidDataSource中获取一个实际的连接。
拿到连接后,将连接包装成一个ConnectionHolder
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource);
if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) {
logger.debug("Fetching JDBC Connection from DataSource");
//拿到一个连接
Connection con = fetchConnection(dataSource);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
logger.debug("Registering transaction synchronization for JDBC Connection");
// 包装下
ConnectionHolder holderToUse = conHolder;
if (conHolder == null) {
holderToUse = new ConnectionHolder(con);
} else {
conHolder.setConnection(con);
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(new DataSourceUtils.ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
return con;
} else {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
return conHolder.getConnection();
}
此时spring的事务管理器,拿到了这个连接,然后到达了mybatis手中,mybatis开始初始化s'tatement。
// 此处为mybatis的 BaseStatementHandler
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Connection connection = this.getConnection(statementLog);
Statement stmt = handler.prepare(connection, this.transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
然后使用连接的preparedStatement(String sql)方法,初始化preparedStatement,也就进入了druid的重写的方法。创建一个实际的preparedStatement后,用一个PreparedStatementHolder包装下。并记录一些关键的指标,用来统计。
// druid重写的创建preparedStatement方法
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
checkState();
PreparedStatementHolder stmtHolder = null;
PreparedStatementKey key = new PreparedStatementKey(sql, getCatalog(), MethodType.M1);
boolean poolPreparedStatements = holder.isPoolPreparedStatements();
if (poolPreparedStatements) {
stmtHolder = holder.getStatementPool().get(key);
if (stmtHolder == null) {
try {
stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql));
holder.getDataSource().incrementPreparedStatementCount();
} catch (SQLException ex) {
handleException(ex, sql);
initStatement(stmtHolder);
DruidPooledPreparedStatement rtnVal = new DruidPooledPreparedStatement(this, stmtHolder);
holder.addTrace(rtnVal);
return rtnVal;
}
然后mybatis就拿到了这个实际执行的
preparedStatement,开始实际的执行。
六. 归还连接
执行完成后,就开始归还连接了,最开始肯定是
preparedStatement 进行close,由于实际执行的是我们包装的DruidPooledPreparedStatement,所以close方法也会被重写执行,在DruidPooledConnection中closePoolableStatement(),方法实际去关闭。
public void closePoolableStatement(DruidPooledPreparedStatement stmt) throws SQLException {
PreparedStatement rawStatement = stmt.getRawPreparedStatement();
final DruidConnectionHolder holder = this.holder;
if (holder == null) {
return;
if (stmt.isPooled()) {
try {
rawStatement.clearParameters();
} catch (SQLException ex) {
this.handleException(ex, null);
if (rawStatement.getConnection().isClosed()) {
return;
LOG.error("clear parameter error", ex);
PreparedStatementHolder stmtHolder = stmt.getPreparedStatementHolder();
stmtHolder.decrementInUseCount();
if (stmt.isPooled() && holder.isPoolPreparedStatements() && stmt.exceptionCount == 0) {
holder.getStatementPool().put(stmtHolder);
stmt.clearResultSet();
holder.removeTrace(stmt);
stmtHolder.setFetchRowPeak(stmt.getFetchRowPeak());
stmt.setClosed(true); // soft set close
} else if (stmt.isPooled() && holder.isPoolPreparedStatements()) {
// the PreparedStatement threw an exception
stmt.clearResultSet();
holder.removeTrace(stmt);
holder.getStatementPool()
.remove(stmtHolder);
} else {
try {
//Connection behind the statement may be in invalid state, which will throw a SQLException.
//In this case, the exception is desired to be properly handled to remove the unusable connection from the pool.
stmt.closeInternal();
} catch (SQLException ex) {
this.handleException(ex, null);
throw ex;
} finally {
holder.getDataSource().incrementClosedPreparedStatementCount();
}
大致逻辑就是判断stmt的类型,以及参数,处理相应的数据,再调用DruidPooledStatement的close方法,进行实际的关闭。再去清除当前查询的结果的数据,对resultSet进行close。此时数据已经被查出来了,返回,开始对connect进行close。
由于我们这是连接池,如果连接好好的还能用,那么就不用真正关了,只需要给他放入池中就行,下次直接就用了。
// druidPooledConnection
@Override
public void close() throws SQLException {
if (this.disable) {
return;
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
return;
DruidAbstractDataSource dataSource = holder.getDataSource();
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
if (!isSameThread) {
dataSource.setAsyncCloseConnectionEnable(true);
if (dataSource.isAsyncCloseConnectionEnable()) {
syncClose();
return;
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
List filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
recycle();
this.disable = true;
}
这里我们再次看到了这个filter,通过filter进行回收,为啥这么做呢?其实也是为了检测,监控。。这里我们两个filter的实际都没做啥大动作,直接调用了连接的recycle()方法。
@Override
public void dataSource_recycle(DruidPooledConnection connection) throws SQLException {
if (this.pos < filterSize) {
nextFilter().dataSource_releaseConnection(this, connection);
return;
connection.recycle();
public void recycle() throws SQLException {
if (this.disable) {
return;
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
return;
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
dataSource.recycle(this);
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}
回收前,先看下这个连接是不是被抛弃了,抛弃了那么就不回收了,因为抛弃的连接都是不能用的了,而且也close了,不用担心资源没关闭问题。然后还是掉用连接的recycle方法。
到达最终的回收前,还是先做一顿检查,然后判断连接是否需要回滚
// check need to rollback?
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
然后调用holder的reset方法,将holder中的一些监控参数清理重置下。
holder.reset();
public void reset() throws SQLException {
// reset default settings
if (underlyingReadOnly != defaultReadOnly) {
conn.setReadOnly(defaultReadOnly);
underlyingReadOnly = defaultReadOnly;
if (underlyingHoldability != defaultHoldability) {
conn.setHoldability(defaultHoldability);
underlyingHoldability = defaultHoldability;
if (underlyingTransactionIsolation != defaultTransactionIsolation) {
conn.setTransactionIsolation(defaultTransactionIsolation);
underlyingTransactionIsolation = defaultTransactionIsolation;
if (underlyingAutoCommit != defaultAutoCommit) {
conn.setAutoCommit(defaultAutoCommit);
underlyingAutoCommit = defaultAutoCommit;
connectionEventListeners.clear();
statementEventListeners.clear();
lock.lock();
try {
for (Object item : statementTrace.toArray()) {
Statement stmt = (Statement) item;
JdbcUtils.close(stmt);
statementTrace.clear();
} finally {
lock.unlock();
conn.clearWarnings();
}
然后再判断,当前最大使用数量是否超标了,(此时可能又创建了新的连接)超标了则不回收了。
if (holder.discard) {
return;
if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
discardConnection(holder);
return;
}
接着是testOnReturn,该参数是归还连接时执行validationQuery检测连接是否有效,如果开了,则会校验这个连接是不是还能用,不能用就不回收,然后关闭连接。
连接能用的话,则加锁,对监控参数处理,并将该holder放入连接池末尾:
lock.lock();
try {
activeCount--;
closeCount++;
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
放入末尾不成功则会将连接关了,同时结束整个回收的流程。
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
放入成功,那么这个连接就被成功回收了,而且因为放入末尾,下次再来个连接就还会优先用它,这也是一个小设计。
至此,整个回收的逻辑就完了。
总体看下来,整个druid的运行逻辑就是,druid继承了datasource,statement,对这些核心参数用一个holder代理,并放入一个核心数组,这个数组就是连接池,接入spring工程后,mybatis从druid获取一个连接,并通过连接获取一个druid代理的preparedStatement,执行完成后,一层一层的关闭连接,在关闭连接时,如果连接还能用就归还给连接池,完成连接的复用。
同时使用一组责任链模式的filter,在建立连接,关闭连接等动作时,维护,监控一些数据,达到分析连接池中连接的目的。还维护了几个监听器,去监听连接数量的信号,去创建/销毁连接,保证核心连接池中随时有可用的连接。通过维护的监控的数据,我们可以通过控制台清晰的看到连接池的具体数量,以及判断大致的执行情况,分析我们业务的执行情况,以及是否健康。
源码分析基本到此就结束了,其他的也没啥核心逻辑了,后面在分析下,druid的设计上面的小心思,我们能从中学到什么来应用到我们自己的工程,光是这么看源码其实没啥意义,还是要多学别人的实现方式,以及如何写出优雅,拓展性强,可维护性强的代码。