RabbitMQ使用详解(5种使用模式实例)

RabbitMQ使用详解(5种使用模式实例)-mikechen

RabbitMQ使用主要包含如下5种,下面重点详解5种RabbitMQ使用方式,非常详细。

简单模式

简单模式:主要就是一个producer发送消息,一个接收者接收消息。

模型如下图所示:

RabbitMQ使用详解(5种使用模式实例)-mikechen

简单模式的应用场景:手机短信、邮件单发。

使用示例:

1.创建RabbitMQ服务工具类

主要用于与RabbitMQ建立连接,代码如下:

  /**
     * 获取连接
     * @return Connection
     * @throws Exception
     */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //设置RabbitMQ所在主机ip,或者主机名
        factory.setHost("localhost");

        //端口
        factory.setPort(5672);

        //设置账号信息用户名与密码
        factory.setUsername("admin");
        factory.setPassword("admin");

        //通过工厂获取连接
        Connection connection = factory.newConnection();

        return connection;
    }
}

 

2.RabbitMQ生产消息

代码如下:

//创建队列 发送消息
public static void main(String[] args) throws Exception {
    private static final String QUEUE_NAME = "basic_queue";
    public static void main(String[] args) throws Exception {
        //消息发送端,与RabbitMQ服务创建连接
        Connection connection = ConnectionUtil.getConnection();

        //创建一个频道
        Channel channel = connection.createChannel();

        //指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //发送的消息
        String message = "hello world";

       //往队列中发出一条消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("生产者已发送:" + message);

        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

RabbitMQ生成端,主要就是5大流程:

1)producer生产发送消息;

2)创建一个频道(channel );

3)制定一个队列(queue);

4)往队列(queue)发送消息(message);

5)关闭频道和连接;

 

2.RabbitMQ消费消息

代码如下:

//消费者消费消息
public static void main(String[] args) throws Exception {
    private static final String QUEUE_NAME = "basic_queue";
    public static void main(String[] args) throws Exception {
        //消息消费者与mq服务建立连接
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();

        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println("消费者1接收到消息:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

RabbitMQ消费贷,主要就是如下流程:

1)与RabbitMQ服务端建立连接;

2)建立通道;

3)声明队列;

4)接收消息;

 

工作队列模式

在简单模式中,一个生产者对应一个消费者,而实际生产过程中有多个消费者接收消息,这就是工作队列模式。

工作队列模式模型,如下图所示:

RabbitMQ使用详解(5种使用模式实例)-mikechen

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者,但是一条消息只会被一个消费者获取。

1.创建RabbitMQ连接工具类

  /**
     * 获取连接
     * @return Connection
     * @throws Exception
     */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //设置RabbitMQ所在主机ip,或者主机名
        factory.setHost("localhost");

        //端口
        factory.setPort(5672);

        //设置账号信息用户名与密码
        factory.setUsername("admin");
        factory.setPassword("admin");

        //通过工厂获取连接
        Connection connection = factory.newConnection();

        return connection;
    }
}

 

2.RabbitMQ生产消息:发送10条消息

代码如下:

//消息生产者 
public static void main(String[] args) throws Exception {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {

        //创建连接 
        Connection connection = ConnectionUtil.getConnection();
        
        //创建频道 
        Channel channel = connection.createChannel();
        
        //指定队列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 循环发布任务
        for (int i = 1; i <= 10; i++) {
            // 消息内容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
            Thread.sleep(500);
        }
        
        //关闭频道和连接 
        channel.close();
        connection.close();
    }
}

RabbitMQ将按顺序将每条消息发送给下一个消费者。

 

3.创建两个消费者(代码相同)

代码如下:

public class Consumer1 {
    private static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        /**
         * 创建队列Queue
         * 参数:
         * 1.queue:队列名称
         * 2.durable:是否持久化,当mq重启之后是否还在
         * 3.exclusive:是否独占,只能有一个消费者监听这个队列    当connection关闭时是否=删除队列
         * 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
         * 5.arguments:参数
         */
        //如果没有hello_world的队列会自动创建,有就不会创建
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

当两个消费者都工作时,生成者发送消息,就会按照负载均衡算法分配给不同消费者。

 

发布订阅模式

在订阅模式中,可以实现一条消息被多个消费者获取,模型如下图所示:

RabbitMQ使用详解(5种使用模式实例)-mikechen

这里相比较前面两种模型,多出了一个交换机(Exchange)。

交换机只负责转发消息,不具备存储消息的能力,没有队列绑定交换机,或者没有路由规则的队列,消息将丢失。

交换机有三种:

1)Fanout 广播:将消息交给所有绑定到交换机的队列;

