Redis发布订阅定义
Redis 发布订阅是一种消息通信模式:发送者(pub)指的是发送消息,订阅者(sub)指的是接收消息。
每当有消息被发送至给定频道时,频道的所有订阅者都会收到消息,订阅者可以同时订阅多个频道,发送者可以再多个频道发送消息。
Redis发布订阅使用场景
Redis发布订阅这一功能,最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
Redis发布订阅原理
Redis的发布订阅机制包括三个部分:发布者、订阅者、Channel,三者的关系如下图所示:
Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题,发布者和订阅者都是Redis客户端,Channel则为Redis服务器端。
首先:发布者(publisher)将消息发送到某个的频道(channel);
其次:订阅者(subscriber),只要订阅了这个频道,订阅者就能接收到这条消息;
Redis发布订阅命令
Redis发布订阅常见的命令如下:
1.publish 频道发送消息
Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。
2.subscribe订阅某个频道
Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。
3.psubscribe模式匹配
模式匹配功能允许客户端订阅符合某个模式的频道,Redis采用PSUBSCRIBE订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。
4.unsubscribe 退订频道
指退订给定的频道。
Redis发布订阅实现
1.发送端
把消息发送到指定的频道:mikechen
127.0.0.1:6379> PUBLISH mikechen"hello world!" #发送消息到mikechen频道 (integer) 1 127.0.0.1:6379> PUBLISH mikechen"my name is mikechen" #发送消息到mikechen频道 (integer) 1
2.订阅端
订阅端可以通过SUBSCRIBE 来订阅频道,后续就可以接收到来自发送端的消息了。
127.0.0.1:6379> SUBSCRIBE mikechen #订阅名字为 mikechen的频道 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "mikechen" 3) (integer) 1 #等待推送的信息 1) "message" #消息 2) "mikechen" #来自哪个频道的消息 3) "hello world\xef\xbc\x81" # 消息的具体内容 1) "message" 2) "dingdada" 3) "my name is dyj\x81"
Redis发布订阅实例
现在我们很多应用已经基于 SpringBoot 开发,使用spring-boot-starter-data-redis ,可以简化发布订阅开发。
1.加入Redis Maven依赖
org.springframework.boot spring-boot-starter-data-redis
2.创建消息接收类
然后我们需要创建一个消息接收类,里面需要有方法消费消息,如下所示:
@Slf4j public class Receiver { private AtomicInteger counter = new AtomicInteger(); public void receiveMessage(String message) { log.info("Received <" message ">"); counter.incrementAndGet(); } public int getCount() { return counter.get(); } }
3.注入Spring Redis相关类
接着我们只需要注入 Spring-Redis 相关 Bean,比如:StringRedisTemplate用来操作 Redis 命令,以及MessageListenerAdapter 消息监听器等。
示例如下:
@Configuration public class MessageConfiguration { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 订阅指定频道使用 ChannelTopic // 订阅模式使用 PatternTopic container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result")); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { // 注入 Receiver,指定类中的接受方法 return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean Receiver receiver() { return new Receiver(); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
4.发送接收消息
最后我们使用 StringRedisTemplate.convertAndSend 发送消息,同时 Receiver 将会收到一条消息。
示例如下:
@SpringBootApplication public class MessagingRedisApplication { public static void main(String[] args) throws InterruptedException { ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args); StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class); Receiver receiver = ctx.getBean(Receiver.class); while (receiver.getCount() == 0) { template.convertAndSend("pay_result", "Hello from Redis!"); Thread.sleep(500L); } System.exit(0); } }
陈睿mikechen
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》