Redis作为基于内存运行的数据库,提供丰富的数据类型,特殊的有如下四种:Stream
, BitMap
, Geospatial
, HyperLogLog
。下面就来探讨一下Stream。
简介
Redis Stream 是 Redis 5.0 版本新增加的数据结构,用于储存和处理信息流。主要应用于消息队列。
此外redis也有两种方案实现消息队列:
List
: 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。PubSub
: 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷。
三者的比较如下:
1. 消息队列相关命令
XADD - 添加消息到末尾
XTRIM - 对流进行修剪(保留最新),限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表
1.1 XADD
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count] *|ID field value [field value ...]
key
: 队列名称[NOMKSTREAM]
: 如果队列不存在,是否自动创建,默认是自动创建[MAXLEN|MINID [=|~] threshold [LIMIT count]
: 设置消息队列的最大消息数量*|ID
: 消息的唯—id,*代表由Redis自动生成。格式是"时间戳-递增数字field value [field value ...]
: 发送到队列中的消息,称为Entry。格式就是多个key-value键值对
例如:创建名为users的队列,并向其中发送一个消息,内容是:{name=iack.age=21},并且使用Redis自动生成ID
127.0.0.1:6379[1]> XADD users * name jack age 21
"1730620626474-0"
1.2 XTRIM
XTRIM key MAXLEN [~] count
key
: 队列名称MAXLEN
: 长度count
: 数量
例如:创建了一个ages队列,发送三条消息,保留最近的两条:
127.0.0.1:6379[1]> XADD ages * a1 11 b1 22
"1730621638318-0"
127.0.0.1:6379[1]> XADD ages * c2 22
"1730621643105-0"
127.0.0.1:6379[1]> XADD ages * d4 56
"1730621647633-0"
127.0.0.1:6379[1]> XRANGE ages - +
1) 1)"1730621638318-0"
2) 1)"al"
2)"11"
3)"bl"
4)"22"
2)1)"1730621643105-0"
2)1)"c2"
2)"22"
3)1)"1730621647633-0"
2)1)"d4"
2)"56"
127.0.0.1:6379[1]> XTRIM ages MAXLEN 2
(integer) 1
127.0.0.1:6379[1]> XRANGE ages - +
1) 1) "1730621643105-0"
1.3 XDEL
XDEL key ID [ID ...]
key
: 队列名称ID
: 消息 ID
例如:删掉1.2.1中添加的队列。
127.0.0.1:6379[1]> XDEL users 1730620626474-0
(integer) 1
1.4 XLEN
XLEN key
例如:查询1.2.2中之后的队列长度:
127.0.0.1:6379[1]> XLEN ages
(integer) 2
1.5 XRANGE
XRANGE key start end [COUNT count]
key
: 队列名start
: 开始值, - 表示最小值end
: 结束值, + 表示最大值count
: 数量
例如:添加一个长度为4的foods队列,并查询最早的两个:
127.0.0.1:6379[1]> XADD foods * apple 1
"1730622940774-0"
127.0.0.1:6379[1]> XADD foods * banana 2
"1730622946663-0"
127.0.0.1:6379[1]> XADD foods * pear 3
"1730622954120-0"
127.0.0.1:6379[1]> XADD foods * shawarma 4
"1730622987113-0"
127.0.0.1:6379[1]> XLEN foods
(integer) 4
127.0.0.1:6379[1]> XRANGE foods - + count 2
1) 1) "1730622940774-0"
2) 1) "apple"
2) "1"
2) 1) "1730622946663-0"
2) 1) "banana"
2) "2"
1.6 XREVRANGE
XREVRANGE key end start [COUNT count]
key
: 队列名end
: 结束值, + 表示最大值start
: 开始值, - 表示最小值count
: 数量
例如:查询1.2.5中最新的两个消息:
127.0.0.1:6379[1]> XREVRANGE foods + - count 2
1) 1) "1730622987113-0"
2) 1) "shawarma"
2) "4"
2) 1) "1730622954120-0"
2) 1) "pear"
2) "3"
1.7 XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count
:数量milliseconds
:可选,阻塞毫秒数,没有设置就是非阻塞模式key
:队列名id
:起始id,只返回大于该id的消息0
:代表从第一个消息开始;$
:代表从最新的消息开始;
例如:使用XREAD读取第一个消息:
127.0.0.1:6379[1]> XREAD COUNT 1 STREAMS foods 0
1) 1) "foods"
2) 1) 1) "1730622940774-0"
2) 1) "apple"
2) "1"
例如:使用阻塞方式,读取最新消息:
127.0.0.1:6379[1]> XREAD COUNT 1 BLOCK 1000 STREAMS foods $
(nil)
(1.09s)
所以在业务开发中,可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果。
2. 消费者组相关命令
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息
XINFO GROUPS - 打印消费者组的信息
XINFO STREAM - 打印流信息
XINFO CONSUMERS - 打印消费者的信息
2.1 XGROUP CREATE
XGROUP CREATEkey groupName ID [MKSTREAM]
key
:队列名称groupName
:消费者组名称ID
:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息MKSTREAM
:队列不存在时自动创建队列
从头开始消费:
127.0.0.1:6379[1]> XGROUP CREATE mymq group1 0-0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically. # Redis 期望指定的 key(即流)必须存在
127.0.0.1:6379[1]> XGROUP CREATE mymq group1 0-0 MKSTREAM # 使用 MKSTREAM 选项,不存在,Redis 会自动创建一个空的流。
OK
从尾部开始消费:
127.0.0.1:6379[1]> XGROUP CREATE mymq group2 $
OK
2.2 XREADGROUP GROUP
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group
:消费组名称consumer
:消费者名称,如果消费者不存在,会自动创建一个消费者count
:本次查询的最大数量BLOCK milliseconds
:当没有消息时最长等待时间NOACK
:无需手动ACK,获取到消息后自动确认STREAMS key
:指定队列名称ID
:获取消息的起始ID:>
:从下一个未消费的消息开始其它
:根据指定id从pending-List中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始。
例如:如果想从名为 mystream 的流中,以消费者组 mygroup 和消费者 myconsumer 的身份读取消息,可以执行如下命令:
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
>
表示从第一条尚未被消费的消息开始读取。如果想要读取一定数量的消息(例如10条未被消费的消息),可以添加 COUNT
参数:
XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >
如果想要阻塞一定时间等待新消息(例如最多1000毫秒),可以添加 BLOCK
参数,例如:
XREADGROUP GROUP mygroup myconsumer BLOCK 1000 STREAMS mystream >
2.3 XACK
XACK key group ID [ID ...]
key
:Redis Stream 的 key 名称。group
:消费者组的名称。ID
:要确认的消息的 ID。可以一次确认多个消息,只需列出多个 ID。
例如,如果你有一个名为 mystream 的 Stream 和一个名为 mygroup 的消费者组,你想确认消息 ID 为 1523843700000000121-0 的消息,你可以使用以下命令:
XACK mystream mygroup 1523843700000000121-0
2.4 XGROUP SETID
XGROUP SETID key group [id|$] [ENTRIESREAD entries-read]
key
:指定 Redis Stream 的 key 名称。group
:指定消费者组的名称。id|$
:指定新的最后下发 ID。可以是任意合法的消息 ID,也可以是 $ 符号,表示从最新的 ID 开始。ENTRIESREAD entries-read
:可选参数,用于设置消费者组的 entries-read 属性,从而启用消费者组滞后跟踪。
例如,如果你想让消费组中的消费者重新处理流中的所有消息,你可以将下一个 ID 设置为 0:
XGROUP SETID mystream mygroup 0
或者,如果你想将消费组的最后下发 ID 设置为流的最新 ID,可以使用 $ 符号:
XGROUP SETID mystream mygroup $
2.5 XGROUP DELCONSUMER
XGROUP DELCONSUMER key groupname consumername
key
:指定消费组关联的 Stream(流)的键名。groupname
:指定要操作的消费组的名称。consumername
:指定要删除的消费者的名称。
例如:有一个名为 mystream 的流和一个名为 mygroup 的消费者组。在这个消费者组中,有一个名为 consumer1 的消费者。删除这个消费者,可以使用以下命令:
XGROUP DELCONSUMER mystream mygroup consumer1
2.6 XGROUP DESTROY
XGROUP DESTROY key groupname
key
:指定消费组关联的 Stream(流)的键名。groupname
:指定要销毁的消费者组的名称。
例如:删除1.3.5中的消费者组:
XGROUP DESTROY mystream mygroup
2.7 XPENDING
XPENDING key group [IDLE min-idle-time] [start end] [count] [consumer]
key
:指定 Redis Stream 的 key 名称。group
:指定消费者组的名称。IDLE min-idle-time
:可选参数,用于过滤出空闲时间超过 min-idle-time 毫秒的消息。start
:指定消息 ID 的起始值,-
表示最小值。end
:指定消息 ID 的结束值,+
表示最大值。count
:限制返回的消息数量。consumer
:可选参数,用于指定只查看特定消费者的挂起消息。
例如:查看消费者组 mygroup 中的前 10 条挂起消息的详细信息。
XPENDING mystream mygroup - + 10
查看消费者组 mygroup 中空闲时间超过 9000 毫秒的前 10 条挂起消息。
XPENDING mystream mygroup IDLE 9000 - + 10
2.8 XCLAIM
XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]
key
:队列名称。group
:消费组名称。consumer
:消费组里的消费者名称。min-idle-time
:消息的最小空闲时间(毫秒),只有空闲时间超过这个值的消息才会被转移。id
:消息的ID。IDLE ms
:设置消息的新空闲时间(毫秒)。TIME unix-time-milliseconds
:设置消息的新时间戳。RETRYCOUNT count
:设置消息的新重试计数。FORCE
:强制转移消息,即使它尚未达到min-idle-time
。JUSTID
:只返回消息ID,不返回消息体。LASTID lastid
:设置消费者组的最后一个已确认ID。
假设有一个名为mystream的流和一个名为mygroup的消费者组。如果消费者 consumerA 读取了消息但没有处理,并且这个消息已经空闲了超过 3600000 毫秒(1小时),就可以将这个消息转移给消费者 consumerB:
XCLAIM mystream mygroup consumerB 3600000 1674984765438-0
执行这个命令后,消息1674984765438-0的所有权将从 consumerA 转移到 consumerB,并且这个消息的空闲时间将被重置为 0。这样,consumerB 就可以处理这条消息了。如果消息成功转移,命令将返回消息的内容;如果消息不存在或未达到 min-idle-time,则返回空数组。
2.9 XINFO
XINFO [consumers key groupName] [groups key] [stream key]
[consumers key groupName]
:这部分表示要获取特定消费者组(groupName)的信息。consumers 是获取消费者列表的参数,key 是消费者组的名称。[groups key]
:这部分用于获取所有消费者组的列表。groups 是获取消费者组列表的参数,key 是消费者组的名称。[stream key]
:这部分可能是用来指定要查询的流或主题的名称。
下面是三个例子:
# 查看comsumer信息
127.0.0.1:6379[1]> XINFO consumers test g1
1) 1) "name"
2) "cgj"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 774090
2) 1) "name"
2) "myt"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1181400
# 查看group信息
127.0.0.1:6379[1]> XINFO groups test
1) 1) "name"
2) "cgj"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1578124740094-0"
2) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1578134897648-0"
3) 1) "name"
2) "gq"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1578124740094-0"
# 查看stream信息
127.0.0.1:6379[1]> XINFO stream test
1) "length"
2) (integer) 10
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 3
9) "last-generated-id"
10) "1578134897648-0"
11) "first-entry"
12) 1) "1578117062045-0"
2) 1) "firstName"
2) "Guanjie"
3) "lastName"
4) "Cao"
13) "last-entry"
14) 1) "1578134897648-0"
2) 1) "msg"
2) "China"