2026-04-10 RocketMQ 发布 5.5.0,携带了一个新的 Feature:LiteTopic!
LiteTopic 学习
一句话理解
LiteTopic = 在一个 message.type=LITE 的父 Topic 下,用一个轻量子主题名 liteTopic 动态派生出海量“子主题”。Broker 不为每个子主题创建真正的 Topic 元数据,而是把它编码成一个 LMQ 名:%LMQ%$parentTopic$liteTopic。消息物理上仍只写一次 CommitLog,消费时按这个子 LMQ 做选择性读取与 POP。
快速结论
- LiteTopic 很适合 A2A 这种“高基数、动态生成、生命周期短”的子通道模型。
- 它不是独立协议,更像 RocketMQ 原生的一种轻量消息模型。
- 它和 MQTT 的使用心智很像,但协议语义、存储形态、消费模型都不一样。
- 它的消费模型不是经典 RocketMQ
PushConsumer/PullConsumer + MessageQueue rebalance,而是:- 显式订阅 liteTopic 集合
- group 级 offset
- POP 驱动的选择性消费
- 它底层不是新存储,而是复用
CommitLog + CQ + LMQ。 - 它最容易出现的代价不是单次读很贵,而是“候选 liteTopic 太多时的调度/扫描/试探式读取开销”。
图景

