Redis Stream 发布订阅服务实现方案
1. 发布消息服务
基础发布实现
java
@Service
public class RedisStreamPublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 发布消息到指定流
*/
public String publish(String streamName, Map<String, Object> message) {
// 使用 Redis Stream 的 XADD 命令发布消息
return redisTemplate.opsForStream().add(streamName, message);
}
/**
* 发布带有ID的消息
*/
public String publishWithId(String streamName, String messageId, Map<String, Object> message) {
return redisTemplate.opsForStream().add(StreamRecords.string(message).withId(messageId));
}
}2. 消息监听服务
监听器实现
java
@Component
public class OrderStreamListener implements StreamListener<String, ObjectRecord<String, String>> {
@Override
public void onMessage(ObjectRecord<String, String> message) {
try {
// 解析消息内容
String content = message.getValue();
String streamId = message.getIdAsString();
// 业务逻辑处理
processBusinessLogic(content);
} catch (Exception e) {
log.error("处理消息失败: {}", message.getId(), e);
}
}
private void processBusinessLogic(String content) {
// 具体业务处理
}
}3. 配置监听容器
容器配置
java
@Configuration
@EnableRedisStreams
public class RedisStreamConfig {
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>>
streamMessageListenerContainer(
RedisConnectionFactory connectionFactory,
RedisStreamConfigProperties properties) {
// 配置监听容器选项
StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(5))
.batchSize(10)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
StreamMessageListenerContainer.create(connectionFactory, options);
// 绑定监听器到指定流
container.receive(
StreamOffset.fromStart(properties.getStreamName()),
new OrderStreamListener()
);
container.start();
return container;
}
}4. 消费者组模式
消费者组配置
java
@Service
public class RedisStreamGroupConsumer {
public void consumeWithGroup(String streamName, String groupName, String consumerName) {
// 创建消费者组
redisTemplate.opsForStream().createGroup(streamName, groupName);
// 以消费者组模式消费消息
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
StreamMessageListenerContainer.create(connectionFactory, options);
container.receiveAutoAck(
Consumer.from(groupName, consumerName),
StreamOffset.create(streamName, ReadOffset.lastConsumed()),
new GroupStreamListener()
);
}
}5. 消息处理服务
业务服务封装
java
@Service
public class MessageProcessingService {
/**
* 发布订单创建消息
*/
public void publishOrderCreation(OrderCreateEvent event) {
Map<String, Object> message = new HashMap<>();
message.put("orderId", event.getOrderId());
message.put("userId", event.getUserId());
message.put("timestamp", System.currentTimeMillis());
redisStreamPublisher.publish("order-stream", message);
}
/**
* 监听并处理订单消息
*/
@StreamListener("order-stream")
public void handleOrderMessage(OrderCreateEvent event) {
// 处理订单创建逻辑
}
}6. 异常处理与重试
错误处理机制
java
@Component
public class ErrorHandlingStreamListener implements StreamListener<String, ObjectRecord<String, String>> {
@Override
public void onMessage(ObjectRecord<String, String> message) {
try {
// 业务处理
processMessage(message);
} catch (Exception e) {
// 异常处理逻辑
handleException(message, e);
}
}
private void handleException(ObjectRecord<String, String> message, Exception e) {
// 记录错误、发送到死信队列或重试
}
}核心组件总结
- RedisStreamPublisher: 消息发布服务
- StreamListener: 消息监听处理
- StreamMessageListenerContainer: 消息监听容器
- RedisStreamConfigProperties: 配置属性管理
- 消费者组模式: 支持多消费者负载均衡
这种架构提供了可靠的消息发布订阅机制,支持容错、重试和水平扩展。
基于现有组件的消息发布和消费流程
1. 消息发布流程
发布消息的核心组件
- [RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32):负责向Redis Stream发送消息
发布消息的实现方式
java
@Service
public class OrderMessagePublisher {
@Autowired
private RedisStreamPushHandler redisStreamPushHandler;
public void publishOrderEvent(String orderData) {
// 调用push方法将消息发布到Redis Stream
RecordId recordId = redisStreamPushHandler.push(orderData);
}
}2. 消息消费流程
消费者的定义
- [MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18):函数式接口,定义消息处理逻辑
消息处理实现
java
@Component
public class OrderMessageConsumer implements MessageConsumer {
@Override
public void accept(ObjectRecord<String, String> message) {
// 处理收到的消息
String content = message.getValue();
// 具体业务逻辑处理
}
}3. 配置与集成
配置属性
properties
# application.yml
spring.data.redis.stream:
streamName: order-stream
consumerGroup: order-group
consumerName: order-consumer
consumerType: group # group 或 broadcast自动配置生效
- [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124):Spring Boot自动配置类
- 通过
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports文件注册
4. 消费模式
消费者组模式(默认)
- [RedisStreamConstant.GROUP](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L11-L12):值为"group"
- 多个消费者可以加入同一消费者组,实现负载均衡
- 消息只会被组内一个消费者处理
广播模式
- [RedisStreamConstant.BROADCAST](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L14-L14):值为"broadcast"
- 所有消费者都会收到相同消息
5. 核心组件交互流程
初始化阶段:
- Spring Boot启动时创建[RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32)、[RedisStreamHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamHandler.java#L17-L53)等Bean
- [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124)创建消息监听容器
发布消息:
- 调用[RedisStreamPushHandler.push()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L25-L32)方法
- 消息被发送到指定的Stream
消费消息:
- [RedisStreamListener](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L15-L32)监听Stream
- 收到消息后调用[MessageConsumer.accept()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L15-L17)方法
异常处理:
- 监听容器配置了错误处理器
- [RedisStreamListener.onMessage()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L24-L32)方法包含异常捕获
6. 使用优势
- 自动配置:通过配置文件即可完成Redis Stream配置
- 灵活消费:支持消费者组和广播两种模式
- 线程安全:内置线程池管理
- 易于扩展:只需实现[MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18)接口即可处理消息