千家信息网

ActiveMQ要入门什么

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,本篇文章给大家分享的是有关ActiveMQ要入门什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1. 发布消息import org.a
千家信息网最后更新 2025年02月04日ActiveMQ要入门什么

本篇文章给大家分享的是有关ActiveMQ要入门什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

1. 发布消息

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class BookProducer implements Runnable{    public static final String BROKER_URL = "tcp://localhost:61616";    @Override    public void run() {        try {            //1.创建连接工厂,指定ip和端口            ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);            //2.使用连接工厂创建一个连接对象            Connection connection = factory.createConnection();            //3.开启连接(JMS会话)            connection.start();            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            //使用会话创建目的地            /**             * ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。             * ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,                  而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。             */            Destination destination = session.createQueue("book-broker")            //创建生产者/消费者            MessageProducer producer = session.createProducer(destination);            // MessageConsumer consumer = session.createConsumer(destination);            //consumer.receive();            /**             * 创建消息,支持的消息类型:             *  TextMessage             *  MapMessage             *  ObjectMessage:对象需要实现序列化接口             *  BytesMessage             *  StreamMessage             */            Message message = session.createTextMessage("我是一个香蕉.......");            //发送消息            producer.send(message);            //释放资源            producer.close();            session.close();            connection.close();        } catch (JMSException e) {            e.printStackTrace();        }    }}

2. 接收消息

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.TextMessage;public class BookConsumer implements Runnable {    @Override    public void run() {        try {            var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection();            connection.start();            /**             * connection.createSession(boolean transacted, int acknowledgeMode);             * transacted:是否使用事务             * acknowledgeMode:应答模式             *     AUTO_ACKNOWLEDGE:自动应答             *          对于同步消费者,receive方法调用返回,且没有异常发生时,将自动对收到的消息予以确认.             *          对于异步消息,当onMessage方法返回,且没有异常发生时,即对收到的消息自动确认.             *     CLIENT_ACKNOWLEDGE:客户端手动应答             *          这种方式要求客户端使用javax.jms.Message.acknowledge()方法完成确认.             *     DUPS_OK_ACKNOWLEDGE:延时//批量通知             *          这种确认方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,                       与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,                       因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.             *     使用事务消息确认模式:             *     SESSION_TRANSACTED             */            var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            var consumer = session.createConsumer(session.createQueue("tmall-queue"));            var message = ((TextMessage)consumer.receive()).getText();            System.out.println(message);            session.close();            connection.close();    }}

或者设置监听器接收(消费者不用一直在线,监听到消息自动接收)

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.TextMessage;public class BookConsumer implements Runnable {    @Override    public void run() {        try {            var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection();            connection.start();            var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            var consumer = session.createConsumer(session.createQueue("tmall-queue"));            consumer.setMessageListener(message -> {                try {                    System.out.println(((TextMessage) message).getText());                }catch (JMSException e){                    e.printStackTrace();                }            });        } catch (JMSException e) {            e.printStackTrace();        }    }}

3. java内嵌ActiveMQ,自动启动一个ActiveMQ,不需要Linux启动

import org.apache.activemq.broker.BrokerService;public class Broker {    //导入依赖compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.9'    public void producter(){        BrokerService brokerService = new BrokerService();        brokerService.setUseJmx(true);//设置Broker的服务是否应该公开给JMX        try {            brokerService.addConnector("tcp://localhost:61616");            brokerService.start();         } catch (Exception e) {            e.printStackTrace();        }    }}

以上就是ActiveMQ要入门什么,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0