Rocketmq源码架构

2022/01/08 RocketMQ

RocketMQ源码架构

RocketMQ 的架构

RocketMQ架构设计RocketMQ架构设计

首先看下RocketMQ的架构图,RocketMQ架构主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。消息发布者,负责生产消息,提供多种发送方式将消息投递到Broker,如同步发送、异步发送、顺序发送、单向发送,为了确保消息投递到Broker,同步和异步方式都需要Broker确认消息,单向发送消息不需要。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。消息消费者,负责消息消费,从Broker服务器获取消息提供应用程序。
  • BrokerServer:Broker 主要负责消息的存储,投递和查询以及保证服务的高可用。Broker负责接收生产者发送的消息并存储、同时为消费者消费消息提供支持。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
  1. Broker管理:Broker启动的时候会将自己的注册消息提供给NameServer,注册消息主要包括Broker地址、Broker名字、Broker Id、topic配置信息等作为路由信息的基本数据,提供心跳检测机制检测Broker是否存活。Broker集群中每一台Broker服务器都向NameServer集群服务的每一台NameServer注册自己的路由信息。
  2. 路由信息管理:每个NameServer保存Broker集群服务器的所有的路由信息,提供给生产者和消费者获取路由信息,获取到路由信息就可以与Broker服务进行消息的投递和消费。由于每个NameServer都保存着完整的路由信息,即使一台NameServer服务下线了,生产者和消费者也能从其他的NameServer服务器获取到完整Broker路由细信息。

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

    1. Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

RocketMQ网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

RocketMQ源码环境搭建

我们如果想要阅读RocketMQ源码,必须获取RocketMQ的源码,然后搭建RocketMQ的源码运行环境。

下面我们进行RocketMQ的源码环境搭建:

  • 下载RocketMQ源码,版本v4.7.0
  • 搭建RocketMQ启动环境

下载RocketMQ源码

RocketMQ源码地址 https://github.com/apache/rocketmq 在Github上下载RocketMQ源码,版本为v4.7.0

https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0

RocketMQ源码目录和模块如下:

rocketmq-rocketmq-all-4.7.0
├─acl                  #用户权限、安全、验证相关模块
├─broker               # Broker主要负责消息的存储、投递和查询以及服务高可用保证,
├─client               # 生产者、消费者、管理等客户端相关模块
├─common               # 公用数据结构等
├─distribution         # 分布式集群相关的脚本以及配置文件,编译模块,编译输出等
├─docs                 # 文档
├─example              # 示例,比如生产者和消费者
├─filter               # 有关消息过滤相关的功能模块
├─logappender          # 各种日志的追加器
├─logging              # 日志功能
├─namesrv              # 简单注册中心,路由管理以及Broker管理
├─openmessaging        # 对外提供服务
├─remoting             # 网络通信相关模块,远程调用接口,封装Netty底层通信
├─srvutil              # namesrc 模块的相关工具,提供一些公用的工具方法,比如解析命令行参数
├─store                # 消息存储相关模块
├─test                 # 测试相关模块
└─tools                # 工具类模块,命令行管理工具,如mqadmin工具

RocketQM源码的主要模块包含namesrv(路由中心),broker(消息服务器),store(消息存储),client(消息客户端)等。
阅读RocketMQ源码前,可以先查看docs文档,了解RocketMQ的基本概念,架构设计等文档。查看example下的基础案例,加深对RocketMQ的了解。

搭建RocketMQ启动环境

那么我们如何在Window环境下配置RocketMQ的启动环境呢?

  • 启动 RocketMQ Namesrv
  • 启动 RocketMQ Broker
  • 启动 RocketMQ Producer
  • 启动 RocketMQ Consumer

最小化的 RocketMQ 的环境,暂时不考虑 Namesrv 集群、Broker 集群、Consumer 集群。

启动namesrv

RocketMQ首先要启动namesrv。在namesrv模块中找到NamesrvStartup类,启动NamesrvStartup类的main方法。
启动报错,分析原因发现需要ROCKETMQ_HOME配置项。

configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

添加RocketMQ主目录,可以通过-Drocketmq.home.dir=path或者通过设置环境变量ROCKETMQ_HOME 来配置RocketMQ 的主目录。

ROCKETMQ_HOME=D:\RocketMQ

并在/RocketMQ文件目录下创建conf目录,并将distribution模块下的/conf的logback_namesrv.xml文件拷贝在/RocketMQ/conf目录下。 将文件中所有${user.home}替换成D:\RocketMQ\conf。

D:
├─RocketMQ                  
│ ├─conf
│ │ │ ├─logback_namesrv.xml

然后就可以启动namesrv了,启动成功以后控制台会显示如下信息:

The Name Server boot success. serializeType=JSON

