最近生产上碰到一个奇怪的问题,现场人员反馈Kafka
工作不正常。经过开发人员检查,发现一个奇怪的现象:线上两台服务器竟然在同时消费一个Kafka
Topic
中的一个分区(Partition
)。
1. 背景
我们的程序要实现的功能是这样的:系统会定时扫描库存表,根据库存情况决定是否要执行出库操作。之前是采用定时任务处理,执行策略是1分钟一次,为了防止多台服务器并发执行,使用Quartz
集群模式确保定时任务只在一台服务器上执行。最近客户提出需求,某些时候客户希望响应更及时一些,譬如某项请求到达的时候,就要立即检查库存并决定是否需要出库。因此,开发人员对系统进行了调整,改为使用Kafka
Consumer
来触发扫描库存动作;定时任务和接口处同时负责发送触发库存扫描的消息。
相关的代码如下:
Consumer
用来处理扫描库存的功能:
1public class LibraryOutboundTriggerConsumer {
2 private final OutboundService outboundService;
3 private final BoxInfoService boxInfoService;
4
5 @KafkaListener(topics = "${fwm.core.topic.library-outbound-trigger}")
6 public void consume(ConsumerRecord<String, String> record) {
7 log.info("\n\n");
8 log.info("=====================开始执行出库扫描=====================");
9 if (boxInfoService.hasBoxUpdateTask()) {
10 log.info("=====================存在料箱更新任务,执行出库扫描结束=====================");
11 return;
12 }
13 Stopwatch stopwatch = Stopwatch.createStarted();
14 try {
15 boxInfoService.startOutboundTask(90);
16 outboundService.outboundTaskScan();
17 } finally {
18 boxInfoService.stopOutboundTask();
19 }
20
21 log.info("=====================执行出库扫描结束,总耗时{}ms=====================", stopwatch.elapsed(TimeUnit.MILLISECONDS));
22 }
23}