RocketMQ 怎么实现消息分发的?

RocketMQ 怎么实现消息分发的?RocketMQ 怎么实现消息分发的?

RocketMQ的消息分发机制是其核心设计之一,我将从架构设计、队列模型、负载均衡、消费模式和高级特性五个层面,为您深入解析。


一、 RocketMQ消息分发的核心架构

1.1 核心组件角色

生产者(Producer) → 主题(Topic) → 队列(MessageQueue) → 消费者组(ConsumerGroup) ← 消费者(Consumer)
                              ↑
                           Broker集群

关键组件解析

  • Topic(主题):消息的逻辑分类,生产者发送消息到Topic,消费者订阅Topic
  • MessageQueue(消息队列):Topic的物理分区,消息存储和分发的基本单位
  • ConsumerGroup(消费者组):一组协同工作的消费者,共同消费一个Topic
  • Broker:消息存储和转发服务器,每个Broker包含多个Queue

1.2 数据流架构

// 生产者发送消息到Topic的某个Queue
Message msg = new Message("OrderTopic", "订单创建", orderJson.getBytes());
SendResult result = producer.send(msg);

// 消费者订阅Topic,从分配的Queue拉取消息
consumer.subscribe("OrderTopic", "*");  // *表示消费所有Tag

二、 队列模型:消息分发的基石

2.1 Topic与Queue的关系

// 创建Topic时可以指定Queue数量
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName("OrderTopic");
topicConfig.setWriteQueueNums(8);   // 8个写队列
topicConfig.setReadQueueNums(8);    // 8个读队列

// 实际存储结构
OrderTopic/
  ├── Queue0  (Broker-a)
  ├── Queue1  (Broker-a)
  ├── Queue2  (Broker-b)  // Queue分布在多个Broker
  └── Queue3  (Broker-b)

核心特性

  • 队列是并行处理的最小单位:一个Queue只能被一个消费者线程处理
  • 队列数量决定最大并发度:8个Queue → 最多8个消费者线程并发消费
  • 队列分布实现负载均衡:Queue分布在多个Broker,避免单点瓶颈

2.2 队列选择策略

// 生产者发送时选择Queue的策略
public class OrderProducer {
    // 1. 轮询策略(默认):平均分布到所有Queue
    SendResult sendRoundRobin(Message msg) {
        return producer.send(msg);  // 自动轮询选择Queue
    }
    
    // 2. 按Key哈希:相同业务Key的消息进入同一Queue
    SendResult sendByOrderId(Message msg, String orderId) {
        msg.setKeys(orderId);  // 设置业务Key
        return producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, 
                                     Message msg, Object arg) {
                int index = Math.abs(orderId.hashCode()) % mqs.size();
                return mqs.get(index);
            }
        }, orderId);
    }
    
    // 3. 手动指定Queue(特殊场景)
    SendResult sendToSpecificQueue(Message msg, int queueId) {
        return producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, 
                                     Message msg, Object arg) {
                return mqs.get(queueId);
            }
        }, null);
    }
}

三、 消费者负载均衡机制

3.1 Rebalance(重平衡)机制

触发条件

  • 消费者启动、停止
  • 消费者数量变化
  • Topic的Queue数量变化

重平衡过程

// 伪代码:消费者端的Rebalance逻辑
public void doRebalance() {
    // 1. 获取Topic的所有Queue
    List<MessageQueue> allQueues = mQClientFactory.getTopicAllQueues("OrderTopic");
    
    // 2. 获取消费者组的所有消费者实例
    List<String> allConsumers = getConsumerList("OrderGroup");
    
    // 3. 根据策略分配Queue
    Map<String, List<MessageQueue>> allocation = 
        strategy.allocate("OrderGroup", currentConsumerId, allQueues, allConsumers);
    
    // 4. 更新本地消费的Queue列表
    updateProcessQueueTable(allocation.get(currentConsumerId));
}

3.2 负载均衡策略

RocketMQ提供了多种分配策略:

// 1. 平均分配(默认策略)
AllocateMessageQueueAveragely strategy = new AllocateMessageQueueAveragely();
// 示例:8个Queue,3个消费者
// Consumer1: Queue0, Queue1, Queue2
// Consumer2: Queue3, Queue4, Queue5  
// Consumer3: Queue6, Queue7

