pulsar消息隊列,聊聊 Pulsar: Pulsar 的核心概念與基礎架構

 2023-12-25 阅读 32 评论 0

摘要:一、Pulsar 介紹 Apache Pulsar 是 Apache 軟件基金會的頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐

一、Pulsar 介紹

Apache Pulsar 是 Apache 軟件基金會的頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。

Pulsar 的關鍵特性如下:

  • 是下一代云原生分布式消息流平臺。
  • Pulsar 的單個實例原生支持多個集群,可跨機房在集群間無縫地完成消息復制。
  • 極低的發布延遲和端到端延遲。
  • 可無縫擴展到超過一百萬個 topic。
  • 簡單的客戶端 API,支持 Java、Go、Python 和 C++。
  • 主題的多種訂閱模式(獨占、共享和故障轉移)。
  • 通過 Apache BookKeeper 提供的持久化消息存儲機制保證消息傳遞 。
  • 由輕量級的 serverless 計算框架 Pulsar Functions 實現流原生的數據處理。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得數據更易移入、移出 Apache Pulsar。
  • 分層式存儲可在數據陳舊時,將數據從熱存儲卸載到冷/長期存儲(如S3、GCS)中。

二、什么是云原生

既然說 Pulsar 是下一代云原生分布式消息流平臺,那我們得知道什么是云原生吧。

云原生的概念是 2013 年 Matt Stine 提出的,到目前為止,云原生的概念發生了多次變更,目前最新對云原生的定義為:DevOps + 持續交付 + 微服務 + 容器。

pulsar消息隊列?而符合云原生架構的應用程序是:采用開源堆棧(k8s + docker)進行容器化,基于微服務架構提高靈活性和可維護性,借助敏捷方法、DevOps 支持持續迭代和運維自動化,利用云平臺設施實現彈性伸縮、動態調度、優化資源利用率。

在這里插入圖片描述

三、核心概念

3.1 Messages(消息)

ComponentDescription
Value / data payload消息攜帶的數據,所有 Pulsar 的消息攜帶原始 bytes,但是消息數據也需要遵循數據 schemas。
Key消息可以被 Key 打標簽。這可以對 topic 壓縮之類的事情起作用。
Properties可選的,用戶定義屬性的 key/value map。
Producer name生產消息的 producer 的名稱(producer 被自動賦予默認名稱,但你也可以自己指定。)
Sequence ID在 topic 中,每個 Pulsar 消息屬于一個有序的序列。消息的 sequence ID 是它在序列中的次序。
Publish time消息發布的時間戳
Event time可選的時間戳,應用可以附在消息上,代表某個事件發生的時間,例如,消息被處理時。如果沒有明確的設置,那么 event time 為0。
TypedMessageBuilder它用于構造消息。您可以使用TypedMessageBuilder設置消息屬性,比如消息鍵、消息值。設置TypedMessageBuilder時,將鍵設置為字符串。如果您將鍵設置為其他類型,例如,AVRO對象,則鍵將作為字節發送,并且很難從消費者處取回AVRO對象。

消息的默認大小為 5 MB,可以通過以下方式配置消息的最大大小。

  • broker.conf
    # The max size of a message (in bytes).
    maxMessageSize=5242880
    
  • bookkeeper.conf
    # The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
    nettyMaxFrameSizeBytes=5253120
    

3.2 Producers(生產者)

生產者是關聯到 topic 的程序,它發布消息到 Pulsar 的 broker 上。

3.2.1 Send modes(發送模式)

producer 可以以同步或者異步的方式發布消息到 broker。

ModeDescription
異步發送發送消息后,producer等待broker的確認。如果沒有收到確認,producer會認為發送失敗。
同步發送producer 將會把消息放入阻塞隊列,然后馬上返回。客戶端類庫將會在背后把消息發送給 broker。如果隊列滿了,根據傳給 producer 的參數,producer 可能阻塞或者直接返回失敗。

3.2.2 Access mode(訪問模式)

