Redis作为基于内存运行的数据库提供丰富的数据类型,特殊的有如下四种:Stream, BitMap, Geospatial, HyperLogLog 。下面就来探讨一下Stream

简介

Redis Stream 是 Redis 5.0 版本新增加的数据结构,用于储存和处理信息流。主要应用于消息队列。

此外redis也有两种方案实现消息队列:

  • List: 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。

  • PubSub: 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷。

三者的比较如下:

List

PubSub

Stream

消息持久化

支持

不支持

支持

阻塞读取

支持

支持

支持

消息堆积处理

受限于内存空间,可以利用多消费者加快处理

受限于消费者缓冲区

受限于队列长度,可以利用消费者组提高消费速度,减少堆积

消息确认机制

不支持

不支持

支持

消息回溯

不支持

不支持

支持

1. 消息队列相关命令

  • XADD - 添加消息到末尾

  • XTRIM - 对流进行修剪(保留最新),限制长度

  • XDEL - 删除消息

  • XLEN - 获取流包含的元素数量,即消息长度

  • XRANGE - 获取消息列表,会自动过滤已经删除的消息

  • XREVRANGE - 反向获取消息列表,ID从大到小

  • XREAD - 以阻塞或非阻塞方式获取消息列表

1.1 XADD

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count] *|ID field value [field value ...]
  • key : 队列名称

  • [NOMKSTREAM] : 如果队列不存在,是否自动创建,默认是自动创建

  • [MAXLEN|MINID [=|~] threshold [LIMIT count] : 设置消息队列的最大消息数量

  • *|ID : 消息的唯—id,*代表由Redis自动生成。格式是"时间戳-递增数字

  • field value [field value ...]: 发送到队列中的消息,称为Entry。格式就是多个key-value键值对

例如:创建名为users的队列,并向其中发送一个消息,内容是:{name=iack.age=21},并且使用Redis自动生成ID

127.0.0.1:6379[1]> XADD users * name jack age 21
"1730620626474-0"

1.2 XTRIM

XTRIM key MAXLEN [~] count
  • key : 队列名称

  • MAXLEN : 长度

  • count : 数量

例如:创建了一个ages队列,发送三条消息,保留最近的两条:

127.0.0.1:6379[1]> XADD ages * a1 11 b1 22
"1730621638318-0"
127.0.0.1:6379[1]> XADD ages * c2 22
"1730621643105-0"
127.0.0.1:6379[1]> XADD ages * d4 56
"1730621647633-0"
127.0.0.1:6379[1]> XRANGE ages - +
1)  1)"1730621638318-0"
  2)  1)"al"
      2)"11"
      3)"bl"
      4)"22"
2)1)"1730621643105-0"
    2)1)"c2"
      2)"22"
3)1)"1730621647633-0"
    2)1)"d4"
      2)"56"
127.0.0.1:6379[1]> XTRIM ages MAXLEN 2
(integer) 1
127.0.0.1:6379[1]> XRANGE ages - +
1) 1) "1730621643105-0"

1.3 XDEL

XDEL key ID [ID ...]
  • key: 队列名称

  • ID: 消息 ID

例如:删掉1.2.1中添加的队列。

127.0.0.1:6379[1]> XDEL users 1730620626474-0
(integer) 1

1.4 XLEN

XLEN key

例如:查询1.2.2中之后的队列长度:

127.0.0.1:6379[1]> XLEN ages
(integer) 2

1.5 XRANGE

XRANGE key start end [COUNT count]
  • key: 队列名

  • start: 开始值, - 表示最小值

  • end : 结束值, + 表示最大值

  • count : 数量

例如:添加一个长度为4的foods队列,并查询最早的两个:

127.0.0.1:6379[1]> XADD foods * apple 1
"1730622940774-0"
127.0.0.1:6379[1]> XADD foods * banana 2
"1730622946663-0"
127.0.0.1:6379[1]> XADD foods * pear 3 
"1730622954120-0"
127.0.0.1:6379[1]> XADD foods * shawarma 4
"1730622987113-0"
127.0.0.1:6379[1]> XLEN foods
(integer) 4
127.0.0.1:6379[1]> XRANGE foods - + count 2
1) 1) "1730622940774-0"
   2) 1) "apple"
      2) "1"
2) 1) "1730622946663-0"
   2) 1) "banana"
      2) "2"

1.6 XREVRANGE

XREVRANGE key end start [COUNT count]
  • key : 队列名

  • end : 结束值, + 表示最大值

  • start : 开始值, - 表示最小值

  • count : 数量

