消息队列--Kafka 生产环境最佳实践

张开发
2026/4/12 10:56:18 15 分钟阅读

分享文章

消息队列--Kafka 生产环境最佳实践
系列导读本篇将深入讲解 Kafka 生产环境的部署、配置与最佳实践。文章目录一、Kafka 架构概述1.1 核心概念1.2 核心组件二、生产环境部署2.1 集群规划2.2 关键配置2.3 Docker Compose 部署三、生产者最佳实践3.1 配置优化3.2 发送消息四、消费者最佳实践4.1 配置优化4.2 消费消息五、监控与运维5.1 关键指标5.2 监控工具总结一、Kafka 架构概述1.1 核心概念┌─────────────────────────────────────────────────────────────┐ │ Kafka 架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Producer ──► Topic ──► Partition ──► Consumer │ │ │ │ │ ▼ │ │ ┌─────────┐ │ │ │ Broker │ (集群) │ │ └─────────┘ │ │ │ │ │ ▼ │ │ ┌─────────┐ │ │ │ ZooKeeper│ │ │ └─────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘1.2 核心组件组件说明BrokerKafka 服务节点Topic消息主题Partition分区并行处理单元Consumer Group消费者组实现负载均衡ZooKeeper集群协调服务二、生产环境部署2.1 集群规划生产环境推荐配置 - Broker: 3 节点奇数 - ZooKeeper: 3 节点 - 磁盘: SSD独立磁盘 - 内存: 32GB - 网络: 万兆网卡2.2 关键配置# server.properties # Broker ID集群内唯一 broker.id0 # 监听地址 listenersPLAINTEXT://0.0.0.0:9092 advertised.listenersPLAINTEXT://192.168.1.100:9092 # 日志目录建议多磁盘 log.dirs/data/kafka-logs # 分区数 num.partitions3 # 副本数 default.replication.factor3 # 最小同步副本 min.insync.replicas2 # 日志保留时间 log.retention.hours168 # 日志段大小 log.segment.bytes10737418242.3 Docker Compose 部署version:3services:zookeeper:image:confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT:2181ZOOKEEPER_TICK_TIME:2000volumes:-./zk-data:/var/lib/zookeeper/datakafka1:image:confluentinc/cp-kafka:latestdepends_on:-zookeeperports:-9092:9092environment:KAFKA_BROKER_ID:1KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://192.168.1.100:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:3kafka2:image:confluentinc/cp-kafka:latestdepends_on:-zookeeperports:-9093:9092environment:KAFKA_BROKER_ID:2KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://192.168.1.101:9093三、生产者最佳实践3.1 配置优化ConfigurationpublicclassKafkaProducerConfig{BeanpublicProducerFactoryString,StringproducerFactory(){MapString,ObjectconfignewHashMap();// Broker 地址config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.1.100:9092,192.168.1.101:9092);// 重试次数config.put(ProducerConfig.RETRIES_CONFIG,3);// 批量大小config.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 等待时间config.put(ProducerConfig.LINGER_MS_CONFIG,5);// 缓冲区大小config.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// ACK 配置config.put(ProducerConfig.ACKS_CONFIG,all);// 序列化config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);returnnewDefaultKafkaProducerFactory(config);}}3.2 发送消息ServicepublicclassOrderProducer{AutowiredprivateKafkaTemplateString,StringkafkaTemplate;// 同步发送publicvoidsendSync(Orderorder){try{SendResultString,StringresultkafkaTemplate.send(order-topic,order.getId(),JSON.toJSONString(order)).get();log.info(发送成功: {},result.getRecordMetadata());}catch(Exceptione){log.error(发送失败,e);}}// 异步发送publicvoidsendAsync(Orderorder){kafkaTemplate.send(order-topic,order.getId(),JSON.toJSONString(order)).addCallback(result-log.info(发送成功),ex-log.error(发送失败,ex));}}四、消费者最佳实践4.1 配置优化ConfigurationEnableKafkapublicclassKafkaConsumerConfig{BeanpublicConsumerFactoryString,StringconsumerFactory(){MapString,ObjectconfignewHashMap();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.1.100:9092,192.168.1.101:9092);config.put(ConsumerConfig.GROUP_ID_CONFIG,order-group);config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,earliest);config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,300000);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);returnnewDefaultKafkaConsumerFactory(config);}}4.2 消费消息ComponentpublicclassOrderConsumer{KafkaListener(topicsorder-topic,groupIdorder-group,concurrency3)publicvoidconsume(ConsumerRecordString,Stringrecord,Acknowledgmentack){try{OrderorderJSON.parseObject(record.value(),Order.class);// 业务处理processOrder(order);// 手动提交偏移量ack.acknowledge();}catch(Exceptione){log.error(消费失败: {},record.value(),e);// 发送到死信队列sendToDLQ(record);}}}五、监控与运维5.1 关键指标指标说明告警阈值MessagesInPerSec每秒消息数-BytesInPerSec每秒字节数-UnderReplicatedPartitions未同步分区数 0OfflinePartitionsCount离线分区数 0ActiveControllerCount活跃控制器数 15.2 监控工具推荐监控方案 - Prometheus Grafana指标监控 - Kafka Manager可视化管理 - Kafka Exporter指标导出总结✅Kafka 架构Broker、Topic、Partition✅生产环境部署集群规划、配置优化✅生产者实践配置、同步/异步发送✅消费者实践配置、手动提交✅监控运维关键指标、监控工具下篇预告RabbitMQ 高可用集群部署作者刘~浪地球系列消息队列一更新时间2026-04-12

更多文章