pulsar源碼是什么語言開發,pulsar文檔

 2023-12-25 阅读 33 评论 0

摘要:文章目錄一、pulsar簡介二、消息收發messagingproducerconsumerstopic訂閱多主題訂閱分區topic非持久topic消息保留和過期消息去重生產者冪等去重和實際一次語義消息延遲傳遞三、架構四、租戶五、消息壓縮六、Pulsar ATS-SNI 路由七、Multiple advertised listeners八、schem

文章目錄

  • 一、pulsar簡介
  • 二、消息收發
    • messaging
    • producer
    • consumers
    • topic
    • 訂閱
    • 多主題訂閱
    • 分區topic
    • 非持久topic
    • 消息保留和過期
    • 消息去重
    • 生產者冪等
    • 去重和實際一次語義
    • 消息延遲傳遞
  • 三、架構
  • 四、租戶
  • 五、消息壓縮
  • 六、Pulsar ATS-SNI 路由
  • 七、Multiple advertised listeners
  • 八、schema
  • 九、pulsar函數
  • 九、部署
    • 一、單機部署
    • 二、集群部署
  • 十、系統管理

一、pulsar簡介

Pulsar 是一個用于服務器到服務器的消息系統,具有多租戶、高性能等優勢。 Pulsar 最初由 Yahoo 開發,目前由 Apache 軟件基金會管理。

Pulsar 的關鍵特性如下:

Pulsar 的單個實例原生支持多個集群,可跨機房在集群間無縫地完成消息復制。
極低的發布延遲和端到端延遲。
可無縫擴展到超過一百萬個 topic。
簡單的客戶端 API,支持 Java、Go、Python 和 C++。

支持多種 topic 訂閱模式(獨占訂閱、共享訂閱、故障轉移訂閱)。

通過 Apache BookKeeper 提供的持久化消息存儲機制保證消息傳遞 。

由輕量級的 serverless 計算框架 Pulsar Functions 實現流原生的數據處理。
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得數據更易移入、移出 Apache Pulsar。
分層式存儲可在數據陳舊時,將數據從熱存儲卸載到冷/長期存儲(如S3、GCS)中。

二、消息收發

messaging

  1. Pulsar 采用 發布-訂閱的設計模式(簡稱 pub-sub), 該設計模式中,producer 發布消息到 topic, Consumer 訂閱 topic、處理發布的消息,并在處理完成后發送確認。
  2. 一旦創建訂閱,即使 consumer 斷開連接,Pulsar 仍然可以保存所有消息。 在 consumer 確認消息已處理成功后,才會刪除消息。
  3. 消息的組成部分:
  • key(消息可以選擇用鍵進行標記,這在 topic 壓縮 等操作很有用)
  • value
  • 屬性
  • producer名稱
  • 序列Id
  • 發布時間
  • 事件時間
  • TypedMessageBuilder(用于構造消息。 您可以使用 TypedMessageBuilder 設置消息的鍵值對屬性。
    在設置 TypedMessageBuilder 時,最佳的選擇是將 key 設置為字符串。 如果將 key 設置為其他類型(例如,AVRO 對象),則 key 會以字節形式發送,這時 consumer 就很難使用了)
  1. 消息默認最大為5M。通過配置調整。
  • broker.conf
# The max size of a message (in bytes).
maxMessageSize=5242880
  • bookkeeper.con
# The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
nettyMaxFrameSizeBytes=5253120

producer

  1. 發送模式 :
    Producer 可以以同步(sync) 或 異步(async) 的方式發布消息到 broker。
  • 同步發送:生產者發送消息后等待broker的確認,
    如果沒有收到確認,生產者確定發送失敗。
  • 異步發送:Producer 將把消息放于阻塞隊列中,并立即返回 然后,客戶端將在后臺將消息發送給 broker。 如果隊列已滿(最大大小可配置),則調用 API 時,producer 可能會立即被阻止或失敗,具體取決于傳遞給 producer 的參數。
  1. 訪問模式:

  2. 壓縮

  • 支持LZ4 ZLIB ZSTD SNAPPY壓縮類型。
  • 批量處理:

當批量處理啟用時,producer 會在單個請求中積累并發送一批消息。 批量處理的量大小由最大消息數和最大發布延遲定義。 因此,積壓數量是分批處理的總數,而不是信息總數。

