(按“消息生命周期”顺序给方案,每环都配一句口诀)
| 阶段 | 丢失场景 | 防护手段 | 口诀 |
|---|---|---|---|
| ① 生产端 | Netty 写缓存成功即返回,实际没进 Kafka | 异步转同步:KafkaProducer 回调 send(record, callback);失败落本地磁盘补偿重试 | “回调不到不返回,失败写盘慢慢补” |
| ② 消息队列 | Kafka broker 挂掉,pageCache 未刷盘 | acks=all + min.insync.replicas=2 + 每条 flush.ms=10 强制刷盘 | “多副本+强制刷,机器掉电也不怕” |
| ③ 消费端 | 拉取后处理完 先 commit 后业务,宕机时 commit 了但业务没写 Redis | 手动异步提交:业务成功写 Redis 后再 consumer.commitAsync();失败走重试队列 | “业务成功再打卡,失败回滚不丢话” |
实现代码(Netty → Kafka → Redis 全链路)
java
// 1. 生产端:回调成功才回客户端
producer.send(new ProducerRecord<>("danmu", roomId, msg), (meta, ex) -> {
if (ex == null) {
ctx.writeAndFlush(new Ack(danmuId)); // 只在这返回 ack
} else {
diskCompensate.write(msg); // 失败写本地文件
}
});
// 2. Kafka 配置:零丢失最强组合
props.put("acks", "all");
props.put("min.insync.replicas", "2");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", "true"); // 幂等生产
// 3. 消费端:业务写完 Redis 再提交 offset
List<ConsumerRecord<String,Danmu>> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,Danmu> r : records) {
try{
redisOps.leftPush("danmu:"+r.key(), r.value()); // 写 Redis
redisOps.expire(...);
consumer.commitAsync(Collections.singletonMap(
new TopicPartition(r.topic(), r.partition()),
new OffsetAndMetadata(r.offset() + 1)), null);
}catch(Exception e){
// 抛到重试队列,offset 不提交即可
}
}额外兜底
本地补偿任务:扫描
diskCompensate目录,每 30 s 重新发送到 Kafka双队列:主队列
danmu+ 重试队列danmu.retry,重试 3 次后入死信队列人工处理