當(dāng)前位置:首頁 > 公眾號(hào)精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]背景 二胖上次寫完參數(shù)校驗(yàn)(《二胖寫參數(shù)校驗(yàn)的坎坷之路》)之后,領(lǐng)導(dǎo)一直不給他安排其他開發(fā)任務(wù),就一直讓他看看代碼熟悉業(yè)務(wù)。二胖每天上班除了偶爾跟坐在隔壁的前端小姐姐聊聊天,就是看看這些枯燥無味的業(yè)務(wù)代碼,無聊的一匹。雖然二胖已是久經(jīng)職場(chǎng)的

ich_media_content " id="js_content">

ce.com">

背景

二胖上次寫完參數(shù)校驗(yàn)(《二胖寫參數(shù)校驗(yàn)的坎坷之路》)之后,領(lǐng)導(dǎo)一直不給他安排其他開發(fā)任務(wù),就一直讓他看看代碼熟悉業(yè)務(wù)。二胖每天上班除了偶爾跟坐在隔壁的前端小姐姐聊聊天,就是看看這些枯燥無味的業(yè)務(wù)代碼,無聊的一匹。雖然二胖已是久經(jīng)職場(chǎng)的老油條了,但是看到同事們的周報(bào)都寫的滿滿的,而自己的周報(bào),就一兩行,熟悉了什么功能。心里還是慌得一匹,畢竟公司不養(yǎng)閑人啊。于是乎二胖終于鼓起勇氣為了向領(lǐng)導(dǎo)表明自己的上進(jìn)心,主動(dòng)向領(lǐng)導(dǎo)要開發(fā)任務(wù)。領(lǐng)導(dǎo)一看這小伙子這么有上進(jìn)心,于是就到任務(wù)看板里面挑了一個(gè)業(yè)務(wù)邏輯比較簡(jiǎn)單的任務(wù)分配給了二胖。二胖拿到這個(gè)任務(wù)屁顛屁顛的回到座位。任務(wù)比較簡(jiǎn)單,就是通過爬蟲去爬取某些賣機(jī)票(某豬、某攜、某團(tuán)等)的網(wǎng)站的一些機(jī)票,然后保存到數(shù)據(jù)庫。

同步入庫

二胖拿到任務(wù),三下五除二就把任務(wù)完成了。

 public static void main(String[] args) throws InterruptedException {
        String mouZhuFlightPrice = getMouZhuFlightPrice();
        String mouXieFlightPrice = getMouXieFlightPrice();
        String mouTuanFlightPrice = getMouTuanFlightPrice();
        saveDb(mouZhuFlightPrice);
        saveDb(mouXieFlightPrice);
        saveDb(mouTuanFlightPrice);
    }


    /**
     * 模擬請(qǐng)求某豬網(wǎng)站 爬取機(jī)票信息
     *
     *
     * @return
     * @throws InterruptedException
     */

    public static String getMouZhuFlightPrice() throws InterruptedException {
        // 模擬請(qǐng)求某豬網(wǎng)站 爬取機(jī)票信息
        Thread.sleep(10000);
        return "獲取到某豬網(wǎng)站的機(jī)票信息了";
    }

    /**
     * 模擬請(qǐng)求某攜網(wǎng)站 爬取機(jī)票信息
     *
     * @return
     * @throws InterruptedException
     */

    public static String getMouXieFlightPrice() throws InterruptedException {
        // 模擬請(qǐng)求某攜網(wǎng)站 爬取機(jī)票信息
        Thread.sleep(5000);
        return "獲取到某攜網(wǎng)站的機(jī)票信息了";
    }


    /**
     * 模擬請(qǐng)求團(tuán)網(wǎng)站 爬取機(jī)票信息
     *
     * @return
     * @throws InterruptedException
     */

    public static String getMouTuanFlightPrice() throws InterruptedException {
        // 模擬請(qǐng)求某團(tuán)網(wǎng)站 爬取機(jī)票信息
        Thread.sleep(3000);
        return "獲取到某團(tuán)網(wǎng)站的機(jī)票信息了";
    }

    /**
     * 保存DB
     *
     * @param flightPriceList
     */

    public static void saveDb(String flightPriceList) {
            // 解析字符串 進(jìn)行異步入庫
    }

