RocketMQ是一款开源的分布式消息中间件,下面将详细介绍RocketMQ的核心概念和RocketMQ使用步骤@mikechen
RocketMQ核心概念
1. 主题(Topic)
主题是RocketMQ消息的逻辑分类,用于区分不同类型的消息。
一个主题可以有多个消息生产者(Producer)和消息消费者(Consumer)。
2. 生产者(Producer)
生产者是消息的发送方,负责将消息发送到RocketMQ的消息队列。
如下图所示:
生产者将消息发送到指定的主题,并且可以选择消息的标签(Tag)来进一步细分消息。
生产者可以是同步发送或异步发送消息,并且可以配置消息发送的超时时间。
3. 消费者(Consumer)
消费者是消息的接收方,负责从RocketMQ的消息队列中订阅并消费消息。
如下图所示:
消费者可以订阅一个或多个主题,并且可以选择订阅主题下的特定标签的消息。
4. Broker消息服务器
Broker消息服务器,作为Server提供消息核心服务, 它接收并存储Producer生产的消息。
如下图所示:
Broker的核心功能包含:
- 接收 Producer 发过来的消息;
- 处理 Consumer 的消费消息请求;
- 消息的持 久化存储;
- 消息的 HA 机制;
- 服务端过滤功能等 ;
5. 消息顺序
RocketMQ支持消息的严格有序和部分有序两种顺序保证方式。
严格有序要求同一主题下的所有消息都按照发送顺序进行消费,而部分有序要求同一消息队列下的消息按照发送顺序进行消费。
6. 延迟消息
RocketMQ支持延迟消息,即可以设置消息发送后延迟一定时间后才能被消费,这对于一些需要延迟处理的业务场景非常有用。
RocketMQ的使用示例
在了解完RocketMQ相关概念后,下面我们就可以开始使用了。
1.下载和安装RocketMQ
下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
如下图:

2.启动NameServer和Broker
nohup sh bin/mqnamesrv &
以及Broker
nohup sh bin/mqbroker -n localhost:9876 &
3.创建生产消息
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; public class MyProducer { public static void main(String[] args) throws Exception { // 创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("my_producer_group"); producer.setNamesrvAddr("localhost:9876"); // 启动生产者 producer.start(); // 创建消息 Message message = new Message("my_topic", "my_tag", "Hello, RocketMQ!".getBytes()); // 发送消息 SendResult sendResult = producer.send(message); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { System.out.println("消息发送成功"); } // 关闭生产者 producer.shutdown(); } }
4.创建消费者
import org.apache.rocketmq.client.consumer.DefaultMQConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class MyConsumer { public static void main(String[] args) throws Exception { // 创建消费者实例 DefaultMQConsumer consumer = new DefaultMQConsumer("my_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); // 订阅主题和标签 consumer.subscribe("my_topic", MessageSelector.byTag("my_tag")); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { System.out.println("接收到消息:" + new String(message.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start();
后面就可以运行生产者和消费者的示例代码,生产者发送消息,消费者接收并处理消息。
这是一个基本的RocketMQ使用示例,您可以根据需要进行进一步的配置和调整。
此外,RocketMQ还提供了更多高级功能,如事务消息、顺序消息、消息过滤等,您可以参考官方文档来深入了解和使用这些功能。
更多RocketMQ内容请查看:RocketMQ最全详解(万字图文教程)
mikechen
陈睿|mikechen,10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获知最新一线技术干货!
