Rocketmq源码broker元数据同步源码分析

2022/01/25 RocketMQ

关键字:Broker 、元数据同步、RocketMQ 源码

Broker 有2种角色:Master和Slave。

  • Master:主要用于处理生产者、消费者的请求和存储数据。
  • Slave:从 Master 同步元数据和消息数据到Slave的磁盘保存。

这一节分析Broker Slave如何从Master同步元数据,同步的元数据包括:topic、消费者位移、延迟位移以及订阅组配置。

Broker 在启动的时候,会判断Broker的角色是什么,当Broker的角色是Slave时,会启动定时任务开始进行同步元数据。

//代码位置:org.apache.rocketmq.broker.BrokerController
private void handleSlaveSynchronize(BrokerRole role) {
        //如果是Slave
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            //启动定时任务同步元数据
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    }
                    catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
 }

handleSlaveSynchronize方法判断Broker是否是Slave,如果是Slave则启动定时任务,每10秒同步元数据信息。

public void syncAll() {
        //同步topic
        this.syncTopicConfig();
        //同步消费者位移
        this.syncConsumerOffset();
        //同步延迟位移
        this.syncDelayOffset();
        //同步订阅组关系
        this.syncSubscriptionGroupConfig();
}

同步topic

//代码位置:org.apache.rocketmq.broker.slave.SlaveSynchronize
private void syncTopicConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                //通过客户端从远程获取topic配置信息
                TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                //如果topic配置的数据版本改变了,说明数据变化了,需要更新Slave的topic数据
                if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                    //设置版本号
                    this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                    //清除旧的topic信息
                    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                    //添加新的topic信息
                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                    //topic持久化
                    this.brokerController.getTopicConfigManager().persist();

                    log.info("Update slave topic config from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
            }
        }
}

syncTopicConfig首先通过网络传输,从Broker master拉去所有的topic的信息。如果拉回来的topic的数据版本与Slave的topic的数据版本不一样,说明topic已经改变了,需要更新Slave的topic。更新Slave的topic的操作包括设置新的数据版本号、清除旧的topic信息、添加新的topic信息、topic持久化到本地。

getAllTopicConfig方法是从通过网络从master获取所有的topic信息,客户端和服务端之间的通信在《RocketMQ的通信机制设计源码分析》中已经分析过,具体细节可以参考该文章。客户端通过请求码将消息发送给服务端,服务端接收到消息,通过请求码将请求交给不同处理器处理,然后将结果返回。

其他元数据的同步

消费者位移、延迟位移、订阅组关系的同步,与topic的同步都是大同小异。这里只放出源码,源码中都有详情的注释。

消费者位移同步

private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                //获取所有的消费者位移
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                //将所有消费者位移添加到位移table中
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                //持久化
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
}

延迟位移同步

private void syncDelayOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                //获取所有的延迟位移
                String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                if (delayOffset != null) {

                    //消息保存配置文件路径
                    String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                            .getMessageStoreConfig().getStorePathRootDir());
                    try {
                        //将延迟位移进行持久化
                        MixAll.string2File(delayOffset, fileName);
                    } catch (IOException e) {
                        log.error("Persist file Exception, {}", fileName, e);
                    }
                }
                log.info("Update slave delay offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
            }
        }
}

订阅组关系的同步

private void syncSubscriptionGroupConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                //获取订阅组信息(消费者组)
                SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                        .getAllSubscriptionGroupConfig(masterAddrBak);

                if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                    SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                    //设置数据版本
                    subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                    //删除旧的订阅组信息
                    subscriptionGroupManager.getSubscriptionGroupTable().clear();
                    //添加所有消费者信息
                    subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                    //持久化信息
                    subscriptionGroupManager.persist();
                    log.info("Update slave Subscription Group from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
            }
        }
}

Search

    微信好友

    博士的沙漏

    Table of Contents