例如:查询1.2.5中最新的两个消息:

127.0.0.1:6379[1]> XREVRANGE foods + - count 2
1) 1) "1730622987113-0"
   2) 1) "shawarma"
      2) "4"
2) 1) "1730622954120-0"
   2) 1) "pear"
      2) "3"

1.7 XREAD

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量

  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式

  • key :队列名

  • id :起始id,只返回大于该id的消息

    • 0:代表从第一个消息开始;

    • $:代表从最新的消息开始;

例如:使用XREAD读取第一个消息:

127.0.0.1:6379[1]> XREAD COUNT 1 STREAMS foods 0
1) 1) "foods"
   2) 1) 1) "1730622940774-0"
         2) 1) "apple"
            2) "1"

例如:使用阻塞方式,读取最新消息:

127.0.0.1:6379[1]> XREAD COUNT 1 BLOCK 1000 STREAMS foods $
(nil)
(1.09s)

所以在业务开发中,可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果。

2. 消费者组相关命令

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

  • XGROUP CREATE - 创建消费者组

  • XREADGROUP GROUP - 读取消费者组中的消息

  • XACK - 将消息标记为"已处理"

  • XGROUP SETID - 为消费者组设置新的最后递送消息ID

  • XGROUP DELCONSUMER - 删除消费者

  • XGROUP DESTROY - 删除消费者组

  • XPENDING - 显示待处理消息的相关信息

  • XCLAIM - 转移消息的归属权

  • XINFO - 查看流和消费者组的相关信息

  • XINFO GROUPS - 打印消费者组的信息

  • XINFO STREAM - 打印流信息

  • XINFO CONSUMERS - 打印消费者的信息

2.1 XGROUP CREATE

XGROUP CREATEkey groupName ID [MKSTREAM]
  • key:队列名称

  • groupName:消费者组名称

  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息

  • MKSTREAM:队列不存在时自动创建队列

从头开始消费:

127.0.0.1:6379[1]> XGROUP CREATE mymq group1 0-0 
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.  # Redis 期望指定的 key(即流)必须存在
127.0.0.1:6379[1]> XGROUP CREATE mymq group1 0-0 MKSTREAM  # 使用 MKSTREAM 选项,不存在,Redis 会自动创建一个空的流。
OK

从尾部开始消费:

127.0.0.1:6379[1]> XGROUP CREATE mymq group2 $
OK

2.2 XREADGROUP GROUP

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • STREAMS key:指定队列名称

  • ID:获取消息的起始ID:

    • >:从下一个未消费的消息开始

    • 其它:根据指定id从pending-List中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始。

例如:如果想从名为 mystream 的流中,以消费者组 mygroup 和消费者 myconsumer 的身份读取消息,可以执行如下命令:

XREADGROUP GROUP mygroup myconsumer STREAMS mystream >

> 表示从第一条尚未被消费的消息开始读取。如果想要读取一定数量的消息(例如10条未被消费的消息),可以添加 COUNT 参数:

XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >

如果想要阻塞一定时间等待新消息(例如最多1000毫秒),可以添加 BLOCK 参数,例如:

XREADGROUP GROUP mygroup myconsumer BLOCK 1000 STREAMS mystream >

2.3 XACK

XACK key group ID [ID ...]
  • key:Redis Stream 的 key 名称。

  • group:消费者组的名称。

  • ID:要确认的消息的 ID。可以一次确认多个消息,只需列出多个 ID。

例如,如果你有一个名为 mystream 的 Stream 和一个名为 mygroup 的消费者组,你想确认消息 ID 为 1523843700000000121-0 的消息,你可以使用以下命令:

XACK mystream mygroup 1523843700000000121-0

2.4 XGROUP SETID

XGROUP SETID key group [id|$] [ENTRIESREAD entries-read]
  • key:指定 Redis Stream 的 key 名称。

  • group:指定消费者组的名称。

  • id|$:指定新的最后下发 ID。可以是任意合法的消息 ID,也可以是 $ 符号,表示从最新的 ID 开始。

  • ENTRIESREAD entries-read:可选参数,用于设置消费者组的 entries-read 属性,从而启用消费者组滞后跟踪。

例如,如果你想让消费组中的消费者重新处理流中的所有消息,你可以将下一个 ID 设置为 0:

XGROUP SETID mystream mygroup 0

或者,如果你想将消费组的最后下发 ID 设置为流的最新 ID,可以使用 $ 符号:

