Rocketmq源码生产者发送顺序消息

2022/01/13 RocketMQ

RocketMQ源码-生产者发送顺序消息

在分析顺序消息之前,我们先看看什么是顺序消息?

顺序消息是指生产者按照什么样的顺序发送消息,消费者只能按照生产者发送消息的顺序进行消费。我们在购物网站购买商品时,一个订单的完成需要经过三个严格有序的步骤:订单创建、订单付款、订单生成。生产者产生了订单创建、订单付款、订单生成三条消息,消费只能按照上述顺序消费消息才能有意义。

RocketMQ能严格保证消息的有序性,将顺序消息分为全局顺序消息与分区顺序消息。

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
  • 分区顺序:对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。

RocketMQ的一个topic默认有4个消息队列,普通消息的发送,是通过轮询的方式选择队列,然后将消息发送出去的,消息之间是没有顺序的。要保证RocketMQ消息全局有序,那么一个topic有且只能一个消息队列,所有的消息都是严格按照先进先出的顺序进行发布和消费。要保证分区有序,将消息根据Sharding Key进行区分,然后将同一个分区的消息严格按照先进先出的顺序进行发布和消息,也就说将同一分区的消息发送到一个队列中去

发送全局顺序消息,性能比较差,只能用一个队列进行消息的发送。大多数场景都是保证消息分区顺序就行了。

如何保证消息的有序性

保证消息的有序性,RocketMQ从是三个方面一起保证消息的有序性:

  • 生产者生产顺序消息
  • Broker保存顺序消息
  • 消费者顺序消费消息

生产者生产顺序消息

RocketMQ生产者为了生产有顺序的消息,根据sharding key将所有的消息分区,然后将同一分区的消息严格按照先进先出的顺序通过同一个队列发送给Broker服务。这样就能保证生产者发送给Broker服务器的消息有顺序了吗?答案是并不能,万一消息在发送过程中丢失或者后发送的消息比先发送的消息早到Broker服务器,这些请求都不能保证发送出去的消息的有序性。RocketMQ发送顺序消息是采用同步的方式发送给Broker服务器,当Broker服务器返回ack确认,RocketMQ才发送下一条消息。这样就能保证生产者发送的消息是有序的。

Broker保存顺序消息

只要生产者发送过来的消息是有序的,Broker按照生产者发送过来的消息保存起来就保证消息在Broker服务器的有序性,当消息保存成功以后,才给生产者发送ack确认,生产者才会发送下一条消息。即使消息重复发送,也是没有关系的,只要在消费者端做幂等,将重复的消息去重就行了。

消费者顺序消费消息

消费端一般会有多个消费者和多线程进行消费,消息者要顺序消费消息,就要保证同一时间只有一个线程去消费消息。消费者是从Broker服务器拉取消息进行消费的,当消费者拉取消息时,首先向Broker申请锁,如果申请到锁,就拉取消息,否则就放弃拉取消息,下一次再尝试拉取消息。等到将消息拉到消费端时,一般会在线程池中进行消费消息,这时候为了保证同一时间只有一个线程能消费消息,对消费队列加可重入锁。,经过上面的操作,就能保证消费者消费的消息是有序的。

讲了这么多理论,我们接下来看看生产者是如何发送顺序消息,以及深入分析生产者发送顺序消息的源代码,消息者是如何保证消费的消息是有序的则不在这里分析。

/**
* Producer,发送顺序消息
*/
public class Producer {

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       String[] tags = new String[]{"TagA", "TagC", "TagD"};

       // 订单列表
       List<OrderStep> orderList = new Producer().buildOrders();

       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // 加个时间前缀
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id

           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }

       producer.shutdown();
   }

   /**
    * 订单的步骤
    */
   private static class OrderStep {
       private long orderId;
       private String desc;

       public long getOrderId() {
           return orderId;
       }

       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }

       public String getDesc() {
           return desc;
       }

       public void setDesc(String desc) {
           this.desc = desc;
       }

       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + '\'' +
               '}';
       }
   }

   /**
    * 生成模拟订单数据
    */
   private List<OrderStep> buildOrders() {
       List<OrderStep> orderList = new ArrayList<OrderStep>();

       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       return orderList;
   }
}

以上代码是官方文档的案例,我直接就拿来用了。以上发送顺序消息最重要的就是实现MessageQueueSelector的select方法,这个方法是如何选择将消息队列发送到那个消息队列中去。上述代码根据订单id将消息分到不同队列中,相同订单id的消息将会发送到同一个消息队列中。将订单id作为send方法的最后一个参数。接下来深入下send方法,send会多次调用重载send方法:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
 private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //代码省略
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
               //代码省略
                //调用业务实现的select方法选择消息队列
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            //代码省略
        }

        validateNameServerSetting();
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

send方法采用同步的方法发送消息,等确认收到Broker服务的ack后,才可以继续发送下一条消息,然后根据业务实现的select方法选择消息队列,这样就保证了将同一分区的消息发送到相同的队列中去。剩下的逻辑就是发送消息的流程了,这个RocketMQ源码之生产者发送消息分析在中已经分析过了,这里就不讲了。

Search

    微信好友

    博士的沙漏

    Table of Contents