RocketMQ源码之消费者负载均衡过程分析
消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。
触发负载均衡的时机
消费者启动触发负载均衡
消费者消费消息有push和pull两种方式,这篇文章是分析push过程中的消费者负载均衡。DefaultMQPushConsumerImpl启动的时候,会创建MQClientInstance(MQ客户端实例),消费者启动的过程可以参考《RocketMQ源码之消费者启动流程分析》,MQ客户端实例启动的时候会启动负载均衡服务线程,代码如下:
//代码位置:org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
//省略代码
//启动负载均衡服务
this.rebalanceService.start();
//省略代码
}
}
另外在DefaultMQPushConsumerImpl启动的启动的时候会立即唤醒负载均衡服务:
//代码位置:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
//省略代码
//唤醒负载均衡服务
this.mQClientFactory.rebalanceImmediately();
}
Broker变化触发负载均衡
消费者客户端可以接收Broker发送的请求,当消费者发生变化时,Broker将发送信息给消费者客户端,ClientRemotingProcessor负责接收Broker发送给消费者客户端的通知。ClientRemotingProcessor接收到消费者发生变化的通知时,会交给processRequest方法处理,将立即触发负载均衡:
//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//省略代码
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
return this.notifyConsumerIdsChanged(ctx, request);
//省略代码
}
notifyConsumerIdsChanged方法将会立即触发负载均衡:
//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#notifyConsumerIdsChanged
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//省略代码
//立即进行负载均衡
this.mqClientFactory.rebalanceImmediately();
//省略代码
}
定期触发负载均衡
RebalanceService是负载均衡服务线程,每隔20秒会触发负载均衡:
//代码位置:org.apache.rocketmq.client.impl.consumer.RebalanceService#run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//等待20秒
this.waitForRunning(waitInterval);
//触发负载均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
上面就是负载均衡触发的时机,在消费者启动的时候、消费者变化的时候以及定时触发负载均衡。接下来我们深入分析负载均衡的流程:
//代码位置:org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
public void doRebalance() {
//遍历消费者,对消费者进行负载均衡
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
//负载均衡
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
MQClientInstance的doRebalance方法遍历所有的消息者,对消费者进行负载均衡操作。
//代码位置:org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
//获取所有的订阅数据
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
//遍历所有的订阅数据
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//根据topic进行负载均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//删除消息队列
this.truncateMessageQueueNotMyTopic();
}
消费者负载均衡最终会调用RebalanceImpl类的doRebalance方法,该方法首先获取所有的消费者订阅数据,遍历所有的订阅数据,根据topic进行订阅负载均衡。rebalanceByTopic方法是根据消息队列类型进行不同的逻辑,这里只分析集群模式:
//代码位置:org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
//省略代码
case CLUSTERING: {
//根据topic获取消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//获取所有的消费者客户端id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//将所有的消息队列和消费者进行排序
Collections.sort(mqAll);
Collections.sort(cidAll);
//消费者之间消息分配的策略算法
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//进行负载均衡
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//更新处理队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
rebalanceByTopic的代码逻辑如下:
- 根据topic获取所有的消费者以及所有的消费者客户端id
- 将所有的消息队列和消费者进行排序
- 进行负载均衡选择消息队列(具体的负载均衡策略请看《RocketMQ源码之负载均衡策略》)
- 更新处理队列