RabbitMQ使用主要包含如下5种,下面重点详解5种RabbitMQ使用方式,非常详细。
简单模式
简单模式:主要就是一个producer发送消息,一个接收者接收消息。
模型如下图所示:
简单模式的应用场景:手机短信、邮件单发。
使用示例:
1.创建RabbitMQ服务工具类
主要用于与RabbitMQ建立连接,代码如下:
/** * 获取连接 * @return Connection * @throws Exception */ public class ConnectionUtil { public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ所在主机ip,或者主机名 factory.setHost("localhost"); //端口 factory.setPort(5672); //设置账号信息用户名与密码 factory.setUsername("admin"); factory.setPassword("admin"); //通过工厂获取连接 Connection connection = factory.newConnection(); return connection; } }
2.RabbitMQ生产消息
代码如下:
//创建队列 发送消息 public static void main(String[] args) throws Exception { private static final String QUEUE_NAME = "basic_queue"; public static void main(String[] args) throws Exception { //消息发送端,与RabbitMQ服务创建连接 Connection connection = ConnectionUtil.getConnection(); //创建一个频道 Channel channel = connection.createChannel(); //指定一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送的消息 String message = "hello world"; //往队列中发出一条消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("生产者已发送:" + message); //关闭频道和连接 channel.close(); connection.close(); } }
RabbitMQ生成端,主要就是5大流程:
1)producer生产发送消息;
2)创建一个频道(channel );
3)制定一个队列(queue);
4)往队列(queue)发送消息(message);
5)关闭频道和连接;
2.RabbitMQ消费消息
代码如下:
//消费者消费消息 public static void main(String[] args) throws Exception { private static final String QUEUE_NAME = "basic_queue"; public static void main(String[] args) throws Exception { //消息消费者与mq服务建立连接 Connection connection = ConnectionUtil.getConnection(); //建立通道 Channel channel = connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println("消费者1接收到消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
RabbitMQ消费贷,主要就是如下流程:
1)与RabbitMQ服务端建立连接;
2)建立通道;
3)声明队列;
4)接收消息;
工作队列模式
在简单模式中,一个生产者对应一个消费者,而实际生产过程中有多个消费者接收消息,这就是工作队列模式。
工作队列模式模型,如下图所示:
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者,但是一条消息只会被一个消费者获取。
1.创建RabbitMQ连接工具类
/** * 获取连接 * @return Connection * @throws Exception */ public class ConnectionUtil { public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ所在主机ip,或者主机名 factory.setHost("localhost"); //端口 factory.setPort(5672); //设置账号信息用户名与密码 factory.setUsername("admin"); factory.setPassword("admin"); //通过工厂获取连接 Connection connection = factory.newConnection(); return connection; } }
2.RabbitMQ生产消息:发送10条消息
代码如下:
//消息生产者 public static void main(String[] args) throws Exception { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { //创建连接 Connection connection = ConnectionUtil.getConnection(); //创建频道 Channel channel = connection.createChannel(); //指定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循环发布任务 for (int i = 1; i <= 10; i++) { // 消息内容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("生产者发送消息:" + message); Thread.sleep(500); } //关闭频道和连接 channel.close(); connection.close(); } }
RabbitMQ将按顺序将每条消息发送给下一个消费者。
3.创建两个消费者(代码相同)
代码如下:
public class Consumer1 { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /** * 创建队列Queue * 参数: * 1.queue:队列名称 * 2.durable:是否持久化,当mq重启之后是否还在 * 3.exclusive:是否独占,只能有一个消费者监听这个队列 当connection关闭时是否=删除队列 * 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null * 5.arguments:参数 */ //如果没有hello_world的队列会自动创建,有就不会创建 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //接收消息 /** * 1.queue:队列名称 * 2.aotoACK:是否自动确认 * 3.callback:回调对象 */ DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 回调方法。当收到消息之后会自动执行该方法 * @param consumerTag 标识 * @param envelope 获取一些信息,交换机,路由Key * @param properties 配置信息 * @param body 数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
当两个消费者都工作时,生成者发送消息,就会按照负载均衡算法分配给不同消费者。
发布订阅模式
在订阅模式中,可以实现一条消息被多个消费者获取,模型如下图所示:
这里相比较前面两种模型,多出了一个交换机(Exchange)。
交换机只负责转发消息,不具备存储消息的能力,没有队列绑定交换机,或者没有路由规则的队列,消息将丢失。
交换机有三种:
1)Fanout 广播:将消息交给所有绑定到交换机的队列;
2)Diect 定向:把消息交给符合指定的key的队列;
3)Topic 通配符:把消息交给符合routing pattern(路由模式的队列);
流程如下:
1)生产者将消息发送给交换机;
2)交换机将消息分散到队列中;
3)然后消费者在对应的队列中消费;
代码示例:
public static final String EXCHANGE_NAME = "test_exchange_fanout"; //生产者,发送消息到交换机 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println(message); channel.close(); connection.close(); } //消费者1 public final static String QUEUE_NAME = "test_queue_exchange_1"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //绑定队列到交换机上 channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,""); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); } } //消费者2 public final static String QUEUE_NAME = "test_queue_exchange_2"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //绑定队列到交换机上 channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,""); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(message); } }
路由模式
在路由模式(Direct)中,主要是根据路由规则(Routing Key)将消息发送到指定的队列。
模型如下图所示:
队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息时也需要携带一个Routing Key。
1.RabbitMQ生成消息
代码如下:
public class Send { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange,指定类型为direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String message = "新增一个订单"; //生产者发送消息时,设置消息的Routing Key:"insert" channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); System.out.println("生产者发送消息:" + message); channel.close(); connection.close(); } }
2.RabbitMQ消费者
代码如下:
public class Consumer1 { private static final String QUEUE_NAME = "direct_queue_1"; private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //消费者声明自己的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //消费者将队列与交换机进行绑定,并且设置Routing Key:"insert" channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("消费者1获取到消息:" + msg); } }); } }
Topic通配符模式
通配符模式:生产者生产消息发送至交换机,交换机通过通配符将消息发送给能匹配上的队列,再由订阅队列的消费者进行消费。
模型如下图所示:
模糊匹配路由规则,多个队列,多个消费者。
代码示例:
1.RabbitMQ生产消息
代码如下:
public class Producer_Topics { public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitUtils.getConnection(); //创建channel 信道 Channel channel = connection.createChannel(); //创建交换机 /** * String exchange,交换机名称 * BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配 * boolean durable, 是否持久化 * boolean autoDelete, 是否自动删除 * internal 内部使用 一般是false * Map<String, Object> arguments 参数列表 */ String ExchangeName = "test_topic"; channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); //创建队列 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //绑定队列和交换机 /** * String queue, 队列名称 * String exchange, 交换机名称 * String routingKey 路由key */ //队列一绑定所有以error为结尾的日志,和绑定order后面所有的日志 channel.queueBind(queue1Name,ExchangeName,"#.error"); channel.queueBind(queue1Name,ExchangeName,"order.*"); //队列二绑定所有级别的日志 channel.queueBind(queue2Name,ExchangeName,"*.*"); //发送消息 String body = "日志信息:张三调用了delete方法 日志级别为:order.info"; //参数二指定绑定的队列 channel.basicPublish(ExchangeName,"order.info",null,body.getBytes()); //释放资源 RabbitUtils.closeConnectionAndchannel(channel,connection); }
2.RabbitMQ消费责一
代码如下:
public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); //创建channel Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; //接收消息 /** * 1.queue:队列名称 * 2.aotoACK:是否自动确认 * 3.callback:回调对象 */ Consumer consumer = new DefaultConsumer(channel) { /** * 回调方法。当收到消息之后会自动执行该方法 * @param consumerTag 标识 * @param envelope 获取一些信息,交换机,路由Key * @param properties 配置信息 * @param body 数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); System.out.println("将日志信息保存至数据库"); } }; //接收队列一 channel.basicConsume(queue1Name, true, consumer); } }
3.RabbitMQ消费责二
代码如下:
public class Consumer_Topic2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); //创建channel Channel channel = connection.createChannel(); String queue2Name = "test_topic_queue2"; //接收消息 /** * 1.queue:队列名称 * 2.aotoACK:是否自动确认 * 3.callback:回调对象 */ Consumer consumer = new DefaultConsumer(channel) { /** * 回调方法。当收到消息之后会自动执行该方法 * @param consumerTag 标识 * @param envelope 获取一些信息,交换机,路由Key * @param properties 配置信息 * @param body 数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body = " + new String(body)); System.out.println("将日志信息保存至数据库"); } }; //接收队列一 channel.basicConsume(queue2Name, true, consumer); } }
通配符和定向相比,通配符更加灵活。
mikechen睿哥
mikechen睿哥,十余年BAT架构经验,资深技术专家,就职于阿里、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》