雲智慧(北京)科技有限公司 陳鑫
重新開機線程池
taskmanager
public class taskmanager implements runnable {
…..
public taskmanager (set<filetask>runners) {
super();
this.runners = runners;
executetasks(runners);
}
private voidexecutetasks(set<filetask> runners) {
for (filetask task : runners){
pool.execute(task);
system.out.println(task.getclass().getsimplename() + " has beenstarted");
}
@override
public void run() {
while(!thread.currentthread().isinterrupted()) {
try {
long current = system.currenttimemillis();
for (filetask wrapper : runners) {
if (wrapper.getlastexectime() != 0 && current -wrapper.getlastexectime() > wrapper.getinterval() * 5 * 1000) { // 開始忘了乘以1000
wrapper.interrupt();
if (wrapper.getfiles() != null){
for (file file : wrapper.getfiles()){
file.delete();
}
}
system.out.println("going to shutdown thethread pool");
list<runnable> shutdownnow = pool.shutdownnow(); // 不等目前pool裡的任務執行完,直接關閉線程池
for (runnable run : shutdownnow) {
system.out.println(run + " goingto be shutdown");
while (pool.awaittermination(1, timeunit.seconds)) {
system.out.println("the threadpool has been shutdown " + new date());
executetasks(runners);//重新執行
thread.sleep(200);
}
}
} catch(exception e1) {
e1.printstacktrace();
}
thread.sleep(500);
} catch(interruptedexception e) {
public static void main(string[] args) {
set<filetask> tasks =new hashset<filetask>();
filetask task = newfiletask();
task.setinterval(1);
task.setname("task-1");
tasks.add(task);
filetask task1 = newfiletask();
task1.setinterval(2);
task.setname("task-2");
tasks.add(task1);
taskmanager codemanager = new taskmanager (tasks);
newthread(codemanager).start();
}
成功!把整個的threadpoolexector裡所有的worker全部停止,之後再向其隊列裡重新加入要執行的兩個task(注意這裡并沒有清空,隻是停止而已)。這樣做雖然能夠及時處理task,但是一個很緻命的缺點在于,如果不能明确的知道threadpoolexecutor要執行的task,就沒有辦法重新執行這些任務。
定制線程池
好吧!停止鑽研别人的東西!我們完全可以自己寫一個自己的threadpoolexecutor,隻要把worker暴露出來就可以了。這裡是不是回想起前面的start問題來了,沒錯,我們即便能夠直接針對thread進行interrupt, 但是不能再次start它了。那麼clone一個同樣的thread行不行呢?
thread
@override
protectedobject clone() throws clonenotsupportedexception{
throw newclonenotsupportedexception();
答案顯而易見,線程是不支援clone 的。我們需要重新new 一個thread來重新運作。其實我們隻需要将原來的worker裡的runnable換成我們自己的task,然後将通路權限适當放開就可以了。還有,就是讓我們的customthreadpoolexecutor繼承thread,因為它需要定時監控自己的所有的worker裡thread的運作狀态。
customthreadpoolexecutor
public class customthreadpoolexecutor extendsthreadpoolexecutor implements runnable {
public voidexecute(testask command) {
….//将執行接口改為接收我們的業務類
…
private final class worker
extends abstractqueuedsynchronizer
implements runnable
{
…
testask firsttask; //将runnable改為我們的業務類,友善檢視狀态
…
worker(testask firsttask) {
…//同樣将初始化參數改為我們的業務類
}
public staticvoid main(string[] args) {
customthreadpoolexecutor pool = new customthreadpoolexecutor(0, integer.max_value,
60l, timeunit.seconds,
newsynchronousqueue<runnable>());
testasktask = new testask();
task.setinterval(1);
pool.execute(task);
testasktask1 = new testask();
task1.setinterval(2);
pool.execute(task1);
newthread(pool).start();
}
@override
public voidrun() {
while(!thread.currentthread().isinterrupted()) {
try {
long current = system.currenttimemillis();
set<testask> toreexecute = new hashset<testask>();
system.out.println("\t number is " + number);
for(worker wrapper : workers) {
testask tt = wrapper.firsttask;
if (tt != null) {
if (current - tt.getlastexectime() > tt.getinterval() * 5 * 1000) {
wrapper.interruptifstarted();
remove(tt);
if (tt.getfiles() != null) {
for (file file: tt.getfiles()) {
file.delete();
}
}
system.out.println("thread is timeout : " + tt + " "+ new date());
toreexecute.add(tt);
}
}
}
if(toreexecute.size() > 0) {
mainlock.lock();
try {
for (testask tt :toreexecute) {
execute(tt); // execute this task again
}
} finally {
mainlock.unlock();
}
} catch(exception e1) {
system.out.println("error happens when we trying to interrupt andrestart a code task ");
}
thread.sleep(500);
} catch(interruptedexception e) {
testask
class testask implements runnable {
…..
lastexectime = system.currenttimemillis();
system.out.println(thread.currentthread().getname() + " is running-> " + new date());
customthreadpoolexecutor.number++;
thread.sleep(getinterval() * 6 * 1000);
system.out.println(thread.currentthread().getname() + " aftersleep");
thread.currentthread().interrupt();
system.out.println("interruptedexception happens");
system.out.println("going to die");
綜上,最穩妥的就是使用jdk自帶的threadpoolexecutor,如果需要對池裡的task進行任意時間的控制,可以考慮全面更新,全方面,360度無死角的定制自己的線程池當然是最好的方案,但是一定要注意對于共享對象的處理,适當的處理好并發通路共享對象的方法。
鑒于我們的場景,由于時間緊,而且需要了解的task并不多,暫時選用全部重新更新的政策。上線後,抽時間把自己定制的threadpoolexecutor搞定,然後更新上去!