Rocketmq技术实战

2022/01/06 RocketMQ

RocketMQ实战

Windows10本地部署RocketMQ

下载安装

首先我们可以去RocketMQ的官网去下载源码: 官网点这里[https://rocketmq.apache.org/]

环境准备

配置环境变量

ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"

启动Name Server

进入MQ解压后的目录,即:ROCKETMQ_HOME/bin目录

.\mqnamesrv.cmd

启动Broker

.\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

-n 指定broker连接的nameServer的地址和端口号

autoCreateTopicEnable参数见名知意,打开自动创建Topic

安装rocketmq-console验证运行情况

修改rocketmq-consoleapplication.rpoperties配置文件

指定连接的nameServer的地址和端口号

 rocketmq.config.namesrvAddr=127.0.0.1:9876

找到主启动类,运行run方法即可,等待启动成功我们就可以再浏览器打开后台页面了,默认的启动地址是http://localhost:8080

Docker下安装RocketMQ

查找RocketMQ

docker search rocketmq

拉取镜像

docker pull rocketmqinc/rocketmq

创建namesrv数据存储路径

mkdir -p  /docker/rocketmq/data/namesrv/logs   /docker/rocketmq/data/namesrv/store

构建namesrv容器

docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v /docker/rocketmq/data/namesrv/logs:/root/logs \
-v /docker/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv 

参数 说明 -d 以守护进程的方式启动

  • -restart=always docker重启时候容器自动重启
  • -name rmqnamesrv 把容器的名字设置为rmqnamesrv -p 9876:9876 把容器内的端口9876挂载到宿主机9876上面 -v /docker/rocketmq/data/namesrv/logs:/root/logs 把容器内的/root/logs日志目录挂载到宿主机的 /docker/rocketmq/data/namesrv/logs目录 -v /docker/rocketmq/data/namesrv/store:/root/store 把容器内的/root/store数据存储目录挂载到宿主机的 /docker/rocketmq/data/namesrv目录 rmqnamesrv 容器的名字 -e “MAX_POSSIBLE_HEAP=100000000” 设置容器的最大堆内存为100000000 rocketmqinc/rocketmq 使用的镜像名称 sh mqnamesrv 启动namesrv服务

创建broker节点

创建broker数据存储路径

mkdir -p  /docker/rocketmq/data/broker/logs   /docker/rocketmq/data/broker/store /docker/rocketmq/conf

创建配置文件

vi /docker/rocketmq/conf/broker.conf

vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 192.168.152.100
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95

构建broker容器

docker run -d  \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v  /docker/rocketmq/data/broker/logs:/root/logs \
-v  /docker/rocketmq/data/broker/store:/root/store \
-v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 

参数 说明 -d 以守护进程的方式启动 –restart=always docker重启时候镜像自动重启

  • -name rmqbroker 把容器的名字设置为rmqbroker
  • –link rmqnamesrv:namesrv 和rmqnamesrv容器通信 -p 10911:10911 把容器的非vip通道端口挂载到宿主机 -p 10909:10909 把容器的vip通道端口挂载到宿主机 -e “NAMESRV_ADDR=namesrv:9876” 指定namesrv的地址为本机namesrv的ip地址:9876 -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker 指定broker服务的最大堆内存 rocketmqinc/rocketmq 使用的镜像名称 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 指定配置文件启动broker节点

创建rockermq-console服务

拉取镜像

docker pull pangliang/rocketmq-console-ng

构建rockermq-console容器

docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.152.100:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 9999:8080 \
pangliang/rocketmq-console-ng

参数说明

参数 说明
-d 以守护进程的方式启动
-restart=always docker重启时候镜像自动重启
-name rmqadmin 把容器的名字设置为rmqadmin
-e “JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.152.100:9876 设置namesrv服务的ip地址
-Dcom.rocketmq.sendMessageWithVIPChannel=false” 不使用vip通道发送消息
–p 9999:8080 把容器内的端口8080挂载到宿主机上的9999端口

需要关闭防火墙或者开放namesrv和broker端口 如果不设置,控制台服务将无法访问namesrv服务 异常信息如下

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed

关闭防火墙

systemctl stop firewalld.service

开放指定端口

firewall-cmd --permanent --zone=public --add-port=9876/tcp
firewall-cmd --permanent --zone=public --add-port=10911/tcp
# 立即生效
firewall-cmd --reload

访问控制台网页访问http://192.168.152.100:9999/查看控制台信息

HelloWorld

Maven依赖

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <lombok.version>1.18.16</lombok.version>
        <spring-boot-starter-parent.version>2.2.0.RELEASE</spring-boot-starter-parent.version>
        <slf4j-api.version>1.7.26</slf4j-api.version>
    </properties>
    <dependencies>
        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.3.0</version>
            <type>pom</type>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>

        <dependencies>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot-starter-parent.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>

        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.1.RELEASE</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!--指定打的jar包使用的jdk版本-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>

    </build>

NamesrvStartup

package com.rocketmq;

import org.apache.rocketmq.namesrv.NamesrvStartup;

public class NamesrvStartupTest {

    public static void main(String[] args) {
        NamesrvStartup.main(args);
    }
}

BrokerStartup

package com.rocketmq;

import org.apache.rocketmq.broker.BrokerStartup;

public class BrokerStartupTest {

    public static void main(String[] args) {
        BrokerStartup.main(args);
    }
}

Producer生产者

package com.rocketmq;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;


public class RocketMqProducerTest {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /**
         * producer配置项
         *  producerGroup DEFAULT_PRODUCER (一个线程下只能有一个组,但是一个组下面可以有多个实例,生产者组)
         *  Producer组名,多个producer如果属于一个应用,发送同样的消息,则应该将他们视为同一组
         *  createTopicKey WBW102 在发送消息时,自动创建服务器不存在的topic,需制定key
         *  defaultTopicQueueNums4 发送消息时,自动创建服务器不存在的topic,默认创建的队列数
         *  sendMsgTimeout 10000 发送消息超时时间,单位毫秒
         *  compressMsgBodyOverHowmuch 4096 消息body超过多大开始压缩(Consumer收到消息会自动解压缩),单位:字节
         *  retryTimesWhenSendFailed 重试次数 (可以配置)
         *  retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重发
         *  maxMessageSize 131072 客户端限制消息的大小,超过报错,同时服务端也会限制(默认128k)
         *  transactionCheckListener 事务消息回查监听器,如果发送事务消息,必须设置
         *  checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池大小
         *  checkThreadPoolMaxSize 1 Broker回查Producer事务状态时,线程池大小
         *  checkRequestHoldMax 2000 Broker回查Producer事务状态,Producer本地缓冲请求队列大小
         */

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 1; i <= 10; i++) {
            try {
                Message msg = new Message("TopicQuickStart",// topic
                        "TagA",// tag
                        "KKK",//key用于标识业务的唯一性
                        ("Hello RocketMQ " + i).getBytes()// body 二进制字节数组
                );
                SendResult sendResult = producer.send(msg); //ACK确认反馈,通过result判断消息发送成功还是失败
                System.out.println(sendResult); //msgID会在msg经过msgQueue逻辑结构之后才会有ID
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }

}

Consume消费者

package com.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMqConsumeTest {


    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        /**
         * Push Consumer设置
         * messageModel CLUSTERING 消息模型,支持以下两种1.集群消费2.广播消费
         * consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从什么位置开始消费
         * allocateMessageQueueStrategy
         * allocateMessageQueueAveragely Rebalance 算法实现策略
         * Subsription{} 订阅关系
         * messageListener 消息监听器
         * offsetStore 消费进度存储
         * consumeThreadMin 10 消费线程池数量
         * consumeThreadMax 20 消费线程池数量
         * pullThresholdForQueue 1000 拉去消息本地队列缓存消息最大数
         * pullInterval 拉消息间隔,由于是轮训,所以为0,但是如果用了流控,也可以设置大于0的值,单位毫秒
         * consumeMessageBatchMaxSize 1 批量消费,一次消费杜少条消息
         * pullBatchSize 32 批量拉消息,一次最多拉多少条
         *
         */

        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicQuickStart", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                MessageExt msg = msgs.get(0);
                try {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(),"utf-8");
                    String tags = msg.getTags();
                    System.out.println("get massage : " + " topic : " + topic + " tags : " + tags + " msg : " +msgBody);
                }catch (Exception e){
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //requeue 一会再消费
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // response broker ack
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

较为重要的配置项说明

  • producer配置项
    • producerGroup DEFAULT_PRODUCER (一个线程下只能有一个组,但是一个组下面可以有多个实例,生产者组)
    • Producer组名,多个producer如果属于一个应用,发送同样的消息,则应该将他们视为同一组
    • sendMsgTimeout 10000 发送消息超时时间,单位毫秒
    • compressMsgBodyOverHowmuch 4096 消息body超过多大开始压缩(Consumer收到消息会自动解压缩),单位:字节
    • retryTimesWhenSendFailed 重试次数 (可以配置)
    • retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重发
    • maxMessageSize 131072 客户端限制消息的大小,超过报错,同时服务端也会限制(默认128k)
  • Push Consumer设置
    • messageModel CLUSTERING 消息模型,支持以下两种1.集群消费2.广播消费
    • consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从什么位置开始消费
    • allocateMessageQueueStrategy
    • allocateMessageQueueAveragely Rebalance 算法实现策略
    • Subsription 订阅关系
    • messageListener 消息监听器
    • offsetStore 消费进度存储

消息的种类

按照发送的特点分:

  1. 同步消息
  2. 异步消息
  3. 单向消息

1)同步消息(可靠同步发送)

同步发送是指消息发送方发出数据后,会阻塞直到MQ服务方发回响应消息。

应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

RocketMQ同步消息

​ 关键代码:SendResult sendResult = producer.send(msg);

2)异步消息(可靠异步发送)

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收服务器响应,并对服务器的响应结果进行处理。

