天天看點

主線程等待幾個子線程執行完成方案

 有時,為了程式的性能,我們有必要對程式中的for循環(含有sql/rpc操作)進行并發處理,要求是并發處理完之後才能繼續執行主線程。現給出如下兩種方案:

1. countdownlatch

主線程等待幾個子線程執行完成方案

package com.itlong.whatsmars.base.sync;  

import java.util.concurrent.countdownlatch;  

/** 

 * created by shenhongxi on 2016/8/12. 

 */  

public class countdownlatchtest {  

    public static void main(string[] args) {  

        countdownlatch latch = new countdownlatch(3);  

        long start = system.currenttimemillis();  

        for (int i = 0; i < 3; i++) {  

            new thread(new subrunnable(i, latch)).start();  

        }  

        try {  

            latch.await();  

        } catch (interruptedexception e) {  

            e.printstacktrace();  

        system.out.println(system.currenttimemillis() - start);  

        system.out.println("main finished");  

    }  

    static class subrunnable implements runnable {  

        private int id = -1;  

        private countdownlatch latch;  

        subrunnable(int id, countdownlatch latch) {  

            this.id = id;  

            this.latch = latch;  

        @override  

        public void run() {  

            try {  

                thread.sleep(3000);  

                system.out.println(string  

                        .format("sub thread %d finished", id));  

            } catch (interruptedexception e) {  

                e.printstacktrace();  

            } finally {  

                latch.countdown();  

            }  

}  

 countdownlatch用隊列來存放任務,主要是一個構造器和兩個方法,相關代碼這裡不予贅述。countdownlatch很貼合我們的要求,但沒用到線程池,而且latch是隻提供了計數功能然後子線程的邏輯有沒有可能會在主線程邏輯之後執行??,綜合考慮,我推薦下面的這種方案。

2. executorservice

主線程等待幾個子線程執行完成方案

import java.util.arraylist;  

import java.util.list;  

import java.util.concurrent.callable;  

import java.util.concurrent.executorservice;  

import java.util.concurrent.executors;  

public class callabletest {  

    public static void main(string[] args) throws exception {  

        executorservice pool = executors.newfixedthreadpool(3);  

        list<callable<void>> subs = new arraylist<callable<void>>();  

            subs.add(new subcallable(i));  

            pool.invokeall(subs);  

        } finally {  

            pool.shutdown();  

    static class subcallable implements callable<void> {  

        public subcallable(int id) {  

        public void call() throws exception {  

                        .format("child thread %d finished", id));  

            return null;  

 abstractexecutorservice

主線程等待幾個子線程執行完成方案

public <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks)  

        throws interruptedexception {  

        if (tasks == null)  

            throw new nullpointerexception();  

        list<future<t>> futures = new arraylist<future<t>>(tasks.size());  

        boolean done = false;  

            for (callable<t> t : tasks) {  

                runnablefuture<t> f = newtaskfor(t);  

                futures.add(f);  

                execute(f);  

            for (future<t> f : futures) {  

                if (!f.isdone()) {  

                    try {  

                        f.get();  

                    } catch (cancellationexception ignore) {  

                    } catch (executionexception ignore) {  

                    }  

                }  

            done = true;  

            return futures;  

            if (!done)  

                for (future<t> f : futures)  

                    f.cancel(true);  

接下來我做了個join的試驗,發現同樣可以達到目的,但不推薦此法。

主線程等待幾個子線程執行完成方案

 * 子線程與主線程是順序執行的,各子線程之間還是異步的 

public class jointest {  

        thread t1 = new thread(new subrunnable(0));  

        thread t2 = new thread(new subrunnable(1));  

        thread t3 = new thread(new subrunnable(2));  

        t1.start();  

        t2.start();  

        t3.start();  

        t1.join();  

        t2.join();  

        t3.join();  

        subrunnable(int id) {  

                system.out.println("hi, i'm id-" + id);  

                thread.sleep(9000);  

最後,我們順便提下org.springframework.scheduling.concurrent.threadpooltaskexecutor

主線程等待幾個子線程執行完成方案

public class threadpooltaskexecutor extends executorconfigurationsupport implements schedulingtaskexecutor {  

    private final object poolsizemonitor = new object();  

    private int corepoolsize = 1;  

    private int maxpoolsize = integer.max_value;  

    private int keepaliveseconds = 60;  

    private boolean allowcorethreadtimeout = false;  

    private int queuecapacity = integer.max_value;  

    private threadpoolexecutor threadpoolexecutor;  

    /** 

     * set the threadpoolexecutor's core pool size. 

     * default is 1. 

     * <p><b>this setting can be modified at runtime, for example through jmx.</b> 

     */  

    public void setcorepoolsize(int corepoolsize) {  

        synchronized (this.poolsizemonitor) {  

            this.corepoolsize = corepoolsize;  

            if (this.threadpoolexecutor != null) {  

                this.threadpoolexecutor.setcorepoolsize(corepoolsize);  

     * return the threadpoolexecutor's core pool size. 

    public int getcorepoolsize() {  

            return this.corepoolsize;  

看到我們熟悉的threadpoolexecutor之後,我們瞬間明白了一切。

另外我們腦補下幾個接口/類的關系

主線程等待幾個子線程執行完成方案

public interface executorservice extends executor {  

  <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks)  

        throws interruptedexception;  

public interface executor {  

    void execute(runnable command);  

public abstract class abstractexecutorservice implements executorservice{  

  public <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks) {  

    // ...  

  }  

public class threadpoolexecutor extends abstractexecutorservice {  

  public threadpoolexecutor(int corepoolsize,  

                              int maximumpoolsize,  

                              long keepalivetime,  

                              timeunit unit,  

                              blockingqueue<runnable> workqueue) {  

        this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue,  

             executors.defaultthreadfactory(), defaulthandler);  

原文連結:[http://wely.iteye.com/blog/2317944]