Rocketmq源码消息刷盘

2022/01/18 RocketMQ

RocketMQ消息刷盘

将消息写到缓存中,并没有真正将消息写到文件中。现在就来分析消息是如何刷盘的。

文件刷盘机制

RocketMQ的存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端。RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。本书默认以消息存储文件Commitlog文件刷盘机制为例来剖析RocketMQ的刷盘机制,ConsumeQueue、IndexFile刷盘的实现原理与Commitlog刷盘机制类似。RocketMQ处理刷盘的实现方法为Commitlog#handleDiskFlush()方法,刷盘流程作为消息发送、消息存储的子流程。值得注意的是索引文件的刷盘并不是采取定时刷盘机制,而是每更新一次索引文件就会将上一次的改动刷写到磁盘。

文件刷盘实现

在创建CommitLog对象的时候,会初始化刷盘服务:

//代码位置:org.apache.rocketmq.store.CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    //省略代码
    //同步刷盘
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //异步刷盘
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //实时提交服务
    this.commitLogService = new CommitRealTimeService();

    //省略代码
}

CommitLog是RocketMQ存储消息的核心组件之一。在这里,CommitLog的构造函数传入DefaultMessageStore对象,根据DefaultMessageStore的配置决定使用同步刷盘还是异步刷盘。如果flushDiskType配置为SYNC_FLUSH(同步刷盘),则创建GroupCommitService对象作为flushCommitLogService;如果flushDiskType配置为ASYNC_FLUSH(异步刷盘),则创建FlushRealTimeService对象作为flushCommitLogService。

接下来,代码初始化了commitLogService,并将其设置为CommitRealTimeService对象,表示使用实时提交服务。

总体来说,这段代码的作用是根据消息存储系统的配置,创建适当的刷盘服务和提交服务,从而实现高效、可靠的消息存储服务。

RocketMQ刷盘方式

刷盘方式有同步和异步刷盘,RocketMQ默认的是异步刷盘,同步刷盘就是当消息追加到缓存以后,就立即进行刷盘到文件中保存起来,异步刷盘就是后台任务进行异步刷盘操作。

不管是同步刷盘还是异步刷盘都是继承了FlushCommitLogService类,GroupCommitService类用于同步刷盘,FlushRealTimeService用于异步刷盘,CommitRealTimeService用于将缓存的消息写到file channel。

在CommitLog启动的时候,会启动刷盘服务,代码如下:

//代码位置:org.apache.rocketmq.store.CommitLog#start
 public void start() {
        this.flushCommitLogService.start();

        //如果TransientStorePool开关打开,flush message to FileChannel at fixed periods
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
}

在putMessage方法中,当消息写到缓存以后,就会调用handleDiskFlush方法进行刷盘,handleDiskFlush方法如下:

//代码位置:org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        //同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            //等待消息存储成功
            if (messageExt.isWaitStoreMsgOK()) {
                //创建一个同步刷盘请求
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                //添加同步刷盘请求
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
                    //阻塞获取刷盘状态
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    //flushOK=false;
                }
                //刷盘的状态不成功
                if (flushStatus != PutMessageStatus.PUT_OK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                //唤醒刷盘服务
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            //TransientStorePool开关不可用
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                //唤醒异步刷盘服务
                flushCommitLogService.wakeup();
            } else {
                //唤醒commitLogService服务
                commitLogService.wakeup();
            }
        }
 }

handleDiskFlush方法根据刷盘方式处理不同的逻辑:

  • 同步刷盘 如果消息存储到缓存中不成功,就调用wakeup方法唤醒同步刷盘服务,否则创建同步刷盘请求,putRequest方法添加到同步刷盘请求队列中,然后阻塞等待同步刷盘的状态,如果同步刷盘不成功,则打印等待刷盘的错误日志。
  • 异步刷盘 如果TransientStorePoolEnable开关不可用,则唤醒异步刷盘服务,否则唤醒提交服务。

同步刷盘实现流程如下。

  • 构建GroupCommitRequest同步任务并提交到GroupCommitRequest。
  • 等待同步刷盘任务完成,如果超时则返回刷盘错误,刷盘成功后正常返回给调用方。

