pulsar源碼是什么語言開發,Pulsar知識整理

 2023-12-25 阅读 35 评论 0

摘要:關鍵特性 Pulsar 的單個實例原生支持多個集群,可跨機房在集群間無縫地完成消息復制。 極低的發布延遲和端到端延遲。 可無縫擴展到超過一百萬個 topic。簡單的客戶端 API,支持 Java、Go、Python 和 C++。支持多種 topic 訂閱模式(獨占訂閱、共享

關鍵特性

  • Pulsar 的單個實例原生支持多個集群,可跨機房在集群間無縫地完成消息復制。

  • 極低的發布延遲和端到端延遲。

  • 可無縫擴展到超過一百萬個 topic。
  • 簡單的客戶端 API,支持 Java、Go、Python 和 C++。
  • 支持多種 topic 訂閱模式(獨占訂閱、共享訂閱、故障轉移訂閱)。
  • 通過 Apache BookKeeper 提供的持久化消息存儲機制保證消息傳遞 。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得數據更易移入、移出 Apache Pulsar。
  • 分層式存儲可在數據陳舊時,將數據從熱存儲卸載到冷/長期存儲(如S3、GCS)中。

消息

Pulsar采用發布訂閱的設計模式(pub-sub),該模式下,producer發送消息到topic,consumer訂閱topic,消費producer發布的消息,并在處理完成后向broker發送確認。

消息是pulsar的基礎“單元”,消息的組成如下:

pulsar源碼是什么語言開發、組件

描述

Value/data payload值。消息攜帶的消息體,二進制數據。
Key鍵。消息可以選擇使用key進行標記,這個與消息的順序,topic壓縮等操作有關系
Properties屬性。用戶可以自定義的一些鍵值對
Sequence ID序號。所有保存在topic中的消息,都會有一個排序的序號。類比Kafka,就是消息的Offset
Publish time發送時間。producer發送消息時的時間戳
EventTime事件時間。一個可選的時間戳。
TypeMessageBuilder這個是用來構造一個消息。

Producer生產者

發送模式

Producer 可以以同步(sync) 或 異步(async) 的方式發布消息到 broker。

  • 同步發送:producer會同步等待broker給它一個ack,如果收不到ack,就認為消息發送失敗。
  • 異步發送:Producer 將把消息放到阻塞隊列里,并立即返回。然后,客戶端將在后臺將消息異步發送給 broker。 如果隊列已滿,則調用API的時候,producer可能會立即被阻止或失敗,具體取決于傳遞給 producer 的參數。

訪問模式

Producer訪問topic的模式有分享(Shared)、獨占(Exclusive)、等待獨占(waitForExclusive):

  • 分享模式:可以同時有多個producer,向一個topic發送消息。
  • 獨占模式:只有一個producer可以向topic發送消息。如果已經有一個producer連接了該topic,其他試圖在該topic上發布消息的producer會馬上得到錯誤信息。
  • 等待獨占模式:producer連接到topic上,如果有其他producer已經連接了該topic,則新的producer連接會被掛起,直到獨占topic的連接結束(主-備,只有主能操作,主掛了,預備的連接升級為主)。

壓縮

知識梳理,Producer支持在傳輸消息的時候,對消息進行壓縮,支持的壓縮算法有:

  • LZ4
  • ZLIB
  • ZSTD
  • SNAPPY

批量處理

Pulsar支持啟用消息的批量處理,開啟批量處理的時候,producer會積累請求的消息,并合并為一個批次,一次性發送。批量處理的量大小由“最大消息數”和“最大延遲時間”兩個配置決定。達到“最大消息數”或者“最大延遲時間”就會發送。

在Pulsar中,批次被跟蹤存儲為單個單元,而不是每個消息獨立存儲為單個單元。

Consumer處理消息的時候,也是按整個批次接收,接收后,再將批次拆分成一個個獨立的消息。

當Consumer確認了一個批次中的所有消息,這個批次才會被判定為已確認。這就有可能造成重復消費的問題,各種異常、nack,確認超時等情況發生,會導致broker會把批次消息重新推送給consumer,即使其中一部分consumer已經消費過。

pulse記憶方法?為了解決這個問題,Pulsar在2.6.0之后,引入的批次ack下標的概念,broker會維護每個批次中的消息ack下標,避免向consumer推送重復的消息。

但是默認情況下,broker這個功能是關閉的(acknowledgmentAtBatchIndexLevelEnabled=false)。因為開啟批次下標功能,會導致更多的內存開銷(因為要維護每個批次的內部下標)。

分塊

當啟用分塊的時候(chunkingEnabled=true),如果消息的大小超過配置的最大大小,則producer會將原始消息分隔成多個塊,并將它們與塊的元數據單獨和按順序地發送到broker。

在broker中,分塊消息將和普通消息以相同的方式存儲在Managed Ledger上。

consumer緩存的收到的塊消息,直到收到消息的所有塊。然后consumer將分塊拼接成真正的消息,并將它放入到接收隊列。這個時候,客戶端才算真正接收到消息,從隊列取出消息進行消費。如果consumer未能在過期時間內接受到消息的所有分塊,則會放棄過期未完成的分塊消息,默認1小時。

pulsar是什么意思。consumer一旦確認消費整個分塊消息,則內部會將消息對應的所有分塊都進行確認。

因為需要緩存分塊消息,所以可能出現分區消息過多,緩沖區爆滿的情況。所以有maxPendingChunkedMessage配置,當緩存的消息量達到這個值,新的分塊消息,consumer就會通過靜默ack或者nack,并讓broker稍后把這些消息重發。

Consumer消費者

consumer向topic發起訂閱,處理producer發送到topic的消息。

consumer向broker發送消息流獲取申請,以獲取信息。在consumer端有一個隊列,用于接收broker推送來的消息。每當consumer.receive()調用一次,就從緩沖區獲取一條信息。

接收模式

consumer可以通過同步(Sync)或(異步)的方式從broker接收數據。

發送模式

說明

同步接收同步模式,在收到消息之前都是被阻塞的。
異步接收異步接收模式會立即返回一個future,一旦收到新的消息,就立即完成。

確認

producer發送消息到broker之后,就會被永久保存起來。consumer成功消費了一條消息后,會向broker發送一個確認消息,broker在收到消費者消費成功的消息后,才有可能會被刪除。如果希望消息被consumer消費后,還能被保留,可以配置“消息保留策略”。

消息有兩種方式去確認:

  • 分開確認:消費者對每個消息獨立確認。例如broker向consumer發送了1,2,3,4,四個消息,consumer可以直接確認2、4。但是1、3沒確認。
  • 累計確認:消費者只確認最后一條消息,該消息之前的所有消息也都會被確認。同樣以1、2、3、4為例,確認4,則4個消息都會確認被消費了。這個就與kafka的提交offset類似了。

注意:累計確認,不能在“共享訂閱模式”下使用!!!因為該模式涉及到多個訪問相同訂閱的消費者,只能按單條消費確認。

取消確認

當consumer沒有成功消費某條消息,希望后面重新消費該消息,則可以給broker發送一個nack請求。

注意:

  • 在獨占消費模式和災備訂閱模式中,消費者僅僅只能對收到的最后一條消息進行nack。
  • 在共享訂閱和KEY共享模式中,你可以對單個消息進行nack。
  • 獨占、災備和共享模式,本來是有序的,但是使用nack,可能會導致消息亂序。
  • 如果開啟批量處理,nack一個消息時,跟這個消息同個批次的其他消息也會在后續重新投遞給consumer。

確認超時

如果consumer在指定的時間內沒有給broker發送ack或nack消息,客戶端會跟蹤“超時未確認”的消息。并在指定超時時間后,給broker發送一個“重發未確認消息”的請求。

注意:盡量優先使用nack~

死信主題

當consumer沒有辦法成功消費某個消息的時候,可以將該消息暫存到另外一個特殊的topic,然后去消費該消息之后的其他消息,這個特殊的topic,就是“死信主題”。

java的實例代碼:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)

