RocketMQ源码消息存储
Broker接收到消息请求以后,是如何将消息存储起来的?
目前的MQ中间件从存储模型来看,分为需要持久化和不需要持久化的两种模型,现在大多数的MQ都是支持持久化存储的,比如ActiveMQ、RabbitMQ、Kafka,RocketMQ,而ZeroMQ却不需要支持持久化存储。然而业务系统也大多需要MQ有持久存储的能力,能大大增加系统的高可用性。从存储方式和效率来看,文件系统高于KV存储,KV存储又高于关系型数据库,直接操作文件系统肯定是最快的,但可靠性却是最低的,而关系型数据库的性能和可靠性与文件系统恰恰相反
存储概要设计
RocketMQ主要存储的文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。 为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog文件中检索消息。RocketMQ是一款高性能的消息中间件,存储部分的设计是核心,存储的核心是IO访问性能。
RocketMQ消息存储设计原理
- CommitLog:消息存储文件,所有消息主题的消息都存储在CommitLog文件中。
- ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到消息消费队列,供消息消费者消费。
- IndexFile:消息索引文件,主要存储消息Key与Offset的对应关系。
- 事务状态服务:存储每条消息的事务状态。
- 定时消息服务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度。
消息发送处理
Broker服务器接收到消息请求时,通过请求码找到SendMessageProcessor处理器,SendMessageProcessor处理器通过processRequest方法处理消息请求:
//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
processRequest方法调用asyncProcessRequest方法处理请求,asyncProcessRequest方法是异步方法,通过get方法阻塞获取到最后的结果。asyncProcessRequest的代码如下:
//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
//消费者发送返回消息
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
//解析发送消息请求头
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
//发送消息实体
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
//是否批量发送
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
asyncProcessRequest方法逻辑比较简单,通过请求码选择不同的消息请求逻辑,当请求码是CONSUMER_SEND_MSG_BACK时,处理消费者发送返回的消息,默认处理生产者发送的消息请求,首先解析消息发送请求头,如果消息请求头为null,就直接返回处理结果了,然后创建消息内容实体,最后根据消是否是批量。调用不同方法处理消息请求。当是批量消息发送时,调用asyncSendBatchMessage方法处理批量消息,否则调用asyncSendMessage方法处理。
我们解析来看看parseRequestHeader方法,看方法的名字就知道是解析消息请求头,parseRequestHeader方法如下:
//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#parseRequestHeader
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
throws RemotingCommandException {
//发送消息请求头,版本2
SendMessageRequestHeaderV2 requestHeaderV2 = null;
//发送消息请求头,版本1
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
//批量消息,发送消息版本2
case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2) request
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
//普通消息发送
case RequestCode.SEND_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
(SendMessageRequestHeader) request
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
}
default:
break;
}
return requestHeader;
}
RocketMQ为了发送消息的性能,除了会对消息进行压缩外,还会将消息转换为更轻量的消息进行发送,更轻量的消息就是将原来的消息换成不利于阅读的格式,这样消息就会变得更小,传输起来就更快。
parseRequestHeader方法通过请求码的类型解析消息发送请求头,如果是批量消息和更轻量的消息,就解析为SendMessageRequestHeaderV2,否则就解析为SendMessageRequestHeader。
接下来,我们将会从下面两个方面继续分析消除的存储:
- 单笔消息存储处理
- 批量消息存储处理
单笔消息存储处理
//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
//创建响应结果
final RemotingCommand response = preSend(ctx, request, requestHeader);
//发送消息响应头
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
//消息体
final byte[] body = request.getBody();
//队列id
int queueIdInt = requestHeader.getQueueId();
//根据topic获取topic配置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
//队列小于0,随机给一个
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//消息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
//省略代码:为msgInner赋值
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
//如果拒绝事务消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
//处理事务消息
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
//处理普通消息
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
asyncSendMessage方法先创建发送消息响应头,从发送消息请求头获取消息内容,创建msgInner消息并设置消息的属性。如果消息是事务消息,则进行事务消息的处理,否则,进行普通消息的处理。接下来,我们先分析下普通消息的处理,然后在分析事务消息的处理。
普通消息存储
以消息发送存储为突破点,一点一点揭开RocketMQ存储设计的神秘面纱。消息存储入口:org.apache.rocketmq.store.DefaultMessageStore#putMessage。
//代码位置:org.apache.rocketmq.store.DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
//检查store状态
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
//消息是否合法
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
long beginTime = this.getSystemClock().now();
//保存消息
PutMessageResult result = this.commitLog.putMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
//统计保存消息花费的时间
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
//统计保存消息失败次数
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
采用DefaultMessageStore类的putMessage方法进行普通消息的存储。
- checkStoreStatus方法先检查store服务器的状态,校验服务器的状态是否已经关闭、Broker的角色是否是slave,是否不可写以及是否繁忙?如果满足上述其中之一的条件,那么说明store服务器的状态都不是正常的。
- 校验完store服务器的状态以后,校验消息是否正常,checkMessage方法主要是判断topic的长度不能超过topic最大的长度127、消息的Properties属性不能超过Properties最大的长度32767。
- 消息正常以后就可以调用commitLog的putMessage方法保存消息了,最后统计保存消息花费的时间和统计保存消息失败次数。
如果日志中包含“message store is not writeable,so putMessage is forbidden”,出现这种日志最有可能是磁盘空间不足,在写ConsumeQueue、IndexFile文件出现错误时会拒绝消息再次写入。
接下来深入commitLog的putMessage的方法,看看是如何保存消息的。
//代码位置:org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
//省略代码
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
//获取最后一个MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//可重入锁
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//保存消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
//根据保存消息的状态,做不同的处理
switch (result.getStatus()) {
case PUT_OK:
break;
//如果mappedFile文件已经写满了,那么就在创建一个新的,继续写
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
//省略代码
return putMessageResult;
}
上述代码省略了一些对此次分析不太重要的代码,不影响此次的分析。首先获取当前可以写入的Commitlog文件。
Commitlog文件存储目录为${ROCKET_HOME}/store/commitlog目录,每一个文件默认1G,一个文件写满后再创建另外一个,以该文件中第一个偏移量为文件名,偏移量小于20位用0补齐。例如第一个文件初始偏移量为0,第二个文件的1073741824,代表该文件中的第一条消息的物理偏移量为1073741824,这样根据物理偏移量能快速定位到消息。MappedFileQueue可以看作是${ROCKET_HOME}/store/commitlog文件夹,而MappedFile则对应该文件夹下一个个的文件。
获取最后一个MappedFile文件,MappedFile文件是用来保存消息的。为了避免消息在写入的过程中被其他线程访问,用可重入锁进行上锁,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的。当获取MappedFile文件不存在或者已经写满了,就新创建一个MappedFile文件。然后通过MappedFile的appendMessage方法将消息保存起来得到保存消息的结果,根据保存消息状态设置保存消息的结果,这里有一个状态需要注意下,当状态时END_OF_FILE时,说明MappedFile文件已经写满了,所以新建MappedFile文件,再通过MappedFile的appendMessage的方法保存消息。
设置消息的存储时间,如果mappedFile为空,表明${ROCKET_HOME}/store/commitlog目录下不存在任何文件,说明本次消息是第一次消息发送,用偏移量0创建第一个commit文件,文件为00000000000000000000,如果文件创建失败,抛出CREATE_MAPEDFILE_FAILED,很有可能是磁盘空间不足或权限不够。
我们先不分析MappedFile的appendMessage的方法,先分析下CommitLog和MappedFile是什么?
CommitLog可以看成是Broker服务器在逻辑上对应的一个大文件,一个Broker服务器对应一个CommitLog文件,Broker服务接收到的所有消息都通过CommitLog类进行存储,CommitLog类有一个MappedFileQueue,MappedFileQueue队列存储着多个MappedFile文件,每个MappedFile文件的默认大小为1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。(这句话从官方文档copy的)。MappedFile类就是实际存储消息的对应实体,采用内存映射技术来提高文件的访问速度与写入速度,以追加写的方式写入MappedFile文件。
在写入消息之前,首先要获取MappedFile文件,才能知道将消息写到哪个MappedFile文件中,putMessage方法在写入消息到MappedFile文件中,调用getLastMappedFile方法获取最后一个MappedFile文件,让我们看看getLastMappedFile方法:
//代码位置:org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
mappedFiles是CopyOnWriteArrayList数组,保存着MappedFile,getLastMappedFile方法获取mappedFiles数组的最后一个MappedFile返回。如果获取的MappedFile文件为null或者已经写满,putMessage方法会重新创建一个新的MappedFile文件返回。我们看看MappedFile类:
public class MappedFile extends ReferenceResource {
//系统page大小,4M
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
//总的mapped虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
//总的mapped文件数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
//写指针
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
//提交指针
protected final AtomicInteger committedPosition = new AtomicInteger(0);
//刷盘指针
private final AtomicInteger flushedPosition = new AtomicInteger(0);
//文件大小
protected int fileSize;
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
//写缓存,消息会先写到这里,reput方法会将写到文件中去
protected ByteBuffer writeBuffer = null;
//临时的存储池
protected TransientStorePool transientStorePool = null;
//文件名字
private String fileName;
private long fileFromOffset;
//文件
private File file;
//
private MappedByteBuffer mappedByteBuffer;
//存储时间戳
private volatile long storeTimestamp = 0;
//第一次在队列中创建
private boolean firstCreateInQueue = false;
//省略代码
}
MappedFile的有几个重要的属性,上面代码已经为这些属性已经注释了,如写指针、读指针、刷盘指针、文件大小、文件名、写缓存等属性。
保存消息
获取到MappedFile文件以后,就可以调用appendMessage方法进行消息的存储,appendMessage方法又会调用appendMessagesInner方法真正进行消息的保存:
//代码位置:org.apache.rocketmq.store.MappedFile.appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
//当前写指针
int currentPos = this.wrotePosition.get();
//写指针小于文件大小
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
//broker接收到的消息
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
//批量消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//增加写指针
this.wrotePosition.addAndGet(result.getWroteBytes());
//存储的时间
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
appendMessagesInner方法有两个参数,MessageExt是消息,AppendMessageCallback是追加消息回调。appendMessagesInner方法首先获取当前写指针,如果currentPos大于或等于文件大小则表明文件已写满,抛出AppendMessageStatus.UNKNOWN_ERROR。如果currentPos小于文件大小,通过slice()方法创建一个与MappedFile的共享内存区,并设置position为当前指针。
当写指针小于文件大小,判断messageExt是Broker接收到的消息还是批量消息,然后调用AppendMessageCallback的doAppend方法进行消息的追加,最后增加写指针。
接下来我们分析下doAppend方法是如何将消息追加到文件中:
long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory,
msgInner.getStoreHostBytes(hostHolder), wroteOffset);
创建全局唯一消息ID,消息ID有16字节,消息ID组成= 4字节IP+4字节端口号+8字节消息偏移量。 但为了消息ID可读性,返回给应用程序的msgId为字符类型,可以通过UtilAll.bytes2string方法将msgId字节数组转换成字符串,通过UtilAll.string2bytes方法将msgId字符串还原成16个字节的字节数组,从而根据提取消息偏移量,可以快速通过msgId找到消息内容。
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
获取该消息在消息队列的偏移量。CommitLog中保存了当前所有消息队列的当前待写入偏移量。
//代码省略:org.apache.rocketmq.store.CommitLog#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
//代码省略
/**
* Serialize message
*/
//序列化消息
//消息的properties数据
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
//properties长度
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
//消息properties太大
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
//topic数据和长度
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
//消息体的长度
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
//消息的长度
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message 大于最大的消息长度4M
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
//确定是否有足够的空间
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
//先写到byteBuffer缓存起来
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
//初始化内存
this.resetByteBuffer(msgStoreItemMemory, msgLen);
/**
将消息添加到内存中
**/
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
//将消息写到byteBuffer中
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
上述将doAppend方法中一些代码省略掉,但是不影响主要流程,将消息的properties数据、topic数据转换为 byte数组,并计算properties数据的长度、topic数据的长度以及消息的长度。然后判断消息大小是否大于消息最大大小,如果是直接返回超过消息大小的结果。 接着确定是否有足够的空间存下消息,有充足的空间,则将消息写到byteBuffer缓存起来,返回文件已经写满的结果。如果没有充足的空间,初始化足够的空间,并将消息加入到msgStoreItemMemory中,最后将消息写入到byteBuffer中。doAppend方法并没有将消息直接写入到文件中,而是将消息写入到byteBuffer中缓存起来。
CommitLog#calMsgLength
private static int calMsgLength(int bodyLength, int topicLength, int
propertiesLength) {
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
+ 4 //QUEUEID
+ 4 //FLAG
+ 8 //QUEUEOFFSET
+ 8 //PHYSICALOFFSET
+ 4 //SYSFLAG
+ 8 //BORNTIMESTAMP
+ 8 //BORNHOST
+ 8 //STORETIMESTAMP
+ 8 //STOREHOSTADDRESS
+ 4 //RECONSUMETIMES
+ 8 //Prepared Transaction Offset
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ 1 + topicLength //TOPIC
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ 0;
return msgLen;
}
根据消息体的长度、主题的长度、属性的长度结合消息存储格式计算消息的总长度。
RocketMQ消息存储格式如下。
- TOTALSIZE:该消息条目总长度,4字节。
- MAGICCODE:魔数,4字节。固定值0xdaa320a7。
- BODYCRC:消息体crc校验码,4字节。
- QUEUEID:消息消费队列ID,4字节。
- FLAG:消息FLAG,RocketMQ不做处理,供应用程序使用,默认4字节。
- QUEUEOFFSET:消息在消息消费队列的偏移量,8字节。
- PHYSICALOFFSET:消息在CommitLog文件中的偏移量,8字节。
- SYSFLAG:消息系统Flag,例如是否压缩、是否是事务消息等,4字节。
- BORNTIMESTAMP:消息生产者调用消息发送API的时间戳,8字节。
- BORNHOST:消息发送者IP、端口号,8字节。
- STORETIMESTAMP:消息存储时间戳,8字节。
- STOREHOSTADDRESS:Broker服务器IP+端口号,8字节。
- RECONSUMETIMES:消息重试次数,4字节。
- Prepared Transaction Offset:事务消息物理偏移量,8字节。
- BodyLength:消息体长度,4字节。
- Body:消息体内容,长度为bodyLenth中存储的值。
- TopicLength:主题存储长度,1字节,表示主题名称不能超过255个字符。
- Topic:主题,长度为TopicLength中存储的值。
- PropertiesLength:消息属性长度,2字节,表示消息属性长度不能超过65536个字符。
- Properties:消息属性,长度为PropertiesLength中存储的值。 上述表示CommitLog条目是不定长的,每一个条目的长度存储在前4个字节中。
如果消息长度+END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE,Broker会重新创建一个新的CommitLog文件来存储该消息。从这里可以看出,每个CommitLog文件最少会空闲8个字节,高4字节存储当前文件剩余空间,低4字节存储魔数:CommitLog.BLANK_MAGIC_CODE。 将消息内容存储到ByteBuffer中,然后创建AppendMessageResult。这里只是将消息存储在MappedFile对应的内存映射Buffer中,并没有刷写到磁盘。
下面我们来一一介绍下AppendMessageResult的属性。
- AppendMessageStatus status:消息追加结果,取值PUT_OK:追加成功;END_OF_FILE:超过文件大小;MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度:PROPERTIES_SIZE_EXCEEDED:消息属性超过最大允许长度;UNKNOWN_ERROR:未知异常。
- long wroteOffset:消息的物理偏移量。
- String msgId:消息ID。
- long storeTimestamp:消息存储时间戳。
- long logicsOffset:消息消费队列逻辑偏移量,类似于数组下标。
- long pagecacheRT=0:当前未使用。
- int msgNum=1:消息条数,批量消息发送时消息条数。
DefaultAppendMessageCallback#doAppend只是将消息追加在内存中,需要根据是同步刷盘还是异步刷盘方式,将内存中的数据持久化到磁盘。
处理事务消息
当Broker接收到事务消息以后,根据消息的类型将消息交给TransactionalMessageService类的asyncPrepareMessage方法处理,这个asyncPrepareMessage的具体由TransactionalMessageServiceImpl实现。如下所示:
//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#asyncPrepareMessage
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
asyncPrepareMessage方法调用asyncPutHalfMessage方法,如下:
//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#asyncPutHalfMessage
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
asyncPutHalfMessage方法首先调用parseHalfMessageInner方法先处理下半消息,处理完以后调用store的asyncPutMessage方法保存消息。我们先看看parseHalfMessageInner方法是如何处理半消息的。(RocketMQ事务消息不熟悉的可以参考阅读《RocketMQ源码分析之事务消息分析》)
//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//将真实的topic和queueId保存在Property中
//REAL_TOPIC
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
//REAL_QID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//重新设置topic,设置的topic为RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//设置queueId为0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
parseHalfMessageInner方法首先将消息真实的topic和queueId存放到Property中,并重新设置topic为RMQ_SYS_TRANS_HALF_TOPIC,queueId为0,这样保存起来的消息就不能被消费者消费了。我们返回到asyncPutMessage方法中,asyncPutMessage方法最终也是会调用putMessage存储消息的,putMessage方法就是上述讲述的方法,跟上面普通消息处理是一样的,共用putMessage方法。
总结
Broker接收到消息时,会将消息交给SendMessageProcessor处理器处理,SendMessageProcessor处理器首先解析发送消息请求头,根据消息的类型交给不同方法处理消息。CommitLog类具体处理消息,首先获取最后一个MappedFile文件,然后将消息以追加的方式写入到MappedFile中的byteBuffer缓存起来。
RocketMQ源码Broker存储批量消息
我们来分析如何存储批量消息。
当Broker收到消息时,将会将消息交给SendMessageProcessor处理器处理,SendMessageProcessor处理器根据消息的类型处理不同的消息,当消息是批量消息时,会将消息交给asyncSendBatchMessage方法进行处理。asyncSendBatchMessage方法如下:
//代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendBatchMessage
private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
//创建消息响应
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
int queueIdInt = requestHeader.getQueueId();
//根据topic获取topic配置信息
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
//随机一个写队列的id
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//如果topic的长度大于127,则返回错误的响应
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("message topic length too long " + requestHeader.getTopic().length());
return CompletableFuture.completedFuture(response);
}
//批量消息实体
MessageExtBatch messageExtBatch = new MessageExtBatch();
//设置topic和queueid
messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt);
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
messageExtBatch.setSysFlag(sysFlag);
messageExtBatch.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
//消费次数
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
//存储批量消息
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
//处理消息存储结果
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}
asyncSendBatchMessage方法跟普通消息的处理流程差不多,创建消息响应,根据topic查找topic配置信息、创建消息内容实体。然后调用asyncPutMessages方法进行存储批量消息,当批量消息存储完以后,就处理批量消息存储的结果。我们来深入asyncPutMessages方法,asyncPutMessages方法会调用putMessages方法存储消息,putMessages方法如下:
//代码位置:org.apache.rocketmq.store.DefaultMessageStore#putMessages
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
//检查store服务器的状态
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
//如果store状态不为ok时,直接返回结果
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
//消息检查
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
//存储消息
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
}
//统计存储消息花费的时间
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
//如果失败,统计失败的次数
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
putMessages方法会首先检查store服务器的状态和对消息进行检查,这个跟普通消息的处理是一样的,就不深入checkStoreStatus和checkMessages方法了。然后调用commitLog的putMessages存储消息,最后统计存储消息所花费的时间以及存储消息失败的次数。接下来,我们继续深入putMessages方法是如何存储批量消息的:
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
//代码省略
MappedFile unlockMappedFile = null;
//获取最后一个mappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//fine-grained lock instead of the coarse-grained
//消息编码类
MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
//设置已经编码好的消息
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch.setStoreTimestamp(beginLockTimestamp);
//mappedFile为空或者已经写满,重新创建mappedFile
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
//如果mappedFile为null,则返回创建mappedFile失败的结果
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//存储消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
//对存储消息的结果进行处理
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
//统计相关
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
//消息刷盘
handleDiskFlush(result, putMessageResult, messageExtBatch);
//消息同步
handleHA(result, putMessageResult, messageExtBatch);
return putMessageResult;
}
上述省略了一些无关紧要的代码,putMessages方法处理批量消息的存储与普通消息的存储的流程差不多,首先获取最后一个mappedFile,封装消息,这里的封装消息跟普通消息的存储有点不一样,就是这里专门用MessageExtBatchEncoder类处理批量消息,利用batchEncoder.encode对批量消息进行遍历写入内存。在批量消息写入之前,首先获取可重入锁,然后调用appendMessages方法对批量消息进行存储,当批量消息存储结果返回时,根据存储的结果状态返回不同处理结果。最后记录统计相关结果、消息进行刷盘、消息同步等。批量消息存储的流程跟普通消息存储的流程大体都相同。
appendMessages方法将消息追加到文件中,在批量消息存储和普通消息存储都用到了这个方法。将批量消息进行追加到文件中,跟普通消息追加到文件中,差别比较大的就是批量消息的追加是需要遍历追加文件的,看懂了普通消息的追加,批量消息的追加也是比较容易看懂的,批量消息的存储也是没有什么问题。