Rocketmq源码消息发送

2022/01/12 RocketMQ

RocketMQ消息发送

消息发送流程主要的步骤:验证消息、查找路由、消息发送(包含异常处理机制)。

RocketMQ发送消息的方式

RocketMQ生产者把消息发送到Broker服务器的方法有三种:

  • 单向发送 把消息发向Broker服务器,而不用管消息是否成功发送到Broker服务器,只管发送,不管结果。
  • 同步发送 把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果。
  • 异步发送 把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果。因为是异步发送,发送完消息以后,不用等待,等到Broker服务器的响应调用回调。

RocketMQ发送消息的例子

首先看下RocketMQ发送消息的例子:

public static void main(String[] args) throws MQClientException, InterruptedException {

        //创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //启动生产者
        producer.start();
        //循环发送消息
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //发送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
 }

RocketMQ发送消息的例子比较简单,首先创建默认的消息生产者,然后启动生产者,最后循环发送消息。这里没有什么好说,接着,我们看下生产者发送消息的的方法:

//代码位置:org.apache.rocketmq.client.producer.DefaultMQProducer#send
public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验消息的合法性
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
}

默认消息发送以同步方式发送,默认超时时间为3s。

校验消息的合法性

send方法在发送消息之前,首先需要校验下消息的合法性,然后才进行发送消息。深入Validators.checkMessage方法看看,Validators.checkMessage方法到底做了哪些校验:

//代码位置: org.apache.rocketmq.client.Validators#checkMessage
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        //消息空
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // topic 校验topic的合法性
        Validators.checkTopic(msg.getTopic());

        // body 消息为空
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }

        //消息的长度为空
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }

        //消息长度超过最大的消息大小 4M
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
}

checkMessager方法校验了消息的合法性、以及topic的合法性。判断消息是否为null、消息体内容是否为null、、消息的长度是否为0、以及消息的大小。消息的大小限制为4M。限制消息的大小,是为了更好更快的传输消息,消息太大,网络传输就比较慢,防止RocketMQ处理不过来,消息太小,RocketMQ处理能力就会被大大减弱。检验topic的合法性的具体代码如下:

//代码位置:
public static void checkTopic(String topic) throws MQClientException {
        //topic是否为null
        if (UtilAll.isBlank(topic)) {
            throw new MQClientException("The specified topic is blank", null);
        }

        //正则topic
        if (!regularExpressionMatcher(topic, PATTERN)) {
            throw new MQClientException(String.format(
                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
                VALID_PATTERN_STR), null);
        }

        //最大长度
        if (topic.length() > TOPIC_MAX_LENGTH) {
            throw new MQClientException(
                String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
        }

        //whether the same with system reserved keyword
        //topic 不能为系统topic
        if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
            throw new MQClientException(
                String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
        }
}

checkTopic判断topic是否为null、是否符合topic的正则、topic的长度是否大于最大的长度127,以及消息发送的topic不能是系统的topic,如果topic不合法,则抛出异常。

消息发送

发送消息之前,进行了消息以及topic合法性的校验,当消息和topic都合法时,才进行消息的发送。不管是以什么方式发送消息,都是用sendDefaultImpl方法发送,传入不同的消息发送方式类型CommunicationMode,CommunicationMode有SYNC(同步)、ASYNC(异步)、ONEWAY(单向)三种。 sendDefaultImpl方法的逻辑比较长,主要分成下面几方面进行讲解:

  • 消息发送的准备
  • 消息的发送
  • 响应的处理

消息发送的准备

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        //确定服务是否正常
        this.makeSureStateOK();
        //校验消息是否合法
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //查找topic推送信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

        //代码省略

}

消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。

  • 确定服务是否正常,
  • 校验消息是否合法,
  • 查找topic推送信息,确定消息发往哪个Broker。校验消息是否合法在上面已经讲过了,一模一样的代码,这里应该是重复进行校验了。 具体看看makeSureStateOK方法:
    //代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#makeSureStateOK
    private void makeSureStateOK() throws MQClientException {
          if (this.serviceState != ServiceState.RUNNING) {
              throw new MQClientException("The producer service state not OK, "
                  + this.serviceState
                  + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                  null);
          }
    }
    

    消息在发送之前,要确保生产者是处于正常运行状态,当生产者启动异常或者生产者被关闭了,那么就不可以发送消息了。makeSureStateOK方法判断生产者的运行状态是否正在运行中,如果不是,则抛出生产者状态不正常的异常。

查找主题路由信息

接下来深入看看tryToFindTopicPublishInfo方法是如何寻找topic推送信息的,首先看下TopicPublishInfo类:

//代码位置:org.apache.rocketmq.client.impl.producer.TopicPublishInfo
public class TopicPublishInfo {
    // 是否有序
    private boolean orderTopic = false;
    // 是否有topic路由信息
    private boolean haveTopicRouterInfo = false;
    // 消息队列
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // 消息队列索引
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // 路由元数据
    private TopicRouteData topicRouteData;