??????????????.topic(topic)

??????????????.subscriptionName("my-subscription")

??????????????.subscriptionType(SubscriptionType.Shared)

??????????????.deadLetterPolicy(DeadLetterPolicy.builder()

????????????????????.maxRedeliverCount(maxRedeliveryCount)

????????????????????.deadLetterTopic("your-topic-name")

????????????????????.build())

??????????????.subscribe();

deadLetterTopic并不是必須的,沒有指定私信主題名稱的時候,默認主題為:<topicname>-<subscriptionname>-DLQ。

當前死信主題能在共享和key共享兩種模式下使用。

重試主題

死信主題,能在我們消費某些消息失敗的時候,將這些消息暫存起來,待后續處理。但是我們現實的業務場景,大多是希望自動重試,例如消息消費失敗,1分鐘之后重新消費該信息,所以就有了“重試主題”。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)

????????????????.topic(topic)

????????????????.subscriptionName("my-subscription")

????????????????.subscriptionType(SubscriptionType.Shared)

????????????????.enableRetry(true)

????????????????.receiverQueueSize(100)

????????????????.deadLetterPolicy(DeadLetterPolicy.builder()

????????????????????????.maxRedeliverCount(maxRedeliveryCount)

????????????????????????.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")

????????????????????????.build())

????????????????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)

