Rocketmq源码消息存储

2022/01/16 RocketMQ

RocketMQ源码消息存储

Broker接收到消息请求以后,是如何将消息存储起来的?

目前的MQ中间件从存储模型来看,分为需要持久化和不需要持久化的两种模型,现在大多数的MQ都是支持持久化存储的,比如ActiveMQ、RabbitMQ、Kafka,RocketMQ,而ZeroMQ却不需要支持持久化存储。然而业务系统也大多需要MQ有持久存储的能力,能大大增加系统的高可用性。从存储方式和效率来看,文件系统高于KV存储,KV存储又高于关系型数据库,直接操作文件系统肯定是最快的,但可靠性却是最低的,而关系型数据库的性能和可靠性与文件系统恰恰相反

存储概要设计

RocketMQ主要存储的文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。 为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog文件中检索消息。RocketMQ是一款高性能的消息中间件,存储部分的设计是核心,存储的核心是IO访问性能。

RocketMQ消息存储设计原理

  • CommitLog:消息存储文件,所有消息主题的消息都存储在CommitLog文件中。
  • ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到消息消费队列,供消息消费者消费。
  • IndexFile:消息索引文件,主要存储消息Key与Offset的对应关系。
  • 事务状态服务:存储每条消息的事务状态。
  • 定时消息服务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度。

消息发送处理

Broker服务器接收到消息请求时,通过请求码找到SendMessageProcessor处理器,SendMessageProcessor处理器通过processRequest方法处理消息请求:

//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = null;
        try {
            response = asyncProcessRequest(ctx, request).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("process SendMessage error, request : " + request.toString(), e);
        }
        return response;
}

processRequest方法调用asyncProcessRequest方法处理请求,asyncProcessRequest方法是异步方法,通过get方法阻塞获取到最后的结果。asyncProcessRequest的代码如下:

//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            //消费者发送返回消息
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                //解析发送消息请求头
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                //发送消息实体
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                //是否批量发送
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
}

asyncProcessRequest方法逻辑比较简单,通过请求码选择不同的消息请求逻辑,当请求码是CONSUMER_SEND_MSG_BACK时,处理消费者发送返回的消息,默认处理生产者发送的消息请求,首先解析消息发送请求头,如果消息请求头为null,就直接返回处理结果了,然后创建消息内容实体,最后根据消是否是批量。调用不同方法处理消息请求。当是批量消息发送时,调用asyncSendBatchMessage方法处理批量消息,否则调用asyncSendMessage方法处理。

我们解析来看看parseRequestHeader方法,看方法的名字就知道是解析消息请求头,parseRequestHeader方法如下:

//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#parseRequestHeader
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
        throws RemotingCommandException {

        //发送消息请求头,版本2
        SendMessageRequestHeaderV2 requestHeaderV2 = null;
        //发送消息请求头,版本1
        SendMessageRequestHeader requestHeader = null;
        switch (request.getCode()) {
                //批量消息,发送消息版本2
            case RequestCode.SEND_BATCH_MESSAGE:
            case RequestCode.SEND_MESSAGE_V2:
                requestHeaderV2 =
                    (SendMessageRequestHeaderV2) request
                        .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
                //普通消息发送
            case RequestCode.SEND_MESSAGE:
                if (null == requestHeaderV2) {
                    requestHeader =
                        (SendMessageRequestHeader) request
                            .decodeCommandCustomHeader(SendMessageRequestHeader.class);
                } else {
                    requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
                }
            default:
                break;
        }
        return requestHeader;
}

RocketMQ为了发送消息的性能,除了会对消息进行压缩外,还会将消息转换为更轻量的消息进行发送,更轻量的消息就是将原来的消息换成不利于阅读的格式,这样消息就会变得更小,传输起来就更快。

parseRequestHeader方法通过请求码的类型解析消息发送请求头,如果是批量消息和更轻量的消息,就解析为SendMessageRequestHeaderV2,否则就解析为SendMessageRequestHeader。

