Rocketmq源码消息主从同步

2022/01/24 RocketMQ

RocketMQ源码-消息主从同步

RocketMQ发送消息后,需要将消息进行落盘保存,然后进行消息的主从同步。

//代码位置:org.apache.rocketmq.store.CommitLog
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        //如果是master 同步
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            //获取数据同步服务
            HAService service = this.defaultMessageStore.getHaService();
            //消息存储成功
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                //slave is ok
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    //创建主从同步提交请求
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    //交给GroupTransferService处理,该类判断主从同步是否完成
                    service.putRequest(request);
                    //唤醒全部的 Slave 同步
                    service.getWaitNotifyObject().wakeupAll();
                    PutMessageStatus replicaStatus = null;
                    try {
                        //复制的状态,等待同步的结果
                        replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                TimeUnit.MILLISECONDS);
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    }
                    //如果同步不成功,打印错误日志
                    if (replicaStatus != PutMessageStatus.PUT_OK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    //告诉生产者,slave不可用
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

}

handleHA方法首先判断Broker的角色是否是SYNC_MASTER,如果是,则获取HAService类,HAService类是用来主从同步数据的服务。HAService在Broker启动时,创建默认的消息存储类DefaultMessageStore并启动,创建DefaultMessageStore的时候会创建HAService,启动消息存储类时会启动HAService。在消息落盘成功后,才进行消息的主从同步。isSlaveOK判断Slave从服务是否是良好的,Slave从服务器良好的标志是与master的连接数大于0并且是否已经到达需要推送数据的要求了。

public boolean isSlaveOK(final long masterPutWhere) {
        //与master的连接数
        boolean result = this.connectionCount.get() > 0;
        result =
            result //master 推送的位移减去推送给slave的位移是否小于slave 配置的最大落后位移
                && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
                .getMessageStoreConfig().getHaSlaveFallbehindMax());
        return result;
}

如果slave从服务器有问题的,则直接告知生产者,slave不可用。否则,创建主从同步提交请求GroupCommitRequest,并交给GroupTransferService处理,该类判断主从同步是否完成。然后唤醒所有slave同步线程,阻塞等待主从同步的状态。从上面的代码来看,并没有涉及到具体的同步流程。

主从同步

流程

在分析主从同步的具体过程之前,先看看同步的大致流程:

  • Master启动并且监听Slave的请求连接
  • Master启动Slave的连接创建,如果收到Slave的请求连接,那么就创建Master与Slave的连接。
  • Slave的启动,主动发起对Master的连接,建立TCP连接。并且向Master上报自己的最大偏移量,拉取消息并将消息落盘保存到Slave本地(commitLog 文件)。
  • Master读写进行分离,ReadSocketService类用来读取从服务器上报的位移偏移量,WriteSocketService类用来向从服务写数据。ReadSocketService和WriteSocketService都是一个线程,只要线程不停止,ReadSocketService就不停的读取从服务上报的信息,WriteSocketService也不停的向从服写数据。

核心类

HAService:主从同步核心实现类

HAService重要属性

  • AtomicInteger connectionCount :Master 维护的与Slave的连接数。
  • List connectionList : Master 与 Slave 的连接集合。
  • AcceptSocketService acceptSocketService : Master接收Slave发送的连接请求线程实现类。
  • DefaultMessageStore defaultMessageStore : 默认消息存储。
  • WaitNotifyObject waitNotifyObject : 同步等待实现。
  • AtomicLong push2SlaveMaxOffset : Master推送给slave的最大的位移。
  • GroupTransferService groupTransferService : 判断主从同步复制是否完成的线程实现类。
  • HAClient haClient :HA客户端实现,Slave端网络的实现类。

HAConnection:HA Master-Slave 网络连接实现类

HAConnection重要属性:

  • HAService haService:关联的AService实现类。
  • SocketChannel socketChannel:网络通道。
  • String clientAddr:客户端地址。
  • WriteSocketService writeSocketService:Master向Slave写数据的服务类。
  • ReadSocketService readSocketService:Master从Slave读数据的服务类。