// 2. 平均轮询分配
AllocateMessageQueueAveragelyByCircle strategy = 
    new AllocateMessageQueueAveragelyByCircle();
// Consumer1: Queue0, Queue3, Queue6
// Consumer2: Queue1, Queue4, Queue7
// Consumer3: Queue2, Queue5

// 3. 根据机房/机器名分配(避免跨机房消费)
AllocateMessageQueueByMachineRoom strategy = 
    new AllocateMessageQueueByMachineRoom();
// 只分配同机房的Queue

// 4. 一致性哈希分配(减少Rebalance影响)
AllocateMessageQueueConsistentHash strategy = 
    new AllocateMessageQueueConsistentHash();

// 配置策略
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

3.3 消费者组与消费模式

// 推模式消费者(常用)
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("OrderGroup");
pushConsumer.subscribe("OrderTopic", "*");
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 处理消息
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 拉模式消费者(需手动控制)
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("OrderGroup");
pullConsumer.start();

// 手动拉取指定Queue的消息
PullResult pullResult = pullConsumer.pull(
    mq,              // 指定MessageQueue
    "*",             // 订阅表达式
    nextOffset,      // 消费偏移量
    32               // 最大拉取数量
);

四、 消息过滤机制

4.1 Tag过滤(一级过滤)

// 生产者:发送时设置Tag
Message msg = new Message("OrderTopic", "PAY_SUCCESS", orderJson.getBytes());

// 消费者:订阅时指定Tag
// 1. 单Tag订阅
consumer.subscribe("OrderTopic", "PAY_SUCCESS");

// 2. 多Tag订阅(使用||分隔)
consumer.subscribe("OrderTopic", "PAY_SUCCESS || PAY_FAILED");

// 3. 订阅所有Tag
consumer.subscribe("OrderTopic", "*");

4.2 SQL92过滤(二级过滤,Broker端)

// 生产者:设置消息属性
Message msg = new Message("OrderTopic", "TAG1", orderJson.getBytes());
msg.putUserProperty("amount", "1000");
msg.putUserProperty("region", "beijing");

// 消费者:使用SQL表达式过滤
consumer.subscribe("OrderTopic", 
    MessageSelector.bySql("amount > 500 AND region = 'beijing'"));

过滤原理

1. 生产者发送消息,携带Tag和Properties
2. Broker存储消息时,同时存储过滤信息
3. 消费者拉取时,Broker根据订阅的过滤条件筛选消息
4. 只有符合条件的消息才会发送给消费者

4.3 过滤服务器(复杂过滤场景)

对于复杂的过滤逻辑,可以部署独立的Filter Server

// Filter Server实现过滤接口
public class CustomFilter implements MessageFilter {
    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        // 自定义过滤逻辑
        String body = new String(msg.getBody());
        return body.contains("VIP");  // 只消费VIP用户消息
    }
}

五、 顺序消息分发

5.1 全局顺序消息(性能受限)

// 创建只有一个Queue的Topic
TopicConfig config = new TopicConfig();
config.setTopicName("GlobalOrderTopic");
config.setWriteQueueNums(1);  // 关键:只有1个Queue
config.setReadQueueNums(1);

// 消费者也必须顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(
        List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 保证同一个Queue的消息顺序处理
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

5.2 分区顺序消息(推荐方案)

// 生产者:相同订单号的消息发送到同一个Queue
Message msg = new Message("OrderTopic", "订单操作", orderJson.getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, 
                             Message msg, Object arg) {
        String orderId = (String) arg;
        int queueId = Math.abs(orderId.hashCode()) % mqs.size();
        return mqs.get(queueId);
    }
}, orderId);  // 相同orderId的消息进入同一个Queue