接下来,我们将会从下面两个方面继续分析消除的存储:

  • 单笔消息存储处理
  • 批量消息存储处理

单笔消息存储处理

//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {

        //创建响应结果
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        //发送消息响应头
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        //消息体
        final byte[] body = request.getBody();

        //队列id
        int queueIdInt = requestHeader.getQueueId();

        //根据topic获取topic配置
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        //队列小于0,随机给一个
        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        //消息
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        //省略代码:为msgInner赋值

        CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            //如果拒绝事务消息
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            //处理事务消息
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            //处理普通消息
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

asyncSendMessage方法先创建发送消息响应头,从发送消息请求头获取消息内容,创建msgInner消息并设置消息的属性。如果消息是事务消息,则进行事务消息的处理,否则,进行普通消息的处理。接下来,我们先分析下普通消息的处理,然后在分析事务消息的处理。

普通消息存储

以消息发送存储为突破点,一点一点揭开RocketMQ存储设计的神秘面纱。消息存储入口:org.apache.rocketmq.store.DefaultMessageStore#putMessage。

//代码位置:org.apache.rocketmq.store.DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        //检查store状态
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }

        //消息是否合法
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }

        long beginTime = this.getSystemClock().now();
        //保存消息
        PutMessageResult result = this.commitLog.putMessage(msg);
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }

        //统计保存消息花费的时间
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            //统计保存消息失败次数
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
}

采用DefaultMessageStore类的putMessage方法进行普通消息的存储。

  • checkStoreStatus方法先检查store服务器的状态,校验服务器的状态是否已经关闭、Broker的角色是否是slave,是否不可写以及是否繁忙?如果满足上述其中之一的条件,那么说明store服务器的状态都不是正常的。
  • 校验完store服务器的状态以后,校验消息是否正常,checkMessage方法主要是判断topic的长度不能超过topic最大的长度127、消息的Properties属性不能超过Properties最大的长度32767。
  • 消息正常以后就可以调用commitLog的putMessage方法保存消息了,最后统计保存消息花费的时间和统计保存消息失败次数。

如果日志中包含“message store is not writeable,so putMessage is forbidden”,出现这种日志最有可能是磁盘空间不足,在写ConsumeQueue、IndexFile文件出现错误时会拒绝消息再次写入。

接下来深入commitLog的putMessage的方法,看看是如何保存消息的。

//代码位置:org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

        //省略代码

        long elapsedTimeInLock = 0;

        MappedFile unlockMappedFile = null;
        //获取最后一个MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //可重入锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            //保存消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //根据保存消息的状态,做不同的处理
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                //如果mappedFile文件已经写满了,那么就在创建一个新的,继续写
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        //省略代码

        return putMessageResult;
}

上述代码省略了一些对此次分析不太重要的代码,不影响此次的分析。首先获取当前可以写入的Commitlog文件。

Commitlog文件存储目录为${ROCKET_HOME}/store/commitlog目录,每一个文件默认1G,一个文件写满后再创建另外一个,以该文件中第一个偏移量为文件名,偏移量小于20位用0补齐。例如第一个文件初始偏移量为0,第二个文件的1073741824,代表该文件中的第一条消息的物理偏移量为1073741824,这样根据物理偏移量能快速定位到消息。MappedFileQueue可以看作是${ROCKET_HOME}/store/commitlog文件夹,而MappedFile则对应该文件夹下一个个的文件。

获取最后一个MappedFile文件,MappedFile文件是用来保存消息的。为了避免消息在写入的过程中被其他线程访问,用可重入锁进行上锁,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的。当获取MappedFile文件不存在或者已经写满了,就新创建一个MappedFile文件。然后通过MappedFile的appendMessage方法将消息保存起来得到保存消息的结果,根据保存消息状态设置保存消息的结果,这里有一个状态需要注意下,当状态时END_OF_FILE时,说明MappedFile文件已经写满了,所以新建MappedFile文件,再通过MappedFile的appendMessage的方法保存消息。