????????????????.subscribe();

Topic主題

Topic的結構如下:

{persistent|non-persistent}://tenant/namespace/topic

Topic名稱組成

說明

persistent/non-persistent

topic的類型。Pulsar支持“持久化”和“非持久化”兩種topic類型。默認是持久化類型。對于持久化類型的topic,所有消息都會被持久化到磁盤中,而非持久化的topic,數據只會保存在內存中。

當broker接收到非持久化topic的消息,會馬上投遞給當前或者的consumer;如果當前沒有活著的consumer,則消息丟失。因為沒有經過持久化,所以會比普通的持久化topic要快,但是會丟信息。

租戶(tenant)Pulsar用于支持多租戶的,數據基于租戶隔離。
命名空間(namespace)用于將相關聯的topic作為一個組來管理。每個租戶下一個配置多個命名空間。
主題名主題的名字。

注意:不需要特意去Pulsar上創建topic,當使用一個不存在的topic時,Pulsar會自動創建這個topic。

訂閱模式

Pulsar中有四種訂閱模式:獨占、災備、共享和key共享:

注意:當一個subscription沒有任何consumer的時候,它的訂閱模式是undefined。當有一個consumer連接上subscription時,該consumer使用的模式,就決定了整個subscription的模式。所有,可以通過修改配置,然后重啟所有的consumer來改變subscription的模式

獨享模式(Exclusive):

該模式下,只允許一個consumer連接到subscription,如果有第一個consumer連接到subscription,則會報錯。

災備模式(Failover):

在災備模式下,允許多個consumer同時連接到相同的subscription,但是只要一個master的consumer能接收到“分分區topic”或“分區topic的分區”的消息。當作為master的consumer斷開連接了,就會把所有未ack的消息投遞給下一個或者的consumer(故障轉移)。

對于分區的topic,broker會把所有consumer的優先級和consumer的名稱的字典順序來排序,然后broker盡量將分區均勻地分配給優先級高的consumer。

對于非分區的topic,第一個訂閱的consumer就是master。

共享模式(Shared):

在共享模式下,允許多個consumer同事連接到相同的subscription,消息會被輪詢的投遞到這些consumer。如果有一個consumer掛了,那些已經發送給它,但是未得到ack的消息,會被重新分發給其他或者的consumer。

注意:

  • 消息的順序沒有保障。因為消息被輪詢投遞給各個consumer,多個consumer之間,是并發處理消息
  • 無法使用“累計確認”,因為同一個topic的消息,分別發送給了不同的consumer,如果允許使用“累計確認”,會將其他consumer接收到的消息也確認了