// 消费者:顺序消费模式
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(
        List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 同一个Queue的消息会顺序处理
        // 不同Queue的消息可以并行处理
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

六、 广播模式与集群模式

6.1 集群模式(默认)

// 消费者设置为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);

特点

  • 同组消费者分摊消费消息(负载均衡)
  • 每条消息只被消费一次
  • 适合大部分业务场景

6.2 广播模式

// 消费者设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

特点

  • 同组消费者各自消费全量消息
  • 每条消息被每个消费者消费一次
  • 适合配置同步、缓存刷新等场景

对比

特性 集群模式 广播模式
消费方式 分摊消费 全量消费
消费次数 每条消息被消费一次 每个消费者都消费一次
偏移量存储 Broker端存储,共享 消费者本地存储
适用场景 普通业务消息 配置同步、缓存刷新

七、 消息轨迹与监控

7.1 消息轨迹追踪

// 启用消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup", true);

// 轨迹数据存储到Topic
producer.setTraceTopic("RMQ_SYS_TRACE_TOPIC");

7.2 消费进度监控

// 查看消费者组的消费进度
ClusterInfo clusterInfo = mQAdminExt.examineBrokerClusterInfo();
ConsumerConnection consumerConnection = mQAdminExt.examineConsumerConnectionInfo(
    "OrderGroup");

// 获取每个Queue的消费偏移量
for (Connection connection : consumerConnection.getConnectionSet()) {
    String clientId = connection.getClientId();
    ConsumerRunningInfo runningInfo = mQAdminExt.getConsumerRunningInfo(
        "OrderGroup", clientId, false);
    
    // 消费偏移量信息
    Map<MessageQueue, Long> offsetTable = runningInfo.getOffsetTable();
    for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        long consumerOffset = entry.getValue();
        long brokerOffset = mQAdminExt.maxOffset(mq);
        long lag = brokerOffset - consumerOffset;  // 堆积量
    }
}

八、 生产环境最佳实践

8.1 配置建议

// 消费者配置最佳实践
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic", "*");

// 1. 消费线程数配置(根据Queue数量调整)
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

// 2. 每次拉取消息数量(平衡网络开销和处理速度)
consumer.setPullBatchSize(32);

// 3. 消息重试策略
consumer.setMaxReconsumeTimes(3);  // 最大重试次数

// 4. 消费超时时间
consumer.setConsumeTimeout(15);  // 分钟

// 5. 开启消费端限流(防止突发流量)
consumer.setPullThresholdForQueue(1000);  // 每个Queue最大缓存消息数

8.2 灾备与高可用

// 多集群消费(异地容灾)
consumer.setNamesrvAddr("主集群:9876;备集群:9876");

// 消费进度同步(主备切换时使用)
// 从主集群导出消费进度
Map<MessageQueue, Long> offsetTable = getOffsetFromPrimary();
// 在备集群设置消费进度
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
    consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
}

8.3 动态扩缩容

// 增加Queue数量(需要提前规划)
TopicConfig config = new TopicConfig();
config.setTopicName("OrderTopic");
config.setWriteQueueNums(16);  // 从8增加到16
config.setReadQueueNums(16);

// 注意:增加Queue数量会导致Rebalance
// 建议在业务低峰期操作

总结

RocketMQ的消息分发机制体现了分布式系统的经典设计哲学

  1. 队列是核心:通过Queue实现消息的物理分区,既是存储单位,也是并行单位
  2. 消费者组是协调单元:组内消费者协同工作,实现负载均衡
  3. Rebalance是动态平衡器:自动适应消费者数量和队列变化
  4. 多种模式适应不同场景:顺序/并发、集群/广播、推/拉模式

作为架构师,设计消息分发时要考虑的关键点:

  • 队列数量规划:根据业务吞吐量和消费者数量合理设置Queue数量
  • 消费幂等性:消息可能被重复消费(网络重试、Rebalance等场景)
  • 消费堆积监控:实时监控lag,避免消息积压
  • 过滤策略选择:根据业务需求选择Tag过滤或SQL过滤
  • 顺序性保证:需要顺序消费时,确保相同业务Key的消息进入同一Queue

最终选择依据:

  • 高吞吐场景:增加Queue数量,使用并发消费
  • 顺序性要求:使用分区顺序消息
  • 全量同步需求:使用广播模式
  • 复杂过滤:使用SQL过滤或Filter Server

RocketMQ的分发机制在性能、可靠性和灵活性之间取得了很好的平衡,使其成为企业级消息中间件的优选。