    //代码省略

}

TopicPublishInfo主要有几个属性,主要属性为messageQueueList(消息队列)、sendWhichQueue(消息队列索引)、topicRouteData(路由元数据)。消息队列保存着topic和Broker 名字,通过消息队列就知道应该将消息发送给哪个Broker了。路由元数据保存着队列数据、broker数据等。 下面我们来一一介绍下TopicPublishInfo的属性。

  • orderTopic:是否是顺序消息。
  • ListmessageQueueList:该主题队列的消息队列。
  • sendWhichQueue:每选择一次消息队列,该值会自增1,如果Integer.MAX_VALUE,则重置为0,用于选择消息队列。
  • ListqueueData:topic队列元数据。
  • ListbrokerDatas:topic分布的broker元数据。
  • HashMap<String/brokerAdress/,List/*filterServer*/>:broker上过滤服务器地址列表。

然后让我们回到tryToFindTopicPublishInfo方法:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // 根据topic从topic推送信息表中查找路由信息
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // 创建并放入topic推送信息表中
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 从Name server 服务器获取topic路由信息进行更新
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        // 如果找到的路由信息是可用的,直接返回路由信息
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 否则从Name server服务器更新路由信息并从路由表获取路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
}

topicPublishInfoTable(路由信息表)表中缓存着路由信息,tryToFindTopicPublishInfo方法首先根据topic从topicPublishInfoTable中查找路由信息,当没有找到,就创建路由信息,加入topicPublishInfoTable缓存。并且对路由信息进行更新。当找到的路由信息是可用的,就直接返回,否则,从Name Server服务器更新路由信息并获取路由信息返回。

第一次发送消息时,本地没有缓存topic的路由信息,查询NameServer尝试获取,如果路由信息未找到,再次尝试用默认主题DefaultMQProducerImpl#createTopicKey去查询,如果BrokerConfig#autoCreateTopicEnable为true时,NameServer将返回路由信息,如果autoCreateTopicEnable为false将抛出无法找到topic路由异常。代码MQClientInstance#updateTopicRouteInfoFromNameServer这个方法的功能是消息生产者更新和维护路由缓存。

消息的发送

在找到路由信息以后,就进行消息发送的逻辑了,让我们回到sendDefaultImpl方法,sendDefaultImpl接下的代码比较多捕获异常处理的代码,这部分代码将会省略,不过不会影响主流程的分析,继续分析接下来的代码:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

    //代码省略
     if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //发送消息重试次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            //失败重试发送消息
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //选择MessageQueue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            // 重发消息的时候重置topic
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // 现在发送的时间减去上次发送的时间
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        // 已经超时,就退出了循环
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // 发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 缓存,跟容错有关
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // 代码省略
                    } catch (RemotingException e) {
                        //省略代码
                    } catch (MQClientException e) {
                        //省略代码
                    } catch (MQBrokerException e) {
                       //省略代码
                    } catch (InterruptedException e) {
                       //省略代码
                    }
                } else {
                    break;
                }
            }
      //校验name server 地址的设置
       validateNameServerSetting();

       //抛出没有topic的路由的异常
        //代码省略

}

如果获取的路由信息topicPublishInfo等于null或者不可用的话,就直接退出抛出没有topic的路由的异常,否则才进行发送消息的流程。根据发送消息的方式CommunicationMode获取发送消息的失败重试次数,当发送方式为SYNC(同步)时,默认是发送3次,当是单向和异步发送方式时,默认发送次数为一次。这里为什么这样设置失败重试次数?当发送方式是同步时,发送次数设置为3次,保证消息尽可能的发送成功,如果三次都发送失败了,那么只能交给业务方去处理消息发送失败的异常了。如果发送方式是单向的,发送次数设置为1次,只管发送,不管结果。当发送方式是异步时。在这里只发送一遍,但是在具体发送异步消息时,也会进行失败消息的重试发送的

在发送消息之前,首先找到消息队列就可以知道消息发往哪里了。如果在消息发送之前,已经超时了,就直接退出发送消息循环了,就没有必要进行消息发送的方法了。准备好就可以调用sendKernelImpl方法进行消息的发送。

选择消息队列

上面已经分析,在发送消息前,先要知道消息到底被发送到哪个Broker服务了,消息队列(MessageQueue)保存着topic和brokerName,通过topic就可以找到消息被发送到哪个broker了。 例如:根据路由信息选择消息队列,返回的消息队列按照broker、序号排序。举例说明,如果topicA在broker-a,broker-b上分别创建了4个队列,那么返回的消息队列:[{“brokerName”:“broker-a”,“queueId”:0},{“brokerName”:“broker-a”,“queueId”:1},{“brokerName”:“broker-a”,“queueId”:2},{“brokerName”:“broker-a”,“queueId”:3},{“brokerName”:“broker-b”,“queueId”:0},{“brokerName”:“broker-b”,“queueId”:1},{“brokerName”:“broker-b”,“queueId”:2},{“brokerName”:“broker-b”,“queueId”:3}],那RocketMQ如何选择消息队列呢?