在 Pulsar 中,批次被跟蹤并存儲為單個單元,而不是單個消息。 Consumer 將批量處理的消息拆分成單個消息。 但即使啟用了批量處理,也始終將計劃中的消息(通過 deliverAt 或者 deliverAfter 進行配置) 作為單個消息發送。

一般來說,當 consumer 確認了一個批的所有消息,該批才會被認定為確認。 這意味著當發生不可預料的失敗、否定的確認(negative acknowledgements)或確認超時,都可能導致批中的所有消息都被重新發送,即使其中一些消息已經被確認了。

維護批量索引的確認狀態并跟蹤每批索引的確認狀態,以避免向 consumer 發送已確認的消息。 當某一批消息的所有索引都被確認時,該批消息將被刪除。

啟用批量索引確認將會導致更多內存開銷。

  • 分塊
    當你想要啟用分塊(chunking) 時,請閱讀以下說明。

Batching and chunking cannot be enabled simultaneously. 如果想要啟用分塊(chunking) ,您必須提前禁用批量處理。
Chunking is only supported for persisted topics.分塊只支持持久消息。
Chunking is only supported for the exclusive and failover subscription modes.分塊只支持獨占和災備訂閱模式。

處理一個 producer 和一個訂閱 consumer 的分塊消息
如下圖所示,當生產者向主題發送一批大的分塊消息和普通的非分塊消息時。 假設生產者發送的消息為 M1,M1 有三個分塊 M1-C1,M1-C2 和 M1-C3。 這個 broker 在其管理的ledger里面保存所有的三個塊消息,然后以相同的順序分發給消費者(獨占/災備模式)。 消費者將在內存緩存所有的塊消息,直到收到所有的消息塊。將這些消息合并成為原始的消息M1,發送給處理進程。

當多個生產者發布塊消息到單個主題,這個 Broker 在同一個 Ledger 里面保存來自不同生產者的所有塊消息。 如下所示,生產者1發布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三個塊組成。 生產者2發布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三個塊組成。 這些特定消息的所有分塊是順序排列的,但是其在 ledger 里面可能不是連續的。 這種方式會給消費者帶來一定的內存負擔。因為消費者會為每個大消息在內存開辟一塊緩沖區,以便將所有的塊消息合并為原始的大消息。

consumers

Consumer 向 broker 發送消息流獲取申請(flow permit request)以獲取消息。 在 Consumer 端有一個隊列,用于接收從 broker 推送來的消息。 你能夠通過receiverQueueSize參數配置隊列的長度 (隊列的默認長度是1000) 每當 consumer.receive() 被調用一次,就從緩沖區(buffer)獲取一條消息。

  1. 接收模式
    同步模式(同步模式,在收到消息之前都是被阻塞的),異步模式(異步接收模式會立即返回一個 future 值(如 Java 中的 CompletableFuture),一旦收到新的消息就立刻完成。)
  2. 監聽
    Client libraries provide listener implementation for consumers. For example, the Java client provides a MesssageListener interface. 在這個接口中,一旦接受到新的消息,received方法將被調用。

3.消息確認
當消費者成功的消費了一條消息,這個消費者會發送一個確認信息給broker。 這個消息時是永久保存的,只有在收到訂閱者消費成功的消息確認后才會被刪除。 如果希望消息被 Consumer 確認后仍然保留下來,可配置 消息保留策略實現。

消息有兩種確認模式:單條確認或者累計確認。 累積確認時,消費者只需要確認最后一條他收到的消息。 所有之前(包含此條)的消息,都不會被再次重發給那個消費者。

4.取消確認

當消費者在某個時間沒有成功的消費某條消息,消費者想重新消費到這條消息,這個消費者可以發送一條取消確認消息到 broker,broker 會將這條消息重新發給消費者。

消息取消確認也有單條取消模式和累積取消模式 ,這依賴于消費者使用的訂閱模式。

在獨占消費模式和災備訂閱模式中,消費者僅僅只能對收到的最后一條消息進行取消確認。

5.確認超時

如果消息沒有被成功消費,你想去讓 broker 自動重新交付這個消息, 你可以采用未確認消息自動重新交付機制。 客戶端會跟蹤 超時 時間范圍內所有未確認的消息。 并且在指定超時時間后會發送一個 重發未確認的消息 請求到 broker。