应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

RocketMQ异步消息

​ 关键代码:producer.sendAsync(msg, new SendCallback() {//...});

3)单向(one-way)消息

单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

RocketMQ单向消息

​ 单向只发送,不等待返回,所以速度最快,一般在微秒级,但可能丢失

​ 关键代码:producer.sendOneway(msg);

按照使用功能特点分:

1)普通消息

2)顺序消息

在使用DefaultMQPushConsumer时,您需要决定使用排序的还是并行的消息。

  • 排序的(Orderly) 消费消息的有序意味着,消息的使用顺序与生产者为每个消息队列发送的顺序相同。如果您正在处理全局顺序是强制性的场景,请确保您使用的主题只有一个消息队列。 警告:如果指定消费有序,则消息消耗的最大并发性是消费者组订阅的消息队列的数量。
  • 并行的(Concurrently) 并行地使用消息时,消费消息的最大并行度只受每个客户端指定的线程池大小限制。 警告:此模式不再保证消息顺序。

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。

假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。

假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

img

你可能会采用这种方式保证消息顺序

M1发送到S1后,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端后,通知S2,然后S2再将M2发送到消费端。

这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就需要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M1、M2发送到同一个Server上:

img

保证消息顺序,你改进后的方法

这样可以保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问题:

img

网络延迟问题

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和M2发往同一个消费者即可,且发送M1后,需要消费端响应成功后才能发送M2。

