Rocketmq问题解决

2022/01/28 RocketMQ

RocketMQ问题

如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量

最近在新项目开发中遇到一个有趣的问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。如何设置单个主题 消费线程的最小数量和最大数量,用来区分不同主题 吞吐量不同。 我们先介绍一下 RocketMQ 消息监听再来说明 RocketMQ 消费线程。

RocketMQ 消息监听

设置消费者组为 my_consumer_group,监听 TopicTest 队列,并使用并发消息监听器MessageListenerConcurrently

public class Consumer {
 
     public static void main(String[] args) throws InterruptedException, MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumer.subscribe("TopicTest", "*");
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

如果使用的是并发消费的话,使用 ConsumeMessageConcurrentlyService

// 无界队列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫无意义了.
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(), // 默认20
    this.defaultMQPushConsumer.getConsumeThreadMax(), // 默认64
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));

consumer消费线程池参数:

  • 默认最小消费线程数 20
  • 默认最大消费线程数 64
  • keepAliveTime = 60*1000 单位:秒
  • 队列:new LinkedBlockingQueue<>() 无界队列
  • 线程名称:前缀是:ConsumeMessageThread_ 注意:因为线程池使用的是无界队列,那么设置的最大线程数,其实没有什么意义。

修改线程池线程数

上面我们已经知道了,设置线程池的最大线程数是没什么用的。 那我们其实可以设置线程池的最小线程数,来修改consumer消费消息时的线程池大小。

public static void main(String[] args) throws InterruptedException, MQClientException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
 
         consumer.setConsumeThreadMin(30);
         consumer.setConsumeThreadMax(64);
 
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

修改线程池线程数-SpringBoot版

如果consumer是使用spring boot进行集成的,则可以这样设置消费者线程数: 只需要在消费者类实现接口RocketMQPushConsumerLifecycleListener

public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<DefaultMQPushConsumer> {
}

Search

    微信好友

    博士的沙漏

    Table of Contents