千家信息网

EMQ X 规则引擎中如何存储消息到MongoDB数据库

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,这期内容当中小编将会给大家带来有关EMQ X 规则引擎中如何存储消息到MongoDB数据库,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。MongoDB 介绍非关系数
千家信息网最后更新 2024年09月22日EMQ X 规则引擎中如何存储消息到MongoDB数据库

这期内容当中小编将会给大家带来有关EMQ X 规则引擎中如何存储消息到MongoDB数据库,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

MongoDB 介绍

非关系数据库(NoSQL) 用于超大规模数据的存储,例如谷歌或 Facebook 每天为他们的用户收集万亿比特的数据。这些类型的数据存储不需要固定的模式,无需多余操作就可以横向扩展。

MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。MongoDB 由 C++ 语言编写的,是一个基于分布式文件存储的开源数据库系统,MongoDB 旨在为数据存储提供可扩展的高性能数据存储解决方案,在高负载的情况下,可以轻松添加更多的节点保证服务性能。

MongoDB 将数据存储为一个文档,数据结构由键值 (key=>value) 对组成。MongoDB 文档类似于 JSON 对象。字段值可以包含其他文档,数组及文档数组。

MongoDB 下载地址:https://www.mongodb.com/download-center/community

场景介绍

该场景需要将 EMQ X 指定主题下满足某条件的消息存储到 MongoDB 数据库。为了便于后续分析检索,消息内容需要进行拆分存储。

