在開源的業(yè)界已經(jīng)有這么多消息隊列中間件了,Pulsar 作為一個新勢力到底有什么優(yōu)點呢?
Pulsar 自從出身就不斷的再和其他的消息隊列(Kafka,RocketMQ 等等)做比較。
但是 Pulsar 的設(shè)計思想和大多數(shù)的消息隊列中間件都不同,具備了高吞吐,低延遲,計算存儲分離,多租戶,異地復(fù)制等功能。
所以 Pulsar 也被譽為下一代消息隊列中間件,接下來我會一一對其進(jìn)行詳細(xì)的解析。
Pulsar 架構(gòu)原理
Pulsar 架構(gòu)原理如下圖:
整體的架構(gòu)和其他的消息隊列中間件差別不是太大,相信大家也看到了很多熟悉的名詞,接下來會給大家一一解釋這些名詞的含義。
名詞解釋:
-
Producer:消息生產(chǎn)者,將消息發(fā)送到 Broker。
-
Consumer:消息消費者,從 Broker 讀取消息到客戶端,進(jìn)行消費處理。
-
Broker:可以看作是 Pulsar 的 Server,Producer 和 Consumer 都看作是 Client 消息處理的節(jié)點。
Pulsar 的 Broker 和其他消息中間件的都不一樣,他是無狀態(tài)的沒有存儲,所以可以無限制的擴展,這個后面也會詳解講到。
-
Bookie:負(fù)責(zé)所有消息的持久化,這里采用的是 Apache Bookeeper。
-
ZK:和 Kafka 一樣 Pulsar 也是使用 ZK 保存一些元數(shù)據(jù),比如配置管理,Topic 分配,租戶等等。
-
Service Discovery:可以理解為 Pulsar 中的 Nginx,只用一個 URL 就可以和整個 Broker 進(jìn)行打交道,當(dāng)然也可以使用自己的服務(wù)發(fā)現(xiàn)。
客戶端發(fā)出的讀取,更新或刪除主題的初始請求將發(fā)送給可能不是處理該主題的 Broker 。
如果這個 Broker 不能處理該主題的請求,Broker 將會把該請求重定向到可以處理主題請求的 Broker。
不論是 Kafka,RocketMQ 還是我們的 Pulsar 其實作為消息隊列中間件最為重要的大概就是分為三個部分:
-
Producer 是如何生產(chǎn)消息,發(fā)送到對應(yīng)的 Broker。
-
Broker 是如何處理消息,將高效的持久化以及查詢。
-
Consumer 是如何進(jìn)行消費消息。
而我們后面也會圍繞著這三個部分進(jìn)行展開講解。
Producer 生產(chǎn)消息
先簡單看一下如何用代碼進(jìn)行消息發(fā)送:PulsarClient client = PulsarClient.create("pulsar://pulsar.us-west.example.com:6650"); Producer producer = client.createProducer( "persistent://sample/standalone/ns1/my-topic"); // Publish 10 messages to the topic for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); }
Step1:首先使用我們的 URL 創(chuàng)建一個 Client 這個 URL 是我們 Service Discovery 的地址,如果我們使用單機模式可以進(jìn)行直連。
Step2:我們傳入了一個類似 URL 的參數(shù),我們只需要傳遞這個就能指定我們到底在哪個 Topic 或者 Namespace 下面創(chuàng)建的,URL 的格式為:
{persistent|non-persistent}://tenant/namespace/topic
上面三個步驟中,步驟 1,2 屬于我們準(zhǔn)備階段,用于構(gòu)建客戶端,構(gòu)建 Producer,我們真的核心邏輯在 Send 中。
那這里我先提幾個小問題,大家可以先想想在其他消息隊列中是怎么做的,然后再對比 Pulsar 的看一下:
-
我們調(diào)用了 Send 之后是會立即發(fā)送嗎?
-
如果是多 Partition,怎么找到我應(yīng)該發(fā)送到哪個 Broker 呢?
發(fā)送模式
我們上面說了 Send 分為 Async 和 Sync 兩種模式,但實際上在 Pulsar 內(nèi)部 Sync 模式也是采用的 Async 模式,在 Sync 模式下模擬回調(diào)阻塞,達(dá)到同步的效果。
這個在 Kafka 中也是采用的這個模式,但是在 RocketMQ 中,所有的 Send 都是真正的同步,都會直接請求到 Broker。
基于這個模式,在 Pulsar 和 Kafka 中都支持批量發(fā)送,在 RocketMQ 中是直接發(fā)送,批量發(fā)送有什么好處呢?
當(dāng)我們發(fā)送的 TPS 特別高的時候,如果每次發(fā)送都直接和 Broker 直連,可能會做很多的重復(fù)工作,比如壓縮,鑒權(quán),創(chuàng)建鏈接等等。
比如我們發(fā)送 1000 條消息,那么可能會做 1000 次這個重復(fù)的工作,如果是批量發(fā)送的話這 1000 條消息合并成一次請求,相對來說壓縮,鑒權(quán)這些工作就只需要做一次。
發(fā)送負(fù)載均衡
在消息隊列中通常會將 Topic 進(jìn)行水平擴展,在 Pulsar 和 Kafka 中叫做 Partition,在 RocketMQ 中叫做 Queue,本質(zhì)上都是分區(qū),我們可以將不同分區(qū)落在不同的 Broker 上,達(dá)到我們水平擴展的效果。
在我們發(fā)送的時候可以自己制定選擇 Partition 的策略,也可以使用它默認(rèn)輪訓(xùn) Partition 策略。
當(dāng)我們選擇了 Partition 之后,我們怎么確定哪一個 Partition 對應(yīng)哪一個 Broker 呢?
Step1:我們所有的信息分區(qū)映射信息在 ZK 和 Broker 的緩存中都有進(jìn)行存儲。
Step2:我們通過查詢 Broker,可以獲取到分區(qū)和 Broker 的關(guān)系,并且定時更新。
Step3:在 Pulsar 中每個分區(qū)在發(fā)送端的時候都被抽象成為一個單獨的 Producer,這個和 Kafka,RocketMQ 都不一樣。
在 Kafka 里面大概就是選擇了 Partition 之后然后再去找 Partition 對應(yīng)的 Broker 地址,然后進(jìn)行發(fā)送。
Pulsar 將每一個 Partition 都封裝成 Producer,在代碼實現(xiàn)上就不需要去關(guān)注他具體對應(yīng)的是哪個 Broker,所有的邏輯都在 Producer 這個代碼里面,整體來說比較干凈。
壓縮消息
消息壓縮是優(yōu)化信息傳輸?shù)氖侄沃?,我們通常看見一些大型文件都會是以一個壓縮包的形式提供下載。
在我們消息隊列中我們也可以用這種思想,我們將一個 Batch 的消息,比如有 1000 條可能有 1M 的傳輸大小,但是經(jīng)過壓縮之后可能就只會有幾十 KB,增加了我們和 Broker 的傳輸效率,但是與之同時我們的 CPU 也帶來了損耗。
Pulsar 客戶端支持多種壓縮類型,如 lz4、zlib、zstd、snappy 等。
client.newProducer() .topic(“test-topic”) .compressionType(CompressionType.LZ4) .create();
Broker
接下來我們來說說第二個比較重要的部分 Broker,在 Broker 的設(shè)計中 Pulsar 和其他所有的消息隊列差別比較大,而正是因為這個差別也成為了他的特點。
計算和存儲分離
首先我們來說說他最大的特點:計算和存儲分離。
我們在開始的說過 Pulsar 是下一代消息隊列,就非常得益于他這個架構(gòu)設(shè)計,無論是 Kafka 還是 RocketMQ,所有的計算和存儲都放在同一個機器上。
這個模式有幾個弊端:
-
擴展困難:當(dāng)我們需要擴展的集群的時候,我們通常是因為 CPU 或者磁盤其中一個原因影響,但是我們卻要申請一個可能 CPU 和磁盤配置都很好的機器,造成了資源浪費。并且 Kafka 這種進(jìn)行擴展,還需要進(jìn)行遷移數(shù)據(jù),過程十分繁雜。
-
負(fù)載不均衡: 當(dāng)某些 Partion 數(shù)據(jù)特別多的時候,會導(dǎo)致 Broker 負(fù)載不均衡,如下面圖,如果某個 Partition 數(shù)據(jù)特別多,那么就會導(dǎo)致某個 Broker(輪船)承載過多的數(shù)據(jù),但是另外的 Broker 可能又比較空閑。
-
對于計算:也就是我們的 Broker,提供消息隊列的讀寫,不存儲任何數(shù)據(jù),無狀態(tài)對于我們擴展非常友好,只要你機器足夠,就能隨便上。
擴容 Broker 往往適用于增加 Consumer 的吞吐,當(dāng)我們有一些大流量的業(yè)務(wù)或者活動,比如電商大促,可以提前進(jìn)行 Broker 的擴容。
-
對于存儲:也就是我們的 Bookie,只提供消息隊列的存儲,如果對消息量有要求的,我們可以擴容 Bookie,并且我們不需要遷移數(shù)據(jù),擴容十分方便。
消息存儲
名詞解析:
-
Entry:是存儲到 bookkeeper 中的一條記錄,其中包含 Entry ID,記錄實體等。
-
Ledger:可以認(rèn)為 ledger 是用來存儲 Entry 的,多個 Entry 序列組成一個 ledger。
-
Journal:其實就是 bookkeeper 的 WAL(write ahead log),用于存 bookkeeper 的事務(wù)日志,journal 文件有一個最大大小,達(dá)到這個大小后會新起一個 journal 文件。
-
Entry log:存儲 Entry 的文件,ledger 是一個邏輯上的概念,entry 會先按 ledger 聚合,然后寫入 entry log 文件中。同樣,entry log 會有一個最大值,達(dá)到最大值后會新起一個新的 entry log 文件。
-
Index file:ledger 的索引文件,ledger 中的 entry 被寫入到了 entry log 文件中,索引文件用于 entry log 文件中每一個 ledger 做索引,記錄每個 ledger 在 entry log 中的存儲位置以及數(shù)據(jù)在 entry log 文件中的長度。
-
MetaData Storage: 元數(shù)據(jù)存儲,是用于存儲 bookie 相關(guān)的元數(shù)據(jù),比如 bookie 上有哪些 ledger,bookkeeper 目前使用的是 zk 存儲,所以在部署 bookkeeper 前,要先有 zk 集群。
整體架構(gòu)上的寫流程:
-
Step1:Broker 發(fā)起寫請求,首先對 Journal 磁盤寫入 WAL,熟悉 MySQL 的朋友知道 redolog,journal 和 redolog 作用一樣都是用于恢復(fù)沒有持久化的數(shù)據(jù)。
-
Step2:然后再將數(shù)據(jù)寫入 index 和 ledger,這里為了保持性能不會直接寫盤,而是寫 pagecache,然后異步刷盤。
-
Step3:對寫入進(jìn)行 ack。
讀流程為:
-
Step1:先讀取 index,當(dāng)然也是先讀取 cache,再走 disk。
-
Step2:獲取到 index 之后,根據(jù) index 去 entry logger 中去對應(yīng)的數(shù)據(jù)。
如何高效讀寫?在 Kafka 中當(dāng)我們的 Topic 變多了之后,由于 Kafka 一個 Topic 一個文件,就會導(dǎo)致我們的磁盤 IO 從順序?qū)懽兂呻S機寫。
在 RocketMQ 中雖然將多個 Topic 對應(yīng)一個寫入文件,讓寫入變成了順序?qū)?,但是我們的讀取很容易導(dǎo)致我們的 Pagecache 被各種覆蓋刷新,這對于我們的 IO 的影響是非常大的。
所以 Pulsar 在讀寫兩個方面針對這些問題都做了很多優(yōu)化:
寫流程:順序?qū)?Pagecache。在寫流程中我們的所有的文件都是獨立磁盤,并且同步刷盤的只有 Journal。
Journal 是順序?qū)懸粋€ journal-wal 文件,順序?qū)懶史浅8?。ledger 和 index 雖然都會存在多個文件,但是我們只會寫入 Pagecache,異步刷盤,所以隨機寫不會影響我們的性能。
讀流程:broker cache+bookie cache,在 Pulsar 中對于追尾讀(tailing read)非常友好基本不會走 IO。
一般情況下我們的 Consumer 是會立即去拿 Producer 發(fā)送的消息的,所以這部分在持久化之后依然在 Broker 中作為 Cache 存在。
當(dāng)然就算 Broker 沒有 Cache(比如 Broker 是新建的),我們的 Bookie 也會在 Memtable 中有自己的 Cache,通過多重 Cache 減少讀流程走 IO。
我們可以發(fā)現(xiàn)在最理想的情況下讀寫的 IO 是完全隔離開來的,所以在 Pulsar 中能很容易就支持百萬級 Topic,而在我們的 Kafka 和 RocketMQ 中這個是非常困難的。
無限流式存儲
一個 Topic 實際上是一個 ledgers流(Segment),通過這個設(shè)計所以 Pulsar 他并不是一個單純的消息隊列系統(tǒng),他也可以代替流式系統(tǒng),所以他也叫流原生平臺,可以替代 Flink 等系統(tǒng)。
Segment 可以看作是我們寫入文件的一個基本維度,同一個 Segment 的數(shù)據(jù)會寫在同一個文件上面,不同 Segment 將會是不同文件,而 Segment 之間的在 Metadata 中進(jìn)行保存。
分層存儲
在 Kafka 和 RocketMQ 中消息是會有一定的保存時間的,因為磁盤會有空間限制。
在 Pulsar 中也提供這個功能,但是如果你想讓自己的消息永久存儲,那么可以使用分級存儲,我們可以將一些比較老的數(shù)據(jù),定時的刷新到廉價的存儲中,比如 s3,那么我們就可以無限存儲我們的消息隊列了。
數(shù)據(jù)復(fù)制
在 Pulsar 中的數(shù)據(jù)復(fù)制和 Kafka,RocketMQ 都有很大的不同,在其他消息隊列中通常是其他副本主動同步,通常這個時間就會變得不可預(yù)測。
-
Ensemble Size(E):決定給定 Ledger 可用的 Bookie 池大小。
-
Write Quorum Size(Qw):指定 Pulsar 向其中寫入 Entry 的 Bookie 數(shù)量。
-
Ack Quorum Size(Qa):指定必須 Ack 寫入的 Bookie 數(shù)量。
采用這種并發(fā)寫的方式,會更加高效的進(jìn)行數(shù)據(jù)復(fù)制,尤其是當(dāng)數(shù)據(jù)副本比較多的時候。
Consumer
接下來我們來聊聊 Pulsar 中最后一個比較重要的組成 Consumer。
訂閱模式
訂閱模式是用來定義我們的消息如何分配給不同的消費者,不同消息隊列中間件都有自己的訂閱模式。
一般我們常見的訂閱模式有:
-
集群模式:一條消息只能被一個集群內(nèi)的消費者所消費。
- 廣播模式: 一條消息能被集群內(nèi)所有的消費者消費。
在 Pulsar 中提供了 4 種訂閱模式,分別是:
獨占:顧名思義只能由一個消費者獨占,如果同一個集群內(nèi)有第二個消費者去注冊,第二個就會失敗,這個適用于全局有序的消息。
災(zāi)備:加強版獨占,如果獨占的那個掛了,會自動的切換到另外一個好的消費者,但是還是只能由一個獨占。
共享模式:這個模式看起來有點像集群模式,一條消息也是只能被一個集群內(nèi)消費者消費,但是和 RocketMQ 不同的是,RocketMQ 是以 Partition 維度,同一個 Partition 的數(shù)據(jù)都會被發(fā)到一個機器上。
在 Pulsar 中消費不會以 Partition 維度,而是輪訓(xùn)所有消費者進(jìn)行消息發(fā)送。這有個什么好處呢?
如果你有 100 臺機器,但是你只有 10 個 Partition 其實你只有 10 臺消費者能運轉(zhuǎn),但是在 Pulsar 中 100 臺機器都可以進(jìn)行消費處理。
鍵共享:類似上面說的 Partition 維度去發(fā)送,在 RocketMQ 中同一個 Key 的順序消息都會被發(fā)送到一個 Partition。
但是這里不會有 Partition 維度,而只是按照 Key 的 Hash 去分配到固定的 Consumer,也解決了消費者能力限制于 Partition 個數(shù)問題。
消息獲取模式
不論是在 Kafka 還是在 RocketMQ 中我們都是 Client 定時輪訓(xùn)我們的 Broker 獲取消息,這種模式叫做長輪訓(xùn)(Long-Polling)模式。
這種模式有一個缺點網(wǎng)絡(luò)開銷比較大,我們來計算一下 Consumer 被消費的時延,我們假設(shè) Broker 和 Consumer 之間的一次網(wǎng)絡(luò)延時為 R。
那么我們總共的時間為:
-
當(dāng)某一條消息 A 剛到 Broker 的,這個時候 long-polling 剛好打包完數(shù)據(jù)返回,Broker 返回到 Consumer 這個時間為 R。
-
Consumer 又再次發(fā)送 Request 請求,這個又為 R。
-
將我們的消息 A 返回給 Consumer 這里又為 R。
如果只考慮網(wǎng)絡(luò)時延,我們可以看見我們這條消息的消費時延大概是 3R,所以我們必須想點什么對其進(jìn)行一些優(yōu)化。
有同學(xué)可能馬上就能想到,我們消息來了直接推送給我們的 Consumer 不就對了,這下我們的時延只會有一次 R,這個就是我們常見的推模式。
但是簡單的推模式是有問題的,如果我們有生產(chǎn)速度遠(yuǎn)遠(yuǎn)大于消費速度,那么推送的消息肯定會干爆我們的內(nèi)存,這個就是背壓。
那么我們怎么解決背壓呢?我們就可以優(yōu)化推送方式,將其變?yōu)閯討B(tài)推送,我們結(jié)合 Long-polling,在 long-polling 請求時將 Buffer 剩余空間告知給 Broker,由 Broker 負(fù)責(zé)推送數(shù)據(jù)。
此時 Broker 知道最多可以推送多少條數(shù)據(jù),那么就可以控制推送行為,不至于沖垮 Consumer。
舉個例子:Consumer 發(fā)起請求時 Buffer 剩余容量為 100,Broker 每次最多返回 32 條消息。
那么 Consumer 的這次 long-polling 請求 Broker 將在執(zhí)行 3 次 Push(共 Push 96 條消息)之后返回 Response 給 Consumer(Response 包含 4 條消息)。
如果采用 long-polling 模型,Consumer 每發(fā)送一次請求 Broker 執(zhí)行一次響應(yīng)。
這個例子需要進(jìn)行 4 次 long-polling 交互(共 4 個 Request 和 4 個 Response,8 次網(wǎng)絡(luò)操作;Dynamic Push/Pull 中是 1 個 Request,3 次 Push 和 1 個 Response,共 5 次網(wǎng)絡(luò)操作)。
所以 Pulsar 就采用了這種消息獲取模式,從 Consumer 層進(jìn)一步優(yōu)化消息達(dá)到時間。
我覺得這個設(shè)計非常巧妙,很多中間件的這種 long-polling 模式都可以參考這種思想去做一個改善。
總結(jié)
Apache Pulsar 很多設(shè)計思想都和其他中間件不一樣,但無疑于其更加貼近于未來。
大膽預(yù)測一下其他的一些消息中間件未來的發(fā)展也都會向其靠攏,目前國內(nèi)的 Pulsar 使用者也是越來越多,騰訊云提供了 Pulsar 的云版本 TDMQ。
當(dāng)然還有一些其他的知名公司華為,知乎,虎牙等等有都在對其做一個逐步的嘗試,我相信 Pulsar 真的是一個趨勢。
最后也讓我想起了最近大江大河大結(jié)局的一句話:
所有的變化,都可能伴隨著痛苦和彎路,開放的道路,也不會是闊野坦途,但大江大河,奔涌向前的趨勢,不是任何險灘暗礁,能夠阻擋的。道之所在,雖千萬人吾往矣。
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!