pulsar源碼是什么語言開發。你可以為生產者提供不同類型的主題訪問模式。

Access modeDescription
Shared(共享)多個生產者可以發布一個主題,這是默認設置。
Exclusive(獨占)一個主題只能由一個生產者發布。如果已經有生產者連接,其他生產者試圖發布該主題立即得到錯誤。如果“老”生產者與 broker 發生網絡分區,“老”生產者將被驅逐,“新”生產者將被選為下一個唯一的生產者。
WaitForExclusive(獨占等待)如果已經有一個生產者連接,生產者的創建是未決的(而不是超時),直到生產者獲得獨占訪問。成功成為排他性的生產者被視為領導者。因此,如果您想為您的應用程序實現 leader 選舉方案,您可以使用這種訪問模式。

3.2.3 Compression(壓縮)

你可以壓縮生產者在傳輸期間發布的消息。Pulsar 目前支持以下類型的壓縮:

  • LZ4
  • ZLIB
  • ZSTD
  • SNAPPY

3.2.4 Batching(批處理)

如果批處理開啟,producer 將會累積一批消息,然后通過一次請求發送出去。批處理的大小取決于最大的消息數量及最大的發布延遲。

3.2.5 Chunking(分塊)

  • 批處理和分塊不能同時啟用。要啟用分塊,必須提前禁用批處理。
  • Chunking 只支持持久化的主題。
  • Chunking 僅支持 exclusive 和 failover 訂閱模式。
3.2.5.1 處理一個 producer 和一個訂閱 consumer 的分塊消息

如下圖所示,當生產者向主題發送一批大的分塊消息和普通的非分塊消息時。 假設生產者發送的消息為 M1,M1 有三個分塊 M1-C1,M1-C2 和 M1-C3。 這個 broker 在其管理的 ledger 里面保存所有的三個塊消息,然后以相同的順序分發給消費者(獨占/災備模式)。 消費者將在內存緩存所有的塊消息,直到收到所有的消息塊。將這些消息合并成為原始的消息 M1,發送給處理進程。
在這里插入圖片描述

3.2.5.2 處理多個 producer 和一個訂閱 consumer 的分塊消息

當多個生產者發布塊消息到單個主題,這個 Broker 在同一個 Ledger 里面保存來自不同生產者的所有塊消息。 如下所示,生產者1發布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三個塊組成。 生產者2發布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三個塊組成。 這些特定消息的所有分塊是順序排列的,但是其在 ledger 里面可能不是連續的。 這種方式會給消費者帶來一定的內存負擔。因為消費者會為每個大消息在內存開辟一塊緩沖區,以便將所有的塊消息合并為原始的大消息。
在這里插入圖片描述

3.3 Consumers(消費者)

IT架構?消費者通過訂閱關聯到主題,然后接收消息的程序。

3.3.1 Receive modes(接收模式)

消息可以通過同步或者異步的方式從 broker 接收。

ModeDescription
同步接收同步接收將會阻塞,直到消息可用。
異步接收異步接收立即返回 future 值,例如 java 中的 CompletableFuture,一旦新消息可用,它即刻完成。

3.3.2 Listeners(監聽)

客戶端類庫提供了它們對于 consumer 的監聽實現。舉一個 Java 客戶端的例子,它提供了 MessageListener 接口。在這個接口中,一旦接受到新的消息,received 方法將被調用。

3.3.3 Acknowledgement(確認)

消費者成功處理了消息,需要發送確認給 broker,以讓 broker 丟掉這條消息(否則它將存儲著此消息)。

消息的確認可以一個接一個,也可以累積一起。累積確認時,消費者只需要確認最后一條它收到的消息。所有之前(包含此條)的消息,都不會被重新發給那個消費者。

芯片架構有多少種,累積消息確認不能用于 shared 訂閱模式,因為 shared 訂閱為同一個訂閱引入了多個消費者。

3.4 Topics(主題)