HAService实现原理

HAService类是主从同步的核心类型,最重要的方法就是start方法,代码如下:

//代码位置:org.apache.rocketmq.store.ha.HAService
public void start() throws Exception {
        //开始监听slave连接
        this.acceptSocketService.beginAccept();
        //监听slave 连接创建
        this.acceptSocketService.start();
        //判断主从同步是否结束
        this.groupTransferService.start();
        //ha客户端启动
        this.haClient.start();
}
  • acceptSocketService.beginAccept():启动Mater监听服务,处理Slave连接请求。
  • acceptSocketService.start():启动AcceptSocketService,处理Slave连接请求,创建HAConnection(Master和Slave的连接)并添加到List集合。
  • groupTransferService.start():启动判断主从同步复制是否完成的线程实现类,监听主从同步是否已经完成。
  • haClient.start():HA客户端启动,Slave上报同步进度给Master、并且将Master发送的数据落盘保存起来。

AcceptSocketService原理

AcceptSocketService类是HAService类的内部类,是Master用来监听Slave请求连接的。HAService类的start方法会调用AcceptSocketService的beginAccept方法:

//代码位置:org.apache.rocketmq.store.ha.HAService$AcceptSocketService
public void beginAccept() throws Exception {
    //创建ServerSocketChannel
    this.serverSocketChannel = ServerSocketChannel.open();
    //创建Selector
    this.selector = RemotingUtil.openSelector();
    //设置TCP reuseAddress
    this.serverSocketChannel.socket().setReuseAddress(true);
    //绑定监听套接字(IP、port)slave的ip和port
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    //设置非阻塞
    this.serverSocketChannel.configureBlocking(false);
    //注册OP_ACCEPT(连接事件)
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

AcceptSocketService是线程类,主要处理Slave请求连接的逻辑在run方法:

//代码位置:org.apache.rocketmq.store.ha.HAService$AcceptSocketService
public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    //选择器每1s处理一次连接就绪事件
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();

                    if (selected != null) {
                        //遍历所有的连接就绪事件
                        for (SelectionKey k : selected) {
                            //如果事件已经就绪和是连接就绪事件
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                //连接通道
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());

                                    try {
                                        //创建HA连接,并且启动,添加连接集合中
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        conn.start();
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }

                        //清除所有的选择事件
                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
}

AcceptSocketService线程不停止,run方法就一直监听Slave的连接请求。选择器每1s选择出所有连接就绪事件,如果选择的就绪事件不为空,则遍历所有连接就绪事件,如果事件已经就绪和是连接就绪事件,获取到连接通道,创建HAConnection连接,并且启动HAConnection,将HAConnection添加到连接集合中。AcceptSocketService只处理Slave的的请求连接,Master与Slave的数据传输委托给HAConnection处理,这部分内容将在下面进行讲述。

GroupTransferService实现原理

生产者发送消息给Broker,Broker将消息进行落盘保存,然后进行主从同步,但是怎么判断主从同步已经完成?GroupTransferService就是判断主从同步已经完成的线程。

private void doWaitTransfer() {
            //锁住主从同步结束读请求
            synchronized (this.requestsRead) {
                //如果主从同步结束读请求不为空
                if (!this.requestsRead.isEmpty()) {
                    //遍历所有主从同步结束读请求
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        //推送给slave的最大的位移是否大于等于下一条消息的起始偏移量
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        //等待时间等于当前时间加上同步超时时间
                        long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                            + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                        //如果同步给slave的数据还没有完成 && 当前时间小于等待的时间
                        while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                            //等待1秒,并且交换主从同步结束读请求与主从同步结束写请求
                            this.notifyTransferObject.waitForRunning(1000);
                            //推送给slave的最大的位移是否大于等于下一条消息的起始偏移量
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        //如果超时,打印日志
                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        //设置最终的同步状态
                        req.wakeupCustomer(transferOK);
                    }

                    //清除请去读
                    this.requestsRead.clear();
                }
            }
}