6.死信主題

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic("your-topic-name").build()).subscribe();

7.重試消息
很多在線的業務系統,由于業務邏輯處理出現異常,消息一般需要被重新消費。 若需要允許延時重新消費失敗的消息,你可以配置生產者同時發送消息到業務主題和重試主題,并允許消費者自動重試消費。 配置了允許消費者自動重試。如果消息沒有被消費成功,它將被保存到重試主題當中。并在指定延時時間后,自動重新消費重試主題里面的消費失敗消息。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

topic

Topic的名稱為符合良好結構的URL:

{persistent|non-persistent}://tenant/namespace/topic
  1. 持久化 / 非持久化

用來標識 topic 的類型。 Pulsar 支持兩種主題類型:持久化和非持久化。 主題默認是持久化類型,如果不特殊指定主題類型,那主題就是持久化的。 對于持久化的主題,所有的消息都會被持久化的保存到磁盤當中(如果 broker 不是單機模式,消息會被持久化到多塊磁盤),而非持久化的主題的數據不會被保存到磁盤里面。

2.租戶

The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters.

3.命名空間

將相關聯的 topic 作為一個組來管理,是管理 Topic 的基本單元。 大多數對 topic 的管理都是對命名空間的一項配置。 每個租戶里面可以有一個或者多個命名空間。

4.topic

The final part of the name. Topic names have no special meaning in a Pulsar instance.

訂閱

訂閱是命名好的配置規則,指導消息如何投遞給消費者。 Pulsar 中有四種訂閱模式: 獨占,共享,災備和key共享 。
When a subscription has no consumers, its subscription mode is undefined. A subscription’s mode is defined when a consumer connects to the subscription, and the mode can be changed by restarting all consumers with a different configuration.(如果一個訂閱沒有消費者,訂閱模式是不確定的。訂閱模式在一個消費者連接到訂閱的時候被定義,訂閱模式可以唄更改當重啟所有的消費者(不同的配置))。

  • Exclusive 獨占模式

在獨占模式中只有一個消費者允許訂閱,如果多個消費者訂閱相同的主題,會報錯。

  • Failover 災備模式
    主消費者會消費非分區主題或者分區主題中的每個分區的消息。

對于分區主題來說,Broker 將按照消費者的優先級和消費者名稱的詞匯表順序對消費者進行排序。 然后試圖將主題均勻的分配給優先級最高的消費者。

對于非分區主題來說,broker 會根據消費者訂閱非分區主題的順序選擇消費者。

  • key_shared
    在Key_Shared模式下,多個消費者可以附加到同一個訂閱。消息以跨使用者的分發方式傳遞,具有相同鍵或相同排序鍵的消息只被傳遞給一個使用者。無論消息被重新交付多少次,它都被交付給相同的使用者。當使用者連接或斷開時會導致服務使用者更改消息的某些鍵。

多主題訂閱

當consumer訂閱pulsar的主題時,它默認指定訂閱了一個主題,例如:persistent://public/default/my-topic。 從Pulsar的1.23.0-incubating的版本開始,Pulsar消費者可以同時訂閱多個topic。 你可以用以下兩種方式定義topic的列表:

On the basis of a regular expression (regex), for example persistent://public/default/finance-.*
通過明確指定的topic列表

當訂閱多個主題的時候,Pulsar 客戶端將自動調用 Pulsar API 找到符合匹配規則的主題列表,然后訂閱這些主題。 如果此時有暫不存在的主題,那么一旦這些主題被創建,消費者會自動訂閱這些主題。

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer().topicsPattern(allTopicsInNamespace).subscriptionName("subscription-1").subscribe();// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer().topicsPattern(someTopicsInNamespace).subscriptionName("subscription-1").subscribe();

分區topic

普通的主題僅僅被保存在單個 broker中,這限制了主題的最大吞吐量。 Partitioned topics are a special type of topic that are handled by multiple brokers, thus allowing for higher throughput.

分區主題實際是通過在底層擁有 N 個內部主題來實現的,這個 N 的數量就是等于分區的數量。 當向分區的topic發送消息,每條消息被路由到其中一個broker。 Pulsar自動處理跨broker的分區分布。

路由模式確定每條消息該發往哪個分區而訂閱模式確定消息傳遞給哪個消費者。