這次二胖學(xué)乖了,任務(wù)完成了先去找下坐他對(duì)面的技術(shù)大拿(看他那發(fā)際線就知道了)同事“二狗”讓二狗大拿幫忙指點(diǎn)一二,看看代碼是否還能有優(yōu)化的地方。畢竟領(lǐng)導(dǎo)對(duì)代碼的性能、以及代碼的優(yōu)雅是有要求的。領(lǐng)導(dǎo)多次在部門的周會(huì)上提到讓我們多看看“二狗”寫的代碼,學(xué)習(xí)下人家寫代碼的優(yōu)雅、抽象、封裝等等。二狗大概的瞄了下二胖寫的代碼,提出了個(gè)小小的建議“這個(gè)代碼可以采用多線程來優(yōu)化下哦,你看某豬這個(gè)網(wǎng)站耗時(shí)是拿到結(jié)果需要10s,其他的耗時(shí)都比它短,先有結(jié)果的我們可以先處理的,不需要等到大家都返回了再來處理的”。

輪循futureList獲取結(jié)果

幸好二胖對(duì)多線程了解一點(diǎn)點(diǎn),于是乎采用future的方式來實(shí)現(xiàn)。二胖使用一個(gè)List來保存每個(gè)任務(wù)返回的Future,然后去輪詢這些Future,直到每個(gè)Future都已完成。由于需要先完成的任務(wù)需要先執(zhí)行,且不希望出現(xiàn)因?yàn)榕旁谇懊娴娜蝿?wù)阻塞導(dǎo)致后面先完成的任務(wù)的結(jié)果沒有及時(shí)獲取的情況,所以在調(diào)用get方式時(shí),需要將超時(shí)時(shí)間設(shè)置為0。

  public static void main(String[] args) {
        int taskSize = 3;
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());
        List<Future<String>> futureList = new ArrayList<>();
        futureList.add(mouZhuFlightPriceFuture);
        futureList.add(mouXieFlightPriceFuture);
        futureList.add(mouTuanFlightPriceFuture);
        // 輪詢,獲取完成任務(wù)的返回結(jié)果
        while (taskSize > 0) {
            for (Future<String> future : futureList) {
                String result = null;
                try {
                    result = future.get(0, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    // 超時(shí)異常需要忽略,因?yàn)槲覀冊(cè)O(shè)置了等待時(shí)間為0,只要任務(wù)沒有完成,就會(huì)報(bào)該異常
                }
                // 任務(wù)已經(jīng)完成
                if (result != null) {
                    System.out.println("result=" + result);
                    // 從future列表中刪除已經(jīng)完成的任務(wù)
                    futureList.remove(future);
                    taskSize--;
                    // 此處必須break,否則會(huì)拋出并發(fā)修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)
                    break// 進(jìn)行下一次while循環(huán)
                }
            }
        }
    }

上述代碼有兩個(gè)小細(xì)節(jié)需要注意下:

  • 如采用ArrayList的話futureList刪除之后需要break進(jìn)行下一次while循環(huán),否則會(huì)產(chǎn)生我們意想不到的ConcurrentModificationException異常。具體原因可看下《ArrayList的刪除姿勢(shì)你都掌握了嗎》這個(gè)文章,里面有詳細(xì)的介紹。

  • 在捕獲了InterruptedExceptionExecutionException異常后記得 taskSize--否則就會(huì)發(fā)生死循環(huán)。如果生產(chǎn)發(fā)生了死循環(huán)你懂的,cpu被你打滿,程序假死等。你離被開除也不遠(yuǎn)了。

  • 上面輪詢future列表非常的復(fù)雜,而且還有很多異常需要處理,還有很多細(xì)節(jié)需要考慮,還有被開除的風(fēng)險(xiǎn)。所以這種方案也被pass了。