key共享模式(Key_Shared):

該模式跟普通共享模式差不多,也允許多個consumer同時消費信息,但是會把key相同的消息投遞給同一個consumer,大致能保證相同key的消息的順序。

當有一個consumer新增或者掛掉的時候,會改變消息的分配邏輯。例如,是key%活著的consumer,得到消息需要投遞給哪一個consumer,如果新增或者減少consumer數量,就會導致分配的consumer不一樣。

注意:

  • consumer使用key共享模式,對應的producer發送消息的時候,就必須設置key或者排序key。producer沒設置key,broker咋用key類分配消息給consumer呢~
  • 不能使用“累計確認”。跟共享模式道理一樣,會造成消息誤提交。
  • producer不能使用批量處理,或者使用就key的批量處理。普通的批處理,同一個批次里的消息key不一樣,可能需要分配給不同的consumer,但實際上,一個批次是真正的一個單元,只能給一個consumer

多主題訂閱

從Pulsar1.23.0-incubating版本之后,Pulsar的consumer端就支持同時訂閱多個topic。可以有兩種方式:

  • 正則匹配,像persistent://public/default/finance-*,public租戶的默認命名空間下,所有以finance-開頭的topic,都訂閱了。
  • 明確指定多個topic。

注意:訂閱多個topic的時候,多個topic之間消息的順序性沒有保證

分區topic

普通的topic,只被保存在單個broker上,限制了主題的吞吐量。分區topic是一種特殊的topic,可以分布在多個broker上,提高topic的吞吐量。

分區topic底層實際上是N個內部的普通topic,N就是分區數,這些內部的普通topic可以被分配到不同的broker上。

當向分區topic發送消息的時候,每條消息會被路由到其中的一個broker。Pulsar自動處理跨broker的分區分布。

路由模式:

前面我們提到過“訂閱模式”,訂閱模式是消息怎么投遞給consumer;現在講的是“路由模式”,producer怎么把消息發給broker。

Pulsar有三種路由模式:

  • 輪詢分區(RoundRobinPartition):如果消息沒有指定key,為了達到最大吞吐量的目的,producer會將消息輪詢投遞給各個分區(批量處理的情況下,輪詢處理的不是消息,而是批次)。如果指定了key,producer會根據key的hash值將該消息分配到對應的分區。默認模式。
  • 單個分區(SinglePartition):如果沒有指定key,producer會隨機選擇一個分區,并且發布的所有消息都分配到該分區。如果消息指定了key,producer會根據key的hash值將該消息分配到對應的分區。
  • 自定義分區(CustomPartition):使用自定義的消息路由器來實現分區路由。

順序保證:

當使用“輪詢分區”、“單個分區”的路由模式,producer只要指定key,則相同key的消息會被投遞到同一個分區,key相同的消息,順序有保證。

當使用“單個分區”的路由模式,producer沒有指定key,則所有消息都會被投遞到同一個分區,順序有保證,但是吞吐量就低了,降級為普通topic。

消息保留策略

Pulsar默認的消息保留策略為:

  • 馬上刪除所有consumer ack的消息。
  • 以backlog的形式,持久保存未被ack的消息。

有兩個特性,可以覆蓋上面的行為:

消息保留。允許保存已經被consumer ack了的消息。

消息過期 Time-To-Live(TTL)。允許給消息設置一個TTL,消息過期了,即使消息還沒被消費,也會被清除。

消息去重

Pulsar支持對消息去重,同一條消息即使被重復投遞多次,在broker端只會被保存一次。

消息去重,實際上就是“生產者冪等”。

使用其他消息中間件,都沒有提供該特性的支持,當要保證消息發送的可靠性,就會引入失敗重試邏輯。而這往往就會導致消息的重復發送,消費者端就被迫需要去做消息冪等處理,實現“消費者冪等”。

所以Pulsar對于那些需要實現“僅一次”的場景,會更友好。

要啟用消息去重,需要同時對broker和client兩端做配置。