但又会引入另外一个问题,如果发送M1后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。

img

保证消息顺序的正确姿势

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达,另外一种情况是消费端1已经响应,但是Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就是我们后面要说的第二个问题,消息重复问题。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,而且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:

1、不关注乱序的应用实际大量存在 2、队列无序并不意味着消息无序

最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。

一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

private SendResult send()  {
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根据我们的算法,选择一个发送队列
        // 这里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}
消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

1、消费端处理消息的业务逻辑保持幂等性 2、保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

顺序消息代码

生产者代码

package com.rocketmq;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;


public class RocketMqOrderProducerTest {
    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("order_producer");

            producer.setNamesrvAddr("127.0.0.1:9876");

            producer.start();

            Date date = new Date();

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            String dateStr = sdf.format(date);

            for (int i = 0; i < 5; i++) {
                String body = dateStr + " hello rocketMQ " + i;
                Message msg = new Message("TopicOrder2", "TagA", "KEY" + i, body.getBytes());
                //发送数据:如果使用顺序消息,则必须自己实现MessageQueueSelector,保证消息进入同一个队列中去.
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                        Integer id = (Integer) arg;
                        System.out.println("id : " + id);
                        return mqs.get(id);
                    }
                }, 1);//队列下标 //orderID是选定的topic中队列的下标

                System.out.println(sendResult + " , body : " + body);
            }
            for (int i = 0; i < 5; i++) {
                String body = dateStr + " hello rocketMQ " + i;
                Message msg = new Message("TopicOrder2", "TagA", "KEY" + i, body.getBytes());
                //发送数据:如果使用顺序消息,则必须自己实现MessageQueueSelector,保证消息进入同一个队列中去.
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                        Integer id = (Integer) arg;
                        System.out.println("id : " + id);
                        return mqs.get(id);
                    }
                }, 2);//队列下标 //orderID是选定的topic中队列的下标

                System.out.println(sendResult + " , body : " + body);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

消费者代码

package com.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class RocketMqOrderConsumeTest {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_producer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicOrder2", "TagA");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            private Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //设置自动提交
                context.setAutoCommit(true);

                for (MessageExt msg:msgs){
                    System.out.println(msg+ " , content : "+ new String(msg.getBody()));
                }
                try {
                    //模拟业务处理
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                }catch (Exception e){
                    e.printStackTrace();
                    return  ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("consume ! ");
    }
}

3)广播消息

4)批量消息

5)事务消息

MQ组件是系统架构里必不可少的一门利器,设计层面可以降低系统耦合度,高并发场景又可以起到削峰填谷的作用,从单体应用到集群部署方案,再到现在的微服务架构,MQ凭借其优秀的性能和高可靠性,得到了广泛的认可。 随着数据量增多,系统压力变大,开始出现这种现象:数据库已经更新了,但消息没发出来,或者消息先发了,但后来数据库更新失败了,结果研发童鞋各种数据修复,这种生产问题出现的概率不大,但让人很郁闷。这个其实就是数据库事务与MQ消息的一致性问题,简单来讲,数据库的事务跟普通MQ消息发送无法直接绑定与数据库事务绑定在一起,例如上面提及的两种问题场景:

  1. 数据库事务提交后发送MQ消息;
  2. MQ消息先发,然后再提交数据库事务。

