
ActiveMQ定义

ActiveMQ是老牌的消息中间件,是第一代的消息中间件,它实现了Java Message Service (JMS) ,并提供了可靠的消息传递、发布-订阅模式和点对点模式等功能。
ActiveMQ作用
根据消息队列的特点,可以衍生出很多应用场景。
1)异步通信
注册时的短信、邮件通知,减少响应时间。
2)应用解耦
信息发送者和消息接受者无需耦合,比如调用第三方。
3)流量削峰
例如:秒杀系统。
ActiveMQ原理
1.Broker(代理)
ActiveMQ采用Broker架构,其中Broker是消息服务器的实例,负责接收、存储和传递消息。
Broker可以配置为主-从模式或集群模式,以提供高可用性和负载均衡的支持。
2.消息传输模式
- ActiveMQ支持点对点(Queue)和发布-订阅(Topic)两种消息传递模式。
- 在点对点模式下,消息发送到队列,并且只有一个消费者可以接收和处理该消息。
- 在发布-订阅模式下,消息发布到主题,然后所有订阅该主题的消费者都可以接收到该消息。
3.主题(Topic)
发布-订阅模式中用于一对多通信的目的地,消息被发布到主题并由所有订阅者接收。
4.持久性(Persistence)
消息可以持久化到磁盘,以防止消息丢失。
5.消费者(Consumer)
- 消费者(Consumer)通过订阅队列或主题,使用JMS API接收和处理消息。
- 消费者可以使用同步方式或异步方式接收消息,异步方式可以提高系统的吞吐量和响应性能。
6.生产者(Producer)
- 生产者(Producer)使用JMS API向ActiveMQ发送消息,可以指定目的地(Queue或Topic)和消息内容。
ActiveMQ使用
下面是一个使用ActiveMQ的简单示例,演示了如何发送和接收消息。
1.导入所需的依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.2</version>
</dependency>
2.发送消息的代码示例
import javax.jms.*;
public class MessageSender {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(队列或主题)
Destination destination = session.createQueue("myQueue");
// 创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 创建消息
TextMessage message = session.createTextMessage();
message.setText("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 关闭资源
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3.接收消息的代码示例
import javax.jms.*;
public class MessageReceiver {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(队列或主题)
Destination destination = session.createQueue("myQueue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭资源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在示例中,发送者使用MessageSender类创建连接、会话和消息生产者,然后发送一条文本消息到名为myQueue的队列。
接收者使用MessageReceiver类创建连接、会话和消息消费者,然后从myQueue队列中接收消息,并将接收到的文本消息打印出来。
mikechen睿哥
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。