分區topic和普通topic,對于訂閱模式如何工作,沒有任何不同。分區只是決定了從生產者生產消息到消費者處理及確認消息過程中發生的事情。

  • 路由模式
  1. RoundRobinPartition
    如果消息沒有指定 key,為了達到最大吞吐量,生產者會以 round-robin 方式將消息發布到所有分區。 請注意round-robin并不是作用于每條單獨的消息,而是作用于延遲處理的批次邊界,以確保批處理有效。 如果消息指定了key,分區生產者會根據key的hash值將該消息分配到對應的分區。 這是默認的模式。
  2. SinglePartition
    如果消息沒有指定 key,生產者將會隨機選擇一個分區,并發布所有消息到這個分區。 如果消息指定了key,分區生產者會根據key的hash值將該消息分配到對應的分區。
  3. CustomPartition
    使用自定義消息路由器實現來決定特定消息的分區。 用戶可以創建自定義路由模式:使用 Java client 并實現MessageRouter 接口。
  • 順序保證
    當使用 SinglePartition或者RoundRobinPartition模式時,如果消息有key,消息將會被路由到匹配的分區,這是基于ProducerBuilder 中HashingScheme 指定的散列shema。

  • 散列scheme
    HashingScheme 是代表一組標準散列函數的枚舉。為一個指定消息選擇分區時使用。

有兩種可用的散列函數: JavaStringHash 和Murmur3_32Hash. The default hashing function for producer is JavaStringHash. 請注意,當producer可能來自于不同語言客戶端時,JavaStringHash是不起作用的。建議使用Murmur3_32Hash。

非持久topic

持久性主題上的消息數據可以在 broker 重啟和訂閱者故障轉移之后繼續存在。

Pulsar也提供了非持久topic。非持久topic的消息不會被保存在硬盤上,只存活于內存中。當使用非持久topic分發時,殺掉Pulsar的broker或者關閉訂閱者,此topic( non-persistent))上所有的瞬時消息都會丟失,意味著客戶端可能會遇到消息缺失。

non-persistent://tenant/namespace/topic

默認非持久topic在broker上是開啟的。

  • 性能

    非持久topic讓producer有更低的發布延遲

  • 客戶端API

    非持久topic consumer.

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";Consumer<byte[]> consumer = client.newConsumer().topic(npTopic).subscriptionName(subscriptionName).subscribe();

非持久 producer

Producer<byte[]> producer = client.newProducer().topic(npTopic).create();

消息保留和過期

Pulsar broker默認如下:

  • immediately delete all messages that have been acknowledged by a consumer。被消費者確認后立即刪除。

  • 以消息backlog的形式,持久保存所有的未被確認消息。

存留規則會被用于某namespace下所有的topic,指明哪些消息會被持久存儲,即使已經被確認過。 沒有被留存規則覆蓋的消息將會被刪除。 Without a retention policy, all of the acknowledged messages would be deleted.

圖中下面的是消息過期,有些消息即使還沒有被確認,也被刪除掉了。因為根據設置在namespace上的TTL,他們已經過期了。(例如,TTL為5分鐘,過了十分鐘消息還沒被確認)

消息去重

消息去重保證了一條消息只能在 Pulsar 服務端被持久化一次。 消息去重是一個 Pulsar 可選的特性,它能夠阻止不必要的消息重復,它保證了即使消息被消費了多次,也只會被保存一次。

生產者冪等

在 Pulsar 中,消息去重是在 broker上處理的,用戶不需要去修改客戶端的代碼。 相反,你只需要通過修改配置就可以實現。

去重和實際一次語義

消息去重,使 Pulsar 成為了流處理引擎(SPE)或者其他尋求 “僅僅一次” 語義的連接系統所需的理想消息系統。 如果消息系統沒有提供自動去重能力,那么 SPE (流處理引擎) 或者其他連接系統就必須自己實現去重語義,這意味著需要應用去承擔這部分的去重工作。 使用Pulsar,嚴格的順序保證不會帶來任何應用層面的代價。

消息延遲傳遞

延時消息功能允許你能夠過一段時間才能消費到這條消息,而不是消息發布后,就馬上可以消費到。

Broker 保存消息是不經過任何檢查的。 當消費者消費一條消息時,如果這條消息是延時消息,那么這條消息會被加入到DelayedDeliveryTracker當中。 訂閱檢查機制會從DelayedDeliveryTracker獲取到超時的消息,并交付給消費者。

  • broker 配置