场景1的问题是数据库事务可能刚刚提交,服务器就宕机了,MQ消息没发出去,场景2的问题就是MQ消息发送出去了,但数据库事务提交失败,又没办法追加已经发出去的MQ消息,结果导致数据没更新,下游已经收到消息,最终事务出现不一致的情况。

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。我们以一个转帐的场景为例来说明这个问题。

我们以微服务架构的购物场景为例,参照一下RocketMQ官方的例子,用户A发起订单,支付100块钱操作完成后,能得到100积分,账户服务和会员服务是两个独立的微服务模块,有各自的数据库,按照上文提及的问题可能性,将会出现这些情况:

  • 如果先扣款,再发消息,可能钱刚扣完,宕机了,消息没发出去,结果积分没增加。
  • 如果先发消息,再扣款,可能积分增加了,但钱没扣掉,白送了100积分给人家。
  • 钱正常扣了,消息也发送成功了,但会员服务实例消费消息出现问题,结果积分没增加。

由此引出的是数据库事务与MQ消息的事务一致性问题,rocketmq事务消息解决的问题:解决本地事务执行与消息发送的原子性问题。这里界限一定要明白,是确保MQ生产端正确无误地将消息发送出来,没有多发,也不会漏发。但至于发送后消费端有没有正常的消费掉(如上面提及的第三种情况,钱正常扣了,消息也发了,但下游消费出问题导致积分不对),这种异常场景将由MQ消息消费失败重试机制来保证,不在此次的讨论范围内。

常用的MQ组件针对此场景都有自己的实现方案,如ActiveMQ使用AMQP协议(二阶提交方式)保证消息正确发送,这里我们以RocketMQ为重点进行学习。

RocketMQ实现发送事务消息

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那我们来看下RocketMQ源码,是不是这样来处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

