RocketMQ LiteTopic 学习


2026-04-10 RocketMQ 发布 5.5.0,携带了一个新的 Feature:LiteTopic!

LiteTopic 学习

一句话理解

LiteTopic = 在一个 message.type=LITE 的父 Topic 下,用一个轻量子主题名 liteTopic 动态派生出海量“子主题”。Broker 不为每个子主题创建真正的 Topic 元数据,而是把它编码成一个 LMQ 名:%LMQ%$parentTopic$liteTopic。消息物理上仍只写一次 CommitLog,消费时按这个子 LMQ 做选择性读取与 POP。

快速结论

  • LiteTopic 很适合 A2A 这种“高基数、动态生成、生命周期短”的子通道模型。
  • 它不是独立协议,更像 RocketMQ 原生的一种轻量消息模型。
  • 它和 MQTT 的使用心智很像,但协议语义、存储形态、消费模型都不一样。
  • 它的消费模型不是经典 RocketMQ PushConsumer/PullConsumer + MessageQueue rebalance,而是:
    • 显式订阅 liteTopic 集合
    • group 级 offset
    • POP 驱动的选择性消费
  • 它底层不是新存储,而是复用 CommitLog + CQ + LMQ
  • 它最容易出现的代价不是单次读很贵,而是“候选 liteTopic 太多时的调度/扫描/试探式读取开销”。

图景

1. LiteTopic 是什么,适合什么场景

1.1 代码里怎么定义

核心命名逻辑在 LiteUtil

public static String toLmqName(String parentTopic, String liteTopic) {
    if (StringUtils.isEmpty(parentTopic) || StringUtils.isEmpty(liteTopic)) {
        return null;
    }
    return LITE_TOPIC_PREFIX + parentTopic + SEPARATOR + liteTopic;
}

其中:

  • LITE_TOPIC_PREFIX = "%LMQ%$"
  • 最终形态:%LMQ%$parentTopic$liteTopic

设计约束:

  • 父 Topic 必须是 TopicMessageType.LITE
  • 父 Topic 可以配置 lite.topic.expiration
  • 消费组通过 lite.bind.topic 绑定父 Topic

1.2 和普通 Topic / LMQ 的关系

  • LiteTopic 不是一个真正独立的 Topic 元数据对象
  • LiteTopic 底层一定是 LMQ
  • 但反过来,LMQ 不一定是 LiteTopic

所以更准确地说:

LiteTopic = LMQ + 父 Topic 命名空间 + 显式订阅 + 生命周期管理 + Lite 专用消费控制面

1.3 为什么适合 A2A

如果把 A2A 系统映射进来,一个很自然的建模方式是:

  • 父 Topic = 某个 agent 网络 / 某个租户 / 某个业务域
  • liteTopic = 某个会话、任务、agent 实例、tool call 流
  • group = 某个消费侧 agent 角色

适合的原因:

  • 子主题数量可以很大
  • 子主题可以动态创建,不需要显式建真实 Topic
  • 子主题可以自动过期清理
  • 消费者只需要关注自己的一部分 liteTopic

2. LiteTopic 和 MQTT / MQTT bridge 的关系

2.1 为什么看起来很像 MQTT

从“使用心智”上看,LiteTopic 确实很像 MQTT:

  • 父 Topic 下挂很多细粒度子主题
  • 消费者按子主题选择性订阅
  • 支持 wildcard / shared / exclusive 这类更偏主题路由的语义
  • 很适合 session / device / agent / channel 模型

所以可以把 LiteTopic 理解成:

RocketMQ 体系里的 MQTT-like 子主题模型

2.2 但它和 MQTT 不是一回事

维度 LiteTopic MQTT
本质 RocketMQ 原生轻量消息模型 独立协议
存储 CommitLog + CQ + LMQ Broker 自己的 topic/session 语义
消费 更偏 POP + invisibleTime + ack 协议级 QoS / session / retain
wildcard 有,但不是完整 MQTT topic filter 体系 协议级核心能力
retry 没有原生 retry topic MQTT 不走 RocketMQ 那套 retry topic 语义

2.3 rocketmq-iot-bridge 能不能满足 A2A

我目前的判断是:很多 A2A 场景是可以的。