# Whether to enable the delayed delivery for messages.
# If disabled, messages are immediately delivered and there is no tracking overhead.
delayedDeliveryEnabled=true# Control the ticking time for the retry of delayed message delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000
  • producer配置
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();

三、架構

http://pulsar.apache.org/docs/zh-CN/concepts-architecture-overview/

  1. 單個 Pulsar 集群由以下三部分組成:
  • 一個或者多個 broker 負責處理和負載均衡 producer 發出的消息,并將這些消息分派給 consumer;
  • Broker 與 Pulsar 配置存儲交互來處理相應的任務,并將消息存儲在 BookKeeper 實例中(又稱 bookies);
    Broker 依賴 ZooKeeper 集群處理特定的任務,等等。
  • 包含一個或多個 bookie 的 BookKeeper 集群負責消息的持久化存儲。
  • 一個Zookeeper集群,用來處理多個Pulsar集群之間的協調任務。

四、租戶

http://pulsar.apache.org/docs/zh-CN/concepts-multi-tenancy/

Multi Tenancy

成為一個多租戶系統是 Pulsar 最初設計理念的一部分。 并且,Pulsar 提出了租戶的概念。 租戶可以跨集群分布,每個租戶都可以有單獨的認證和授權機制。 租戶也是存儲配額、消息 TTL 和隔離策略的管理單元。

Pulsar 的多租戶性質主要體現在 topic 的 URL 中,其結構如下:

persistent://tenant/namespace/topic

可以看到,租戶是 topic 的最基本單位(比命名空間和 topic 名稱更為基本)。

五、消息壓縮

某些情況下,consumer并不需要完整的topic日志。 他們可能只需要幾個值來構造一個更 “淺” 的日志圖像, 也許僅僅只是最新的值。

Topic壓縮的工作原理
當通過命令行觸發topic壓縮,Pulsar將會從頭到尾迭代整個topic。 對于它碰到的每個key,壓縮程序將會只保留這個key最近的事件。

之后,broker將會創建一個新的BookKeeper ledger 然后開始對topic的每條消息進行第二次迭代。 對于每條消息,如果key匹配到它的最新事件,key的數據內容,消息ID,元數據將會被寫入最新創建的ledger。 如果key并沒有匹配到最新的消息,消息將被跳過。 如果給定的消息,負載是空的,它將被跳過并且視為刪除(類似key-value數據庫中的 tombstones概念); At the end of this second iteration through the topic, the newly created BookKeeper ledger is closed and two things are written to the topic’s metadata: the ID of the BookKeeper ledger and the message ID of the last compacted message (this is known as the compaction horizon of the topic). 寫入元數據后,壓縮就完成了。

初始化壓縮操作完成后,將來任何對壓縮層位及壓縮backlog的修改,都會通知給擁有該topic的Pulsar broker。 當下列更改發生時:

啟用讀取壓縮功能的客戶端(consumer和reader),將會嘗試從topic中讀取消息,或者:
像從正常的主題那樣讀取(如果消息的ID大于等于壓縮層位),或
從壓縮層位的開始讀取(如果消息ID小于壓縮層位)

六、Pulsar ATS-SNI 路由

代理服務器是一個中轉服務,它通過網絡轉發多個客戶端的請求到不同的后端服務器上。 代理服務器在正向代理和反向代理的場景中起到了類似“流量檢查”的作用。它能給你的系統帶來如下好處,如負載均衡,性能,安全,自動擴縮容等等。

代理服務起到了反向代理的作用,它在所有的 broker 前面創建了一個網關。 Pulsar 不支持Apache Traffic Server (ATS), HAProxy, Nginx和Envoy 等開源代理組件。 代理服務器支持SNI 路由, SNI 路由用于在不斷開 SSL 連接的情況下將流量轉發到目標地址。 四層路由轉發提供了更大的透明度,因為轉發的出口連接是根據TCP包里面攜帶的目標地址信息來確定的。

默認情況下,records.config文件在本地的/usr/local/etc/trafficserver/目錄下。 該文件包含了 ATS 相關的可配置參數。

要修改配置文件records.config,有如下步驟。

