ThreadPoolExecutor是Java中线程池的核心实现类,它有以下几个关键参数:
1. 核心参数
corePoolSize(核心线程数):
- 线程池中保持的最小线程数,即使这些线程处于空闲状态也不会被销毁
- 当有新任务提交时,如果当前线程数小于corePoolSize,即使有空闲线程也会创建新线程
- 在延迟队列框架中,监听线程池设置为1,执行任务线程池可通过配置调整
maximumPoolSize(最大线程数):
- 线程池中允许存在的最大线程数
- 当任务队列满了且当前线程数小于maximumPoolSize时,会创建新线程处理任务
- 一般应大于等于corePoolSize
keepAliveTime(空闲线程存活时间):
- 当线程数超过corePoolSize时,多余空闲线程在终止前等待新任务的最长时间
- 如果任务量不大,可以设置较长的存活时间以减少线程创建销毁的开销
unit(时间单位):
- keepAliveTime参数的时间单位
- 常用的有TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟)等
workQueue(工作队列):
- 用于保存等待执行任务的阻塞队列
- 常见的有:
- ArrayBlockingQueue:有界数组队列
- LinkedBlockingQueue:可选有界链表队列
- SynchronousQueue:同步移交队列
- PriorityBlockingQueue:优先级队列
threadFactory(线程工厂):
- 用于创建新线程的工厂
- 可以为线程设置有意义的名称,便于调试和监控
2. 在延迟队列框架中的应用
监听启动线程池 ([listenStartThreadPool](file://D:\Java_projects\damai\damai-redisson-framework\damai-service-delay-queue-framework\src\main\java\com\damai\core\DelayConsumerQueue.java#L75-L75)):
new ThreadPoolExecutor(1,1,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), threadFactory)- corePoolSize = 1, maximumPoolSize = 1:只需要一个线程持续监听队列
- keepAliveTime = 60:由于核心线程不会被回收,此参数无实际意义
- workQueue:使用无界队列,因为监听任务不会堆积
- threadFactory:自定义线程名称便于监控
执行任务线程池 ([executeTaskThreadPool](file://D:\Java_projects\damai\damai-redisson-framework\damai-service-delay-queue-framework\src\main\java\com\damai\core\DelayConsumerQueue.java#L98-L98)):
new ThreadPoolExecutor(
corePoolSize, // 来自配置 delay.queue.core-pool-size
maximumPoolSize, // 来自配置 delay.queue.maximum-pool-size
keepAliveTime, // 来自配置 delay.queue.keep-alive-time
unit, // 来自配置 delay.queue.unit
new LinkedBlockingQueue<>(workQueueSize), // 来自配置 delay.queue.work-queue-size
threadFactory)- 所有参数均可配置:适应不同业务场景的性能需求
- 有界队列:防止任务无限堆积导致内存溢出
- 可调节的线程数:根据业务负载动态调整并发能力
3. 工作流程
当提交一个新任务时:
- 如果当前线程数小于corePoolSize,创建新核心线程执行任务
- 如果当前线程数等于或多于corePoolSize,且工作队列未满,将任务放入队列
- 如果工作队列已满,且当前线程数小于maximumPoolSize,创建新非核心线程执行任务
- 如果工作队列已满,且当前线程数已达maximumPoolSize,执行拒绝策略
线程回收:
- 当线程数超过corePoolSize时,空闲时间超过keepAliveTime的线程会被销毁
这种设计使得延迟队列框架既能够保证基本的监听能力,又可以根据实际业务负载动态调整处理能力。
让我来解释这句代码的作用:
r -> new Thread(Thread.currentThread().getThreadGroup(), r,
"listen-start-thread-" + listenStartThreadCount.getAndIncrement())这是一段lambda表达式,用于创建一个自定义的线程工厂(ThreadFactory)。让我详细解释每一部分:
r -> ...- 这是一个lambda表达式,实现了ThreadFactory接口的newThread(Runnable r)方法new Thread(...)- 创建一个新的线程对象Thread.currentThread().getThreadGroup()- 获取当前线程所在的线程组r- 这是要在线程中执行的任务(Runnable)"listen-start-thread-" + listenStartThreadCount.getAndIncrement()- 设置线程名称,使用原子计数器确保每个线程都有唯一的名字
让我为这部分代码添加详细注释:
让我详细解释这段代码:
r -> new Thread(Thread.currentThread().getThreadGroup(), r,
"listen-start-thread-" + listenStartThreadCount.getAndIncrement())这是一个 Lambda 表达式,实现了 ThreadFactory 接口。让我们逐步分析它的每个部分:
代码解析
1. Lambda 表达式结构
r -> new Thread(...)这实际上是 ThreadFactory 接口的简化实现:
// 完整形式应该是:
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(...);
}
}2. Thread 构造函数参数
new Thread(ThreadGroup group, Runnable target, String name)Thread.currentThread().getThreadGroup()- 获取当前线程所在的线程组- 线程组用于组织和管理线程
- 保证新建线程与创建它的线程在同一组中
r- 要执行的任务(Runnable)- 这是线程需要执行的实际工作内容
"listen-start-thread-" + listenStartThreadCount.getAndIncrement()- 线程名称"listen-start-thread-"- 固定前缀,表明这是监听启动线程listenStartThreadCount.getAndIncrement()- 原子递增计数器getAndIncrement()方法先返回当前值,然后将计数器加1- 确保每个线程都有唯一的序号,例如: listen-start-thread-1, listen-start-thread-2...
作用和意义
1. 线程命名的重要性
// 有了明确命名后,在调试和监控时可以看到:
"listen-start-thread-1" // 清楚表明这是第1个监听启动线程
"listen-start-thread-2" // 清楚表明这是第2个监听启动线程2. 为什么使用原子计数器?
private final AtomicInteger listenStartThreadCount = new AtomicInteger(1);- 线程安全:多个线程同时创建线程时不会出现重复的序号
- 自动递增:无需手动管理序号
3. 实际效果示例
假设有以下代码执行顺序:
- 第一次调用:创建线程命名为 "listen-start-thread-1"
- 第二次调用:创建线程命名为 "listen-start-thread-2"
- 第三次调用:创建线程命名为 "listen-start-thread-3"
在整个系统中的作用
这段代码是 [listenStartThreadPool](file://D:\Java_projects\damai\damai-redisson-framework\damai-service-delay-queue-framework\src\main\java\com\damai\core\DelayConsumerQueue.java#L75-L75) 线程池的一部分,用于:
- 持续监听Redis队列:保持一个后台线程不断检查是否有到期的消息
- 线程管理:通过命名便于监控和调试
- 资源控制:限制只使用一个线程执行监听任务,避免资源浪费
这种设计模式在延迟队列框架中非常重要,因为它确保了:
- 有且仅有一个线程持续监听Redis队列
- 线程具有明确的标识,便于运维和调试
- 线程创建过程是可控和标准化的