RocketMQ使用详解(4大使用步骤)

RocketMQ使用详解(4大使用步骤)-mikechen

RocketMQ是一款开源的分布式消息中间件,下面将详细介绍RocketMQ的核心概念和RocketMQ使用步骤@mikechen

RocketMQ核心概念

1. 主题(Topic)

主题是RocketMQ消息的逻辑分类,用于区分不同类型的消息。

一个主题可以有多个消息生产者(Producer)和消息消费者(Consumer)。

 

2. 生产者(Producer)

生产者是消息的发送方,负责将消息发送到RocketMQ的消息队列。

如下图所示:

RocketMQ使用详解(4大使用步骤)-mikechen

生产者将消息发送到指定的主题,并且可以选择消息的标签(Tag)来进一步细分消息。

生产者可以是同步发送或异步发送消息,并且可以配置消息发送的超时时间。

 

3. 消费者(Consumer)

消费者是消息的接收方,负责从RocketMQ的消息队列中订阅并消费消息。

如下图所示:

RocketMQ使用详解(4大使用步骤)-mikechen

消费者可以订阅一个或多个主题,并且可以选择订阅主题下的特定标签的消息。

 

4. Broker消息服务器

Broker消息服务器,作为Server提供消息核心服务, 它接收并存储Producer生产的消息。

如下图所示:

RocketMQ使用详解(4大使用步骤)-mikechen

Broker的核心功能包含:

  1. 接收 Producer 发过来的消息;
  2. 处理 Consumer 的消费消息请求;
  3. 消息的持 久化存储;
  4. 消息的 HA 机制;
  5. 服务端过滤功能等 ;

 

5. 消息顺序

RocketMQ支持消息的严格有序和部分有序两种顺序保证方式。

严格有序要求同一主题下的所有消息都按照发送顺序进行消费,而部分有序要求同一消息队列下的消息按照发送顺序进行消费。

 

6. 延迟消息

RocketMQ支持延迟消息,即可以设置消息发送后延迟一定时间后才能被消费,这对于一些需要延迟处理的业务场景非常有用。

 

RocketMQ的使用示例

在了解完RocketMQ相关概念后,下面我们就可以开始使用了。

1.下载和安装RocketMQ

下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

如下图:

RocketMQ使用详解(4大使用步骤)-mikechen

2.启动NameServer和Broker

nohup sh bin/mqnamesrv &

以及Broker

nohup sh bin/mqbroker -n localhost:9876 &

 

3.创建生产消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class MyProducer {

    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 启动生产者
        producer.start();

        // 创建消息
        Message message = new Message("my_topic", "my_tag", "Hello, RocketMQ!".getBytes());

        // 发送消息
        SendResult sendResult = producer.send(message);
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            System.out.println("消息发送成功");
        }

        // 关闭生产者
        producer.shutdown();
    }
}

 

4.创建消费者

import org.apache.rocketmq.client.consumer.DefaultMQConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class MyConsumer {

    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQConsumer consumer = new DefaultMQConsumer("my_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅主题和标签
        consumer.subscribe("my_topic", MessageSelector.byTag("my_tag"));

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("接收到消息:" + new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

后面就可以运行生产者和消费者的示例代码,生产者发送消息,消费者接收并处理消息。

这是一个基本的RocketMQ使用示例,您可以根据需要进行进一步的配置和调整。

此外,RocketMQ还提供了更多高级功能,如事务消息、顺序消息、消息过滤等,您可以参考官方文档来深入了解和使用这些功能。

更多RocketMQ内容请查看:RocketMQ最全详解(万字图文教程)

作者简介

陈睿|mikechen,10年+大厂架构经验,BAT资深面试官,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

👇阅读更多mikechen架构文章👇

阿里架构 |双11秒杀 |分布式架构 |负载均衡 |单点登录 |微服务 |云原生 |高并发 |架构师

以上

关注作者「mikechen」公众号,获取更多技术干货!

后台回复架构,即可获取《阿里架构师进阶专题全部合集》,后台回复面试即可获取《史上最全阿里Java面试题总结

评论交流
    说说你的看法