首先消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。由retryTimesWhenSendAsyncFailed指定,接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。选择消息队列有两种方式。

  • sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。
  • sendLatencyFaultEnable=true,启用Broker故障延迟机制。

selectOneMessageQueue方法是寻找消息队列的,具体寻找消息队列的方法如下:

//代码位置:org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //如果开启发送延迟故障
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                //遍历消息队列。按照顺序轮询获取消息队列
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //延迟容错可用,根据broker name从faultItemTable获取的faultItem是否可用,可用就返回消息队列
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                //写队列数
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    //选择一个消息队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        //如果没有开始发送延迟故障开关
        return tpInfo.selectOneMessageQueue(lastBrokerName);
}

sendLatencyFaultEnable是发送延迟故障开关,默认设置为false。当sendLatencyFaultEnable设置为true时,消息队列时时如何被选取出来的?首先遍历消息队列。按照简单的轮询选取消息队列,当消息队列可用时,选择消息队列的工作就结束,直接返回选择到的消息队列。如果按照顺序轮询没有找到,那么使用无参数的selectOneMessageQueue方法获取消息队列。

如果sendLatencyFaultEnable设置为false,那么使用有参数的selectOneMessageQueue方法获取消息队列,我们先看看无参数的selectOneMessageQueue具体是如何查找消息队列的:

//代码位置:org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
}

无参数的selectOneMessageQueue方法的逻辑也比较简单,通过轮询的方式从消息队列表中查找消息队列。 再来看看以brokerNmae为入参的selectOneMessageQueue方法:

//代码位置:org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
}

当入参lastBrokerName为null时,调用上述的无入参的selectOneMessageQueue方法轮询随机获取消息队列。如果lastBrokerName不等于null,根据lastBrokerName找到消息队列,如果没有找到与lastBrokerName相等的消息队列的话,也是调用无入参的selectOneMessageQueue方法轮询随机获取消息队列。

首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue(selectOneMessageQueue()方法),如果消息发送再失败的话,下次进行消息队列选择时规避上次MesageQueue所在的Broker,否则还是很有可能再次失败。

该算法在一次消息发送过程中能成功规避故障的Broker,但如果Broker宕机,由于路由算法中的消息队列是按Broker排序的,如果上一次根据路由算法选择的是宕机的Broker的第一个队列,那么随后的下次选择的是宕机Broker的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗,那么有什么方法在一次消息发送失败后,暂时将该Broker排除在消息队列选择范围外呢?或许有朋友会问,Broker不可用后,路由信息中为什么还会包含该Broker的路由信息呢?其实这不难解释:首先,NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer不会检测到Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔30s更新一次路由信息,所以消息生产者最快感知Broker最新的路由信息也需要30s。如果能引入一种机制,在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。

Broker故障延迟机制

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final 
        String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || 
                            mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                    }
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() %
                            writeQueueNums);
                    }
                    return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

首先对上述代码进行解读。

  • 根据对消息队列进行轮询获取一个消息队列。
  • 验证该消息队列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName())是关键。
  • 如果返回的MessageQueue可用,移除latencyFaultTolerance关于该topic条目,表明该Broker故障已经恢复。

RocketMQ故障延迟机制核心类

LatencyFaultTolerance:延迟机制接口规范。

  • void updateFaultItem(final T name,final long currentLatency,final long notAvailableDuration) 更新失败条目。
    • name:brokerName。
    • currentLatency:消息发送故障延迟时间。
    • notAvailableDuration:不可用持续时长,在这个时间内,Broker将被规避。
  • boolean isAvailable(final T name) 判断Broker是否可用。
    • name:broker名称。
  • void remove(final T name) 移除Fault条目,意味着Broker重新参与路由计算。
  • T pickOneAtLeast() 尝试从规避的Broker中选择一个可用的Broker,如果没有找到,将返回null。

FaultItem:失败条目(规避规则条目)。

  • final String name条目唯一键,这里为brokerName。
  • private volatile long currentLatency本次消息发送延迟。
  • private volatile long startTimestamp故障规避开始时间。