下面让我们一一介绍GroupCommitRequest的核心属性。

  • long nextOffset:刷盘点偏移量。
  • CountDownLatch countDownLatch:倒记数锁存器。
  • flushOk:刷盘结果,初始为false。

消费发送线程将消息追加到内存映射文件后,将同步任务GroupCommitRequest提交到GroupCommitService线程,然后调用阻塞等待刷盘结果,超时时间默认为5s。

同步刷盘服务GroupCommitService

//刷盘请求写队列 同步刷盘任务暂存容器
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
//刷盘请求读队列 GroupCommitService线程每次处理的request容器,这是一个设计亮点,避免了任务提交与任务执行的锁冲突。
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

GroupCommitService类有两个属性,requestsWrite是刷盘请求写队列,当刷盘请求过来时,将刷盘请求添加到requestsWrite,当需要刷盘时,遍历requestsRead的刷盘请求将进行刷盘。在上述handleDiskFlush方法中,调用putRequest方法就是将刷盘请求写到requestsWrite中,代码如下:

//代码位置:org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest
public synchronized void putRequest(final GroupCommitRequest request) {
     //添加到写请求list中
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    //如果更新成功,就唤醒waitPoint
    if (hasNotified.compareAndSet(false, true)) {
      waitPoint.countDown(); // notify
    }
}

putRequest方法用synchronized修饰requestsWrite,然后将同步刷盘请求写到requestsWrite中,如果更新通知标志成功,就唤醒GroupCommitService服务。当GroupCommitService服务被唤醒以后就会调用swapRequests方法,如下:

//代码位置:org.apache.rocketmq.store.CommitLog.GroupCommitService#swapRequests
private void swapRequests() {
   List<GroupCommitRequest> tmp = this.requestsWrite;
   this.requestsWrite = this.requestsRead;
   this.requestsRead = tmp;
}

swapRequests方法的作用就是将 requestsWrite 中的请求交换到 requestsRead中,这样就处理刷盘的时候,也可以接受刷盘请求,避免产生竞争。

GroupCommitService服务是一个线程,启动以后就会在后台不断调用doCommit方法进行刷盘,doCommit方法遍历requestsRead的同步刷盘请求进行处理:

//代码位置:org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
            //同步锁,防止刷盘请求写入队列与刷盘请求读队列互换
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        //两次刷盘,因为可能第一次刷盘并没有将消息全部刷进文件中,文件是固定大小的
                        for (int i = 0; i < 2 && !flushOK; i++) {

                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                            //如果刷盘偏移量小于下一位移,说明还能继续刷盘
                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }

                        req.wakeupCustomer(flushOK);
                    }

                    //设置检查点的写入时间
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    //清空requestsRead
                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
}

doCommit方法首先用synchronized对requestsRead进行同步锁上锁,防止同一时间其他线程操作requestsRead。如果requestsRead不为空,则遍历requestsRead,对同一同步刷盘请求,进行了两次刷盘,原因是可能第一次刷盘并没有把消息全部刷盘到文件中,文件是固定大小的,当文件差不多满了的时候,消息只能再一次刷到下一个文件中。调用方法getFlushedWhere判断当前刷盘偏移量是否大于等于该消息写入后的结束偏移量,如果否,则调用mappedFileQueue.flush方法进行刷盘,该方法等到分析完异步刷盘再一起分析。刷完盘以后,调用wakeupCustomer方法唤醒其他外部等待的线程。遍历完刷盘同步请求时,记录检查点的写入时间并清空requestsRead。

异步刷盘

CommitLog#handleDiskFlush

// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().
        isTransientStorePoolEnable()) {
    flushCommitLogService.wakeup();
} else {
  commitLogService.wakeup();
}

