程序员阿沛
发布于 2026-06-27 / 0 阅读
0
0

万字长文详解redisstream读写特性数据结构基础命令和积压消息处理

万字长文详解 redis stream 读写特性、数据结构、基础命令 和 积压消息处理

目录预览

_ 一、redis stream 特点 _

_ 1. 功能 & 使用场景 _

_ 2. 基本概念 _

_ 3. 独立消费者模式 & 消费者组模式 _

_ 4. 生产 & 消费流程 _

_ 5. 消息的状态 _

_ 附:记录一次线上stream积压的排查和解决 _

_
_

_ 二、stream的相关命令和常见操作 _

_ 1. 创建stream并往stream中写入消息 _

_ 2. 读消息(非消费者组模式) _

_ 3. xgroup命令组; _

_ 4. xinfo命令组 _

_ 5. xpending命令组 _

_ 6. xreadgroup 读取消息(消费者组模式) _

_ 7. xack 确认消息 _

_ 8. xclaim更改消息的所有权 _

_
_

_ 三、stream的数据结构 _

_ 1. radix tree 和 rax 树 _

_ 2. listpack _

_ 3. stream 结构 _

一、redis stream 特点

1. 功能 & 使用场景

Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID
的大小进行有序排列。

Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis
宕机等,消息就会被丢弃。

它实现了大部分消息队列的功能:

● 消息 ID 序列化生成;

● 消息遍历;

● 消息的阻塞和非阻塞读;

● Consumer Groups 消费组;

● ACK 确认机制。

● 支持多播。

同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。支持查询 Stream
中的历史数据,消费者可以在处理完当前的数据后,再处理之前的数据。

2. 基本概念

a. 与所有 Redis 数据结构一样,每个 Redis Stream 都通过一个键进行寻址,该键指向Stream类型的值。

这意味着在其他Redis
类型的基本操作,也可以在stream上使用。比如,可以用DEL删除一个Stream,可以用expire命令设置Stream的生存时间(TTL)。此外,Stream数据同样存储在内存中,并且可以通过RDB进行持久化到硬盘。

b. stream中的消息的顺序无法更改。它表示一个消息序列,消息只能添加到序列的末尾。

c. 每个Stream中的每一个消息是一个由多个kv键值对组成的Element结构。一个Element结构 类似于一个 Redis Hash。

d. Stream的每个消息都有一个唯一ID,该 ID 默认由时间戳组成;

3. 独立消费者模式 & 消费者组模式

Redis Steam提供了两种消费方式:

a. 多个独立消费者共享读取消息;

b. 消费组模式;

模式一:多消费者共享读取消息

Redis允许多个独立的consumer从Stream的任意点读取消息,并且同一条消息可以同时被多个消费者消费到。

如下图所示,Application1,Application2,Application3这三个应用都能消费到队列中的每条消息,每个Application可以针对同一条消息进行不同的业务处理。

这种消费模式很方便,但是在某个Application进行水平扩展出多个实例的时候,就会出现同一条数据在同一个Application下的多个实例被重复处理的问题,此时消费者组模式就派上用场了。

模式二:消费者组模式

一个 Stream 都可以挂多个消费组,系统会为每个消费组维护一个游标 last_delivered_id ,表示当前消费组已经消费到哪条消息了。

每个消费组都有一个 Stream 内唯一的名称。不同消费组的状态都是独立的,相互不受影响。同一个 Stream 消息会被每个消费组都消费到。

一个消费组内的消费者们是竞争关系,它们之间不会重复消费同一条消息,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。

普通的消费者模式和消费者组模式的区别在于,你可以将普通消费者模式中的每一个消费者看成是只有一个消费者的消费者组,这样的一个消费者组无法扩展,而消费者组模式中的消费者组是可以扩展里面的消费者个数的。

4. 生产 & 消费流程

生产者流程:

创建stream并往stream写入消息(xadd命令同时完成创建和写入);

消费者组模式的消费流程如下:

创建消费者组(xgroup create命令) -> 指定组内的消费者从stream读取消息(xreadgroup命令) -> 确认消息(xack命令)
-> 删除消息(xdel命令);

xack确认完消息之后,是否要删除消息取决于你的业务需要。如果一个消费者组消费完这个消息之后,没有其他消费者组需要消费它,那么就可以删除,否则就不能删除。

不过即便不主动删除,在指定了最大长度的stream中,旧消息也会因为新消息填满了整个stream而被自动移除。

