Pulsar定义
Pulsar是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计。
Pulsar 诞生于 2012 年,最初的目的是为在 Yahoo 内部整合其他消息系统,构建统一逻辑、支撑大集群和跨区域的消息平台。
当时的其他消息系统,包括 Kafka,都不能满足 Yahoo 的需求,比如大集群多租户、稳定可靠的 IO 服务质量、百万级 Topic、跨地域复制等,因此 Pulsar 应运而生。
Pulsar特点
Pulsar作为下一代云原生分布式消息流平台,具有如下特点:
- 具有强一致性、高吞吐、低延时及高可扩展性;
- 支持多租户、持久化存储、多机房跨区域数据复制;
- Pulsar简单易用的客户端,支持:Java、Go、Python和C++;
- Pulsar通过 Apache BookKeeper 提供的持久化消息存储机制,保证消息传递;
- Pulsar可无缝扩展到超过一百万个 topic;
- Pulsar支持多种 topic 订阅模式,比如:独占订阅、共享订阅、故障转移订阅等;
Pulsar架构原理
Pulsar整体架构,如下图所示:
Pulsar 由 Producer、Consumer、多个 Broker 、一个 BookKeeper 集群、一个 Zookeeper 集群构成。
1.Producer(数据生产者)
Producer:消息生产者,将消息发送到Broker。
2.Broker(消息服务端)
Broker可以看作是Pulsar的Server,Producer和Consumer可以看作是Client,是消息处理的节点。
Pulsar的Broker和其他消息中间件的都不一样,是无状态的没有存储,所以可以无限制的扩展。
Broker负责接收消息、传递消息、集群负载均衡等操作,Broker 不会持久化保存元数据。
3.Consumer(数据消费者)
Consumer可以看作PulsarClient,是消息的消费者,负责从 Broker 订阅并消费消息。
4.BookKeeper(存储消息)
BookKeeper大家可能比较陌生,BookKeeper其实就是存储服务,负责主题消息的持久化,采用的是Apache Bookeeper。
5.ZooKeeper(注册中心)
和Kafka一样Pulsar也是使用ZooKeeper保存一些元数据,比如:集群配置等信息,负责集群间的协调(例如:Topic 与 Broker 的关系)、服务发现等。
Pulsar工作流程
Pulsar的工作流程,主要分为读流程和写流程。
大致流程,如下图所示:
1.写流程(3步)
第一步: broker发起写请求
首先对Journal磁盘写入WAL,熟悉mysql的朋友知道redolog,journal和redolog作用一样都是用于恢复没有持久化的数据。
第二步:数据写入
数据写入:将数据写入index和ledger,这里为了保持性能不会直接写盘,而是写pagecache,然后异步刷盘。
第三步:写入确认
也就是对写入进行ack。
2.读流程(2步)
第一步:先读取index
先读取index,当然也是先读取cache,再走disk。
第二步:获取数据
获取到index之后,根据index去entry logger中去对应的数据。
Pulsar使用
1.下载 Pulsar
//下载Pulsar wget <https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz> //解压 tar xvfz apache-pulsar-2.8.1-bin.tar.gz
2.启动 Pulsar
在Bin目录中的命令启动Pulsar,可以独立模式启动 Pulsar。
$ bin/pulsar standalone
启动成功后,会有消息提示,如下所示:
2022-11-01 18:56:29,192 - INFO - [main:WebSocketService@95] - Configuration Store cache started 2022-11-01 18:5:29,192 - INFO - [main:AuthenticationService@61] - Authentication is disabled 2022-11-01 18:5:29,192 - INFO - [main:WebSocketService@108] - Pulsar WebSocket Service started
3.Pulsar示例
代码示例如下:
public class PulsarDemo { private static PulsarClient PULSAR_CLIENT = null; static { try { // 创建pulsar客户端 PULSAR_CLIENT = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build(); } catch (PulsarClientException e) { e.printStackTrace(); } } public static void main(String[] args) throws PulsarClientException { // 创建生产者 Producer<byte[]> producer = PULSAR_CLIENT.newProducer().topic("test-topic-1").create(); // 同步发送消息 MessageId messageId = producer.send("同步发送的消息".getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功,消息id: " + messageId); // 创建消费者 Consumer<byte[]> consumer = PULSAR_CLIENT.newConsumer().topic("test-topic-1") .subscriptionName("test-subscription-1").subscribe(); //获取一个消息内容 Message<byte[]> message = consumer.receive(); System.out.println("接收的消息内容: " + new String(message.getData())); // 确认消费成功,以便pulsar删除消费成功的消息 consumer.acknowledge(message); //关闭客户端 producer.close(); consumer.close(); PULSAR_CLIENT.close(); } }
陈睿mikechen
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》