千家信息网

如何进行Redis5新特性中Streams作消息队列的分析

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,这期内容当中小编将会给大家带来有关如何进行Redis5新特性中Streams作消息队列的分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。前言Redis 5 新特性
千家信息网最后更新 2025年02月01日如何进行Redis5新特性中Streams作消息队列的分析

这期内容当中小编将会给大家带来有关如何进行Redis5新特性中Streams作消息队列的分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

前言

Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。 数据结构中常用 API 进行分析。

准备

本文所使用 Redis 版本为 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不同。

添加消息

Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:

XADD key ID field string [field string ...]

其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 person 为名称的流,进行操作。

XADD person * name ytao des https://ytao.top

上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1578238486193-0"

这里自动生成的 Id 格式为 - Id 是由两部分组成:

  1. 鸿蒙官方战略合作共建--HarmonyOS技术社区

  2. millisecondsTime 为当前服务器时间毫秒时间戳。

  3. sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。

比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。

除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:

  1. 鸿蒙官方战略合作共建--HarmonyOS技术社区

  2. Id 中的前后部分必须为数字。

  3. 最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。

  4. 添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否则,当不满足上述条件时,添加后会抛出异常:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。 Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。

生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。

XADD person MAXLEN 5 * name ytao des https://ytao.top

这样就指定该了 Streams 中的容量为 5 条消息。也可使用 XTRIM 截取消息,从小到大剔除多余的消息:

XTRIM person MAXLEN 8

消息数量

查看消息数量使用 XLEN 指令进行操作。

XLEN key

例:查看 person 流中的消息数量:

> XLEN person  (integer) 5

查询消息

查询 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

XRANGE

查询数据时,可以按照指定 Id 范围进行查询,XRANGE 查询指令格式:

XRANGE key start end [COUNT count]

参数说明:

  • key 为 Streams 的名称

  • start 为范围查询开始 Id,包含本 Id。

  • start 为范围查询结束 Id,包含本 Id。

  • Count 为查询返回最大的消息数量,非必填。

这里 start 和 end 有-和+两个非指定值,他们分别表示无穷小和无穷大,所以当使用这个两个值时,会查询出全部的消息。

> XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"

上面查询的消息数据,可以看到是按照先进先出的顺序查询出来的。

使用 COUNT 指定查询返回的数量:

# 查询所有的消息,并且返回一条数据  > XRANGE person - + COUNT 1  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"

在范围查询中,Id 的后半部分可省略,后半部分中的数据会全部查询到。

XREVRANGE

XREVRANGE 的查询和 XRANGE 指令中的使用类似,但查询的 start 和 end 参数顺序进行了调换:

XREVRANGE key end start [COUNT count]

使用案例:

> XREVRANGE person +  -  1) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"

查询后的结果与 XRANGE 的结果顺序刚好相反,其他都一样,这两个指令可进行消息的升序和降序的返回。

删除消息

删除消息使用 XDEL 指令操作,只需指定将要删除的 Streams 名称和 Id 即可,支持一次删除多个消息 。

XDEL key ID [ID ...]

删除案例:

# 查询所有消息  > XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"  # 删除消息        > XDEL person 2-0  (integer) 1  # 再次查询删除后的所有消息  > XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  # 查询删除后的长度        > XLEN person  (integer) 2

从上面可以看到,删除消息后,长度也会减少相应的数量。

消费消息

在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

单独消费

单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。

> XREAD COUNT 1 STREAMS person 0  1) 1) "person"     2) 1) 1) "0-1"           2) 1) "name"              2) "ytao"              3) "des"              4) "https://ytao.top"  > XREAD COUNT 2 STREAMS person 0  1) 1) "person"     2) 1) 1) "0-1"           2) 1) "name"              2) "ytao"              3) "des"              4) "https://ytao.top"        2) 1) "0-2"           2) 1) "name"              2) "luffy"              3) "des"              4) "valiant!"

在 XREAD 里面还有个 BLOCK 参数,这个是用来阻塞订阅消息的,BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。

