RocketMQ问题
如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量
最近在新项目开发中遇到一个有趣的问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。如何设置单个主题 消费线程的最小数量和最大数量,用来区分不同主题 吞吐量不同。 我们先介绍一下 RocketMQ 消息监听再来说明 RocketMQ 消费线程。
RocketMQ 消息监听
设置消费者组为 my_consumer_group,监听 TopicTest 队列,并使用并发消息监听器MessageListenerConcurrently
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
如果使用的是并发消费的话,使用 ConsumeMessageConcurrentlyService
// 无界队列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫无意义了.
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(), // 默认20
this.defaultMQPushConsumer.getConsumeThreadMax(), // 默认64
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
consumer消费线程池参数:
- 默认最小消费线程数 20
- 默认最大消费线程数 64
- keepAliveTime = 60*1000 单位:秒
- 队列:new LinkedBlockingQueue<>() 无界队列
- 线程名称:前缀是:ConsumeMessageThread_ 注意:因为线程池使用的是无界队列,那么设置的最大线程数,其实没有什么意义。
修改线程池线程数
上面我们已经知道了,设置线程池的最大线程数是没什么用的。 那我们其实可以设置线程池的最小线程数,来修改consumer消费消息时的线程池大小。
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setConsumeThreadMin(30);
consumer.setConsumeThreadMax(64);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
修改线程池线程数-SpringBoot版
如果consumer是使用spring boot进行集成的,则可以这样设置消费者线程数:
只需要在消费者类实现接口RocketMQPushConsumerLifecycleListener
public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<DefaultMQPushConsumer> {
}