有三種級別來啟用broker端的消息去重:

1、broker級別啟用,所有的namespace和topics都會去重。相關配置如下:

參數名

說明

默認值

brokerDeduplicationEnabled整個broker級別的去重配置開關,true為開啟,fasle為關閉。false
brokerDeduplicationMaxNumberOfProducers為了去重目的而存儲信息的最大producer數量。10000
brokerDeduplicationEntriesInterval

一個快照文件能保存的最大消息數。

配置較大的值,產生的快照文件數量會比較少;但是這樣會導致tpoics的恢復變慢(要重放快照文件)。

1000
brokerDeduplicationProducerInactivityTimeoutMinutesproducer掉線多長時間后會,broker會把快照文件清除。360(單位:分鐘)

2、即使brokerDeduplicationEnabled=false,沒有開啟去重功能。也可以單獨在namespace下開啟:

pulsar-admin namespaces set-deduplication?\

public/default \
--enable

3、也可以只開啟單獨的topics:

pulsar-admin topics set-deduplication

client端啟用去重,需要做兩件事:

  • 指定producer名稱。
  • 設置消息超時時間為0,不超時。

為什么做這兩件事情,就是啟動客戶端去重呢?

producer發送消息的時候有SequenceNumber的概念,每發一個消息,就會+1。

在broker端,會根據producer的名稱來保存producer發送的消息對應的SequenceNumber。

至于為什么要設置消息超時時間為0,就不大懂了,可能是消息超時被刪會影響去重判斷?

消息延遲投遞

我們很多場景,需要在一段時間后,觸發一個事情。例如開會15分鐘之前發送一個提醒~Pulsar的延遲消息功能就符合這種場景。

發送延遲消息,消息會被存儲在BookKeeper,broker保存消息的時候,并不會做任何檢查的(不會判斷消息是延遲消息,還是正常消息)。

而是當consumer消費一個消息的時候,判斷消息是否是延遲消息,如果是延遲消息,會將消息加入到DelayedDeliveryTracker。

DelayedDeliveryTracker會在內存中保存一個時間索引(時間→消息id),DelayedDeliveryTracker檢查延遲時間到了之后,才把消息重新投遞consumer。

注意:目前延遲消息只能作用于“共享訂閱”模式。在“獨占”或者“災備”模式下,消息都是直接投遞

Broker

Pulsar的broker是一個無狀態的組件,主要負責運行兩個組件:

  • 作為一個http服務,暴露restful接口,供管理任務和producer/consumer發現topic使用。producer連接到broker來發消息,consumer連接到broker來消費消息。
  • 作為分發器,使用自定義的協議,采用異步TCP發送數據。為了讓效率更高,消息的分發,一般是通過managed leger的緩存來實現。但是當消息堆積,緩存超過的最大限制,broker就會開始從BookKeeper里讀取數據。

注意:Pulsar的集群是支持跨集群復制數據的。為了支持Topic的異地復制,Broker會使用Relicators追蹤本地發布的數據,并把這些數據用JAVA 客戶端重新發布到其他地方。

集群

一個Pulsar實例可以加入多個集群,在集群之前實現數據同步。

單個Pulsar集群由三部分組成:

  • 一個或多個broker。broker與Pulsar的配置存儲層交互,處理各種協作任務,并且使用BookKeeper來存儲消息;依賴Zookeeper來處理特定的任務等。
  • 包含一個或多個bookie的BookKeeper集群負責消息的存儲。
  • 一個Zookeeper,用來處理Pulsar集群之間的協調任務。

注意:集群內部需要一個Zookeeper集群,保存元數據;集群之間還有一個Zookeeper集群,用來處理Pulsar集群之間的協調任務。

元數據存儲

Pulsar集群使用Zookeeper來存儲元數據,集群配置和協調任務。

在一個Pulsar實例中:

  • 保存租戶、命名空間和其他實體,并且保證數據的全局唯一性。
  • 每個集群都有自己的本地Zookeeper,用來存儲集群特定的配置和任務協調,例如:哪些broker負責哪些topic的元數據、broker的負載報告、BookKeeper leger的元數據等。