XGROUP SETID mystream mygroup $

2.5 XGROUP DELCONSUMER

XGROUP DELCONSUMER key groupname consumername
  • key:指定消费组关联的 Stream(流)的键名。

  • groupname:指定要操作的消费组的名称。

  • consumername:指定要删除的消费者的名称。

例如:有一个名为 mystream 的流和一个名为 mygroup 的消费者组。在这个消费者组中,有一个名为 consumer1 的消费者。删除这个消费者,可以使用以下命令:

XGROUP DELCONSUMER mystream mygroup consumer1

2.6 XGROUP DESTROY

XGROUP DESTROY key groupname
  • key:指定消费组关联的 Stream(流)的键名。

  • groupname:指定要销毁的消费者组的名称。

例如:删除1.3.5中的消费者组:

XGROUP DESTROY mystream mygroup

2.7 XPENDING

XPENDING key group [IDLE min-idle-time] [start end] [count] [consumer]
  • key:指定 Redis Stream 的 key 名称。

  • group:指定消费者组的名称。

  • IDLE min-idle-time:可选参数,用于过滤出空闲时间超过 min-idle-time 毫秒的消息。

  • start:指定消息 ID 的起始值,- 表示最小值。

  • end:指定消息 ID 的结束值,+ 表示最大值。

  • count:限制返回的消息数量。

  • consumer:可选参数,用于指定只查看特定消费者的挂起消息。

例如:查看消费者组 mygroup 中的前 10 条挂起消息的详细信息。

XPENDING mystream mygroup - + 10

查看消费者组 mygroup 中空闲时间超过 9000 毫秒的前 10 条挂起消息。

XPENDING mystream mygroup IDLE 9000 - + 10

2.8 XCLAIM

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]
  • key:队列名称。

  • group:消费组名称。

  • consumer:消费组里的消费者名称。

  • min-idle-time:消息的最小空闲时间(毫秒),只有空闲时间超过这个值的消息才会被转移。

  • id:消息的ID。

  • IDLE ms:设置消息的新空闲时间(毫秒)。

  • TIME unix-time-milliseconds:设置消息的新时间戳。

  • RETRYCOUNT count:设置消息的新重试计数。

  • FORCE:强制转移消息,即使它尚未达到 min-idle-time

  • JUSTID:只返回消息ID,不返回消息体。

  • LASTID lastid:设置消费者组的最后一个已确认ID。

假设有一个名为mystream的流和一个名为mygroup的消费者组。如果消费者 consumerA 读取了消息但没有处理,并且这个消息已经空闲了超过 3600000 毫秒(1小时),就可以将这个消息转移给消费者 consumerB

XCLAIM mystream mygroup consumerB 3600000 1674984765438-0

执行这个命令后,消息1674984765438-0的所有权将从 consumerA 转移到 consumerB,并且这个消息的空闲时间将被重置为 0。这样,consumerB 就可以处理这条消息了。如果消息成功转移,命令将返回消息的内容;如果消息不存在或未达到 min-idle-time,则返回空数组。

2.9 XINFO

XINFO [consumers key groupName] [groups key] [stream key]
  • [consumers key groupName]:这部分表示要获取特定消费者组(groupName)的信息。consumers 是获取消费者列表的参数,key 是消费者组的名称。

  • [groups key]:这部分用于获取所有消费者组的列表。groups 是获取消费者组列表的参数,key 是消费者组的名称。

  • [stream key]:这部分可能是用来指定要查询的流或主题的名称。

下面是三个例子:

# 查看comsumer信息
127.0.0.1:6379[1]> XINFO consumers test g1
1) 1) "name"
   2) "cgj"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 774090
2) 1) "name"
   2) "myt"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1181400

# 查看group信息
127.0.0.1:6379[1]> XINFO groups test
1) 1) "name"
   2) "cgj"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1578124740094-0"
2) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 3
   7) "last-delivered-id"
   8) "1578134897648-0"
3) 1) "name"
   2) "gq"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1578124740094-0"

# 查看stream信息
127.0.0.1:6379[1]> XINFO stream test
 1) "length"
 2) (integer) 10
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 3
 9) "last-generated-id"
10) "1578134897648-0"
11) "first-entry"
12) 1) "1578117062045-0"
    2) 1) "firstName"
       2) "Guanjie"
       3) "lastName"
       4) "Cao"
13) "last-entry"
14) 1) "1578134897648-0"
    2) 1) "msg"
       2) "China"