Kafka是大型架构核心,下面我详解Kafka核心命令@mikechen
1、查看 Kafka 版本
查看当前 Kafka 安装版本。
kafka-topics.sh --version
输出示例:
3.9.0
适用场景
- 查看安装版本
- 排查版本兼容问题
2、创建 Topic
创建一个新的 Topic。
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic order-topic \
--partitions 3 \
--replication-factor 2
参数说明:
--bootstrap-server:Kafka Broker 地址--topic:Topic 名称--partitions:分区数--replication-factor:副本数
输出:
Created topic order-topic
3、查看所有 Topic
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
输出:
order-topic
user-topic
log-topic
适用场景
查看集群有哪些 Topic。
4、查看 Topic 详情
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic order-topic
输出示例:
Topic: order-topic
PartitionCount:3
ReplicationFactor:2
Leader:1
Replicas:1,2
ISR:1,2
重点关注:
- Partition
- Leader
- ISR
- Replicas
5、修改 Topic 分区数
增加分区。
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--alter \
--topic order-topic \
--partitions 6
注意:
Kafka 只能增加分区,不能减少分区。
6、删除 Topic
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--delete \
--topic order-topic
注意:
Broker 需开启:
delete.topic.enable=true
7、发送消息(Producer)
启动生产者。
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic order-topic
输入:
hello
kafka
java
消息立即发送。
8、消费消息(Consumer)
启动消费者。
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic order-topic
只消费新消息。
如果读取历史消息:
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic order-topic \
--from-beginning
9、查看 Consumer Group
查看所有消费者组。
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
输出:
order-group
user-group
10、查看 Consumer Group 详情
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-group
输出:
TOPIC
PARTITION
CURRENT-OFFSET
LOG-END-OFFSET
LAG
重点:
- CURRENT OFFSET
- LAG
LAG:
LAG = 最新Offset - 当前消费Offset
LAG 越大:
说明消费越慢。
11、重置 Consumer Offset
从最早开始消费。
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group order-group \
--reset-offsets \
--to-earliest \
--execute \
--topic order-topic
常用于:
- 数据重放
- 测试
12、查看 Broker 信息
查看 Broker 元数据。
kafka-broker-api-versions.sh \
--bootstrap-server localhost:9092
输出:
Broker 0
Broker 1
Broker 2
可以查看:
- Broker ID
- API Version
- 支持能力
13、查看日志目录
kafka-log-dirs.sh \
--bootstrap-server localhost:9092 \
--describe
查看:
- 磁盘使用率
- Topic 存储位置
- 分区大小
14、性能测试(Producer)
Kafka 自带性能测试工具。
kafka-producer-perf-test.sh \
--topic test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
输出:
Records/sec
MB/sec
Latency
适合:
压测 Broker。
15、性能测试(Consumer)
kafka-consumer-perf-test.sh \
--bootstrap-server localhost:9092 \
--topic test \
--messages 100000
输出:
MB/sec
Messages/sec
适合:
测试消费能力。