设置消息的存储时间,如果mappedFile为空,表明${ROCKET_HOME}/store/commitlog目录下不存在任何文件,说明本次消息是第一次消息发送,用偏移量0创建第一个commit文件,文件为00000000000000000000,如果文件创建失败,抛出CREATE_MAPEDFILE_FAILED,很有可能是磁盘空间不足或权限不够。

我们先不分析MappedFile的appendMessage的方法,先分析下CommitLog和MappedFile是什么?

CommitLog可以看成是Broker服务器在逻辑上对应的一个大文件,一个Broker服务器对应一个CommitLog文件,Broker服务接收到的所有消息都通过CommitLog类进行存储,CommitLog类有一个MappedFileQueue,MappedFileQueue队列存储着多个MappedFile文件,每个MappedFile文件的默认大小为1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。(这句话从官方文档copy的)。MappedFile类就是实际存储消息的对应实体,采用内存映射技术来提高文件的访问速度与写入速度,以追加写的方式写入MappedFile文件。

在写入消息之前,首先要获取MappedFile文件,才能知道将消息写到哪个MappedFile文件中,putMessage方法在写入消息到MappedFile文件中,调用getLastMappedFile方法获取最后一个MappedFile文件,让我们看看getLastMappedFile方法:

//代码位置:org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
}

mappedFiles是CopyOnWriteArrayList数组,保存着MappedFile,getLastMappedFile方法获取mappedFiles数组的最后一个MappedFile返回。如果获取的MappedFile文件为null或者已经写满,putMessage方法会重新创建一个新的MappedFile文件返回。我们看看MappedFile类:

public class MappedFile extends ReferenceResource {
    //系统page大小,4M
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    //总的mapped虚拟内存
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    //总的mapped文件数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    //写指针
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    //提交指针
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    //刷盘指针
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    //文件大小
    protected int fileSize;
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    //写缓存,消息会先写到这里,reput方法会将写到文件中去
    protected ByteBuffer writeBuffer = null;
    //临时的存储池
    protected TransientStorePool transientStorePool = null;
    //文件名字
    private String fileName;
    private long fileFromOffset;
    //文件
    private File file;
    //
    private MappedByteBuffer mappedByteBuffer;
    //存储时间戳
    private volatile long storeTimestamp = 0;
    //第一次在队列中创建
    private boolean firstCreateInQueue = false;

    //省略代码
}

MappedFile的有几个重要的属性,上面代码已经为这些属性已经注释了,如写指针、读指针、刷盘指针、文件大小、文件名、写缓存等属性。

保存消息

获取到MappedFile文件以后,就可以调用appendMessage方法进行消息的存储,appendMessage方法又会调用appendMessagesInner方法真正进行消息的保存:

//代码位置:org.apache.rocketmq.store.MappedFile.appendMessagesInner
 public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        //当前写指针
        int currentPos = this.wrotePosition.get();

        //写指针小于文件大小
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            //broker接收到的消息
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                //批量消息
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            //增加写指针
            this.wrotePosition.addAndGet(result.getWroteBytes());
            //存储的时间
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
 }

appendMessagesInner方法有两个参数,MessageExt是消息,AppendMessageCallback是追加消息回调。appendMessagesInner方法首先获取当前写指针,如果currentPos大于或等于文件大小则表明文件已写满,抛出AppendMessageStatus.UNKNOWN_ERROR。如果currentPos小于文件大小,通过slice()方法创建一个与MappedFile的共享内存区,并设置position为当前指针。

当写指针小于文件大小,判断messageExt是Broker接收到的消息还是批量消息,然后调用AppendMessageCallback的doAppend方法进行消息的追加,最后增加写指针。

接下来我们分析下doAppend方法是如何将消息追加到文件中:

long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, 
    msgInner.getStoreHostBytes(hostHolder), wroteOffset);