PS:“xreadgroup … >” 和 “xreadgroup … 0” 的区别:

表示读取该消费者没有读取过的消息,0表示从stream头部读取已读取过但还未确认的消息。

现在有一个消费者组G,里面有两个消费者 A 和 B。

在执行 > 模式读取的情况下,如果某条消息 A 读取过,但是还没有xack,那么这条消息不会再被 A 和 B 读取, 因为 > 读取会过滤掉所有消费者的
PEL列表 的消息不读取。

在执行 0 模式读取的情况下,如果某条消息 A 读取过,但是还没有xack,那么这条消息还会再被 A 读取,但 不会被 B 读取到。因为 0
读取会读取消费者 自己PEL列表 中存在且stream中也存在的消息。

不论是哪种读取模式,一旦 xack 了某条消息之后,即使这条消息还存在于stream中(还没xdel),也不会再被A和B读取,也就是说它们都不会读取
last_deliver_id 游标之前的消息。

那么我们该如何正确的使用 > 模式 和 0 模式呢,它们二者应该组合在一起使用。

正常情况下,先用 >
模式读取消息进行消费,如果由于代码逻辑错误导致某些消息被读取但没有确认,此时应该将这些消息转移到另一个专门用来处理死消息的消费者上(具体来说是转移到这个新消费者的PEL队列上,使用xpending

  • xclaim命令 或者 xautoclaim命令),然后由这个消费者用 0 模式读取 & 消费 & 确认 & 删除 这些死信息。

5. 消息的状态

在消费者组模式中,消费者所处理的消息有如下状态:

a. 未读状态(未 xreadgroup 的消息);

b. 已读为确认状态(已 xreadgroup 读到,但还未 xack 的消息);

c. 已确认状态(已 xack 的消息)

stream会为每一个消费者组的每一个消费者维护一个pending_ids数组(官方一点的说法是
PEL列表),该数组保存着该消费者已读但未确认的消息的ID,并且所有消费者的pending_ids在同一消费者组内的所有消费者之间共享。

也就是说,一个消费者组内有 A 和 B两个消费者,它们都通过 >
的方式读取stream消息(也就是读取未读过的消息这一方式)。如果A读取到ID为1的消息,B读到ID为2的消息。

如果由于业务侧代码问题导致消费者从stream中获取了消息但没有xack消息造成stream队列积压,虽然stream设置了长度不会导致该stream的内存占用不断上升,但是
pending_ids 记录的消息ID会不断增多,还是会导致redis的内存占用不断上升。

执行 “xack stream名 group名 消息ID1 消息ID2 …” 可以将这些消息ID从 pending_ids
数组中移除。下面我们通过几个场景分析一下消费者之间确认消息和读取消息的行为。

场景1:

A和B都已经ACK了自己手里的消息,此时A再次读取下一条消息的时候,A会读取到ID为3的消息,因为A和B在同一消费者组里,是互斥消费。

场景2:

A和B都还没有ACK自己手里的消息,此时A再次读取下一条消息的时候,A会读取到ID为3的消息,因为A读取消息的时候会过滤掉 A 和 B 的
pending_ids 中消息。

此时如果想让A重新读取消息1,就将xreadgroup命令中的 > 换成
0,表示从头(从队列头部)读取所有消息。当然啦,你得快点这样做,否则新生产的消息将最旧的消息挤出队列就再也读不到这些pending_id中的消息了。

PS:只有消费者组模式才会有 pending
数组和ack的概念,对于普通的消费者模式而言则没有这两个概念,普通的消费者从队列中获取数据后无需执行xack。

_
_

PS:消费者组的消费者一经创建,除非用户主动执行删除消费者(xgroup delconsumer
命令),这些消费者会一直存在,它们的pending_ids也会一直维护着。创建这些消费者和消费者组的客户端挂掉或者销毁连接是不会导致这些消费者删除的。

_
_

_
_

附:记录一次线上stream积压的排查和解决

这里记录一个我本人遇到的线上redis stream队列积压的问题。

场景如下:stream设定的最大消息长度为20w。消息以平均5000条/s的速度持续的写入到stream中,PHP
swoole的多进程(worker_num = 2,一个消费者组,2个消费者)消费着stream中的消息,基本能达到生产和消费的平衡。

之后发布新提交的代码后,由于代码有bug,导致stream内的消息瞬间飙升到19万多。回退代码之后,重启消费者发现stream中的消息数量维持在了19w多,既没有减少,也没有增多。当时怀疑是刚好生产和消费速度持平,于是尝试增加
worker_num = 6,也就是消费者的个数增加到 6 个,但仍然保持现状。

