天天看點

java使用預設線程池踩過的坑(三)

雲智慧(北京)科技有限公司  陳鑫

重新開機線程池

 taskmanager

public class taskmanager implements runnable {

    …..

    public taskmanager (set<filetask>runners) {

        super();

        this.runners = runners;

        executetasks(runners);

    }

    private voidexecutetasks(set<filetask> runners) {

        for (filetask task : runners){

           pool.execute(task);

           system.out.println(task.getclass().getsimplename() + " has beenstarted");

        }

    @override

    public void run() {

        while(!thread.currentthread().isinterrupted()) {

            try {

               long current = system.currenttimemillis();

               for (filetask wrapper : runners) {

                   if (wrapper.getlastexectime() != 0 && current -wrapper.getlastexectime() > wrapper.getinterval() * 5 * 1000) {   // 開始忘了乘以1000

                       wrapper.interrupt();

                       if (wrapper.getfiles() != null){

                           for (file file : wrapper.getfiles()){

                               file.delete();

                           }

                       }

                       system.out.println("going to shutdown thethread pool");

                       list<runnable> shutdownnow = pool.shutdownnow();    // 不等目前pool裡的任務執行完,直接關閉線程池

                       for (runnable run : shutdownnow) {

                           system.out.println(run + " goingto be shutdown");

                      while (pool.awaittermination(1, timeunit.seconds)) { 

                           system.out.println("the threadpool has been shutdown " + new date());

                           executetasks(runners);//重新執行

                           thread.sleep(200);

                   }

                }

            } catch(exception e1) {

               e1.printstacktrace();

            }

               thread.sleep(500);

            } catch(interruptedexception e) {

    public static void main(string[] args) {

        set<filetask> tasks =new hashset<filetask>();

        filetask task = newfiletask();

        task.setinterval(1);

        task.setname("task-1");

        tasks.add(task);

        filetask task1 = newfiletask();

        task1.setinterval(2);

       task.setname("task-2");

        tasks.add(task1);

        taskmanager  codemanager = new taskmanager (tasks);

        newthread(codemanager).start();

}

成功!把整個的threadpoolexector裡所有的worker全部停止,之後再向其隊列裡重新加入要執行的兩個task(注意這裡并沒有清空,隻是停止而已)。這樣做雖然能夠及時處理task,但是一個很緻命的缺點在于,如果不能明确的知道threadpoolexecutor要執行的task,就沒有辦法重新執行這些任務。

定制線程池

好吧!停止鑽研别人的東西!我們完全可以自己寫一個自己的threadpoolexecutor,隻要把worker暴露出來就可以了。這裡是不是回想起前面的start問題來了,沒錯,我們即便能夠直接針對thread進行interrupt, 但是不能再次start它了。那麼clone一個同樣的thread行不行呢?

 thread

@override

    protectedobject clone() throws clonenotsupportedexception{

        throw newclonenotsupportedexception();

答案顯而易見,線程是不支援clone 的。我們需要重新new 一個thread來重新運作。其實我們隻需要将原來的worker裡的runnable換成我們自己的task,然後将通路權限适當放開就可以了。還有,就是讓我們的customthreadpoolexecutor繼承thread,因為它需要定時監控自己的所有的worker裡thread的運作狀态。

  customthreadpoolexecutor

public class customthreadpoolexecutor extendsthreadpoolexecutor implements runnable {

         public voidexecute(testask command) {

….//将執行接口改為接收我們的業務類

         …

         private final class worker

        extends abstractqueuedsynchronizer

        implements runnable

    {

        …

testask firsttask; //将runnable改為我們的業務類,友善檢視狀态

                   …

                   worker(testask firsttask) {

            …//同樣将初始化參數改為我們的業務類

        }

    public staticvoid main(string[] args) {

       customthreadpoolexecutor pool = new customthreadpoolexecutor(0, integer.max_value,

               60l, timeunit.seconds,

                newsynchronousqueue<runnable>());

        testasktask = new testask();

       task.setinterval(1);

       pool.execute(task);

        testasktask1 = new testask();

       task1.setinterval(2);

       pool.execute(task1);

        newthread(pool).start();

    }

    @override

    public voidrun() {

        while(!thread.currentthread().isinterrupted()) {

            try {

               long current = system.currenttimemillis();

               set<testask> toreexecute = new hashset<testask>();

               system.out.println("\t number is " + number);

                for(worker wrapper : workers) {

                    testask tt = wrapper.firsttask;

                   if (tt != null) {

                       if (current - tt.getlastexectime() > tt.getinterval() * 5 * 1000) {

wrapper.interruptifstarted();

                            remove(tt);

                           if (tt.getfiles() != null) {

                                for (file file: tt.getfiles()) {

                                   file.delete();

                                }

                            }

                           system.out.println("thread is timeout : " + tt + " "+ new date());

                           toreexecute.add(tt);

                       }

                   }

                }

 if(toreexecute.size() > 0) {

                    mainlock.lock();

                    try {

                        for (testask tt :toreexecute) {

                            execute(tt);    // execute this task again

                        }

                    } finally {

                       mainlock.unlock();

                    }

            } catch(exception e1) {

               system.out.println("error happens when we trying to interrupt andrestart a code task ");

            }

                thread.sleep(500);

            } catch(interruptedexception e) {

 testask

class testask implements runnable {

    …..

           lastexectime = system.currenttimemillis();

           system.out.println(thread.currentthread().getname() + " is running-> " + new date());

               customthreadpoolexecutor.number++;

               thread.sleep(getinterval() * 6 * 1000);

               system.out.println(thread.currentthread().getname() + " aftersleep");

thread.currentthread().interrupt();

               system.out.println("interruptedexception happens");

       system.out.println("going to die");

綜上,最穩妥的就是使用jdk自帶的threadpoolexecutor,如果需要對池裡的task進行任意時間的控制,可以考慮全面更新,全方面,360度無死角的定制自己的線程池當然是最好的方案,但是一定要注意對于共享對象的處理,适當的處理好并發通路共享對象的方法。

鑒于我們的場景,由于時間緊,而且需要了解的task并不多,暫時選用全部重新更新的政策。上線後,抽時間把自己定制的threadpoolexecutor搞定,然後更新上去!