druid 保活机制源码研究

    科技2024-10-08  18

    问题:

    druid 数据库连接池的 keepAlive (连接保活)参数是如何起作用的,连接池保活工作机制是如何的?

     

    代码执行流程

    DruidDataSource.init()=>

    DruidDataSource.createAndLogThread()=>

    DruidDataSource.createAndStartCreatorThread()=>

    DruidDataSource.createAndStartDestroyThread()=>

    DruidDataSource.DestroyConnectionThread.run() (内部类)=>

    com.alibaba.druid.pool.DruidDataSource.DestroyTask.run() (内部类) =>

    com.alibaba.druid.pool.DruidDataSource.shrink(boolean checkTime, boolean keepAlive)

     

    public class DestroyConnectionThread extends Thread { public DestroyConnectionThread(String name){ super(name); this.setDaemon(true); } public void run() { initedLatch.countDown(); for (;;) { // 浠庡墠闈㈠紑濮嬪垹闄� try { if (closed) { break; } // timeBetweenEvictionRunsMillis 大于 0 // Druid-ConnectionPool-Destroy 线程就睡眠 // timeBetweenEvictionRunsMillis 毫秒,该变量通过 // spring.datasource.druid.time-between-eviction-runs-millis // 参数配置,如果没配置在 1.1.21 版 druid 中默认为60秒; // 否则 Druid-ConnectionPool-Destroy 线程睡眠一秒 if (timeBetweenEvictionRunsMillis > 0) { Thread.sleep(timeBetweenEvictionRunsMillis); } else { Thread.sleep(1000); // } if (Thread.interrupted()) { break; } destroyTask.run(); } catch (InterruptedException e) { break; } } } }

     Druid-ConnectionPool-Destroy 守护线进行无限循环,每次循环睡眠中进行睡眠。 Druid-ConnectionPool-Destroy 守护线程每次循环都会调用  DestroyTask.run(),DestroyTask.run()调用 com.alibaba.druid.pool.DruidDataSource.shrink(boolean checkTime, boolean keepAlive) 执行 druid 保活逻辑。

     

    com.alibaba.druid.pool.DruidDataSource.shrink(boolean checkTime, boolean keepAlive): druid 保活逻辑是在这个方法中实现的,下面我们分析这个方法的源代码。

     

    public void shrink(boolean checkTime, boolean keepAlive) { try { lock.lockInterruptibly(); } catch (InterruptedException e) { return; } boolean needFill = false; int evictCount = 0; int keepAliveCount = 0; int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink; fatalErrorCountLastShrink = fatalErrorCount; try { if (!inited) { return; } // 合并计数器(数据库连接池中存储的总连接数)减去最小空闲线程数 final int checkCount = poolingCount - minIdle; final long currentTimeMillis = System.currentTimeMillis(); for (int i = 0; i < poolingCount; ++i) { // 从数据库连接池中取出数据库连接句柄 DruidConnectionHolder connection = connections[i]; // 如果有致命错误或致命错误增量计数器大于零,并且最后致命错误时间(单位是毫秒)大于当前连接的连接时间, // 则把连接句柄添加到 keepAliveConnections 连接句柄数组,退出本次循环 if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) { keepAliveConnections[keepAliveCount++] = connection; continue; } // for 循环中的第一层 if // 在 DestroyConnectionThread 线程中调用 shrink() 方法时,传参 checkTime 为 true if (checkTime) { // 如果设置了物理连接超时参数,则检查连接超时是否大于物理连接超时参数,如果大于则把连接句柄添加到 evictConnections(弃用连接句柄数组) // ,退出本次循环 if (phyTimeoutMillis > 0) { long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { evictConnections[evictCount++] = connection; continue; } } long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; // minEvictableIdleTimeMillis 连接保持空闲而不被驱逐的最小时间 // keepAliveBetweenTimeMillis 保活检查间隔时间 if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis ) { break; } if (idleMillis >= minEvictableIdleTimeMillis) { // 大于最小空闲数据库连接数的连接被添加到数据库连接句柄弃用数组, // 并退出本轮循环 if (checkTime && i < checkCount) { evictConnections[evictCount++] = connection; continue; } else if (idleMillis > maxEvictableIdleTimeMillis) { // 否则如果连接空闲时间大于连接保持空闲而不被驱逐的最大时间, // 连接句柄被添加到数据库连接句柄弃用数组,并退出本轮循环 evictConnections[evictCount++] = connection; continue; } } // 如果在 druid 参数中配置启用保活机制,并且连接空闲时间大于保活检查间隔时间, // 连接句柄被添加到数据库连接句柄保活数组。keepAliveBetweenTimeMillis // 变量的值通过 spring.datasource.druid.keep-alive-between-time-millis 参数配置,如果没配置在 druid 1.1.21 版本中默认 keepAliveBetweenTimeMillis 等于2分钟 if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } // for 循环中的第一层 if } else { // 大于最小空闲数据库连接数的连接被添加到数据库连接句柄弃用数组 if (i < checkCount) { evictConnections[evictCount++] = connection; } else { break; } } } // 删除连接计数器等于弃用连接计数器加保活连接计数器 int removeCount = evictCount + keepAliveCount; if (removeCount > 0) { // 把数据库连接句柄数组 connections(连接池),从数组下标为删除连接计数器开始到最后的元素 // 复制到 connections 第一个元素到 removeCount 个元素 System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); // 删除数据库连接句柄数组 connections(连接池),下标为删除连接计数器开始到最后的元素设置为空 Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); // poolingCount 合并计数器 = 合并计数器 - 删除计数器 poolingCount -= removeCount; } // keepAliveCheckCount 连接池保活检查连接计数器 = keepAliveCheckCount + keepAliveCount 连接池保活连接计数器 keepAliveCheckCount += keepAliveCount; // 设置 needFill = true 是为了确保 keepAliveCount 不大于零时, // 仍然可以进行保活操作 if (keepAlive && poolingCount + activeCount < minIdle) { needFill = true; } } finally { lock.unlock(); } // 如果弃用计数器大于零,在循环中取出数据库句柄数组中的连接句柄,并关闭连接增加弃用连接原子计数器, if (evictCount > 0) { for (int i = 0; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } // 清空弃用数据库连接句柄数组 Arrays.fill(evictConnections, null); } if (keepAliveCount > 0) { // keep order // 通过循环从数据库连接句柄保活数组中取出连接句柄,检查连接是否正常, // 如果连接正常则把连接句柄重新加入数据库连接池(在 druid 中数据库 // 连接池就是数据库连接句柄数组) // 如果重新加入连接池失败则关闭连接 for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } // skip } boolean discard = !validate; if (validate) { holer.lastKeepTimeMillis = System.currentTimeMillis(); boolean putOk = put(holer, 0L); if (!putOk) { discard = true; } } if (discard) { try { connection.close(); } catch (Exception e) { // skip } lock.lock(); try { discardCount++; // 如果活动计数器 + 合并计数器 <= 最小空闲连接数, // 则唤醒 createAndStartCreatorThread 线程创建新的连接 if (activeCount + poolingCount <= minIdle) { emptySignal(); } } finally { lock.unlock(); } } } this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); Arrays.fill(keepAliveConnections, null); } if (needFill) { lock.lock(); try { int fillCount = minIdle - (activeCount + poolingCount + createTaskCount); for (int i = 0; i < fillCount; ++i) { emptySignal(); } } finally { lock.unlock(); } } else if (onFatalError || fatalErrorIncrement > 0) { lock.lock(); try { emptySignal(); } finally { lock.unlock(); } } }

     

    这个3个 if 是同层级的

     

    总结:

    druid 数据库连接池的保活机制的整体逻辑如下:

    如果启用了连接保活机制没有设置物理连接超时参数, Druid-ConnectionPool-Destroy 守护线程(druid 1.1.21 )默认每1分钟执行一次连接池保活机制,会把有问题的连接和连接空闲时间大于连接保持空闲而不被驱逐的最大时间(参数),添加到弃用数据库连接句柄数组(如果配置了物理连接超时时间,则会系统当前时间减去连接开始时间的差值大于物理超时时间的数据库连接句柄也添加到弃用数据库连接句柄数组);其他数据库连接,连接空闲时间大于保活检查间隔时间(keepAliveBetweenTimeMillis druid 1.1.21 中默认值是2分钟,可以通过 spring.datasource.druid.keep-alive-between-time-millis 配置 keepAliveBetweenTimeMillis  的值),连接句柄被添加到数据库连接句柄保活数组。druid 关闭弃用数据库连接句柄数组中的连接,对于并对非弃用连接(数据库连接句柄保活数组中的连接)进行检查,如果连接正常则重新加入连接池,如果连接有问题则关闭连接,如果连接重新加入数据库连接池失败也会关闭连接,如果连接池中连接数小于最小空闲连接数则唤醒 Druid-ConnectionPool-Create 守护线程创建新的连接。

    不管是否启用保活机制,Druid-ConnectionPool-Destroy 守护线程都会把数据库连接池中数组下标(数据库连接池是一个数据库连接句柄对象数组)大于最小空闲连接数的异常连接和连接空闲时间大于连接保持空闲而不被驱逐的最大时间(参数)的连接添加到弃用数据库连接句柄数组,并把弃用数据库连接句柄数组中的数据库连接关闭。但数据库连接池中数组下标小于最小连接数的连接不会加入弃用数据库连接句柄数组,如果这些数据库连接被防火墙掐断或其他情况在 druid 不知情的时候关闭了,仍然会保留在数据库连接池中,druid 仍然认为这些连接是正常的。

    如果启用了连接池保活机制 Druid-ConnectionPool-Destroy 守护线程,会把 数据库连接池中数组下标小于最小连接数的连接添加到数据库连接句柄保活数组,并检查这些连接是否正常,正常重新加入到连接池,不正常则关闭连接。

    当 druid 连接池配置了 testWhileIdle = true 时,仍然有连接被防火墙掐断或其他情况在 druid 不知情的时候关闭了,导致连接池报连接超时或连接池满错误时,可以设置 keepAlive = true 启用 druid 保活机制,避免数据库连接池中的连接数在最小空闲连接数范围内,连接被掐断没被 druid 从连接池中驱除导致出现连接超时或连接池满的故障。

     

     

    下图是 springBoot 集成 druid 数据库连接池启用数据库连接池保活机制的配置参数。

     

    Processed: 0.008, SQL: 8