原因分析:

当时的消费代码是先用 xreadgroup … > 读数据,再执行业务代码,最后不论业务代码执行成功还是失败,都会执行 xack 和 xdel。

while(true){
  try{
    $messages = $redis->executeRaw("xreadgroup ... >");			// 每个消费者一次性从队列中取10条数据

    // 业务代码
  }catch(\Throwable $e){
      // 省略
  }finally{
    $redis->executeRaw("xack ...");			// 不论业务处理结果如何,都确认这10条消息
    $redis->executeRaw("xdel ...");			// 从 stream 中删除这10条消息
  }
}

时刻1:有bug的代码发布

当时有bug的代码发布之后,worker进程从stream获取消息,但下方的业务代码发生了 Fatal Error
导致worker进程直接exit,没有执行到finally中的代码。

为了维持worker_num=2,swoole进程池在子进程挂掉之后会重新初始化进程,重复上述代码里的工作。

于是消息不停的从stream中被获取,但这些消息既没有xack,也没有xdel。没有xack导致消费者的pending_ids(即PEL列表)不断增长,而没有xdel导致stream内消息不断增长,然后保持在20w的满队状态。

新生产的消息不断的将旧的消息挤出队列。

时刻2:回退代码 & 重启消费者

回退代码之后,消费者开始正常读取 & 确认 &
删除stream中从19w之后的消息,而前面的这19w+的数据成了死消息。原因是这19w+的消息ID已经记录到了PEL列表中,消费者读取stream消息时会过滤掉这些消息ID不读取。

于是就有了 stream中消息的数量就维持在了19w+条数据,不增加也不减少,而消费者也有正常在消费 的诡异景象。

执行 xlen 结果为 19.4w+ 条,执行 xpendings 结果有600w+ 条。也就是说,600w 条消息已读取但未确认,有 580w+
的消息在stream中丢失,只剩下19.4w+条死消息还留在队列里。

解决措施:

1.先用 xpending 查找group中已读取但未确认的消息的 PENDING_MAX_ID 和
PENDING_MIN_ID,这个ID范围内的消息就是在stream中没有被消费掉而之后也不会被消费的死消息ID 以及 stream丢失的消息ID;

xpending zbpstream zbpgroup     // 不指定 start end count的话,该命令会返回pending ID的最大值和最小值,而不会返回所有pending ID。

2. 从stream中删除掉这些死数据。

xtrim zbpstream MINID 1709482482296-0		// 使用xtrim命令删除掉stream中比 PENDING_MAX_ID 小的消息

3. 清空消费者PEL列表,否则这些PEL中保存的消息ID也会占用内存。

具体做法是遍历消费者组中的 pending 消息ID,并针对一条条消息ID执行XACK将其置为已确认。

循环执行下面的命令

xpending zbpstream zbpgroup IDLE  60000 - + 1000  // 表示从 PEL 列表中获取1000条超过60秒没有被确认的消息。

xack zbpstream zbpgroup 消息ID1 消息ID2 ...

PS:
XACK的作用就是将这条消息ID从PEL队列移除,并将last_delivered_id指针移动到stream的这条消息上,但XACK的消息并不会从stream中删除,必须显式的调用XDEL才可以。

解决措施的不足和补充:

上述处理方法虽然解决了积压的问题,但是:

  1. 死信消息没有被重新消费,而是直接被干掉了(就算手动跑脚本重新消费,也只能消费还留存在stream中的19w条,而丢失的580w条就真的没法找回来了);

  2. 需要每隔一段时间手动跑脚本清理积压的数据;

针对这几个不足,目前比较官方的做法是在消费者组中创建一个专门处理死信消息的消费者,让它来决定是直接清掉这些pending消息还是重新消费。具体做法如下:

1. 一开始swoole需要起了1个管理进程 和
3个worker进程(应该说是n+1个worker进程,这里n=2,所以是2+1=3),其中两个worker进程操控stream的2个普通消费者A和B,还有一个worker进程操控专门处理死消息的消费者C;

2.
我们在管理进程中起一个每秒触发1次的计时器,该计时器负责执行xpending命令检查stream中是否存在pending消息并获取pending消息ID;

3.计时器内,如果检测到有pending消息,通过 xclaim 命令将获取到的pending消息ID
转移到专门处理死消息的消费者C上(具体来说是转移到C的PEL列表上);