自定義BlockingQueue實(shí)現(xiàn)

  • 上述方案被 pass之后,二胖就在思考可以借用哪種數(shù)據(jù)來實(shí)現(xiàn)下先進(jìn)先出的功能,貌似隊(duì)列可以實(shí)現(xiàn)下這個(gè)功能。所以二胖又寫了一版采用隊(duì)列來實(shí)現(xiàn)的功能。
  final static ExecutorService executor = new ThreadPoolExecutor(66,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());

        // 創(chuàng)建阻塞隊(duì)列
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        executor.execute(() -> run(mouZhuFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouXieFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouTuanFlightPriceFuture, blockingQueue));
        // 異步保存所有機(jī)票價(jià)格
        for (int i = 0; i < 3; i++) {
            String result = blockingQueue.take();
            System.out.println(result);
            saveDb(result);
        }
    }

    private static void run(Future<String> flightPriceFuture, BlockingQueue<String> blockingQueue) {
        try {
            blockingQueue.put(flightPriceFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
  • 這次比上個(gè)版本好多了,代碼也簡(jiǎn)潔多了。不過按理說這種需求應(yīng)該是大家經(jīng)常遇到的,應(yīng)該不需要自己來實(shí)現(xiàn)把, JAVA這么貼心的語言應(yīng)該會(huì)有 api可以直接拿來用吧。

CompletionService實(shí)現(xiàn)

  • 二胖現(xiàn)在畢竟也是對(duì)代碼的簡(jiǎn)潔性有追求的人了。于是乎二胖去翻翻自己躺在書柜里吃灰的并發(fā)相關(guān)的書籍,看看是否有解決方案。 還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來了解下。終于皇天不負(fù)有心人在二胖快要放棄的時(shí)候突然發(fā)現(xiàn)了新大陸。  《Java并發(fā)編程實(shí)戰(zhàn)》一書6.3.5節(jié) CompletionService:ExecutorBlockingQueue,有這樣一段話:

如果向Executor提交了一組計(jì)算任務(wù),并且希望在計(jì)算完成后獲得結(jié)果,那么可以保留與每個(gè)任務(wù)關(guān)聯(lián)的Future,然后反復(fù)使用get方法,同時(shí)將參數(shù)timeout指定為0,從而通過輪詢來判斷任務(wù)是否完成。這種方法雖然可行,但卻有些繁瑣。幸運(yùn)的是,還有一種更好的方法:完成服務(wù)CompletionService。

  final static ExecutorService executor = new ThreadPoolExecutor(66,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletionService completionService = new ExecutorCompletionService(executor);
        completionService.submit(() -> getMouZhuFlightPrice());
        completionService.submit(() -> getMouXieFlightPrice());
        completionService.submit(() -> getMouTuanFlightPrice());
        for (int i = 0; i < 3; i++) {
            String result = (String)completionService.take().get();
            System.out.println(result);
            saveDb(result);
        }
    }

當(dāng)我們使用了CompletionService不用遍歷future列表,也不需要去自定義隊(duì)列了,代碼變得簡(jiǎn)潔了。下面我們就來分析下CompletionService實(shí)現(xiàn)的原理吧。

CompletionService 介紹

  • 我們可以先看下 JDK源碼中 CompletionServicejavadoc說明吧
/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * {@code submit} tasks for execution. Consumers {@code take}
 * completed tasks and process their results in the order they
 * complete.

大概意思是CompletionService實(shí)現(xiàn)了生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦,生產(chǎn)者和消費(fèi)者都不用關(guān)心任務(wù)的完成順序,由CompletionService來保證,消費(fèi)者一定是按照任務(wù)完成的先后順序來獲取執(zhí)行結(jié)果。

成員變量

既然需要按照任務(wù)的完成順序獲取結(jié)果,那內(nèi)部應(yīng)該也是通過隊(duì)列來實(shí)現(xiàn)的吧。打開源碼我們可以看到,里面有三個(gè)成員變量

public class ExecutorCompletionService<Vimplements CompletionService<V{
 // 執(zhí)行task的線程池,創(chuàng)建CompletionService必須指定;
    private final Executor executor;
    //主要用于創(chuàng)建待執(zhí)行task;
    private final AbstractExecutorService aes;
    //存儲(chǔ)已完成狀態(tài)的task,默認(rèn)是基于鏈表結(jié)構(gòu)的阻塞隊(duì)列LinkedBlockingQueue。     
    private final BlockingQueue<Future<V>> completionQueue;

任務(wù)提交

ExecutorCompletionService任務(wù)的提交和執(zhí)行都是委托給Executor來完成。當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)首先將被包裝為一個(gè)QueueingFuture

public Future<V> submit(Callable<V> task) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

任務(wù)完成后何時(shí)進(jìn)入隊(duì)列

還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來了解下。從源碼可以看出,QueueingFutureFutureTask的子類,實(shí)現(xiàn)了done方法,在task執(zhí)行完成之后將當(dāng)前task添加到completionQueue,將返回結(jié)果加入到阻塞隊(duì)列中,加入的順序就是任務(wù)完成的先后順序。done方法的具體調(diào)用在FutureTaskfinishCompletion方法。

獲取已完成任務(wù)

 public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException 
{
        return completionQueue.poll(timeout, unit);
    }

takepoll都是調(diào)用BlockingQueue提供的方法。

  • take() 獲取任務(wù)阻塞,直到可以拿到任務(wù)為止。
  • poll() 獲取任務(wù)不阻塞,如果沒有獲取到任務(wù)直接返回 null。
  • poll(long timeout, TimeUnit unit) 帶超時(shí)時(shí)間等待的獲取任務(wù)方法( 一般推薦使用這種

總結(jié)

  • CompletionService 把線程池  Executor 和阻塞隊(duì)列  BlockingQueue融合在一起,能夠讓批異步任務(wù)的管理更簡(jiǎn)單,將生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦。
  • CompletionService 能夠讓異步任務(wù)的執(zhí)行結(jié)果有序化,先執(zhí)行完的先進(jìn)入阻塞隊(duì)列,利用這個(gè)特性,我們可以輕松實(shí)現(xiàn)后續(xù)處理的有序性,避免無謂的等待。
  • 參考
  • 《java并發(fā)編程實(shí)戰(zhàn)》
    https://www.jianshu.com/p/19093422dd57 https://blog.csdn.net/cbjcry/article/details/84222853 https://www.jianshu.com/p/493ae1b107e4


  • 特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:

    還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來了解下。

    還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來了解下。

  • 還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來了解下。

  • 長(zhǎng)按訂閱更多精彩▼

    visible !important;width: 205px !important;" src="/images/21ic_nopic.gif" data-src="ee95QH2pLD5+VulxaIWieu/sWg55ZRRqvBRKcDqaznlYgk+/6rxO6H7/8nYs69ZtmQ6GuYUDZLUNalfS+8FKl2fpvXUOencKBa0l" class="delay_img" alt="還在使用Future輪詢獲取結(jié)果嗎?CompletionService快來了解下。" >

    ce: normal;text-align: right;line-height: 2em;box-sizing: border-box !important;word-wrap: break-word !important;">如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝

免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問題,請(qǐng)聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或?qū)⒋呱龈蟮莫?dú)角獸公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

倫敦2024年8月29日 /美通社/ -- 英國(guó)汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時(shí)1.5...

關(guān)鍵字: 汽車 人工智能 智能驅(qū)動(dòng) BSP

北京2024年8月28日 /美通社/ -- 越來越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運(yùn)行,同時(shí)企業(yè)卻面臨越來越多業(yè)務(wù)中斷的風(fēng)險(xiǎn),如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報(bào)道,騰訊和網(wǎng)易近期正在縮減他們對(duì)日本游戲市場(chǎng)的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)開幕式在貴陽舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點(diǎn): 有效應(yīng)對(duì)環(huán)境變化,經(jīng)營(yíng)業(yè)績(jī)穩(wěn)中有升 落實(shí)提質(zhì)增效舉措,毛利潤(rùn)率延續(xù)升勢(shì) 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長(zhǎng) 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競(jìng)爭(zhēng)力 堅(jiān)持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競(jìng)爭(zhēng)優(yōu)勢(shì)...

關(guān)鍵字: 通信 BSP 電信運(yùn)營(yíng)商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺(tái)與中國(guó)電影電視技術(shù)學(xué)會(huì)聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會(huì)上宣布正式成立。 活動(dòng)現(xiàn)場(chǎng) NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長(zhǎng)三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會(huì)上,軟通動(dòng)力信息技術(shù)(集團(tuán))股份有限公司(以下簡(jiǎn)稱"軟通動(dòng)力")與長(zhǎng)三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