doWaitTransfer方法对requestsRead进行加锁,并且遍历所有的主从同步结束读请求,一直阻塞到主从同步结束。首先判断推送给slave的最大的位移是否大于等于下一条消息的起始偏移量,如果小于的话,说明数据还没有同步完,另外同步没有超时,那么就等待1秒,并且一直阻塞。一直循环判断上面两个条件,只要其中之一不满足,则退出阻塞。退出阻塞,要不就是数据已经同步完成,要不就是数据同步超时。最后设置最终的同步状态。

HAClient实现原理

HAClient是的作用有两个:向Master汇报已拉取消息的偏移量、处理Master发送给Slave的数据。HAClient线程启动以后就不断处理上面两个流程。

  • 主动连接Master,如果连接不成功,则等待5秒,否则进行下一步的处理。
private boolean connectMaster() throws ClosedChannelException {
            if (null == socketChannel) {
                String addr = this.masterAddress.get();
                if (addr != null) {

                    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                    if (socketAddress != null) {
                        //创建连接slave 和 master的连接
                        this.socketChannel = RemotingUtil.connect(socketAddress);
                        if (this.socketChannel != null) {
                            //注册OP_READ(网络读事件)
                            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                }

                //设置当前的复制进度为commitlog文件的最大偏移量
                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                //最后一次写的时间
                this.lastWriteTimestamp = System.currentTimeMillis();
            }

            return this.socketChannel != null;
}

connectMaster方法是slave主动连接master,首先判断该slave与master的socketChannel是否等于null,如果等于socketChannel等于null,则创建slave与master的连接,然后注册OP_READ的事件,并初始化currentReportedOffset 为commitlog文件的最大偏移量。如果Broker启动的时候,配置的角色是Slave时,但是masterAddress没有配置,那么就不会连接master。最后该方法返回是否成功连接上master。

  • 判断是否需要向Master汇报已拉取消息偏移量。
private boolean isTimeToReportOffset() {
    //当前时间减去最后一次写时间
    long interval =
                HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    //大于发送心跳时间间隔
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
                .getHaSendHeartbeatInterval();

     return needHeart;
}

当前时间减去最后一些写时间,大于发送心跳时间间隔,那么就需要向master上报已拉取的消息偏移量。上报已拉取的消息偏移量方法如下:

private boolean reportSlaveMaxOffset(final long maxOffset) {
            //发送8字节的请求
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            //最大偏移量
            this.reportOffset.putLong(maxOffset);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            //如果需要向Master反馈当前拉取偏移量,则向Master发送一个8字节的请求,请求包中包含的数据为当前Broker消息文件的最大偏移量。
            //如果还有剩余空间并且循环次数小于3
            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    //发送上报的偏移量
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }

            //最后一次写时间
            lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            //是否还有剩余空间
            return !this.reportOffset.hasRemaining();
}

当isTimeToReportOffset判断需要上报已经拉取消息的偏移量时,调用reportSlaveMaxOffset方法进行上报,reportSlaveMaxOffset方法会发送8字节的请求,请求中包含了最大偏移量。如果还有剩余空间并且循环次数小于3,则向master发送上报的偏移量,最后更新下最后一次写时间,reportSlaveMaxOffset方法返回是否还有剩余空间,如果没有剩余空间,则调用closeMaster方法关闭与master的连接。

  • 进行事件选择,并且处理读事件,读事件是将master发送的数据落盘保存。
//进行事件选择, 其执行间隔为 1s 。
this.selector.select(1000);

 //处理读事件
boolean ok = this.processReadEvent();
//处理读事件不ok,关闭与master的连接
if (!ok) {
   this.closeMaster();
}
//上报slave最大的偏移量
if (!reportSlaveMaxOffsetPlus()) {
   continue;
}

进行事件选择, 其执行间隔为 1s ,然后调用processReadEvent方法处理读事件,处理完读事件以后上报最大的偏移量,如果没有剩余的空间,则跳过。

//处理读事件
private boolean processReadEvent() {
        //读取大小为0的次数
       int readSizeZeroTimes = 0;
        //判断是否还有剩余
       while (this.byteBufferRead.hasRemaining()) {
         try {
             //读取到缓存中
             int readSize = this.socketChannel.read(this.byteBufferRead);
             //读取的大小大于0
             if (readSize > 0) {
                 //重置读取到0字节的次数
                 readSizeZeroTimes = 0;
                 //分发读请求
                 boolean result = this.dispatchReadRequest();
                 if (!result) {
                     log.error("HAClient, dispatchReadRequest error");
                     return false;
                 }
             } else if (readSize == 0) {
                 //如果连续 3 次从网络通道读取到 0 个字节,则结束本次读,返回 true 。
                 if (++readSizeZeroTimes >= 3) {
                     break;
                 }
             } else {
                 //如果读取到的字节数小于0或发生IO异常,则返回false。
                 log.info("HAClient, processReadEvent read socket < 0");
                 return false;
             }
         } catch (IOException e) {
             log.info("HAClient, processReadEvent read socket exception", e);
             return false;
         } 
     }
   return true;
}

如果byteBufferRead还有剩余空间,将读取消息到byteBufferRead中,如果读取的大小大于0,则分发读请求,处理读取到的数据,读取的大小等于0,并且连续3次读取到0个字节,则结束本次读,直接返回ture。如果读取到的字节数小于0或发生IO异常,则返回false。

private boolean dispatchReadRequest() {
      //头部长度,物理偏移量与消息的长度
      final int msgHeaderSize = 8 + 4; // phyoffset + size
      //记录当前byteBufferRead的当前指针
      int readSocketPos = this.byteBufferRead.position();

      while (true) {
          //当前指针减去本次己处理读缓存区的指针
          int diff = this.byteBufferRead.position() - this.dispatchPosition;
          //是否大于头部长度,是否包含头部
          if (diff >= msgHeaderSize) {
              //master的物理偏移量
              long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
              //消息的长度
              int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

              //slave 偏移量
              long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

              //如果slave的最大物理偏移量与master给的偏移量不相等,则返回false
              //从后面的处理逻辑来看,返回false,将会关闭与master的连接,在Slave本次周期内将不会再参与主从同步了。
              if (slavePhyOffset != 0) {
                  if (slavePhyOffset != masterPhyOffset) {
                      log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                          + slavePhyOffset + " MASTER: " + masterPhyOffset);
                      return false;
                  }
              }

              //包含完整的信息
              if (diff >= (msgHeaderSize + bodySize)) {
                  //读取消息到缓冲区
                  byte[] bodyData = new byte[bodySize];
                  this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                  this.byteBufferRead.get(bodyData);

                  //添加到commit log 文件
                  HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                  this.byteBufferRead.position(readSocketPos);
                  //当前的已读缓冲区指针
                  this.dispatchPosition += msgHeaderSize + bodySize;

                  //上报slave最大的复制偏移量
                  if (!reportSlaveMaxOffsetPlus()) {
                      return false;
                  }

                  continue;
              }
          }

          //没有包含完整的消息,
          //其核心思想是将readByteBuffer中剩余的有效数据先复制到readByteBufferBak,然后交换readByteBuffer与readByteBufferBak。
          if (!this.byteBufferRead.hasRemaining()) {
              this.reallocateByteBuffer();
          }

          break;
      }

      return true;
}