创建全局唯一消息ID,消息ID有16字节,消息ID组成= 4字节IP+4字节端口号+8字节消息偏移量。 但为了消息ID可读性,返回给应用程序的msgId为字符类型,可以通过UtilAll.bytes2string方法将msgId字节数组转换成字符串,通过UtilAll.string2bytes方法将msgId字符串还原成16个字节的字节数组,从而根据提取消息偏移量,可以快速通过msgId找到消息内容。

keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

获取该消息在消息队列的偏移量。CommitLog中保存了当前所有消息队列的当前待写入偏移量。

//代码省略:org.apache.rocketmq.store.CommitLog#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,

            //代码省略
            /**
             * Serialize message
             */
            //序列化消息

            //消息的properties数据
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            //properties长度
            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            //消息properties太大
            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            //topic数据和长度
            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;

            //消息体的长度
            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

            //消息的长度
            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message 大于最大的消息长度4M
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            //确定是否有足够的空间
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                //先写到byteBuffer缓存起来
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            //初始化内存
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
             /**
             将消息添加到内存中
             **/
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
             //将消息写到byteBuffer中
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;
 }

上述将doAppend方法中一些代码省略掉,但是不影响主要流程,将消息的properties数据、topic数据转换为 byte数组,并计算properties数据的长度、topic数据的长度以及消息的长度。然后判断消息大小是否大于消息最大大小,如果是直接返回超过消息大小的结果。 接着确定是否有足够的空间存下消息,有充足的空间,则将消息写到byteBuffer缓存起来,返回文件已经写满的结果。如果没有充足的空间,初始化足够的空间,并将消息加入到msgStoreItemMemory中,最后将消息写入到byteBuffer中。doAppend方法并没有将消息直接写入到文件中,而是将消息写入到byteBuffer中缓存起来。

CommitLog#calMsgLength

private static int calMsgLength(int bodyLength, int topicLength, int 
        propertiesLength) {
    final int msgLen = 4 //TOTALSIZE
            + 4 //MAGICCODE
            + 4 //BODYCRC
            + 4 //QUEUEID
            + 4 //FLAG
            + 8 //QUEUEOFFSET
            + 8 //PHYSICALOFFSET
            + 4 //SYSFLAG
            + 8 //BORNTIMESTAMP
            + 8 //BORNHOST
            + 8 //STORETIMESTAMP
            + 8 //STOREHOSTADDRESS
            + 4 //RECONSUMETIMES
            + 8 //Prepared Transaction Offset
            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
            + 1 + topicLength //TOPIC
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
            + 0;
        return msgLen;
}

根据消息体的长度、主题的长度、属性的长度结合消息存储格式计算消息的总长度。

RocketMQ消息存储格式如下。

  • TOTALSIZE:该消息条目总长度,4字节。
  • MAGICCODE:魔数,4字节。固定值0xdaa320a7。
  • BODYCRC:消息体crc校验码,4字节。
  • QUEUEID:消息消费队列ID,4字节。
  • FLAG:消息FLAG,RocketMQ不做处理,供应用程序使用,默认4字节。
  • QUEUEOFFSET:消息在消息消费队列的偏移量,8字节。
  • PHYSICALOFFSET:消息在CommitLog文件中的偏移量,8字节。
  • SYSFLAG:消息系统Flag,例如是否压缩、是否是事务消息等,4字节。
  • BORNTIMESTAMP:消息生产者调用消息发送API的时间戳,8字节。
  • BORNHOST:消息发送者IP、端口号,8字节。
  • STORETIMESTAMP:消息存储时间戳,8字节。
  • STOREHOSTADDRESS:Broker服务器IP+端口号,8字节。
  • RECONSUMETIMES:消息重试次数,4字节。
  • Prepared Transaction Offset:事务消息物理偏移量,8字节。
  • BodyLength:消息体长度,4字节。
  • Body:消息体内容,长度为bodyLenth中存储的值。
  • TopicLength:主题存储长度,1字节,表示主题名称不能超过255个字符。
  • Topic:主题,长度为TopicLength中存储的值。
  • PropertiesLength:消息属性长度,2字节,表示消息属性长度不能超过65536个字符。
  • Properties:消息属性,长度为PropertiesLength中存储的值。 上述表示CommitLog条目是不定长的,每一个条目的长度存储在前4个字节中。