MQFaultStrategy:消息失败策略,延迟实现的门面类。

  • long[]latencyMax={50L,100L,550L,1000L,2000L,3000L,15000L},
  • long[]notAvailableDuration={0L,0L,30000L,60000L,120000L,180000L,600000L}
  • latencyMax,根据currentLatency本次消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引index,如果没有找到,返回0。然后根据这个索引从notAvailableDuration数组中取出对应的时间,在这个时长内,Broker将设置为不可用。
    beginTimestampPrev = System.currentTimeMillis();
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, 
          topicPublishInfo, timeout);
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, 
      false);
    

    上述代码如果发送过程中抛出了异常,调用DefaultMQProducerImpl#updateFaultItem,该方法则直接调用MQFaultStrategy#updateFaultItem方法,关注一下各个参数的含义。

  • 第一个参数:broker名称。
  • 第二个参数:本次消息发送延迟时间currentLatency。
  • 第三个参数:isolation,是否隔离,该参数的含义如果为true,则使用默认时长30s来计算Broker故障规避时长,如果为false,则使用本次消息发送延迟时间来计算Broker故障规避时长。
    public void updateFaultItem(final String brokerName, final long currentLatency,
          boolean isolation) {
      if (this.sendLatencyFaultEnable) {
              long duration = computeNotAvailableDuration(isolation ? 30000 : 
                  currentLatency);
              this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, 
                      duration);
      }
    }
    private long computeNotAvailableDuration(final long currentLatency) {
      for (int i = latencyMax.length - 1; i >= 0; i--) {
          if (currentLatency >= latencyMax[i])
              return this.notAvailableDuration[i];
          }
          return 0;
      }
    }
    

    如果isolation为true,则使用30s作为computeNotAvailableDuration方法的参数;如果isolation为false,则使用本次消息发送时延作为computeNotAvailableDuration方法的参数,那computeNotAvailableDuration的作用是计算因本次消息发送故障需要将Broker规避的时长,也就是接下来多久的时间内该Broker将不参与消息发送队列负载。具体算法:从latencyMax数组尾部开始寻找,找到第一个比currentLatency小的下标,然后从notAvailableDuration数组中获取需要规避的时长,该方法最终调用LatencyFaultTolerance的updateFaultItem。

    public void updateFaultItem(final String name, final long currentLatency, final 
          long notAvailableDuration) {
      FaultItem old = this.faultItemTable.get(name);
      if (null == old) {
          final FaultItem faultItem = new FaultItem(name);
          faultItem.setCurrentLatency(currentLatency);
          faultItem.setStartTimestamp(System.currentTimeMillis() + 
              notAvailableDuration);
          old = this.faultItemTable.putIfAbsent(name, faultItem);
          if (old != null) {
              old.setCurrentLatency(currentLatency);
              old.setStartTimestamp(System.currentTimeMillis() + 
                  notAvailableDuration);
          }
      } else {
          old.setCurrentLatency(currentLatency);
          old.setStartTimestamp(System.currentTimeMillis() + 
              notAvailableDuration);
      }
    }
    

    根据broker名称从缓存表中获取FaultItem,如果找到则更新FaultItem,否则创建FaultItem。这里有两个关键点。

  • currentLatency、startTimeStamp被volatile修饰。
  • startTimeStamp为当前系统时间加上需要规避的时长。startTimeStamp是判断broker当前是否可用的直接一句,请看FaultItem#isAvailable方法。

发送消息

找到消息队列以后,就可以进行消息的发送了,具体发送消息的方法为sendKernelImpl,sendKernelImpl方法主要做了几件事:

  • 查找Broker地址
  • 消息的处理
  • 执行增强钩子
  • 消息请求头初始化
  • 消息发送

查找Broker地址

消息发送API核心入口:DefaultMQProducerImpl#sendKernelImpl。 消息发送参数详解。

  • Message msg:待发送消息。
  • MessageQueue mq:消息将发送到该消息队列上。
  • CommunicationMode communicationMode:消息发送模式,SYNC、ASYNC、ONEWAY。
  • SendCallback sendCallback:异步消息回调函数。
  • TopicPublishInfo topicPublishInfo:主题路由信息
  • long timeout:消息发送超时时间。
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        //查找broker地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            //从topicPublishInfoTable中查找topic推送信息
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        //代码省略
}

在发送消息之前,需要知道将消息发送到哪个Broker服务器上。上面分析过程中已经在消息队列中找到了Broker的名字。findBrokerAddressInPublish方法通过broker名字查找broker地址,当查找的地址为null时,先更新下路由信息,然后再使用findBrokerAddressInPublish方法查找broker地址。如果找到了Broker 地址,则进行自下一步的流程,否则,直接抛出没有找Broker地址的异常。

findBrokerAddressInPublish方法如下:

//代码位置:org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish
public String findBrokerAddressInPublish(final String brokerName) {
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            return map.get(MixAll.MASTER_ID);
        }

        return null;
}

brokerAddrTable保存着<Broker Name,<brokerId,address»的关系,通过Broker 的名字查找的关系,消息是发送到Master Broker服务器上的,而Master服务器的Broker Id 等于0。

消息的处理

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        //省略代码
        SendMessageContext context = null;
        if (brokerAddr != null) {
            //如果发送消息使用vip通道,那么端口号会减少2
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                //如果不是批量消息,设置唯一uid
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                //设置实例id
                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                //尝试压缩消息
                if (this.tryToCompressMessage(msg)) {
                    //设置压缩标志
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                //事务消息
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                //省略代码
            } catch (RemotingException e) {
               //省略代码
            } catch (MQBrokerException e) {
               //省略代码
            } catch (InterruptedException e) {
               //省略代码
            } finally {
               //省略代码
            }
        }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);

}