dispatchReadRequest方法将读取到的数据一条一条解析,并且落盘保存到本地。但是读取的数据可能不是完整的,所以要判断读取的数据是否完整,消息包括消息头和消息体,消息头12字节长度,包括物理偏移量与消息的长度。首先判断读取到数据的长度是否大于消息头部长度,如果小于说明读取的数据是不完整的,则判断byteBufferRead是否还有剩余空间,并且将readByteBuffer中剩余的有效数据先复制到readByteBufferBak,然后交换readByteBuffer与readByteBufferBak。

如果读取的数据的长度大于消息头部长度,则判断master和slave的偏移量是否相等,如果slave的最大物理偏移量与master给的偏移量不相等,则返回false。在后续的处理中,返回false,将会关闭与master的连接。如果读取的数据长度大于等于消息头长读与消息体长度,说明读取的数据是包好完整消息的,将消息体的内容从byteBufferRead中读取出来,并且将消息保存到commitLog文件中。

总结起来,HAClient类的作用就是Slave上报自己的最大偏移量,以及处理从master拉取过来的数据并落盘保存起来。

HAConnection实现原理

在AcceptSocketService类的分析中,我们知道master监听到sslave的连接后,会创建HAConnection并且启动。HAConnection的作用是master用来处理slave上报的复制进度,并且向slave发送数据,处理上述过程是采用读写分离的,HAConnection的内部类WriteSocketService是master向slave写数据的服务类,HAConnection的内部类ReadSocketService是master从slave读数据的服务类。这两个内部类都是线程类,HAConnection启动后,这两个线程也将启动,不断运行。

