這篇長文除了由淺入深的一步步迭代出無鎖隊(duì)列的實(shí)現(xiàn)原理,也會借此說說如何在項(xiàng)目中注意避免寫出有 BUG 的程序,與此同時(shí)也會簡單聊聊如何測試一段代碼,而這些能力應(yīng)該是所有軟件開發(fā)工作者都應(yīng)該引起注意的。而在介紹的過程中也會讓你明白理論和實(shí)際的差距到底在哪。
高級程序員和初級程序員之間,魚鷹認(rèn)為差距之一就在于寫出來的代碼 BUG 多還是少,當(dāng)然解決 BUG 的能力也很重要。
既要有挖坑的能力,也要有填坑的實(shí)力!
很早之前魚鷹就聽說過無鎖隊(duì)列,但由于以前的項(xiàng)目不是很復(fù)雜,很多時(shí)候都可以不需要無鎖隊(duì)列,所以就沒有更深入的學(xué)習(xí),直到這次串口通信想試試無鎖隊(duì)列的效果,才最終用上了這一神奇的代碼。
網(wǎng)上有個詞形容無鎖隊(duì)列魚鷹覺得很貼切:巧奪天工!
現(xiàn)在就從隊(duì)列開始說起吧。
什么是隊(duì)列,顧名思義,就類似于超市面前排起的一個隊(duì)伍,當(dāng)最前面的顧客買完了東西,后面的顧客整體向前移動,而處于新隊(duì)頭的顧客繼續(xù)消費(fèi)。如果沒有后來的顧客,那么最終這個隊(duì)伍將消失。而如果有新的顧客到來,那么他將排在隊(duì)伍最后等待購買。
對于
顧客隊(duì)伍來說,消費(fèi)者,在這里其實(shí)是
超市(不是顧客),因?yàn)樗屵@個隊(duì)伍越來越短,直至消失;而生產(chǎn)者,在這個模型中無法看到,只能說是
市場需求導(dǎo)致隊(duì)伍越來越長。
如果市場上所有的口罩會自動排隊(duì)的話,那么購買口罩的人就是口罩隊(duì)伍的
消費(fèi)者,而生產(chǎn)口罩的廠家就是
生產(chǎn)者。因?yàn)閺S家生產(chǎn)的少,而購買者多,所以才會出現(xiàn)供不應(yīng)求的情況,而一旦廠家產(chǎn)能上來了,供需就能平衡,而這種平衡不僅是市場需要的,也是軟件所追求的目標(biāo)。
說回超市模型,如果我們用軟件模擬這種排隊(duì)情況,應(yīng)該怎么做呢?
聲明一個足夠大的數(shù)組,后面來的數(shù)據(jù)(顧客)總是放在最后面,一旦有程序取用了數(shù)據(jù),那么
整體向前移動,后面來的數(shù)據(jù)繼續(xù)放在最后。(這里需要一個變量指示隊(duì)伍的長度,畢竟不是真實(shí)的排隊(duì)場景,無法直接從隊(duì)伍本身看出哪里是隊(duì)尾)。
這樣確實(shí)很好的模擬了現(xiàn)實(shí)中的情況,但對于軟件來說,效率太低!
對于顧客而言,
所有的顧客每次前進(jìn)一小步,感覺好像沒什么,也很正常,畢竟最終等待的時(shí)間還是一樣的,但是每隔一小段時(shí)間就要動一動,還是挺麻煩的事情。
而對于程序而言,不僅僅是移動數(shù)據(jù)需要耗費(fèi)CPU,更重要的是在移動過程中,如何保證“消費(fèi)者”能正確提取數(shù)據(jù),“生產(chǎn)者”能正確插入數(shù)據(jù)。畢竟你在移動數(shù)據(jù)的過程中有可能消費(fèi)者需要消費(fèi)數(shù)據(jù),而生產(chǎn)者需要生成數(shù)據(jù),如何進(jìn)行同步呢?
現(xiàn)實(shí)其實(shí)已經(jīng)給了我們答案。
不知道你是否發(fā)現(xiàn)一個現(xiàn)象,銀行里面一排排的椅子上坐滿了人,但是銀行窗口前卻沒有了長長的隊(duì)伍?這是怎么回事?為什么那些人不再排隊(duì)了,他們不怕有人插隊(duì)嗎?
當(dāng)你親自體驗(yàn)了之后就知道,沒人會插你的隊(duì),為什么?
當(dāng)你走進(jìn)銀行時(shí),服務(wù)員會提醒你去一臺機(jī)器前領(lǐng)取一個紙質(zhì)號碼,然后就請你在座位上等著。
等了一會,銀行內(nèi)廣播開始播報(bào)“9527,請到2號柜臺辦理業(yè)務(wù)”,你一聽,這不就是自己剛領(lǐng)的號嘛,所以你馬上就到窗口前辦理需要的業(yè)務(wù)了。
那么這是如何實(shí)現(xiàn)的?為什么你不再需要站著排隊(duì)了,也不需要一點(diǎn)一點(diǎn)的向前挪動,而是一直
坐著
就把隊(duì)給
排了?
每來一個人領(lǐng)取號碼,機(jī)器就把
當(dāng)前的
計(jì)數(shù)值給顧客,然后增加當(dāng)前的計(jì)數(shù),作為下一位顧客使用的號碼;而每當(dāng)銀行柜員服務(wù)完一位顧客,機(jī)器就會自動播報(bào)本次已服務(wù)對象號碼的下一個號碼前來辦理業(yè)務(wù),并自加
另一個計(jì)數(shù)
值,作為下一次辦理業(yè)務(wù)的對象。而當(dāng)兩計(jì)數(shù)器值相等時(shí),就代表該柜員完成了所有顧客的服務(wù)。
所以,不是沒有排隊(duì),而是
機(jī)器在幫你排隊(duì)
。
那么這種機(jī)制在程序中該如何實(shí)現(xiàn)呢?
在這里我們先假設(shè)只有一個柜臺能辦理業(yè)務(wù)(實(shí)際銀行是有多個柜臺,這里暫時(shí)不考慮那么復(fù)雜),一個機(jī)器為顧客發(fā)放編號。
首先需要兩個變量充當(dāng)機(jī)器的職能:
in:初始化為 0,當(dāng)來了顧客,將當(dāng)前值發(fā)給客戶,然后自動加 1
out:初始化為 0,當(dāng)柜員需要請求下一位顧客前來服務(wù)時(shí),服務(wù)顧客編號為當(dāng)前值,然后自動加 1
因?yàn)榧垪l上可展示的位數(shù)是有限的,所以我們可以先假設(shè)為4位數(shù),即兩個變量值范圍是0~9999(還好STM32F103 RAM內(nèi)存足夠)。
然后需要一個元素容量為10000大數(shù)組,存放目前排隊(duì)顧客的編號,因?yàn)閿?shù)組中存放的是0~9999的編號,所以數(shù)組的聲明采用uint16_t 進(jìn)行聲明,這樣才能把編號存放在數(shù)組單元中。
現(xiàn)在看代碼(RT-Thread系統(tǒng)):
uint16_t queue[10000];
uint16_t in;
uint16_t out;
int get(void *parameter)
{
if(in == out)
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
return -1;
}
rt_kprintf("請編號:【%04u】 到柜臺辦理服務(wù)\n",queue[out]);
out = (out + 1) % 10000;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if((in + 1) % 10000 == out)
{
rt_kprintf("現(xiàn)在排隊(duì)人數(shù)太多,請下次嘗試\n");
return -1;
}
rt_kprintf("您當(dāng)前領(lǐng)取的編號為 %04u\n",in);
queue[in] = in;
in = (in + 1) % 10000;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
現(xiàn)在假設(shè)柜員剛開始上班,他不清楚有沒有顧客等待服務(wù),所以他首先使用 get函數(shù)獲取自己的服務(wù)顧客編號,但是因?yàn)楣駟T來的有點(diǎn)早,顧客還沒來銀行辦理業(yè)務(wù),所以第一次機(jī)器告訴柜員沒有顧客等待服務(wù)。
后來陸續(xù)來了三位顧客,柜員再一次獲取時(shí),編號為 0 的顧客開始前來辦理業(yè)務(wù),柜員服務(wù)完之后,接著繼續(xù)把剩下兩位的業(yè)務(wù)都辦理完成,第四次獲取時(shí),柜員發(fā)現(xiàn)沒有顧客等待了,所以柜員可以休息一下(實(shí)際情況是,柜員前應(yīng)該有顯示當(dāng)前排隊(duì)人數(shù),當(dāng)排隊(duì)人數(shù)為 0 時(shí),就不會去使用 get 函數(shù)了)
雖然可顯示的位數(shù)為四位數(shù),有可能出現(xiàn)一天共有9999位顧客需要辦理服務(wù),但事實(shí)上同一時(shí)刻不可能有那么多人
同時(shí)在銀行排隊(duì)辦理業(yè)務(wù),所以為了節(jié)約空間,有必要縮小數(shù)組空間。
假設(shè)一次排隊(duì)最多同時(shí)有99位顧客需要業(yè)務(wù),那么數(shù)組設(shè)置為 100(為什么多一個?)。
但是因?yàn)橐惶熘锌偣部赡苡谐^ 99 位顧客辦理業(yè)務(wù),那么直接使用in作為顧客編號肯定不合適,因?yàn)閕n的范圍是0~99,那么必然需要另一個變量用于記錄當(dāng)天的總?cè)藬?shù),這里假設(shè)為counter(當(dāng)你修改代碼時(shí),你會發(fā)現(xiàn)把之前的 10000 設(shè)置為宏是明智的選擇)。
這里除了修改數(shù)組大小和put函數(shù)外,其他都一樣:
#define BUFF_SIZE 100
uint16_t queue[BUFF_SIZE];
uint16_t in;
uint16_t out;
uint16_t counter;
int get(void *parameter)
{
if(in == out)
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
return -1;
}
rt_kprintf("請編號:【%04u】 到柜臺辦理服務(wù)\n",queue[out]);
out = (out + 1) % BUFF_SIZE;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if((in + 1) % BUFF_SIZE == out)
{
rt_kprintf("現(xiàn)在排隊(duì)人數(shù)太多,請下次嘗試\n");
return -1;
}
rt_kprintf("您當(dāng)前領(lǐng)取的編號為 %04u\n",counter);
queue[in] = counter;
counter = (counter + 1) % 10000;
in = (in + 1) % BUFF_SIZE;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
你會發(fā)現(xiàn),雖然修改了數(shù)組大小,但是只要不會出現(xiàn)
同時(shí)超過100人來銀行排隊(duì),那么和之前的效果是一樣的,這樣一來,
大大降低了
內(nèi)存
空間使用。
那么魚鷹繼續(xù)寫另一種處理方式,稱之為V1.5吧。
雖然因?yàn)橐粋€神奇的取余公式,可以讓一個數(shù)始終在某一個范圍內(nèi)變化,但這也帶來非常一個明顯的問題,那就是始終不能把隊(duì)列真正填滿。
雖然只是空閑了一個元素,但是對于喜歡追求極致的人來說,這也是不可忍受的事情,所以是否有好的辦法利用所有隊(duì)列空間呢?
這里再加入一個變量,實(shí)時(shí)跟蹤隊(duì)列長度,代碼如下:
#define BUFF_SIZE 7
typedef struct
{
uint16_t queue[BUFF_SIZE];
uint16_t in;
uint16_t out;
uint16_t used;
}fifo_def;
uint16_t counter;
fifo_def fifo;
int get(void *parameter)
{
if(fifo.used == 0)
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
return -1;
}
rt_kprintf("請編號:【%04u】 到柜臺辦理服務(wù)\n",fifo.queue[fifo.out]);
fifo.out = (fifo.out + 1) % BUFF_SIZE;
fifo.used--;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if(fifo.used >= BUFF_SIZE)
{
rt_kprintf("現(xiàn)在排隊(duì)人數(shù)太多,請下次嘗試\n");
return -1;
}
rt_kprintf("您當(dāng)前領(lǐng)取的編號為 %04u\n",counter);
fifo.queue[fifo.in] = counter;
counter = (counter + 1) % 10000;
fifo.in = (fifo.in + 1) % BUFF_SIZE;
fifo.used++;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
在這個版本中,魚鷹將隊(duì)列所需的元素整合成一個結(jié)構(gòu)體方便使用(當(dāng)需要增加一個隊(duì)列時(shí),顯得比較方便),并加入一個used變量,還縮小了隊(duì)列大小,變成
7,方便我們關(guān)注隊(duì)列最本質(zhì)的東西。
因?yàn)樵黾恿艘粋€變量,所以可以完整利用所有空間,但是是否有更好的方式呢?
這還不是無鎖隊(duì)列的最終形態(tài),而是一個初版,但已經(jīng)是無鎖隊(duì)列中最核心的內(nèi)容了,也是實(shí)現(xiàn)無鎖隊(duì)列的關(guān)鍵,定為V2.0。
#define BUFF_SIZE 8 // 文章說的是 7,但是這是不能用的,只能為 2 的冪次方
typedef struct
{
uint32_t in;
uint32_t out;
uint8_t *queue;
}fifo_def;
uint8_t counter;
uint8_t fifo_buff[BUFF_SIZE];
fifo_def fifo = {0,0,fifo_buff};
int get(void *parameter)
{
if(fifo.in - fifo.out == 0)
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
return -1;
}
rt_kprintf("請編號:【%04u】 到柜臺辦理服務(wù)\n",fifo.queue[fifo.out % BUFF_SIZE]);
fifo.out++;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if(BUFF_SIZE - fifo.in + fifo.out == 0)
{
rt_kprintf("現(xiàn)在排隊(duì)人數(shù)太多,請下次嘗試\n");
return -1;
}
rt_kprintf("您當(dāng)前領(lǐng)取的編號為 %04u\n",counter);
fifo.queue[fifo.in % BUFF_SIZE] = counter;
counter += 1;
fifo.in++;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
if(BUFF_SIZE - (fifo.in - fifo.out) == 0)
{
rt_kprintf("現(xiàn)在排隊(duì)人數(shù)太多,請下次嘗試\n");
return -1;
}
if(fifo.in - fifo.out == 0)
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
return -1;
}
現(xiàn)在先說第一個簡單的,為什么in和out相等了,就代表為空?
我們可以這樣理解,小紅、小藍(lán)兩個人在一個環(huán)形操場跑步,在起步時(shí),兩個人的位置一樣,但跑了一段時(shí)間時(shí),兩人位置不同了,因?yàn)樾〖t跑的快,所以總在小藍(lán)前面,但他們是好朋友,所以小紅總是在跑了一段時(shí)間后停下來等小藍(lán)。
雖然小藍(lán)跑的慢,但因?yàn)樾〖t的等待,所以每次追上小紅處于同一個位置時(shí),都可以認(rèn)他們又處于起步階段,就好像他們從來沒有動過。
也就是說in和out相等時(shí),就可以認(rèn)為隊(duì)列為空。
這個說實(shí)話,魚鷹不知道該如何說明,只能強(qiáng)行解釋一波了(說笑的,不過確實(shí)很難說清楚)。
BUFF_SIZE - (fifo.in - fifo.out)
內(nèi)括號里面的其實(shí)就是《延時(shí)功能進(jìn)化論》中說的那個神奇的算式。
這個算式總是能準(zhǔn)確in和out之間的差值(不管是in大于out還是小于),也就是隊(duì)列中已使用了的空間大小,當(dāng)然這是有條件的。
想象一下這樣一個場景,小紅不再等待小藍(lán),而是一口氣跑下去,當(dāng)小紅再次接近小藍(lán)時(shí),你會發(fā)現(xiàn)小紅看到的是小藍(lán)的后背,也就是說,從表象來看,小藍(lán)跑到小紅的前面了。
但是群眾的眼光是雪亮的,他們知道小紅其實(shí)是跑在小藍(lán)的前面的,所以按小紅在小藍(lán)前面這種情況用尺子量一下兩者就能準(zhǔn)確知道兩者的距離。
我們現(xiàn)在繼續(xù)分析,因?yàn)樾〖t跑太快了,它超越了小藍(lán)!
雖然觀眾還是可以通過尺子進(jìn)行距離測量,但是如果單從操場
絕對位置(假設(shè)操場上有距離刻度)的角度來看,已經(jīng)無法準(zhǔn)確計(jì)算他們的距離了(當(dāng)然如果能記錄小紅超越小藍(lán)的次數(shù),那還是可以計(jì)算的,現(xiàn)在按下不表)。
所以如果把 in 當(dāng)成小紅操場所在的刻度信息,out當(dāng)成小藍(lán)的操場刻度信息,那么兩者 in 減 out 就是他們的距離信息(當(dāng)然了,這里利用了變量溢出的特性才能準(zhǔn)確計(jì)算,關(guān)于這個可以看《運(yùn)算符 % 的妙用》)。
上面動圖的隊(duì)列空間是0x18,即24,如果我們把這個隊(duì)列空間增大,增大到四個字節(jié)大小4294967295
+ 1怎么樣?
然后給你一把長度為 7 的尺子,讓你能準(zhǔn)確測量小紅和小藍(lán)的距離,你會怎么做(想象一下這個場景)?
因?yàn)槌咦幽軠y量的距離太多,而小紅跑的太快,所以你必須約束一下小紅,讓他在距離小藍(lán)為7或者更短的距離時(shí)必須停下來等小藍(lán)或者放慢速度!
由此我們就可以同時(shí)理解下面這個公式和in與out的自加了。
BUFF_SIZE - (fifo.in - fifo.out)
fifo.in++;
fifo.out++;
BUFF_SIZE 就是這把尺子,而 in 和 out 相減可以得到兩者距離,尺子總長減去兩者距離,就可得到尺子剩余可測量的距離,也是小紅還可再跑的距離。
而一直自加其實(shí)就是利用變量自動溢出的特性,而即使溢出導(dǎo)致 out 大于 in 也不會影響計(jì)算結(jié)果。
看一個動圖結(jié)合理解溢出(公**眾&&號可查看):
那為什么這里使用32位無符號整型,可不可以用8位數(shù)?
不可以,起碼在上面那個公式下是不可以的(可以稍作修改)。
這里其實(shí)涉及到計(jì)算機(jī)最為基礎(chǔ)的知識(基礎(chǔ)不牢,地動山搖,不過還好魚鷹掌握一點(diǎn)匯編知識和許多調(diào)試技巧)。
現(xiàn)在魚鷹把 in 和 out 修改為 8 位無符號數(shù),然后測試當(dāng) in = 0 和 out = -7(因?yàn)槭菬o符號,這里其實(shí)是
249),開始測試這個公式:
你會發(fā)現(xiàn)in - out = 0xFFFF FF07(這個結(jié)果會導(dǎo)致順利執(zhí)行put函數(shù),而不是直接退出),而不是0x0000 0007,這是因?yàn)橛?jì)算8位數(shù)時(shí),采用的是32位整型計(jì)算,也就是運(yùn)算等級提升了,就類似浮點(diǎn)型和整型一起混合計(jì)算,自動使用浮點(diǎn)計(jì)算!
但是當(dāng)你使用32位數(shù)聲明in和out,還是in = 0,out = -7(其實(shí)是4294967289),那么計(jì)算結(jié)果就完全不一樣了(變成了0x0000 0007,put函數(shù)直接退出)。
這里講的太細(xì)了,使道友少了很多
獨(dú)立思考,現(xiàn)在留個問題給各位,為什么選擇 -7 ?
寫完這部分,魚鷹才發(fā)現(xiàn) V2.0 版本是存在 BUG 的,原因就在于存儲數(shù)據(jù)和讀取數(shù)據(jù)那需要注意的代碼,之前魚鷹想當(dāng)然的認(rèn)為直接取余 % 即可正確讀取和存儲數(shù)據(jù),實(shí)際上因?yàn)?7 這個數(shù)根本不能被 0xFFFF FFFF + 1 給整除,也就無法正確使用了,所以
V2.0
版本無法使用,但如果把 7 換成 8,就沒有問題了,這里需要特別注意(還好提早發(fā)現(xiàn)了,不然就是誤人子弟了)。
好了 ,終于把最難理解的部分勉強(qiáng)說完了!
因?yàn)?in 和 out 一直自加,所以不像 V1.0 一樣出現(xiàn)空和滿時(shí) in 和 out 都是相等,也就不需要留下一個空間了,完美的利用了所有可用的空間!
但是 in 和 out 總是自加,必然會出現(xiàn) in 和 out 大于空間大小的情況,所以在取隊(duì)列中的元素時(shí)必須使用 % 限制大小,才能準(zhǔn)確獲取隊(duì)列中的數(shù)據(jù)(前提是隊(duì)列大小為 2 的冪次方),而 get 和 put 開頭的判斷總能保證
in
大于或等于
out(這個只是從結(jié)果來看是這樣,但實(shí)際有可能 in 小于 out,但這個不影響最終結(jié)果),而
in
–
out
小于或等于隊(duì)列大小,防止了可能的數(shù)組越界情況。
現(xiàn)在開始介紹真正的無鎖隊(duì)列。
如果僅僅從無鎖的角度來說,前面幾個版本除了 V1.5 外其實(shí)都是無鎖模式了(后面會說為什么 V1.5 不是無鎖),那為什么還要升級?目的何在?。
不知道你是否還記得筆記開頭魚鷹曾說因?yàn)樾侍投床捎醚h(huán)隊(duì)列,為什么,因?yàn)榘凑漳壳暗膶?shí)現(xiàn)來說,每次 get 都只能獲取一個隊(duì)列元素,而每次 put 也只能寫入一個元素,這對于大量隊(duì)列元素的拷貝來說效率非常低下(比如從隊(duì)列中拷貝大量串口數(shù)據(jù))。
假如說你從隊(duì)列中獲取所有的數(shù)據(jù),常規(guī)方式你只能使用 get 函數(shù)一個一個元素的取出,每次 get 除了函數(shù)體本身的運(yùn)算外(判斷是否為空、調(diào)整索引、取隊(duì)列元素、自加 out),還有進(jìn)入、退出函數(shù)的消耗,如果獲取的數(shù)量少還好,一旦多了,這其中的消耗就不得不考慮了。
所以存在大量隊(duì)列數(shù)據(jù)拷貝的情況下,以上各個版本都是不合適的,所以有必要進(jìn)行優(yōu)化升級!
既然上述版本不合適,我們就會思考,如果能僅從in和out的信息就能拷貝所有隊(duì)列中的數(shù)據(jù)那該多好。
事實(shí)上,linux 中的無鎖隊(duì)列實(shí)現(xiàn)就是利用 in 和 ou t就解決了大量數(shù)據(jù)拷貝問題。
而這個拷貝思想如果能運(yùn)用在
塊存儲設(shè)備,那么將極大簡化代碼。
那么我們來看看無鎖隊(duì)列是如何實(shí)現(xiàn)的。
首先上代碼(魚鷹為了方便說明和測試,未提供完整函數(shù),這個可后臺領(lǐng)取):
#define BUFF_SIZE 8
typedef struct
{
uint32_t in;
uint32_t out;
uint32_t size;
uint8_t *buffer;
}fifo_def;
uint8_t counter;
uint8_t fifo_buff[BUFF_SIZE];
fifo_def fifo = {0,0,BUFF_SIZE,fifo_buff};
void init(fifo_def *pfifo,uint8_t *buff,uint32_t size)
{
if(size == 0 || (size & (size - 1)))
{
retern;
}
pfifo->in = 0
pfifo->out = 0;
pfifo->size = size;
pfifo->buffer = buff;
}
int get(void *parameter)
{
fifo_def *pfifo = &fifo;
uint32_t len = 2;
uint8_t buffer[2];
uint32_t l;
len = min(len, pfifo->in - pfifo->out);
l = min(len, pfifo->size - (pfifo->out & (pfifo->size - 1)));
memcpy(buffer, pfifo->buffer + (pfifo->out & (pfifo->size - 1)), l);
memcpy(buffer + l, pfifo->buffer, len - l);
pfifo->out += len;
if(len)
{
for(int i = 0; i < len; i++)
{
rt_kprintf("請編號:【%04u】 到柜臺辦理服務(wù)\n",buffer[i]);
}
}
else
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
}
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
fifo_def *pfifo = &fifo;
uint32_t len = 1;
uint32_t l;
uint8_t buffer[1] = {counter};
len = min(len, pfifo->size - pfifo->in + pfifo->out);
l = min(len, pfifo->size - (pfifo->in & (pfifo->size - 1)));
memcpy(pfifo->buffer + (pfifo->in & (pfifo->size - 1)), buffer, l);
memcpy(pfifo->buffer, buffer + l, len - l);
pfifo->in += len;
if(len)
{
rt_kprintf("您當(dāng)前領(lǐng)取的編號為 %04u\n",buffer[0]);
}
else
{
rt_kprintf("當(dāng)前沒有顧客等待服務(wù)\n");
}
counter++;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
1、將隊(duì)列大小改為 8,即符合2的冪次方(這是in和out無限自加所要求的,也是一種限制)
2、將緩存名更改為buffer,更容易理解(所以取名字也很重要)
3、增加了一個變量size,這是方便在多個fifo情況可以使用同一個函數(shù)體(函數(shù)復(fù)用),通常這個值只在初始化時(shí)賦值一次,之后不會修改。
現(xiàn)在著重說明一種特殊拷貝情況,當(dāng)能理解這種特殊情況,那么其他情況也就不難理解了。
首先看兩個min,這是用來取兩者之間最小的那一個。
len = min(len, pfifo->in - pfifo->out);
l = min(len, pfifo->size - (pfifo->out & (pfifo->size - 1)));
memcpy(buffer, pfifo->buffer + (pfifo->out & (pfifo->size - 1)), l);
memcpy(buffer + l, pfifo->buffer, len - l);
開始的len是用戶要求拷貝的長度,但考慮到用戶可能拷貝大于隊(duì)列已有數(shù)據(jù)的長度,所以有必要對它進(jìn)行重新設(shè)置,即最多只能拷貝目前隊(duì)列中已有的大小。
第二個min其實(shí)用來獲取隊(duì)列out開始到數(shù)組結(jié)束的大小,難理解?看下圖就很清楚了:
所以,它獲取的就是out到數(shù)組結(jié)束的地方,即上圖5~7的數(shù)據(jù),那么為什么它能從out這個值得到數(shù)組內(nèi)的索引,它不是一直在自加嗎?它肯定會出現(xiàn)超過數(shù)組的情況???
是的,你猜的沒錯,如果數(shù)組大小
小于四個字節(jié)所表示的大小的話,它肯定會超過數(shù)組的索引值的,但你不要忘了,數(shù)組的大小是2的冪次方,也就是說out % size的余數(shù)必然還在數(shù)組中,即out這個值其實(shí)已經(jīng)蘊(yùn)含了數(shù)組索引值的信息的。
難理解,繼續(xù)看之前的動態(tài)(沒辦法,資源太少啦):
當(dāng)你只盯著
后三位 bit 數(shù)據(jù),而不管前面有多少 bit 時(shí),你就能理解了。
雖然 out 如上所示一直在自加過程中,但后三位(十進(jìn)制 0 ~ 7)是
自成循環(huán)的,而這自成循環(huán)的值就是我們的數(shù)組索引 0~7 的值,所以為了效率,可使用 & 運(yùn)算(效率一說后面再講)。
我們再進(jìn)一步思考,因?yàn)樗昧俗詈蟮难h(huán)數(shù)作為數(shù)組的索引,在 in、out、size 都是四個字節(jié)的情況下,隊(duì)列最大的長度是多大?
錯,應(yīng)該是 0x8000 0000,2 GB。因?yàn)槿绻?GB的話,size 的值為 0x1 0000 0000,超過四字節(jié)大小了,所以隊(duì)列大小被除了被 in、out 所限制,還被size限制了。
size – out 不就是數(shù)組
右邊部分的數(shù)據(jù)大小了,拷它!
然后數(shù)組
開頭部分剩余數(shù)據(jù)大小,拷它!
這樣一來,就很清楚了,當(dāng)你明白了這些,你會發(fā)現(xiàn)這兩個拷貝函數(shù)能適應(yīng)隊(duì)列存儲的各種情況,巧奪天工!
無鎖隊(duì)列的原理看似結(jié)束了,關(guān)鍵代碼也介紹清楚了,但其實(shí)一個關(guān)鍵點(diǎn)到現(xiàn)在還沒有進(jìn)行說明,那就是,怎么它就是無鎖了,而V1.5又為什么不是無鎖?
無鎖,有個很重要的前提,就是
一個生產(chǎn)者,一個消費(fèi)者,如果沒有這個前提,以上所有版本都不能稱之為無鎖!
那么首先討論,軟件中的生產(chǎn)者和消費(fèi)者到底是什么?
如果一個資源(這里可以是各種變量、外設(shè))只在一個
任務(wù)中順序使用,那么無鎖、有鎖都沒有關(guān)系,但是在多個任務(wù)中就不一樣了。
那么現(xiàn)在再來說說這里的
任務(wù)代表著什么?任務(wù)是一個函數(shù),一個功能?是,也不是。
裸機(jī)系統(tǒng)中,main 算一個函數(shù),也是一個主功能函數(shù),但它可以算任務(wù)嗎?算!系統(tǒng)中有很多中斷處理函數(shù),他們算一個個的任務(wù)嗎?算!除此之外還有任務(wù)嗎?沒了!
操作系統(tǒng)中(以RT-Thread為例),main 算一個任務(wù)嗎?算!RT-thread 將它設(shè)置為一個線程,所以系統(tǒng)中的所有的
線程都是一個個的任務(wù);
中斷處理函數(shù)算一個個的任務(wù)嗎?算!除此之外還有任務(wù)嗎?沒了!
那么這里所說的任務(wù)是靠什么劃分的?你總不能拍腦袋就定下來了吧,總要有依據(jù)有標(biāo)準(zhǔn)吧!
靠什么?靠的是一個函數(shù)會不會被另一個函數(shù)打斷執(zhí)行!
何為打斷?打斷是指一個函數(shù)正在執(zhí)行,然后中間因?yàn)槟撤N原因,不得不先執(zhí)行別的函數(shù),此時(shí)需要把一些臨界資源保存以供下次恢復(fù)使用?
何為臨界資源?說白了,就是一個個 CPU 寄存器,當(dāng)你進(jìn)行任務(wù)切換時(shí),為了保證下次能回到正確的位置正確的執(zhí)行,就需要把寄存器保存起來。
線程能在這稱之為任務(wù),就是因?yàn)楫?dāng)它被打斷執(zhí)行時(shí)(可能執(zhí)行其他任務(wù),可能執(zhí)行中斷處理函數(shù)),需要保存臨界資源;而中斷處理函數(shù)也是如此,雖然它保存臨界資源時(shí)不是由CPU執(zhí)行的,但它也要由硬件自動完成(STM32是如此)。
所以,這里所說的任務(wù)和你想的任務(wù)稍有不同,但為了方便理解,繼續(xù)使用“
任務(wù)”一詞。
當(dāng)一個資源同時(shí)被兩個以上任務(wù)(比如一個線程和一個中斷函數(shù))所使用時(shí),為了防止一個任務(wù)正在使用過程中被其他任務(wù)所使用,可能采取如下措施:
5、使用位帶(單片機(jī)提供,理解徹底后會發(fā)現(xiàn)和無鎖隊(duì)列類似,詳情看魚鷹信號量筆記)
現(xiàn)在就來看看無鎖隊(duì)列是如何實(shí)現(xiàn)無鎖的?
如果你認(rèn)真分析兩個函數(shù),你會發(fā)現(xiàn)不管是 get 函數(shù)還是 put 函數(shù),其實(shí)都只修改了一個全局變量,另一個全局變量只作為讀取操作。所以雖然 in 和out(size 一直都只是讀?。┩瑫r(shí)在兩個任務(wù)中使用,但并不需要加鎖。
當(dāng)然修改的位置也很重要,它們都是在數(shù)據(jù)
存儲或者提取之后才修改的全局變量,如果順序反了呢?以前魚鷹說順序的時(shí)候,都說先把資源鎖占領(lǐng)了再說,但這里卻不同,而是先使用資源,最后才修改,為什么?請自行思考。
那么為什么 V1.5 需要加鎖呢?還記得那個 used 全局變量嗎?
當(dāng) used 全局變量在一個任務(wù)中自加,在另一個任務(wù)中自減,而沒有任何保護(hù)措施,你認(rèn)為會發(fā)生什么?
你要知道,自加、自減C語言代碼只有一條,但匯編語句卻有多條。
首先讀取used的值,然后加1或減1。如果兩者之間被打斷了,就會出現(xiàn)問題。
比如used 開始等于 5,一個
任務(wù)
1讀取后保存到寄存器R0中準(zhǔn)備加1,然后被另一個
任務(wù)
2打斷,它也讀取了used,還是5,順利減 1,變成4,回到
任務(wù)
1繼續(xù)加 1,變成了 6。最終used變成了6,但實(shí)際上因?yàn)閳?zhí)行了
任務(wù)
2,它此時(shí)應(yīng)該還是 5的。
這樣一來,used 就不能準(zhǔn)確反映隊(duì)列中已使用的空間了,只能加鎖保護(hù)。
但是無鎖隊(duì)列卻不會出現(xiàn)這種情況,因?yàn)樗恍薷囊粋€屬于自己的那個變量,另一個變量只讀取,不修改,這才是無鎖實(shí)現(xiàn)的關(guān)鍵之處!
無鎖、無鎖,其實(shí)
是
有鎖!這里的鎖是in和out這兩把鎖,各司其職,實(shí)現(xiàn)了無鎖的效果(沒有用關(guān)中斷之類的鎖),使數(shù)組這個共享資源即使被兩個任務(wù)同時(shí)使用也不會出現(xiàn)問題。但是三個以上任務(wù)使用還能這么處理嗎?
當(dāng)然不能,因?yàn)檫@必然涉及到一個in或者out同時(shí)被兩個任務(wù)修改的情況,這樣一來就會出現(xiàn)used的情況,而且你還不能單純只鎖定in或者out,而需要同時(shí)把整個函數(shù)鎖定,否則就會出現(xiàn)問題。
好了,到此無鎖所有關(guān)鍵細(xì)節(jié)問題都清清楚楚,明明白白了,但是對于一個項(xiàng)目來說,這就足夠了?
如果你認(rèn)為掌握了理論就沒問題了,那你就太天真了!
先上點(diǎn)小菜,之前一直有說 % 和 & 兩個運(yùn)算符效率的問題,那么它們效率差別到底多大?
以前魚鷹剛開始接觸循環(huán)隊(duì)列時(shí),以魚鷹當(dāng)時(shí)的水平根本就沒有考慮效率的問題,就只是感覺 % 好神奇啊,然后到處用;也不明白為啥uCOS 的內(nèi)存管理不用 % ,畢竟它是那么的神奇,是吧!
后來無意中看到說 % 效率比 & 低,那么低多少?我們來測試一下 V1.0 和V2.0 的執(zhí)行效率好了。因?yàn)?V2.0 對大小有限制,那么就設(shè)置為 8 好了。
從測試結(jié)果來看1.44 us和1.26 us(72 M主頻,如何測量的下篇筆記告訴你,記得關(guān)注哦)相差也不是很大,14%左右的差距,不過數(shù)據(jù)量大的情況下還是選擇 & 吧,畢竟高效一些。
但是如果單純從 & 和 % 運(yùn)算角度來說,在STM32F103環(huán)境下能達(dá)到
3
倍的差距,這個自行測試就好了。
還有一個效率問題,那就是在單個隊(duì)列元素插入與獲取上。
因?yàn)閂2.5是通用型的put、get函數(shù),在單個元素的情況下效率必然不是很高,所以魚鷹針對單個元素的插入與獲取又封裝了兩個新函數(shù),通過測試對比put函數(shù),發(fā)現(xiàn)一個為1.60,一個0.67 us,差距還是挺大的(如果用在串口中斷中,當(dāng)然效率越高越好),所以需要根據(jù)場合使用合適的方式。
好了,現(xiàn)在來聊聊一個消費(fèi)者一個生產(chǎn)者模式下可能產(chǎn)生的BUG以及該如何解決吧。
如果min是一個函數(shù),那么V2.5代碼沒有任何問題,但是如果它是一個宏呢?
#define min(x,y) ((x) < (y) ? (x) : (y))
看著這個宏好像挺規(guī)范的,該有的括號都加上了,應(yīng)該不會出現(xiàn)問題才是,真的如此嗎?
魚鷹在很多筆記中都曾說過,如果只是對一個共享資源進(jìn)行讀取操作的話是不會出現(xiàn)問題的,但這只是針對共享資源本身而言是如此,
對于使用者來說,就不是這樣了!
魚鷹曾經(jīng)在《
入門 uCOS 操作系統(tǒng)的一點(diǎn)建議》中說過,當(dāng)時(shí)的魚鷹不明白一個判斷語句為什么要在判斷前關(guān)中斷,雖然當(dāng)時(shí)的理解是因?yàn)? 判斷和
修改應(yīng)該順序進(jìn)行而不應(yīng)該被打斷,但是其實(shí)
判斷和
再次
讀取在某種情況下也不可以被打斷。
當(dāng)你把 min 宏展開后,你就會發(fā)現(xiàn) in 或者 out 被兩次讀取了。
len = len < in - out ? len : in - out
假設(shè)get函數(shù)想讀入3個字節(jié)數(shù)據(jù),即len等于3,此時(shí)剛好隊(duì)列中也有3個數(shù)據(jù),即in – out 也等于3,3 < 3 ? 判斷失敗,跳轉(zhuǎn)到最后那條語句執(zhí)行,跳轉(zhuǎn)時(shí),另一個任務(wù)修改了in,增加了1,那么in – out 等于4,即最終 len 的值 為 4。
也就是說,本來你的程序應(yīng)該只獲取 3 個數(shù)據(jù)的,實(shí)際上卻獲取了4 個數(shù)據(jù),如果說你的應(yīng)用程序以最終的len值作為實(shí)際get的數(shù)據(jù),那么無鎖隊(duì)列還是那個無鎖隊(duì)列,怕就怕你的應(yīng)用程序會根據(jù)傳入的len和返回len值做一些判斷操作;還有一些可能就是應(yīng)用程序就只獲取3個數(shù)據(jù),只能少,不能多,否則程序就會出現(xiàn)問題。
總之,因?yàn)閮纱巫x取中斷導(dǎo)致無鎖隊(duì)列不再是你想的那個隊(duì)列了。
首先想到的就是把min這個宏改成函數(shù),為了提高一點(diǎn)效率,也可以用inline進(jìn)行聲明。
另一種高效方法是,在進(jìn)入get函數(shù)時(shí),把in - out的結(jié)果保存在局部變量L中(這樣可不必再申請一個變量,而且簡化了兩次讀取與運(yùn)算操作),用它再帶入宏中使用,這樣即使后面別的任務(wù)修改了in的值,也不會影響程序的運(yùn)行,只不過沒有非常及時(shí)提取最新數(shù)據(jù)而已。
從這里可以看出來,used變量因?yàn)楦北荆ū4嬷档郊拇嫫髦校┰驅(qū)е滦枰踊コ怄i,而這里卻不得不增加一個副本L來保證程序的正確運(yùn)行,所以,副本的好與壞只能因情況而異,一定要謹(jǐn)慎分析。
對嵌入式了解比較多的道友應(yīng)該清楚,多CPU和單CPU下可能出現(xiàn)代碼
亂序執(zhí)行的情況,有可能是編譯器修改了執(zhí)行順序,也有可能是硬件自動修改了執(zhí)行順序,那么上述的無鎖隊(duì)列就可能會出現(xiàn)問題(前面也說過順序很重要)。
而真正的linux內(nèi)核無鎖隊(duì)列實(shí)現(xiàn)其實(shí)是加入了內(nèi)存屏障的(這個可以看參考代碼),而因?yàn)镾TM32單片機(jī)比較簡單,所以魚鷹去掉了內(nèi)存屏障代碼。
但是還有一種情況,那就是數(shù)據(jù)緩存問題。我們知道,單片機(jī)(STM32存在
指令與數(shù)據(jù)預(yù)取功能)為了運(yùn)行效率,都采用流水線工作,而且為了提高效率,會預(yù)取多個數(shù)據(jù)或指令,如果說在切換任務(wù)時(shí)沒有把整個流水線清除,那么也可能出現(xiàn)問題,不過這個問題暫時(shí)沒有出現(xiàn),以后再看吧。
終于即將結(jié)束了,現(xiàn)在再簡單嘮嗑一下如何測試無鎖隊(duì)列或者鏈表問題。
我們知道隊(duì)列出隊(duì)入隊(duì)都是有順序的,如果我們在測試時(shí),把有規(guī)律的數(shù)據(jù)放入隊(duì)列中,那么獲取數(shù)據(jù)時(shí)也根據(jù)這個規(guī)律來進(jìn)行判斷是否出現(xiàn)問題,那么就可以實(shí)現(xiàn)自動檢測隊(duì)列是否正常運(yùn)行。
比如說,存入的數(shù)據(jù)一直自加,而接收時(shí)用一個變量記錄上次接收的數(shù)據(jù),然后和現(xiàn)在的數(shù)據(jù)進(jìn)行比對是否相差1,那么就能簡單判斷無鎖隊(duì)列的功能。
而為了測試溢出之后in小于out的情況(in和out實(shí)在是太大了,要讓他溢出太難等了),那么可以將in和out一開始設(shè)置為接近溢出值就行,比如 -7 等。
如果簡單的兩個線程間進(jìn)行壓力測試,那么你很難測出問題來,這是因?yàn)榫€程測試代碼量太少,大部分時(shí)間CPU都是空閑狀態(tài),所以函數(shù)總是能順序執(zhí)行而不會被打斷,如果想讓代碼可以被頻繁打斷以測試安全,那么將兩個函數(shù)分別放到
微秒級別的中斷處理函數(shù)中或許能夠測試出一些問題。
當(dāng)然以上方法只是比較粗淺的測試,真正還是要在實(shí)際中進(jìn)行長時(shí)間的穩(wěn)定性測試才行,只有這樣,你的程序才算是經(jīng)受住了考驗(yàn),否則紙上談兵終覺淺??!
本文來源:公眾號:【魚鷹談單片機(jī)】,作者:魚鷹Osprey