4.消费者C就会自动处理每秒投递过来的pending消息,消费者C的行为可以依据消息的重要程度做出不同的策略,具体见下方;

这样做的好处是,这些pending消息会一条条的被处理掉,而不是在stream中被丢失掉。

操控 消费者AB 和 消费者C 的代码的不同之处在于:

  • 消费者AB采用 “xreadgroup … >” 的方式读取它们没读过的消息,而消费者C采用 "xreadgroup … 0"的方式读取pending消息;

  • 消费者AB负责将消息打包并投递给业务侧处理。消费者C的行为则可以依据消息的重要程度做出不同的策略:

    • 对于不重要丢失了也无所谓的消息可以直接xack & xdel处理;

    • 对于重要的会影响数据一致性的消息,则需要由C重新投递给业务侧处理,但前提是业务侧的逻辑是幂等的,因为我们无法确定这些消息在被消费者A和B弄丢的时候是否已经交给业务侧处理过了;

    • 甚至于我们可以将这些消息写入到db,之后交给离线任务慢慢重跑;

二、stream的相关命令和常见操作

1. 创建stream并往stream中写入消息

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold  [LIMIT count]] <* | id> field value [field value ...]

向student_stream添加消息(自动生成ID)

xadd zbpstream maxlen = 2000 * name zbp age 27

参数说明:

key: stream名称

[MAXLEN [= | ~] threshold]: 限制 key 中消息的最大数量,超过MAXLEN会删除最先加入的消息。

  • = 用于精确限制。精确限制 key 中消息的数量是低效的。

  • ~ 符号用于粗略的限制 key 中消息的数量。redis 会在可以删除整个宏结点时才去删除多余的消息,实际数量可能会比限制数量多几十个,这是正常的,但是不会少于限制的数量。

<* | id>: 添加消息的ID。

  • id: 自己直接指定消息ID。指定的id不能小于上一个添加的消息ID

  • *: Redis自动生成ID。自动生成的ID格式为:-

2. 读消息(非消费者组模式)

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

参数说明:

[BLOCK milliseconds]:如果Stream中没数据,等待数据的时间

STREAMS key:指定Stream名称

id:消息起始id,返回大于(不含等于)该id的消息。

如果想返回所有,id可以配置为:0-0;

如果只想监听最新的消息,id可以配置为 $

3. xgroup命令组;

  • 创建一个消费者组

    xgroup create 流的key 消费者组名称 id-or-$ // 0表示该消费者组从队列的开始位置开始消费

该命令在指定的 key 中创建分组,并且指定分组读取消息的起点,如果指定了0,分组将可以读取指定 key 的所有历史消息,如果指定了 $,分组将可以读取指定
key 的新消息,将不能读取历史消息。也可以指定任意的开始 ID。

  • 为消费者组重新设定消息读取的起点

    XGROUP SETID key groupname id-or-$]

如将起点设置为 0就可以重新读取所有的历史消息。

  • 销毁消费者组

    XGROUP DESTROY key groupname

  • 创建消费者和删除消费者

    XGROUP CREATECONSUMER key groupname consumername
    XGROUP DELCONSUMER key groupname consumername

其他命令:

xlen 获取队列中消息个数;

xrange 按范围获取消息(可用于遍历队列中的消息);

xdel 删除指定ID的消息;

xtrim 按范围删除消息;

4. xinfo命令组

  • 查看指定stream的所有信息

    XINFO STREAM key

输出结果:

127.0.0.1:6379> xinfo stream message
 1) "length"							// 长度
 2) (integer) 4
 3) "radix-tree-keys"			// radix-tree是一种存储和快速查找key的数据结构,后面再说,先忽略
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"		// 最新的一个消息ID
 8) "1604494179721-0"
 9) "groups"							// 消费者组个数
10) (integer) 1
11) "first-entry"					// 第一个消息内容
12) 1) "1604476672762-0"
    2) 1) "key3"
       2) "value3"
13) "last-entry"					// 最后一个消息内容
14) 1) "1604494179721-0"
    2) 1) "readkey"
       2) "readvalue"
  • 查看指定stream的消费者组信息

    XINFO GROUPS key

输出结果:

127.0.0.1:6379> xinfo groups message
1) 1) "name"					// 消费者组的名称
   2) "read_group"
   3) "consumers"			// 里面的消费者个数
   4) (integer) 1
   5) "pending"				// 该消费者组的pending消息个数
   6) (integer) 4
   7) "last-delivered-id"		// 该消费者组最后一次读取并确认的消息ID
   8) "1604494179721-0"
  • 查看指定stream的指定消费者组的指定消费者信息

    XINFO CONSUMERS key groupname