和其它的發布訂閱系統一樣,Pulsar 中的 topic 是帶有名稱的通道,用來從 producer 到 consumer 傳輸消息。Topic 的名稱是符合良好結構的 URL。

{persistent|non-persistent}://tenant/namespace/topic
Topic name componentDescription
persistent / non-persistent定義了 topic 類型,Pulsar 支持兩種不同 topic:持久和非持久(默認是持久類型,如果你沒有指明類型,topic 將會是持久類型)。持久 topic 的所有消息都會保存在硬盤上(這意味著多塊硬盤,除非是單機模式的 broker),反之,非持久 topic 的數據不會存儲到硬盤上。
tenant實例中 topic 的租戶。tenant 是 Pulsar 多租戶的基本要素。可以被跨集群的傳播。
namespacetopic 的管理單元,相關 topic 組的管理機制。大多數的 topic 配置在 namespace 層面生效。每個 tenant 可以有多個 namespace。
topic主題名稱的最后組成部分,topic 的名稱很自由,沒有什么特殊的含義。

3.4.1 Partitioned topics(分區主題)

普通主題僅由單個 broker 提供服務,這限制了主題的最大吞吐量。分區主題是由多個 broker 處理的一種特殊類型的主題,因此允許更高的吞吐量。

分區的主題實際上實現為 N 個內部主題,其中 N 是分區的數量。當將消息發布到分區主題時,每個消息都被路由到幾個 broker 中的一個。分區在 broker 間的分布由 Pulsar 自動處理。
在這里插入圖片描述
如上圖,Topic1 主題有 5 個分區(P0 到 P4),劃分在 3 個 broker 上。因為分區比 broker 多,前兩個 broker 分別處理兩個分區,而第三個 broker 只處理一個分區(同樣,Pulsar 自動處理分區的分布)。

此主題的消息將廣播給兩個消費者。路由模式決定將每個消息發布到哪個分區,而訂閱模式決定將哪些消息發送到哪個消費者。

Java產品架構概念。在大多數情況下,可以分別決定路由和訂閱模式。通常,吞吐量問題應該指導分區/路由決策,而訂閱決策應該根據應用程序語義進行指導。

就訂閱模式的工作方式而言,分區主題和普通主題之間沒有區別,因為分區僅決定消息由生產者發布和由消費者處理和確認之間發生了什么。

分區主題需要通過管理 API 顯式創建,分區的數量可以在創建主題時指定。

3.4.1.1 Routing modes(路由模式)

當發布消息到分區 topic,你必須要指定路由模式。路由模式決定了每條消息被發布到的分區(其實是內部主題)。

下面是三種默認可用的路由模式:

ModeDescription
RoundRobinPartitionmessage 無 key 則輪詢,有 key 則 hash(key) 指定分區。(默認模式)
SinglePartitionmessage 無 key,producer 將會隨機選擇一個分區,把所有的消息發往該分區。如果 message 指定了 key,分區的 producer 會把 key 做 hash,然后分配消息到指定的分區。
CustomPartition使用自定義消息路由實現,可以決定特定的消息進入指定的分區。
3.4.1.2 Ordering guarantee(順序保證)

cpu架構?消息的順序與路由模式和消息的 key 有關:

Ordering guaranteeDescriptionRouting Mode and Key
Per-key-partition(按 key 分區)具有相同 key 的所有消息將被按順序放置在同一個分區中。使用 SinglePartition 或 RoundRobinPartition 模式,Key 由每個消息提供。
Per-producer(按 producer)來自同一生產者的所有消息將是有序的。使用 SinglePartition 模式,并且沒有為每個消息提供 Key。
3.4.1.3 Hashing scheme(哈希方案)

HashingScheme 是一個 enum,表示在選擇要為特定消息使用的分區時可用的標準哈希函數集。

有兩種類型的標準哈希函數可用:JavaStringHashMurmur3_32Hash。生產者的默認哈希函數是 JavaStringHash。請注意,當生產者可以來自不同的多語言客戶端時,JavaStringHash 是沒有用的,在這個用例下,建議使用 Murmur3_32Hash

