Kafka:消息是如何在服務(wù)端存儲(chǔ)與讀取的,你真的知道嗎?
前言
經(jīng)過(guò)前 5 篇文章的介紹,估么著小伙伴們已經(jīng)對(duì)消息生產(chǎn)和消費(fèi)的流程應(yīng)該有一個(gè)比較清晰的認(rèn)識(shí)了。當(dāng)然小伙伴們肯定也比較好奇,Kafka 能夠處理千萬(wàn)級(jí)消息,那它的消息是如何在 Partition 上存儲(chǔ)的呢?今天這篇文章就來(lái)為大家揭秘消息是如何存儲(chǔ)的。本文主要從消息的邏輯存儲(chǔ)和物理存儲(chǔ)兩個(gè)角度來(lái)介紹其實(shí)現(xiàn)原理。
文章概覽
-
Partition、Replica、Log 和 LogSegment 的關(guān)系。 -
寫入消息流程分析。 -
消費(fèi)消息及副本同步流程分析。
Partition、Replica、Log 和 LogSegment 的關(guān)系
假設(shè)有一個(gè) Kafka 集群,Broker 個(gè)數(shù)為 3,Topic 個(gè)數(shù)為 1,Partition 個(gè)數(shù)為 3,Replica 個(gè)數(shù)為 2。Partition 的物理分布如下圖所示。
從上圖可以看出,該 Topic 由三個(gè) Partition 構(gòu)成,并且每個(gè) Partition 由主從兩個(gè)副本構(gòu)成。每個(gè) Partition 的主從副本分布在不同的 Broker 上,通過(guò)這點(diǎn)也可以看出,當(dāng)某個(gè) Broker 宕機(jī)時(shí),可以將分布在其他 Broker 上的從副本設(shè)置為主副本,因?yàn)橹挥兄鞲北緦?duì)外提供讀寫請(qǐng)求,當(dāng)然在最新的 2.x 版本中從副本也可以對(duì)外讀請(qǐng)求了。將主從副本分布在不同的 Broker 上從而提高系統(tǒng)的可用性。
Partition 的實(shí)際物理存儲(chǔ)是以 Log 文件的形式展示的,而每個(gè) Log 文件又以多個(gè) LogSegment 組成。Kafka 為什么要這么設(shè)計(jì)呢?其實(shí)原因比較簡(jiǎn)單,隨著消息的不斷寫入,Log 文件肯定是越來(lái)越大,Kafka 為了方便管理,將一個(gè)大文件切割成一個(gè)一個(gè)的 LogSegment 來(lái)進(jìn)行管理;每個(gè) LogSegment 由數(shù)據(jù)文件和索引文件構(gòu)成,數(shù)據(jù)文件是用來(lái)存儲(chǔ)實(shí)際的消息內(nèi)容,而索引文件是為了加快消息內(nèi)容的讀取。
可能又有朋友會(huì)問(wèn),Kafka 本身消費(fèi)是以 Partition 維度順序消費(fèi)消息的,磁盤在順序讀的時(shí)候效率很高完全沒(méi)有必要使用索引啊。其實(shí) Kafka 為了滿足一些特殊業(yè)務(wù)需求,比如要隨機(jī)消費(fèi) Partition 中的消息,此時(shí)可以先通過(guò)索引文件快速定位到消息的實(shí)際存儲(chǔ)位置,然后進(jìn)行處理。
總結(jié)一下 Partition、Replica、Log 和 LogSegment 之間的關(guān)系。消息是以 Partition 維度進(jìn)行管理的,為了提高系統(tǒng)的可用性,每個(gè) Partition 都可以設(shè)置相應(yīng)的 Replica 副本數(shù),一般在創(chuàng)建 Topic 的時(shí)候同時(shí)指定 Replica 的個(gè)數(shù);Partition 和 Replica 的實(shí)際物理存儲(chǔ)形式是通過(guò) Log 文件展現(xiàn)的,為了防止消息不斷寫入,導(dǎo)致 Log 文件大小持續(xù)增長(zhǎng),所以將 Log 切割成一個(gè)一個(gè)的 LogSegment 文件。
注意: 在同一時(shí)刻,每個(gè)主 Partition 中有且只有一個(gè) LogSegment 被標(biāo)識(shí)為可寫入狀態(tài),當(dāng)一個(gè) LogSegment 文件大小超過(guò)一定大小后(比如當(dāng)文件大小超過(guò) 1G,這個(gè)就類似于 HDFS 存儲(chǔ)的數(shù)據(jù)文件,HDFS 中數(shù)據(jù)文件達(dá)到 128M 的時(shí)候就會(huì)被分出一個(gè)新的文件來(lái)存儲(chǔ)數(shù)據(jù)),就會(huì)新創(chuàng)建一個(gè) LogSegment 來(lái)繼續(xù)接收新寫入的消息。
寫入消息流程分析
流程解析
在第 3 篇文章講過(guò),生產(chǎn)者客戶端對(duì)于每個(gè) Partition 一次會(huì)發(fā)送一批消息到服務(wù)端,服務(wù)端收到一批消息后寫入相應(yīng)的 Partition 上。上圖流程主要分為如下幾步:
-
客戶端消息收集器收集屬于同一個(gè)分區(qū)的消息,并對(duì)每條消息設(shè)置一個(gè)偏移量,且每一批消息總是從 0 開始單調(diào)遞增。比如第一次發(fā)送 3 條消息,則對(duì)三條消息依次編號(hào) [0,1,2],第二次發(fā)送 4 條消息,則消息依次編號(hào)為 [0,1,2,3]。注意此處設(shè)置的消息偏移量是相對(duì)偏移量。 -
客戶端將消息發(fā)送給服務(wù)端,服務(wù)端拿到下一條消息的絕對(duì)偏移量,將傳到服務(wù)端的這批消息的相對(duì)偏移量修改成絕對(duì)偏移量。 -
將修改后的消息以追加的方式追加到當(dāng)前活躍的 LogSegment 后面,然后更新絕對(duì)偏移量。 -
將消息集寫入到文件通道。 -
文件通道將消息集 flush 到磁盤,完成消息的寫入操作。
了解以上過(guò)程后,我們?cè)趤?lái)看看消息的具體構(gòu)成情況。
一條消息由如下三部分構(gòu)成:
-
OffSet:偏移量,消息在客戶端發(fā)送前將相對(duì)偏移量存儲(chǔ)到該位置,當(dāng)消息存儲(chǔ)到 LogSegment 前,先將其修改為絕對(duì)偏移量在寫入磁盤。 -
Size:本條 Message 的內(nèi)容大小 -
Message:消息的具體內(nèi)容,其具體又由 7 部分組成,crc 用于校驗(yàn)消息,Attribute 代表了屬性,key-length 和 value-length 分別代表 key 和 value 的長(zhǎng)度,key 和 value 分別代表了其對(duì)應(yīng)的內(nèi)容。
消息偏移量的計(jì)算過(guò)程
通過(guò)以上流程可以看出,每條消息在被實(shí)際存儲(chǔ)到磁盤時(shí)都會(huì)被分配一個(gè)絕對(duì)偏移量后才能被寫入磁盤。在同一個(gè)分區(qū)內(nèi),消息的絕對(duì)偏移量都是從 0 開始,且單調(diào)遞增;在不同分區(qū)內(nèi),消息的絕對(duì)偏移量是沒(méi)有任何關(guān)系的。接下來(lái)討論下消息的絕對(duì)偏移量的計(jì)算規(guī)則。
確定消息偏移量有兩種方式,一種是順序讀取每一條消息來(lái)確定,此種方式代價(jià)比較大,實(shí)際上我們并不想知道消息的內(nèi)容,而只是想知道消息的偏移量;第二種是讀取每條消息的 Size 屬性,然后計(jì)算出下一條消息的起始偏移量。比如第一條消息內(nèi)容為 “abc”,寫入磁盤后的偏移量為:8(OffSet)+ 4(Message 大?。? 3(Message 內(nèi)容的長(zhǎng)度)= 15。第二條寫入的消息內(nèi)容為“defg”,其起始偏移量為 15,下一條消息的起始偏移量應(yīng)該是:15+8+4+4=31,以此類推。
消費(fèi)消息及副本同步流程分析
和寫入消息流程不同,讀取消息流程分為兩種情況,分別是消費(fèi)端消費(fèi)消息和從副本(備份副本)同步主副本的消息。在開始分析讀取流程之前,需要先明白幾個(gè)用到的變量,不然流程分析可能會(huì)看的比較糊涂。
-
BaseOffSet:基準(zhǔn)偏移量,每個(gè) Partition 由 N 個(gè) LogSegment 組成,每個(gè) LogSegment 都有基準(zhǔn)偏移量,大概由如下構(gòu)成,數(shù)組中每個(gè)數(shù)代表一個(gè) LogSegment 的基準(zhǔn)偏移量:[0,200,400,600, ...]。 -
StartOffSet:起始偏移量,由消費(fèi)端發(fā)起讀取消息請(qǐng)求時(shí),指定從哪個(gè)位置開始消費(fèi)消息。 -
MaxLength:拉取大小,由消費(fèi)端發(fā)起讀取消息請(qǐng)求時(shí),指定本次最大拉取消息內(nèi)容的數(shù)據(jù)大小。該參數(shù)可以通過(guò) max.partition.fetch.bytes
來(lái)指定,默認(rèn)大小為 1M。 -
MaxOffSet:最大偏移量,消費(fèi)端拉取消息時(shí),最高可拉取消息的位置,即俗稱的“高水位”。該參數(shù)由服務(wù)端指定,其作用是為了防止生產(chǎn)端還未寫入的消息就被消費(fèi)端進(jìn)行消費(fèi)。此參數(shù)對(duì)于從副本同步主副本不會(huì)用到。 -
MaxPosition:LogSegment 的最大位置,確定了起始偏移量在某個(gè) LogSegment 上開始,讀取 MaxLength 后,不能超過(guò) MaxPosition。MaxPosition 是一個(gè)實(shí)際的物理位置,而非偏移量。
假設(shè)消費(fèi)端從 000000621 位置開始消費(fèi)消息,關(guān)于幾個(gè)變量的關(guān)系如下圖所示。
消費(fèi)端和從副本拉取流程如下:
-
客戶端確定拉取的位置,即 StartOffSet 的值,找到主副本對(duì)應(yīng)的 LogSegment。 -
LogSegment 由索引文件和數(shù)據(jù)文件構(gòu)成,由于索引文件是從小到大排列的,首先從索引文件確定一個(gè)小于等于 StartOffSet 最近的索引位置。 -
根據(jù)索引位置找到對(duì)應(yīng)的數(shù)據(jù)文件位置,由于數(shù)據(jù)文件也是從小到大排列的,從找到的數(shù)據(jù)文件位置順序向后遍歷,直到找到和 StartOffSet 相等的位置,即為消費(fèi)或拉取消息的位置。 -
從 StartOffSet 開始向后拉取 MaxLength 大小的數(shù)據(jù),返回給消費(fèi)端或者從副本進(jìn)行消費(fèi)或備份操作。
假設(shè)拉取消息起始位置為 00000313,消息拉取流程圖如下:
總結(jié)
本文從邏輯存儲(chǔ)和物理存儲(chǔ)的角度,分析了消息的寫入與消費(fèi)流程。其中邏輯存儲(chǔ)是以 Partition 來(lái)管理一批一批的消息,Partition 映射 Log 對(duì)象,Log 對(duì)象管理了多個(gè) LogSegment,多個(gè) Partition 構(gòu)成了一個(gè)完整的 Topic。消息的實(shí)際物理存儲(chǔ)是由一個(gè)一個(gè)的 LogSegment 構(gòu)成,每個(gè) LogSegment 又由索引文件和數(shù)據(jù)文件構(gòu)成。下篇文章我們來(lái)分析一些實(shí)際生產(chǎn)環(huán)境中的常用操作及數(shù)據(jù)接入方案,敬請(qǐng)期待。
特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒(méi)關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:
長(zhǎng)按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!