2)Diect 定向:把消息交给符合指定的key的队列;

3)Topic 通配符:把消息交给符合routing pattern(路由模式的队列);

 

流程如下:

1)生产者将消息发送给交换机;

2)交换机将消息分散到队列中;

3)然后消费者在对应的队列中消费;

代码示例:

public static final String EXCHANGE_NAME = "test_exchange_fanout";
    //生产者,发送消息到交换机
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        System.out.println(message);
        channel.close();
        connection.close();
    }

    //消费者1
    public final static String QUEUE_NAME = "test_queue_exchange_1";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到交换机上
        channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

    //消费者2
    public final static String QUEUE_NAME = "test_queue_exchange_2";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到交换机上
        channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

 

路由模式

在路由模式(Direct)中,主要是根据路由规则(Routing Key)将消息发送到指定的队列。

模型如下图所示:

RabbitMQ使用详解(5种使用模式实例)-mikechen

队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息时也需要携带一个Routing Key。

1.RabbitMQ生成消息

代码如下:

public class Send {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "新增一个订单";
        //生产者发送消息时,设置消息的Routing Key:"insert"
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println("生产者发送消息:" + message);
        channel.close();
        connection.close();
    }
}

 

2.RabbitMQ消费者

代码如下:

public class Consumer1 {
    private static final String QUEUE_NAME = "direct_queue_1";
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //消费者声明自己的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //消费者将队列与交换机进行绑定,并且设置Routing Key:"insert"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String msg = new String(body);
                System.out.println("消费者1获取到消息:" + msg);
            }
        });
    }
}

 

Topic通配符模式

通配符模式:生产者生产消息发送至交换机,交换机通过通配符将消息发送给能匹配上的队列,再由订阅队列的消费者进行消费。

模型如下图所示:

RabbitMQ使用详解(5种使用模式实例)-mikechen

模糊匹配路由规则,多个队列,多个消费者。

代码示例:

1.RabbitMQ生产消息

代码如下:

public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = RabbitUtils.getConnection();
 
        //创建channel 信道
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * String exchange,交换机名称
         * BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
         * boolean durable, 是否持久化
         * boolean autoDelete, 是否自动删除
         * internal 内部使用 一般是false
         * Map<String, Object> arguments 参数列表
         */
        String ExchangeName = "test_topic";
        channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //绑定队列和交换机
        /**
         * String queue, 队列名称
         * String exchange, 交换机名称
         * String routingKey 路由key
         */
        //队列一绑定所有以error为结尾的日志,和绑定order后面所有的日志
        channel.queueBind(queue1Name,ExchangeName,"#.error");
        channel.queueBind(queue1Name,ExchangeName,"order.*");
        //队列二绑定所有级别的日志
        channel.queueBind(queue2Name,ExchangeName,"*.*");
        //发送消息
        String body = "日志信息:张三调用了delete方法  日志级别为:order.info";
        //参数二指定绑定的队列
        channel.basicPublish(ExchangeName,"order.info",null,body.getBytes());
        //释放资源
        RabbitUtils.closeConnectionAndchannel(channel,connection);
    }

 

2.RabbitMQ消费责一

代码如下:

public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
 
        String queue1Name = "test_topic_queue1";
 
        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息保存至数据库");
            }
        };
        //接收队列一
        channel.basicConsume(queue1Name, true, consumer);
 
 
    }
 
 
}

 

3.RabbitMQ消费责二

代码如下:

public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
 
        String queue2Name = "test_topic_queue2";
 
        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息保存至数据库");
            }
        };
        //接收队列一
        channel.basicConsume(queue2Name, true, consumer);
 
 
    }
 
 
}

通配符和定向相比,通配符更加灵活。

陈睿mikechen

10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

关注「mikechen」公众号,获取更多技术干货!

后台回复面试即可获取《史上最全阿里Java面试题总结》,后台回复架构,即可获取《阿里架构师进阶专题全部合集

评论交流
    说说你的看法