异步刷盘根据是否开启transientStorePoolEnable机制,刷盘实现会有细微差别。如果transientStorePoolEnable为true,RocketMQ会单独申请一个与目标物理文件(commitlog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到与物理文件的内存映射内存中,再flush到磁盘。如果transientStorePoolEnable为flalse,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中。

  • 首先将消息直接追加到ByteBuffer(堆外内存DirectByteBuffer),wrotePosition随着消息的不断追加向后移动。
  • CommitRealTimeService线程默认每200ms将ByteBuffer新追加的内容(wrotePosition减去commitedPosition)的数据提交到MappedByteBuffer中。
  • MappedByteBuffer在内存中追加提交的内容,wrotePosition指针向前后移动,然后返回。
  • commit操作成功返回,将commitedPosition向前后移动本次提交的内容长度,此时wrotePosition指针依然可以向前推进。
  • FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存(wrotePosition减去上一次刷写位置flushedPositiont)通过调用MappedByteBuffer#force()方法将数据刷写到磁盘。

RocketMQ默认的是异步刷盘,异步刷盘服务FlushRealTimeService启动以后就在run方法不停的运行,run方法如下:

//代码位置:org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                //是否固定时间周期进行刷盘
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

                //刷盘的间隔
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                //刷盘的最少页数
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

                //最大刷盘间隔
                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                //当前时间大于等于最后一次刷盘的时间加上最大刷盘间隔
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    //刷盘时间设置为最后一次刷盘时间
                    this.lastFlushTimestamp = currentTimeMillis;
                    //刷盘最少页数设置为0
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    //休眠
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    //刷盘开始时间
                    long begin = System.currentTimeMillis();
                    //刷盘
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    //设置检查点的写入时间
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    //刷盘花费时间
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            //关闭刷盘服务之前,首先进行刷盘,防止消息丢失
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();

            CommitLog.log.info(this.getServiceName() + " service end");
}

首先解释三个配置参数的含义。

  • commitIntervalCommitLog:CommitRealTimeService线程间隔时间,默认200ms。
  • commitCommitLogLeastPages:一次提交任务至少包含页数,如果待提交数据不足,小于该参数配置的值,将忽略本次提交任务,默认4页。
  • commitDataThoroughInterval:两次真实提交最大间隔,默认200ms。

解释四个配置参数的含义。

  • flushCommitLogTimed:默认为false,表示await方法等待;如果为true,表示使用Thread.sleep方法等待。
  • flushIntervalCommitLog:FlushRealTimeService线程任务运行间隔。
  • flushPhysicQueueLeastPages:一次刷写任务至少包含页数,如果待刷写数据不足,小于该参数配置的值,将忽略本次刷写任务,默认4页。
  • flushPhysicQueueThoroughInterval:两次真实刷写任务最大间隔,默认10s。

run方法首先获取flushCommitLogTimed(是否固定时间周期进行刷盘)、interval(刷盘的间隔)、flushPhysicQueueLeastPages(刷盘的最少页数)、flushPhysicQueueThoroughInterval(最大刷盘间隔)这些属性值,然后判断当前时间是否大于等于最后一次刷盘的时间加上最大刷盘间隔,如果是,则设置最后一次刷盘时间以及刷盘最少页数,在该条件下,必须进行一次刷盘操作。

接着根据flushCommitLogTimed判断休眠的方式,如果flushCommitLogTimed为true,则进行线程休眠,否则等待休眠。当休眠完以后,调用mappedFileQueue.flush方法进行刷盘,同步刷盘也是调用mappedFileQueue.flush方法进行刷盘。最后记录检查点的写入时间以及打印刷盘花费时间。

当刷盘服务被关闭之前,就是run方法中while循环不再进行时,要做最后的刷盘操作,防止消息尽量少丢失,默认刷盘次数为10次(RETRY_TIMES_OVER)。

MappedFile的刷盘

不管是同步刷盘还是异步刷盘,最终都是调用mappedFileQueue.flush方法进行刷盘,mappedFileQueue.flush方法如下:

//代码位置:org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {
        boolean result = true;
        //根据刷盘位移获取mappedFile
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            //刷盘
            int offset = mappedFile.flush(flushLeastPages);
              //刷盘以后的位置
            long where = mappedFile.getFileFromOffset() + offset;
            //刷盘是否成功
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
}

flush方法首先根据刷盘位移获取mappedFile,要知道将消息刷盘到哪一个MappedFile文件中,findMappedFileByOffset方法找到本次刷盘起始点的MappedFile文件。当获取的mappedFile文件不为null,则调用mappedFile.flush方法进行刷盘,刷盘盘以后更新刷盘以后的偏移量。我们接下来看看MappedFile文件是否进行刷盘的:

