重复消费产生原因
- 网络问题
- 消息队列一般对消息消费失败都有重试机制,假如有一个消息消费执行了某些逻辑后在删除标识时失败了 (服务宕机,断电等问题导致中断),那消息队列就会进行重试,而造成重复消费。
待补充。
怎么解决
通过 Redis 设置幂等标识,当需要进行消费时,判断以当前消息 ID 为后缀的幂等 Key 是否存在于 Redis 中,如果其已经被消费,则直接返回,否则放行进行消费并设置完成标识。
完成标识和消息消费的标识是同一个 Key,因为消息消费是判断是否存在这条消息消费记录,而完成标识是通过设置这个 key 的 value 为 1 表示完成。
(双重判定保证消费不会因为异常丢失)这一点也可以写上。
消费过程如下:
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
// 判断当前的这个消息是否已经被消费
if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
//消费失败,但是此时已经设置了消费标识,可能会导致消息未被真实消费,所以设置一个完成标识
if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
return;
}
throw new ServiceException("消息未完成流程,需要消息队列重试");
}
try {
Map<String, String> producerMap = message.getValue();
String fullShortUrl = producerMap.get("fullShortUrl");
if (StrUtil.isNotBlank(fullShortUrl)) {
String gid = producerMap.get("gid");
ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
//核心:调用原本的短链接统计方法
actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
}
//由于Redis自身没有像MQ那种智能的把消费过的信息,待一段时间后自动删除,而且内存珍贵,所以手动删除这个消息(也能避免重复消费)
stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
} catch (Throwable ex) {
// 消息队列宕机处理
messageQueueIdempotentHandler.delMessageProcessed(id.toString());
log.error("记录短链接监控消费异常", ex);
}
//当完整执行完消费流程,才会设置完成标识。
messageQueueIdempotentHandler.setAccomplish(id.toString());
}