
RabbitMQ使用主要包含如下5种,下面重点详解5种RabbitMQ使用方式,非常详细。
简单模式
简单模式:主要就是一个producer发送消息,一个接收者接收消息。
模型如下图所示:

简单模式的应用场景:手机短信、邮件单发。
使用示例:
1.创建RabbitMQ服务工具类
主要用于与RabbitMQ建立连接,代码如下:
/**
* 获取连接
* @return Connection
* @throws Exception
*/
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ所在主机ip,或者主机名
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息用户名与密码
factory.setUsername("admin");
factory.setPassword("admin");
//通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
2.RabbitMQ生产消息
代码如下:
//创建队列 发送消息
public static void main(String[] args) throws Exception {
private static final String QUEUE_NAME = "basic_queue";
public static void main(String[] args) throws Exception {
//消息发送端,与RabbitMQ服务创建连接
Connection connection = ConnectionUtil.getConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "hello world";
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者已发送:" + message);
//关闭频道和连接
channel.close();
connection.close();
}
}
RabbitMQ生成端,主要就是5大流程:
1)producer生产发送消息;
2)创建一个频道(channel );
3)制定一个队列(queue);
4)往队列(queue)发送消息(message);
5)关闭频道和连接;
2.RabbitMQ消费消息
代码如下:
//消费者消费消息
public static void main(String[] args) throws Exception {
private static final String QUEUE_NAME = "basic_queue";
public static void main(String[] args) throws Exception {
//消息消费者与mq服务建立连接
Connection connection = ConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println("消费者1接收到消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
RabbitMQ消费贷,主要就是如下流程:
1)与RabbitMQ服务端建立连接;
2)建立通道;
3)声明队列;
4)接收消息;
工作队列模式
在简单模式中,一个生产者对应一个消费者,而实际生产过程中有多个消费者接收消息,这就是工作队列模式。
工作队列模式模型,如下图所示:

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者,但是一条消息只会被一个消费者获取。
1.创建RabbitMQ连接工具类
/**
* 获取连接
* @return Connection
* @throws Exception
*/
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ所在主机ip,或者主机名
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息用户名与密码
factory.setUsername("admin");
factory.setPassword("admin");
//通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
2.RabbitMQ生产消息:发送10条消息
代码如下:
//消息生产者
public static void main(String[] args) throws Exception {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//指定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 1; i <= 10; i++) {
// 消息内容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者发送消息:" + message);
Thread.sleep(500);
}
//关闭频道和连接
channel.close();
connection.close();
}
}
RabbitMQ将按顺序将每条消息发送给下一个消费者。
3.创建两个消费者(代码相同)
代码如下:
public class Consumer1 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**
* 创建队列Queue
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后是否还在
* 3.exclusive:是否独占,只能有一个消费者监听这个队列 当connection关闭时是否=删除队列
* 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
* 5.arguments:参数
*/
//如果没有hello_world的队列会自动创建,有就不会创建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
当两个消费者都工作时,生成者发送消息,就会按照负载均衡算法分配给不同消费者。
发布订阅模式
在订阅模式中,可以实现一条消息被多个消费者获取,模型如下图所示:

这里相比较前面两种模型,多出了一个交换机(Exchange)。
交换机只负责转发消息,不具备存储消息的能力,没有队列绑定交换机,或者没有路由规则的队列,消息将丢失。
交换机有三种:
1)Fanout 广播:将消息交给所有绑定到交换机的队列;
2)Diect 定向:把消息交给符合指定的key的队列;
3)Topic 通配符:把消息交给符合routing pattern(路由模式的队列);
流程如下:
1)生产者将消息发送给交换机;
2)交换机将消息分散到队列中;
3)然后消费者在对应的队列中消费;
代码示例:
public static final String EXCHANGE_NAME = "test_exchange_fanout";
//生产者,发送消息到交换机
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
//消费者1
public final static String QUEUE_NAME = "test_queue_exchange_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上
channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
//消费者2
public final static String QUEUE_NAME = "test_queue_exchange_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上
channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
路由模式
在路由模式(Direct)中,主要是根据路由规则(Routing Key)将消息发送到指定的队列。
模型如下图所示:

队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息时也需要携带一个Routing Key。
1.RabbitMQ生成消息
代码如下:
public class Send {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "新增一个订单";
//生产者发送消息时,设置消息的Routing Key:"insert"
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
System.out.println("生产者发送消息:" + message);
channel.close();
connection.close();
}
}
2.RabbitMQ消费者
代码如下:
public class Consumer1 {
private static final String QUEUE_NAME = "direct_queue_1";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//消费者声明自己的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//消费者将队列与交换机进行绑定,并且设置Routing Key:"insert"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String msg = new String(body);
System.out.println("消费者1获取到消息:" + msg);
}
});
}
}
Topic通配符模式
通配符模式:生产者生产消息发送至交换机,交换机通过通配符将消息发送给能匹配上的队列,再由订阅队列的消费者进行消费。
模型如下图所示:

模糊匹配路由规则,多个队列,多个消费者。
代码示例:
1.RabbitMQ生产消息
代码如下:
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建channel 信道
Channel channel = connection.createChannel();
//创建交换机
/**
* String exchange,交换机名称
* BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
* boolean durable, 是否持久化
* boolean autoDelete, 是否自动删除
* internal 内部使用 一般是false
* Map<String, Object> arguments 参数列表
*/
String ExchangeName = "test_topic";
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/**
* String queue, 队列名称
* String exchange, 交换机名称
* String routingKey 路由key
*/
//队列一绑定所有以error为结尾的日志,和绑定order后面所有的日志
channel.queueBind(queue1Name,ExchangeName,"#.error");
channel.queueBind(queue1Name,ExchangeName,"order.*");
//队列二绑定所有级别的日志
channel.queueBind(queue2Name,ExchangeName,"*.*");
//发送消息
String body = "日志信息:张三调用了delete方法 日志级别为:order.info";
//参数二指定绑定的队列
channel.basicPublish(ExchangeName,"order.info",null,body.getBytes());
//释放资源
RabbitUtils.closeConnectionAndchannel(channel,connection);
}
2.RabbitMQ消费责一
代码如下:
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息保存至数据库");
}
};
//接收队列一
channel.basicConsume(queue1Name, true, consumer);
}
}
3.RabbitMQ消费责二
代码如下:
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue2Name = "test_topic_queue2";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息保存至数据库");
}
};
//接收队列一
channel.basicConsume(queue2Name, true, consumer);
}
}
通配符和定向相比,通配符更加灵活。
mikechen睿哥
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。