Node.js 使用Redis发布订阅模式

1. 背景

最近在用Node.js做一个管理系统的时候碰到了一个场景:为了访问性能,系统在内存中(不是Redis)缓存了一些常用数据,例如系统菜单树之类的。但是什么时候刷新这些缓存就成了问题。当然在单服务器模式下也不是大问题,只要在更新数据的时候删除内存中的缓存数据即可。但是这一方法在分布式服务中就无效了:同时会有多个这种系统在跑,但是只有一台服务器接到了处理请求,其他服务器根本没有办法刷新内存中的数据。
当然有人会说将缓存放到Redis中不就解决问题了?但是如果数据量稍大一些,而且访问频繁、更新却不频繁,放到Redis中每次访问都会对Redis带来不小的压力,显得很没有必要。
这种情况下,自然就可以使用Redis的发布订阅机制来解决问题了:当数据更新之后,只要发布一个消息到Redis,所有服务器都可以收到消息,执行刷新缓存的操作了。

2. Redis发布订阅机制介绍

Redis 发布订阅(pub/sub)是一种消息通信模式。有两类参与者:发送者(pub)发送消息,订阅者(sub)接收消息。发布者和订阅者通过频道(Channel)进行沟通。发布者发布消息到指定频道上,订阅者订阅特定的频道,获取发布者发布的消息。

下图展示了频道 channel1 ,以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

3. 代码示例

使用Node.js调用Redis的发布订阅机制非常简单。我用的Redis客户端是:node_redis

基本上把所有功能都封装到了cache-service中。发布者、订阅者的代码基本上与Redis无关了,理论上可以切换到其他同类的服务上去。
注:代码使用ES2017的特性(await/async)。

3.1 cache-services.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 订阅接口
// channel 需要订阅的channel
// callback 处理订阅的消息的回调函数,可以为async function
exports.subscribe = async function(channel, callback) {
// 读取redis配置信息
var redisConfig = config.get("redis");
var options = {
host: redisConfig.host,
port: redisConfig.port,
user: redisConfig.user,
password: redisConfig.password,
db: redisConfig.database
};
// 创建redis client
var subClient = redis.createClient(options);
// 添加订阅监听处理函数
subClient.on("message", async function(channel, message) {
logger.info(format.vsprintf("Received subscribe message, channel [%s] message [%s]", [channel, message]));
// 调用回调函数
await callback(channel, message);
});
// 开始订阅
subClient.subscribe("channel_update_permissions");
// 订阅成功
subClient.on("ready", function() {
logger.info(format.vsprintf('Redis [%s:%s/%s] is connected and ready for subscribe channel [%s] use.', [redisConfig.host, redisConfig.port, redisConfig.database, channel]));
});
// 错误处理
redisClient.on("error", function (err) {
logger.error("Subscribe channel ["+channel+"] encountered error. Error:" + err);
});
// 将client加到全局map中,以备后用
subscribeClients.set(channel, subClient);
};

// 调用publish功能。使用普通redis client即可
exports.publish = async function(channel, message) {
await redisClient.publish(channel, message);
};

说明:按照node_redis官方文档的说明,调用了subscribe的客户端将进入订阅模式,无法执行其他操作。因此这里每次都创建一个新的redis连接,并且设置了一个内部表,用于保存这些创建的链接。

When a client issues a SUBSCRIBE or PSUBSCRIBE, that connection is put into a “subscriber” mode. At that point, only commands that modify the subscription set are valid and quit (and depending on the redis version ping as well). When the subscription set is empty, the connection is put back into regular mode.

3.2 调用订阅功能,监听事件。在必要的时候自动加载权限表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 加载权限表
async function initPermssionTable() {
}

// 系统启动的时候自动加载权限表
(async function() {
permissionTable = await initPermssionTable();
logger.info("Permission table initialized.");
cacheServices.subscribe("channel_update_permissions", async function(channel, message) {
logger.info("Try to reload permission table....");
permissionTable = await initPermssionTable();
logger.info("Permission table reloaded successfully!");
});
})();

3.3 发布通知

1
2
3
4
...
// 更新全局权限表,通过redis.publish实现
await cacheServices.publish("channel_update_permissions", "anything");
...

附录A. 参考资料

热评文章