通過參數http.server_ports配置 TLS 監聽的端口,通過修改參數: ssl.client.cert.path和ssl.client.cert.filename,修改所使用的代理證書,以建立安全的 TLS 隧道。
通過參數http.connect_port配置 Broker 的服務端口,用來建立隧道。 假設 Pulsar broker 監聽的是443和6651端口,那么就需要修改配置http.connect_ports為對應的值。

七、Multiple advertised listeners

Pulsar 客戶端使用多個 Advertised 監聽器需要以下兩步。

  1. 在 broker 配置文件配置多個 Advertised 監聽器。
advertisedListeners={listenerName}:pulsar://xxxx:6650, {listenerName}:pulsar+ssl://xxxx:6651
  1. 為客戶端指定監聽器名稱。

    PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://xxxx:6650").listenerName("external").build();
    

八、schema

https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/

  1. 無schema得情況下創建producer,則 producer 只能 produce byte[] 類型的消息。 如果有 POJO 類,則需要在發送消息前將 POJO 序列化為字節。

    Producer<byte[]> producer = client.newProducer().topic(topic).create();
    User user = new User("Tom", 28);
    byte[] message =// serialize the `user` by yourself;
    producer.send(message);
    
  2. 有schema。如果通過指定 schema 來創建 producer,則可以直接將類發送到 topic,而無需考慮如何將 POJO 序列化為字節。

    Producer<User> producer = client.newProducer(JSONSchema.of(User.class)).topic(topic).create();
    User user = new User("Tom", 28);
    producer.send(user);
    

    九、pulsar函數

    [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-9DQPoHWw-1627607746077)(/Users/pengwang/Library/Application Support/typora-user-images/image-20210719111602674.png)]

使用 Pulsar Functions SDK for Java 實現一個 Java 函數,你可以實現如下函數:

import java.util.function.Function;public class JavaNativeExclamationFunction implements Function<String, String> {@Overridepublic String apply(String input) {return String.format("%s!", input);}
}
package org.example.functions;import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;import java.util.Arrays;public class WordCountFunction implements Function<String, Void> {// This function is invoked every time a message is published to the input topic@Overridepublic Void process(String input, Context context) throws Exception {Arrays.asList(input.split(" ")).forEach(word -> {String counterKey = word.toLowerCase();context.incrCounter(counterKey, 1);});return null;}
}

部署:將上面的代碼編譯成可部署的 JAR 文件,可以使用如下命令行將 JAR 包部署到 Pulsar 集群中

$ bin/pulsar-admin functions create \--jar target/my-jar-with-dependencies.jar \--classname org.example.functions.WordCountFunction \--tenant public \--namespace default \--name word-count \--inputs persistent://public/default/sentences \--output persistent://public/default/count

九、部署

一、單機部署

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-b0R29FYO-1627607746079)(/Users/pengwang/Library/Application Support/typora-user-images/image-20210719153532618.png)]

啟動命令:bin/pulsar standalone