上述代码主要是对消息进行一些处理,当如果不是批量消息,设置唯一uid,批量消息在消息生成过程中已经设置过了;判断是否需要设置消息的压缩标志,当达到压缩的条件的时候,将消息压缩以后再进行发送。最后判断是否是事务消息,如果是就设置消息的标志位为事务消息。 为消息分配全局唯一ID,如果消息体默认超过4K(compressMsgBodyOverHowmuch),会对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。如果是事务Prepared消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

接下里看看tryToCompressMessage方法是怎么压缩消息的,代码如下:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToCompressMessage
private boolean tryToCompressMessage(final Message msg) {
        //批量消息不压缩
        if (msg instanceof MessageBatch) {
            //batch dose not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            //如果消息长度大于压缩消息阈值
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    //压缩消息
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
}

当消息的类型是批量消息时,不支持压缩。如果消息长度大于压缩消息阈值(4k),那么就将消息进行压缩,最后将压缩过后的消息重新设置回消息内容中,消息压缩采用Java原生的压缩类进行压缩,有兴趣的可以深入看看是如何进行消息的压缩的。消息压缩是为了传输更小的文件流,有利于更快的进行网络传输。

执行增强钩子

如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerImpl#registerSendMessageHook注册钩子处理类,并且可以注册多个。

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

//如果有检查禁止钩子
 if (hasCheckForbiddenHook()) {
      CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
      checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
      checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
      checkForbiddenContext.setCommunicationMode(communicationMode);
      checkForbiddenContext.setBrokerAddr(brokerAddr);
      checkForbiddenContext.setMessage(msg);
      checkForbiddenContext.setMq(mq);
      checkForbiddenContext.setUnitMode(this.isUnitMode());
      this.executeCheckForbiddenHook(checkForbiddenContext);
 }

 //消息发送钩子
 if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        context.setNamespace(this.defaultMQProducer.getNamespace());
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        //如果是事务消息,将消息类型设置为半消息
        if (isTrans != null && isTrans.equals("true")) {
              context.setMsgType(MessageType.Trans_Msg_Half);
         }

         //延迟消息
         if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
         }
         //发送消息之前,执行钩子函数
         this.executeSendMessageHookBefore(context);
  }

hasCheckForbiddenHook方法判断是否有检查禁止钩子,如果有就遍历执行所有的钩子。检查禁止钩子的注是在业务应用层,使用者首先实现CheckForbiddenHook接口,该接口的checkForbidden方法是在消息发送前检查是否属于禁止消息,并且调用DefaultMQProducerImpl类的registerCheckForbiddenHook方法注册检查禁止钩子,注册检查禁止钩子的逻辑也比较简单,就是在将检查钩子保存在List列表中。

hasSendMessageHook方法判断是否包含消息发送钩子,如果有就遍历执行所有的钩子。也像检查禁止钩子一样,先在业务应用层实现SendMessageHook接口,然后调用DefaultMQProducerImpl类的registerSendMessageHook注册发送消息钩子。SendMessageHook接口有两个方法,sendMessageBefore方法是消息发送之前执行,sendMessageAfter方法在消息发送之后执行,业务层面在这两个方法实现业务逻辑。

消息请求头初始化

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
//发送消息头
                // 构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key、该主题在单个Broker 默认队列数、队列ID (队列序号)、
                // 消息系统标记( MessageSysFlag ) 、消息发送时间、消息标记(RocketMQ 对消息中的flag 不做任何处理,供应用程序使用) 、消息扩展属性、消息重试次数、是否是批量消息等。
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 生产者组
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // 主题名称
                requestHeader.setTopic(msg.getTopic());
                // 默认创建主题Key
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // 该主题在单个Broker 默认队列数
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // 队列ID (队列序号)
                requestHeader.setQueueId(mq.getQueueId());
                // 消息系统标记( MessageSysFlag )
                requestHeader.setSysFlag(sysFlag);
                // 消息发送时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                // 消息标记(RocketMQ 对消息中的flag 不做任何处理,供应用程序使用)
                requestHeader.setFlag(msg.getFlag());
                // 消息扩展属性
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 消息重试次数
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                // 是否是批量消息等
                requestHeader.setBatch(msg instanceof MessageBatch);
//%RETRY%
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
      //重新消费次数
      String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
      if (reconsumeTimes != null) {
              requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
              MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
      }

       //最大消费次数
       String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
       if (maxReconsumeTimes != null) {
              requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
              MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
       }
 }

