Skip to content

Redis Stream 发布订阅服务实现方案

1. 发布消息服务

基础发布实现

java
@Service
public class RedisStreamPublisher {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 发布消息到指定流
     */
    public String publish(String streamName, Map<String, Object> message) {
        // 使用 Redis Stream 的 XADD 命令发布消息
        return redisTemplate.opsForStream().add(streamName, message);
    }
    
    /**
     * 发布带有ID的消息
     */
    public String publishWithId(String streamName, String messageId, Map<String, Object> message) {
        return redisTemplate.opsForStream().add(StreamRecords.string(message).withId(messageId));
    }
}

2. 消息监听服务

监听器实现

java
@Component
public class OrderStreamListener implements StreamListener<String, ObjectRecord<String, String>> {
    
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        try {
            // 解析消息内容
            String content = message.getValue();
            String streamId = message.getIdAsString();
            
            // 业务逻辑处理
            processBusinessLogic(content);
            
        } catch (Exception e) {
            log.error("处理消息失败: {}", message.getId(), e);
        }
    }
    
    private void processBusinessLogic(String content) {
        // 具体业务处理
    }
}

3. 配置监听容器

容器配置

java
@Configuration
@EnableRedisStreams
public class RedisStreamConfig {
    
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> 
           streamMessageListenerContainer(
               RedisConnectionFactory connectionFactory,
               RedisStreamConfigProperties properties) {
        
        // 配置监听容器选项
        StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = 
            StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofSeconds(5))
                .batchSize(10)
                .build();
        
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = 
            StreamMessageListenerContainer.create(connectionFactory, options);
        
        // 绑定监听器到指定流
        container.receive(
            StreamOffset.fromStart(properties.getStreamName()),
            new OrderStreamListener()
        );
        
        container.start();
        return container;
    }
}

4. 消费者组模式

消费者组配置

java
@Service
public class RedisStreamGroupConsumer {
    
    public void consumeWithGroup(String streamName, String groupName, String consumerName) {
        // 创建消费者组
        redisTemplate.opsForStream().createGroup(streamName, groupName);
        
        // 以消费者组模式消费消息
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = 
            StreamMessageListenerContainer.create(connectionFactory, options);
        
        container.receiveAutoAck(
            Consumer.from(groupName, consumerName),
            StreamOffset.create(streamName, ReadOffset.lastConsumed()),
            new GroupStreamListener()
        );
    }
}

5. 消息处理服务

业务服务封装

java
@Service
public class MessageProcessingService {
    
    /**
     * 发布订单创建消息
     */
    public void publishOrderCreation(OrderCreateEvent event) {
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", event.getOrderId());
        message.put("userId", event.getUserId());
        message.put("timestamp", System.currentTimeMillis());
        
        redisStreamPublisher.publish("order-stream", message);
    }
    
    /**
     * 监听并处理订单消息
     */
    @StreamListener("order-stream")
    public void handleOrderMessage(OrderCreateEvent event) {
        // 处理订单创建逻辑
    }
}

6. 异常处理与重试

错误处理机制

java
@Component
public class ErrorHandlingStreamListener implements StreamListener<String, ObjectRecord<String, String>> {
    
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        try {
            // 业务处理
            processMessage(message);
        } catch (Exception e) {
            // 异常处理逻辑
            handleException(message, e);
        }
    }
    
    private void handleException(ObjectRecord<String, String> message, Exception e) {
        // 记录错误、发送到死信队列或重试
    }
}

核心组件总结

  1. RedisStreamPublisher: 消息发布服务
  2. StreamListener: 消息监听处理
  3. StreamMessageListenerContainer: 消息监听容器
  4. RedisStreamConfigProperties: 配置属性管理
  5. 消费者组模式: 支持多消费者负载均衡

这种架构提供了可靠的消息发布订阅机制,支持容错、重试和水平扩展。

基于现有组件的消息发布和消费流程

1. 消息发布流程

发布消息的核心组件

  • [RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32):负责向Redis Stream发送消息

发布消息的实现方式

java
@Service
public class OrderMessagePublisher {
    
    @Autowired
    private RedisStreamPushHandler redisStreamPushHandler;
    
    public void publishOrderEvent(String orderData) {
        // 调用push方法将消息发布到Redis Stream
        RecordId recordId = redisStreamPushHandler.push(orderData);
    }
}

2. 消息消费流程

消费者的定义

  • [MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18):函数式接口,定义消息处理逻辑

消息处理实现

java
@Component
public class OrderMessageConsumer implements MessageConsumer {
    
    @Override
    public void accept(ObjectRecord<String, String> message) {
        // 处理收到的消息
        String content = message.getValue();
        // 具体业务逻辑处理
    }
}

3. 配置与集成

配置属性

properties
# application.yml
spring.data.redis.stream:
  streamName: order-stream
  consumerGroup: order-group
  consumerName: order-consumer
  consumerType: group  # group 或 broadcast

自动配置生效

  • [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124):Spring Boot自动配置类
  • 通过 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件注册

4. 消费模式

消费者组模式(默认)

  • [RedisStreamConstant.GROUP](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L11-L12):值为"group"
  • 多个消费者可以加入同一消费者组,实现负载均衡
  • 消息只会被组内一个消费者处理

广播模式

  • [RedisStreamConstant.BROADCAST](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L14-L14):值为"broadcast"
  • 所有消费者都会收到相同消息

5. 核心组件交互流程

  1. 初始化阶段

    • Spring Boot启动时创建[RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32)、[RedisStreamHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamHandler.java#L17-L53)等Bean
    • [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124)创建消息监听容器
  2. 发布消息

    • 调用[RedisStreamPushHandler.push()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L25-L32)方法
    • 消息被发送到指定的Stream
  3. 消费消息

    • [RedisStreamListener](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L15-L32)监听Stream
    • 收到消息后调用[MessageConsumer.accept()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L15-L17)方法
  4. 异常处理

    • 监听容器配置了错误处理器
    • [RedisStreamListener.onMessage()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L24-L32)方法包含异常捕获

6. 使用优势

  • 自动配置:通过配置文件即可完成Redis Stream配置
  • 灵活消费:支持消费者组和广播两种模式
  • 线程安全:内置线程池管理
  • 易于扩展:只需实现[MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18)接口即可处理消息