rabbitmq消息积压:如何快速排查与处理

张开发
2026/4/12 22:15:47 15 分钟阅读

分享文章

rabbitmq消息积压:如何快速排查与处理
线上消息队列积压了消费者处理不过来老板在群里你——这时候怎么办今天聊点实际的从判断积压到快速处理讲点能直接上手的东西。怎么判断消息积压了RabbitMQ# 查看队列消息数量rabbitmqctl list_queues name messages messages_ready messages_unacked# 输出大概长这样# name messages messages_ready messages_unacked# order 15234 15200 34关键看这几个数字messages- 队列里总共有多少消息messages_ready- 等待消费的消息积压的主力messages_unacked- 已投递但还没确认的一般来说messages_ready超过几千就该关注了上万就是严重积压。Kafka# 查看消费 lag落后多少kafka-consumer-groups.sh --bootstrap-server localhost:9092\--groupmy-consumer-group\--describe# 输出大概长这样# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG# my-consumer order 0 52341 67589 15248LAG就是积压量。LAG 几百几千问题不大上万就得处理了。为什么会积压常见原因就几种1. 消费者挂了最常见。消费者进程没了但生产者还在往里扔消息。2. 消费者变慢了比如数据库慢查询、对外接口超时。消息在那儿等着时间一长就堆起来了。3. 消费者数量不对分区数固定但消费者少了或者消费者不干活了。4. 消费逻辑有死循环或者阻塞消费端卡在某个地方不消费也不报错。快速处理方案方案一临时增加消费者推荐如果是 Kafka把消费者实例扩几个# Docker Compose 临时扩容services:consumer:deploy:replicas:5# 从 1 临时改成 5RabbitMQ 的话多启动几个消费者实例就行。注意Kafka 一个分区只能被一个消费者消费所以扩容数量别超过分区数。方案二提高消费并行度BeanpublicConcurrentKafkaListenerContainerFactoryString,StringkafkaListenerContainerFactory(ConsumerFactoryString,StringconsumerFactory){ConcurrentKafkaListenerContainerFactoryString,StringfactorynewConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);factory.setConcurrency(10);// 默认1临时调大成10returnfactory;}RabbitMQ 的话调整 prefetch 数量factory.setPrefetchCount(50);// 默认250太大了会积压太久太小了吞吐量上不去方案三跳过积压消息先消费最新的有时候积压太多了处理完要很久。这时候可以先跳过旧消息Kafka 方案重置 offset 到最新# 把消费者组的 offset 重置到最新kafka-consumer-groups.sh --bootstrap-server localhost:9092\--groupmy-consumer-group\--topicorder\--reset-offsets --to-latest\--execute注意这个方案会丢消息只有在业务能接受丢消息的时候才用。方案四临时写个脚本消费掉直接写个简单脚本扫队列然后批量处理ServicepublicclassFastConsumerService{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidconsumeAndDiscard(StringqueueName,intlimit){for(inti0;ilimit;i){MessagemessagerabbitTemplate.receive(queueName,1000);if(messagenull){break;}// 空消费直接确认。积压太多可以用这个临时方案// 但记得后续要补偿处理这些消息}}}再次强调空消费会丢消息后续一定要有补偿机制。排查积压的根本原因处理完积压只是治标找到根因才是治本。看消费者日志消费者有没有报错有没有超时有没 BLOCKED# RabbitMQ 查看连接状态rabbitmqctl list_connections pid channels# 查看消费者是否正常rabbitmqctl list_consumers queue_name看监控RabbitMQ 管理界面http://localhost:15672Kafka 使用 Kafka Eagle 或者 JMX 监控# 开启 JMX KAFKA_JMX_OPTS-Dcom.sun.management.jmxremotetrue常见坑数据库连接池打满消费者查询数据库连接池耗尽所有消费线程都在等连接。对外接口超时消费者调第三方接口接口响应慢消费速度断崖式下降。内存泄漏消费者 OOM 了但没死透勉强撑着但几乎不消费。怎么防止积压几点经验监控先行。LAG 超过阈值就告警别等老板你。消费者要快。IO 操作异步化能批量处理就批量。分区/队列数要够。Kafka 分区数决定了最大并发消费数。做好限流。上游流量突增时MQ 要能扛住。总结场景推荐方案消费者挂了重启/扩容消费者消费太慢提高并发度、优化消费逻辑积压太多业务能接受丢消息重置 offset 到最新积压太多业务不能丢消息扩容 慢慢消费 加补偿积压不可怕怕的是没监控、没预案。下次遇到这种情况希望你能从容应对。

更多文章