Redis发布订阅详解(原理命令及使用实例)

Redis发布订阅详解(原理命令及使用实例)-mikechen

Redis发布订阅定义

Redis 发布订阅是一种消息通信模式:发送者(pub)指的是发送消息,订阅者(sub)指的是接收消息。

每当有消息被发送至给定频道时,频道的所有订阅者都会收到消息,订阅者可以同时订阅多个频道,发送者可以再多个频道发送消息。

 

Redis发布订阅使用场景

Redis发布订阅这一功能,最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。

 

Redis发布订阅原理

Redis的发布订阅机制包括三个部分:发布者、订阅者、Channel,三者的关系如下图所示:

Redis发布订阅详解(原理命令及使用实例)-mikechen

Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题,发布者和订阅者都是Redis客户端,Channel则为Redis服务器端。
首先:发布者(publisher)将消息发送到某个的频道(channel);

其次:订阅者(subscriber),只要订阅了这个频道,订阅者就能接收到这条消息;

 

Redis发布订阅命令

Redis发布订阅常见的命令如下:

1.publish 频道发送消息

Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。

2.subscribe订阅某个频道

Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。

3.psubscribe模式匹配

模式匹配功能允许客户端订阅符合某个模式的频道,Redis采用PSUBSCRIBE订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。

4.unsubscribe 退订频道

指退订给定的频道。

 

Redis发布订阅实现

1.发送端

把消息发送到指定的频道:mikechen

  1. 127.0.0.1:6379> PUBLISH mikechen"hello world!" #发送消息到mikechen频道
  2. (integer) 1
  3. 127.0.0.1:6379> PUBLISH mikechen"my name is mikechen" #发送消息到mikechen频道
  4. (integer) 1

 

2.订阅端

订阅端可以通过SUBSCRIBE 来订阅频道,后续就可以接收到来自发送端的消息了。

  1. 127.0.0.1:6379> SUBSCRIBE mikechen #订阅名字为 mikechen的频道
  2. Reading messages... (press Ctrl-C to quit)
  3. 1) "subscribe"
  4. 2) "mikechen"
  5. 3) (integer) 1
  6.  
  7. #等待推送的信息
  8. 1) "message" #消息
  9. 2) "mikechen" #来自哪个频道的消息
  10. 3) "hello world\xef\xbc\x81" # 消息的具体内容
  11. 1) "message"
  12. 2) "dingdada"
  13. 3) "my name is dyj\x81"

 

Redis发布订阅实例

现在我们很多应用已经基于 SpringBoot 开发,使用spring-boot-starter-data-redis ,可以简化发布订阅开发。

1.加入Redis Maven依赖

  1. org.springframework.boot
  2. spring-boot-starter-data-redis

 

2.创建消息接收类

然后我们需要创建一个消息接收类,里面需要有方法消费消息,如下所示:

  1. @Slf4j
  2. public class Receiver {
  3. private AtomicInteger counter = new AtomicInteger();
  4.  
  5. public void receiveMessage(String message) {
  6. log.info("Received <" message ">");
  7. counter.incrementAndGet();
  8. }
  9.  
  10. public int getCount() {
  11. return counter.get();
  12. }
  13. }

 

3.注入Spring Redis相关类

接着我们只需要注入 Spring-Redis 相关 Bean,比如:StringRedisTemplate用来操作 Redis 命令,以及MessageListenerAdapter 消息监听器等。

示例如下:

  1. @Configuration
  2. public class MessageConfiguration {
  3.  
  4. @Bean
  5. RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
  6. MessageListenerAdapter listenerAdapter) {
  7.  
  8. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  9. container.setConnectionFactory(connectionFactory);
  10. // 订阅指定频道使用 ChannelTopic
  11. // 订阅模式使用 PatternTopic
  12. container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));
  13.  
  14. return container;
  15. }
  16.  
  17. @Bean
  18. MessageListenerAdapter listenerAdapter(Receiver receiver) {
  19. // 注入 Receiver,指定类中的接受方法
  20. return new MessageListenerAdapter(receiver, "receiveMessage");
  21. }
  22.  
  23. @Bean
  24. Receiver receiver() {
  25. return new Receiver();
  26. }
  27.  
  28. @Bean
  29. StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
  30. return new StringRedisTemplate(connectionFactory);
  31. }
  32.  
  33. }

 

4.发送接收消息

最后我们使用 StringRedisTemplate.convertAndSend 发送消息,同时 Receiver 将会收到一条消息。

示例如下:

  1. @SpringBootApplication
  2. public class MessagingRedisApplication {
  3. public static void main(String[] args) throws InterruptedException {
  4.  
  5. ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);
  6.  
  7. StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
  8. Receiver receiver = ctx.getBean(Receiver.class);
  9.  
  10. while (receiver.getCount() == 0) {
  11. template.convertAndSend("pay_result", "Hello from Redis!");
  12. Thread.sleep(500L);
  13. }
  14.  
  15. System.exit(0);
  16. }
  17. }
评论交流
    说说你的看法
欢迎您,新朋友,感谢参与互动!