Skip to content

ThreadPoolExecutor是Java中线程池的核心实现类,它有以下几个关键参数:

1. 核心参数

  1. corePoolSize(核心线程数)

    • 线程池中保持的最小线程数,即使这些线程处于空闲状态也不会被销毁
    • 当有新任务提交时,如果当前线程数小于corePoolSize,即使有空闲线程也会创建新线程
    • 在延迟队列框架中,监听线程池设置为1,执行任务线程池可通过配置调整
  2. maximumPoolSize(最大线程数)

    • 线程池中允许存在的最大线程数
    • 当任务队列满了且当前线程数小于maximumPoolSize时,会创建新线程处理任务
    • 一般应大于等于corePoolSize
  3. keepAliveTime(空闲线程存活时间)

    • 当线程数超过corePoolSize时,多余空闲线程在终止前等待新任务的最长时间
    • 如果任务量不大,可以设置较长的存活时间以减少线程创建销毁的开销
  4. unit(时间单位)

    • keepAliveTime参数的时间单位
    • 常用的有TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟)等
  5. workQueue(工作队列)

    • 用于保存等待执行任务的阻塞队列
    • 常见的有:
      • ArrayBlockingQueue:有界数组队列
      • LinkedBlockingQueue:可选有界链表队列
      • SynchronousQueue:同步移交队列
      • PriorityBlockingQueue:优先级队列
  6. threadFactory(线程工厂)

    • 用于创建新线程的工厂
    • 可以为线程设置有意义的名称,便于调试和监控

2. 在延迟队列框架中的应用

监听启动线程池 ([listenStartThreadPool](file://D:\Java_projects\damai\damai-redisson-framework\damai-service-delay-queue-framework\src\main\java\com\damai\core\DelayConsumerQueue.java#L75-L75)):

java
new ThreadPoolExecutor(1,1,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), threadFactory)
  • corePoolSize = 1, maximumPoolSize = 1:只需要一个线程持续监听队列
  • keepAliveTime = 60:由于核心线程不会被回收,此参数无实际意义
  • workQueue:使用无界队列,因为监听任务不会堆积
  • threadFactory:自定义线程名称便于监控

执行任务线程池 ([executeTaskThreadPool](file://D:\Java_projects\damai\damai-redisson-framework\damai-service-delay-queue-framework\src\main\java\com\damai\core\DelayConsumerQueue.java#L98-L98)):

java
new ThreadPoolExecutor(
    corePoolSize,           // 来自配置 delay.queue.core-pool-size
    maximumPoolSize,        // 来自配置 delay.queue.maximum-pool-size
    keepAliveTime,          // 来自配置 delay.queue.keep-alive-time
    unit,                   // 来自配置 delay.queue.unit
    new LinkedBlockingQueue<>(workQueueSize), // 来自配置 delay.queue.work-queue-size
    threadFactory)
  • 所有参数均可配置:适应不同业务场景的性能需求
  • 有界队列:防止任务无限堆积导致内存溢出
  • 可调节的线程数:根据业务负载动态调整并发能力

3. 工作流程

  1. 当提交一个新任务时:

    • 如果当前线程数小于corePoolSize,创建新核心线程执行任务
    • 如果当前线程数等于或多于corePoolSize,且工作队列未满,将任务放入队列
    • 如果工作队列已满,且当前线程数小于maximumPoolSize,创建新非核心线程执行任务
    • 如果工作队列已满,且当前线程数已达maximumPoolSize,执行拒绝策略
  2. 线程回收:

    • 当线程数超过corePoolSize时,空闲时间超过keepAliveTime的线程会被销毁

这种设计使得延迟队列框架既能够保证基本的监听能力,又可以根据实际业务负载动态调整处理能力。

让我来解释这句代码的作用:

java
r -> new Thread(Thread.currentThread().getThreadGroup(), r,
                "listen-start-thread-" + listenStartThreadCount.getAndIncrement())

这是一段lambda表达式,用于创建一个自定义的线程工厂(ThreadFactory)。让我详细解释每一部分:

  1. r -> ... - 这是一个lambda表达式,实现了ThreadFactory接口的newThread(Runnable r)方法
  2. new Thread(...) - 创建一个新的线程对象
  3. Thread.currentThread().getThreadGroup() - 获取当前线程所在的线程组
  4. r - 这是要在线程中执行的任务(Runnable)
  5. "listen-start-thread-" + listenStartThreadCount.getAndIncrement() - 设置线程名称,使用原子计数器确保每个线程都有唯一的名字

让我为这部分代码添加详细注释:

DelayConsumerQueue.java

让我详细解释这段代码:

java
r -> new Thread(Thread.currentThread().getThreadGroup(), r,
                "listen-start-thread-" + listenStartThreadCount.getAndIncrement())

这是一个 Lambda 表达式,实现了 ThreadFactory 接口。让我们逐步分析它的每个部分:

代码解析

1. Lambda 表达式结构

java
r -> new Thread(...)

这实际上是 ThreadFactory 接口的简化实现:

java
// 完整形式应该是:
new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(...);
    }
}

2. Thread 构造函数参数

java
new Thread(ThreadGroup group, Runnable target, String name)
  • Thread.currentThread().getThreadGroup() - 获取当前线程所在的线程组

    • 线程组用于组织和管理线程
    • 保证新建线程与创建它的线程在同一组中
  • r - 要执行的任务(Runnable)

    • 这是线程需要执行的实际工作内容
  • "listen-start-thread-" + listenStartThreadCount.getAndIncrement() - 线程名称

    • "listen-start-thread-" - 固定前缀,表明这是监听启动线程
    • listenStartThreadCount.getAndIncrement() - 原子递增计数器
      • getAndIncrement() 方法先返回当前值,然后将计数器加1
      • 确保每个线程都有唯一的序号,例如: listen-start-thread-1, listen-start-thread-2...

作用和意义

1. 线程命名的重要性

java
// 有了明确命名后,在调试和监控时可以看到:
"listen-start-thread-1"  // 清楚表明这是第1个监听启动线程
"listen-start-thread-2"  // 清楚表明这是第2个监听启动线程

2. 为什么使用原子计数器?

java
private final AtomicInteger listenStartThreadCount = new AtomicInteger(1);
  • 线程安全:多个线程同时创建线程时不会出现重复的序号
  • 自动递增:无需手动管理序号

3. 实际效果示例

假设有以下代码执行顺序:

  1. 第一次调用:创建线程命名为 "listen-start-thread-1"
  2. 第二次调用:创建线程命名为 "listen-start-thread-2"
  3. 第三次调用:创建线程命名为 "listen-start-thread-3"

在整个系统中的作用

这段代码是 [listenStartThreadPool](file://D:\Java_projects\damai\damai-redisson-framework\damai-service-delay-queue-framework\src\main\java\com\damai\core\DelayConsumerQueue.java#L75-L75) 线程池的一部分,用于:

  1. 持续监听Redis队列:保持一个后台线程不断检查是否有到期的消息
  2. 线程管理:通过命名便于监控和调试
  3. 资源控制:限制只使用一个线程执行监听任务,避免资源浪费

这种设计模式在延迟队列框架中非常重要,因为它确保了:

  • 有且仅有一个线程持续监听Redis队列
  • 线程具有明确的标识,便于运维和调试
  • 线程创建过程是可控和标准化的