Apache Pulsar 是 Apache 軟件基金會的頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。
Pulsar 的關鍵特性如下:
既然說 Pulsar 是下一代云原生分布式消息流平臺,那我們得知道什么是云原生吧。
云原生的概念是 2013 年 Matt Stine 提出的,到目前為止,云原生的概念發生了多次變更,目前最新對云原生的定義為:DevOps + 持續交付 + 微服務 + 容器。
pulsar消息隊列?而符合云原生架構的應用程序是:采用開源堆棧(k8s + docker)進行容器化,基于微服務架構提高靈活性和可維護性,借助敏捷方法、DevOps 支持持續迭代和運維自動化,利用云平臺設施實現彈性伸縮、動態調度、優化資源利用率。
Component | Description |
---|---|
Value / data payload | 消息攜帶的數據,所有 Pulsar 的消息攜帶原始 bytes,但是消息數據也需要遵循數據 schemas。 |
Key | 消息可以被 Key 打標簽。這可以對 topic 壓縮之類的事情起作用。 |
Properties | 可選的,用戶定義屬性的 key/value map。 |
Producer name | 生產消息的 producer 的名稱(producer 被自動賦予默認名稱,但你也可以自己指定。) |
Sequence ID | 在 topic 中,每個 Pulsar 消息屬于一個有序的序列。消息的 sequence ID 是它在序列中的次序。 |
Publish time | 消息發布的時間戳 |
Event time | 可選的時間戳,應用可以附在消息上,代表某個事件發生的時間,例如,消息被處理時。如果沒有明確的設置,那么 event time 為0。 |
TypedMessageBuilder | 它用于構造消息。您可以使用TypedMessageBuilder設置消息屬性,比如消息鍵、消息值。設置TypedMessageBuilder時,將鍵設置為字符串。如果您將鍵設置為其他類型,例如,AVRO對象,則鍵將作為字節發送,并且很難從消費者處取回AVRO對象。 |
消息的默認大小為 5 MB,可以通過以下方式配置消息的最大大小。
# The max size of a message (in bytes).
maxMessageSize=5242880
# The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
nettyMaxFrameSizeBytes=5253120
生產者是關聯到 topic 的程序,它發布消息到 Pulsar 的 broker 上。
producer 可以以同步或者異步的方式發布消息到 broker。
Mode | Description |
---|---|
異步發送 | 發送消息后,producer等待broker的確認。如果沒有收到確認,producer會認為發送失敗。 |
同步發送 | producer 將會把消息放入阻塞隊列,然后馬上返回。客戶端類庫將會在背后把消息發送給 broker。如果隊列滿了,根據傳給 producer 的參數,producer 可能阻塞或者直接返回失敗。 |
pulsar源碼是什么語言開發。你可以為生產者提供不同類型的主題訪問模式。
Access mode | Description |
---|---|
Shared(共享) | 多個生產者可以發布一個主題,這是默認設置。 |
Exclusive(獨占) | 一個主題只能由一個生產者發布。如果已經有生產者連接,其他生產者試圖發布該主題立即得到錯誤。如果“老”生產者與 broker 發生網絡分區,“老”生產者將被驅逐,“新”生產者將被選為下一個唯一的生產者。 |
WaitForExclusive(獨占等待) | 如果已經有一個生產者連接,生產者的創建是未決的(而不是超時),直到生產者獲得獨占訪問。成功成為排他性的生產者被視為領導者。因此,如果您想為您的應用程序實現 leader 選舉方案,您可以使用這種訪問模式。 |
你可以壓縮生產者在傳輸期間發布的消息。Pulsar 目前支持以下類型的壓縮:
如果批處理開啟,producer 將會累積一批消息,然后通過一次請求發送出去。批處理的大小取決于最大的消息數量及最大的發布延遲。
如下圖所示,當生產者向主題發送一批大的分塊消息和普通的非分塊消息時。 假設生產者發送的消息為 M1,M1 有三個分塊 M1-C1,M1-C2 和 M1-C3。 這個 broker 在其管理的 ledger 里面保存所有的三個塊消息,然后以相同的順序分發給消費者(獨占/災備模式)。 消費者將在內存緩存所有的塊消息,直到收到所有的消息塊。將這些消息合并成為原始的消息 M1,發送給處理進程。
當多個生產者發布塊消息到單個主題,這個 Broker 在同一個 Ledger 里面保存來自不同生產者的所有塊消息。 如下所示,生產者1發布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三個塊組成。 生產者2發布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三個塊組成。 這些特定消息的所有分塊是順序排列的,但是其在 ledger 里面可能不是連續的。 這種方式會給消費者帶來一定的內存負擔。因為消費者會為每個大消息在內存開辟一塊緩沖區,以便將所有的塊消息合并為原始的大消息。
IT架構?消費者通過訂閱關聯到主題,然后接收消息的程序。
消息可以通過同步或者異步的方式從 broker 接收。
Mode | Description |
---|---|
同步接收 | 同步接收將會阻塞,直到消息可用。 |
異步接收 | 異步接收立即返回 future 值,例如 java 中的 CompletableFuture,一旦新消息可用,它即刻完成。 |
客戶端類庫提供了它們對于 consumer 的監聽實現。舉一個 Java 客戶端的例子,它提供了 MessageListener 接口。在這個接口中,一旦接受到新的消息,received 方法將被調用。
消費者成功處理了消息,需要發送確認給 broker,以讓 broker 丟掉這條消息(否則它將存儲著此消息)。
消息的確認可以一個接一個,也可以累積一起。累積確認時,消費者只需要確認最后一條它收到的消息。所有之前(包含此條)的消息,都不會被重新發給那個消費者。
芯片架構有多少種,累積消息確認不能用于 shared 訂閱模式,因為 shared 訂閱為同一個訂閱引入了多個消費者。
和其它的發布訂閱系統一樣,Pulsar 中的 topic 是帶有名稱的通道,用來從 producer 到 consumer 傳輸消息。Topic 的名稱是符合良好結構的 URL。
{persistent|non-persistent}://tenant/namespace/topic
Topic name component | Description |
---|---|
persistent / non-persistent | 定義了 topic 類型,Pulsar 支持兩種不同 topic:持久和非持久(默認是持久類型,如果你沒有指明類型,topic 將會是持久類型)。持久 topic 的所有消息都會保存在硬盤上(這意味著多塊硬盤,除非是單機模式的 broker),反之,非持久 topic 的數據不會存儲到硬盤上。 |
tenant | 實例中 topic 的租戶。tenant 是 Pulsar 多租戶的基本要素。可以被跨集群的傳播。 |
namespace | topic 的管理單元,相關 topic 組的管理機制。大多數的 topic 配置在 namespace 層面生效。每個 tenant 可以有多個 namespace。 |
topic | 主題名稱的最后組成部分,topic 的名稱很自由,沒有什么特殊的含義。 |
普通主題僅由單個 broker 提供服務,這限制了主題的最大吞吐量。分區主題是由多個 broker 處理的一種特殊類型的主題,因此允許更高的吞吐量。
分區的主題實際上實現為 N 個內部主題,其中 N 是分區的數量。當將消息發布到分區主題時,每個消息都被路由到幾個 broker 中的一個。分區在 broker 間的分布由 Pulsar 自動處理。
如上圖,Topic1 主題有 5 個分區(P0 到 P4),劃分在 3 個 broker 上。因為分區比 broker 多,前兩個 broker 分別處理兩個分區,而第三個 broker 只處理一個分區(同樣,Pulsar 自動處理分區的分布)。
此主題的消息將廣播給兩個消費者。路由模式決定將每個消息發布到哪個分區,而訂閱模式決定將哪些消息發送到哪個消費者。
Java產品架構概念。在大多數情況下,可以分別決定路由和訂閱模式。通常,吞吐量問題應該指導分區/路由決策,而訂閱決策應該根據應用程序語義進行指導。
就訂閱模式的工作方式而言,分區主題和普通主題之間沒有區別,因為分區僅決定消息由生產者發布和由消費者處理和確認之間發生了什么。
分區主題需要通過管理 API 顯式創建,分區的數量可以在創建主題時指定。
當發布消息到分區 topic,你必須要指定路由模式。路由模式決定了每條消息被發布到的分區(其實是內部主題)。
下面是三種默認可用的路由模式:
Mode | Description |
---|---|
RoundRobinPartition | message 無 key 則輪詢,有 key 則 hash(key) 指定分區。(默認模式) |
SinglePartition | message 無 key,producer 將會隨機選擇一個分區,把所有的消息發往該分區。如果 message 指定了 key,分區的 producer 會把 key 做 hash,然后分配消息到指定的分區。 |
CustomPartition | 使用自定義消息路由實現,可以決定特定的消息進入指定的分區。 |
cpu架構?消息的順序與路由模式和消息的 key 有關:
Ordering guarantee | Description | Routing Mode and Key |
---|---|---|
Per-key-partition(按 key 分區) | 具有相同 key 的所有消息將被按順序放置在同一個分區中。 | 使用 SinglePartition 或 RoundRobinPartition 模式,Key 由每個消息提供。 |
Per-producer(按 producer) | 來自同一生產者的所有消息將是有序的。 | 使用 SinglePartition 模式,并且沒有為每個消息提供 Key。 |
HashingScheme 是一個 enum,表示在選擇要為特定消息使用的分區時可用的標準哈希函數集。
有兩種類型的標準哈希函數可用:JavaStringHash
和 Murmur3_32Hash
。生產者的默認哈希函數是 JavaStringHash
。請注意,當生產者可以來自不同的多語言客戶端時,JavaStringHash
是沒有用的,在這個用例下,建議使用 Murmur3_32Hash
。
默認情況下, Pulsar 會保存所有沒確認的消息到 BookKeeper 中。持久 Topic 的消息在 Broker 重啟或者 Consumer 出現問題時保存下來。
除了持久 Topic , Pulsar 也支持非持久 Topic 。這些 Topic 的消息只存在于內存中,不會存儲到磁盤。
AMD架構、因為 Broker 不會對消息進行持久化存儲,當 Producer 將消息發送到 Broker 時, Broker 可以立即將 ack 返回給 Producer ,所以非持久 Topic 的消息傳遞會比持久 Topic 的消息傳遞更快一些。相對的,當 Broker 因為一些原因宕機、重啟后,非持久 Topic 的消息都會消失,訂閱者將無法收到這些消息。
死信主題允許你在用戶無法成功消費某些消息時使用新消息。在這種機制中,無法使用的消息存儲在單獨的主題中,稱為死信主題。你可以決定如何處理死信主題中的消息。
下面的例子展示了如何在 Java 客戶端中使用默認的死信主題:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()).subscribe();
默認的死信主題格式:
<topicname>-<subscriptionname>-DLQ
如果你想指定死信主題的名稱,請使用下面的 Java 客戶端示例:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic("your-topic-name").build()).subscribe();
工程體系架構概念?死信主題依賴于消息的重新投遞。由于確認超時或否認確認,消息將被重新發送。如果要對消息使用否定確認,請確保在確認超時之前對其進行否定確認。
目前,在共享和 Key_Shared 訂閱模式下啟用了死信主題。
對于許多在線業務系統,由于業務邏輯處理中出現異常,消息會被重復消費。若要配置重新消費失敗消息的延遲時間,你可以配置生產者將消息發送到業務主題和重試主題,并在消費者上啟用自動重試。當在消費者上啟用自動重試時,如果消息沒有被消費,則消息將存儲在重試主題中,因此消費者在指定的延遲時間后將自動接收來自重試主題的失敗消息。
默認情況下,不啟用自動重試功能。你可以將 enableRetry 設置為 true,以啟用消費者的自動重試。
下面來看個如何使用從重試主題來消費消息的示例:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
Pulsar 支持 exclusive(獨占)、failover(災備)、 shared(共享)和 key_shared(key 共享) 四種消息訂閱模式,這四種模式的示意圖如下:
pascal架構。獨占模式是 Pulsar 默認的消息訂閱模式,在這種模式下,只能有一個 consumer 消費消息,如果有多于一個 consumer 消費此 topic 則會出錯,消費示意圖如下:
災備模式下,一個 topic 也是只有單個 consumer 消費一個訂閱關系的消息,與獨占模式不同之處在于,災備模式下,每個消費者會被排序,當前面的消費者無法連接上 broker 后,消息會由下一個消費者消費,消費示意圖如下:
共享模式下,消息可被多個 consumer 同時消費,無法保證消息的順序,并且無法使用 one by one 和 cumulative 的 ack 模式,消息通過 roundrobin 的方式投遞到每一個消費者,消費示意圖如下:
Key_Shared 模式是 Shared 模式的一種,不同的是它按 key 對消息做投遞,相同的 key 的消息會被投遞到同一個 consumer 上,消費示意圖如下:
默認策略:
兩個特性:
注:所有消息保留和過期在 namespace 層面管理。
實現消息去重的一種方式是確保消息僅生成一次,即生產者冪等。這種方式的缺點是把消息去重的工作交由應用去做。
在 Pulsar 中, Broker 支持配置開啟消息去重,用戶不需要為了消息去重去調整 Producer 的代碼。啟用消息去重后,即使一條消息被多次發送到 Topic 上,這條消息也只會被持久化到磁盤一次。
如下圖,未開啟消息去重時, Producer 發送消息 1 到 Topic 后, Broker 會把消息 1 持久化到 BookKeeper ,當 Producer 又發送消息 1 時, Broker 會把消息 1 再一次持久化到 BookKeeper 。開啟消息去重后,當 Producer 再次發送消息 1 時, Broker 不會把消息 1 再一次持久化到磁盤。
Producer 對每一個發送的消息,都會采用遞增的方式生成一個唯一的 sequenceID,這個消息會放在 message 的元數據中傳遞給 Broker 。同時, Broker 也會維護一個 PendingMessage 隊列,當 Broker 返回發送成功 ack 后, Producer 會將 PendingMessage 隊列中的對應的 Sequence ID 刪除,表示 Producer 任務這個消息生產成功。Broker 會記錄針對每個 Producer 接收到的最大 Sequence ID 和已經處理完的最大 Sequence ID。
當 Broker 開啟消息去重后, Broker 會對每個消息請求進行是否去重的判斷。收到的最新的 Sequence ID 是否大于 Broker 端記錄的兩個維度的最大 Sequence ID,如果大于則不重復,如果小于或等于則消息重復。消息重復時, Broker 端會直接返回 ack,不會繼續走后續的存儲處理流程。
延時消息功能允許 Consumer 能夠在消息發送到 Topic 后過一段時間才能消費到這條消息。在這種機制中,消息在發布到 Broker 后,會被存儲在 BookKeeper 中,當到消息特定的延遲時間時,消息就會傳遞給 Consumer 。
下圖為消息延遲傳遞的機制。Broker 在存儲延遲消息的時候不會進行特殊的處理。當 Consumer 消費消息的時候,如果這條消息設置了延遲時間,則會把這條消息加入 DelayedDeliveryTracker 中,當到了指定的發送時間時,DelayedDeliveryTracker 才會把這條消息推送給消費者。
注:延遲消息傳遞僅在共享訂閱模式下有效。在獨占和故障轉移訂閱模式下,將立即分派延遲的消息。
# Whether to enable the delayed delivery for messages.
# If disabled, messages are immediately delivered and there is no tracking overhead.
delayedDeliveryEnabled=true# Control the ticking time for the retry of delayed message delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000
// message to be delivered at the configured delay interval
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();
在 Pulsar 中,可以通過兩種方式實現延遲投遞。分別為 deliverAfter 和 deliverAt。
deliverAfter 可以指定具體的延遲時間戳,deliverAt 可以指定消息在多長時間后消費。兩種方式本質時一樣的,deliverAt 方式下,客戶端會計算出具體的延遲時間戳發送給 Broker 。
DelayedDeliveryTracker 會記錄所有需要延遲投遞的消息的 index 。index 由 Timestamp、 Ledger ID、 Entry ID 三部分組成,其中 Ledger ID 和 Entry ID 用于定位該消息,Timestamp 除了記錄需要投遞的時間,還用于延遲優先級隊列排序。DelayedDeliveryTracker 會根據延遲時間對消息進行排序,延遲時間最短的放在前面。當 Consumer 在消費時,如果有到期的消息需要消費,則根據 DelayedDeliveryTracker index 的 Ledger ID、 Entry ID 找到對應的消息進行消費。如下圖, Producer 依次投遞 m1、m2、m3、m4、m5 這五條消息,m2 沒有設置延遲時間,所以會被 Consumer 直接消費。m1、m3、m4、m5 在 DelayedDeliveryTracker 會根據延遲時間進行排序,并在到達延遲時間時,依次被 Consumer 進行消費。
Pulsar 的云原生架構天然支持多租戶,每個租戶下還支持多 Namespace(命名空間),非常適合做共享大集群,方便維護。此外,Pulsar 天然支持租戶之間資源的邏輯隔離,只要用戶的運營管控后臺和監控足夠強大,便可以做到動態隔離大流量租戶,防止互相干擾,還能實現大集群資源的充分利用。
Pulsar 的多租戶性質主要體現在 Topic 的 URL 中,結構如下:
persistent://tenant/namespace/topic
租戶、命名空間、topic 更直觀的關系可以看下圖:
在這個圖中,每當 P1、P2 和 P3 生產者分別將消息發布到 Cluster-A、Cluster-B 和 Cluster-C 上的 T1 主題時,這些消息就會立即跨集群復制。一旦消息被復制,C1 和 C2 消費者就可以從他們各自的集群中消費這些消息。
沒有跨地域復制,C1 和 C2 消費者就不能使用 P3 生產者發布的消息。
單個 Pulsar 集群由以下三部分組成:
Pulsar 分理出 Broker 與 Bookie 兩層架構,Broker 為無狀態服務,用于發布和消費消息,而 BookKeeper 專注于存儲。Pulsar 存儲是分片的,這種架構可以避免擴容時受限制,實現數據的獨立擴展和快速恢復。
Pulsar 的 broker 是一個無狀態組件,主要負責運行另外的兩個組件:
出于性能考慮,消息通常從 Managed Ledger 緩存中分派出去,除非積壓超過緩存大小。如果積壓的消息對于緩存來說太大了,則 Broker 將開始從 BookKeeper 那里讀取 Entries(Entry 同樣是 BookKeeper 中的概念,相當于一條記錄)。
最后,為了支持全局 Topic 異地復制,Broker 會控制 Replicators 追蹤本地發布的條目,并把這些條目用Java客戶端重新發布到其他區域。
Pulsar 使用 Apache ZooKeeper 進行元數據存儲、集群配置和協調。
Apache Pulsar 為應用程序提供有保證的信息傳遞,如果消息成功到達 broker,就認為其預期到達了目的地。
為了提供這種保證,未確認送達的消息需要持久化存儲直到它們被確認送達。這種消息傳遞模式通常稱為持久消息傳遞,在 Pulsar 內部,所有消息都被保存并同步 N 份,例如,2 個服務器保存四份,每個服務器上面都有鏡像的 RAID 存儲。
Pulsar 用 Apache BookKeeper 作為持久化存儲。BookKeeper 是一個分布式的預寫日志(WAL)系統,有如下幾個特性特別適合 Pulsar 的應用場景:
下圖展示了 brokers 和 bookies 是如何交互的:
相比 Kafka、RocketMQ 等 MQ,Pulsar 基于 BookKeeper 的存儲、計算分離架構,使得 Pulsar 的消息存儲可以獨立于 Broker 而擴展。
Ledger 是一個只追加的數據結構,并且只有一個寫入器,這個寫入器負責多個 BookKeeper 存儲節點(就是 Bookies)的寫入。 Ledger 的條目會被復制到多個 bookies。 Ledgers 本身有著非常簡單的語義:
Pulsar 客戶端和 Pulsar 集群交互的一種方式就是直連 Pulsar brokers 。 然而,在某些情況下,這種直連既不可行也不可取,因為客戶端并不知道 broker 的地址。 例如在云環境或者 Kubernetes 以及其他類似的系統上面運行 Pulsar,直連 brokers 就基本上不可能了。
Pulsar proxy 為這個問題提供了一個解決方案,為所有的 broker 提供了一個網關,如果選擇運行了Pulsar Proxy,所有的客戶都會通過這個代理而不是直接與 brokers 通信。
連接到 Pulsar brokers 的客戶端需要能夠使用單個 URL 與整個 Pulsar 實例通信。
你可以使用自己的服務發現系統。如果你使用自己的系統,只有一個要求:當客戶端端點執行 HTTP 請求,比如 http://pulsar.us-west.example.com:8080
,客戶端需要被重定向到一些活躍在集群所需的 broker,無論通過 DNS、HTTP 或 IP 重定向或其他手段。
分層存儲的卸載機制就充分利用了這種面向分片式架構(segment oriented architecture)。 當需要開始卸載數據時,消息日志中的分片就依次被同步至分層存儲中, 直到消息日志中所有的分片(除了當前分片之外)都已被寫入分層存儲后。
默認情況下寫入到 BookKeeper 的數據會復制三個物理機副本。 然而,一旦分片被封存在 BookKeeper 中后,該分片就不可更改并且可以復制到歸檔存儲中去。 長期存儲可以達到節省存儲費用的目的。通過使用 Reed-Solomon error correction
機制,還可減少物理備份數量。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态