Skip to content

1. 组件简介

在复杂的分布式系统中,延迟任务(如订单超时自动取消、预约提醒)是必不可少的场景。虽然 RabbitMQ、RocketMQ 等消息中间件提供了延迟消息功能,但在某些轻量级或特定场景下,基于 Redis 实现延迟队列更为灵活。

然而,传统的基于 Redis(ZSet 或 List)的延迟队列在高并发场景下容易遇到性能瓶颈:

  • 热点 Key 问题:所有消息写入同一个 Key,导致单节点压力过大。
  • 消费瓶颈:单个消费者线程无法处理海量积压消息。

本组件基于 Redisson 客户端封装,引入了分片(Sharding)机制,将一个逻辑队列拆分为多个物理队列,实现了生产端的负载均衡与消费端的并行处理,并通过 Spring Boot 自动配置实现了开箱即用。


2. 核心架构设计

2.1 底层实现

组件底层强依赖 Redisson 的两个核心对象:

  • RDelayedQueue:利用 Redis 的 zset 实现消息的延迟存储与定时触发(将到期消息移动到阻塞队列)。
  • RBlockingQueue:作为目标队列,消费者通过 BLPOP 阻塞式获取就绪消息。

2.2 分片(Sharding)设计模型

这是本组件的核心亮点。为了突破单队列性能极限,我们采用了 逻辑 Topic -> 物理 Topic 的映射机制。

  • 逻辑层:业务方只关注一个 Topic,例如 order_cancel
  • 物理层:框架根据配置的 isolation-region-count(分片数),自动将其拆分为 order_cancel-0, order_cancel-1 ... order_cancel-N
  • 生产者:通过**轮询(Round-Robin)**算法将消息均匀分散到各个物理分片。
  • 消费者:框架自动启动对应数量的监听线程,一对一监听物理分片,实现并行消费

3. 核心模块与源码解析

组件主要由以下四个模块构成:

模块关键类核心职责
ConfigDelayQueueAutoConfig负责 Spring Boot 自动装配,加载 DelayQueueProperties 配置。
ContextDelayQueueContext生产者的统一入口,负责管理 Topic 与分片组合的映射关系。
CoreIsolationRegionSelector分片选择器,利用 AtomicInteger 实现轮询负载均衡。
CoreDelayConsumerQueue消费者核心,内部维护监听线程池(阻塞读)和执行线程池(异步运行)。
EventDelayQueueInitHandler启动引导器,监听 Spring 上下文启动事件,扫描并初始化消费者。

3.1 关键交互流程

mermaid
graph TD
    subgraph Producer [生产者端]
        API[DelayQueueContext.sendMessage] --> Combine[DelayQueueProduceCombine]
        Combine --> Selector[IsolationRegionSelector]
        Selector -- 轮询算法 --> Q0[Queue-0]
        Selector -- 轮询算法 --> Q1[Queue-1]
        Selector -- 轮询算法 --> Q2[Queue-2]
    end

    subgraph Redis [Redisson Server]
        Q0 -.-> R0[(Redis List 0)]
        Q1 -.-> R1[(Redis List 1)]
        Q2 -.-> R2[(Redis List 2)]
    end

    subgraph Consumer [消费者端]
        Init[DelayQueueInitHandler] -- 扫描并启动 --> Threads
        R0 --> C0[DelayConsumerQueue-0]
        R1 --> C1[DelayConsumerQueue-1]
        R2 --> C2[DelayConsumerQueue-2]
        C0 --> Task[ConsumerTask 业务逻辑]
        C1 --> Task
        C2 --> Task
    end

4. 项目实战指南 (Damai Project)

“订单超时未支付自动取消” 场景为例,演示如何在项目中使用该组件。

4.1 第一步:引入依赖与配置

application.yml 中配置分片数量和线程池参数:

yaml
delay:
  queue:
    isolation-region-count: 5  # 核心参数:将队列拆分为 5 个分片
    core-pool-size: 4          # 消费者执行业务逻辑的线程池核心大小

4.2 第二步:定义消费者 (Consumer)

业务方只需实现 ConsumerTask 接口,定义 Topic 名称和具体业务逻辑。框架会自动发现该 Bean 并启动监听。

java
@Component
public class DelayOrderCancelConsumer implements ConsumerTask {

    @Autowired
    private OrderService orderService;

    @Override
    public void execute(String content) {
        // content 为 JSON 字符串,需反序列化
        DelayOrderCancelMessageModule message = JSON.parseObject(content, DelayOrderCancelMessageModule.class);
        
        // 执行核心业务:取消订单
        orderService.cancel(message.getOrderNumber());
        System.out.println("订单自动取消执行完毕: " + message.getOrderNumber());
    }

    @Override
    public String topic() {
        // 定义逻辑 Topic,框架会自动将其扩展为 topic-0, topic-1...
        return "delay_order_cancel_topic";
    }
}

原理注脚:应用启动时,DelayQueueInitHandler 会扫描到这个 Bean,根据 isolation-region-count: 5 的配置,自动创建 5 个 DelayConsumerQueue 实例,分别监听 delay_order_cancel_topic-0delay_order_cancel_topic-4

4.3 第三步:发送延迟消息 (Producer)

注入 DelayQueueContext 统一入口,指定 Topic、消息内容和延迟时间。

java
@Component
public class DelayOrderCancelSend {

    @Autowired
    private DelayQueueContext delayQueueContext;

    public void sendMessage(DelayOrderCancelDto dto) {
        String content = JSON.toJSONString(dto);
        
        // 发送消息
        // 参数:Topic, 消息内容, 延迟数值, 时间单位
        delayQueueContext.sendMessage(
            "delay_order_cancel_topic", 
            content, 
            15, 
            TimeUnit.MINUTES
        );
        
        System.out.println("延迟取消订单任务已发送,订单号:" + dto.getOrderNumber());
    }
}

原理注脚DelayQueueContext 内部会调用 IsolationRegionSelector 进行轮询,假设当前轮询到索引 2,消息将被写入 Redis 的 delay_order_cancel_topic-2 队列中。


5. 总结

该组件通过简单的配置和接口封装,解决了传统 Redis 延迟队列的扩展性问题:

  1. 高吞吐:分片机制打散了 Key 的读写压力,支持水平扩展。
  2. 易集成:业务方只需关注 ConsumerTask 接口,无需关心底层的 Redisson 操作和线程管理。
  3. 高并发:独立的监听线程池与执行线程池模型,确保了消息消费的及时性。