Rocketmq源码消费者负载均衡过程分析

2022/01/21 RocketMQ

RocketMQ源码之消费者负载均衡过程分析

消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。

触发负载均衡的时机

消费者启动触发负载均衡

消费者消费消息有push和pull两种方式,这篇文章是分析push过程中的消费者负载均衡。DefaultMQPushConsumerImpl启动的时候,会创建MQClientInstance(MQ客户端实例),消费者启动的过程可以参考《RocketMQ源码之消费者启动流程分析》,MQ客户端实例启动的时候会启动负载均衡服务线程,代码如下:

//代码位置:org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {
    synchronized (this) {
        //省略代码
        //启动负载均衡服务
         this.rebalanceService.start();
        //省略代码
    }
}

另外在DefaultMQPushConsumerImpl启动的启动的时候会立即唤醒负载均衡服务:

//代码位置:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
    //省略代码
    //唤醒负载均衡服务
    this.mQClientFactory.rebalanceImmediately();
}

Broker变化触发负载均衡

消费者客户端可以接收Broker发送的请求,当消费者发生变化时,Broker将发送信息给消费者客户端,ClientRemotingProcessor负责接收Broker发送给消费者客户端的通知。ClientRemotingProcessor接收到消费者发生变化的通知时,会交给processRequest方法处理,将立即触发负载均衡:

//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    //省略代码
    case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
          return this.notifyConsumerIdsChanged(ctx, request);
     //省略代码
}

notifyConsumerIdsChanged方法将会立即触发负载均衡:

//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#notifyConsumerIdsChanged
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
      //省略代码
     //立即进行负载均衡
      this.mqClientFactory.rebalanceImmediately();
      //省略代码
 }

定期触发负载均衡

RebalanceService是负载均衡服务线程,每隔20秒会触发负载均衡:

//代码位置:org.apache.rocketmq.client.impl.consumer.RebalanceService#run
public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            //等待20秒
            this.waitForRunning(waitInterval);
            //触发负载均衡
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
}

上面就是负载均衡触发的时机,在消费者启动的时候、消费者变化的时候以及定时触发负载均衡。接下来我们深入分析负载均衡的流程:

//代码位置:org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
public void doRebalance() {
        //遍历消费者,对消费者进行负载均衡
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    //负载均衡
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
}

MQClientInstance的doRebalance方法遍历所有的消息者,对消费者进行负载均衡操作。

//代码位置:org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
        //获取所有的订阅数据
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            //遍历所有的订阅数据
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    //根据topic进行负载均衡
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        //删除消息队列
        this.truncateMessageQueueNotMyTopic();
}

消费者负载均衡最终会调用RebalanceImpl类的doRebalance方法,该方法首先获取所有的消费者订阅数据,遍历所有的订阅数据,根据topic进行订阅负载均衡。rebalanceByTopic方法是根据消息队列类型进行不同的逻辑,这里只分析集群模式:

//代码位置:org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
               //省略代码
            case CLUSTERING: {
                //根据topic获取消息队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //获取所有的消费者客户端id
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    //将所有的消息队列和消费者进行排序
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    //消费者之间消息分配的策略算法
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        //进行负载均衡
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    //更新处理队列
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
}

rebalanceByTopic的代码逻辑如下:

  • 根据topic获取所有的消费者以及所有的消费者客户端id
  • 将所有的消息队列和消费者进行排序
  • 进行负载均衡选择消息队列(具体的负载均衡策略请看《RocketMQ源码之负载均衡策略》)
  • 更新处理队列

Search

    微信好友

    博士的沙漏

    Table of Contents