迁移到SpringBoot 07 - Redis发布订阅机制

Redis中的发布订阅机制(Pub/Sub)是基于channel这一概念的,这有些类似于Kafka中的基于topic的消息机制,只是不支持持久化。如果publish的消息,没有任何client处于”subscribe”状态,消息将会被丢弃。如果client在subcribe时,链接断开后重连,那么此期间的消息也将丢失。Redis server将会”尽力”将消息发送给处于subscribe状态的client,但是仍不会保证每条消息都能被正确接收。

为了解耦发布者(publisher)和订阅者(subscriber)之间的关系,Redis 使用了 channel (频道)作为两者的中介:发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,发布者和订阅者之间没有相互关系,也不知道对方的存在。

Spring Data Redis组件对Pub/Sub机制进行了抽象,提供了类似JMS的编程模式。Spring Data Redis使用了一个Container(RedisMessageListenerContainer)来解决发布订阅机制。这个Container使用一个Redis链接解决了多个topic订阅的问题。它把其他的订阅、发布者隔离成基本的POJO对象,而不用与Redis对象打交道。这简化了整个编程模型。

1. 使用方法

1.1 Container

首先定义一个Container。

1
2
3
4
5
6
@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory); // 注入RedisConnectionFactory
return container;
}

其中的redisConnectionFactory是通过AutoConfiguration自动创建的对象,按照实际的情况配置spring.redis.*相关配置参数即可创建。如有需要,可以按照自己的需要进行修改。

1.2 发布

发布使用redisTemplate的convertAndSend方法即可。

1
2
3
4
5
6
7
8
protected void publish(String channel, String message) {
try {
redisTemplate.convertAndSend(channel, message);
logger.info("publish success! channel={},message={}", channel, message);
} catch (Exception e) {
logger.error("publish error! channel={},message={}, exception: {}", channel, message, e);
}
}

channel

1.3 订阅

订阅需要指定channel和对应的Handler即可。Handler需要实现MessageListener接口。下面是订阅函数的示例:

1
2
3
4
5
6
7
8
protected void subscribe(String channel, MessageListener handle) {
try {
container.addMessageListener(handle, new ChannelTopic(channel));
logger.info("subscribe success! channel={}", channel);
} catch (Exception e) {
logger.error("subscribe error! channel={}. Exception: {}", channel, e);
}
}

一个实现MessageListener接口的例子如下:

1
2
3
4
5
6
public void onMessage(Message message, byte[] pattern) {
long begin = System.currentTimeMillis();
listCardBin = bankcardBinDal.selectAll();
long timeUsed = System.currentTimeMillis() - begin;
logger.info("receive message & init cardBin success! timeUsed={}ms,[{}:{}]", timeUsed, new String(pattern), message);
}

收到订阅消息后的处理可以在上面接口中实现。

附录、参考资料

热评文章