RocketMQ源码-消息主从同步
RocketMQ发送消息后,需要将消息进行落盘保存,然后进行消息的主从同步。
//代码位置:org.apache.rocketmq.store.CommitLog
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//如果是master 同步
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
//获取数据同步服务
HAService service = this.defaultMessageStore.getHaService();
//消息存储成功
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
//slave is ok
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
//创建主从同步提交请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
//交给GroupTransferService处理,该类判断主从同步是否完成
service.putRequest(request);
//唤醒全部的 Slave 同步
service.getWaitNotifyObject().wakeupAll();
PutMessageStatus replicaStatus = null;
try {
//复制的状态,等待同步的结果
replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
}
//如果同步不成功,打印错误日志
if (replicaStatus != PutMessageStatus.PUT_OK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
//告诉生产者,slave不可用
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
handleHA方法首先判断Broker的角色是否是SYNC_MASTER,如果是,则获取HAService类,HAService类是用来主从同步数据的服务。HAService在Broker启动时,创建默认的消息存储类DefaultMessageStore并启动,创建DefaultMessageStore的时候会创建HAService,启动消息存储类时会启动HAService。在消息落盘成功后,才进行消息的主从同步。isSlaveOK判断Slave从服务是否是良好的,Slave从服务器良好的标志是与master的连接数大于0并且是否已经到达需要推送数据的要求了。
public boolean isSlaveOK(final long masterPutWhere) {
//与master的连接数
boolean result = this.connectionCount.get() > 0;
result =
result //master 推送的位移减去推送给slave的位移是否小于slave 配置的最大落后位移
&& ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
.getMessageStoreConfig().getHaSlaveFallbehindMax());
return result;
}
如果slave从服务器有问题的,则直接告知生产者,slave不可用。否则,创建主从同步提交请求GroupCommitRequest,并交给GroupTransferService处理,该类判断主从同步是否完成。然后唤醒所有slave同步线程,阻塞等待主从同步的状态。从上面的代码来看,并没有涉及到具体的同步流程。
主从同步
流程
在分析主从同步的具体过程之前,先看看同步的大致流程:
- Master启动并且监听Slave的请求连接
- Master启动Slave的连接创建,如果收到Slave的请求连接,那么就创建Master与Slave的连接。
- Slave的启动,主动发起对Master的连接,建立TCP连接。并且向Master上报自己的最大偏移量,拉取消息并将消息落盘保存到Slave本地(commitLog 文件)。
- Master读写进行分离,ReadSocketService类用来读取从服务器上报的位移偏移量,WriteSocketService类用来向从服务写数据。ReadSocketService和WriteSocketService都是一个线程,只要线程不停止,ReadSocketService就不停的读取从服务上报的信息,WriteSocketService也不停的向从服写数据。
核心类
HAService:主从同步核心实现类
HAService重要属性
- AtomicInteger connectionCount :Master 维护的与Slave的连接数。
- List connectionList : Master 与 Slave 的连接集合。
- AcceptSocketService acceptSocketService : Master接收Slave发送的连接请求线程实现类。
- DefaultMessageStore defaultMessageStore : 默认消息存储。
- WaitNotifyObject waitNotifyObject : 同步等待实现。
- AtomicLong push2SlaveMaxOffset : Master推送给slave的最大的位移。
- GroupTransferService groupTransferService : 判断主从同步复制是否完成的线程实现类。
- HAClient haClient :HA客户端实现,Slave端网络的实现类。
HAConnection:HA Master-Slave 网络连接实现类
HAConnection重要属性:
- HAService haService:关联的AService实现类。
- SocketChannel socketChannel:网络通道。
- String clientAddr:客户端地址。
- WriteSocketService writeSocketService:Master向Slave写数据的服务类。
- ReadSocketService readSocketService:Master从Slave读数据的服务类。
HAService实现原理
HAService类是主从同步的核心类型,最重要的方法就是start方法,代码如下:
//代码位置:org.apache.rocketmq.store.ha.HAService
public void start() throws Exception {
//开始监听slave连接
this.acceptSocketService.beginAccept();
//监听slave 连接创建
this.acceptSocketService.start();
//判断主从同步是否结束
this.groupTransferService.start();
//ha客户端启动
this.haClient.start();
}
- acceptSocketService.beginAccept():启动Mater监听服务,处理Slave连接请求。
- acceptSocketService.start():启动AcceptSocketService,处理Slave连接请求,创建HAConnection(Master和Slave的连接)并添加到List集合。
- groupTransferService.start():启动判断主从同步复制是否完成的线程实现类,监听主从同步是否已经完成。
- haClient.start():HA客户端启动,Slave上报同步进度给Master、并且将Master发送的数据落盘保存起来。
AcceptSocketService原理
AcceptSocketService类是HAService类的内部类,是Master用来监听Slave请求连接的。HAService类的start方法会调用AcceptSocketService的beginAccept方法:
//代码位置:org.apache.rocketmq.store.ha.HAService$AcceptSocketService
public void beginAccept() throws Exception {
//创建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
//创建Selector
this.selector = RemotingUtil.openSelector();
//设置TCP reuseAddress
this.serverSocketChannel.socket().setReuseAddress(true);
//绑定监听套接字(IP、port)slave的ip和port
this.serverSocketChannel.socket().bind(this.socketAddressListen);
//设置非阻塞
this.serverSocketChannel.configureBlocking(false);
//注册OP_ACCEPT(连接事件)
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
AcceptSocketService是线程类,主要处理Slave请求连接的逻辑在run方法:
//代码位置:org.apache.rocketmq.store.ha.HAService$AcceptSocketService
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//选择器每1s处理一次连接就绪事件
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
//遍历所有的连接就绪事件
for (SelectionKey k : selected) {
//如果事件已经就绪和是连接就绪事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
//连接通道
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
//创建HA连接,并且启动,添加连接集合中
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
//清除所有的选择事件
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
AcceptSocketService线程不停止,run方法就一直监听Slave的连接请求。选择器每1s选择出所有连接就绪事件,如果选择的就绪事件不为空,则遍历所有连接就绪事件,如果事件已经就绪和是连接就绪事件,获取到连接通道,创建HAConnection连接,并且启动HAConnection,将HAConnection添加到连接集合中。AcceptSocketService只处理Slave的的请求连接,Master与Slave的数据传输委托给HAConnection处理,这部分内容将在下面进行讲述。
GroupTransferService实现原理
生产者发送消息给Broker,Broker将消息进行落盘保存,然后进行主从同步,但是怎么判断主从同步已经完成?GroupTransferService就是判断主从同步已经完成的线程。
private void doWaitTransfer() {
//锁住主从同步结束读请求
synchronized (this.requestsRead) {
//如果主从同步结束读请求不为空
if (!this.requestsRead.isEmpty()) {
//遍历所有主从同步结束读请求
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
//推送给slave的最大的位移是否大于等于下一条消息的起始偏移量
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
//等待时间等于当前时间加上同步超时时间
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
//如果同步给slave的数据还没有完成 && 当前时间小于等待的时间
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
//等待1秒,并且交换主从同步结束读请求与主从同步结束写请求
this.notifyTransferObject.waitForRunning(1000);
//推送给slave的最大的位移是否大于等于下一条消息的起始偏移量
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
//如果超时,打印日志
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
//设置最终的同步状态
req.wakeupCustomer(transferOK);
}
//清除请去读
this.requestsRead.clear();
}
}
}
doWaitTransfer方法对requestsRead进行加锁,并且遍历所有的主从同步结束读请求,一直阻塞到主从同步结束。首先判断推送给slave的最大的位移是否大于等于下一条消息的起始偏移量,如果小于的话,说明数据还没有同步完,另外同步没有超时,那么就等待1秒,并且一直阻塞。一直循环判断上面两个条件,只要其中之一不满足,则退出阻塞。退出阻塞,要不就是数据已经同步完成,要不就是数据同步超时。最后设置最终的同步状态。
HAClient实现原理
HAClient是的作用有两个:向Master汇报已拉取消息的偏移量、处理Master发送给Slave的数据。HAClient线程启动以后就不断处理上面两个流程。
- 主动连接Master,如果连接不成功,则等待5秒,否则进行下一步的处理。
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
//创建连接slave 和 master的连接
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
//注册OP_READ(网络读事件)
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
//设置当前的复制进度为commitlog文件的最大偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
//最后一次写的时间
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
connectMaster方法是slave主动连接master,首先判断该slave与master的socketChannel是否等于null,如果等于socketChannel等于null,则创建slave与master的连接,然后注册OP_READ的事件,并初始化currentReportedOffset 为commitlog文件的最大偏移量。如果Broker启动的时候,配置的角色是Slave时,但是masterAddress没有配置,那么就不会连接master。最后该方法返回是否成功连接上master。
- 判断是否需要向Master汇报已拉取消息偏移量。
private boolean isTimeToReportOffset() {
//当前时间减去最后一次写时间
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
//大于发送心跳时间间隔
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
当前时间减去最后一些写时间,大于发送心跳时间间隔,那么就需要向master上报已拉取的消息偏移量。上报已拉取的消息偏移量方法如下:
private boolean reportSlaveMaxOffset(final long maxOffset) {
//发送8字节的请求
this.reportOffset.position(0);
this.reportOffset.limit(8);
//最大偏移量
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
//如果需要向Master反馈当前拉取偏移量,则向Master发送一个8字节的请求,请求包中包含的数据为当前Broker消息文件的最大偏移量。
//如果还有剩余空间并且循环次数小于3
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
//发送上报的偏移量
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
//最后一次写时间
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
//是否还有剩余空间
return !this.reportOffset.hasRemaining();
}
当isTimeToReportOffset判断需要上报已经拉取消息的偏移量时,调用reportSlaveMaxOffset方法进行上报,reportSlaveMaxOffset方法会发送8字节的请求,请求中包含了最大偏移量。如果还有剩余空间并且循环次数小于3,则向master发送上报的偏移量,最后更新下最后一次写时间,reportSlaveMaxOffset方法返回是否还有剩余空间,如果没有剩余空间,则调用closeMaster方法关闭与master的连接。
- 进行事件选择,并且处理读事件,读事件是将master发送的数据落盘保存。
//进行事件选择, 其执行间隔为 1s 。
this.selector.select(1000);
//处理读事件
boolean ok = this.processReadEvent();
//处理读事件不ok,关闭与master的连接
if (!ok) {
this.closeMaster();
}
//上报slave最大的偏移量
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
进行事件选择, 其执行间隔为 1s ,然后调用processReadEvent方法处理读事件,处理完读事件以后上报最大的偏移量,如果没有剩余的空间,则跳过。
//处理读事件
private boolean processReadEvent() {
//读取大小为0的次数
int readSizeZeroTimes = 0;
//判断是否还有剩余
while (this.byteBufferRead.hasRemaining()) {
try {
//读取到缓存中
int readSize = this.socketChannel.read(this.byteBufferRead);
//读取的大小大于0
if (readSize > 0) {
//重置读取到0字节的次数
readSizeZeroTimes = 0;
//分发读请求
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
//如果连续 3 次从网络通道读取到 0 个字节,则结束本次读,返回 true 。
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
//如果读取到的字节数小于0或发生IO异常,则返回false。
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
如果byteBufferRead还有剩余空间,将读取消息到byteBufferRead中,如果读取的大小大于0,则分发读请求,处理读取到的数据,读取的大小等于0,并且连续3次读取到0个字节,则结束本次读,直接返回ture。如果读取到的字节数小于0或发生IO异常,则返回false。
private boolean dispatchReadRequest() {
//头部长度,物理偏移量与消息的长度
final int msgHeaderSize = 8 + 4; // phyoffset + size
//记录当前byteBufferRead的当前指针
int readSocketPos = this.byteBufferRead.position();
while (true) {
//当前指针减去本次己处理读缓存区的指针
int diff = this.byteBufferRead.position() - this.dispatchPosition;
//是否大于头部长度,是否包含头部
if (diff >= msgHeaderSize) {
//master的物理偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
//消息的长度
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
//slave 偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
//如果slave的最大物理偏移量与master给的偏移量不相等,则返回false
//从后面的处理逻辑来看,返回false,将会关闭与master的连接,在Slave本次周期内将不会再参与主从同步了。
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
//包含完整的信息
if (diff >= (msgHeaderSize + bodySize)) {
//读取消息到缓冲区
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
//添加到commit log 文件
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
//当前的已读缓冲区指针
this.dispatchPosition += msgHeaderSize + bodySize;
//上报slave最大的复制偏移量
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
//没有包含完整的消息,
//其核心思想是将readByteBuffer中剩余的有效数据先复制到readByteBufferBak,然后交换readByteBuffer与readByteBufferBak。
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
dispatchReadRequest方法将读取到的数据一条一条解析,并且落盘保存到本地。但是读取的数据可能不是完整的,所以要判断读取的数据是否完整,消息包括消息头和消息体,消息头12字节长度,包括物理偏移量与消息的长度。首先判断读取到数据的长度是否大于消息头部长度,如果小于说明读取的数据是不完整的,则判断byteBufferRead是否还有剩余空间,并且将readByteBuffer中剩余的有效数据先复制到readByteBufferBak,然后交换readByteBuffer与readByteBufferBak。
如果读取的数据的长度大于消息头部长度,则判断master和slave的偏移量是否相等,如果slave的最大物理偏移量与master给的偏移量不相等,则返回false。在后续的处理中,返回false,将会关闭与master的连接。如果读取的数据长度大于等于消息头长读与消息体长度,说明读取的数据是包好完整消息的,将消息体的内容从byteBufferRead中读取出来,并且将消息保存到commitLog文件中。
总结起来,HAClient类的作用就是Slave上报自己的最大偏移量,以及处理从master拉取过来的数据并落盘保存起来。
HAConnection实现原理
在AcceptSocketService类的分析中,我们知道master监听到sslave的连接后,会创建HAConnection并且启动。HAConnection的作用是master用来处理slave上报的复制进度,并且向slave发送数据,处理上述过程是采用读写分离的,HAConnection的内部类WriteSocketService是master向slave写数据的服务类,HAConnection的内部类ReadSocketService是master从slave读数据的服务类。这两个内部类都是线程类,HAConnection启动后,这两个线程也将启动,不断运行。
ReadSocketService的run方法:
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
//处理读事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
//当前时间戳减去最后一次读取时间戳
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
//大于长连接保持的时间
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
//停止该读线程
this.makeStop();
//停止写线程
writeSocketService.makeStop();
//删除连接
haService.removeConnection(HAConnection.this);
//连接数减一
HAConnection.this.haService.getConnectionCount().decrementAndGet();
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
//关闭选择器
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
ReadSocketService的作用就是用来读取slave发送的数据,run方法的核心实现就是每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求。如果线程停止了,则做些清理的工作,如关闭消除,删除连接等。
private boolean processReadEvent() {
//读取的字节数为0的次数
int readSizeZeroTimes = 0;
//缓冲区没有剩余空间,还原
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
//剩余空间
while (this.byteBufferRead.hasRemaining()) {
try {
//处理网络读
int readSize = this.socketChannel.read(this.byteBufferRead);
//读取的字节大于0
if (readSize > 0) {
readSizeZeroTimes = 0;
//最后一次读时间戳
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
//当前位移减去已处理数据位移大于等于8,表明收到从服务器一条拉取消息请求。读取从服务器已拉取偏移量
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
//消息长度,减去8个字节(消息物理偏移量),
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
//从服务器反馈已拉取完成的数据偏移量
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
//唤醒线程去判断自己的关注的消息是否已经传输完成
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
//如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
//如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
如果byteBufferRead没有剩余空间,则清理byteBufferRead,并将当前已处理数据的指针processPosition设置为0。
如果byteBufferRead还有剩余剩余空间,读取slave拉取数据的请求,如果读取的字节大小大于0并且本次读取到的内容的字节大于等于8,则说明master接收到了slave的一条完整拉取数据的请求,读取从服务器已拉取偏移量,然后唤醒线程去判断自己关注的数据是否已经传输完成。在讲解GroupTransferService的原理时,我们知道GroupTransferService的作用判断主从同步已经完成的线程,这里就是去通知线程。
如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。
WriteSocketService的run方法比较长,下面会分成多个部分进行讲解:
this.selector.select(1000);
//从服务器请求拉取数据的偏移量等于-1,说明Master还未收到从服务器的拉取请求,放弃本次事件处理
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
选择器每秒进行选择一次就绪事件,当slaveRequestOffset等于-1时,说明master还未收到slave的拉取请求,睡眠后放弃本次事件的处理。
//下一次传输的物理偏移量等于-1,表示初次进行数据传输
if (-1 == this.nextTransferFromWhere) {
//如果slaveRequestOffset为0,则从当前commitlog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输。
if (0 == HAConnection.this.slaveRequestOffset) {
//master位移
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
//master位移减去MappedFile文件的大小
masterOffset = masterOffset- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);
}
nextTransferFromWhere等于-1,表示初次进行数据传输,如果slaveRequestOffset为0,则从当前commitlog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输。
//上一次数据是否全部传输给客户端
if (this.lastWriteOver) {
//当前时间戳减去最后一次写时间戳
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
//大于发送心跳时间间隔的时间
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
//消息头长度
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
//传输数据
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
//还没有传输完毕
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
如果上一次数据全部传输给slave,判断是否需要发送心跳给slave,心跳包头部的大小为12字节,消息长度默认存0,表示本次数据包为心跳包,避免长连接由于空闲被关闭。如果上一次数据还没有传输完毕,则继续传输上一次的数据,然后再次判断是否传输完成,如果消息还是未全部传输,则结束此次事件处理,待下次写事件到底后,继续将未传输完的数据先写入slave。transferData方法的作用是数据传输给slave。具体看看transferData方法:
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header 传输头信息
while (this.byteBufferHeader.hasRemaining()) {
//写入头信息给slave服务器
int writeSize = this.socketChannel.write(this.byteBufferHeader);
//写入大小大于0
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
//写入大小大于等于3
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body 写入消息体
//消息头缓冲没有剩余空间
if (!this.byteBufferHeader.hasRemaining()) {
//还有剩余数据
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
//将数据写入到从服务器
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
//头信息和消息体都没有剩余空间
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
//没有剩余空间,则释放mapped缓存
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
transferData方法会向slave传输两部分数据:传输头信息以及消息体信息。
首先向slave写入头信息,如果写入的字节大小大于0,则设置写入大小为0的次数writeSizeZeroTimes为0,并且将最后一次写入时间更新为当前时间。如果写入的字节大小等于0并且writeSizeZeroTimes大于等于3,则停止向slave写头信息,如果写入的字节大小小于0,则抛出异常。然后向slave写入消息体,向slave写入消息体的过程跟写入头信息的过程是一样的。