ReadSocketService的run方法:

public void run() {
     HAConnection.log.info(this.getServiceName() + " service started");

      while (!this.isStopped()) {
          try {
              this.selector.select(1000);
              //处理读事件
              boolean ok = this.processReadEvent();
              if (!ok) {
                  HAConnection.log.error("processReadEvent error");
                  break;
              }
              //当前时间戳减去最后一次读取时间戳
              long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
              //大于长连接保持的时间
              if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                  log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                  break;
              }
          } catch (Exception e) {
              HAConnection.log.error(this.getServiceName() + " service has exception.", e);
              break;
          }
      }

      //停止该读线程
      this.makeStop();

      //停止写线程
      writeSocketService.makeStop();

      //删除连接
      haService.removeConnection(HAConnection.this);

      //连接数减一
      HAConnection.this.haService.getConnectionCount().decrementAndGet();


      SelectionKey sk = this.socketChannel.keyFor(this.selector);
      if (sk != null) {
          sk.cancel();
      }

      //关闭选择器
      try {
          this.selector.close();
          this.socketChannel.close();
      } catch (IOException e) {
          HAConnection.log.error("", e);
      }

      HAConnection.log.info(this.getServiceName() + " service end");
}

ReadSocketService的作用就是用来读取slave发送的数据,run方法的核心实现就是每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求。如果线程停止了,则做些清理的工作,如关闭消除,删除连接等。

private boolean processReadEvent() {
            //读取的字节数为0的次数
            int readSizeZeroTimes = 0;

            //缓冲区没有剩余空间,还原
            if (!this.byteBufferRead.hasRemaining()) {
                this.byteBufferRead.flip();
                this.processPosition = 0;
            }

            //剩余空间
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    //处理网络读
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    //读取的字节大于0
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        //最后一次读时间戳
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        //当前位移减去已处理数据位移大于等于8,表明收到从服务器一条拉取消息请求。读取从服务器已拉取偏移量
                        if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                            //消息长度,减去8个字节(消息物理偏移量),
                            long readOffset = this.byteBufferRead.getLong(pos - 8);
                            this.processPosition = pos;

                            //从服务器反馈已拉取完成的数据偏移量
                            HAConnection.this.slaveAckOffset = readOffset;
                            if (HAConnection.this.slaveRequestOffset < 0) {
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            }

                            //唤醒线程去判断自己的关注的消息是否已经传输完成
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        //如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        //如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }
 }

如果byteBufferRead没有剩余空间,则清理byteBufferRead,并将当前已处理数据的指针processPosition设置为0。