1. LiteTopic 是什么,适合什么场景
1.1 代码里怎么定义
核心命名逻辑在 LiteUtil:
public static String toLmqName(String parentTopic, String liteTopic) {
if (StringUtils.isEmpty(parentTopic) || StringUtils.isEmpty(liteTopic)) {
return null;
}
return LITE_TOPIC_PREFIX + parentTopic + SEPARATOR + liteTopic;
}
其中:
LITE_TOPIC_PREFIX = "%LMQ%$"- 最终形态:
%LMQ%$parentTopic$liteTopic
设计约束:
- 父 Topic 必须是
TopicMessageType.LITE - 父 Topic 可以配置
lite.topic.expiration - 消费组通过
lite.bind.topic绑定父 Topic
1.2 和普通 Topic / LMQ 的关系
- LiteTopic 不是一个真正独立的 Topic 元数据对象
- LiteTopic 底层一定是 LMQ
- 但反过来,LMQ 不一定是 LiteTopic
所以更准确地说:
LiteTopic = LMQ + 父 Topic 命名空间 + 显式订阅 + 生命周期管理 + Lite 专用消费控制面
1.3 为什么适合 A2A
如果把 A2A 系统映射进来,一个很自然的建模方式是:
- 父 Topic = 某个 agent 网络 / 某个租户 / 某个业务域
- liteTopic = 某个会话、任务、agent 实例、tool call 流
- group = 某个消费侧 agent 角色
适合的原因:
- 子主题数量可以很大
- 子主题可以动态创建,不需要显式建真实 Topic
- 子主题可以自动过期清理
- 消费者只需要关注自己的一部分 liteTopic
2. LiteTopic 和 MQTT / MQTT bridge 的关系
2.1 为什么看起来很像 MQTT
从“使用心智”上看,LiteTopic 确实很像 MQTT:
- 父 Topic 下挂很多细粒度子主题
- 消费者按子主题选择性订阅
- 支持 wildcard / shared / exclusive 这类更偏主题路由的语义
- 很适合 session / device / agent / channel 模型
所以可以把 LiteTopic 理解成:
RocketMQ 体系里的 MQTT-like 子主题模型
2.2 但它和 MQTT 不是一回事
| 维度 | LiteTopic | MQTT |
|---|---|---|
| 本质 | RocketMQ 原生轻量消息模型 | 独立协议 |
| 存储 | CommitLog + CQ + LMQ |
Broker 自己的 topic/session 语义 |
| 消费 | 更偏 POP + invisibleTime + ack | 协议级 QoS / session / retain |
| wildcard | 有,但不是完整 MQTT topic filter 体系 | 协议级核心能力 |
| retry | 没有原生 retry topic | MQTT 不走 RocketMQ 那套 retry topic 语义 |
2.3 rocketmq-iot-bridge 能不能满足 A2A
我目前的判断是:很多 A2A 场景是可以的。
如果你的需求更像:
- agent 长连接在线
- topic routing 很重要
- wildcard/filter 很重要
- 协议互通、设备/边缘/在线会话更重要
那 MQTT 或 MQTT bridge 很自然。
但 LiteTopic 的价值在于:
- 不引入另一套协议面
- 不跳出 RocketMQ 原有 broker/store/offset/运维体系
- 直接在 RMQ 里支持“高基数子主题”
2.4 一个很重要的区分
MQTT bridge 更像:在 RocketMQ 之上提供 MQTT 协议入口。
LiteTopic 更像:RocketMQ 自己内部长出的一种轻量子主题消息模型。
所以:
- 如果你要的是“MQTT 的协议语义”,bridge 更自然
- 如果你要的是“RocketMQ 原生存储与治理下的动态子主题”,LiteTopic 更自然
3. 发布、订阅、消费是什么样的
3.1 发布形式
生产者仍然向父 Topic 发送消息,但需要带 __LITE_TOPIC:
String liteTopic = oriProps.get(MessageConst.PROPERTY_LITE_TOPIC);
if (StringUtils.isNotEmpty(liteTopic)) {
String lmqName = LiteUtil.toLmqName(requestHeader.getTopic(), liteTopic);
oriProps.put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName);
}
也就是:
topic = parentTopic
property["__LITE_TOPIC"] = childLiteTopic
--> broker 转换为 INNER_MULTI_DISPATCH = %LMQ%$parentTopic$childLiteTopic
3.2 订阅形式
LiteTopic 的订阅不是“自动分配 MessageQueue”,而是显式同步 liteTopic 集合。
控制面入口:
PARTIAL_ADD / PARTIAL_REMOVECOMPLETE_ADD / COMPLETE_REMOVE
关键代码:
switch (entry.getAction()) {
case PARTIAL_ADD:
this.liteSubscriptionRegistry.addPartialSubscription(...);
break;
case COMPLETE_ADD:
this.liteSubscriptionRegistry.addCompleteSubscription(...);
break;
case COMPLETE_REMOVE:
this.liteSubscriptionRegistry.removeCompleteSubscription(clientId);
break;
}
3.3 是否需要新版客户端
我的结论:
- 发送侧:只要 SDK 能设置
__LITE_TOPIC,理论上就能发到 LiteTopic - 消费侧:基本需要理解 LiteTopic 新协议的新客户端或新 proxy
因为它依赖:
LITE_PULL_MESSAGEPOP_LITE_MESSAGEliteTopic字段syncLiteSubscriptionnotifyUnsubscribeLite- ack / changeInvisible 时继续携带
liteTopic
当前仓库里“最完整的用户态语义”是 proxy gRPC:
ClientType.LITE_PUSH_CONSUMERMessageModel.LITE_SELECTIVE
4. 底层存储:和 CQ / LMQ / CommitLog 的关系
4.1 先说结论
- CommitLog:复用原有 CommitLog,消息物理上只写一次
- CQ:复用现有 ConsumeQueue / RocksDB ConsumeQueue
- LMQ:LiteTopic 的真实消费视角是一个 LMQ
所以 LiteTopic 不是一套全新存储,而是:
父 Topic + LMQ 多路分发 + Lite 控制面
4.2 写入链路
boolean isMultiDispatchMsg = messageStoreConfig.isEnableLmq() && msgInner.needDispatchLMQ();
if (isMultiDispatchMsg) {
LmqDispatch.updateLmqOffsets(defaultMessageStore, msgInner);
}
配合 CQ 分发:
if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
multiDispatchLmqQueue(request, maxRetries);
}
写入流程可以概括成:
- 发送时带
__LITE_TOPIC - broker 转成
INNER_MULTI_DISPATCH - CommitLog 正常写一次
- 同时为目标 LMQ 分配 offset
- CQ 构建时把同一条消息额外 dispatch 到目标 LMQ CQ
4.3 消费读取时如何定位到 liteTopic
核心逻辑在 PullMessageProcessor:
String storeTopic = topic;
if (StringUtils.isNotBlank(liteTopic)) {
storeTopic = LiteUtil.toLmqName(topic, liteTopic);
}
messageStore.getMessageAsync(group, storeTopic, queueId, ...);
也就是说:
- 用户请求里看到的是
parentTopic + liteTopic - 真正落到存储层时读的是
LMQ(parentTopic, liteTopic)
5. LiteTopic 的消费模型
这一部分最关键。
5.1 它和经典 RocketMQ Push/Pull 模型不是一回事
LiteTopic 不等于:
PushConsumer/PullConsumertopic -> 多个 queue同组 consumer rebalance queue ownership
它更像:
显式订阅 liteTopic 集合 + group 级 offset + POP 驱动的选择性消费
5.2 为什么说它不是经典 rebalance
经典 RocketMQ cluster 模式的思维是:
- topic 下有固定若干 queue
- 同组 consumer 分摊 queue
- queue ownership 会 rebalance
LiteTopic 这里不是这样:
- 每个 liteTopic 实际对应一个 LMQ,通常就是
queueId = 0 - 客户端先同步自己关心的 liteTopic 集合
- broker 通过
LiteEventDispatcher决定哪个 client 去消费哪个 liteTopic
关键代码:
SubscriberWrapper wrapper = liteSubscriptionRegistry.getAllSubscriber(group, lmqName);
...
selectAndDispatch(lmqName, wrapper.asListWrapper().getClients(), excludeClientId);
所以它更像:
- broker 侧 dispatch / load-share
而不是:
- client 侧 classic rebalance
5.3 有哪些订阅模式
lite.sub.model 现在有两种:
SharedExclusive
此外还有:
Wildcard
可以理解成:
Shared:同组多个 client 可以共享消费一个 liteTopic,broker 选一个 client 派发Exclusive:同组下一个 liteTopic 只允许一个 client 占有,后来的会把前面的挤掉Wildcard:不显式列出全部 liteTopic,而是按父 Topic 下的匹配集合做动态发现/分发
5.4 消费进度怎么确定
有 offset,而且是明确的 group 级 offset。
核心逻辑:
long offset = brokerController.getConsumerOffsetManager().queryOffset(group, lmqName, 0);
也就是说:
- 进度粒度是
consumerGroup + lmqName + queueId(0) - 不是每个 client 自己一份
- 同组共享同一个 liteTopic offset
这点和经典 cluster 模式有点像,但这里的“topic 单位”变成了 liteTopic 对应的 LMQ。
5.5 消费流程更偏 POP
LiteTopic 当前明显偏 POP_LITE_MESSAGE 路线:
Pair<StringBuilder, GetMessageResult> rst = popByClientId(...);
而 popByClientId 的思路是:
- 从 event/subscription 里拿到候选 liteTopic
- 逐个
popLiteTopic(...) - 命中后返回消息
5.6 支持并发消费吗
支持,但不是经典“多 queue 并发 rebalance”。
并发来自:
- 不同 liteTopic 可以被不同 client 同时消费
- 一个 client 也可以在一次 POP 请求里从多个 liteTopic 拉到消息
5.7 支持顺序消费吗
支持,源码里直接写了:
/**
* Pop lite implementation, support FIFO consuming.
*/
依赖的是:
PopConsumerLockServiceConsumerOrderInfoManagercheckBlock(...)updateNextVisibleTime(...)
所以更准确地说:
- 跨 liteTopic 可以并发
- 单个 liteTopic 在需要时可以做 FIFO 顺序消费
5.8 重试队列、死信队列
这里非常容易误解。
原生 retry topic
LiteTopic 没有经典 RocketMQ retry topic。
源码注释直接写了:
Lite Topic: ... implemented based on LMQ, which has no retry topic.
所以它不是:
- 失败 ->
%RETRY%group-> 多次后%DLQ%group
这套经典 push consumer 语义。
失败恢复
LiteTopic 更像:
- POP
- invisibleTime
- 没 ack 就重新可见
changeInvisibleTime延长或 suspend
也就是 POP redelivery,不是 retry topic。
DLQ
有 DLQ 转发能力,尤其在 proxy/gRPC 路径里比较明确:
consumerSendMsgBackRequestHeader.setDelayLevel(-1);
consumerSendMsgBackRequestHeader.setOriginTopic(handle.getRealTopic(topicName, groupName));
consumerSendMsgBackRequestHeader.setMaxReconsumeTimes(0);
意思是:
- 可以把某条 Lite 消息直接转去 DLQ
- 成功后再 ack 原消息
所以结论是:
- 没有原生 Lite retry topic
- 有 DLQ 转发能力
- 失败恢复主要靠 invisible/redelivery
6. 读放大问题
6.1 先说结论
LiteTopic 有更高的读放大风险,但不是单次读取一定更重,而是:
候选 liteTopic 太多、命中率太低时,逻辑层面的试探式读取和调度成本会膨胀。
6.2 单次精确读取不一定更贵
如果客户端已经明确指定了某个 liteTopic,读取链路只是:
parentTopic + liteTopic- 改写成目标 LMQ
- 对这个 LMQ 做一次正常
getMessage
这种情况下,单次存储读取本身不一定比普通 queue 更重。
6.3 真正容易放大的地方:候选集合太大
PopLiteMessageProcessor.popByClientId():
Iterator<String> iterator = liteEventDispatcher.getEventIterator(clientId);
while (total.get() < maxNum && iterator.hasNext()) {
String lmqName = iterator.next();
Pair<StringBuilder, GetMessageResult> pair = popLiteTopic(parentTopic, clientHost, group, lmqName, ...);
...
}
这意味着:
- 如果一个 client 订阅了很多 liteTopic
- 但真正有消息的只有少数
- broker 就需要试很多个 liteTopic 才能命中
这就是一种逻辑上的读放大。
6.4 event mode 的作用
如果 enableLiteEventMode=true:
- 迭代器优先遍历“已被事件分发命中的 liteTopic”
- 能减少盲读
如果 enableLiteEventMode=false:
return liteSubscription != null && liteSubscription.getLiteTopicSet() != null ?
new LiteSubscriptionIterator(liteSubscription.getTopic(), liteSubscription.getLiteTopicSet().iterator())
: Collections.emptyIterator();
也就是直接遍历整个订阅集。
当订阅集很大时,试探式读取会明显放大。
6.5 wildcard 更容易放大
wildcard 模式下最重的是 full dispatch:
liteLifecycleManager.forEachLiteTopic(function);
也就是:
- 扫很多 liteTopic
- 检查 lag / offset / 是否属于该父 Topic
- 再决定是否派发
所以 wildcard 更像:
- 扫描放大
- 元数据放大
- 调度放大
而不只是单次 CommitLog 读取放大。
6.6 如何理解这种权衡
LiteTopic 本质是在两个成本之间做取舍:
- 大 Topic 拉下来再客户端过滤
- broker 帮你在很多小 liteTopic 里做选择性消费
LiteTopic 选的是第 2 条路。
因此:
- 当订阅集合小、event mode 有效、命中率高时,LiteTopic 可能更省
- 当订阅集合大、命中率低、wildcard 多时,LiteTopic 更容易出现调度/扫描放大
7. 对 broker / store / proxy 做了哪些适配
7.1 broker 新增组件
BrokerController 里新增并启动了:
liteLifecycleManagerliteSubscriptionRegistryliteEventDispatcherliteManagerProcessorliteSubscriptionCtlProcessorpopLiteMessageProcessor
同时注册了:
LITE_PULL_MESSAGEPOP_LITE_MESSAGE- 一组 Lite 管理/查询请求
7.2 broker 数据面适配
- 发送:
SendMessageProcessor增加 Lite 属性转译 - 拉取:
PullMessageProcessor读取前改写成 LMQ 名 - POP:新增
PopLiteMessageProcessor - ack / changeInvisible:增加
liteTopic分支 - 到达通知:Lite LMQ 消息交给
LiteEventDispatcher
7.3 broker 控制面适配
LiteSubscriptionCtlProcessor:处理订阅增删LiteSubscriptionRegistryImpl:维护 client/liteTopic/wildcard 索引LiteEventDispatcher:做事件分发LiteManagerProcessor:做诊断查询和管理
7.4 store 适配
没有新造一套 MessageStore,而是复用:
MessageStoreConsumeQueueStoreInterfaceCommitLogConsumeQueueRocksDBConsumeQueueStore
7.5 proxy 适配
LiteSubscriptionService:把订阅同步到所有可读 broker- gRPC 增加:
LITE_PUSH_CONSUMERsyncLiteSubscriptionnotifyUnsubscribeLitepopLiteMessage
8. RocksDBLiteLifecycleManager 和 LiteLifecycleManager 的关系
8.1 共同点
- 都继承
AbstractLiteLifecycleManager - 对外职责一致:
- 查 max offset
- 按父 Topic 收集 liteTopic
- 遍历 liteTopic
- TTL 扫描和清理
8.2 区别
LiteLifecycleManager
- 面向文件型 CQ
- 遍历
consumeQueueTable
RocksDBLiteLifecycleManager
- 面向 RocksDB CQ
- 也支持
CombineConsumeQueueStore + combineCQUseRocksdbForLmq=true - 主要基于 RocksDB 的 offset table 看 max offset
8.3 它们是不是不同存储模式
- 是“同一个 LiteTopic 模型在不同 CQ 后端上的两种 lifecycle 实现”
- 不是 LiteTopic 自己分裂成两种业务语义
9. CombineConsumeQueueStore 的意义
2c2cc921b 的核心含义是:
CombineConsumeQueueStore下,LMQ 不必强制迁移到统一 RocksDB CQ 才能工作- 但也可以通过
combineCQUseRocksdbForLmq让 LMQ 专门走 RocksDB CQ
所以 Lite 运行底座可以是:
- 文件 CQ
- RocksDB CQ
- CombineConsumeQueueStore
10. 我目前的总体判断
- LiteTopic 不是为了替代普通 Topic,而是为了在一个父 Topic 下承载大量动态、轻量、可过期的子通道
- 它最关键的设计点不是“新存储”,而是“复用 CommitLog / LMQ / CQ,只新增轻量控制面和生命周期管理”
- 从用户心智上,它很像 MQTT
- 从系统实现上,它仍然是 RocketMQ 原生模型,不是 MQTT 协议兼容层
- 对 A2A 来说,它本质上是在 RocketMQ 里提供“高基数子主题 + 显式订阅 + group 级 POP 消费”的能力
关键代码索引
- 命名与语义
common/src/main/java/org/apache/rocketmq/common/lite/LiteUtil.javacommon/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.javacommon/src/main/java/org/apache/rocketmq/common/TopicAttributes.javacommon/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
- Broker 入口
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.javabroker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.javabroker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.javabroker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.javabroker/src/main/java/org/apache/rocketmq/broker/processor/LiteSubscriptionCtlProcessor.javabroker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
- Broker 内部组件
broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.javabroker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.javabroker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.javabroker/src/main/java/org/apache/rocketmq/broker/lite/LiteLifecycleManager.javabroker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java
- Store
store/src/main/java/org/apache/rocketmq/store/CommitLog.javastore/src/main/java/org/apache/rocketmq/store/LmqDispatch.javastore/src/main/java/org/apache/rocketmq/store/ConsumeQueue.javastore/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.javastore/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
- Proxy
proxy/src/main/java/org/apache/rocketmq/proxy/service/lite/LiteSubscriptionService.javaproxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.javaproxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.javaproxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.javaproxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
相关提交
1b6a919bd[RIP-83] Lite Topic: A New Message Model (#9800)614b81693Support wildcard subscription and consumer suspend for LiteTopic (#10204)2c2cc921bSupport LMQ in CombineConsumeQueueStore without migration to RocksDB CQ (#10174)
后续可以继续深挖的点
LiteSharding的 broker 分片策略和跨 broker 行为- wildcard/full dispatch 在大量 liteTopic 下的成本边界
- LiteTopic 和 MQTT bridge 在 A2A 里的架构分工
- LiteTopic 的写放大、读放大、元数据放大分别发生在哪里