Redis发布订阅定义
Redis 发布订阅是一种消息通信模式:发送者(pub)指的是发送消息,订阅者(sub)指的是接收消息。
每当有消息被发送至给定频道时,频道的所有订阅者都会收到消息,订阅者可以同时订阅多个频道,发送者可以再多个频道发送消息。
Redis发布订阅使用场景
Redis发布订阅这一功能,最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
Redis发布订阅原理
Redis的发布订阅机制包括三个部分:发布者、订阅者、Channel,三者的关系如下图所示:

Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题,发布者和订阅者都是Redis客户端,Channel则为Redis服务器端。
首先:发布者(publisher)将消息发送到某个的频道(channel);
其次:订阅者(subscriber),只要订阅了这个频道,订阅者就能接收到这条消息;
Redis发布订阅命令
Redis发布订阅常见的命令如下:
1.publish 频道发送消息
Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。
2.subscribe订阅某个频道
Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。
3.psubscribe模式匹配
模式匹配功能允许客户端订阅符合某个模式的频道,Redis采用PSUBSCRIBE订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。
4.unsubscribe 退订频道
指退订给定的频道。
Redis发布订阅实现
1.发送端
把消息发送到指定的频道:mikechen
- 127.0.0.1:6379> PUBLISH mikechen"hello world!" #发送消息到mikechen频道
- (integer) 1
- 127.0.0.1:6379> PUBLISH mikechen"my name is mikechen" #发送消息到mikechen频道
- (integer) 1
2.订阅端
订阅端可以通过SUBSCRIBE 来订阅频道,后续就可以接收到来自发送端的消息了。
- 127.0.0.1:6379> SUBSCRIBE mikechen #订阅名字为 mikechen的频道
- Reading messages... (press Ctrl-C to quit)
- 1) "subscribe"
- 2) "mikechen"
- 3) (integer) 1
- #等待推送的信息
- 1) "message" #消息
- 2) "mikechen" #来自哪个频道的消息
- 3) "hello world\xef\xbc\x81" # 消息的具体内容
- 1) "message"
- 2) "dingdada"
- 3) "my name is dyj\x81"
Redis发布订阅实例
现在我们很多应用已经基于 SpringBoot 开发,使用spring-boot-starter-data-redis ,可以简化发布订阅开发。
1.加入Redis Maven依赖
- org.springframework.boot
- spring-boot-starter-data-redis
2.创建消息接收类
然后我们需要创建一个消息接收类,里面需要有方法消费消息,如下所示:
- @Slf4j
- public class Receiver {
- private AtomicInteger counter = new AtomicInteger();
- public void receiveMessage(String message) {
- log.info("Received <" message ">");
- counter.incrementAndGet();
- }
- public int getCount() {
- return counter.get();
- }
- }
3.注入Spring Redis相关类
接着我们只需要注入 Spring-Redis 相关 Bean,比如:StringRedisTemplate用来操作 Redis 命令,以及MessageListenerAdapter 消息监听器等。
示例如下:
- @Configuration
- public class MessageConfiguration {
- @Bean
- RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
- MessageListenerAdapter listenerAdapter) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- // 订阅指定频道使用 ChannelTopic
- // 订阅模式使用 PatternTopic
- container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));
- return container;
- }
- @Bean
- MessageListenerAdapter listenerAdapter(Receiver receiver) {
- // 注入 Receiver,指定类中的接受方法
- return new MessageListenerAdapter(receiver, "receiveMessage");
- }
- @Bean
- Receiver receiver() {
- return new Receiver();
- }
- @Bean
- StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
- return new StringRedisTemplate(connectionFactory);
- }
- }
4.发送接收消息
最后我们使用 StringRedisTemplate.convertAndSend 发送消息,同时 Receiver 将会收到一条消息。
示例如下:
- @SpringBootApplication
- public class MessagingRedisApplication {
- public static void main(String[] args) throws InterruptedException {
- ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);
- StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
- Receiver receiver = ctx.getBean(Receiver.class);
- while (receiver.getCount() == 0) {
- template.convertAndSend("pay_result", "Hello from Redis!");
- Thread.sleep(500L);
- }
- System.exit(0);
- }
- }