//代码位置:org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {

        //如果可以刷盘
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    //如果writeBuffer不为空,则表明消息是先提交到writeBuffer中,已经从writeBuffer提交到fileChannel,直接调用fileChannel.force()
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        //消息是直接存储在文件内存映射缓冲区mappedByteBuffer中,直接调用它的force()即可
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                //设置刷盘的偏移量
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        //返回刷盘偏移量
        return this.getFlushedPosition();
}

flush方法首先判断用isAbleToFlush方法判断是否可以刷盘,当文件没有写满、刷盘内容的长度小于文件剩余的长度、如果满足就可以进行刷盘。

如果writeBuffer不为空,则表明消息是先提交到writeBuffer中,已经从writeBuffer提交到fileChannel,直接调用fileChannel.force方法将消息刷盘。否则消息是直接存储在文件内存映射缓冲区mappedByteBuffer中,直接调用它的force方法进行刷盘。最后设置下刷盘的偏移量以及返回刷盘的偏移量。

CommitRealTimeService的作用

分析完同步刷盘和异步刷盘,接下来分析来CommitRealTimeService的作用,我们直接来看看CommitRealTimeService的run方法:

//代码位置:org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                //提交间隔
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

                //最少一次提交的页数
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

                //最大的提交间隔
                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

                //提交开始时间
                long begin = System.currentTimeMillis();
                //提交开始时间大于等最后一次提交时间加上最大的提交间隔
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }

                try {
                    //提交
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    if (!result) {
                        this.lastCommitTimestamp = end; // result = false means some data committed.
                        //now wake up flush thread.
                        flushCommitLogService.wakeup();
                    }

                    //提交花费时间大于500毫秒就打印日志
                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    //等待interval
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            //提交服务停止后,需要刷盘,防止数据丢失
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
}

CommitRealTimeService的run方法有个while循环,当CommitRealTimeService启动以后没有停止,while循环就一直执行。while循环里面首先获取一些变量的值,如interval(提交间隔)、commitDataLeastPages(最少一次提交的页数)、commitDataThoroughInterval(最大的提交间隔)。当提交开始时间大于等最后一次提交时间加上最大的提交间隔时,设置最后提交的时间以及最少提交的页数为0,说明这时候最少也要提交一次了。然后调用mappedFileQueue.commit方法进行提交,提交完以后就等到interval毫秒以后再循环上述的流程。当CommitRealTimeService被停止时,默认进行十次刷盘,最大限度防止数据丢失。

接下来我们继续深入mappedFileQueue.commit方法:

//代码位置:org.apache.rocketmq.store.MappedFileQueue#commit
public boolean commit(final int commitLeastPages) {
        boolean result = true;
        //查找提交的mappedFile
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            //提交
            int offset = mappedFile.commit(commitLeastPages);
            //提交偏移量
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            //提交的偏移量
            this.committedWhere = where;
        }

        return result;
}

mappedFileQueue.commit查找MappedFile,才能知道哪个MappedFile需要提交,当查找的mappedFile不等于null时,通过mappedFile.commit方法提交,提交完以后设置提交的偏移量。

mappedFile.commit方法首先就是判断是否满足提交的条件,如果满足的话,将缓存writeBuffer的数据提交到fileChannel,然后当所有的缓存writeBuffer的所有数据都提交了,就清空缓存writeBuffer。mappedFile.commit的代码就不展示,想了解的可以深入理解下。

过期文件删除机制

由于RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除已过期的文件。 RocketMQ顺序写Commitlog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。 默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。

接下来详细分析RocketMQ是如何设计与实现上述机制的。

private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), 
        TimeUnit.MILLISECONDS);
// ... 省略其他定时任务
}

RocketMQ会每隔10s调度一次cleanFilesPeriodically,检测是否需要清除过期文件。执行频率可以通过设置cleanResourceInterval,默认为10s。

private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}

分别执行清除消息存储文件(Commitlog文件)与消息消费队列文件(ConsumeQueue文件)。由于消息消费队列文件与消息存储文件(Commitlog)共用一套过期文件删除机制,本书将重点讲解消息存储过期文件删除。实现方法:DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles。

long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().
            getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.
            getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.
        getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

