ActiveMQ定义
ActiveMQ是老牌的消息中间件,是第一代的消息中间件,它实现了Java Message Service (JMS) ,并提供了可靠的消息传递、发布-订阅模式和点对点模式等功能。
ActiveMQ作用
根据消息队列的特点,可以衍生出很多应用场景。
1)异步通信
注册时的短信、邮件通知,减少响应时间。
2)应用解耦
信息发送者和消息接受者无需耦合,比如调用第三方。
3)流量削峰
例如:秒杀系统。
ActiveMQ原理
1.Broker(代理)
ActiveMQ采用Broker架构,其中Broker是消息服务器的实例,负责接收、存储和传递消息。
Broker可以配置为主-从模式或集群模式,以提供高可用性和负载均衡的支持。
2.消息传输模式
- ActiveMQ支持点对点(Queue)和发布-订阅(Topic)两种消息传递模式。
- 在点对点模式下,消息发送到队列,并且只有一个消费者可以接收和处理该消息。
- 在发布-订阅模式下,消息发布到主题,然后所有订阅该主题的消费者都可以接收到该消息。
3.主题(Topic)
发布-订阅模式中用于一对多通信的目的地,消息被发布到主题并由所有订阅者接收。
4.持久性(Persistence)
消息可以持久化到磁盘,以防止消息丢失。
5.消费者(Consumer)
- 消费者(Consumer)通过订阅队列或主题,使用JMS API接收和处理消息。
- 消费者可以使用同步方式或异步方式接收消息,异步方式可以提高系统的吞吐量和响应性能。
6.生产者(Producer)
- 生产者(Producer)使用JMS API向ActiveMQ发送消息,可以指定目的地(Queue或Topic)和消息内容。
ActiveMQ使用
下面是一个使用ActiveMQ的简单示例,演示了如何发送和接收消息。
1.导入所需的依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.2</version> </dependency>
2.发送消息的代码示例
import javax.jms.*; public class MessageSender { public static void main(String[] args) { try { // 创建连接工厂 ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // 创建连接 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地(队列或主题) Destination destination = session.createQueue("myQueue"); // 创建消息生产者 MessageProducer producer = session.createProducer(destination); // 创建消息 TextMessage message = session.createTextMessage(); message.setText("Hello, ActiveMQ!"); // 发送消息 producer.send(message); // 关闭资源 producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
3.接收消息的代码示例
import javax.jms.*; public class MessageReceiver { public static void main(String[] args) { try { // 创建连接工厂 ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // 创建连接 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地(队列或主题) Destination destination = session.createQueue("myQueue"); // 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); // 接收消息 Message message = consumer.receive(); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println("Received message: " + textMessage.getText()); } // 关闭资源 consumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
在示例中,发送者使用MessageSender
类创建连接、会话和消息生产者,然后发送一条文本消息到名为myQueue
的队列。
接收者使用MessageReceiver
类创建连接、会话和消息消费者,然后从myQueue
队列中接收消息,并将接收到的文本消息打印出来。
陈睿mikechen
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》