输出结果:

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"			// 消费者名称
   2) "read"
   3) "pending"		// pending消息数
   4) (integer) 4
   5) "idle"			// 消息在该消费者手上的空闲时间
   6) (integer) 206796

5. xpending命令组

XPENDING key group [start end count] [consumer]

xpending 命令可以查看对应分组中未确认的消息的数量和其所对应的消费者的名字还有起始和终止 ID。

示例:

  • 不指定具体的消费者

    127.0.0.1:6379> xpending message readgroup

    1. (integer) 2
    2. “1604496633846-0”
    3. “1604496640734-0”
        1. “read”
        2. “2”
  • 指定具体的消费者

    127.0.0.1:6379> xpending message readgroup - + 10 read

      1. “1604496633846-0”
      2. “read”
      3. (integer) 513557
      4. (integer) 4
      1. “1604496640734-0”
      2. “read”
      3. (integer) 482927
      4. (integer) 1

6. xreadgroup 读取消息(消费者组模式)

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

[NOACK],不需要确认消息,适用于不怎么重要的可以丢失的消息;

ID [ID …],指定的从某消息 ID之后的消息读起,> 指定表示读取所有未消费的消息,0表示从头读起;

xreadgroup 命令通过与消费者组和消费者的结合可以做到消息的读取与确认,在 xread 的基础上细化了读取消息操作。

7. xack 确认消息

XACK key group ID [ID ...]

xack 命令从 pending 队列中删除挂起的消息,即未确认的消息。

当使用 xreadgroup 命令读取消息时,消息同时被存储到 PEL队列 中,等待被确认,调用 xack 命令可以从 PEL
中删除挂起的消息并且释放内存,确保不丢失消息。

每个消费者都会维护一个PEL队列,每个消费者的PEL队列会被消费者组内的所有消费者共享,这样消费者B才不会读取到消费者A读取过但还未确认的消息。

8. xclaim更改消息的所有权

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
  • key,指定的 key

  • group,指定的分组

  • consumer,指定的消费者

  • min-idle-time,指定消息最小空闲数,指定空闲了多久以上的消息会被选中(毫秒)

  • ID [ID …],消息的 ID

  • [IDLE ms],设置消息的空闲时间,如果不提供,默认为 0

  • [TIME ms-unix-time],和IDLE相同,unix 时间戳

  • RETRYCOUNT,设置重试次数,通常 xclaim 不会改变这个值,它通常用于 xpending 命令,用来发现一些长时间未被处理的消息。

  • FORCE,在 PEL 中创建待处理消息,即使指定的 ID 尚未分配给客户端的PEL。

  • JUSTID,只返回认领的消息 ID 数组,不返回实际消息。

如果有消费者在读取了消息之后未处理完成就挂掉了,那么消息会一直在 pending 队列中,占用内存,这时需要使用 xclaim
命令更改此条消息的所属者(更改所属者后,该消息还是pending状态),让其他的消费者去消费这条消息(这也是redis
stream中死信消息的正确处理流程)。

三、stream的数据结构

Redis Stream主要由消息、生产者、消费者、消费组4部分组成。

redis stream 的底层实现主要使用了 listpack 以及 rax 树,下面一一介绍。

1. radix tree 和 rax 树

前缀树是字符串查找时,经常使用的一种数据结构,能够在一个字符串集合中快速查找到某个字符串。

Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,其特点是,保存在树上的每个 key
会被拆分成单字符,然后逐一保存在树上的节点中。

前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key
的值了。

前缀树示例图

             (f) ""
               \
               (o) "f"
                 \
                 (o) "fo"
                   \
                 [t   b] "foo"
                 /     \
        "foot" (e)     (a) "foob"
               /         \
     "foote" (r)         (r) "fooba"
             /             \
   "footer" []             [] "foobar"

raxNode定义

typedef struct raxNode {
    uint32_t iskey:1;     //节点是否包含key
    uint32_t isnull:1;    //节点的值是否为NULL
    uint32_t iscompr:1;   //节点是否被压缩
    uint32_t size:29;     //节点大小
    unsigned char data[]; //节点的实际存储数据
} raxNode;

该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。