3.4.2 persistent/Non-persistent topics(持久/非持久主題)

默認情況下, Pulsar 會保存所有沒確認的消息到 BookKeeper 中。持久 Topic 的消息在 Broker 重啟或者 Consumer 出現問題時保存下來。

除了持久 Topic , Pulsar 也支持非持久 Topic 。這些 Topic 的消息只存在于內存中,不會存儲到磁盤。

AMD架構、因為 Broker 不會對消息進行持久化存儲,當 Producer 將消息發送到 Broker 時, Broker 可以立即將 ack 返回給 Producer ,所以非持久 Topic 的消息傳遞會比持久 Topic 的消息傳遞更快一些。相對的,當 Broker 因為一些原因宕機、重啟后,非持久 Topic 的消息都會消失,訂閱者將無法收到這些消息。

3.4.3 Dead letter topic(死信主題)

死信主題允許你在用戶無法成功消費某些消息時使用新消息。在這種機制中,無法使用的消息存儲在單獨的主題中,稱為死信主題。你可以決定如何處理死信主題中的消息。

下面的例子展示了如何在 Java 客戶端中使用默認的死信主題:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()).subscribe();

默認的死信主題格式:

<topicname>-<subscriptionname>-DLQ

如果你想指定死信主題的名稱,請使用下面的 Java 客戶端示例:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic("your-topic-name").build()).subscribe();        

工程體系架構概念?死信主題依賴于消息的重新投遞。由于確認超時或否認確認,消息將被重新發送。如果要對消息使用否定確認,請確保在確認超時之前對其進行否定確認。
目前,在共享和 Key_Shared 訂閱模式下啟用了死信主題。

3.4.4 Retry letter topic(重試主題)

對于許多在線業務系統,由于業務邏輯處理中出現異常,消息會被重復消費。若要配置重新消費失敗消息的延遲時間,你可以配置生產者將消息發送到業務主題和重試主題,并在消費者上啟用自動重試。當在消費者上啟用自動重試時,如果消息沒有被消費,則消息將存儲在重試主題中,因此消費者在指定的延遲時間后將自動接收來自重試主題的失敗消息。

默認情況下,不啟用自動重試功能。你可以將 enableRetry 設置為 true,以啟用消費者的自動重試。

下面來看個如何使用從重試主題來消費消息的示例:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

3.5 Subscriptions(訂閱模式)

Pulsar 支持 exclusive(獨占)、failover(災備)、 shared(共享)和 key_shared(key 共享) 四種消息訂閱模式,這四種模式的示意圖如下:
在這里插入圖片描述

3.5.1 Exclusive(獨占模式)

pascal架構。獨占模式是 Pulsar 默認的消息訂閱模式,在這種模式下,只能有一個 consumer 消費消息,如果有多于一個 consumer 消費此 topic 則會出錯,消費示意圖如下:
在這里插入圖片描述

3.5.2 Failover(災備模式)

災備模式下,一個 topic 也是只有單個 consumer 消費一個訂閱關系的消息,與獨占模式不同之處在于,災備模式下,每個消費者會被排序,當前面的消費者無法連接上 broker 后,消息會由下一個消費者消費,消費示意圖如下:
在這里插入圖片描述

3.5.3 Shared(共享模式)

共享模式下,消息可被多個 consumer 同時消費,無法保證消息的順序,并且無法使用 one by one 和 cumulative 的 ack 模式,消息通過 roundrobin 的方式投遞到每一個消費者,消費示意圖如下:

在這里插入圖片描述

3.5.4 Key_Shared(Key 共享模式)

Key_Shared 模式是 Shared 模式的一種,不同的是它按 key 對消息做投遞,相同的 key 的消息會被投遞到同一個 consumer 上,消費示意圖如下:
在這里插入圖片描述

3.6 Message retention and expiry(消息保留和過期)

默認策略:

  • 立即刪除所有已經被消費者確認過的的消息;
  • 以 backlog 的形式,持久保存所有未被確認的消息;

