天天看點

java使用預設線程池踩過的坑(二)

雲智慧(北京)科技有限公司  陳鑫

是的,一個線程不能夠啟動兩次。那麼它是怎麼判斷的呢?

public synchronized void start() {

        /**

         * a zero status valuecorresponds to state "new".    0對應的是state new

         */

        if (threadstatus!= 0)    //如果不是new state,就直接抛出異常!

            throw newillegalthreadstateexception();

        group.add(this);

        boolean started = false;

        try {

            start0();    // 啟動線程的native方法

            started = true;

        } finally {

            try {

               if (!started) {

                   group.threadstartfailed(this);

                }

            } catch(throwable ignore) {

            }

        }

    }

恩,隻有是new狀态才能夠調用native方法啟動一個線程。好吧,到這裡了,就普及也自補一下jvm裡的線程狀态:

所有的線程狀态::

l  new —— 還沒有啟動過

l  runnable  —— 正在jvm上運作着

l  blocked  —— 正在等待鎖/信号量被釋放

l  waiting  —— 等待其他某個線程的某個特定動作

l  timed_waiting —— a thread that iswaiting for another thread to perform an action for up to a specified waitingtime is in this state.

l  terminated —— 退出,停止

線程在某個時間點上隻可能存在一種狀态,這些狀态是jvm裡的,并不反映作業系統線程的狀态。查一下thread的api,沒有對其狀态進行修改的api。那麼這條路是不通的嗎?

仔細考慮一下……

如果把任務做成runnable實作類,然後在把這個實作類丢進線程池排程器之前,利用此runnable構造一個thread,是不是這個thread對象就能夠控制這個runnable對象,進而控制線上程池中運作着的task了呢?非也!讓我們看看thread和threadpoolexecutor對runnable的處理吧。

 thread

    /* what will berun. */

private runnabletarget;

結合上面的start()方法,很容易猜出,start0()會把target弄成一個線程來進行運作。

 threadpoolexecutor

public void execute(runnable command){

        if (command== null)

            thrownew nullpointerexception();

        int c =ctl.get();

        if(workercountof(c) < corepoolsize) {

            if (addworker(command, true))

               return;

            c =ctl.get();

        }

        if(isrunning(c) && workqueue.offer(command)) {

            intrecheck = ctl.get();

            if (!isrunning(recheck) && remove(command))

               reject(command);

            else if(workercountof(recheck) == 0)

               addworker(null, false);

        else if (!addworker(command, false))

           reject(command);

}

private boolean addworker(runnablefirsttask, boolean core) {

        …

        booleanworkerstarted = false;

        booleanworkeradded = false;

        worker w =null;

        try {

            finalreentrantlock mainlock = this.mainlock;

            w = newworker(firsttask);

            finalthread t = w.thread;

            if (t!= null) {

               mainlock.lock();

                try{

                   int c = ctl.get();

                   int rs = runstateof(c);

                   if (rs < shutdown ||

                       (rs == shutdown && firsttask == null)) {

                       if (t.isalive()) // precheck that t is startable

                            throw newillegalthreadstateexception();

workers.add(w);

                       int s = workers.size();

                       if (s > largestpoolsize)

                            largestpoolsize =s;

                       workeradded = true;

                   }

                }finally {

                   mainlock.unlock();

                }

                if(workeradded) {

t.start();

                   workerstarted = true;

            }

        } finally {

            if (!workerstarted)

               addworkerfailed(w);

        return workerstarted;

    }

那麼worker又是怎樣的呢?

 worker

private final class worker

        extendsabstractqueuedsynchronizer

        implementsrunnable

    {

        finalthread thread;

        runnablefirsttask;

        volatilelong completedtasks;

       worker(runnable firsttask) {

           setstate(-1); //調用runworker之前不可以interrupt

           this.firsttask = firsttask;

           this.thread = getthreadfactory().newthread(this);

        public voidrun() {

           runworker(this);

           ……   

           …….

        voidinterruptifstarted() {

            threadt;

            if(getstate() >= 0 && (t = thread) != null &&!t.isinterrupted()) {

t.interrupt();

                }catch (securityexception ignore) {

可見worker裡既包裝了runnable對象——task,又包裝了一個thread對象——以自己作為初始化參數,因為worker也是runnable對象。然後對外提供了運作與停止接口,run()和interruptifstarted()。回顧上面使用thread的例子不禁有了新的領悟,我們把一個thread對象交給threadpoolexecutor執行後,實際的調用是對thread(filetask())對象,我們暫時稱之為workerwrapper。那麼我們在池外進行filetask.interrupt()操作影響的是filetask對象,而不是workerwrapper。是以可能上面對于start()方法二次調用不是特别适當。更恰當的應該是在filetask.interrupt()的時候就跑出異常,因為從來沒有對filetask對象執行過start()方法,這時候去interrupt就會出現錯誤。具體如下圖:

java使用預設線程池踩過的坑(二)

分析到此,我們已經明确除了調用threadpoolexecutor了的interruptworkers()方法别無其他途徑操作這些worker了。

private void interruptworkers() {

        finalreentrantlock mainlock = this.mainlock;

       mainlock.lock();

            for(worker w : workers)

w.interruptifstarted();

           mainlock.unlock();

繼續閱讀