千家信息网

如何通过InfluxDB来存储相关的信息

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容主要讲解"如何通过InfluxDB来存储相关的信息",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何通过InfluxDB来存储相关的信息"吧!In
千家信息网最后更新 2025年02月03日如何通过InfluxDB来存储相关的信息

本篇内容主要讲解"如何通过InfluxDB来存储相关的信息",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何通过InfluxDB来存储相关的信息"吧!

InfluxDB 是一个由 InfluxData 开发的开源时序型数据库。 它由 Go 写成,着力于高性能地查询与存储时序型数据,相比上一期中介绍的 OpenTSDB 数据库 InfluxDB 较为轻量,在 InfluxData 官方给出的各项指标基准测试用 InfluxDB 都强于 OpenTSDB。

面对大规模快速增长的物联网传感器采集、交易记录等数据,时间序列数据累计速度非常快,时序数据库通过提高效率来处理这种大规模数据,并带来性能的提升,包括:更高的容纳率(Ingest Rates)、更快的大规模查询(尽管有一些比其他数据库支持更多的查询)以及更好的数据压缩。

安装与验证 InfluxDB 服务器

下载安装 InfluxDB 服务器,本文使用 InfluxDB 1.7 版本。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ X,InfluxDB 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_influxdb.conf,考虑到功能定位,InfluxDB 插件仅支持消息存储功能。

配置连接地址与连接池大小:

## InfluxDB UDP Server## 仅使用 UDP 接入backend.influxdb.pool1.server = 127.0.0.1:8089## InfluxDB Pool Sizebackend.influxdb.pool1.pool_size = 5## Whether or not set timestamp when encoding InfluxDB linebackend.influxdb.pool1.set_timestamp = trues

**InfluxDB Backend 消息存储规则参数: **

通过 topic 过滤器,设置需要存储消息的主题,pool 参数区别多个数据源:

## Store Publish Messagebackend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

启动该插件,启动插件的方式有 命令行控制台两种方式,用户可以任选其一。

消息模板

由于 MQTT Message 无法直接写入 InfluxDB, InfluxDB Backend 提供了 emqx_backend_influxdb.tmpl 模板文件将 MQTT Message 转换为可写入 InfluxDB 的 DataPoint。

消息模板功能需要重启 EMQ X 才能应用更改。

tmpl 文件位于 data/templates/emqx_backend_influxdb_example.tmpl,使用 json 格式, 用户可以为不同 Topic 定义不同的 Template, 类似:

{    "timestamp":                 "measurement": ,    "tags": {        :     },                "fields": {            :     }}

其中, measurement 与 fields 为必选项, tags 与 timestamp 为可选项。 支持通过占位符如 $key 提取变量名为 key 的变量,支持的变量如下:

  • qos: 消息 QoS

  • form: 发布者信息

  • topic: 发布主题

  • timestamp: 时间戳

  • payload.*: JSON 消息体内任意变量,如 { "data": [{ "temp": 1 }] } 使用 ["$payload", "data", "temp"] 可以提取出 1

本示例设定模板如下:

{    "sample": {        "measurement": "$topic",        "tags": {            "host": ["$payload", "data", "$0", "host"],            "region": ["$payload", "data", "$0", "region"],            "qos": "$qos",            "from": "$from"        },        "fields": {            "temperature": ["$payload", "data", "$0", "temp"]        },        "timestamp": "$timestamp"    }}

当 Topic 为 "sample" 的 MQTT Message 拥有以下 Payload 时:

{  "data": [    {      "temp": 1,      "host": "serverA",      "region": "hangzhou"    },    {      "temp": 2,      "host": "serverB",      "region": "ningbo"    }  ]}

Backend 会将 MQTT Message 转换为:

[  {    "measurement": "sample",    "tags": {      "from": "mqttjs_ebcc36079a",      "host": "serverA",      "qos": "0",      "region": "hangzhou"    },    "fields": {      "temperature": "1"    },    "timestamp": "1560743513626681000"  },  {    "measurement": "sample",    "tags": {      "from": "mqttjs_ebcc36079a",      "host": "serverB",      "qos": "0",      "region": "ningbo"    },    "fields": {      "temperature": "2"    },    "timestamp": "1560743513626681000"  }]

使用示例

EMQ X 管理控制台 WebSocket 页面中,向 sample 主题发布如上格式消息消息,消息将解析存储到 InfluxDB udp 数据库对应的 measurement 中。

到此,相信大家对"如何通过InfluxDB来存储相关的信息"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0