而本文的重点是来介绍如果有修改的操作,那该如何去解决缓存一致性呢?又比如缓存中数据都是根据节目演出时间来设置的,如果遇到了突发情况,要求节目提前下线,要如何在缓存中提前过期呢?
思考
对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉
但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?
就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空
那么要如何通知这5个节点呢?可以有这几种方式:
- 定时任务查询 定时从库中扫描失效的数据,对于已经失效的数据就在缓存中删除。这种只能应对简单而且数据量小的业务,而且不好估算定时任务的执行时间,频率高了对数据库的压力很大,频率低了缓存又不及时被清除,而且假如某段时间没有修改数据或者主动要失效的操作,那么就白执行了。而且这种多实例的情况,就只能每个实例都要查询一遍数据库,属实没有必要
- 使用MQ消息中间件(Kafka、RocketMQ、RabbitMQ) 这种方式确实是可以,但是要思考如果之前没有引入MQ的话难道要为了这个功能特意引入额外的中间件吗?另外一种情况就是如果确实在使用MQ,MQ上又都是比较重要的业务在使用,是否有必要在这个轻量级的功能上使用MQ,说白了 就是一个取舍问题,因为这个功能是比较轻量级的,就算通知有延迟也没关系,顶多就是缓存中没有清掉呗,还有个过期时间来兜底呢
- Redis的PUB/SUB,订阅/发布模式 这种发布订阅模式有个致命的问题就是没有办法进行持久化的,如果出现网络断开、Redis宕机的话,消息就会丢失,这种也不是很推荐
- Redis的Stream 可以理解成是Redis对消息队列MQ的完善实现,支持分组消费和广播消费,并且可以将消息进行持久化
而在项目中对于这种多实例清除本地缓存的业务,使用Redis的Stream是比较适合的,主要原因有以下几点:
- 这种通知清除缓存的功能是比较轻量级的,不是很频繁的操作,不像MQ那样是专门为了解决高并发下的问题,所以使用RedisStream就完全足够
- 使用RedisStream只需要连接Redis即可,而基本每个项目都需要Redis,这样就不需要再额外引入中间件,没有额外的部署成本
- RedisStream能够将数据保存到磁盘,以确保数据不会丢失
- 另外也是为了大家学习RedisStream,为了方便以后工作需要的话可以直接使用 下面就来介绍项目中具体是如何使用的
RedisStream相关配置
spring:
data:
# redis相关配置
redis:
database: 0
host: 127.0.0.1
port: 6379
timeout: 3000
# redisStream相关配置
stream:
# stream的key
streamName: invalid_program
# 消费类型:广播
consumerType: broadcast配置广播类型的消息消费,可以实现多个实例节点都能收到消息
接口
com.damai.controller.ProgramController#invalid
@ApiOperation(value = "节目失效(根据id)")
@PostMapping(value = "/invalid")
public ApiResponse<Boolean> invalid(@Valid @RequestBody ProgramInvalidDto programInvalidDto) {
return ApiResponse.ok(programService.invalid(programInvalidDto));
}public Boolean invalid(final ProgramInvalidDto programInvalidDto) {
Program program = new Program();
program.setId(programInvalidDto.getId());
//修改数据库中的节目状态为下线状态
program.setProgramStatus(BusinessStatus.NO.getCode());
int result = programMapper.updateById(program);
if (result > 0) {
//删除Redis的缓存
delRedisData(programInvalidDto.getId());
//向RedisStream发送消息
redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));
//删除elasticsearch中的数据
programEs.deleteByProgramId(programInvalidDto.getId());
return true;
}else {
return false;
}
}删除Redis的缓存
public void delRedisData(Long programId){
Program program = Optional.ofNullable(programMapper.selectById(programId))
.orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM,programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP,program.getProgramGroupId()));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME,programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_NO_SOLD_HASH, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_LOCK_HASH, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_SOLD_HASH, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_CATEGORY_LIST, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_REMAIN_NUMBER_HASH, programId));
}删除Redis的缓存数据,包括:节目、节目分组、节目演出时间、节目座位(未售卖、锁定中、已售卖)、票档、余票数量向RedisStream发送消息
监听RedisStream消息
@Slf4j
@Component
public class ProgramRedisStreamConsumer implements MessageConsumer {
@Autowired
private ProgramService programService;
@Override
public void accept(ObjectRecord<String, String> message) {
Long programId = Long.parseLong(message.getValue());
programService.delLocalCache(programId);
}
}
public void delLocalCache(Long programId){
log.info("删除本地缓存 programId : {}",programId);
localCacheProgram.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM, programId).getRelKey());
localCacheProgramGroup.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP, programId).getRelKey());
localCacheProgramShowTime.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME, programId).getRelKey());
localCacheTicketCategory.del(programId);
}处理逻辑也是比较简单,监听到消息后,直接从本地缓存删除删除elasticsearch中的数据
删除elasticsearch中的数据
public void deleteByProgramId(Long programId){
try {
//通过节目id查询elasticsearch中的数据
List<EsDataQueryDto> esDataQueryDtoList = new ArrayList<>();
EsDataQueryDto programIdDto = new EsDataQueryDto();
programIdDto.setParamName(ProgramDocumentParamName.ID);
programIdDto.setParamValue(programId);
esDataQueryDtoList.add(programIdDto);
List<ProgramListVo> programListVos =
businessEsHandle.query(
SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME,
ProgramDocumentParamName.INDEX_TYPE,
esDataQueryDtoList,
ProgramListVo.class);
//如果数据存在,则通过文档id删除
if (CollectionUtil.isNotEmpty(programListVos)) {
for (ProgramListVo programListVo : programListVos) {
businessEsHandle.deleteByDocumentId(
SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME,
programListVo.getEsId());
}
}
}catch (Exception e) {
log.error("deleteByProgramId error",e);
}
}流程是先通过节目id查询elasticsearch中的数据,在此方法中,ProgramListVo对象设置了字段 esId
@Data
@ApiModel(value="ProgramListVo", description ="节目列表")
public class ProgramListVo {
/**
* es中的文档id
* */
private String esId;
}使用封装的组件 businessEsHandle.query 的方法查询后,会自动的将elasticsearch中的文档id映射到ProgramListVo中的esId字段上
当获取到数据后,就可以通过文档id将数据删除了,这里同样使用封装的组件 businessEsHandle.deleteByDocumentId 方法