iskey:表示从 Radix Tree 的根节点到当前节点路径上的字符组成的字符串,是否表示了一个完整的 key。如果是的话,那么 iskey 的值为
1。否则,iskey 的值为 0。不过,这里需要注意的是,当前节点所表示的 key,并不包含该节点自身的内容。

isnull:表示当前节点是否为空节点。如果当前节点是空节点,那么该节点就不需要为指向 value 的指针分配内存空间了。

iscompr:表示当前节点是非压缩节点,还是压缩节点。

size:表示当前节点的大小,具体值会根据节点是压缩节点还是非压缩节点而不同。如果当前节点是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示该节点指向的子节点个数。

这 4 个元数据就对应了压缩节点和非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用
29 bit。

另外,从 raxNode 结构体中,可以看到,除了元数据,该结构体中还有 char 类型数组 data。需要知道的是,data
是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:

对于非压缩节点来说,data 数组包括子节点对应的字符、指向子节点的指针,以及节点表示 key 时对应的 value 指针;

对于压缩节点来说,data 数组包括子节点对应的合并字符串、指向子节点的指针,以及节点为 key 时的 value 指针。

在 raxNode 的实现中,无论是非压缩节点还是压缩节点,具有两个特点:

它们所代表的 key,是从根节点到当前节点路径上的字符串,但并不包含当前节点;

它们本身就已经包含了子节点代表的字符或合并字符串。而对于它们的子节点来说,也都属于非压缩或压缩节点,所以,子节点本身又会保存,子节点的子节点所代表的字符或合并字符串。

而这两个特点给 Radix Tree 实际保存数据时的结构,有两个特征。

首先,Radix Tree
非叶子节点,要么是压缩节点,只指向单个子节点,要么是非压缩节点,指向多个子节点,但每个子节点只表示一个字符。所以,非叶子节点无法同时指向表示单个字符的子节点和表示合并字符串的子节点。

其次,对于 Radix Tree 的叶子节点来说,因为它没有子节点,所以,Redis 会用一个不包含子节点指针的 raxNode
节点来表示叶子节点,也就是说,叶子节点的 raxNode 元数据 size 为 0,没有子节点指针。如果叶子节点代表了一个 key,那么它的 raxNode
中是会保存这个 key 的 value 指针的。

如果你觉得晦涩不易懂,坚持,继续往下看~

非压缩节点

这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符。

同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key
对应的 value 的指针。

结合 raxNode 结构来看看,如果是一个非压缩 node ,当我们有size个字符(bytes),就会有相对应size 个指向子节点 raxNode
的指针;

需要注意的是,字符不是存在子节点,而是存在于父节点中(可以思考下, 为什么不存在子节点中?)。

