Skip to content

而本文的重点是来介绍如果有修改的操作,那该如何去解决缓存一致性呢?又比如缓存中数据都是根据节目演出时间来设置的,如果遇到了突发情况,要求节目提前下线,要如何在缓存中提前过期呢?

思考

对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉

但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?

就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空

那么要如何通知这5个节点呢?可以有这几种方式:

  1. 定时任务查询 定时从库中扫描失效的数据,对于已经失效的数据就在缓存中删除。这种只能应对简单而且数据量小的业务,而且不好估算定时任务的执行时间,频率高了对数据库的压力很大,频率低了缓存又不及时被清除,而且假如某段时间没有修改数据或者主动要失效的操作,那么就白执行了。而且这种多实例的情况,就只能每个实例都要查询一遍数据库,属实没有必要
  2. 使用MQ消息中间件(Kafka、RocketMQ、RabbitMQ) MQ上又都是比较重要的业务在使用,是否有必要在这个轻量级的功能上使用MQ引入额外的中间件,说白了 就是一个取舍问题,因为这个功能是比较轻量级的,就算通知有延迟也没关系,顶多就是缓存中没有清掉呗,还有个过期时间来兜底
  3. Redis的PUB/SUB 致命的问题就是没有办法进行持久化的,如果出现网络断开、Redis宕机的话,消息就会丢失,这种也不是很推荐
  4. Redis的Stream 可以理解成是Redis对消息队列MQ的完善实现,支持分组消费和广播消费,并且可以将消息持久化

对于这种多实例清除本地缓存的业务,使用Redis的Stream是比较适合的,主要原因有以下几点:

  1. 这种通知清除缓存的功能是比较轻量级的,不是很频繁的操作,不像MQ那样是专门为了解决高并发下的问题,所以使用RedisStream就完全足够
  2. 使用RedisStream只需要连接Redis即可,而基本每个项目都需要Redis,这样就不需要再额外引入中间件,没有额外的部署成本
  3. RedisStream能够将数据保存到磁盘,以确保数据不会丢失

RedisStream相关配置

spring:
  data:
    # redis相关配置
    redis:
      database: 0
      host: 127.0.0.1
      port: 6379
      timeout: 3000
      # redisStream相关配置
      stream:
        # stream的key
        streamName: invalid_program
        # 消费类型:广播
        consumerType: broadcast

配置广播类型的消息消费,可以实现多个实例节点都能收到消息

代码细节

基于现有组件的消息发布和消费流程

1. 消息发布流程

发布消息的核心组件

  • [RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32):负责向Redis Stream发送消息

发布消息的实现方式

java
@Service
public class OrderMessagePublisher {
    
    @Autowired
    private RedisStreamPushHandler redisStreamPushHandler;
    
    public void publishOrderEvent(String orderData) {
        // 调用push方法将消息发布到Redis Stream
        RecordId recordId = redisStreamPushHandler.push(orderData);
    }
}

2. 消息消费流程

消费者的定义

  • [MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18):函数式接口,定义消息处理逻辑

消息处理实现

java
@Component
public class OrderMessageConsumer implements MessageConsumer {
    
    @Override
    public void accept(ObjectRecord<String, String> message) {
        // 处理收到的消息
        String content = message.getValue();
        // 具体业务逻辑处理
    }
}

3. 配置与集成

配置属性

properties
# application.yml
spring.data.redis.stream:
  streamName: order-stream
  consumerGroup: order-group
  consumerName: order-consumer
  consumerType: group  # group 或 broadcast

自动配置生效

  • [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124):Spring Boot自动配置类
  • 通过 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件注册

4. 消费模式

消费者组模式(默认)

  • [RedisStreamConstant.GROUP](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L11-L12):值为"group"
  • 多个消费者可以加入同一消费者组,实现负载均衡
  • 消息只会被组内一个消费者处理