如果你的需求更像:

  • agent 长连接在线
  • topic routing 很重要
  • wildcard/filter 很重要
  • 协议互通、设备/边缘/在线会话更重要

那 MQTT 或 MQTT bridge 很自然。

但 LiteTopic 的价值在于:

  • 不引入另一套协议面
  • 不跳出 RocketMQ 原有 broker/store/offset/运维体系
  • 直接在 RMQ 里支持“高基数子主题”

2.4 一个很重要的区分

MQTT bridge 更像:在 RocketMQ 之上提供 MQTT 协议入口。

LiteTopic 更像:RocketMQ 自己内部长出的一种轻量子主题消息模型。

所以:

  • 如果你要的是“MQTT 的协议语义”,bridge 更自然
  • 如果你要的是“RocketMQ 原生存储与治理下的动态子主题”,LiteTopic 更自然

3. 发布、订阅、消费是什么样的

3.1 发布形式

生产者仍然向父 Topic 发送消息,但需要带 __LITE_TOPIC

String liteTopic = oriProps.get(MessageConst.PROPERTY_LITE_TOPIC);
if (StringUtils.isNotEmpty(liteTopic)) {
    String lmqName = LiteUtil.toLmqName(requestHeader.getTopic(), liteTopic);
    oriProps.put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName);
}

也就是:

topic = parentTopic
property["__LITE_TOPIC"] = childLiteTopic
--> broker 转换为 INNER_MULTI_DISPATCH = %LMQ%$parentTopic$childLiteTopic

3.2 订阅形式

LiteTopic 的订阅不是“自动分配 MessageQueue”,而是显式同步 liteTopic 集合

控制面入口:

  • PARTIAL_ADD / PARTIAL_REMOVE
  • COMPLETE_ADD / COMPLETE_REMOVE

关键代码:

switch (entry.getAction()) {
    case PARTIAL_ADD:
        this.liteSubscriptionRegistry.addPartialSubscription(...);
        break;
    case COMPLETE_ADD:
        this.liteSubscriptionRegistry.addCompleteSubscription(...);
        break;
    case COMPLETE_REMOVE:
        this.liteSubscriptionRegistry.removeCompleteSubscription(clientId);
        break;
}

3.3 是否需要新版客户端

我的结论:

  • 发送侧:只要 SDK 能设置 __LITE_TOPIC,理论上就能发到 LiteTopic
  • 消费侧:基本需要理解 LiteTopic 新协议的新客户端或新 proxy

因为它依赖:

  • LITE_PULL_MESSAGE
  • POP_LITE_MESSAGE
  • liteTopic 字段
  • syncLiteSubscription
  • notifyUnsubscribeLite
  • ack / changeInvisible 时继续携带 liteTopic

当前仓库里“最完整的用户态语义”是 proxy gRPC:

  • ClientType.LITE_PUSH_CONSUMER
  • MessageModel.LITE_SELECTIVE

4. 底层存储:和 CQ / LMQ / CommitLog 的关系

4.1 先说结论

  • CommitLog:复用原有 CommitLog,消息物理上只写一次
  • CQ:复用现有 ConsumeQueue / RocksDB ConsumeQueue
  • LMQ:LiteTopic 的真实消费视角是一个 LMQ

所以 LiteTopic 不是一套全新存储,而是:

父 Topic + LMQ 多路分发 + Lite 控制面

4.2 写入链路

boolean isMultiDispatchMsg = messageStoreConfig.isEnableLmq() && msgInner.needDispatchLMQ();
if (isMultiDispatchMsg) {
    LmqDispatch.updateLmqOffsets(defaultMessageStore, msgInner);
}

配合 CQ 分发:

if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
    multiDispatchLmqQueue(request, maxRetries);
}

写入流程可以概括成:

  1. 发送时带 __LITE_TOPIC
  2. broker 转成 INNER_MULTI_DISPATCH
  3. CommitLog 正常写一次
  4. 同时为目标 LMQ 分配 offset
  5. CQ 构建时把同一条消息额外 dispatch 到目标 LMQ CQ

4.3 消费读取时如何定位到 liteTopic

核心逻辑在 PullMessageProcessor