该场景下设备端上报信息如下:

  • 上报主题:cmd/state/:id,主题中 id 代表车辆客户端识别码

  • 消息体:

    {  "id": "NXP-058659730253-963945118132721-22", // 客户端识别码  "speed": 32.12, // 车辆速度  "direction": 198.33212, // 行驶方向  "tachometer": 3211, // 发动机转速,数值大于 8000 时才需存储  "dynamical": 8.93, // 瞬时油耗  "location": { // GPS 经纬度数据    "lng": 116.296011,    "lat": 40.005091  },  "ts": 1563268202 // 上报时间}


当上报数据发动机转速数值大于 8000 时,存储当前信息以便后续分析用户车辆使用情况。

准备工作

创建管理用户

先使用具有创建用户的权限的账号登录 MongoDB,并为 emqx_rule_engine_output 添加用户:

> use emqx_rule_engine_output;> db.createUser({user: "root", pwd: "public", roles: [{role: "readWrite", db: "emqx_rule_engine_output"}]});

创建数据表

使用新的用户登入,并创建数据集 use_statistics

$ mongo 127.0.0.1/emqx_rule_engine_output -uroot -ppublic> db.createCollection("use_statistics");

创建成功后确认数据表是否存在:

> show collectionsuse_statistics

配置说明

创建资源

打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 MongoDB 资源类型进行创建:

EMQ X 集群中节点所在网络环境可能互不相同,资源创建成功后点击列表中 状态按钮,查看各个节点资源连接状况,如果节点上资源不可用,请检查配置是否正确、网络连通性,并点击 重连 按钮手动重连。

创建规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则创建。这里选择触发事件 消息发布,在消息发布时触发该规则进行数据处理。

选定触发事件后,我们可在界面上看到可选字段及示例 SQL:

筛选所需字段

规则引擎使用 SQL 语句处理规则条件,该业务中我们需要将 payload 中所有字段单独选择出来,使用 payload. 格式进行选择,还需要消息上下文的 topicqosid 信息,当前 SQL 如下:

SELECT  payload.id as client_id, payload.speed as speed,   payload.tachometer as tachometer,  payload.ts as ts, idFROM  "message.publish"WHERE  topic =~ 't/#'
确立筛选条件

使用 SQL 语句 WHERE 字句进行条件筛选,该业务中我们需要定义两个条件:

  • 仅处理 cmd/state/:id 主题,使用主题通配符 =~topic 进行筛选:topic =~ 'cmd/state/+'

  • 仅处理 tachometer > 8000 的消息,使用比较符对 tachometer 进行筛选:payload.tachometer > 8000

组合上一步骤得到 SQL 如下:

SELECT  payload.id as client_id, payload.speed as speed,   payload.tachometer as tachometer,  payload.ts as ts,  idFROM  "message.publish"WHERE  topic =~ 'cmd/state/+'  AND payload.tachometer > 8000
使用 SQL 测试功能进行输出测试

借助 SQL 测试功能,我们可以实时查看当前 SQL 处理后的数据输出,该功能需要我们指定 payload 等模拟原始数据。

payload 数据如下,记得更改 tachometer 数值大小,以满足 SQL 条件:

{  "id": "NXP-058659730253-963945118132721-22",  "speed": 32.12,  "direction": 198.33212,  "tachometer": 9001,  "dynamical": 8.93,  "location": {    "lng": 116.296011,    "lat": 40.005091  },  "ts": 1563268202}

点击 SQL 测试 切换按钮,更改 topicpayload 为场景中的信息,点击 测试 按钮查看数据输出:

测试输出数据为:

{  "client_id": "NXP-058659730253-963945118132721-22",  "id": "589A429E9572FB44B0000057C0001",  "speed": 32.12,  "tachometer": 9001,  "ts": 1563268202}

测试输出与预期相符,我们可以进行后续步骤。

添加响应动作,存储消息到 MongoDB

SQL 条件输入输出无误后,我们继续添加响应动作,配置写入 SQL 语句,将筛选结果存储到 MongoDB。

点击响应动作中的 添加 按钮,选择 保存数据到 MongoDB 动作,选取刚刚选定的资源,我们使用 ${fieldName} 语法填充操作语句,将数据插入到数据库,最后点击 新建 按钮完成规则创建。

Collection 配置为: use_statistics

Selector 配置为:

msgid=${id}, client_id=${client_id}, speed=${speed}, tachometer=${tachometer}, ts=${ts}

测试

预期结果

我们成功创建了一条规则,包含一个处理动作,动作期望效果如下:

  1. 设备向 cmd/state/:id 主题上报消息时,当消息中的 tachometer 数值超过 8000 时将命中 SQL,规则列表中 已命中 数字增加 1;

  2. MongoDB emqx_rule_engine_output 数据库的 use_statistics 表中将增加一条数据,数值与当前消息一致。

使用 Dashboard 中的 Websocket 工具测试

切换到 工具 => Websocket 页面,使用任意信息客户端连接到 EMQ X,连接成功后在 消息 卡片发送如下信息:

  • 主题:cmd/state/NXP-058659730253-963945118132721-22

  • 消息体:

    {  "id": "NXP-058659730253-963945118132721-22",  "speed": 32.12,  "direction": 198.33212,  "tachometer": 8081,  "dynamical": 8.93,  "location": {    "lng": 116.296011,    "lat": 40.005091  },  "ts": 1563268202}


点击 发送 按钮,此时消息体中的 tachometer 数值,满足上面设置的 tachometer > 8000 的条件,当前规则已命中统计值为加 1。

MongoDB 命令行中查看数据表记录得到数据如下:

至此,我们通过规则引擎实现了使用规则引擎存储消息到 MongoDB 数据库的业务开发。

上述就是小编为大家分享的EMQ X 规则引擎中如何存储消息到MongoDB数据库了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

数据 消息 存储 规则 数据库 按钮 测试 条件 主题 资源 信息 动作 数值 用户 处理 输出 引擎 选择 配置 成功 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 安卓编程读取数据库表 全民健身数据库 二维视图软件开发工具 常见的售后服务网络技术方法 天水力天软件开发有限责任公司 服务器上的hdd按钮 五环智控软件开发 十四五 网络安全 技术发展 慕华网络技术有限公司 东亚环境数据库 工业网络安全创业 DB2数据库归档日志 手机服务器错误代码 网络安全和网络安全大赛 网络安全人员是什么编制 公安局网络安全警察岗位怎样 网络安全和保密工作检查 反编译 查看手游数据库 如何连接蜂窝破解器的服务器 江苏互联网软件开发销售价格 深圳大合网络技术有限公司成 视图在数据库中是什么模式 非人学园服务器叫什么名字 软件开发工程师证难考吗 原神ps5港版是什么服务器 软件开发属于哪个主营业务类别 软件开发项目实施组织方案 软件开发项目对环境影响 局域网服务器密码忘记怎么办 芜湖在线教育平台软件开发
0