而本文的重点是来介绍如果有修改的操作,那该如何去解决缓存一致性呢?又比如缓存中数据都是根据节目演出时间来设置的,如果遇到了突发情况,要求节目提前下线,要如何在缓存中提前过期呢?
思考
对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉
但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?
就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空
那么要如何通知这5个节点呢?可以有这几种方式:
- 定时任务查询 定时从库中扫描失效的数据,对于已经失效的数据就在缓存中删除。这种只能应对简单而且数据量小的业务,而且不好估算定时任务的执行时间,频率高了对数据库的压力很大,频率低了缓存又不及时被清除,而且假如某段时间没有修改数据或者主动要失效的操作,那么就白执行了。而且这种多实例的情况,就只能每个实例都要查询一遍数据库,属实没有必要
- 使用MQ消息中间件(Kafka、RocketMQ、RabbitMQ) MQ上又都是比较重要的业务在使用,是否有必要在这个轻量级的功能上使用MQ引入额外的中间件,说白了 就是一个取舍问题,因为这个功能是比较轻量级的,就算通知有延迟也没关系,顶多就是缓存中没有清掉呗,还有个过期时间来兜底
- Redis的PUB/SUB 致命的问题就是没有办法进行持久化的,如果出现网络断开、Redis宕机的话,消息就会丢失,这种也不是很推荐
- Redis的Stream 可以理解成是Redis对消息队列MQ的完善实现,支持分组消费和广播消费,并且可以将消息持久化
对于这种多实例清除本地缓存的业务,使用Redis的Stream是比较适合的,主要原因有以下几点:
- 这种通知清除缓存的功能是比较轻量级的,不是很频繁的操作,不像MQ那样是专门为了解决高并发下的问题,所以使用RedisStream就完全足够
- 使用RedisStream只需要连接Redis即可,而基本每个项目都需要Redis,这样就不需要再额外引入中间件,没有额外的部署成本
- RedisStream能够将数据保存到磁盘,以确保数据不会丢失
RedisStream相关配置
spring:
data:
# redis相关配置
redis:
database: 0
host: 127.0.0.1
port: 6379
timeout: 3000
# redisStream相关配置
stream:
# stream的key
streamName: invalid_program
# 消费类型:广播
consumerType: broadcast配置广播类型的消息消费,可以实现多个实例节点都能收到消息
代码细节
基于现有组件的消息发布和消费流程
1. 消息发布流程
发布消息的核心组件
- [RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32):负责向Redis Stream发送消息
发布消息的实现方式
java
@Service
public class OrderMessagePublisher {
@Autowired
private RedisStreamPushHandler redisStreamPushHandler;
public void publishOrderEvent(String orderData) {
// 调用push方法将消息发布到Redis Stream
RecordId recordId = redisStreamPushHandler.push(orderData);
}
}2. 消息消费流程
消费者的定义
- [MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18):函数式接口,定义消息处理逻辑
消息处理实现
java
@Component
public class OrderMessageConsumer implements MessageConsumer {
@Override
public void accept(ObjectRecord<String, String> message) {
// 处理收到的消息
String content = message.getValue();
// 具体业务逻辑处理
}
}3. 配置与集成
配置属性
properties
# application.yml
spring.data.redis.stream:
streamName: order-stream
consumerGroup: order-group
consumerName: order-consumer
consumerType: group # group 或 broadcast自动配置生效
- [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124):Spring Boot自动配置类
- 通过
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports文件注册
4. 消费模式
消费者组模式(默认)
- [RedisStreamConstant.GROUP](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L11-L12):值为"group"
- 多个消费者可以加入同一消费者组,实现负载均衡
- 消息只会被组内一个消费者处理
广播模式
- [RedisStreamConstant.BROADCAST](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\constant\RedisStreamConstant.java#L14-L14):值为"broadcast"
- 所有消费者都会收到相同消息
5. 核心组件交互流程
初始化阶段:
- Spring Boot启动时创建[RedisStreamPushHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L14-L32)、[RedisStreamHandler](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamHandler.java#L17-L53)等Bean
- [RedisStreamAutoConfig](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\config\RedisStreamAutoConfig.java#L28-L124)创建消息监听容器
发布消息:
- 调用[RedisStreamPushHandler.push()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamPushHandler.java#L25-L32)方法
- 消息被发送到指定的Stream
消费消息:
- [RedisStreamListener](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L15-L32)监听Stream
- 收到消息后调用[MessageConsumer.accept()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L15-L17)方法
异常处理:
- 监听容器配置了错误处理器
- [RedisStreamListener.onMessage()](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\RedisStreamListener.java#L24-L32)方法包含异常捕获
6. 使用优势
- 自动配置:通过配置文件即可完成Redis Stream配置
- 灵活消费:支持消费者组和广播两种模式
- 线程安全:内置线程池管理
- 易于扩展:只需实现[MessageConsumer](file://D:\Java_projects\damai_new\damai-redis-tool-framework\damai-redis-stream-framework\src\main\java\com\damai\MessageConsumer.java#L7-L18)接口即可处理消息
接口
com.damai.controller.ProgramController#invalid
@ApiOperation(value = "节目失效(根据id)")
@PostMapping(value = "/invalid")
public ApiResponse<Boolean> invalid(@Valid @RequestBody ProgramInvalidDto programInvalidDto) {
return ApiResponse.ok(programService.invalid(programInvalidDto));
}public Boolean invalid(final ProgramInvalidDto programInvalidDto) {
Program program = new Program();
program.setId(programInvalidDto.getId());
//修改数据库中的节目状态为下线状态
program.setProgramStatus(BusinessStatus.NO.getCode());
int result = programMapper.updateById(program);
if (result > 0) {
//删除Redis的缓存
delRedisData(programInvalidDto.getId());
//向RedisStream发送消息
redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));
//删除elasticsearch中的数据
programEs.deleteByProgramId(programInvalidDto.getId());
return true;
}else {
return false;
}
}删除Redis的缓存
public void delRedisData(Long programId){
Program program = Optional.ofNullable(programMapper.selectById(programId))
.orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM,programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP,program.getProgramGroupId()));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME,programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_NO_SOLD_HASH, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_LOCK_HASH, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_SOLD_HASH, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_CATEGORY_LIST, programId));
redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_REMAIN_NUMBER_HASH, programId));
}删除Redis的缓存数据,包括:节目、节目分组、节目演出时间、节目座位(未售卖、锁定中、已售卖)、票档、余票数量向RedisStream发送消息
监听RedisStream消息
@Slf4j
@Component
public class ProgramRedisStreamConsumer implements MessageConsumer {
@Autowired
private ProgramService programService;
@Override
public void accept(ObjectRecord<String, String> message) {
Long programId = Long.parseLong(message.getValue());
programService.delLocalCache(programId);
}
}
public void delLocalCache(Long programId){
log.info("删除本地缓存 programId : {}",programId);
localCacheProgram.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM, programId).getRelKey());
localCacheProgramGroup.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP, programId).getRelKey());
localCacheProgramShowTime.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME, programId).getRelKey());
localCacheTicketCategory.del(programId);
}处理逻辑也是比较简单,监听到消息后,直接从本地缓存删除删除elasticsearch中的数据
删除elasticsearch中的数据
流程是先通过节目id查询elasticsearch中的数据,在此方法中,ProgramListVo对象设置了字段 esId
@Data
@ApiModel(value="ProgramListVo", description ="节目列表")
public class ProgramListVo {
/**
* es中的文档id
* */
private String esId;
}使用封装的组件 businessEsHandle.query 的方法查询后,会自动的将elasticsearch中的文档id映射到ProgramListVo中的esId字段上
当获取到数据后,就可以通过文档id将数据删除了,这里同样使用封装的组件 businessEsHandle.deleteByDocumentId 方法