如果byteBufferRead还有剩余剩余空间,读取slave拉取数据的请求,如果读取的字节大小大于0并且本次读取到的内容的字节大于等于8,则说明master接收到了slave的一条完整拉取数据的请求,读取从服务器已拉取偏移量,然后唤醒线程去判断自己关注的数据是否已经传输完成。在讲解GroupTransferService的原理时,我们知道GroupTransferService的作用判断主从同步已经完成的线程,这里就是去通知线程。

如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。

WriteSocketService的run方法比较长,下面会分成多个部分进行讲解:

this.selector.select(1000);

//从服务器请求拉取数据的偏移量等于-1,说明Master还未收到从服务器的拉取请求,放弃本次事件处理
if (-1 == HAConnection.this.slaveRequestOffset) {
    Thread.sleep(10);
    continue;
}

选择器每秒进行选择一次就绪事件,当slaveRequestOffset等于-1时,说明master还未收到slave的拉取请求,睡眠后放弃本次事件的处理。

//下一次传输的物理偏移量等于-1,表示初次进行数据传输
if (-1 == this.nextTransferFromWhere) {
      //如果slaveRequestOffset为0,则从当前commitlog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输。
      if (0 == HAConnection.this.slaveRequestOffset) {
          //master位移
          long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
          //master位移减去MappedFile文件的大小
          masterOffset = masterOffset- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());

          if (masterOffset < 0) {
              masterOffset = 0;
          }

          this.nextTransferFromWhere = masterOffset;
      } else {
          this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
      }

      log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);
}

nextTransferFromWhere等于-1,表示初次进行数据传输,如果slaveRequestOffset为0,则从当前commitlog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输。

//上一次数据是否全部传输给客户端
  if (this.lastWriteOver) {

      //当前时间戳减去最后一次写时间戳
      long interval =
          HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

      //大于发送心跳时间间隔的时间
      if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
          .getHaSendHeartbeatInterval()) {

          // Build Header
          this.byteBufferHeader.position(0);
          //消息头长度
          this.byteBufferHeader.limit(headerSize);
          this.byteBufferHeader.putLong(this.nextTransferFromWhere);
          this.byteBufferHeader.putInt(0);
          this.byteBufferHeader.flip();

          //传输数据
          this.lastWriteOver = this.transferData();
          if (!this.lastWriteOver)
              continue;
      }
  } else {
      //还没有传输完毕
      this.lastWriteOver = this.transferData();
      if (!this.lastWriteOver)
          continue;
  }

如果上一次数据全部传输给slave,判断是否需要发送心跳给slave,心跳包头部的大小为12字节,消息长度默认存0,表示本次数据包为心跳包,避免长连接由于空闲被关闭。如果上一次数据还没有传输完毕,则继续传输上一次的数据,然后再次判断是否传输完成,如果消息还是未全部传输,则结束此次事件处理,待下次写事件到底后,继续将未传输完的数据先写入slave。transferData方法的作用是数据传输给slave。具体看看transferData方法:

private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;
            // Write Header 传输头信息
            while (this.byteBufferHeader.hasRemaining()) {
                //写入头信息给slave服务器
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                //写入大小大于0
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    //写入大小大于等于3
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }

            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // Write Body 写入消息体
            //消息头缓冲没有剩余空间
            if (!this.byteBufferHeader.hasRemaining()) {
                //还有剩余数据
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    //将数据写入到从服务器
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }

            //头信息和消息体都没有剩余空间
            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

            //没有剩余空间,则释放mapped缓存
            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;
            }

            return result;
 }

transferData方法会向slave传输两部分数据:传输头信息以及消息体信息。

首先向slave写入头信息,如果写入的字节大小大于0,则设置写入大小为0的次数writeSizeZeroTimes为0,并且将最后一次写入时间更新为当前时间。如果写入的字节大小等于0并且writeSizeZeroTimes大于等于3,则停止向slave写头信息,如果写入的字节大小小于0,则抛出异常。然后向slave写入消息体,向slave写入消息体的过程跟写入头信息的过程是一样的。

Search

    微信好友

    博士的沙漏

    Table of Contents