Kafka消息队列详解(看这篇就够了)

Kafka消息队列详解(看这篇就够了)-mikechen

Kafka消息队列定义

Kafka消息队列详解(看这篇就够了)-mikechen

Kafka是一个高性能、可扩展的消息队列系统,用于处理实时数据流,它的设计目标是提供可靠的、高吞吐量的消息传递,并支持持久化存储和容错机制。

 

Kafka特点

Kafka主要特点是:高吞吐量、低延迟、高可用性、分布式、可扩展性和可靠性。

  1. 高吞吐量

Kafka可以处理大量的消息,每秒可以处理数百万条消息,这使得它非常适合于大规模数据传输的应用场景。

  1. 低延迟

Kafka的消息发布和订阅速度非常快,通常只有几毫秒的延迟,这使得它非常适合于实时数据流处理的应用场景。

  1. 高可用性

Kafka可以保证消息的可靠传输,同时具有自动容错和故障恢复机制,这使得它非常适合于需要高可用性的应用场景。

  1. 分布式

Kafka是一个分布式系统,可以扩展到多个服务器上,这使得它非常适合于处理大规模数据和需要可扩展性的应用场景。

  1. 可扩展性

Kafka可以通过添加更多的服务器来扩展它的容量和性能,这使得它非常适合于需要可扩展性的应用场景。

 

Kafka概念

下面是对Kafka消息队列的详细解释:

1.主题(Topics)

Kafka的消息被发布到主题中,每个主题可以被认为是一个消息的类别或者流,生产者将消息发送到特定的主题,而消费者可以订阅一个或多个主题来接收消息。

 

2.分区(Partitions)

每个主题可以被分为一个或多个分区,每个分区是一个有序的消息日志。

消息在分区内按照顺序追加,每条消息都有一个唯一的偏移量(Offset)标识其在分区中的位置,分区允许Kafka在分布式环境下进行并行处理和水平扩展,以提高吞吐量和可靠性。

 

3.生产者(Producers)

Kafka消息队列详解(看这篇就够了)-mikechen

生产者负责将消息发布到Kafka的主题,生产者可以选择将消息发送到特定的分区,并且可以指定消息的键(Key)。

根据键的哈希值,Kafka将消息路由到相应的分区,或者使用自定义的分区器来确定消息应该被发送到哪个分区。

 

4.消费者(Consumers)

消费者订阅一个或多个主题,并从相应的分区中读取消息。

每个分区只能由一个消费者进行消费,但是一个消费者可以订阅多个分区。

消费者可以以不同的消费组(Consumer Group)的形式进行组织,每个消费组内的消费者共同消费分区中的消息。

 

5.消费者组(Consumer Group)

消费者可以组成一个消费者组,消费者组内的消费者共同负责处理主题的所有分区,每个分区只能被消费者组内的一个消费者消费。

Kafka使用消费者组来实现负载均衡和扩展性,一个分区只能被一个消费者组中的一个消费者消费,其他消费者无法读取该分区的消息。

 

6.持久化存储

Kafka将所有的消息持久化地存储在磁盘上,以确保数据的可靠性和持久性,这使得Kafka非常适合构建可靠的数据管道和进行数据回放。

消息传递保证:Kafka提供可靠的消息传递保证。一旦消息被写入Kafka的分区,它将会持久化,并且可以在之后被读取。生产者可以选择使用不同的消息传递语义,如至多一次、至少一次或精确一次,以满足不同的应用需求。

 

Kafka工作原理

Kafka的工作原理可以概括为生产者将消息发布到主题,然后消费者订阅主题并消费其中的消息。

如下图所示:

Kafka消息队列详解(看这篇就够了)-mikechen

上图基本都是目前主流消息中间件的原型,可以很清楚的看见三者的关系。

生产者将数据生产出来,交给 broker 进行存储。

如下图所示:

Kafka消息队列详解(看这篇就够了)-mikechen

消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。

备注:图上有个细节需要注意:producer 到 broker 的过程是 push也就是有数据就推送到 broker,而 consumer 到 broker 的过程是 pull是通过 consumer 主动去拉数据的

多个 broker 协同合作,producer 和 consumer 部署在各个业务逻辑中被频繁的调用,三者通过 zookeeper管理协调请求和转发。

Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。

如下图所示:

Kafka消息队列详解(看这篇就够了)-mikechen

这样一个高性能的分布式消息发布订阅系统就完成了。

 

Kafka消息队列安装

Kafka官方下载地址:https://kafka.apache.org/downloads

Kafka消息队列详解(看这篇就够了)-mikechen

根据自己的需求下载相应版本的Kafka,Kafka是由scala语言编写,下载稳定Scala版本。

如下图所示:

Kafka消息队列详解(看这篇就够了)-mikechen

1.新建Kafka安装目录

命令:

cd /

cd /opt

mkdir kafka

 

2.Kafka解压安装包

tar -xzvf 解压包名称

比如:

[root@mikechen kafka]# tar -zxvf kafka_2.12-3.1.0.tgz

然后再查看解压文件:

[root@mikechen kafka]# cd kafka_2.12-3.1.0
[root@mikechen kafka_2.12-3.1.0]# ls
bin  config  libs  LICENSE  licenses  NOTICE  site-docs

 

3.修改核心配置文件

进入config配置目录,修改配置文件 server.properties

cd config

Kafka消息队列详解(看这篇就够了)-mikechen

修改配置文件 server.properties:

vi /opt/kafka/config/server.properties

修改如下6处内容:

1)修改kafka配置

#要求每台kafka都有唯一的brokerid
broker.id=1 

listeners=PLAINTEXT://server1:9092

配置说明:

​​broker.id​​ :配置的是集群环境,要求每台kafka都有唯一的brokerid;

listeners:listeners配置的是kafka的tcp侦听ip地址;

注:listeners监听器千万不能写localhost,否则虽然在本地可以通信,一旦外网通过JavaAPI访问就会出错。

 

2)修改日志的位置

log.dirs=/opt/kafka/data

log.dirs​​ :数据存放的目录。

 

3)修改Zookeeper

zookeeper.connect=server1:2181

如果是集群配置,修改为:

zookeeper.connect=server1:2181,server2:2181,server3:2181

多个zookeeper通过,分割,比如:erver1:2181,server2:2181即可。

 

4.测试安装

1.启动zookeeper

kafka 是基于 Zookeeper 的消息管理系统,所以启动的时候是需要使用到 Zookeeper ,但其内置了Zookeeper ,所以只需要根据bin目录下的文件进行启动即可。

cd /opt/kafka/bin
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

Zookeeper 启动成功后会出现 binding to port 0.0.0.0/0.0.0.0:2181 所示结果表示启动成功。

 

2.再启动kafka

进入到kafka bin目录下, 启动如下指令

./kafka-server-start.sh -daemon ../config/server.propertie

 

3.验证Kafka

命令:

ps -ef|grep kafka

kafka进程存在就说明已经安装成功了。

作者简介

陈睿|mikechen,10年+大厂架构经验,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

关注作者「mikechen」公众号,获取更多技术干货!

后台回复架构,即可获取《阿里架构师进阶专题全部合集》,后台回复面试即可获取《史上最全阿里Java面试题总结

评论交流
    说说你的看法