命令行中输入 telnet 127.0.0.1 9876 ,看看是否能连接上 RocketMQ Namesrv 。

启动broker

同理,启动broker也需要配置环境变量

configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

添加RocketMQ主目录,因为broker需要注册到namesrv上,还需要配置NAMESRV_ADDR。

ROCKETMQ_HOME=D:\RocketMQ;NAMESRV_ADDR=127.0.0.1:9876

并在/RocketMQ文件目录下创建conf目录,并将distribution模块下的/conf的logback_broker.xml文件拷贝在/RocketMQ/conf目录下。 将文件中所有${user.home}替换成D:\RocketMQ\conf。

D:
├─RocketMQ                  
│ ├─conf
│ │ │ ├─broker.conf
│ │ │ ├─logback_broker.xml
│ ├─logs
│ ├─store

我们还需要配置/conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=D:\RocketMQ\store

# 这是commitLog的存储路径
storePathCommitLog=D:\RocketMQ\store\commitlog

# consume queue文件的存储路径
storePathConsumeQueue=D:\RocketMQ\store\consumequeue

# 消息索引文件的存储路径
storePathIndex=D:\RocketMQ\store\index

# checkpoint文件的存储路径
storeCheckpoint=D:\RocketMQ\store\checkpoint

# abort文件的存储路径
abortFile=D:\RocketMQ\abort

修改以后保存broker.conf broker的启动,在broker模块找到BrokerStartup类,启动BrokerStartup的main方法。 启动时,需要配置 -c D:\RocketMQ\conf\broker.conf用来加载broker的配置文件信息。 当你看到控制台显示如下信息时,说明broker也启动成功了。

The broker[DESKTOP-DFA24F11, 10.1.70.191:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

妥妥的,原来 RocketMQ Broker 已经启动完成,并且注册到 RocketMQ Namesrv 上。

命令行中输入 telnet 127.0.0.1 10911 ,看看是否能连接上 RocketMQ Broker。

D:
├─RocketMQ                  
│ ├─conf
│ │ │ ├─broker.conf
│ │ │ ├─logback_broker.xml
│ │ │ ├─logback_namesrv.xml
│ ├─logs
│ ├─store
│ │ │ ├─conf
│ │ │ │ │ │consumerFilter.json
│ │ │ │ │ │consumerOffset.json
│ │ │ │ │ │delayOffset.json
│ │ │ │ │ │topics.json
│ │ │ ├─abort
│ │ │ ├─checkpoint
│ │ │ ├─lock

启动 RocketMQ Producer

打开 org.apache.rocketmq.example.quickstart.Producer 示例类,代码如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Launch the instance.
         */
        producer.setNamesrvAddr("127.0.0.1:9876"); // NamesrvAddr
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

注意,在 NamesrvAddr 处,我们增加了 producer.setNamesrvAddr(“127.0.0.1:9876”) 代码块,指明 Producer 使用的 RocketMQ Namesrv 。 然后,右键运行,RocketMQ Producer 就启动完成。 发送消息后,存储目录结构如下:

D:
├─RocketMQ                  
│ ├─conf
│ │ │ ├─broker.conf
│ │ │ ├─logback_broker.xml
│ │ │ ├─logback_namesrv.xml
│ ├─logs
│ ├─store
│ │ │ ├─conf
│ │ │ │ │ │consumerFilter.json
│ │ │ │ │ │consumerOffset.json
│ │ │ │ │ │delayOffset.json
│ │ │ │ │ │topics.json
│ │ │ │ │ │subscriptionGroup.json
│ │ │ ├─abort
│ │ │ ├─checkpoint
│ │ │ ├─lock
│ │ │ ├─commitlog
│ │ │ │ │ │00000000000000000000
│ │ │ │ │ │00000000001073741824
│ │ │ ├─consumequeue
│ │ │ │ │ │topic-test
│ │ │ │ │ │ │ │0
│ │ │ │ │ │ │ │ │ │00000000000000000000
│ │ │ │ │ │ │ │1
│ │ │ │ │ │ │ │2
│ │ │ │ │ │ │ │.....
│ │ │ ├─index
│ │ │ │ │ │20221107152845344

启动 RocketMQ Consumer

打开 org.apache.rocketmq.example.quickstart.Consumer 示例类,代码如下:

// Consumer.java

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setNamesrvAddr("127.0.0.1:9876"); // NamesrvAddr
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

}

注意,在 NamesrvAddr,我们还增加了 consumer.setNamesrvAddr(“127.0.0.1:9876”) 代码块,指明 Consumer 使用的 RocketMQ Namesrv 。

然后,右键运行,RocketMQ Consumer 就启动完成。

Search

    微信好友

    博士的沙漏

    Table of Contents