如何用MQTT协议实现消息的订阅接收?
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。操作步骤:引入相关的依赖 org.springf
千家信息网最后更新 2025年01月24日如何用MQTT协议实现消息的订阅接收?
MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:
- 引入相关的依赖
org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-mqtt org.projectlombok lombok true
- 在application.yml配置MQTT服务器信息
server: port: 8090mqtt: host: tcp://127.0.0.1:1883 clientinid: mqttinId clientoutid: mqttoutid topic: virus qoslevel: 1 #MQTT 认证 username: xxx password: xxx timeout: 10000 #20s keepalive: 20
- 配置MQTT消息推送配置
package com.favccxx.mqtt.config;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;@Slf4j@Configuration@IntegrationComponentScanpublic class MQTTReceiveConfig { @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.host}") private String hostUrl; @Value("${mqtt.clientinid}") private String clientId; @Value("${mqtt.topic}") private String defaultTopic; @Value("${mqtt.timeout}") private int completionTimeout ; //连接超时 @Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,监听的topic @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(), defaultTopic); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload()); } }; }}
消息
配置
数据
通道
前文
订阅
主题
代码
信息
基础
工业
效率
服务器
步骤
领域
频率
上实
延迟
推送
服务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
不用数据库的留言板源码
襄阳靠谱的软件开发
公安局网络安全岗位身高要求
为什么要青少年网络安全教育
sql语句从数据库中提取数据
表格无法装载数据库
有数据库只读权限安全隐患
什么是网络技术发展的动力
电脑网络安全常识
pop3怎么查服务器
王者服务器无响应怎么办啊
远程oracle数据库安全性
超级固话网络安全说明
明日方舟各种服务器的区别
网站服务器防护
商城的软件开发
延庆区多功能网络技术客户至上
服务器自动开关机怎么固定时间
湖南服务器电源可以定制吗
手机变2g网还打不开数据库
mcu独立的综合管理服务器
网络安全大赛国家排名
村级网络安全工作怎没样开展
四川戴尔服务器续保价格
数据中心服务器供应公司
享赚钱软件开发
小学五年级网络安全班会教案
软件开发是技术服务
互联网科技巨头破圈靠什么
宁晋县网络安全和信息化