RocketMQ源码架构
RocketMQ 的架构
RocketMQ架构设计RocketMQ架构设计
首先看下RocketMQ的架构图,RocketMQ架构主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。消息发布者,负责生产消息,提供多种发送方式将消息投递到Broker,如同步发送、异步发送、顺序发送、单向发送,为了确保消息投递到Broker,同步和异步方式都需要Broker确认消息,单向发送消息不需要。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。消息消费者,负责消息消费,从Broker服务器获取消息提供应用程序。
- BrokerServer:Broker 主要负责消息的存储,投递和查询以及保证服务的高可用。Broker负责接收生产者发送的消息并存储、同时为消费者消费消息提供支持。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
- Broker管理:Broker启动的时候会将自己的注册消息提供给NameServer,注册消息主要包括Broker地址、Broker名字、Broker Id、topic配置信息等作为路由信息的基本数据,提供心跳检测机制检测Broker是否存活。Broker集群中每一台Broker服务器都向NameServer集群服务的每一台NameServer注册自己的路由信息。
- 路由信息管理:每个NameServer保存Broker集群服务器的所有的路由信息,提供给生产者和消费者获取路由信息,获取到路由信息就可以与Broker服务进行消息的投递和消费。由于每个NameServer都保存着完整的路由信息,即使一台NameServer服务下线了,生产者和消费者也能从其他的NameServer服务器获取到完整Broker路由细信息。
RocketMQ架构上主要分为四部分,如上图所示:
-
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
-
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
-
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
-
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
RocketMQ网络部署特点
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
RocketMQ源码环境搭建
我们如果想要阅读RocketMQ源码,必须获取RocketMQ的源码,然后搭建RocketMQ的源码运行环境。
下面我们进行RocketMQ的源码环境搭建:
- 下载RocketMQ源码,版本v4.7.0
- 搭建RocketMQ启动环境
下载RocketMQ源码
RocketMQ源码地址 https://github.com/apache/rocketmq 在Github上下载RocketMQ源码,版本为v4.7.0
https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0
RocketMQ源码目录和模块如下:
rocketmq-rocketmq-all-4.7.0
├─acl #用户权限、安全、验证相关模块
├─broker # Broker主要负责消息的存储、投递和查询以及服务高可用保证,
├─client # 生产者、消费者、管理等客户端相关模块
├─common # 公用数据结构等
├─distribution # 分布式集群相关的脚本以及配置文件,编译模块,编译输出等
├─docs # 文档
├─example # 示例,比如生产者和消费者
├─filter # 有关消息过滤相关的功能模块
├─logappender # 各种日志的追加器
├─logging # 日志功能
├─namesrv # 简单注册中心,路由管理以及Broker管理
├─openmessaging # 对外提供服务
├─remoting # 网络通信相关模块,远程调用接口,封装Netty底层通信
├─srvutil # namesrc 模块的相关工具,提供一些公用的工具方法,比如解析命令行参数
├─store # 消息存储相关模块
├─test # 测试相关模块
└─tools # 工具类模块,命令行管理工具,如mqadmin工具
RocketQM源码的主要模块包含namesrv(路由中心),broker(消息服务器),store(消息存储),client(消息客户端)等。
阅读RocketMQ源码前,可以先查看docs文档,了解RocketMQ的基本概念,架构设计等文档。查看example下的基础案例,加深对RocketMQ的了解。
搭建RocketMQ启动环境
那么我们如何在Window环境下配置RocketMQ的启动环境呢?
- 启动 RocketMQ Namesrv
- 启动 RocketMQ Broker
- 启动 RocketMQ Producer
- 启动 RocketMQ Consumer
最小化的 RocketMQ 的环境,暂时不考虑 Namesrv 集群、Broker 集群、Consumer 集群。
启动namesrv
RocketMQ首先要启动namesrv。在namesrv模块中找到NamesrvStartup类,启动NamesrvStartup类的main方法。
启动报错,分析原因发现需要ROCKETMQ_HOME配置项。
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
添加RocketMQ主目录,可以通过-Drocketmq.home.dir=path或者通过设置环境变量ROCKETMQ_HOME 来配置RocketMQ 的主目录。
ROCKETMQ_HOME=D:\RocketMQ
并在/RocketMQ文件目录下创建conf目录,并将distribution模块下的/conf的logback_namesrv.xml文件拷贝在/RocketMQ/conf目录下。 将文件中所有${user.home}替换成D:\RocketMQ\conf。
D:
├─RocketMQ
│ ├─conf
│ │ │ ├─logback_namesrv.xml
然后就可以启动namesrv了,启动成功以后控制台会显示如下信息:
The Name Server boot success. serializeType=JSON
命令行中输入 telnet 127.0.0.1 9876 ,看看是否能连接上 RocketMQ Namesrv 。
启动broker
同理,启动broker也需要配置环境变量
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
添加RocketMQ主目录,因为broker需要注册到namesrv上,还需要配置NAMESRV_ADDR。
ROCKETMQ_HOME=D:\RocketMQ;NAMESRV_ADDR=127.0.0.1:9876
并在/RocketMQ文件目录下创建conf目录,并将distribution模块下的/conf的logback_broker.xml文件拷贝在/RocketMQ/conf目录下。 将文件中所有${user.home}替换成D:\RocketMQ\conf。
D:
├─RocketMQ
│ ├─conf
│ │ │ ├─broker.conf
│ │ │ ├─logback_broker.xml
│ ├─logs
│ ├─store
我们还需要配置/conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=D:\RocketMQ\store
# 这是commitLog的存储路径
storePathCommitLog=D:\RocketMQ\store\commitlog
# consume queue文件的存储路径
storePathConsumeQueue=D:\RocketMQ\store\consumequeue
# 消息索引文件的存储路径
storePathIndex=D:\RocketMQ\store\index
# checkpoint文件的存储路径
storeCheckpoint=D:\RocketMQ\store\checkpoint
# abort文件的存储路径
abortFile=D:\RocketMQ\abort
修改以后保存broker.conf broker的启动,在broker模块找到BrokerStartup类,启动BrokerStartup的main方法。 启动时,需要配置 -c D:\RocketMQ\conf\broker.conf用来加载broker的配置文件信息。 当你看到控制台显示如下信息时,说明broker也启动成功了。
The broker[DESKTOP-DFA24F11, 10.1.70.191:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
妥妥的,原来 RocketMQ Broker 已经启动完成,并且注册到 RocketMQ Namesrv 上。
命令行中输入 telnet 127.0.0.1 10911 ,看看是否能连接上 RocketMQ Broker。
D:
├─RocketMQ
│ ├─conf
│ │ │ ├─broker.conf
│ │ │ ├─logback_broker.xml
│ │ │ ├─logback_namesrv.xml
│ ├─logs
│ ├─store
│ │ │ ├─conf
│ │ │ │ │ │consumerFilter.json
│ │ │ │ │ │consumerOffset.json
│ │ │ │ │ │delayOffset.json
│ │ │ │ │ │topics.json
│ │ │ ├─abort
│ │ │ ├─checkpoint
│ │ │ ├─lock
启动 RocketMQ Producer
打开 org.apache.rocketmq.example.quickstart.Producer 示例类,代码如下:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Launch the instance.
*/
producer.setNamesrvAddr("127.0.0.1:9876"); // NamesrvAddr
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
注意,在 NamesrvAddr 处,我们增加了 producer.setNamesrvAddr(“127.0.0.1:9876”) 代码块,指明 Producer 使用的 RocketMQ Namesrv 。 然后,右键运行,RocketMQ Producer 就启动完成。 发送消息后,存储目录结构如下:
D:
├─RocketMQ
│ ├─conf
│ │ │ ├─broker.conf
│ │ │ ├─logback_broker.xml
│ │ │ ├─logback_namesrv.xml
│ ├─logs
│ ├─store
│ │ │ ├─conf
│ │ │ │ │ │consumerFilter.json
│ │ │ │ │ │consumerOffset.json
│ │ │ │ │ │delayOffset.json
│ │ │ │ │ │topics.json
│ │ │ │ │ │subscriptionGroup.json
│ │ │ ├─abort
│ │ │ ├─checkpoint
│ │ │ ├─lock
│ │ │ ├─commitlog
│ │ │ │ │ │00000000000000000000
│ │ │ │ │ │00000000001073741824
│ │ │ ├─consumequeue
│ │ │ │ │ │topic-test
│ │ │ │ │ │ │ │0
│ │ │ │ │ │ │ │ │ │00000000000000000000
│ │ │ │ │ │ │ │1
│ │ │ │ │ │ │ │2
│ │ │ │ │ │ │ │.....
│ │ │ ├─index
│ │ │ │ │ │20221107152845344
启动 RocketMQ Consumer
打开 org.apache.rocketmq.example.quickstart.Consumer 示例类,代码如下:
// Consumer.java
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setNamesrvAddr("127.0.0.1:9876"); // NamesrvAddr
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
注意,在 NamesrvAddr,我们还增加了 consumer.setNamesrvAddr(“127.0.0.1:9876”) 代码块,指明 Consumer 使用的 RocketMQ Namesrv 。
然后,右键运行,RocketMQ Consumer 就启动完成。