Skip to content

Redis Stream 广播与组播实现分析

1. Redis Stream 本身支持模式

Redis Stream 原生功能

  • 消费者组模式:Redis Stream 本身就支持消费者组,这是其核心特性之一
  • 广播模式:Redis Stream 本身不直接支持广播模式

为什么需要自实现广播模式

  1. Redis Stream 天然支持的是组播模式,即一个消息只能被组内的一个消费者处理
  2. 业务需求驱动:需要实现类似广播的功能,让多个服务实例都能收到相同消息

2. 项目中广播模式的实现

广播模式的业务场景

java
// 当设置 consumerType = "broadcast" 时
if (RedisStreamConstant.BROADCAST.equals(redisStreamConfigProperties.getConsumerType())) {
    // 普通消费模式:从流开始位置消费
    container.receive(StreamOffset.fromStart(redisStreamConfigProperties.getStreamName()), 
                     redisStreamListener);
}

广播模式的工作原理

  • 每个服务实例都从流的起始位置开始消费
  • 所有实例都会读取到相同的流数据
  • 适用于缓存失效等需要多实例同步的场景

3. 缓存失效场景的问题

本地缓存失效业务场景

  • 订单服务集群部署:多个实例需要同步清除本地缓存
  • 商品信息变更:所有实例需要更新本地缓存
  • 配置变更:所有实例需要刷新本地配置缓存

消息消费确认机制

消费者组模式(推荐用于缓存失效)

java
// 组播模式 - 每个消息只被一个消费者处理
container.receiveAutoAck(Consumer.from(redisStreamConfigProperties.getConsumerGroup(),
        redisStreamConfigProperties.getConsumerName()),
        StreamOffset.create(redisStreamConfigProperties.getStreamName(), ReadOffset.lastConsumed()),
        redisStreamListener);

广播模式的问题

  • 消息重复消费:每个实例都会处理相同消息
  • 资源浪费:相同的操作被执行多次
  • 缺乏统一控制:无法知道消息是否已被所有实例处理

4. 消息消费确认解决方案

方案一:使用 Redis Set 记录已处理消息

java
@Service
public class MessageProcessedTracker {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    public boolean markAsProcessed(String messageId) {
        String key = "processed_messages:" + messageId;
        Boolean added = redisTemplate.opsForSet().add(key, messageId);
        return added != null && added;
    }
}

方案二:使用分布式锁避免重复处理

java
@Service
public class BroadcastMessageProcessor {
    
    public void processBroadcastMessage(String messageId, String content) {
        String lockKey = "message_lock:" + messageId;
        // 使用分布式锁确保同一消息只被处理一次
        if (acquireLock(lockKey)) {
            try {
                // 执行缓存清理逻辑
                clearLocalCache(content);
            } finally {
                releaseLock(lockKey);
            }
        }
    }
}

方案三:调整业务模型

  • 使用消费者组模式:让其中一个实例处理消息,然后通过其他方式通知其他实例
  • 引入事件总线:使用专门的事件发布订阅机制

5. 最佳实践建议

针对缓存失效场景

  1. 优先考虑组播模式:让单个实例处理,通过集群通信同步
  2. 使用分布式协调机制:如 Zookeeper 或 Redis 实现消息去重
  3. 设计幂等性处理:确保重复处理不会产生副作用

消息可靠性保障

  • 持久化消息状态:记录消息处理状态到数据库
  • 定期清理机制:清理过期的处理记录
  • 监控告警:监控消息积压和处理异常情况