String storeTopic = topic;
if (StringUtils.isNotBlank(liteTopic)) {
    storeTopic = LiteUtil.toLmqName(topic, liteTopic);
}
messageStore.getMessageAsync(group, storeTopic, queueId, ...);

也就是说:

  • 用户请求里看到的是 parentTopic + liteTopic
  • 真正落到存储层时读的是 LMQ(parentTopic, liteTopic)

5. LiteTopic 的消费模型

这一部分最关键。

5.1 它和经典 RocketMQ Push/Pull 模型不是一回事

LiteTopic 不等于:

  • PushConsumer/PullConsumer
  • topic -> 多个 queue
  • 同组 consumer rebalance queue ownership

它更像:

显式订阅 liteTopic 集合 + group 级 offset + POP 驱动的选择性消费

5.2 为什么说它不是经典 rebalance

经典 RocketMQ cluster 模式的思维是:

  • topic 下有固定若干 queue
  • 同组 consumer 分摊 queue
  • queue ownership 会 rebalance

LiteTopic 这里不是这样:

  • 每个 liteTopic 实际对应一个 LMQ,通常就是 queueId = 0
  • 客户端先同步自己关心的 liteTopic 集合
  • broker 通过 LiteEventDispatcher 决定哪个 client 去消费哪个 liteTopic

关键代码:

SubscriberWrapper wrapper = liteSubscriptionRegistry.getAllSubscriber(group, lmqName);
...
selectAndDispatch(lmqName, wrapper.asListWrapper().getClients(), excludeClientId);

所以它更像:

  • broker 侧 dispatch / load-share

而不是:

  • client 侧 classic rebalance

5.3 有哪些订阅模式

lite.sub.model 现在有两种:

  • Shared
  • Exclusive

此外还有:

  • Wildcard

可以理解成:

  • Shared:同组多个 client 可以共享消费一个 liteTopic,broker 选一个 client 派发
  • Exclusive:同组下一个 liteTopic 只允许一个 client 占有,后来的会把前面的挤掉
  • Wildcard:不显式列出全部 liteTopic,而是按父 Topic 下的匹配集合做动态发现/分发

5.4 消费进度怎么确定

有 offset,而且是明确的 group 级 offset。

核心逻辑:

long offset = brokerController.getConsumerOffsetManager().queryOffset(group, lmqName, 0);

也就是说:

  • 进度粒度是 consumerGroup + lmqName + queueId(0)
  • 不是每个 client 自己一份
  • 同组共享同一个 liteTopic offset

这点和经典 cluster 模式有点像,但这里的“topic 单位”变成了 liteTopic 对应的 LMQ。

5.5 消费流程更偏 POP

LiteTopic 当前明显偏 POP_LITE_MESSAGE 路线:

Pair<StringBuilder, GetMessageResult> rst = popByClientId(...);

popByClientId 的思路是:

  1. 从 event/subscription 里拿到候选 liteTopic
  2. 逐个 popLiteTopic(...)
  3. 命中后返回消息

5.6 支持并发消费吗

支持,但不是经典“多 queue 并发 rebalance”。

并发来自:

  • 不同 liteTopic 可以被不同 client 同时消费
  • 一个 client 也可以在一次 POP 请求里从多个 liteTopic 拉到消息

5.7 支持顺序消费吗

支持,源码里直接写了:

/**
 * Pop lite implementation, support FIFO consuming.
 */

依赖的是:

  • PopConsumerLockService
  • ConsumerOrderInfoManager
  • checkBlock(...)
  • updateNextVisibleTime(...)

所以更准确地说:

  • 跨 liteTopic 可以并发
  • 单个 liteTopic 在需要时可以做 FIFO 顺序消费

5.8 重试队列、死信队列

这里非常容易误解。

原生 retry topic

LiteTopic 没有经典 RocketMQ retry topic

源码注释直接写了:

Lite Topic: ... implemented based on LMQ, which has no retry topic.

所以它不是:

  • 失败 -> %RETRY%group -> 多次后 %DLQ%group

这套经典 push consumer 语义。

失败恢复

LiteTopic 更像:

  • POP
  • invisibleTime
  • 没 ack 就重新可见
  • changeInvisibleTime 延长或 suspend

也就是 POP redelivery,不是 retry topic。

DLQ

