多線程 1 的最快操作
public?class?LongAdder?{
???private?long?count?=?0L;
???public?void?add()?{
???????count ;
???}
}
可以加鎖去實(shí)現(xiàn),但效率太低。public?class?LongAdder?{
???private?long?count?=?0L;
???public?void?add()?{
???synchronized(this){
???????count ;
???}
}
可以用原子類這種樂觀鎖實(shí)現(xiàn),比加 synchronized 鎖效率高很多。public?class?LongAdder?{
???private?AtomicLong?count?=?new?AtomicLong(0L);
???public?void?add()?{
???????count.incrementAndGet();
???}
}
當(dāng)然,更高級(jí)的玩法也可以自己調(diào)用 UNSAFE 模擬原子類里的 CAS 操作,但實(shí)際上就是把原子類的源碼給展開了。(v = count 應(yīng)該放在循環(huán)里)public?class?LongAdder?{
???private?volatile?long?count?=?0L;
???public?void?add()?{
???????boolean?success?=?false;
???????int?v =?count;
???????while(!success)?{
???????????success?=?UNSAFE.compareAndSwapLong(
????????????????????LongAdder.class,?countOffset,?v,?v ?1);
???????}
???}
}
這幾段多線程 1 的代碼如果看不明白,可以找些資料把這塊的基礎(chǔ)補(bǔ)一下哈,本文就不贅述了,我們繼續(xù)。關(guān)于這個(gè)多線程 1 操作,有沒有效率更高的辦法呢?分析需求
我們先別急著想,怎么把它變快,一頭扎到技術(shù)實(shí)現(xiàn)上。同我們接一個(gè)新產(chǎn)品的需求一樣,我們首先分析一下,這個(gè)產(chǎn)品提出這個(gè)需求的核心目的是什么,有時(shí)候往往可以使問題簡(jiǎn)化。我們想一個(gè)極端的場(chǎng)景,成百上千個(gè)線程一直連續(xù)不斷對(duì)這個(gè) count 進(jìn)行 1 操作,一直加上個(gè)一年,一年后,我們只需要看一下最終的值是多少,即可。整個(gè)功能就是這樣,加一年,最后看那么一下。我們看看之前的原子類 1 的代碼。
public?class?LongAdder?{
???private?AtomicLong?count?=?new?AtomicLong(0L);
???public?void?add()?{
???????count.incrementAndGet();
???}
}
每時(shí)每刻都將 1 的操作真真正正計(jì)算了一遍,并賦值給 count。但我們只是一年后要讀取這個(gè) count 值一次,顯然,中間這一年對(duì) count 值準(zhǔn)確地計(jì)算出結(jié)果,就是不必要的。而恰恰是因?yàn)槊看味家獪?zhǔn)確計(jì)算出它的結(jié)果,導(dǎo)致多線程之間發(fā)生了競(jìng)爭(zhēng),浪費(fèi)了資源。那思路就打開了!設(shè)計(jì)思路
我們事先搞出多個(gè)這種 count 變量,并且用某種方式讓不同線程對(duì)應(yīng)到不同 count 變量上。你看這樣,如果僅僅有四個(gè)線程,就完全不存在線程競(jìng)爭(zhēng)的問題,每個(gè)線程操作唯一的變量。過一段時(shí)間后, 獲取最終的值,只需要把它們加和即可。這樣,獲取 count 值的復(fù)雜度增加了,需要做個(gè)加和操作,但卻是整個(gè)過程完全沒有線程競(jìng)爭(zhēng)。犧牲讀性能,換取寫性能。用空間換時(shí)間。你看,即使一個(gè)小小的多線程 1 操作的設(shè)計(jì),也存在架構(gòu)思維中的 trade off 思想,這在我之前兩篇架構(gòu)文章中多次提到。正所謂,不存在完美的算法,我們都只是在做平衡,犧牲這個(gè),才能換取那個(gè)。
具體實(shí)現(xiàn)
設(shè)計(jì)思路中,我們盡可能把問題簡(jiǎn)化,才能得到一個(gè)大方向。現(xiàn)在我們要具體設(shè)計(jì)了,就要把剛剛懶得思考的問題,拿出來了,這個(gè)過程的確比較痛苦。
懶加載
首先,我們當(dāng)然希望,整個(gè)過程都不存在線程競(jìng)爭(zhēng)。這樣我們一開始就創(chuàng)建了那么多 count,并且把線程一一映射過去,假如本來他們共同對(duì)同一個(gè)共享變量 1 就不會(huì)產(chǎn)生競(jìng)爭(zhēng),那這種方式就有很大問題了:1. 浪費(fèi)了空間2. 多了線程映射的算法邏輯3. 最終獲取值時(shí)還要加和得不償失呀。所以我們采用懶加載的辦法,一開始,仍然是對(duì)同一個(gè)共享變量 1,等真正出現(xiàn)競(jìng)爭(zhēng)了,再開始啟用更多的 count。我們把一開始使用的唯一共享變量叫做 base,把之后開啟的多個(gè)變量叫做 Cell 類,放在一個(gè) Cell[] 數(shù)組里。Cell 類里只有一個(gè)變量就是 value,存儲(chǔ)累加過程中的值。數(shù)組擴(kuò)容
一開始,這個(gè) Cell 數(shù)組是空的。等 base 變量出現(xiàn)了一次競(jìng)爭(zhēng)失敗的情況,就初始化這個(gè) Cell[] 數(shù)組,第一次里面放兩個(gè) Cell。此時(shí),如果只有三個(gè)線程 1,就可以保證不會(huì)發(fā)生競(jìng)爭(zhēng)。但如果此時(shí)又來了一個(gè)線程,導(dǎo)致了競(jìng)爭(zhēng),即 CAS 失敗,那么可以擴(kuò)容 Cell[] 數(shù)組。可以注意到我畫的,Cell 數(shù)組初始大小為 2,之后擴(kuò)容也是翻倍的方式,不知道你有沒有想到些什么,我們接著往下看。線程映射綁定
剛剛,我們一直默認(rèn),線程和 Cell 數(shù)組中的每個(gè) Cell 是一一對(duì)應(yīng)的關(guān)系,可是怎么做到這一點(diǎn)呢?我們?cè)诿總€(gè)線程中,維護(hù)一個(gè)局部變量,這個(gè)變量屬于這個(gè)線程,這個(gè)變量的值根據(jù) Cell[] 數(shù)組的大小哈希取模,就可以映射到其中一個(gè) Cell 上了。那同線程綁定的這個(gè)局部變量是怎么來的呢?別擔(dān)心,JDK 已經(jīng)幫我們?cè)O(shè)計(jì)好了,這就是 Thread 類里的變量 probe。public?class?Thread?implements?Runnable?{
???...
???int?threadLocalRandomProbe;
???...
}
但是我們不能直接獲取,需要借助 ThreadLocalRandom 類的如下辦法獲取。static?final?int?getProbe()?{
???return?UNSAFE.getInt(Thread.currentThread(),?PROBE);
}
當(dāng)然,獲取出的這個(gè)值,可能哈希取模后也會(huì)發(fā)生沖突。沒關(guān)系,請(qǐng)注意,這只是哈希取模沖突,也就是多個(gè)線程可能要對(duì)同一個(gè) Cell 里的 value 進(jìn)行 CAS 1 操作,但不一定會(huì)產(chǎn)生競(jìng)爭(zhēng)。所以,發(fā)生哈希取模沖突后,先直接嘗試 CAS 1 操作,如果能成功,就沒那么多事了。但假如恰好,CAS 的時(shí)候又發(fā)生了競(jìng)爭(zhēng),導(dǎo)致操作失敗怎么辦?還好,可以用這種方式為該線程的 probe 重新賦值。
probe?=?新的值,自己生成一個(gè);
UNSAFE.putInt(Thread.currentThread(),?PROBE,?probe);
重新賦值后的 probe,再次經(jīng)過哈希取模后,就不會(huì)和之前的沖突了。但很不幸,假如再?zèng)_突怎么辦?那就再次嘗試 CAS 1 操作。但假如又很不幸,CAS 1 操作又失敗了,要不要繼續(xù)重新賦值 probe 呢?要,不過,此時(shí)說明競(jìng)爭(zhēng)已經(jīng)很激烈了。簡(jiǎn)單說就是,這個(gè) Cell 數(shù)組有點(diǎn)擁擠了,此時(shí)我們選擇將數(shù)組擴(kuò)容!擴(kuò)容大家還記得吧,就是上一節(jié)中的。這就又回到了上一節(jié)中的步驟,如此循環(huán)往復(fù)。當(dāng)然,擴(kuò)容也要有個(gè)限度,我們規(guī)定,數(shù)組大小超過 CPU 核心數(shù)后,就不再擴(kuò)容了。CPU 核心數(shù),可以用如下方式獲取。
Runtime.getRuntime().availableProcessors();
如果再發(fā)生沖突和競(jìng)爭(zhēng)的情況,那就不斷重新賦值 probe,不斷嘗試 CAS。有同學(xué)可能會(huì)說,那一直不成功咋辦呢?別忘了,如果不使用我們這個(gè) LongAdder,僅僅用一個(gè)原子類不斷 1,失敗的概率是更高的,我們已經(jīng)通過將線程分散到不同 Cell,降低了發(fā)生競(jìng)爭(zhēng)失敗的概率了。
執(zhí)行流程
至此,設(shè)計(jì)思路和實(shí)現(xiàn)過程,就都搞定了,我們來看一下整個(gè)流程。1. 最開始只有一個(gè) base 變量,多個(gè)線程和諧地進(jìn)行 CAS 1 操作。2. 直到有一天,兩個(gè)線程發(fā)生了競(jìng)爭(zhēng),即其中一個(gè)線程 CAS 時(shí)失敗了,那么就創(chuàng)建一個(gè)大小為 2 的 Cell[] 數(shù)組,用線程私有的局部變量 probe 取模,映射到一個(gè) Cell 上,對(duì)其 CAS 1 操作。3. 不過假如線程 probe 取模后,發(fā)現(xiàn)那個(gè) Cell 已經(jīng)被綁定過了,不要緊,先 CAS 1 試一試。4. 但如果沒試成功,說明此處有競(jìng)爭(zhēng),那重新計(jì)算一下線程的 probe 值,映射到一個(gè)新的 Cell 上。5. 如果此時(shí)又沖突,并且 CAS 1 又失敗,那么將 Cell[] 數(shù)組擴(kuò)容。6. 最后當(dāng)要獲取最終的累計(jì)和時(shí),用 base 的值,加上所有 Cell[] 數(shù)組里的 value 值,得出一個(gè)和,返回給調(diào)用方。這個(gè)破玩意,其實(shí) JDK 中早有實(shí)現(xiàn),又是 Doug Lea 大神寫的一個(gè)類,名為LongAdder
LongAdder
通過我們剛剛眼花繚亂的分析,再看 Doug Lea 大神的 LongAdder 就非常容易了。我們看最核心的 add 方法,這是最外層的邏輯,很容易理解。
public?class?LongAdder?extends?Striped64?implements?Serializable?{
???public?void?add(long?x)?{
???????Cell[]?as;?long?b,?v;?int?m;?Cell?a;
???????if?((as?=?cells)?!=?null?||?!casBase(b?=?base,?b? ?x))?{
???????????//?已經(jīng)初始化了?Cell?數(shù)組
???????????//?或者對(duì)?base?變量?CAS? 1?操作失敗
???????????//?就走到這里了
???????????boolean?uncontended?=?true;
???????????if?(as?==?null?||?(m?=?as.length?-?1)?0?||
???????????????//?下面這行就是?probe?取模操作
???????????????(a?=?as[getProbe()?