Pulsar 的單個實例原生支持多個集群,可跨機房在集群間無縫地完成消息復制。
極低的發布延遲和端到端延遲。
Pulsar采用發布訂閱的設計模式(pub-sub),該模式下,producer發送消息到topic,consumer訂閱topic,消費producer發布的消息,并在處理完成后向broker發送確認。
消息是pulsar的基礎“單元”,消息的組成如下:
pulsar源碼是什么語言開發、組件 | 描述 |
---|---|
Value/data payload | 值。消息攜帶的消息體,二進制數據。 |
Key | 鍵。消息可以選擇使用key進行標記,這個與消息的順序,topic壓縮等操作有關系 |
Properties | 屬性。用戶可以自定義的一些鍵值對 |
Sequence ID | 序號。所有保存在topic中的消息,都會有一個排序的序號。類比Kafka,就是消息的Offset |
Publish time | 發送時間。producer發送消息時的時間戳 |
EventTime | 事件時間。一個可選的時間戳。 |
TypeMessageBuilder | 這個是用來構造一個消息。 |
Producer 可以以同步(sync) 或 異步(async) 的方式發布消息到 broker。
Producer訪問topic的模式有分享(Shared)、獨占(Exclusive)、等待獨占(waitForExclusive):
知識梳理,Producer支持在傳輸消息的時候,對消息進行壓縮,支持的壓縮算法有:
Pulsar支持啟用消息的批量處理,開啟批量處理的時候,producer會積累請求的消息,并合并為一個批次,一次性發送。批量處理的量大小由“最大消息數”和“最大延遲時間”兩個配置決定。達到“最大消息數”或者“最大延遲時間”就會發送。
在Pulsar中,批次被跟蹤存儲為單個單元,而不是每個消息獨立存儲為單個單元。
Consumer處理消息的時候,也是按整個批次接收,接收后,再將批次拆分成一個個獨立的消息。
當Consumer確認了一個批次中的所有消息,這個批次才會被判定為已確認。這就有可能造成重復消費的問題,各種異常、nack,確認超時等情況發生,會導致broker會把批次消息重新推送給consumer,即使其中一部分consumer已經消費過。
pulse記憶方法?為了解決這個問題,Pulsar在2.6.0之后,引入的批次ack下標的概念,broker會維護每個批次中的消息ack下標,避免向consumer推送重復的消息。
但是默認情況下,broker這個功能是關閉的(acknowledgmentAtBatchIndexLevelEnabled=false)。因為開啟批次下標功能,會導致更多的內存開銷(因為要維護每個批次的內部下標)。
當啟用分塊的時候(chunkingEnabled=true),如果消息的大小超過配置的最大大小,則producer會將原始消息分隔成多個塊,并將它們與塊的元數據單獨和按順序地發送到broker。
在broker中,分塊消息將和普通消息以相同的方式存儲在Managed Ledger上。
consumer緩存的收到的塊消息,直到收到消息的所有塊。然后consumer將分塊拼接成真正的消息,并將它放入到接收隊列。這個時候,客戶端才算真正接收到消息,從隊列取出消息進行消費。如果consumer未能在過期時間內接受到消息的所有分塊,則會放棄過期未完成的分塊消息,默認1小時。
pulsar是什么意思。consumer一旦確認消費整個分塊消息,則內部會將消息對應的所有分塊都進行確認。
因為需要緩存分塊消息,所以可能出現分區消息過多,緩沖區爆滿的情況。所以有maxPendingChunkedMessage配置,當緩存的消息量達到這個值,新的分塊消息,consumer就會通過靜默ack或者nack,并讓broker稍后把這些消息重發。
consumer向topic發起訂閱,處理producer發送到topic的消息。
consumer向broker發送消息流獲取申請,以獲取信息。在consumer端有一個隊列,用于接收broker推送來的消息。每當consumer.receive()調用一次,就從緩沖區獲取一條信息。
consumer可以通過同步(Sync)或(異步)的方式從broker接收數據。
發送模式 | 說明 |
---|---|
同步接收 | 同步模式,在收到消息之前都是被阻塞的。 |
異步接收 | 異步接收模式會立即返回一個future,一旦收到新的消息,就立即完成。 |
producer發送消息到broker之后,就會被永久保存起來。consumer成功消費了一條消息后,會向broker發送一個確認消息,broker在收到消費者消費成功的消息后,才有可能會被刪除。如果希望消息被consumer消費后,還能被保留,可以配置“消息保留策略”。
消息有兩種方式去確認:
注意:累計確認,不能在“共享訂閱模式”下使用!!!因為該模式涉及到多個訪問相同訂閱的消費者,只能按單條消費確認。
當consumer沒有成功消費某條消息,希望后面重新消費該消息,則可以給broker發送一個nack請求。
注意:
如果consumer在指定的時間內沒有給broker發送ack或nack消息,客戶端會跟蹤“超時未確認”的消息。并在指定超時時間后,給broker發送一個“重發未確認消息”的請求。
注意:盡量優先使用nack~
當consumer沒有辦法成功消費某個消息的時候,可以將該消息暫存到另外一個特殊的topic,然后去消費該消息之后的其他消息,這個特殊的topic,就是“死信主題”。
java的實例代碼:
|
deadLetterTopic并不是必須的,沒有指定私信主題名稱的時候,默認主題為:<topicname>-<subscriptionname>-DLQ。
當前死信主題能在共享和key共享兩種模式下使用。
死信主題,能在我們消費某些消息失敗的時候,將這些消息暫存起來,待后續處理。但是我們現實的業務場景,大多是希望自動重試,例如消息消費失敗,1分鐘之后重新消費該信息,所以就有了“重試主題”。
|
|
Topic名稱組成 | 說明 |
---|---|
persistent/non-persistent | topic的類型。Pulsar支持“持久化”和“非持久化”兩種topic類型。默認是持久化類型。對于持久化類型的topic,所有消息都會被持久化到磁盤中,而非持久化的topic,數據只會保存在內存中。 當broker接收到非持久化topic的消息,會馬上投遞給當前或者的consumer;如果當前沒有活著的consumer,則消息丟失。因為沒有經過持久化,所以會比普通的持久化topic要快,但是會丟信息。 |
租戶(tenant) | Pulsar用于支持多租戶的,數據基于租戶隔離。 |
命名空間(namespace) | 用于將相關聯的topic作為一個組來管理。每個租戶下一個配置多個命名空間。 |
主題名 | 主題的名字。 |
注意:不需要特意去Pulsar上創建topic,當使用一個不存在的topic時,Pulsar會自動創建這個topic。
Pulsar中有四種訂閱模式:獨占、災備、共享和key共享:
注意:當一個subscription沒有任何consumer的時候,它的訂閱模式是undefined。當有一個consumer連接上subscription時,該consumer使用的模式,就決定了整個subscription的模式。所有,可以通過修改配置,然后重啟所有的consumer來改變subscription的模式。
獨享模式(Exclusive):
該模式下,只允許一個consumer連接到subscription,如果有第一個consumer連接到subscription,則會報錯。
災備模式(Failover):
在災備模式下,允許多個consumer同時連接到相同的subscription,但是只要一個master的consumer能接收到“分分區topic”或“分區topic的分區”的消息。當作為master的consumer斷開連接了,就會把所有未ack的消息投遞給下一個或者的consumer(故障轉移)。
對于分區的topic,broker會把所有consumer的優先級和consumer的名稱的字典順序來排序,然后broker盡量將分區均勻地分配給優先級高的consumer。
對于非分區的topic,第一個訂閱的consumer就是master。
共享模式(Shared):
在共享模式下,允許多個consumer同事連接到相同的subscription,消息會被輪詢的投遞到這些consumer。如果有一個consumer掛了,那些已經發送給它,但是未得到ack的消息,會被重新分發給其他或者的consumer。
注意:
key共享模式(Key_Shared):
該模式跟普通共享模式差不多,也允許多個consumer同時消費信息,但是會把key相同的消息投遞給同一個consumer,大致能保證相同key的消息的順序。
當有一個consumer新增或者掛掉的時候,會改變消息的分配邏輯。例如,是key%活著的consumer,得到消息需要投遞給哪一個consumer,如果新增或者減少consumer數量,就會導致分配的consumer不一樣。
注意:
從Pulsar1.23.0-incubating版本之后,Pulsar的consumer端就支持同時訂閱多個topic。可以有兩種方式:
注意:訂閱多個topic的時候,多個topic之間消息的順序性沒有保證。
普通的topic,只被保存在單個broker上,限制了主題的吞吐量。分區topic是一種特殊的topic,可以分布在多個broker上,提高topic的吞吐量。
分區topic底層實際上是N個內部的普通topic,N就是分區數,這些內部的普通topic可以被分配到不同的broker上。
當向分區topic發送消息的時候,每條消息會被路由到其中的一個broker。Pulsar自動處理跨broker的分區分布。
路由模式:
前面我們提到過“訂閱模式”,訂閱模式是消息怎么投遞給consumer;現在講的是“路由模式”,producer怎么把消息發給broker。
Pulsar有三種路由模式:
順序保證:
當使用“輪詢分區”、“單個分區”的路由模式,producer只要指定key,則相同key的消息會被投遞到同一個分區,key相同的消息,順序有保證。
當使用“單個分區”的路由模式,producer沒有指定key,則所有消息都會被投遞到同一個分區,順序有保證,但是吞吐量就低了,降級為普通topic。
Pulsar默認的消息保留策略為:
有兩個特性,可以覆蓋上面的行為:
消息保留。允許保存已經被consumer ack了的消息。
消息過期 Time-To-Live(TTL)。允許給消息設置一個TTL,消息過期了,即使消息還沒被消費,也會被清除。
Pulsar支持對消息去重,同一條消息即使被重復投遞多次,在broker端只會被保存一次。
消息去重,實際上就是“生產者冪等”。
使用其他消息中間件,都沒有提供該特性的支持,當要保證消息發送的可靠性,就會引入失敗重試邏輯。而這往往就會導致消息的重復發送,消費者端就被迫需要去做消息冪等處理,實現“消費者冪等”。
所以Pulsar對于那些需要實現“僅一次”的場景,會更友好。
要啟用消息去重,需要同時對broker和client兩端做配置。
有三種級別來啟用broker端的消息去重:
1、broker級別啟用,所有的namespace和topics都會去重。相關配置如下:
參數名 | 說明 | 默認值 |
---|---|---|
brokerDeduplicationEnabled | 整個broker級別的去重配置開關,true為開啟,fasle為關閉。 | false |
brokerDeduplicationMaxNumberOfProducers | 為了去重目的而存儲信息的最大producer數量。 | 10000 |
brokerDeduplicationEntriesInterval | 一個快照文件能保存的最大消息數。 配置較大的值,產生的快照文件數量會比較少;但是這樣會導致tpoics的恢復變慢(要重放快照文件)。 | 1000 |
brokerDeduplicationProducerInactivityTimeoutMinutes | producer掉線多長時間后會,broker會把快照文件清除。 | 360(單位:分鐘) |
2、即使brokerDeduplicationEnabled=false,沒有開啟去重功能。也可以單獨在namespace下開啟:
pulsar-admin namespaces set-deduplication?\
public/default \
--enable
3、也可以只開啟單獨的topics:
pulsar-admin topics set-deduplication
client端啟用去重,需要做兩件事:
為什么做這兩件事情,就是啟動客戶端去重呢?
producer發送消息的時候有SequenceNumber的概念,每發一個消息,就會+1。
在broker端,會根據producer的名稱來保存producer發送的消息對應的SequenceNumber。
至于為什么要設置消息超時時間為0,就不大懂了,可能是消息超時被刪會影響去重判斷?
我們很多場景,需要在一段時間后,觸發一個事情。例如開會15分鐘之前發送一個提醒~Pulsar的延遲消息功能就符合這種場景。
發送延遲消息,消息會被存儲在BookKeeper,broker保存消息的時候,并不會做任何檢查的(不會判斷消息是延遲消息,還是正常消息)。
而是當consumer消費一個消息的時候,判斷消息是否是延遲消息,如果是延遲消息,會將消息加入到DelayedDeliveryTracker。
DelayedDeliveryTracker會在內存中保存一個時間索引(時間→消息id),DelayedDeliveryTracker檢查延遲時間到了之后,才把消息重新投遞consumer。
注意:目前延遲消息只能作用于“共享訂閱”模式。在“獨占”或者“災備”模式下,消息都是直接投遞。
Pulsar的broker是一個無狀態的組件,主要負責運行兩個組件:
注意:Pulsar的集群是支持跨集群復制數據的。為了支持Topic的異地復制,Broker會使用Relicators追蹤本地發布的數據,并把這些數據用JAVA 客戶端重新發布到其他地方。
一個Pulsar實例可以加入多個集群,在集群之前實現數據同步。
單個Pulsar集群由三部分組成:
注意:集群內部需要一個Zookeeper集群,保存元數據;集群之間還有一個Zookeeper集群,用來處理Pulsar集群之間的協調任務。
Pulsar集群使用Zookeeper來存儲元數據,集群配置和協調任務。
在一個Pulsar實例中:
Pulsar提供的消息投遞的可靠性的保障。即一個消息被成功投遞到broker,它就一定會被投遞到它的目標。
為了提供這種保證,未被consumer ack的消息,會一直被保存到consumer ack。
Pulsar用Apache BookKeeper作為持久化存儲,這是一個分布式的預寫日志(WAL)系統,以下特性很適合用于Pulsar:
另外,consumer訂閱的消費位置cursors,也是存儲在BookKeeper。
Ledger是一個只追加的數據結構,并且只有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入。 Ledger的條目會被復制到多個bookies。Ledgers本身有著非常簡單的語義:
Ledger讀一致性
BookKeeper的主要優勢在于它能在有系統故障時保證讀的一致性。由于Ledger只能被一個進程寫入(之前提的寫入器進程),這樣這個進程在寫入時不會有沖突,從而寫入會非常高效。
在一次故障之后,ledger會啟動一個恢復進程來確定ledger的最終狀態并確認最后提交到日志的是哪一個條目。 在這之后,能保證所有的ledger讀進程讀取到相同的內容。
managed ledgers是一個在ledgers之上構造的概念,是BookKeeper ledgers提供的一個單一的日志抽象,作為一個topic的存儲層。
也就是消息流的抽象,有一個寫入器進程,不斷的在消息流的尾部添加消息(producer發送消息),并且有多個cursor消費這個流(consumer訂閱消息),每個cursor都有自己的消費位置。
一個managed ledgers底層使用多個BookKeeper ledgers來保存數據。使用多個ledgers的原因有:
在BookKeeper中,最重要的日志,就是事務日志。在寫數據到ledger之前,bookie會確保這個更新的事務日志,已經持久化到存儲上。
當Bookie啟動,或者舊的日志文件的大小達到最大限制時,會創建新的日志文件。
Pulsar客戶端和Pulsar集群的交互,可以直接連接Pulsar的brokers。但是某些情況下,這種直連無法做到,例如pulsar部署在云環境或者K8S中,ip并不固定。
Pulsar proxy就是為了解決這種問題,它扮演網關的角色,客戶端與它通訊,而不是直接與broker。
Pulsar proxy是直接從Zookeeper上讀取它需要的信息,例如broker的ip和端口(broker啟動后,會被自己的信息,注冊到Zookeeper中)。
Messaging · Apache Pulsar
看這篇就夠了!RocketMQ、Kafka、Pulsar事務消息|原子性|kafka|回滾|事務型|rocketmq_網易訂閱
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态