Skip to content

而本文的重点是来介绍如果有修改的操作,那该如何去解决缓存一致性呢?又比如缓存中数据都是根据节目演出时间来设置的,如果遇到了突发情况,要求节目提前下线,要如何在缓存中提前过期呢?

思考

对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉

但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?

就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空

那么要如何通知这5个节点呢?可以有这几种方式:

  1. 定时任务查询 定时从库中扫描失效的数据,对于已经失效的数据就在缓存中删除。这种只能应对简单而且数据量小的业务,而且不好估算定时任务的执行时间,频率高了对数据库的压力很大,频率低了缓存又不及时被清除,而且假如某段时间没有修改数据或者主动要失效的操作,那么就白执行了。而且这种多实例的情况,就只能每个实例都要查询一遍数据库,属实没有必要
  2. 使用MQ消息中间件(Kafka、RocketMQ、RabbitMQ) 这种方式确实是可以,但是要思考如果之前没有引入MQ的话难道要为了这个功能特意引入额外的中间件吗?另外一种情况就是如果确实在使用MQ,MQ上又都是比较重要的业务在使用,是否有必要在这个轻量级的功能上使用MQ,说白了 就是一个取舍问题,因为这个功能是比较轻量级的,就算通知有延迟也没关系,顶多就是缓存中没有清掉呗,还有个过期时间来兜底呢
  3. Redis的PUB/SUB,订阅/发布模式 这种发布订阅模式有个致命的问题就是没有办法进行持久化的,如果出现网络断开、Redis宕机的话,消息就会丢失,这种也不是很推荐
  4. Redis的Stream 可以理解成是Redis对消息队列MQ的完善实现,支持分组消费和广播消费,并且可以将消息进行持久化

而在项目中对于这种多实例清除本地缓存的业务,使用Redis的Stream是比较适合的,主要原因有以下几点:

  1. 这种通知清除缓存的功能是比较轻量级的,不是很频繁的操作,不像MQ那样是专门为了解决高并发下的问题,所以使用RedisStream就完全足够
  2. 使用RedisStream只需要连接Redis即可,而基本每个项目都需要Redis,这样就不需要再额外引入中间件,没有额外的部署成本
  3. RedisStream能够将数据保存到磁盘,以确保数据不会丢失
  4. 另外也是为了大家学习RedisStream,为了方便以后工作需要的话可以直接使用 下面就来介绍项目中具体是如何使用的

RedisStream相关配置

spring:
  data:
    # redis相关配置
    redis:
      database: 0
      host: 127.0.0.1
      port: 6379
      timeout: 3000
      # redisStream相关配置
      stream:
        # stream的key
        streamName: invalid_program
        # 消费类型:广播
        consumerType: broadcast

配置广播类型的消息消费,可以实现多个实例节点都能收到消息

接口

com.damai.controller.ProgramController#invalid

  
@ApiOperation(value = "节目失效(根据id)")

@PostMapping(value = "/invalid")

public ApiResponse<Boolean> invalid(@Valid @RequestBody ProgramInvalidDto programInvalidDto) {

return ApiResponse.ok(programService.invalid(programInvalidDto));

}
public Boolean invalid(final ProgramInvalidDto programInvalidDto) {
    Program program = new Program();
    program.setId(programInvalidDto.getId());
    //修改数据库中的节目状态为下线状态
    program.setProgramStatus(BusinessStatus.NO.getCode());
    int result = programMapper.updateById(program);
    if (result > 0) {
        //删除Redis的缓存
        delRedisData(programInvalidDto.getId());
        //向RedisStream发送消息
        redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));
        //删除elasticsearch中的数据
        programEs.deleteByProgramId(programInvalidDto.getId());
        return true;
    }else {
        return false;
    }
}

删除Redis的缓存

public void delRedisData(Long programId){
    Program program = Optional.ofNullable(programMapper.selectById(programId))
            .orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM,programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP,program.getProgramGroupId()));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME,programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_NO_SOLD_HASH, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_LOCK_HASH, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_SOLD_HASH, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_CATEGORY_LIST, programId));
    redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_REMAIN_NUMBER_HASH, programId));
}

删除Redis的缓存数据,包括:节目、节目分组、节目演出时间、节目座位(未售卖、锁定中、已售卖)、票档、余票数量向RedisStream发送消息

监听RedisStream消息

@Slf4j
@Component
public class ProgramRedisStreamConsumer implements MessageConsumer {
    
    @Autowired
    private ProgramService programService;
    
    @Override
    public void accept(ObjectRecord<String, String> message) {
        Long programId = Long.parseLong(message.getValue());
        programService.delLocalCache(programId);
    }
}

public void delLocalCache(Long programId){
    log.info("删除本地缓存 programId : {}",programId);
    localCacheProgram.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM, programId).getRelKey());
    localCacheProgramGroup.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP, programId).getRelKey());
    localCacheProgramShowTime.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME, programId).getRelKey());
    localCacheTicketCategory.del(programId);
}

处理逻辑也是比较简单,监听到消息后,直接从本地缓存删除删除elasticsearch中的数据

删除elasticsearch中的数据

public void deleteByProgramId(Long programId){
    try {
        //通过节目id查询elasticsearch中的数据
        List<EsDataQueryDto> esDataQueryDtoList = new ArrayList<>();
        EsDataQueryDto programIdDto = new EsDataQueryDto();
        programIdDto.setParamName(ProgramDocumentParamName.ID);
        programIdDto.setParamValue(programId);
        esDataQueryDtoList.add(programIdDto);
        
        List<ProgramListVo> programListVos = 
                businessEsHandle.query(
                        SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME, 
                        ProgramDocumentParamName.INDEX_TYPE, 
                        esDataQueryDtoList, 
                        ProgramListVo.class);
        //如果数据存在,则通过文档id删除                
        if (CollectionUtil.isNotEmpty(programListVos)) {
            for (ProgramListVo programListVo : programListVos) {
                businessEsHandle.deleteByDocumentId(
                        SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME,
                        programListVo.getEsId());
            }
        }
    }catch (Exception e) {
        log.error("deleteByProgramId error",e);
    }
}

流程是先通过节目id查询elasticsearch中的数据,在此方法中,ProgramListVo对象设置了字段 esId

@Data
@ApiModel(value="ProgramListVo", description ="节目列表")
public class ProgramListVo {
    
    /**
     * es中的文档id
     * */
    private String esId;
}

使用封装的组件 businessEsHandle.query 的方法查询后,会自动的将elasticsearch中的文档id映射到ProgramListVo中的esId字段上

当获取到数据后,就可以通过文档id将数据删除了,这里同样使用封装的组件 businessEsHandle.deleteByDocumentId 方法