[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

压缩节点

这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串。

和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个
key 对应的 value 的指针。

注意,压缩节点仅有一个子节点。

图解Radix Tree

假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在
[] 里面,反之写在 () 里面。

              (f) ""
                \
                (o) "f"
                  \
                  (o) "fo"
                    \
                  [t   b] "foo"
                  /     \
         "foot" (e)     (a) "foob"
                /         \
      "foote" (r)         (r) "fooba"
              /             \
    "footer" []             [] "foobar"

然而,这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种:

              ["foo"] ""
                 |
              [t   b] "foo"
              /     \
    "foot" ("er")    ("ar") "foob"
             /          \
   "footer" []          [] "foobar"

这便是Radix Tree的模型图了。

不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo”
就不再是一个公共前缀了,最终树形图如下:

                (f) ""
                /
             (i o) "f"
             /   \
"fi"  ("rst")  (o) "fo"
          /        \
"first" []       [t   b] "foo"
                 /     \
       "foot" ("er")    ("ar") "foob"
                /          \
      "footer" []          [] "foobar"

那 stream 又是如何利用 rax 来提高检索效率并节省空间的呢?

首先,stream 作为一款基于内存的消息队列来设计,针对每一条消息,都必须设置唯一且递增的消息 ID。
一般我们选择自动生成 ID,这种一段时间内的连续 ID,前缀都有一些高度重复性;你也知道,这种重复的前缀正好符合 rax 结构高效存储的特性。

因此,stream 中的 ID 就是通过 rax 来进行存储的。

既然消息 ID 已经存下了,对应的消息存在哪里的?别着急,继续往下看~

2. listpack

redis 源码对于 listpack 的解释为 A lists of strings serialization
format,一个字符串列表的序列化格式,也就是将一个字符串列表进行序列化存储。

listpack 也叫紧凑列表,它的特点就是用一块连续的内存空间来紧凑地保存数据,同时为了节省内存空间,listpack
列表项使用了多种编码方式,来表示不同长度的数据,这些数据包括整数和字符串。

listpack结构图

listpack 由4部分组成:Total Bytes、Num Elem、Entry 以及 End。

Total Bytes 为整个 listpack 的空间大小,占用4个字节。

Num Elem 为 listpack 中的元素个数,即 Entry 的个数,占用2个字节。

Entry 为每个具体的元素。

End 为 listpack 特定的结束标志,占用1个字节,内容为0xFF。

简单的理解 listpack 就是一款专门为节省内存空间,通过特定的编码方式将数据进行编码和解码的数据结构,这种结构天生就是为节省空间而存在的。

相信你已经想到了, 我们实际的消息就是通过 listpack 结构来存储的;上面我们讲到,消息 ID 已经存入了 rax 树中;

如果你了解 rax 结构应该知道,树中对应 key 的节点有一个 data 域,这里可以存储数据;

相同的,这里写入了 key 对应消息的指针,指向 listpack 结构,也就是 stream 的具体消息内容了。

3. stream 结构

redis stream 的实现依赖于 rax 结构以及 listpack 结构。每个消息流都包含一个 rax 结构,以消息ID 为
key、listpack结构为 value 存储在 rax 结构中。每个消息的具体信息存储在这个 listpack 中。

stream 结构 示例图

每个 listpack 都有一个 master entry,该结构中存储了创建这个 listpack 时待插入消息的所有
field,这种保存方式其实也是为了节省内存空间,这是因为很多消息的键是相同的,保存一份就行。

每个 listpack 中可能存储多条消息

stream 结构:

typedef struct stream {
    rax rax;               / The radix tree holding the stream. /
    uint64_t length;        / Number of elements inside this stream. /
    streamID last_id;       / Zero if there are yet no items. */
    rax cgroups;           / Consumer groups dictionary: name -> streamCG */
} stream;

rax 存储消息生产者生产的具体消息,每个消息有唯一的 ID。以消息 ID 为键,消息内容为值存储在 rax 中,值得注意的是,rax
中的一个节点可能存储多个消息。

length 代表当前 stream 中的消息个数(不包括已经删除的消息)。

last_id 为当前 stream 中最后插入的消息的 ID, stream 为空时,设置为0。

cgroups 存储了当前 stream 相关的消费组,以消费组的组名为键,streamCG 为值存储在 rax 中。

消费组实现

消费组是 stream 中的一个重要概念,每个 stream会有多个消费组,每个消费组通过组名称进行唯一标识,同时关联一个 streamCG
结构,该结构定义如下:

/* Consumer group. /
typedef struct streamCG {
    streamID last_id;       / 已经消费的最新的一个消息ID /
    rax pel;               / Pending entries list. 已经消费但没有ACK的消息列表/
    rax consumers;         / 消费组所包含的消费者列表 */
} streamCG;

特别说明的是 PEL 结构:

pel 为该消费组尚未确认的消息,并以消息ID 为键,以 streamNACK 为值。

消费组的概念是作者受到 kafka 消费组的启发而来;在 stream 的消费组中:

每个消费组通过组名称唯一标识,每个消费组都可以消费该消息队列的全部消息,多个消费组之间相互独立。

每个消费组可以有多个消费者,消费者通过名称唯一标识,消费者之间的关系是竞争关系,也就是说一个消息只能由该组的一个成员消费。

组内成员消费消息后需要确认,每个消息组都有一个待确认消息队列(pending entry list, pel),用以维护该消费组已经消费但没有确认的消息。

消费组中的每个成员也有一个待确认消息队列,维护着该消费者已经消费尚未确认的消息。

实际场景可以有多个生产者不断发布消息,同时可以有多个消费组进行监听消息;另外,也支持非消费组的形势处理消息。

如下图:

消费者实现

每个消费者通过 streamConsumer 唯一标识,该结构如下:

typedef struct streamConsumer {
    mstime_t seen_time;         /* 消费者上一次活跃时间 /
    sds name;                   / 消费者名字,组内唯一,区分大小写 */
    rax pel;                   / 该消费者消费但未 ACK 的消息列表. rax 结构中 key 便是
消息ID,value 指针指向 streamNACK 结构,记录的是该条消息的处理次数和上次处理时间 */
} streamConsumer;

每一个消费者都有一个 PEL 结构,用来保存未 ACK 消息的元数据;值得注意的是,这里并不会保存完整的消息,仅保存了消息 ID 和 处理情况的元数据;