有 DLQ 转发能力,尤其在 proxy/gRPC 路径里比较明确:

consumerSendMsgBackRequestHeader.setDelayLevel(-1);
consumerSendMsgBackRequestHeader.setOriginTopic(handle.getRealTopic(topicName, groupName));
consumerSendMsgBackRequestHeader.setMaxReconsumeTimes(0);

意思是:

  • 可以把某条 Lite 消息直接转去 DLQ
  • 成功后再 ack 原消息

所以结论是:

  • 没有原生 Lite retry topic
  • 有 DLQ 转发能力
  • 失败恢复主要靠 invisible/redelivery

6. 读放大问题

6.1 先说结论

LiteTopic 有更高的读放大风险,但不是单次读取一定更重,而是:

候选 liteTopic 太多、命中率太低时,逻辑层面的试探式读取和调度成本会膨胀。

6.2 单次精确读取不一定更贵

如果客户端已经明确指定了某个 liteTopic,读取链路只是:

  • parentTopic + liteTopic
  • 改写成目标 LMQ
  • 对这个 LMQ 做一次正常 getMessage

这种情况下,单次存储读取本身不一定比普通 queue 更重。

6.3 真正容易放大的地方:候选集合太大

PopLiteMessageProcessor.popByClientId()

Iterator<String> iterator = liteEventDispatcher.getEventIterator(clientId);
while (total.get() < maxNum && iterator.hasNext()) {
    String lmqName = iterator.next();
    Pair<StringBuilder, GetMessageResult> pair = popLiteTopic(parentTopic, clientHost, group, lmqName, ...);
    ...
}

这意味着:

  • 如果一个 client 订阅了很多 liteTopic
  • 但真正有消息的只有少数
  • broker 就需要试很多个 liteTopic 才能命中

这就是一种逻辑上的读放大。

6.4 event mode 的作用

如果 enableLiteEventMode=true

  • 迭代器优先遍历“已被事件分发命中的 liteTopic”
  • 能减少盲读

如果 enableLiteEventMode=false

return liteSubscription != null && liteSubscription.getLiteTopicSet() != null ?
    new LiteSubscriptionIterator(liteSubscription.getTopic(), liteSubscription.getLiteTopicSet().iterator())
    : Collections.emptyIterator();

也就是直接遍历整个订阅集。
当订阅集很大时,试探式读取会明显放大。

6.5 wildcard 更容易放大

wildcard 模式下最重的是 full dispatch:

liteLifecycleManager.forEachLiteTopic(function);

也就是:

  • 扫很多 liteTopic
  • 检查 lag / offset / 是否属于该父 Topic
  • 再决定是否派发

所以 wildcard 更像:

  • 扫描放大
  • 元数据放大
  • 调度放大

而不只是单次 CommitLog 读取放大。

6.6 如何理解这种权衡

LiteTopic 本质是在两个成本之间做取舍:

  1. 大 Topic 拉下来再客户端过滤
  2. broker 帮你在很多小 liteTopic 里做选择性消费

LiteTopic 选的是第 2 条路。

因此:

  • 当订阅集合小、event mode 有效、命中率高时,LiteTopic 可能更省
  • 当订阅集合大、命中率低、wildcard 多时,LiteTopic 更容易出现调度/扫描放大

7. 对 broker / store / proxy 做了哪些适配

7.1 broker 新增组件

BrokerController 里新增并启动了:

  • liteLifecycleManager
  • liteSubscriptionRegistry
  • liteEventDispatcher
  • liteManagerProcessor
  • liteSubscriptionCtlProcessor
  • popLiteMessageProcessor

同时注册了:

  • LITE_PULL_MESSAGE
  • POP_LITE_MESSAGE
  • 一组 Lite 管理/查询请求

7.2 broker 数据面适配

  • 发送:SendMessageProcessor 增加 Lite 属性转译
  • 拉取:PullMessageProcessor 读取前改写成 LMQ 名
  • POP:新增 PopLiteMessageProcessor
  • ack / changeInvisible:增加 liteTopic 分支
  • 到达通知:Lite LMQ 消息交给 LiteEventDispatcher

