1. Kafka命令行运维入门指南
第一次接触Kafka命令行工具时,我完全被各种参数搞晕了。记得有次线上服务突然报警,我手忙脚乱地翻文档找命令,结果耽误了宝贵的故障处理时间。后来花了三个月时间系统梳理,才发现Kafka命令行工具就像瑞士军刀,用对了能解决90%的运维问题。
作为分布式消息系统的核心组件,Kafka的命令行工具主要分为五大类:集群管理、Topic操作、消息生产消费、消费者组监控和性能测试。这些工具都存放在Kafka安装目录的bin文件夹下,文件名都以.sh结尾。建议每位运维同学都把这些命令的常用参数背下来,关键时刻能救命。
2. 集群启停与基础管理
2.1 集群启动的三种姿势
前台启动适合调试场景,所有日志直接输出到控制台:
bin/kafka-server-start.sh config/server.properties生产环境一定要用后台启动,推荐这两种方式:
# 方式一:使用-daemon参数 bin/kafka-server-start.sh -daemon config/server.properties # 方式二:nohup方式(方便记录日志) nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &如果需要监控JMX指标,记得指定端口:
JMX_PORT=9991 bin/kafka-server-start.sh -daemon config/server.properties2.2 安全停止集群的正确姿势
直接运行停止脚本可能遇到进程无法退出的情况,我推荐先确认BrokerID再停止:
# 先查看正在运行的Broker jps | grep Kafka # 再执行停止命令 bin/kafka-server-stop.sh如果遇到进程卡死,可以手动kill -9,但一定要先确保没有正在进行的副本同步操作。
3. Topic全生命周期管理
3.1 Topic创建参数详解
创建Topic时这三个参数最关键:
bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --topic order_events \ --partitions 6 \ # 分区数决定并行度 --replication-factor 3 \ # 副本数决定容灾能力 --config retention.ms=86400000 # 消息保留时间踩坑提醒:如果ZooKeeper设置了chroot路径,必须完整指定:
--zookeeper localhost:2181/kafka_cluster3.2 分区扩容实战技巧
线上业务增长经常需要扩容分区,但要注意:
- 只能增加不能减少分区数
- 会改变消息路由规则
- 可能引发短暂性能波动
扩容命令示例:
bin/kafka-topics.sh --alter \ --bootstrap-server localhost:9092 \ --topic order_events \ --partitions 12建议在业务低峰期操作,完成后用describe命令确认:
bin/kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic order_events4. 消息生产与消费实战
4.1 生产消息的三种模式
基础生产命令:
bin/kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic order_events需要消息Key时使用:
bin/kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic order_events \ --property parse.key=true \ --property key.separator=:实际项目中更推荐用性能测试工具:
bin/kafka-producer-perf-test.sh \ --topic order_events \ --num-records 1000000 \ --record-size 1024 \ --throughput -1 \ --producer-props bootstrap.servers=localhost:90924.2 消费消息的六种姿势
从头消费(适合数据重放):
bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic order_events \ --from-beginning指定消费组(实现负载均衡):
bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic order_events \ --group order_processor精准位点消费(故障恢复时特别有用):
bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic order_events \ --partition 3 \ --offset 152345. 消费者组深度监控
5.1 消费者组状态诊断
查看所有消费组:
bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --list诊断具体消费组延迟:
bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group order_processor \ --describe重点关注LAG列,表示未消费的消息积压量。如果发现持续增长,可能是消费者处理能力不足。
5.2 消费组重置操作
重置到最早位点(慎用):
bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group order_processor \ --reset-offsets \ --to-earliest \ --execute \ --topic order_events按时间重置(推荐方式):
bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group order_processor \ --reset-offsets \ --to-datetime 2023-01-01T00:00:00.000 \ --execute \ --topic order_events6. 高级运维技巧
6.1 Leader均衡优化
当发现分区Leader分布不均时:
bin/kafka-leader-election.sh \ --bootstrap-server localhost:9092 \ --election-type preferred \ --all-topic-partitions6.2 副本迁移操作
将分区3的副本从Broker 1迁移到Broker 4:
cat << EOF > reassign.json { "version":1, "partitions":[ {"topic":"order_events","partition":3,"replicas":[4,2,3]} ] } EOF bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassign.json \ --execute6.3 压测工具组合使用
生产者压测:
bin/kafka-producer-perf-test.sh \ --topic benchmark \ --num-records 1000000 \ --record-size 1024 \ --throughput -1 \ --producer-props \ bootstrap.servers=localhost:9092 \ compression.type=lz4消费者压测:
bin/kafka-consumer-perf-test.sh \ --topic benchmark \ --bootstrap-server localhost:9092 \ --messages 1000000 \ --group perf_test