1. 前置介绍:为什么需要 Redis Stream?
在 Redis 5.0 之前,使用 Redis 实现消息队列主要有两种方式,但它们都有明显的缺陷:
| 实现方式 | 描述 | 缺点 |
|---|---|---|
| PUB/SUB (发布/订阅) | 经典的发布订阅模式 | 无法持久化。如果出现网络断开或 Redis 宕机,消息就会丢失。 |
| List (LPUSH+BRPOP) / Sorted-Set | 基于列表或有序集合实现 | 支持持久化,但不支持多播(一对多)、不支持分组消费。 |
为了解决上述问题,Redis 在 5.0 版本引入了 Stream 数据结构。从功能上看,它是 Redis 对消息队列(MQ)的完善实现。
2. Redis Stream 核心功能
Redis Stream 提供了消息的持久化和主备复制功能,允许任何客户端访问任何时刻的数据,并能记住每个客户端的访问位置,保证消息不丢失。其主要功能包括:
- ✅ 消息 ID 的序列化生成
- ✅ 消息遍历
- ✅ 消息的阻塞和非阻塞读取
- ✅ 消息的分组消费
- ✅ 消息的广播消费
- ✅ 未完成消息的处理(PEL 机制)
- ✅ 消息队列监控
3. 核心结构与概念解析
Redis Stream 的内部结构是一个消息链表,将所有加入的消息串起来,每个消息都有唯一的 ID 和内容。

关键组件说明
Stream Name: Redis 的 key,例如
mystream。在首次使用XADD指令追加消息时自动创建。Consumer Group (消费组): 使用
XGROUP CREATE命令创建。一个消费组包含多个消费者 (Consumer),这些消费者之间是竞争关系(一条消息只能被组内一个消费者消费)。last_delivered_id (游标): 每个消费组维护一个
last_delivered_id。任意一个消费者读取了消息,该游标都会向前移动。pending_ids (PEL - Pending Entries List): 这是核心机制。用于维护消费者未确认的消息 ID。
- 记录了已被客户端读取但尚未 ACK (Acknowledge) 的消息。
- 如果不 ACK,消息会一直停留在 PEL 中。
- 一旦消息被 ACK,它会从 PEL 中移除。
- 作用:确保客户端至少消费了一次消息,防止因网络故障导致消息丢失。
4. 常用命令实战
4.1 消息的添加、读取与删除
添加消息 (XADD)
*表示由服务器自动生成消息 ID。- 后面跟随
key value对。
# 格式: XADD key ID field string [field string ...]
redis:0> xadd test_stream * areaName nanjing
"1715334202451-0" # 返回生成的ID获取长度 (XLEN)与范围查询 (XRANGE)
-表示最小值,+表示最大值。
# 获取消息长度
redis:0> xlen test_stream
"1"
# 添加更多数据用于测试
redis:0> xadd test_stream * areaName shenzhen
"1715334483563-0"
# 查询所有消息
redis:0> xrange test_stream - +
1) 1) "1715334477467-0"
2) 1) "areaName"
2) "nanjing"
2) 1) "1715334483563-0"
2) 1) "areaName"
2) "shenzhen"删除消息 (XDEL)
redis:0> xdel test_stream 1715334202451-0
"1"4.2 不使用消费组消费 (XREAD)
这种模式下,Stream 类似于普通的 List,所有客户端都可以读取同一条消息(类似广播)。
命令格式: XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count: 读取数量
- milliseconds: 阻塞毫秒数(不设置则为非阻塞)
- id: 起始 ID (
0-0代表从头开始)
使用技巧:每次读取后,应记录返回的最后一条消息 ID,下次读取时将其作为参数,实现断点续传。
# 从头开始读取 1 条
redis:0> xread count 1 streams test_stream 0-0
1) 1) "test_stream"
2) 1) 1) "1715334477467-0"
2) 1) "areaName"
2) "nanjing"
#以此条 ID 为起点继续读取
redis:0> xread count 1 streams test_stream 1715334477467-0
1) 1) "test_stream"
2) 1) 1) "1715334483563-0"
2) 1) "areaName"
2) "shenzhen"4.3 使用消费组消费 (重点)
创建消费组 (XGROUP CREATE)
0-0:表示从头开始消费。$:表示只消费新消息(忽略历史消息)。
redis:0> xgroup create test_stream test_group_1 0-0
"OK"监控 Stream 信息 (XINFO)
# 查看 Stream 整体信息
redis:0> xinfo stream test_stream
1) "length"
2) "2"
...
9) "groups"
10) "1" # 消费组数量
# 查看消费组信息
redis:0> xinfo groups test_stream
1) 1) "name"
2) "test_group_1"
3) "consumers"
4) "0"
5) "pending" # 正在处理且未 ACK 的消息数
6) "0"消费组读取消息 (XREADGROUP)
读取后,消息 ID 会进入 PEL (Pending Entries List)。
命令格式: XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key ID
- >:表示从消费组的
last_delivered_id后面开始读(即读取未消费过的新消息)。
# 消费者 consumer_1 读取消息
redis:0> xreadgroup group test_group_1 consumer_1 count 1 streams test_stream >
1) 1) "test_stream"
2) 1) 1) "1715334477467-0"
2) 1) "areaName"
2) "nanjing"
# 再次查看消费组状态
redis:0> xinfo groups test_stream
...
5) "pending"
6) "1" # 发现有1条消息处于 pending 状态
...确认消息 (XACK)
处理完消息后,必须发送 ACK,否则消息会一直滞留在 PEL 中。
# 确认消息已处理
redis:0> xack test_stream test_group_1 1715334477467-0
"1"
# 再次查看,pending 归零
redis:0> xinfo groups test_stream
...
5) "pending"
6) "0"
...4.4 处理未完成的消息 (XPENDING)
如果消费者崩溃,消息未 ACK,需要使用 XPENDING 查询并重新处理这些“孤儿”消息。
# 添加并读取一条消息但不 ACK
redis:0> xadd test_stream * areaName dalian
"1715585337107-0"
redis:0> xreadgroup group test_group_1 consumer_1 count 1 streams test_stream >
...
# 查询消费组内的 Pending 消息详情
# 格式: XPENDING key group [start end count] [consumer]
redis:0> xpending test_stream test_group_1
1) "1" # 未确认消息总数
2) "1715585337107-0" # 起始 ID
3) "1715585337107-0" # 结束 ID
4) 1) 1) "consumer_1"
2) "1" # 该消费者的 pending 数量5. 深入理解:消息 ID 与时钟回拨
5.1 ID 结构
Redis Stream 的 ID 格式为 timestamp-sequence(例如 1715334202451-0):
- timestamp:毫秒级时间戳(64位整型)。
- sequence:该毫秒内的序列号(64位整型)。
5.2 解决时钟回拨问题
通常分布式 ID(如雪花算法)会面临服务器时钟回拨导致 ID 重复或乱序的问题。Redis Stream 采用了以下机制解决:
- 单调递增:Redis 保证生成的 ID 永远大于前一个 ID。
- 维护
latest_generated_id:Redis 会记录最后一个生成的 ID。 - 自动修正:如果检测到当前服务器时间戳 小于
latest_generated_id的时间戳(即发生了时钟回拨),Redis 会保持时间戳不变,强制递增序列号。
这种机制确保了即使服务器时间发生跳变,Stream 中的消息 ID 依然保持严格的单调递增特性。