兩個特性:

  • 消息保留讓你可以保存 consumer 確認過的消息;
  • 消息過期讓你可以給未被確認的消息設置存活時長(TTL);

在這里插入圖片描述
注:所有消息保留和過期在 namespace 層面管理。

3.7 Message deduplication(消息去重)

實現消息去重的一種方式是確保消息僅生成一次,即生產者冪等。這種方式的缺點是把消息去重的工作交由應用去做。

在 Pulsar 中, Broker 支持配置開啟消息去重,用戶不需要為了消息去重去調整 Producer 的代碼。啟用消息去重后,即使一條消息被多次發送到 Topic 上,這條消息也只會被持久化到磁盤一次。

如下圖,未開啟消息去重時, Producer 發送消息 1 到 Topic 后, Broker 會把消息 1 持久化到 BookKeeper ,當 Producer 又發送消息 1 時, Broker 會把消息 1 再一次持久化到 BookKeeper 。開啟消息去重后,當 Producer 再次發送消息 1 時, Broker 不會把消息 1 再一次持久化到磁盤。

在這里插入圖片描述

3.7.1 去重原理

Producer 對每一個發送的消息,都會采用遞增的方式生成一個唯一的 sequenceID,這個消息會放在 message 的元數據中傳遞給 Broker 。同時, Broker 也會維護一個 PendingMessage 隊列,當 Broker 返回發送成功 ack 后, Producer 會將 PendingMessage 隊列中的對應的 Sequence ID 刪除,表示 Producer 任務這個消息生產成功。Broker 會記錄針對每個 Producer 接收到的最大 Sequence ID 和已經處理完的最大 Sequence ID。

當 Broker 開啟消息去重后, Broker 會對每個消息請求進行是否去重的判斷。收到的最新的 Sequence ID 是否大于 Broker 端記錄的兩個維度的最大 Sequence ID,如果大于則不重復,如果小于或等于則消息重復。消息重復時, Broker 端會直接返回 ack,不會繼續走后續的存儲處理流程。

3.8 Delayed message delivery(消息延遲傳遞)

延時消息功能允許 Consumer 能夠在消息發送到 Topic 后過一段時間才能消費到這條消息。在這種機制中,消息在發布到 Broker 后,會被存儲在 BookKeeper 中,當到消息特定的延遲時間時,消息就會傳遞給 Consumer 。

下圖為消息延遲傳遞的機制。Broker 在存儲延遲消息的時候不會進行特殊的處理。當 Consumer 消費消息的時候,如果這條消息設置了延遲時間,則會把這條消息加入 DelayedDeliveryTracker 中,當到了指定的發送時間時,DelayedDeliveryTracker 才會把這條消息推送給消費者。
在這里插入圖片描述

注:延遲消息傳遞僅在共享訂閱模式下有效。在獨占和故障轉移訂閱模式下,將立即分派延遲的消息。

3.8.1 示例

  • Broker
    # Whether to enable the delayed delivery for messages.
    # If disabled, messages are immediately delivered and there is no tracking overhead.
    delayedDeliveryEnabled=true# Control the ticking time for the retry of delayed message delivery,
    # affecting the accuracy of the delivery time compared to the scheduled time.
    # Default is 1 second.
    delayedDeliveryTickTimeMillis=1000
    
  • Producer
    // message to be delivered at the configured delay interval
    producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();
    

3.8.2 消息延遲傳遞原理

在 Pulsar 中,可以通過兩種方式實現延遲投遞。分別為 deliverAfter 和 deliverAt。

deliverAfter 可以指定具體的延遲時間戳,deliverAt 可以指定消息在多長時間后消費。兩種方式本質時一樣的,deliverAt 方式下,客戶端會計算出具體的延遲時間戳發送給 Broker 。

