1.启动kafka服务
命令:
bin/kafka-server-start.sh -daemon config/server.properties
2.停止kafka服务
命令:
./kafka-server-stop.sh
3.创建Topic
命令:
bin/kafka-topics.sh --create --topic test0--zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
参数说明:
–topic:后面的test0是topic的名称;
–zookeeper:应该和server.properties文件中的zookeeper.connect一样;
–config:指定当前topic上有效的参数值;
–partitions:指定topic的分区数量;
–replication-factor:指定每个分区的副本个数,默认1个;
4.查看所有的Topic列表
命令:
bin/kafka-topics.sh --list --zookeeper localhost:9092
5.查看所有的Topic的详细信息
命令:
bin/kafka-topics.sh --describe --zookeeper cdh-worker-1:2181/kafka
如果要查看单个 topic 信息:可在上述命令后面添加 –topic <topicName>。
6.查看Topic的分区和副本情况
命令:
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test0
运行结果:
Topic:test0 PartitionCount:16 ReplicationFactor:3 Configs: Topic: test0 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2 Topic: test0 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2 Topic: test0 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2 Topic: test0 Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2 Topic: test0 Partition: 4 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2 Topic: test0 Partition: 5 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
7.删除一个Topic
命令:
bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test0
8.Kafka发送消息
命令:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
9.Kafka接收消息
命令:加了–from-beginning 重头消费所有的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
命令:不加–from-beginning 从最新的一条消息开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
陈睿mikechen
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》