千家信息网

activemq 编程式客户端

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,dependency org.apache.activemq activemq-client ${activemq.version} org.apache.activemq
千家信息网最后更新 2025年02月03日activemq 编程式客户端

dependency

    org.apache.activemq    activemq-client    ${activemq.version}    org.apache.activemq    activemq-spring    ${activemq.version}

ActivemqProducerTest

public class ActivemqProducerTest {    private static final String producer_broker_url = "failover:(tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)";    private static final String username = "admin";    private static final String password = "admin123";    private static final String destination_queue = "queue.test01";    private static final String destination_topic = "topic.test01";    public static void main(String[] args) throws JMSException {        produceInQueue();        produceInTopic();    }    private static void produceInQueue() throws JMSException {        // activeMQ connection factory        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();        activeMQConnectionFactory.setBrokerURL(producer_broker_url);        activeMQConnectionFactory.setUserName(username);        activeMQConnectionFactory.setPassword(password);        Connection connection = activeMQConnectionFactory.createConnection();        connection.start();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        ActiveMQQueue dest = new ActiveMQQueue(destination_queue);        TextMessage textMessage = session.createTextMessage();        textMessage.setText("this is a test...");        MessageProducer producer = session.createProducer(dest);        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        producer.send(textMessage);        connection.close();    }    private static void produceInTopic() throws JMSException {        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();        activeMQConnectionFactory.setBrokerURL(producer_broker_url);        activeMQConnectionFactory.setUserName(username);        activeMQConnectionFactory.setPassword(password);        Connection connection = activeMQConnectionFactory.createConnection();        connection.start();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        ActiveMQTopic dest = new ActiveMQTopic(destination_topic);        MapMessage mapMessage = session.createMapMessage();        mapMessage.setString("key01","val01");        mapMessage.setInt("key02", 2);        MessageProducer producer = session.createProducer(dest);        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        producer.send(mapMessage);        connection.close();    }}
ActivemqCustomerTest
public class ActivemqCustomerTest {    private static final String producer_broker_url = "failover:(tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)";    private static final String username = "admin";    private static final String password = "admin123";    private static final String destination_queue = "queue.test01";    private static final String destination_topic = "topic.test01";    public static void main(String[] args) throws JMSException {        consumeTopic();        consumeTopic();        consumeQueue();    }    private static void consumeTopic() throws JMSException {        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();        activeMQConnectionFactory.setBrokerURL(producer_broker_url);        activeMQConnectionFactory.setUserName(username);        activeMQConnectionFactory.setPassword(password);        Connection connection = activeMQConnectionFactory.createConnection();        connection.setExceptionListener(new ExceptionListener(){            @Override            public void onException(JMSException exception) {                System.out.println("JMSException:"+exception.getMessage());            }        });        connection.start();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        ActiveMQTopic dest = new ActiveMQTopic(destination_topic);        MessageConsumer consumer = session.createConsumer(dest);        consumer.setMessageListener(new MessageListener(){            @Override            public void onMessage(Message message) {                try {                    if (message instanceof TextMessage){                        System.out.println("message = [" + ((TextMessage) message).getText() + "]");                    }                    if (message instanceof MapMessage){                        MapMessage mapMessage = (MapMessage) message;                        System.out.println("key01 = [" + mapMessage.getString("key01") + "]");                        System.out.println("key02 = [" + mapMessage.getInt("key02") + "]");                    }                } catch (JMSException e) {                    e.printStackTrace();                }            }        });        //   connection.close();    }    private static void consumeQueue() throws JMSException {        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();        activeMQConnectionFactory.setBrokerURL(producer_broker_url);        activeMQConnectionFactory.setUserName(username);        activeMQConnectionFactory.setPassword(password);        Connection connection = activeMQConnectionFactory.createConnection();        connection.setExceptionListener(new ExceptionListener(){            @Override            public void onException(JMSException exception) {                System.out.println("JMSException:"+exception.getMessage());            }        });        connection.start();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        ActiveMQQueue dest = new ActiveMQQueue(destination_queue);        MessageConsumer consumer = session.createConsumer(dest);        consumer.setMessageListener(new MessageListener(){            @Override            public void onMessage(Message message) {                try {                    if (message instanceof TextMessage){                        System.out.println("message = [" + ((TextMessage) message).getText() + "]");                    }                    if (message instanceof MapMessage){                        MapMessage mapMessage = (MapMessage) message;                        System.out.println("key01 = [" + mapMessage.getString("key01") + "]");                        System.out.println("key02 = [" + mapMessage.getInt("key02") + "]");                    }                } catch (JMSException e) {                    e.printStackTrace();                }            }        });        //   connection.close();    }}


集群配置地址:https://blog.51cto.com/881206524/2129863







0