OpenMP實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型已經(jīng)很古老了吧,最近寫(xiě)了個(gè)OpenMP版的此模型之實(shí)現(xiàn),來(lái)分享下。
先說(shuō)一下模型的大致做法是:
1、生產(chǎn)者需要取任務(wù),生產(chǎn)產(chǎn)品。
2、消費(fèi)者需要取產(chǎn)品,消費(fèi)產(chǎn)品。
生產(chǎn)者在生產(chǎn)某個(gè)產(chǎn)品之后,要告知消費(fèi)者此產(chǎn)品已經(jīng)可以使用了。消費(fèi)者通過(guò)獲得可以使用這個(gè)信號(hào)來(lái)取得產(chǎn)品,進(jìn)一步消費(fèi)產(chǎn)品。
比如,我們有N個(gè)圖像需要對(duì)每一個(gè)圖像作濾波或者變換等處理,并且把處理后的結(jié)果存到硬盤(pán)上。
那么生產(chǎn)者可以將N個(gè)圖像看成N個(gè)任務(wù),每個(gè)任務(wù)都是獨(dú)立的,每個(gè)任務(wù)的計(jì)算結(jié)果可以看成是產(chǎn)品,消費(fèi)者就是取這個(gè)產(chǎn)品來(lái)寫(xiě)入硬盤(pán)。
先貼出一個(gè)實(shí)例代碼再作解釋。
#include#include#include#include#include#define?jobs?1000 #define?sz?102000 #if?defined(_WIN32)?&&?defined(_MSC_VER) #includedouble?abtic()?{ __int64?freq; __int64?clock; QueryPerformanceFrequency(?(LARGE_INTEGER?*)&freq?); QueryPerformanceCounter(?(LARGE_INTEGER?*)&clock?); return?(double)clock/freq*1000*1000; } #else #include#includedouble?abtic()?{ double?result?=?0.0; struct?timeval?tv; gettimeofday(?&tv,?NULL?); result?=?tv.tv_sec*1000*1000?+?tv.tv_usec; return?result; } #endif?/*?_WIN32?*/ #if?1 double?timer; #define?ABTMS?timer=abtic();fprintf(stdout,"%4d??",__LINE__) #define?ABTME?fprintf(stdout,"%4d??%8.8fmsn",__LINE__,(abtic()-timer)/1000.0f) #else #define?ABTMS? #define?ABTME? #endif int?main() { ??char?*jbNotReady; ??double?*a; ??double?*as; ??double?*pa; ??int?j,?k; char?jbnr; ??a?=?(double*)malloc(sz*jobs*sizeof(double)); ??as?=?(double*)malloc(jobs*sizeof(double)); ??jbNotReady?=?(char*)malloc(jobs*sizeof(char)); ??for?(j?=?0;?j?<?jobs;?j++) ??{ ????jbNotReady[j]?=?1; ???? ??} ??memset(a,?0,?sz*jobs*sizeof(double)); ??memset(as,?0,?jobs*sizeof(double)); ??ABTMS; #pragma?omp?parallel?sections?private(j,k,pa)?shared(jbNotReady,as,a) ??{ ????//?producer #pragma?omp?section ????{ ??????for?(j?=?0;?j?<?jobs;?j++) ??????{ ????????pa?=?a+j*sz; ????????for?(k?=?0;?k?<?sz;?k++) ????????{ ??????????pa[k]?=?1.0; ????????} ????????jbNotReady[j]?=?0; #pragma?omp?flush ??????} ????} ????//?consumer #pragma?omp?section ????{ ??????for?(j?=?0;?j?<?jobs;?j++) ??????{ #pragma?omp?flush ????????while?(jbNotReady[j]){ #pragma?omp?flush } ????????as[j]?=?0.0; ????????pa?=?a+j*sz; ????????for?(k?=?0;?k?<?sz;?k++) ????????{ ??????????as[j]?+=?pa[k]; ????????} ????????if?((int)(as[j])!=sz)fprintf(stdout,?"job?id?%3d?:%fn",?j,?as[j]); ??????} ????} ??} ??ABTME; ??free(a); ??free(as); ??free(jbNotReady); ??return?0; }
源代碼中,第一個(gè)section創(chuàng)建的線程扮演的就是生產(chǎn)者的角色,第二個(gè)section扮演消費(fèi)者角色。j這個(gè)變量模擬的是任務(wù)編號(hào),第一個(gè)section中的循環(huán)模擬產(chǎn)生產(chǎn)品。第二個(gè)section每次取一個(gè)任務(wù),而且是順序取,通過(guò)驗(yàn)證任務(wù)是否已經(jīng)準(zhǔn)備好來(lái)獲得正確的產(chǎn)品。
使用flush制導(dǎo)語(yǔ)句是為了將每個(gè)線程的緩存和內(nèi)存強(qiáng)制保持一致,注意生產(chǎn)者向jbNotReady里寫(xiě),而消費(fèi)者只是讀數(shù)據(jù),不會(huì)出現(xiàn)內(nèi)存中的數(shù)據(jù)寫(xiě)后讀,讀后寫(xiě)的問(wèn)題,每個(gè)線程獲得的數(shù)據(jù)都是安全的。
以上代碼支持Windows和Linux,GCC4.4以后的版本都可以執(zhí)行,Windows下只要支持OpenMP的編譯器,都可行。