千家信息网

如何用MQTT协议实现消息的订阅接收?

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。操作步骤:引入相关的依赖 org.springf
千家信息网最后更新 2025年01月24日如何用MQTT协议实现消息的订阅接收?

MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:

  1. 引入相关的依赖
    org.springframework.boot    spring-boot-starter-integration    org.springframework.integration    spring-integration-mqtt    org.projectlombok    lombok    true
  1. 在application.yml配置MQTT服务器信息
server:  port: 8090mqtt:  host: tcp://127.0.0.1:1883  clientinid: mqttinId  clientoutid: mqttoutid  topic: virus  qoslevel: 1  #MQTT 认证  username: xxx  password: xxx  timeout: 10000  #20s  keepalive: 20
  1. 配置MQTT消息推送配置
package com.favccxx.mqtt.config;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;@Slf4j@Configuration@IntegrationComponentScanpublic class MQTTReceiveConfig {    @Value("${mqtt.username}")    private String username;    @Value("${mqtt.password}")    private String password;    @Value("${mqtt.host}")    private String hostUrl;    @Value("${mqtt.clientinid}")    private String clientId;    @Value("${mqtt.topic}")    private String defaultTopic;    @Value("${mqtt.timeout}")    private int completionTimeout ;   //连接超时    @Bean    public MqttConnectOptions getReceiverMqttConnectOptions(){        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();        mqttConnectOptions.setCleanSession(true);        mqttConnectOptions.setConnectionTimeout(10);        mqttConnectOptions.setKeepAliveInterval(90);        mqttConnectOptions.setAutomaticReconnect(true);        mqttConnectOptions.setUserName(username);        mqttConnectOptions.setPassword(password.toCharArray());        mqttConnectOptions.setServerURIs(new String[]{hostUrl});        mqttConnectOptions.setKeepAliveInterval(2);        return mqttConnectOptions;    }    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        factory.setConnectionOptions(getReceiverMqttConnectOptions());        return factory;    }    //接收通道    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    //配置client,监听的topic    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),                        defaultTopic);        adapter.setCompletionTimeout(completionTimeout);        adapter.setConverter(new DefaultPahoMessageConverter());        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    //通过通道获取数据    @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message message) throws MessagingException {                log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());            }        };    }}
消息 配置 数据 通道 前文 订阅 主题 代码 信息 基础 工业 效率 服务器 步骤 领域 频率 上实 延迟 推送 服务 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 不用数据库的留言板源码 襄阳靠谱的软件开发 公安局网络安全岗位身高要求 为什么要青少年网络安全教育 sql语句从数据库中提取数据 表格无法装载数据库 有数据库只读权限安全隐患 什么是网络技术发展的动力 电脑网络安全常识 pop3怎么查服务器 王者服务器无响应怎么办啊 远程oracle数据库安全性 超级固话网络安全说明 明日方舟各种服务器的区别 网站服务器防护 商城的软件开发 延庆区多功能网络技术客户至上 服务器自动开关机怎么固定时间 湖南服务器电源可以定制吗 手机变2g网还打不开数据库 mcu独立的综合管理服务器 网络安全大赛国家排名 村级网络安全工作怎没样开展 四川戴尔服务器续保价格 数据中心服务器供应公司 享赚钱软件开发 小学五年级网络安全班会教案 软件开发是技术服务 互联网科技巨头破圈靠什么 宁晋县网络安全和信息化
0