因此,当你想从 PEL 中恢复数据时,你需要先从 PEL 中拿到消息 ID 列表,然后再从原 stream 列表中根据 ID 查询具体消息信息。

未确认消息实现

未确认消息(streamNACK)维护了消费组或者消费者尚未确认的消息。

/* Pending (yet not acknowledged) message in a consumer group. /
typedef struct streamNACK {
    mstime_t delivery_time;     / Last time this message was delivered. /
    uint64_t delivery_count;    / Number of times this message was delivered.*/
    streamConsumer consumer;   / The consumer this message was delivered to
                                   in the last delivery. */
} streamNACK;

该结构用于 PEL 队列中存储消息的元数据信息,比如 上次处理时间、处理次数以及上一次被哪个消费者处理的。

相信细心的你,已经发现了 streamCG 结构和 streamConsumer 结构都有一个 PEL 字段,那它们有什么关联?

首先,streamCG 作用范围是整个消费组,而 streamConsumer 范围是一个消费者。

streamCG 中包含的是整个消费组的未 ACK 列表,而 streamCG 是单个消费者的未 ACK 列表。

你可能想问,两者是不是包含关系?数据是不是有重复记录?确实是包含关系,但为什么要这样记录多次?

首先,PEL 也是一颗 rax 树结构,消息 ID 构成这棵树,value 是一个指针;得益于 rax 本身的特性,这棵树本身不会占用多少内存

这样写可以高效的应对多种数据查询,比如查询单个消费者的、或者整个消费组的,指定数量或者全部等
值得注意的是,两者 PEL 结构中,key 对应的 value 指向的是同一个 streamNACK 对象,也就是说,这个元数据是共享的

迭代器实现

为了遍历stream中的消息,Redis 提供了streamIterator 结构。

使用迭代器的好处是,提供一系列抽象去遍历 stream,不用在关心实际的 radix tree + listpack 实现。stream
中一般是内部使用迭代器,比如 streamReplyWithRange(),AOF 操作等。

typedef struct streamIterator {
    stream stream;         / The stream we are iterating. /
    streamID master_id;     / ID of the master entry at listpack head. /
    uint64_t master_fields_count;       / Master entries # of fields. */
    unsigned char master_fields_start; / Master entries start in listpack. */
    unsigned char master_fields_ptr;   / Master field to emit next. /
    int entry_flags;                    / Flags of entry we are emitting. /
    int rev;                / True if iterating end to start (reverse). /
    uint64_t start_key[2];  / Start key as 128 bit big endian. /
    uint64_t end_key[2];    / End key as 128 bit big endian. /
    raxIterator ri;         / Rax iterator. */
    unsigned char lp;      / Current listpack. */
    unsigned char lp_ele;  / Current listpack cursor. */
    unsigned char lp_flags; / Current entry flags pointer. /
    / Buffers used to hold the string of lpGet() when the element is
     * integer encoded, so that there is no string representation of the
     * element inside the listpack itself. */
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;

stream 为当前迭代器正在遍历的消息流。

消息内容实际存储在 listpack 中,每个 listpack 都有一个 master entry(也就是第一个插入的消息), master_id
为该消息 id。

master_fields_count 为 master entry 中 field 域的个数。master_fields_start 为 master
entry field 域存储的首地址。

当 listpack 中消息的 field 域与 master entry 的 field 域完全相同时,该消息会复用 master entry 的
field 域,在我们遍历该消息时,需要记录当前所在的field域的具体位置,master_fields_ptr就是实现这个功能的。

entry_fags 为当前遍历的消息的标志位。

rev 代表当前迭代器的方向。

start_key, end_key 为该迭代器处理的消息 ID 的范围。

ri 为 rax 迭代器,用于遍历 rax 中所有的key。

lp为当前 listpack 指针。

lp_ele 为当前正在遍历的 listpack 中的元素。

lp_fags 指向当前消息的 fag 域。

field_buf, value_buf 用于从 listpack 读取数据时的缓存。

参考文章:

redis的stream类型命令详解:

https://segmentfault.com/a/1190000037729316

redis消息中间件实现方案之一 :stream介绍:

https://zhuanlan.zhihu.com/p/666789892

Redis Stream 数据结构实现原理真的很强:

https://www.51cto.com/article/766539.html

Stream:redis5.0 定制版消息队列底层实现:

https://www.51cto.com/article/766539.html


评论