DelayedDeliveryTracker 會記錄所有需要延遲投遞的消息的 index 。index 由 Timestamp、 Ledger ID、 Entry ID 三部分組成,其中 Ledger ID 和 Entry ID 用于定位該消息,Timestamp 除了記錄需要投遞的時間,還用于延遲優先級隊列排序。DelayedDeliveryTracker 會根據延遲時間對消息進行排序,延遲時間最短的放在前面。當 Consumer 在消費時,如果有到期的消息需要消費,則根據 DelayedDeliveryTracker index 的 Ledger ID、 Entry ID 找到對應的消息進行消費。如下圖, Producer 依次投遞 m1、m2、m3、m4、m5 這五條消息,m2 沒有設置延遲時間,所以會被 Consumer 直接消費。m1、m3、m4、m5 在 DelayedDeliveryTracker 會根據延遲時間進行排序,并在到達延遲時間時,依次被 Consumer 進行消費。
在這里插入圖片描述

3.9 多租戶模式

Pulsar 的云原生架構天然支持多租戶,每個租戶下還支持多 Namespace(命名空間),非常適合做共享大集群,方便維護。此外,Pulsar 天然支持租戶之間資源的邏輯隔離,只要用戶的運營管控后臺和監控足夠強大,便可以做到動態隔離大流量租戶,防止互相干擾,還能實現大集群資源的充分利用。

  • Tenant(租戶)和 Namespace(命名空間)是 Pulsar 支持多租戶的兩個核心概念。
  • 在租戶級別,Pulsar 為特定的租戶預留合適的存儲空間、應用授權和認證機制。
  • 在命名空間級別,Pulsar 有一系列的配置策略(Policy),包括存儲配額、流控、消息過期策略和命名空間之間的隔離策略。

Pulsar 的多租戶性質主要體現在 Topic 的 URL 中,結構如下:

persistent://tenant/namespace/topic

租戶、命名空間、topic 更直觀的關系可以看下圖:
在這里插入圖片描述

3.10 統一消息模型

  • Pulsar 做了隊列模型與流模型的統一,在 Topic 級別只需保存一份數據,同一份數據可多次消費。以流式、隊列等方式計算不同的訂閱模型,大大的提升了靈活度。
  • 同時 Pulsar 通過事務采用 Exactly-Once(剛好一次)的語義,在進行消息傳輸過程中,可以確保數據不丟不重。

在這里插入圖片描述
在這里插入圖片描述

3.11 Segmented Streams(分片流)

  • Pulsar 將無界的數據看作是分片的流,分片分散存儲在分層存儲(tiered storage)、BookKeeper 集群和 Broker 節點上,而對外提供一個統一的、無界數據的視圖。
  • 不需要用戶顯式遷移數據,對用戶無感知,減少存儲成本并保持近似無限的存儲。

在這里插入圖片描述

3.12 Geo Replication(跨地域復制)

  • Pulsar 中的跨地域復制是將 Pulsar 中持久化的消息在多個集群間備份。
  • 在 Pulsar 2.4.0 中新增了復制訂閱模式(Replicated-subscriptions),在某個集群失效情況下,該功能可以在其他集群恢復消費者的消費狀態, 從而達到熱備模式下消息服務的高可用。

在這里插入圖片描述
在這個圖中,每當 P1、P2 和 P3 生產者分別將消息發布到 Cluster-A、Cluster-B 和 Cluster-C 上的 T1 主題時,這些消息就會立即跨集群復制。一旦消息被復制,C1 和 C2 消費者就可以從他們各自的集群中消費這些消息。

沒有跨地域復制,C1 和 C2 消費者就不能使用 P3 生產者發布的消息。

四、云原生架構

4.1 Pulsar 集群架構

單個 Pulsar 集群由以下三部分組成:

  • 一個或者多個 broker 負責處理和負載均衡 producer 發出的消息,并將這些消息分派給 consumer;Broker 與 Pulsar 配置存儲交互來處理相應的任務,并將消息存儲在 BookKeeper 實例中(又稱 bookies);Broker 依賴 ZooKeeper 集群處理特定的任務,等等。
  • 包含一個或多個 bookie 的 BookKeeper 集群負責消息的持久化存儲。
  • 一個 ZooKeeper 集群,用來處理多個 Pulsar 集群之間的協調任務。