广播模式

  • [RedisStreamConstant.BROADCAST](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L14-L14):值为"broadcast"
  • 所有消费者都会收到相同消息

5. 核心组件交互流程

  1. 初始化阶段

    • Spring Boot启动时创建[RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32)、[RedisStreamHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamHandler.java#L17-L53)等Bean
    • [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124)创建消息监听容器
  2. 发布消息

    • 调用[RedisStreamPushHandler.push()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L25-L32)方法
    • 消息被发送到指定的Stream
  3. 消费消息

    • [RedisStreamListener](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L15-L32)监听Stream
    • 收到消息后调用[MessageConsumer.accept()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L15-L17)方法
  4. 异常处理

    • 监听容器配置了错误处理器
    • [RedisStreamListener.onMessage()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L24-L32)方法包含异常捕获

6. 使用优势

  • 自动配置:通过配置文件即可完成Redis Stream配置
  • 灵活消费:支持消费者组和广播两种模式
  • 线程安全:内置线程池管理
  • 易于扩展:只需实现[MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18)接口即可处理消息

接口

com.damai.controller.ProgramController#invalid

  
@ApiOperation(value = "节目失效(根据id)")

@PostMapping(value = "/invalid")

public ApiResponse<Boolean> invalid(@Valid @RequestBody ProgramInvalidDto programInvalidDto) {

return ApiResponse.ok(programService.invalid(programInvalidDto));

}
public Boolean invalid(final ProgramInvalidDto programInvalidDto) {
    Program program = new Program();
    program.setId(programInvalidDto.getId());
    //修改数据库中的节目状态为下线状态
    program.setProgramStatus(BusinessStatus.NO.getCode());
    int result = programMapper.updateById(program);
    if (result > 0) {
        //删除Redis的缓存
        delRedisData(programInvalidDto.getId());
        //向RedisStream发送消息
        redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));
        //删除elasticsearch中的数据
        programEs.deleteByProgramId(programInvalidDto.getId());
        return true;
    }else {
        return false;
    }
}

删除Redis的缓存

public void delRedisData(Long programId){
    Program program = Optional.ofNullable(programMapper.selectById(programId))
            .orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM,programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP,program.getProgramGroupId()));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME,programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_NO_SOLD_HASH, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_LOCK_HASH, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_SOLD_HASH, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_CATEGORY_LIST, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_REMAIN_NUMBER_HASH, programId));
}

删除Redis的缓存数据,包括:节目、节目分组、节目演出时间、节目座位(未售卖、锁定中、已售卖)、票档、余票数量向RedisStream发送消息

监听RedisStream消息

@Slf4j
@Component
public class ProgramRedisStreamConsumer implements MessageConsumer {
    
    @Autowired
    private ProgramService programService;
    
    @Override
    public void accept(ObjectRecord<String, String> message) {
        Long programId = Long.parseLong(message.getValue());
        programService.delLocalCache(programId);
    }
}

public void delLocalCache(Long programId){
    log.info("删除本地缓存 programId : {}",programId);
    localCacheProgram.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM, programId).getRelKey());
    localCacheProgramGroup.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP, programId).getRelKey());
    localCacheProgramShowTime.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME, programId).getRelKey());
    localCacheTicketCategory.del(programId);
}

处理逻辑也是比较简单,监听到消息后,直接从本地缓存删除删除elasticsearch中的数据

删除elasticsearch中的数据

流程是先通过节目id查询elasticsearch中的数据,在此方法中,ProgramListVo对象设置了字段 esId

@Data
@ApiModel(value="ProgramListVo", description ="节目列表")
public class ProgramListVo {
    
    /**
     * es中的文档id
     * */
    private String esId;
}

使用封装的组件 businessEsHandle.query 的方法查询后,会自动的将elasticsearch中的文档id映射到ProgramListVo中的esId字段上

当获取到数据后,就可以通过文档id将数据删除了,这里同样使用封装的组件 businessEsHandle.deleteByDocumentId 方法