JMS消息队列ActiveMQ(发布/订阅模式)
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,消费者1(Consumer)--订阅(subcribe)-->主题(Topic)package com.java1234.activemq2;import javax.jms.Connection;i
千家信息网最后更新 2025年02月01日JMS消息队列ActiveMQ(发布/订阅模式)
消费者1(Consumer)--订阅(subcribe)-->主题(Topic)
package com.java1234.activemq2;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息消费者-消息订阅者一 * @author Administrator * */public class JMSConsumer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列 destination=session.createTopic("FirstTopic1"); messageConsumer=session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
package com.java1234.activemq2;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 消息监听-订阅者一 * @author Administrator * */public class Listener implements MessageListener{ @Override public void onMessage(Message message) { // TODO Auto-generated method stub try { System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
消费者2(Consumer)--订阅(subcribe)-->主题(Topic)
生产者(Producer)--发布(publish)-->主题(Topic)
package com.java1234.activemq2;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息生产者-消息发布者 * @author Administrator * */public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM=10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建消息队列 destination=session.createTopic("FirstTopic1");//创建主题 messageProducer=session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{ for(int i=0;i
消息
工厂
订阅
消费者
消费
主题
生产者
生产
订阅者
队列
地址
实例
密码
用户
用户名
目的
目的地
线程
监听
发布者
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
出入库数据库
备用服务器
网络技术最牛的国家
论文数据库下架还符合毕业条件吗
数据库时间格式几种
上海挚品互联网科技有限公司
怎么看数据库表是否被锁住
数据库怎么发布到服务器上
市网络安全和信息化工作
信息网络安全许可证 百科
江干区学习软件开发
手游实况足球的服务器是什么
导致网络安全事件的原因
谷胱甘肽辅料数据库
添加的数据库语言
雀神麻将显示无法获得服务器地址
怎么查找赌博软件的服务器在哪
联想服务器引导盘安装
xp系统不信任服务器安全证书
数据库热点数据
达梦数据库修改表锁超时
无锡常规软件开发费用是多少
网络连接到服务器上怎么设置
大兴区卫星软件开发要求
斗鱼青少年网络安全研究中心
越花越有软件开发
北京光耀网络技术
搬瓦工的服务器
服务器ip域名
服务器托管的优势有哪些