二、集群部署

  1. 架構圖

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-rD0nZtdY-1627607746081)(/Users/pengwang/Library/Application Support/typora-user-images/image-20210719154214979.png)]

  1. 要求
  • 至少 6 臺 Linux 機器或虛擬機
    • 3 臺部署Zookeeper
    • 3臺部署 Pulsar broker 和 Bookeeper bookie
  • 包含 Pulsar 所有 broker 主機的 DNS(域名解析服務器)
  1. 硬件條件

    zookeeper可以使用功能較弱的機器或虛擬機。脈沖星僅在周期協調和配置相關的任務中使用ZooKeeper,不用于基本操作。

    運行 bookie 和 Pulsar broker 的機器,必須使用高規格的機器。 例如,對于AWS部署,i3.4xlarge實例可能是合適的。 你可以參考如下方法選擇機器:

    • 高性能的CPU和10Gbps NIC (適用于Pulsar brokers)
    • 小型快速固態硬盤(SSD)或硬盤驅動器(HDD),帶有RAID控制器和電池供電的寫緩存(適用于BookKeeper bookies)。
  2. 安裝

    一. pulsar

    wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gztar -xvzf apache-pulsar-2.8.0-bin.tar.gz
    cd apache-pulsar-2.8.0
    

    二.可選組件

    Connector,分層存儲。

    三.zookeeper集群

    部署 Pulsar 集群,你必須首先部署 Zookeeper (在所有其他組件之前)。 建議部署3個節點的 Zookeeper 集群。

    1. 將所有zookeeper信息添加到conf/zookeeper.conf。如:

      server.1=zk1.us-west.example.com:2888:3888
      server.2=zk2.us-west.example.com:2888:3888
      server.3=zk3.us-west.example.com:2888:3888
      
    2. 在每臺zookeeper機器設置myid。

       mkdir -p data/zookeeperecho 1 > data/zookeeper/myid
      
    3. 啟動zookeeper。

      $ bin/pulsar-daemon start zookeeper
      
      1. 初始化集群元數據。

        $ bin/pulsar initialize-cluster-metadata \--cluster pulsar-cluster-1 \--zookeeper zk1.us-west.example.com:2181 \--configuration-store zk1.us-west.example.com:2181 \--web-service-url http://pulsar.us-west.example.com:8080 \--web-service-url-tls https://pulsar.us-west.example.com:8443 \--broker-service-url pulsar://pulsar.us-west.example.com:6650 \--broker-service-url-tls pulsar+ssl://pulsar.us-west.example.com:6651
        
    4. 部署bookeeper集群

      在conf/bookkeerper.conf 配置bookeeper bookies。設置zkServers參數。

      zkServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
      

      在每臺bookeeper上啟動bookie。

      后臺運行:

      $ bin/pulsar-daemon start bookie
      

      前臺運行:

      $ bin/pulsar bookie
      

      驗證bookie是否正常工作:

      $ bin/bookkeeper shell bookiesanity
      

      這個命令會在本地的 bookie 創建一個臨時的 BookKeeper ledger,往里面寫一些條目,然后讀取它,最后刪除這個 ledger。

      啟動了所有的 bookie 后,你能夠在任意一臺bookie上,使用BookKeeper shell的simpletest命令,去校驗集群內所有的 bookie 是否都已經啟動。

      $ bin/bookkeeper shell simpletest --ensemble <num-bookies> --writeQuorum <num-bookies> --ackQuorum <num-bookies> --numEntries <num-entries>
      
    5. 部署pulsar broker

      Broker 配置中有一些非常重要的參數,這些參數可以確保每個Broker 連接到已部署的 ZooKeeper 集群。 需要確認 zookeeperServersconfigurationStoreServers 配置項的值是正確的。 在當前情況下,由于只有一個集群,沒有單獨用來存儲配置的 Zookeeper 集群,那么配置項configurationStoreServerszookeeperServers 是一樣的值。

      zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
      configurationStoreServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
      

      你必須配置集群的名字( 初始化集群元數據 提供的集群名字必須和這個配置項匹配):

      clusterName=pulsar-cluster-1
      

      此外,初始化集群的時候提供的 broker 和 web 服務的端口也必須和下面的配置匹配(特別是當你定義了和默認值不同的端口的時候)

      brokerServicePort=6650
      brokerServicePortTls=6651
      webServicePort=8080
      webServicePortTls=8443
      
    6. 啟用pulsar function(可選)

      1. conf/broker.conf文件的配置項functionsWorkerEnabled設置為true,啟用函數 worker,

        functionsWorkerEnabled=true
        
      2. 修改conf/functions_worker.yml的配置項pulsarFunctionsCluster 為集群的名稱,該名稱和初始化集群元數據使用的集群名字是同一個。

        pulsarFunctionsCluster: pulsar-cluster-1
        
    7. 啟動broker。

      后臺:

      $ bin/pulsar-daemon start broker
      

      前臺:

      $ bin/pulsar broker
      

      一旦啟動了你打算使用的所有 broker, 你的 Pulsar 集群應該就準備就緒了!

      十、系統管理

      1. Pulsar 依靠兩個外部系統完成重要工作:
      • ZooKeeper 負責各種與配置和協調相關的任務。

      • BookKeeper 負責消息數據的 持久化存儲。

        ZooKeeper 和 BookKeeper 都是 Apache 開源項目。

      1. Pulsar manager

        docker 安裝:

        docker pull apachepulsar/pulsar-manager:v0.2.0
        docker run -it \-p 9527:9527 -p 7750:7750 \-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \apachepulsar/pulsar-manager:v0.2.0
        

        設置管理員賬號密碼:

        CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
        curl \-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \-H "Content-Type: application/json" \-X PUT http://localhost:7750/pulsar-manager/users/superuser \-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
        

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/196799.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息