初始化消息请求头的逻辑也比较简单,设置下消息请求头的一些属性,如消息生产组、topic、消息标志、消费次数、是否是批次消息、最大消费次数等属性。Broker服务器根据消息请求头的属性信息,就可以知道消息的类型,属性哪个topic的消息以及将消息保存在哪里。 构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记(RocketMQ对消息中的flag不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。

消息发送

SendResult sendResult = null;
//发送方式
switch (communicationMode) {
        case ASYNC:
             Message tmpMessage = msg;
             boolean messageCloned = false;
             if (msgBodyCompressed) {
                    //If msg body was compressed, msgbody should be reset using prevBody.
                    //Clone new message using commpressed message body and recover origin massage.
                    //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                    tmpMessage = MessageAccessor.cloneMessage(msg);
                    messageCloned = true;
                    //如果压缩了,还原原来的消息
                    msg.setBody(prevBody);
              }

              if (topicWithNamespace) {
                    if (!messageCloned) {
                           tmpMessage = MessageAccessor.cloneMessage(msg);
                           messageCloned = true;
                    }
                    msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
              }

              long costTimeAsync = System.currentTimeMillis() - beginStartTime;
              //还没有发送之前就超时,抛出异常
              if (timeout < costTimeAsync) {
                   throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
              }
              //发送消息
              sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                break;
          case ONEWAY:
          case SYNC:
          long costTimeSync = System.currentTimeMillis() - beginStartTime;
          //超时抛出异常
          if (timeout < costTimeSync) {
                 throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }

          //发送消息
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
               break;
           default:
                 assert false;
                 break;
}

根据发送消息的类型,选择走不同的sendMessage重载方法。根据消息发送方式,同步、异步、单向方式进行网络传输。

  • 当发送消息的类型是ASYNC(异步)时,首先判断消息是否被压缩过,如果压缩过,将消息体中消息内容还原为原来的消息,将压缩过的消息发送给Broker服务器,这样做的目的是,在生产者服务这里,需要看到原来的消息,当发生异常等,需要知道原来的消息。如果在发送消息之前超时,就直接抛出超时的异常,RocketMQ在消息发送之前都会进行消息超时的检测,在消息发送之前,就拦截了超时的发送。
  • 消息发送类型是异步时,sendMessage方法还有发送重试次数,为了异步发送消息尽可能的成功。不管是什么方式发送消息,为了消息的尽可能成功,都重试发送了消息。分析了这么久,都还没有分析到真正的消息发送。都是在分析消息发送之前的一些准备工作,接下来分析真正的消息发送,sendMessage方法的代码如下:
//代码位置:org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        // 消息类型
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        // 是否重试
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);

        //发送消息请求头压缩
        if (isReply) {
            //消息更轻量
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            //消息轻量发送或批量消息
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());

        switch (communicationMode) {
            //单向
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
                //异步
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                //超时
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
                //同步发送消息
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                //超时直接抛出异常
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
}

在发送消息之前,首先判断下是否需要压缩下发送消息请求头,压缩发送消息请求头就是将请求头的属性修改为占内存较小、不利于阅读的单字母表示。当发送消息请求头标志sendSmartMsg位为true或者是发送批量消息时,都需要将原来的发送消息请求头转换为轻量发送消息请求头。发送消息请求头在网络传输过程中所占字节数比较少,有利消息的快速传输。然后根据发送消息的方式,选择不同的消息发送方法。RocketMQ为了消息在网络中快速传输,第一对消息体进行压缩,第二,对消息请求头进行更轻量的转换。

接下来,我们将分析单向、同步、异步发送消息方法,比较下三种发送方式的区别。

单向发送

//代码位置:org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {

        //根据broker地址查找连接
        final Channel channel = this.getAndCreateChannel(addr);
        //连接正常
        if (channel != null && channel.isActive()) {
            try {
                //发送消息之前执行钩子
                doBeforeRpcHooks(addr, request);
                this.invokeOnewayImpl(channel, request, timeoutMillis);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            //不正常就关闭连接
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
}

invokeOneway方法首先通过broker地址查找生产者与Broker服务器之间的连接,如果连接不为null或者正常,在发送消息之前执行钩子,然后调用invokeOnewayImpl方法发送单向的消息。如果连接不正常或者发送消息失败就关闭连接。getAndCreateChannel方法是根据broker地址查找连接,连接存在channelTables(map)中,当channelTables中找不到连接,就创建一个生产者与Broker服务器的连接保存到channelTables中。接下来,看看invokeOnewayImpl方法是如何以单向的方式发送消息的:

//代码位置:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // 设置单向发送的方式
        request.markOnewayRPC();
        // 获取信号聊量
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                //将请求发送出去
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            //如果没有获取到信号量,已经超时,则抛出异常
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
}

invokeOnewayImpl方法首先将设置单向发送的标志,然后获取信号量,如果获取到信号量,就利用netty 连接将消息发送给Broker 服务器,当发送完成以后,就释放信号量,如发送不成功,就打印日志。发送消息异常就释放信号量,抛出发送消息异常信息。获取信号量不成功,也抛出发送消异常的信息。