在這里插入圖片描述
Pulsar 分理出 Broker 與 Bookie 兩層架構,Broker 為無狀態服務,用于發布和消費消息,而 BookKeeper 專注于存儲。Pulsar 存儲是分片的,這種架構可以避免擴容時受限制,實現數據的獨立擴展和快速恢復。

4.2 Brokers

Pulsar 的 broker 是一個無狀態組件,主要負責運行另外的兩個組件:

  • 一個 HTTP 服務器(Service discovery),它暴露了 REST 系統管理接口以及在生產者和消費者之間進行 Topic 查找的 API。
  • 一個調度分發器(Dispatcher),它是異步的 TCP 服務器,通過自定義二進制協議應用于所有相關的數據傳輸。

出于性能考慮,消息通常從 Managed Ledger 緩存中分派出去,除非積壓超過緩存大小。如果積壓的消息對于緩存來說太大了,則 Broker 將開始從 BookKeeper 那里讀取 Entries(Entry 同樣是 BookKeeper 中的概念,相當于一條記錄)。

最后,為了支持全局 Topic 異地復制,Broker 會控制 Replicators 追蹤本地發布的條目,并把這些條目用Java客戶端重新發布到其他區域。

4.3 ZooKeeper 元數據存儲

Pulsar 使用 Apache ZooKeeper 進行元數據存儲、集群配置和協調。

  • 配置存儲 Quorum 存儲了租戶、命名空間和其他需要全局一致的配置項。
  • 每個集群有自己獨立的本地 ZooKeeper 保存集群內部配置和協調信息,例如 broker 負責哪幾個主題及所有權歸屬元數據、broker 負載報告,BookKeeper ledger 元數據(這個是 BookKeeper 本身所依賴的)等等。

4.4 BookKeeper 持久化存儲

Apache Pulsar 為應用程序提供有保證的信息傳遞,如果消息成功到達 broker,就認為其預期到達了目的地。

為了提供這種保證,未確認送達的消息需要持久化存儲直到它們被確認送達。這種消息傳遞模式通常稱為持久消息傳遞,在 Pulsar 內部,所有消息都被保存并同步 N 份,例如,2 個服務器保存四份,每個服務器上面都有鏡像的 RAID 存儲。

Pulsar 用 Apache BookKeeper 作為持久化存儲。BookKeeper 是一個分布式的預寫日志(WAL)系統,有如下幾個特性特別適合 Pulsar 的應用場景:

  • 使 Pulsar 能夠利用獨立的日志,稱為 ledgers,可以隨著時間的推移為 topic 創建多個 ledgers。
  • 它為處理順序消息提供了非常有效的存儲。
  • 保證了多系統掛掉時 ledgers 的讀取一致性。
  • 提供不同的 Bookies 之間均勻的 IO 分布的特性。
  • 它在容量和吞吐量方面都具有水平伸縮性。能夠通過增加 bookies 立即增加容量到集群中,并提升吞吐量。
  • Bookies 被設計成可以承載數千的并發讀寫的 ledgers。 使用多個磁盤設備,一個用于日志,另一個用于一般存儲,這樣 Bookies 可以將讀操作的影響和對于寫操作的延遲分隔開。

4.4.1 brokers 與 bookies 交互

下圖展示了 brokers 和 bookies 是如何交互的:
在這里插入圖片描述
相比 Kafka、RocketMQ 等 MQ,Pulsar 基于 BookKeeper 的存儲、計算分離架構,使得 Pulsar 的消息存儲可以獨立于 Broker 而擴展。

4.4.2 Ledgers

