Redis Stream 广播与组播实现分析
1. Redis Stream 本身支持模式
Redis Stream 原生功能
- 消费者组模式:Redis Stream 本身就支持消费者组,这是其核心特性之一
- 广播模式:Redis Stream 本身不直接支持广播模式
为什么需要自实现广播模式
- Redis Stream 天然支持的是组播模式,即一个消息只能被组内的一个消费者处理
- 业务需求驱动:需要实现类似广播的功能,让多个服务实例都能收到相同消息
2. 项目中广播模式的实现
广播模式的业务场景
java
// 当设置 consumerType = "broadcast" 时
if (RedisStreamConstant.BROADCAST.equals(redisStreamConfigProperties.getConsumerType())) {
// 普通消费模式:从流开始位置消费
container.receive(StreamOffset.fromStart(redisStreamConfigProperties.getStreamName()),
redisStreamListener);
}广播模式的工作原理
- 每个服务实例都从流的起始位置开始消费
- 所有实例都会读取到相同的流数据
- 适用于缓存失效等需要多实例同步的场景
3. 缓存失效场景的问题
本地缓存失效业务场景
- 订单服务集群部署:多个实例需要同步清除本地缓存
- 商品信息变更:所有实例需要更新本地缓存
- 配置变更:所有实例需要刷新本地配置缓存
消息消费确认机制
消费者组模式(推荐用于缓存失效)
java
// 组播模式 - 每个消息只被一个消费者处理
container.receiveAutoAck(Consumer.from(redisStreamConfigProperties.getConsumerGroup(),
redisStreamConfigProperties.getConsumerName()),
StreamOffset.create(redisStreamConfigProperties.getStreamName(), ReadOffset.lastConsumed()),
redisStreamListener);广播模式的问题
- 消息重复消费:每个实例都会处理相同消息
- 资源浪费:相同的操作被执行多次
- 缺乏统一控制:无法知道消息是否已被所有实例处理
4. 消息消费确认解决方案
方案一:使用 Redis Set 记录已处理消息
java
@Service
public class MessageProcessedTracker {
@Autowired
private StringRedisTemplate redisTemplate;
public boolean markAsProcessed(String messageId) {
String key = "processed_messages:" + messageId;
Boolean added = redisTemplate.opsForSet().add(key, messageId);
return added != null && added;
}
}方案二:使用分布式锁避免重复处理
java
@Service
public class BroadcastMessageProcessor {
public void processBroadcastMessage(String messageId, String content) {
String lockKey = "message_lock:" + messageId;
// 使用分布式锁确保同一消息只被处理一次
if (acquireLock(lockKey)) {
try {
// 执行缓存清理逻辑
clearLocalCache(content);
} finally {
releaseLock(lockKey);
}
}
}
}方案三:调整业务模型
- 使用消费者组模式:让其中一个实例处理消息,然后通过其他方式通知其他实例
- 引入事件总线:使用专门的事件发布订阅机制
5. 最佳实践建议
针对缓存失效场景
- 优先考虑组播模式:让单个实例处理,通过集群通信同步
- 使用分布式协调机制:如 Zookeeper 或 Redis 实现消息去重
- 设计幂等性处理:确保重复处理不会产生副作用
消息可靠性保障
- 持久化消息状态:记录消息处理状态到数据库
- 定期清理机制:清理过期的处理记录
- 监控告警:监控消息积压和处理异常情况