如果消息长度+END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE,Broker会重新创建一个新的CommitLog文件来存储该消息。从这里可以看出,每个CommitLog文件最少会空闲8个字节,高4字节存储当前文件剩余空间,低4字节存储魔数:CommitLog.BLANK_MAGIC_CODE。 将消息内容存储到ByteBuffer中,然后创建AppendMessageResult。这里只是将消息存储在MappedFile对应的内存映射Buffer中,并没有刷写到磁盘。

下面我们来一一介绍下AppendMessageResult的属性。

  • AppendMessageStatus status:消息追加结果,取值PUT_OK:追加成功;END_OF_FILE:超过文件大小;MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度:PROPERTIES_SIZE_EXCEEDED:消息属性超过最大允许长度;UNKNOWN_ERROR:未知异常。
  • long wroteOffset:消息的物理偏移量。
  • String msgId:消息ID。
  • long storeTimestamp:消息存储时间戳。
  • long logicsOffset:消息消费队列逻辑偏移量,类似于数组下标。
  • long pagecacheRT=0:当前未使用。
  • int msgNum=1:消息条数,批量消息发送时消息条数。

DefaultAppendMessageCallback#doAppend只是将消息追加在内存中,需要根据是同步刷盘还是异步刷盘方式,将内存中的数据持久化到磁盘。

处理事务消息

当Broker接收到事务消息以后,根据消息的类型将消息交给TransactionalMessageService类的asyncPrepareMessage方法处理,这个asyncPrepareMessage的具体由TransactionalMessageServiceImpl实现。如下所示:

//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#asyncPrepareMessage
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}

asyncPrepareMessage方法调用asyncPutHalfMessage方法,如下:

//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#asyncPutHalfMessage
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

asyncPutHalfMessage方法首先调用parseHalfMessageInner方法先处理下半消息,处理完以后调用store的asyncPutMessage方法保存消息。我们先看看parseHalfMessageInner方法是如何处理半消息的。(RocketMQ事务消息不熟悉的可以参考阅读《RocketMQ源码分析之事务消息分析》)

//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {

        //将真实的topic和queueId保存在Property中
        //REAL_TOPIC
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        //REAL_QID
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        //重新设置topic,设置的topic为RMQ_SYS_TRANS_HALF_TOPIC
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        //设置queueId为0
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
}

parseHalfMessageInner方法首先将消息真实的topic和queueId存放到Property中,并重新设置topic为RMQ_SYS_TRANS_HALF_TOPIC,queueId为0,这样保存起来的消息就不能被消费者消费了。我们返回到asyncPutMessage方法中,asyncPutMessage方法最终也是会调用putMessage存储消息的,putMessage方法就是上述讲述的方法,跟上面普通消息处理是一样的,共用putMessage方法。

总结

Broker接收到消息时,会将消息交给SendMessageProcessor处理器处理,SendMessageProcessor处理器首先解析发送消息请求头,根据消息的类型交给不同方法处理消息。CommitLog类具体处理消息,首先获取最后一个MappedFile文件,然后将消息以追加的方式写入到MappedFile中的byteBuffer缓存起来。

RocketMQ源码Broker存储批量消息

我们来分析如何存储批量消息。

当Broker收到消息时,将会将消息交给SendMessageProcessor处理器处理,SendMessageProcessor处理器根据消息的类型处理不同的消息,当消息是批量消息时,会将消息交给asyncSendBatchMessage方法进行处理。asyncSendBatchMessage方法如下:

//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendBatchMessage
private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                     SendMessageContext mqtraceContext,
                                                                     SendMessageRequestHeader requestHeader) {
        //创建消息响应
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        int queueIdInt = requestHeader.getQueueId();
        //根据topic获取topic配置信息
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            //随机一个写队列的id
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        //如果topic的长度大于127,则返回错误的响应
        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark("message topic length too long " + requestHeader.getTopic().length());
            return CompletableFuture.completedFuture(response);
        }

        //批量消息实体
        MessageExtBatch messageExtBatch = new MessageExtBatch();
        //设置topic和queueid
        messageExtBatch.setTopic(requestHeader.getTopic());
        messageExtBatch.setQueueId(queueIdInt);

        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }
        messageExtBatch.setSysFlag(sysFlag);

        messageExtBatch.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        messageExtBatch.setBody(request.getBody());
        messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
        messageExtBatch.setBornHost(ctx.channel().remoteAddress());
        messageExtBatch.setStoreHost(this.getStoreHost());
        //消费次数
        messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);

        //存储批量消息
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
        //处理消息存储结果
        return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}

asyncSendBatchMessage方法跟普通消息的处理流程差不多,创建消息响应,根据topic查找topic配置信息、创建消息内容实体。然后调用asyncPutMessages方法进行存储批量消息,当批量消息存储完以后,就处理批量消息存储的结果。我们来深入asyncPutMessages方法,asyncPutMessages方法会调用putMessages方法存储消息,putMessages方法如下:

//代码位置:org.apache.rocketmq.store.DefaultMessageStore#putMessages
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        //检查store服务器的状态
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        //如果store状态不为ok时,直接返回结果
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }

        //消息检查
        PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }

        //存储消息
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }

        //统计存储消息花费的时间
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        //如果失败,统计失败的次数
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
}

putMessages方法会首先检查store服务器的状态和对消息进行检查,这个跟普通消息的处理是一样的,就不深入checkStoreStatus和checkMessages方法了。然后调用commitLog的putMessages存储消息,最后统计存储消息所花费的时间以及存储消息失败的次数。接下来,我们继续深入putMessages方法是如何存储批量消息的:

public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {

        //代码省略


        MappedFile unlockMappedFile = null;
        //获取最后一个mappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //fine-grained lock instead of the coarse-grained
        //消息编码类
        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();

        //设置已经编码好的消息
        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));

        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);

            //mappedFile为空或者已经写满,重新创建mappedFile
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            //如果mappedFile为null,则返回创建mappedFile失败的结果
            if (null == mappedFile) {
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            //存储消息
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
            //对存储消息的结果进行处理
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        //统计相关
        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());

        //消息刷盘
        handleDiskFlush(result, putMessageResult, messageExtBatch);

        //消息同步
        handleHA(result, putMessageResult, messageExtBatch);

        return putMessageResult;
}

上述省略了一些无关紧要的代码,putMessages方法处理批量消息的存储与普通消息的存储的流程差不多,首先获取最后一个mappedFile,封装消息,这里的封装消息跟普通消息的存储有点不一样,就是这里专门用MessageExtBatchEncoder类处理批量消息,利用batchEncoder.encode对批量消息进行遍历写入内存。在批量消息写入之前,首先获取可重入锁,然后调用appendMessages方法对批量消息进行存储,当批量消息存储结果返回时,根据存储的结果状态返回不同处理结果。最后记录统计相关结果、消息进行刷盘、消息同步等。批量消息存储的流程跟普通消息存储的流程大体都相同。

appendMessages方法将消息追加到文件中,在批量消息存储和普通消息存储都用到了这个方法。将批量消息进行追加到文件中,跟普通消息追加到文件中,差别比较大的就是批量消息的追加是需要遍历追加文件的,看懂了普通消息的追加,批量消息的追加也是比较容易看懂的,批量消息的存储也是没有什么问题。

Search

    微信好友

    博士的沙漏

    Table of Contents