Ledger 是一個只追加的數據結構,并且只有一個寫入器,這個寫入器負責多個 BookKeeper 存儲節點(就是 Bookies)的寫入。 Ledger 的條目會被復制到多個 bookies。 Ledgers 本身有著非常簡單的語義:

  • Pulsar Broker 可以創建 ledger,添加內容到 ledger 和關閉 ledger。
  • 當一個 ledger 被關閉后,除非明確的要寫數據或者是因為寫入器掛掉導致 ledger 關閉,這個 ledger 只會以只讀模式打開。
  • 最后,當 ledger 中的條目不再有用的時候,整個 legder 可以被刪除(ledger 分布是跨 Bookies 的)。

4.5 Pulsar 代理

Pulsar 客戶端和 Pulsar 集群交互的一種方式就是直連 Pulsar brokers 。 然而,在某些情況下,這種直連既不可行也不可取,因為客戶端并不知道 broker 的地址。 例如在云環境或者 Kubernetes 以及其他類似的系統上面運行 Pulsar,直連 brokers 就基本上不可能了。

Pulsar proxy 為這個問題提供了一個解決方案,為所有的 broker 提供了一個網關,如果選擇運行了Pulsar Proxy,所有的客戶都會通過這個代理而不是直接與 brokers 通信。

4.6 Service discovery(服務發現)

連接到 Pulsar brokers 的客戶端需要能夠使用單個 URL 與整個 Pulsar 實例通信。

你可以使用自己的服務發現系統。如果你使用自己的系統,只有一個要求:當客戶端端點執行 HTTP 請求,比如 http://pulsar.us-west.example.com:8080,客戶端需要被重定向到一些活躍在集群所需的 broker,無論通過 DNS、HTTP 或 IP 重定向或其他手段。
在這里插入圖片描述

五、Pulsar 相關組件

5.1 層級存儲

  • Infinite Stream:以流的方式永久保存原始數據
  • 分區的容量不再受限制
  • 充分利用云存儲或現有的廉價存儲(例如 HDFS)
  • 數據統一表征:客戶端無需關心數據究竟存儲在哪里

在這里插入圖片描述
在這里插入圖片描述
分層存儲的卸載機制就充分利用了這種面向分片式架構(segment oriented architecture)。 當需要開始卸載數據時,消息日志中的分片就依次被同步至分層存儲中, 直到消息日志中所有的分片(除了當前分片之外)都已被寫入分層存儲后。

默認情況下寫入到 BookKeeper 的數據會復制三個物理機副本。 然而,一旦分片被封存在 BookKeeper 中后,該分片就不可更改并且可以復制到歸檔存儲中去。 長期存儲可以達到節省存儲費用的目的。通過使用 Reed-Solomon error correction 機制,還可減少物理備份數量。

5.2 Pulsar IO(Connector)連接器

  • Pulsar IO 分為輸入(Input)和輸出(Output)兩個模塊,輸入代表數據從哪里來,通過 Source 實現數據輸入。輸出代表數據要往哪里去,通過 Sink 實現數據輸出。
  • Pulsar 提出了 IO (也稱為 Pulsar Connector),用于解決 Pulsar 與周邊系統的集成問題,幫助用戶高效完成工作。
  • 目前 Pulsar IO 支持非常多的連接集成操作:例如 HDFS、Spark、Flink、Flume、ES、HBase等。

在這里插入圖片描述

5.3 Pulsar Functions(輕量級計算框架)

  • Pulsar Functions 是一個輕量級的計算框架,可以給用戶提供一個部署簡單、運維簡單、API 簡單的 FASS(Function as a service)平臺。Pulsar Functions 提供基于事件的服務,支持有狀態與無狀態的多語言計算,是對復雜的大數據處理框架的有力補充。
  • Pulsar Functions 的設計靈感來自于 Apache Storm、Apache Heron、Apache Flink 這樣的流處理引擎,Pulsar Functions 將會拓展 Pulsar 和整個消息領域的未來。使用 Pulsar Functions,用戶可以輕松地部署和管理 function,通過 function 從 Pulsar topic 讀取數據或者生產新數據到 Pulsar topic。

在這里插入圖片描述

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/196801.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息