解决消息队列重复消费问题


重复消费产生原因

  1. 网络问题
  2. 消息队列一般对消息消费失败都有重试机制,假如有一个消息消费执行了某些逻辑后在删除标识时失败了 (服务宕机,断电等问题导致中断),那消息队列就会进行重试,而造成重复消费。

待补充。

怎么解决

通过 Redis 设置幂等标识,当需要进行消费时,判断以当前消息 ID 为后缀的幂等 Key 是否存在于 Redis 中,如果其已经被消费,则直接返回,否则放行进行消费并设置完成标识

完成标识和消息消费的标识是同一个 Key,因为消息消费是判断是否存在这条消息消费记录,而完成标识是通过设置这个 key 的 value 为 1 表示完成。

(双重判定保证消费不会因为异常丢失)这一点也可以写上。

消费过程如下:

@Override
   public void onMessage(MapRecord<String, String, String> message) {
       String stream = message.getStream();
       RecordId id = message.getId();
       // 判断当前的这个消息是否已经被消费
       if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
           //消费失败,但是此时已经设置了消费标识,可能会导致消息未被真实消费,所以设置一个完成标识
           if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
               return;
           }
           throw new ServiceException("消息未完成流程,需要消息队列重试");
       }
       try {
           Map<String, String> producerMap = message.getValue();
           String fullShortUrl = producerMap.get("fullShortUrl");
           if (StrUtil.isNotBlank(fullShortUrl)) {
               String gid = producerMap.get("gid");
               ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
               //核心:调用原本的短链接统计方法
               actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
           }
           //由于Redis自身没有像MQ那种智能的把消费过的信息,待一段时间后自动删除,而且内存珍贵,所以手动删除这个消息(也能避免重复消费)
           stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
       } catch (Throwable ex) {
           // 消息队列宕机处理
           messageQueueIdempotentHandler.delMessageProcessed(id.toString());
           log.error("记录短链接监控消费异常", ex);
       }
       //当完整执行完消费流程,才会设置完成标识。
       messageQueueIdempotentHandler.setAccomplish(id.toString());
   }

文章作者: KTpro
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 KTpro !
评论
  目录