7.3 broker 控制面适配

  • LiteSubscriptionCtlProcessor:处理订阅增删
  • LiteSubscriptionRegistryImpl:维护 client/liteTopic/wildcard 索引
  • LiteEventDispatcher:做事件分发
  • LiteManagerProcessor:做诊断查询和管理

7.4 store 适配

没有新造一套 MessageStore,而是复用:

  • MessageStore
  • ConsumeQueueStoreInterface
  • CommitLog
  • ConsumeQueue
  • RocksDBConsumeQueueStore

7.5 proxy 适配

  • LiteSubscriptionService:把订阅同步到所有可读 broker
  • gRPC 增加:
    • LITE_PUSH_CONSUMER
    • syncLiteSubscription
    • notifyUnsubscribeLite
    • popLiteMessage

8. RocksDBLiteLifecycleManagerLiteLifecycleManager 的关系

8.1 共同点

  • 都继承 AbstractLiteLifecycleManager
  • 对外职责一致:
    • 查 max offset
    • 按父 Topic 收集 liteTopic
    • 遍历 liteTopic
    • TTL 扫描和清理

8.2 区别

LiteLifecycleManager

  • 面向文件型 CQ
  • 遍历 consumeQueueTable

RocksDBLiteLifecycleManager

  • 面向 RocksDB CQ
  • 也支持 CombineConsumeQueueStore + combineCQUseRocksdbForLmq=true
  • 主要基于 RocksDB 的 offset table 看 max offset

8.3 它们是不是不同存储模式

  • 是“同一个 LiteTopic 模型在不同 CQ 后端上的两种 lifecycle 实现”
  • 不是 LiteTopic 自己分裂成两种业务语义

9. CombineConsumeQueueStore 的意义

2c2cc921b 的核心含义是:

  • CombineConsumeQueueStore 下,LMQ 不必强制迁移到统一 RocksDB CQ 才能工作
  • 但也可以通过 combineCQUseRocksdbForLmq 让 LMQ 专门走 RocksDB CQ

所以 Lite 运行底座可以是:

  1. 文件 CQ
  2. RocksDB CQ
  3. CombineConsumeQueueStore

10. 我目前的总体判断

  • LiteTopic 不是为了替代普通 Topic,而是为了在一个父 Topic 下承载大量动态、轻量、可过期的子通道
  • 它最关键的设计点不是“新存储”,而是“复用 CommitLog / LMQ / CQ,只新增轻量控制面和生命周期管理”
  • 从用户心智上,它很像 MQTT
  • 从系统实现上,它仍然是 RocketMQ 原生模型,不是 MQTT 协议兼容层
  • 对 A2A 来说,它本质上是在 RocketMQ 里提供“高基数子主题 + 显式订阅 + group 级 POP 消费”的能力

关键代码索引

  • 命名与语义
    • common/src/main/java/org/apache/rocketmq/common/lite/LiteUtil.java
    • common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
    • common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
    • common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
  • Broker 入口
    • broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
    • broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
    • broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
    • broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
    • broker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.java
    • broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
  • Broker 内部组件
    • broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
    • broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
    • broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
    • broker/src/main/java/org/apache/rocketmq/broker/lite/LiteLifecycleManager.java
    • broker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java
  • Store
    • store/src/main/java/org/apache/rocketmq/store/CommitLog.java
    • store/src/main/java/org/apache/rocketmq/store/LmqDispatch.java
    • store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
    • store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
    • store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
  • Proxy
    • proxy/src/main/java/org/apache/rocketmq/proxy/service/lite/LiteSubscriptionService.java
    • proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
    • proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
    • proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
    • proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java

相关提交

  • 1b6a919bd [RIP-83] Lite Topic: A New Message Model (#9800)
  • 614b81693 Support wildcard subscription and consumer suspend for LiteTopic (#10204)
  • 2c2cc921b Support LMQ in CombineConsumeQueueStore without migration to RocksDB CQ (#10174)

后续可以继续深挖的点

  • LiteSharding 的 broker 分片策略和跨 broker 行为
  • wildcard/full dispatch 在大量 liteTopic 下的成本边界
  • LiteTopic 和 MQTT bridge 在 A2A 里的架构分工
  • LiteTopic 的写放大、读放大、元数据放大分别发生在哪里

文章作者: KTpro
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 KTpro !
  目录