解释一下这个三个配置属性的含义。

  • fileReservedTime:文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除。
  • deletePhysicFilesInterval:删除物理文件的间隔,因为在一次清除过程中,可能需要被删除的文件不止一个,该值指定两次删除文件的间隔时间。
  • destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留的最大时间,在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除。
boolean timeup = this.isTimeToDelete(); 
boolean spacefull = this.isSpaceToDelete(); 
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; 
if (timeup || spacefull || manualDelete) { 
    //继续执行删除逻辑 
    return;
} else { 
    // 本次删除任务无作为。 
}

RocketMQ在如下三种情况任意之一满足的情况下将继续执行删除文件操作。

  • 指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。
  • 磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作。
  • 预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。

DefaultMessageStore$CleanCommitLogService#isSpaceToDelete

private boolean isSpaceToDelete() {
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().
            getDiskMaxUsedSpaceRatio() / 100.0;
    cleanImmediately = false;
    String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().
            getStorePathCommitLog();
    double physicRatio=UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
    if (physicRatio > diskSpaceWarningLevelRatio) {
        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        // 省略日志输出语句
        cleanImmediately = true;
    } else if (physicRatio > diskSpaceCleanForciblyRatio) {
        cleanImmediately = true;
    } else {
        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
        // 省略日志输出语句
    }
        if (physicRatio < 0 || physicRatio > ratio) {
            return true;
    } // 后面省略对ConsumeQueue做同样的判断
            return fasle;
}

首先解释一下几个参数的含义。

  • diskMaxUsedSpaceRatio:表示commitlog、consumequeue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件。
  • cleanImmediately:表示是否需要立即执行清除过期文件操作。
  • physicRatio:当前commitlog目录所在的磁盘分区的磁盘使用率,通过File#getTotalSpace()获取文件所在磁盘分区的总容量,通过File#getFreeSpace()获取文件所在磁盘分区剩余容量。
  • diskSpaceWarningLevelRatio:通过系统参数-Drocketmq.broker.diskSpaceWarningLevelRatio设置,默认0.90。如果磁盘分区使用率超过该阈值,将设置磁盘不可写,此时会拒绝新消息的写入。
  • diskSpaceCleanForciblyRatio:通过系统参数-Drocketmq.broker.diskSpaceCleanForciblyRatio设置,默认0.85。如果磁盘分区使用超过该阈值,建议立即执行过期文件清除,但不会拒绝新消息的写入。

如果当前磁盘分区使用率大于diskSpaceWarningLevelRatio,设置磁盘不可写,应该立即启动过期文件删除操作;如果当前磁盘分区使用率大于diskSpaceCleanForciblyRatio,建议立即执行过期文件清除;如果磁盘使用率低于diskSpaceCleanForciblyRatio将恢复磁盘可写;如果当前磁盘使用率小于diskMaxUsedSpaceRatio则返回false,表示磁盘使用率正常,否则返回true,需要执行清除过期文件。

for (int i = 0; i < mfsLength; i++) {
    MappedFile mappedFile = (MappedFile) mfs[i];
    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
        if (mappedFile.destroy(intervalForcibly)) {
            files.add(mappedFile);
            deleteCount++;
            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                break;
            }
            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                try {
                    Thread.sleep(deleteFilesInterval);
                 } catch (InterruptedException e) {
                 }
            }
        } else {
            break;
        }
    }
}

执行文件销毁与删除。从倒数第二个文件开始遍历,计算文件的最大存活时间(=文件的最后一次更新时间+文件存活时间(默认72小时)),如果当前时间大于文件的最大存活时间或需要强制删除文件(当磁盘使用超过设定的阈值)时则执行MappedFile#destory方法,清除MappedFile占有的相关资源,如果执行成功,将该文件加入到待删除文件列表中,然后统一执行File#delete方法将文件从物理磁盘中删除。

总结

RocketMQ有同步刷盘与异步刷盘的方法,可以根据场景进行选择刷盘的方式,同步刷盘对性能影响比较大,但是可靠性比较高,因为要等到消息刷盘以后才给生产者返回确认消息;异步输盘方式只要写入到缓存以后就给生产者返回确认消息,同时采用后台异步的方式进行刷盘,提高了性能以及吞吐量。

Search

    微信好友

    博士的沙漏

    Table of Contents