Pulsar最全详解(特点架构及原理使用)

Pulsar最全详解(特点架构及原理使用)-mikechen

Pulsar定义

Pulsar是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计。

Pulsar最全详解(特点架构及原理使用)-mikechen

Pulsar 诞生于 2012 年,最初的目的是为在 Yahoo 内部整合其他消息系统,构建统一逻辑、支撑大集群和跨区域的消息平台。

当时的其他消息系统,包括 Kafka,都不能满足 Yahoo 的需求,比如大集群多租户、稳定可靠的 IO 服务质量、百万级 Topic、跨地域复制等,因此 Pulsar 应运而生。

 

Pulsar特点

Pulsar作为下一代云原生分布式消息流平台,具有如下特点:

  • 具有强一致性、高吞吐、低延时及高可扩展性;
  • 支持多租户、持久化存储、多机房跨区域数据复制;
  • Pulsar简单易用的客户端,支持:Java、Go、Python和C++;
  • Pulsar通过 Apache BookKeeper 提供的持久化消息存储机制,保证消息传递;
  • Pulsar可无缝扩展到超过一百万个 topic;
  • Pulsar支持多种 topic 订阅模式,比如:独占订阅、共享订阅、故障转移订阅等;

Pulsar最全详解(特点架构及原理使用)-mikechen

 

Pulsar架构原理

Pulsar整体架构,如下图所示:

Pulsar最全详解(特点架构及原理使用)-mikechen

Pulsar 由 Producer、Consumer、多个 Broker 、一个 BookKeeper 集群、一个 Zookeeper 集群构成。

1.Producer(数据生产者)

Producer:消息生产者,将消息发送到Broker。

 

2.Broker(消息服务端)

Broker可以看作是Pulsar的Server,Producer和Consumer可以看作是Client,是消息处理的节点。

Pulsar的Broker和其他消息中间件的都不一样,是无状态的没有存储,所以可以无限制的扩展。

Broker负责接收消息、传递消息、集群负载均衡等操作,Broker 不会持久化保存元数据。

 

3.Consumer(数据消费者)

Consumer可以看作PulsarClient,是消息的消费者,负责从 Broker 订阅并消费消息。

 

4.BookKeeper(存储消息)

BookKeeper大家可能比较陌生,BookKeeper其实就是存储服务,负责主题消息的持久化,采用的是Apache Bookeeper。

 

5.ZooKeeper(注册中心)

Kafka一样Pulsar也是使用ZooKeeper保存一些元数据,比如:集群配置等信息,负责集群间的协调(例如:Topic 与 Broker 的关系)、服务发现等。

 

Pulsar工作流程

Pulsar的工作流程,主要分为读流程和写流程。

大致流程,如下图所示:

Pulsar最全详解(特点架构及原理使用)-mikechen

1.写流程(3步)

第一步: broker发起写请求

首先对Journal磁盘写入WAL,熟悉mysql的朋友知道redolog,journal和redolog作用一样都是用于恢复没有持久化的数据。

第二步:数据写入

数据写入:将数据写入index和ledger,这里为了保持性能不会直接写盘,而是写pagecache,然后异步刷盘。

第三步:写入确认

也就是对写入进行ack。

 

2.读流程(2步)

第一步:先读取index

先读取index,当然也是先读取cache,再走disk。

第二步:获取数据

获取到index之后,根据index去entry logger中去对应的数据。

 

Pulsar使用

1.下载 Pulsar

//下载Pulsar
wget <https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz>

//解压
tar xvfz apache-pulsar-2.8.1-bin.tar.gz

 

2.启动 Pulsar

在Bin目录中的命令启动Pulsar,可以独立模式启动 Pulsar。

$ bin/pulsar standalone

启动成功后,会有消息提示,如下所示:

2022-11-01 18:56:29,192 - INFO  - [main:WebSocketService@95] - Configuration Store cache started
2022-11-01 18:5:29,192 - INFO  - [main:AuthenticationService@61] - Authentication is disabled
2022-11-01 18:5:29,192 - INFO  - [main:WebSocketService@108] - Pulsar WebSocket Service started

3.Pulsar示例

代码示例如下:

public class PulsarDemo {

    private static PulsarClient PULSAR_CLIENT = null;

    static {
        try {
            // 创建pulsar客户端
            PULSAR_CLIENT = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws PulsarClientException {

        // 创建生产者
        Producer<byte[]> producer = PULSAR_CLIENT.newProducer().topic("test-topic-1").create();
        // 同步发送消息
        MessageId messageId = producer.send("同步发送的消息".getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送成功,消息id: " + messageId);

        // 创建消费者
        Consumer<byte[]> consumer = PULSAR_CLIENT.newConsumer().topic("test-topic-1")
                .subscriptionName("test-subscription-1").subscribe();
        //获取一个消息内容
        Message<byte[]> message = consumer.receive();
        System.out.println("接收的消息内容: " + new String(message.getData()));
        // 确认消费成功,以便pulsar删除消费成功的消息
        consumer.acknowledge(message);

        //关闭客户端
        producer.close();
        consumer.close();
        PULSAR_CLIENT.close();
    }
}

 

陈睿mikechen

10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

关注「mikechen」公众号,获取更多技术干货!

后台回复面试即可获取《史上最全阿里Java面试题总结》,后台回复架构,即可获取《阿里架构师进阶专题全部合集

评论交流
    说说你的看法