public TransactionSendResult sendMessageInTransaction(.....)  {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法会将请求发往broker(mq server)去更新事物消息的最终状态:

  1. 根据sendResult找到Prepared消息
  2. 根据localTransaction更新消息的最终状态

如果endTransaction方法执行失败,导致数据没有发送到brokerbroker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法,最后调用endTransactionOnewaybroker来更新消息的最终状态。

再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题?解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。

img

消费事务消息

这样基本上可以解决超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

RocketMQ事务实现

RocketMQ提供类似X/Open XA的分布式事务功能来确保业务发送方和MQ消息的最终一致性,其本质是通过半消息(prepare消息和commit消息)的方式把分布式事务放在MQ端来处理。

RocketMQ事务消息2

其中:

1,发送方向消息队列 RocketMQ 服务端发送消息。

2,服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

3,发送方开始执行本地事务逻辑。

4,发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。

补偿流程:

5,在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

6,发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7,发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作。

RocketMQ的半消息机制的注意事项是

1,根据第六步可以看出他要求发送方提供业务回查接口。

2,不能保证发送方的消息幂等,在ack没有返回的情况下,可能存在重复消息

3,消费方要做幂等处理。

事务消息代码

事务消息生产者

package com.rocketmq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransactionProducer {

    public static void main(String[] args) {
        try {
            TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.setTransactionListener(new TransactionListenerImpl());
            producer.start();
            for (int i = 1; i < 6; i++) {
                Message message = new Message("TransactionTopic", "transactionTest", "msg-" + i, ("Hello" + ":" + i).getBytes());
                try {
                    SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" + i);
                    System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            Thread.sleep(Integer.MAX_VALUE);
            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}


事务消息监听器处理

package com.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 温馨提示:业务的操作与本地事务消息表都需要在 OrderTransactionListenerImpl
 * 的 executeLocalTransaction 方法中执行,确保是一个事务。
 */
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
    /**
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String msgKey = message.getKeys();
        log.info("本地事务执行===============");
        /**
         * 本地事务执行会有三种可能
         * 1、commit 成功
         * 2、Rollback 失败
         * 3、网络等原因服务宕机收不到返回结果
         */
        LocalTransactionState state;
        // 1、二次确认消息,然后消费者可以消费
        if (msgKey.contains("1")) {
            state = LocalTransactionState.COMMIT_MESSAGE;
            //2、回滚消息,Broker端会删除半消息
        } else if (msgKey.contains("2")) {
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            //3、Broker端会进行回查消息
            state = LocalTransactionState.UNKNOW;
        }
        return state;
    }

    /**
     * 回查本地事务结果
     *
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("==========回查接口=========");
        String msgKey = messageExt.getKeys();
        //TODO 1、必须根据key先去检查本地事务消息是否完成。
        /**
         * 因为有种情况就是:上面本地事务执行成功了,但是return LocalTransactionState.COMMIT_MESSAG的时候
         * 服务挂了,那么最终 Brock还未收到消息的二次确定,还是个半消息 ,所以当重新启动的时候还是回调这个回调接口。
         * 如果不先查询上面本地事务的执行情况 直接在执行本地事务,那么就相当于成功执行了两次本地事务了。
         */
        // TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


RocketMQ事务消息设计

一阶段的消息如何对用户不可见

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。

如何做到写入了消息但是对用户不可见?——写入消息数据,但是不创建对应的消息的索引信息。

熟悉RocketMQ的同学应该都清楚,消息在服务端的存储结构如上,每条消息都会有对应的索引信息,Consumer通过索引读取消息。

那么实现一阶段写入的消息不被用户消费(需要在Commit后才能消费),只需要写入Storage Queue,但是不构建Index Queue即可。

RocketMQ中具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中。

上图即RocketMQ替换事务消息属性的代码实现,替换属性后这条消息被写入到TransactionalMessageUtil.buildHalfTopic()的Queue 0中。

*(RocketMQ将事务消息一阶段发送的消息称为Half消息让人费解,采用的2PC的方式,一阶段消息称为Prepare Message或者Pending Message更能体现它的含义)*

在完成Storage Queue的写入后,在appendCallback中,普通消息会去构建消息索引,而如果发现是事务消息,则跳过了创建索引的逻辑。

如果让一阶段的消息对用户可见

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。

先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。

但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。

RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息是否状态已经确定(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。

引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。

Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

Op消息的存储

RocketMQ将Op消息写入到全局一个特定的Topic中:TransactionalMessageUtil.buildOpTopic()

这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。

Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

Half消息的索引构建

在执行二阶段的Commit操作时,需要构建出Half消息的索引。

一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。

所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

如何处理二阶段失败的消息

如果二阶段失败了,比如在Commit操作时出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。

RocketMQ采用了一种补偿机制,称为“回查”。

Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。

Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的一点是具体实现中,在回查前,系统会执行putBackHalfMsgQueue操作,即将Half消息重新写一遍到Half消息的Queue中。这么做其实是为了能有效的推进上面的CheckPoint。

RocketMQ事务消息设计总结

以上是RocketMQ事务消息实现的示意图:

  • 通过写Half消息的方式来实现一阶段消息对用户不可见
  • 通过Op消息来标记事务消息的状态
  • 通过读取Half消息来生成一条新的Normal消息来完成二阶段Commit之后消息对Consumer可见
  • 通过Op消息来执行回查

优势:

  • Half Queue和Op Queue的数量可控,不会随着Topic的增加而增加
  • 没有外部依赖,实现自包含

缺陷:

  • 每条事务消息至少需要写一条Half消息(异常情况可能会有多条)和Normal,写放大了
  • 所有Half消息都是写到全局预设的一个内部的Topic,这块可能性能会有一些问题(所有Topic的事务消息会往一个Topic上写)
  • 全局Op消息写一个Topic,回查时间可能会有相互影响
事务消息配置

在broker.conf中配置 transactionCheckMax:消息回查最大次数,默认15次; transactionCheckInterval:检测频率,默认60s; transactionTimeOut:过期时间

消息发布和订阅

在RocketMQ中,producer发布消息,consumer订阅消息。消息的收发模型如下图:

RocketMQ消息收发模型1

Producer端消息发布

RocketMQ消息收发模型2

producer完全无状态,可以集群部署。

Consumer的Push模式和Pull模式

  • 一种是Push模式,即MQServer主动向消费端推送;
  • 另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。

但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

消费端的Push模式是通过长轮询的模式来实现的,就如同下图:

RocketMQ消息收发模型3

Push模式示意图

Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。

broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

Consumer集群消费和广播消费

基本概念

消息队列 RocketMQ 是基于发布/订阅模型的消息系统。消息的订阅方订阅关注的 Topic,以获取并消费消息。由于订阅方应用一般是分布式系统,以集群方式部署有多台机器。因此消息队列 RocketMQ 约定以下概念。

集群:使用相同 Group ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。

集群消费:当使用集群消费模式时,消息队列 RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。

一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。

设置消费端属性:MessageModel.CLUSTERING,这种方式就可以达到类似于ActiveMQ水平扩展负责负载均衡消息的实现.比较特殊的是这种消费方式可以支持先发送数据(也就是producer先发送数据到MQ),消费端订阅主题发生在生产端之后也可以收到数据,比较灵活.

广播消费:当使用广播消费模式时,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

设置消费端对象属性:MessageModel.BROADCASTING,这种模式就是相当于生产端发送数据到MQ,多个消费端都可以获得数据.

场景对比

集群消费模式:

RocketMQ消息收发模型6

适用场景&注意事项

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播消费模式:

RocketMQ消息收发模型7

适用场景&注意事项

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

使用集群模式模拟广播:

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

img

适用场景&注意事项

  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。 当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。 假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。 同时,实例之间订阅关系必须保持一致。

6)定时消息

// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2016-03-07 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。    
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();    
msg.setStartDeliverTime(timeStamp);    
// 发送消息,只要不抛异常就是成功    
SendResult sendResult = producer.send(msg); 

7)延时消息

Message sendMsg = new Message(topic, tags, message.getBytes());
sendMsg.setDelayTimeLevel(delayLevel);
// 默认3秒超时
SendResult sendResult = rocketMQProducer.send(sendMsg);

负载均衡

生产端负载均衡

首先分析一下RocketMQ的客户端发送消息的源码:

在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

  1. 如果没有指定namesrv地址,将会自动寻址
  2. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳…
  3. 启动负载均衡的服务

初始化完成后,开始发送消息,发送消息的主要代码如下:

代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。

如果Producer发送消息失败,会自动重试,重试的策略:

  1. 重试次数 < retryTimesWhenSendFailed(可配置)
  2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  3. 同时满足上面两个条件后,Producer会选择另外一个队列发送消息

消费端的负载均衡

Producer向一些队列轮流发送消息,队列集合称为TopicConsumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

上图的集群模式里,每个consumer消费部分消息,这里的负载均衡是怎样的呢?

消费端会通过RebalanceService线程,20秒钟做一次基于topic下的所有队列负载:

  1. 遍历Consumer下的所有topic,然后根据topic订阅所有的消息
  2. 获取同一topic和Consumer Group下的所有Consumer
  3. 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

如同上图所示:如果有 3 个队列,2 个 consumer,那么第一个 Consumer 消费 2 个队列,第二 consumer 消费 1 个队列。这里采用的就是平均分配策略,它类似于我们的分页,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。

消息重试机制

消息重试机制概述

  • RocketMQ提供了消息重试机制
    • 生产者:消息重投重试(保证数据的高可靠)
    • 消费者:消息处理异常(broker端到consumer端各种问题,比如网络原因闪断,消费端处理失败,ACK返回失败等)

consumer如何消息重试

需要重试的情况

  • Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。 Consumer 消费消息失败通常可以讣为 有以下几种情冴
    1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。返种错诨通常需要跳过返条消息,再消费其他消息,而返条失败的消息即使立刻重试消费,99%也丌成功,所以最好提供一种定时重试机制,即过 10s 秒后再重试。
    2. 由亍依赖的下游应用服务丌可用,例如 db 连接丌可用,外系统网络丌可达等。遇到返种错诨,即使跳过当前失败的消息,消费其他消息同样也会报错。返种情冴建议应用 sleep 30s,再消费下一条消息,返样可以减轻 Broker 重试消息的压力。

触发与配置重试

  • 捕获异常后 return ConsumeConcurrentlyStatus.RECONSUME_LATER; //requeue 一会再消费,会启动broker的重试机制
  • consumer网络异常时,这时候不会返回东西,broker会选择同一个主题下同一个组下的另一个consumer把消息消费掉

  • 在服务器端(rocketmq-broker端)的属性配置文件中加入以下行(延时级别):
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
  1. 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
  2. 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
  3. 默认值就是上面声明的,可手工调整;
  4. 默认值已够用,不建议修改这个值。

处理重试次数过多的问题

  • 通过重试次数进行逻辑处理(补偿机制)
  • 重试之后会获取OrignMsgId
catch(exception e){
    if(msg.getReconsumeTime()==3){
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

幂等与去重

基本概念

幂等性

消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

  • 一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
  • 在MQ中broker不可避免的会发送重复的数据,到我们的consumer,consumer必须要保证处理的消息是唯一的.

解决方案

  • MQ消费者的幂等行的解决一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单
  • 也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。
  • 给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

谈谈死信队列

死信队列用于处理无法被正常消费的消息,即死信消息

当一条消息初次消费失败,消息队列 RocketMQ 版会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 版不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列

死信消息的特点

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列的特点

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 版不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

消息队列 RocketMQ 版控制台提供对死信消息的查询、导出和重发的功能。

运维常见问题

RocketMQ的mqadmin命令报错问题

问题描述:有时候在部署完RocketMQ集群后,尝试执行“mqadmin”一些运维命令,会出现下面的异常信息:

 org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

解决方法:可以在部署RocketMQ集群的虚拟机上执行export NAMESRV_ADDR=ip:9876(ip指的是集群中部署NameServer组件的机器ip地址)命令之后再使用“mqadmin”的相关命令进行查询,即可得到结果。

RocketMQ生产端和消费端版本不一致导致不能正常消费的问题

问题描述:同一个生产端发出消息,A消费端可消费,B消费端却无法消费,rocketMQ Console中出现:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message的异常消息

解决方案:RocketMQ 的jar包:rocketmq-client等包应该保持生产端,消费端使用相同的version。

新增一个topic的消费组时,无法消费历史消息的问题

问题描述:当同一个topic的新增消费组启动时,消费的消息是当前的offset的消息,并未获取历史消息。

解决方案:rocketmq默认策略是从消息队列尾部,即跳过历史消息。如果想消费历史消息,则需要设置:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere。常用的有以下三种配置:

  • 默认配置,一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费,即跳过历史消息;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费,即消费Broker未过期的历史消息;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,和consumer.setConsumeTimestamp()配合使用,默认是半个小时以前;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

如何开启从Slave读数据功能

在某些情况下,Consumer需要将消费位点重置到1-2天前,这时在内存有限的Master Broker上,CommitLog会承载比较重的IO压力,影响到该Broker的其它消息的读与写。可以开启slaveReadEnable=true,当Master Broker发现Consumer的消费位点与CommitLog的最新值的差值的容量超过该机器内存的百分比(accessMessageInMemoryMaxRatio=40%),会推荐Consumer从Slave Broker中去读取数据,降低Master Broker的IO。

性能调优问题

异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁,调整Broker配置项useReentrantLockWhenPutMessage,默认为false;异步刷盘建议开启TransientStorePoolEnable;建议关闭transferMsgByHeap,提高拉消息效率;同步刷盘建议适当增大sendMessageThreadPoolNums,具体配置需要经过压测。

在RocketMQ中msgId和offsetMsgId的含义与区别

使用RocketMQ完成生产者客户端消息发送后,通常会看到如下日志打印信息:

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的Id;
  • offsetMsgId,offsetMsgId是由Broker服务端在写入消息时生成的(采用”IP地址+Port端口”与“CommitLog的物理偏移量地址”做了一个字符串拼接),其中offsetMsgId就是在RocketMQ控制台直接输入查询的那个messageId。

最佳实践

生产者

发送消息注意事项

Tags的使用

一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。

Keys的使用

每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

   // 订单Id   
   String orderId = "20034568923546";   
   message.setKeys(orderId);   
日志的打印

消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明:

  • SEND_OK

消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。

  • FLUSH_DISK_TIMEOUT

消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。

  • FLUSH_SLAVE_TIMEOUT

消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。

  • SLAVE_NOT_AVAILABLE

消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

消息发送失败处理方式

Producer的send方法本身支持内部重试,重试逻辑如下:

  • 至多重试2次。
  • 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
  • 如果本身向broker发送消息产生超时异常,就不会再重试。

以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

上述db重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成,主要基于以下几点考虑:首先,MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。其次,如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生 kill -9 这样暴力方式关闭,造成数据没有及时落盘而丢失。第三,Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。综上,建议重试过程交由应用来控制。

选择oneway形式发送

通常消息的发送是这样一个过程:

  • 客户端发送请求到服务器
  • 服务器处理请求
  • 服务器向客户端返回应答

所以,一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级。

消费者

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消费速度慢的处理方式

提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
  • 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:

    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset =
                msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO 消息堆积情况的特殊处理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }    
优化每条消息消费过程

举例如下,某条消息的消费过程如下:

  • 根据消息从 DB 查询【数据 1】
  • 根据消息从 DB 查询【数据 2】
  • 复杂的业务计算
  • 向 DB 插入【数据 3】
  • 向 DB 插入【数据 4】

这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

消费打印日志

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

   public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }   

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

其他消费建议

关于消费者和订阅

第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的消费偏移量,请确保同一组内的每个消费者订阅信息保持一致。

关于有序消息

消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。

关于并发消费

顾名思义,消费者将并发消费这些消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代。

关于消费状态Consume Status

对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。

关于Blocking

不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程

关于线程数设置

消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。

关于消费位点

当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。

Broker

Broker 角色

Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

FlushDiskType

SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。

Broker 配置

参数名 默认值 说明
listenPort 10911 接受客户端连接的监听端口
namesrvAddr null nameServer 地址
brokerIP1 网卡的 InetAddress 当前 broker 监听的 IP
brokerIP2 跟 brokerIP1 一样 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步
brokerName null broker 的名称
brokerClusterName DefaultCluster 本 broker 所属的 Cluser 名称
brokerId 0 broker id, 0 表示 master, 其他的正整数表示 slave
storePathRootDir $HOME/store/ 存储根路径
storePathCommitLog $HOME/store/commitlog/ 存储 commit log 的路径
mappedFileSizeCommitLog 1024 * 1024 * 1024(1G) commit log 的映射文件大小
deleteWhen 04 在每天的什么时间删除已经超过文件保留时间的 commit log
fileReservedTime 72 以小时计算的文件保留时间
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。

NameServer

RocketMQ 中,Name Servers 被设计用来做简单的路由管理。其职责包括:

  • Brokers 定期向每个名称服务器注册路由数据。
  • 名称服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。

客户端配置

相对于RocketMQ的Broker集群,生产者和消费者都是客户端。本小节主要描述生产者和消费者公共的行为配置。

客户端寻址方式

RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。

  • 代码中指定Name Server地址,多个namesrv地址之间用分号分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");  

consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
  • Java启动参数中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876  
  • 环境变量指定Name Server地址
export   NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876   
  • HTTP静态服务器寻址(默认)

客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:

192.168.0.1:9876;192.168.0.2:9876   

客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

10.232.22.67    jmenv.tbsite.net   

推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群可以热升级。

客户端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr(“192.168.0.1:9876”),其他参数同理。

客户端的公共配置
参数名 默认值 说明
namesrvAddr   Name Server地址列表,多个NameServer地址用分号隔开
clientIP 本机IP 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
instanceName DEFAULT 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads 4 通信层异步回调线程数
pollNameServerInteval 30000 轮询Name Server间隔时间,单位毫秒
heartbeatBrokerInterval 30000 向Broker发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval 5000 持久化Consumer消费进度间隔时间,单位毫秒
Producer配置
参数名 默认值 说明
producerGroup DEFAULT_PRODUCER Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组
createTopicKey TBW102 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
defaultTopicQueueNums 4 在发送消息,自动创建服务器不存在的topic时,默认创建的队列数
sendMsgTimeout 3000 发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送
retryTimesWhenSendFailed 2 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用
maxMessageSize 4MB 客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用。
transactionCheckListener   事务消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池最小线程数
checkThreadPoolMaxSize 1 Broker回查Producer事务状态时,线程池最大线程数
checkRequestHoldMax 2000 Broker回查Producer事务状态时,Producer本地缓冲请求队列大小
RPCHook null 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。
PushConsumer配置
参数名 默认值 说明
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
messageModel CLUSTERING 消费模型支持集群消费和广播消费两种
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费
consumeTimestamp 半个小时前 只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作用。
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
subscription   订阅关系
messageListener   消息监听器
offsetStore   消费进度存储
consumeThreadMin 20 消费线程池最小线程数
consumeThreadMax 20 消费线程池最大线程数
consumeConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度
pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数
pullInterval 0 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒
consumeMessageBatchMaxSize 1 批量消费,一次消费多少条消息
pullBatchSize 32 批量拉消息,一次最多拉多少条
PullConsumer配置
参数名 默认值 说明
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis 20000 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒
consumerTimeoutMillisWhenSuspend 30000 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒
consumerPullTimeoutMillis 10000 非长轮询,拉消息超时时间,单位毫秒
messageModel BROADCASTING 消息支持两种模式:集群消费和广播消费
messageQueueListener   监听队列变化
offsetStore   消费进度存储
registerTopics   注册的topic集合
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
Message数据结构
字段名 默认值 说明
Topic null 必填,消息所属topic的名称
Body null 必填,消息体
Tags null 选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag
Keys null 选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。
Flag 0 选填,完全由应用来设置,RocketMQ不做干预
DelayTimeLevel 0 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费
WaitStoreMsgOK TRUE 选填,表示消息是否在服务器落盘后才返回应答。

系统配置

本小节主要介绍系统(JVM/OS)相关的配置。

JVM选项

推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。简单的JVM配置如下所示:

-server -Xms8g -Xmx8g -Xmn4g

如果您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是通过“预触摸”Java堆以确保在JVM初始化期间每个页面都将被分配。那些不关心启动时间的人可以启用它: ​ -XX:+AlwaysPreTouch

禁用偏置锁定可能会减少JVM暂停, ​ -XX:-UseBiasedLocking

至于垃圾回收,建议使用带JDK 1.8的G1收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m   
-XX:G1ReservePercent=25 
-XX:InitiatingHeapOccupancyPercent=30

这些GC选项看起来有点激进,但事实证明它在我们的生产环境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:

-XX:+UseGCLogFileRotation   
-XX:NumberOfGCLogFiles=5 
-XX:GCLogFileSize=30m

如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统:

-Xloggc:/dev/shm/mq_gc_%p.log123   

Linux内核参数

os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*的文档

  • vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
  • vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness –> aggressiveness)
  • vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
  • File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
  • Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。

SpringBoot整合RocketMQ

Maven依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
</dependency>

application.yaml

# RocketMQ 相关配置
rocketmq:
  # 指定 nameServer
  name-server: 192.168.152.100:9876
  # Producer 生产者
  producer:
    group: my-group  # 指定发送者组名
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false

发送消息

    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步消息
     * @param topic
     * @param tag
     * @param data
     * @param <T>
     */
    public <T> void syncSend(String topic, String tag, T data) {
        log.info("syncSend topic:{}tag:{}",topic,tag);
        rocketMQTemplate.syncSend(topic + ":" + tag, data);
    }

    /**
     * 发送有序消息
     * @param topic
     * @param tag
     * @param data
     * @param key
     * @param <T>
     */
    public <T> void syncSendOrderly(String topic, String tag, T data, String key) {
        log.info("syncSendOrderly topic:{}tag:{}",topic,tag);
        rocketMQTemplate.syncSendOrderly(topic + ":" + tag, data, key);
    }


    /**
     * 发送事务消息
     * @param topic
     * @param tag
     * @param data
     * @param <T>
     */
    public <T> void sendMessageInTransaction(String topic, String tag, T data) {
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic + tag,
                MessageBuilder.withPayload(data).build(), null);
    }

接受消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
@Component
@RocketMQMessageListener(topic = "first-topic",consumerGroup = "my-consumer-group")
@Slf4j
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

Search

    微信好友

    博士的沙漏

    Table of Contents