# 窗口 1 开启阻塞,等待新消息的到来  > XREAD BLOCK 0 STREAMS person $  # 另开一个连接窗口 2,添加一条新的消息  > XADD person 2-2 name tao des coder  "2-2"  # 窗口 1,获取到有新的消息来消费,并且带有阻塞的时间  > XREAD BLOCK 0 STREAMS person $  1) 1) "person"     2) 1) 1) "2-2"           2) 1) "name"              2) "tao"              3) "des"              4) "coder"  (60.81s)

当使用 XREAD 进行顺序消费时,需要额外记录下读取到位置的 Id,方便下次继续消费。

群组消费

群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:创建群组,群组读取消息,向服务端确认消息以处理。

群组操作

操作群组使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  • CREATE 创建消费组。

  • SETID 修改下一个处理消息的 Id。

  • DESTROY 销毁消费组。

  • DELCONSUMER 删除消费组中指定的消费者。

我们当前需要使用的是创建消费组:

# 以当前存在的最大 Id 作为消费起始   > XGROUP CREATE person group1 $  OK

群组读取消息

群组读取使用 XREADGROUP 指令,COUNT和BLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值>,表示还未进行消费的消息:

# 窗口 1,消费群组中,taotao 消费者建立阻塞监听  XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  # 窗口 2,消费群组中,yangyang 消费者建立阻塞监听   XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  # 窗口 3,添加消费消息  > XADD person 3-1 name tony des 666  "3-1"  # 窗口 1,读取到新消息,此时 窗口 2 没有任何反应  > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  1) 1) "person"     2) 1) 1) "3-1"           2) 1) "name"              2) "tony"              3) "des"              4) "666"  (77.54s)  # 窗口 3,再次添加消费消息  > XADD person 3-2 name james des abc!  "3-2"  # 窗口 2,读取到新消息,此时 窗口 1 没有任何反应  > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  1) 1) "person"     2) 1) 1) "3-2"           2) 1) "name"              2) "james"              3) "des"              4) "abc!"  (76.36s)

以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。

消息ACK

消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。 例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0  1) 1) "person"     2) 1) 1) "3-2"           2) 1) "name"              2) "james"              3) "des"              4) "abc!"

这时,我们使用 XACK 指令告诉服务器,我们已处理的消息:

XACK key group ID [ID ...]0

让服务器标记 3-2 已处理:

> XACK person group1 3-2  (integer) 1

再次获取群组读取消息:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0  1) 1) "person"     2) (empty list or set)

队列中没有了可读消息。 除了上面以讲解到的 API 外,查看消费群组信息可使用 XINFO 指令查看。

上面对 Streams 常用 API 进行了分析,我们可以感受到 Redis 在消息队列支持的道路上,也越来越强大。如果使用过它的 PUB/SUB 功能的话,就会感受到 5.x 迭代正是将你的一些痛点进行了优化。

上述就是小编为大家分享的如何进行Redis5新特性中Streams作消息队列的分析了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

消息 消费 查询 指令 数据 阻塞 数量 服务 分析 最大 时间 处理 队列 参数 名称 消费者 支持 两个 再次 顺序 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 微软ntp服务器地址 畅捷通T3数据库的默认sa口令 数据库脚本导出怎么排列 运用了通信网络技术的行业 数据库平台建设一般多少钱 山东本地网络安全企业 java 多数据库支持 服务器5m 非关系型数据库采用的是动态结构 db2 管理服务器创建 密码编程学与网络安全 百望税控盘连接服务器失败怎么办 数据库 新增字段 义乌高科技互联网推广 数据库日志修改有记录吗 荔湾区品质网络技术开发价格多少 数据库技术甲骨文落后了吗 ftp服务器开启多线程下载 上海启烨网络安全技术 实训总结计算机网络技术 未成年人网络安全防护网加固 浙江拓讯网络技术有限公司被告 郑州河马网络技术有限公司 公司内外网网络安全管理办法 深圳市纽邦网络技术有限公司 lamp数据库怎么删除 互联网科技公司怎么起名字 河北工业大学网络安全考什么 互联网 湖南科技大学 macpro做服务器好吗
0