持久存儲

Pulsar提供的消息投遞的可靠性的保障。即一個消息被成功投遞到broker,它就一定會被投遞到它的目標。

為了提供這種保證,未被consumer ack的消息,會一直被保存到consumer ack。

Apache BookKeeper

Pulsar用Apache BookKeeper作為持久化存儲,這是一個分布式的預寫日志(WAL)系統,以下特性很適合用于Pulsar:

  • 能讓Pulsar去利用許多獨立的日志(ledgers),這些ledgers會隨便時間,由topic創建。
  • 提供了高效的順序存儲,可以用于數據的復制。
  • 保證了在發生系統故障時的讀一致性。
  • 保證了在bookies之間均勻的I/O分布。
  • 容量和吞吐量有良好的水平拓展性。給BookKeeper集群添加新的節點,即可提高容量。
  • Bookies被設計成支持上千ledgers的并發讀寫。它使用多個存儲設備,一個用于存儲日志,一個用于普通的存儲。這樣Bookis可以將讀寫操作的影響隔離開。

另外,consumer訂閱的消費位置cursors,也是存儲在BookKeeper。

Ledgers

Ledger是一個只追加的數據結構,并且只有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入。 Ledger的條目會被復制到多個bookies。Ledgers本身有著非常簡單的語義:

  • Pulsar Broker可以創建ledeger,添加內容到ledger和關閉ledger。
  • 當一個ledger被關閉后,除非明確的要寫數據或者是因為寫入器掛掉導致ledger關閉,否則這個ledger只會以只讀模式打開。
  • 當ledger中的條目不再有用的時候,整個legder可以被刪除(ledger分布是跨Bookies的)。

Ledger讀一致性

BookKeeper的主要優勢在于它能在有系統故障時保證讀的一致性。由于Ledger只能被一個進程寫入(之前提的寫入器進程),這樣這個進程在寫入時不會有沖突,從而寫入會非常高效。

在一次故障之后,ledger會啟動一個恢復進程來確定ledger的最終狀態并確認最后提交到日志的是哪一個條目。 在這之后,能保證所有的ledger讀進程讀取到相同的內容。

Managed ledgers

managed ledgers是一個在ledgers之上構造的概念,是BookKeeper ledgers提供的一個單一的日志抽象,作為一個topic的存儲層。

也就是消息流的抽象,有一個寫入器進程,不斷的在消息流的尾部添加消息(producer發送消息),并且有多個cursor消費這個流(consumer訂閱消息),每個cursor都有自己的消費位置。

一個managed ledgers底層使用多個BookKeeper ledgers來保存數據。使用多個ledgers的原因有:

  • 故障之后,如果原來的ledgers不能寫了,就可以開一個新的ledgers
  • 當一個ledgers里的消息,已經被所有訂閱的cursor消費了,則這個ledgers可以被刪除。

日志存儲

在BookKeeper中,最重要的日志,就是事務日志。在寫數據到ledger之前,bookie會確保這個更新的事務日志,已經持久化到存儲上。

當Bookie啟動,或者舊的日志文件的大小達到最大限制時,會創建新的日志文件。

Pulsar proxy

Pulsar客戶端和Pulsar集群的交互,可以直接連接Pulsar的brokers。但是某些情況下,這種直連無法做到,例如pulsar部署在云環境或者K8S中,ip并不固定。

Pulsar proxy就是為了解決這種問題,它扮演網關的角色,客戶端與它通訊,而不是直接與broker。

Pulsar proxy是直接從Zookeeper上讀取它需要的信息,例如broker的ip和端口(broker啟動后,會被自己的信息,注冊到Zookeeper中)。

相關資料

Messaging · Apache Pulsar

看這篇就夠了!RocketMQ、Kafka、Pulsar事務消息|原子性|kafka|回滾|事務型|rocketmq_網易訂閱

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/196798.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息