这里有个重要的知识点,单向发送消息的信号量最大请求为65535,当超过这个数就不能进行发送消息了。只有获取到信号量才能进行发送消息,这么做的就是为了避免请求过多,导致RocketMQ的压力过大而出现性能问题,也起到了限流的作用,保护RocketrMQ。

单向发送消息,将消息发送以后,就不管发送结果了,只要将消息发送出去就行,也不会进行重试。从上面代码也可以看出,消息发送出去以后,只释放信号量,当发送不成功就打印日志,不管消息发送的结果。

同步发送

MQ客户端发送消息的入口是MQClientAPIImpl#sendMessage。请求命令是RequestCode.SEND_MESSAGE,我们可以找到该命令的处理类:org.apache.rocketmq.broker.processor.SendMessageProcessor。入口方法在SendMessageProcessor#sendMessage。

//代码位置:org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageSync
private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // 发送同步消息
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        // 处理消息发送响应结果
        return this.processSendResponse(brokerName, msg, response);
}

消息的同步发送,不仅需要将消息发送出去,还要处理消息发送的响应结果,sendMessageSync方法首先将消息发送给Broker服务器,然后将消息响应的结果进行处理。首先我们看看同步发送消息的方法:

//代码位置:
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
       // 根据broker地址查找连接
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                // 在执行发送请求消息前,执行钩子
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                // 接收到消息之后,执行钩子
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
}

同步发送与单向发送方式很像,都是先根据broker地址查找连接,如果连接正常,在消息发送之前和消息发送之后就执行钩子方法。然后将消息发送出去,将消息响应结果返回,如果消息发送出现异常,就关闭连接,抛出异常。深入invokeSyncImpl方法,看看同步发送消息的具体逻辑:

//代码位置:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {

        //请求id,通过该id可以找到该请求的响应结果
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            //将请求id与响应结果的对饮关系保存在map中
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            //将消息发送出去
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    //消息发送成功,设置发送成功的标志
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        //发送失败
                        responseFuture.setSendRequestOK(false);
                    }

                    //请求失败,删除请求-响应的对应关系
                    responseTable.remove(opaque);
                    //发送异常
                    responseFuture.setCause(f.cause());
                    //发送结果为null
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            //等待响应结果
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            //响应结果为空
            if (null == responseCommand) {
                //发送成功,但是没有响应,抛出超时的异常
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            //最后删除请求与响应的对应关系
            this.responseTable.remove(opaque);
        }
}

同步发送方法首先创建ResponseFuture,ResponseFuture是保存请求响应结果的,opaque是请求id,将请求与响应的对应关系保存在responseTable(map)中,通过请求id就可以找到对应的响应结果了。然后利用netty连接将消息以异步的方式发送出去,添加一个监听器,监听消息是否成功发送,当监听到消息成功发送,就设置发送成功的标志,否则设置发送失败的标志,并且删除请求与响应的对应关系、以及异常原因。

消息通过netty连接发送以后,就等待消息的响应结果。当响应结果为null,但是发送成功,那么就抛出超时的异常,否则就抛出其他异常,消息不为null,就返回响应结果,最后删除请求与响应的关系。

//代码位置:org.apache.rocketmq.remoting.netty.ResponseFuture#waitResponse
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
}

发送消息是通过异步的方式发送,然后进行等待消息的响应结果,那么这里是如何实现等待的?利用countDownLatch.await方法等待,达到阻塞的目的,当超时就返回响应。响应返回以后,就需要处理响应了。处理响应的代码就不贴了,大致逻辑就是根据响应状态码,设置发送的结果的状态,设置发送结果,最后将发送结果返回。

异步发送

//代码位置:org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync
private void sendMessageAsync(
        final String addr,final String brokerName,final Message msg,final long timeoutMillis,
        final RemotingCommand request,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,final int retryTimesWhenSendFailed,final AtomicInteger times,
        final SendMessageContext context,final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
        final long beginStartTime = System.currentTimeMillis();
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                long cost = System.currentTimeMillis() - beginStartTime;
                RemotingCommand response = responseFuture.getResponseCommand();
                if (null == sendCallback && response != null) {

                    try {
                        //处理响应结果
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                    }

                    //放入容错表中
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }

                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        // 异常处理
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    // 发送不成功
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        //超时,代码同发送不成功一样
                    } else {
                        //其他异常,代码同发送不成功一样
                    }
                }
            }
        });
 }

sendMessageAsync方法主要调用逻辑是调用invokeAsync方法发送异步消息 ,将响应结果交给InvokeCallback类进行回调处理。为了避免消息异步发送失败,将消息保存在容错表,供重试发送,当消息发送出现异常时,调用onExceptionImpl方法处理异常,实际上onExceptionImpl方法也是将消息重试发送,直到消息被成功发送。我们继续分析invokeAsync方法:

//代码位置:org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        // 根据broker地址从连接表中获取连接
        final Channel channel = this.getAndCreateChannel(addr);
        // 如果连接不等于空和活跃
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                // 超时,抛出异常
                if (timeoutMillis < costTime) {
                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
}

消息异步发送invokeAsync方法,也是首先根据broker地址从缓存表中找到生产者与beroker服务器的连接,如果连接正常,在消息发送之前调用钩子进行增强。此时要是已经超时,则直接抛出异常,就不发送消息了。在发送消息过程要是发生了异常,先关闭连接,然后把异常抛出。接下来继续深入消息异步发送方法invokeAsyncImpl:

//代码位置:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final int opaque = request.getOpaque();
        //设置信号量
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            //超时,抛出异常
            if (timeoutMillis < costTime) {
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }

            // 将响应Future放入响应表中
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        // 发送成功,则设置发送成功标志
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        // 失败,从响应表中删除该请求-响应表之间的关系
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            // 没有获取到信号量
            // 超时
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
}

消息异步发送方法invokeAsyncImpl,也和单向发送方式一样,都要首先获取信号量,才能进行消息的发送,如果没有获取到信号量,则抛出异常。也和同步方式一样,将请求id和响应的对应关系保存在responseTable中,通过请求id就可以找到响应了。通过netty连接将消息发送给Broker服务器,利用监听消息是否发送完成,如果发送发送成功,则设置发送成功的标志,否则删除请求id与响应的对应关系。

好了,分析到这里,RocketMQ生产者发送消息的源码就分析完了,现在总结下单向、同步、异步三种发送消息放入方式:

  • 单向:只管发送消息,不管发送结果。为了保护RocketMQ,在发送消息之前需要获取信号量才能发送消息,信号量最最大值为65535.
  • 同步:不仅管发送消息,也管发送结果,在重试次数范围内保证消息尽可能发送成功,发送结果返回以后还需要进行处理。发送完消息以后,在规定的时间内阻塞等到消息结果的返回。
  • 异步:不仅管发送消息,也管发送结果,保证消息尽可能的成功,消息发送失败或则出现异常,都会进行重试发送,发送消息完以后,等消息结果返回采用回调告知生产者。异步的方式也是需要获取信号量才能发送消息。

批量消息发送

批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过DefaultMQProducer#maxMessageSize。批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。

那RocketMQ如何编码多条消息呢?我们首先梳理一下RocketMQ网络请求命令设计。 下面我们来一一介绍下RemotingCommand的属性。

  • code:请求命令编码,请求命令类型。
  • version:版本号。
  • opaque:客户端请求序号。
  • flag:标记。倒数第一位表示请求类型,0:请求;1:返回。倒数第二位,1:表示oneway。
  • remark:描述。
  • extFields:扩展属性。
  • customeHeader:每个请求对应的请求头信息。
  • byte[]body:消息体内容。

单条消息发送时,消息体的内容将保存在body中。批量消息发送,需要将多条消息体的内容存储在body中,如何存储方便服务端正确解析出每条消息呢?

下来梳理一下批量消息发送的核心流程。

public SendResult send(Collection<Message> msgs) throws MQClientException, 
        RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

首先在消息发送端,调用batch方法,将一批消息封装成MessageBatch对象。MessageBatch继承自Message对象,MessageBatch内部持有Listmessages。这样的话,批量消息发送与单条消息发送的处理流程完全一样。MessageBatch只需要将该集合中的每条消息的消息体body聚合成一个byte[]数值,在消息服务端能够从该byte[]数值中正确解析出消息即可。

public byte[] encode() {
    return MessageDecoder.encodeMessages(messages);
}

在创建RemotingCommand对象时将调用messageBatch#encode()方法填充到RemotingCommand的body域中。

public static byte[] encodeMessage(Message message) {
    //only need flag, body, properties
    byte[] body = message.getBody();
    int bodyLen = body.length;
    String properties = messageProperties2String(message.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    //note properties length must not more than Short.MAX
    short propertiesLength = (short) propertiesBytes.length;
    int sysFlag = message.getFlag();
    int storeSize = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCOD
        + 4 // 3 BODYCRC
        + 4 // 4 FLAG
        + 4 + bodyLen // 4 BODY
        + 2 + propertiesLength;
    ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);
    // 2 MAGICCODE
    byteBuffer.putInt(0);
    // 3 BODYCRC
    byteBuffer.putInt(0);

    // 4 FLAG
    int flag = message.getFlag();
    byteBuffer.putInt(flag);
    // 5 BODY
    byteBuffer.putInt(bodyLen);
    byteBuffer.put(body);
    // 6 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);
    return byteBuffer.array();
}

在消息发送端将会按照上述结构进行解码,然后整个发送流程与单个消息发送没什么差异

Search

    微信好友

    博士的沙漏

    Table of Contents