云智慧(北京)科技有限公司 陈鑫
重启线程池
taskmanager
public class taskmanager implements runnable {
…..
public taskmanager (set<filetask>runners) {
super();
this.runners = runners;
executetasks(runners);
}
private voidexecutetasks(set<filetask> runners) {
for (filetask task : runners){
pool.execute(task);
system.out.println(task.getclass().getsimplename() + " has beenstarted");
}
@override
public void run() {
while(!thread.currentthread().isinterrupted()) {
try {
long current = system.currenttimemillis();
for (filetask wrapper : runners) {
if (wrapper.getlastexectime() != 0 && current -wrapper.getlastexectime() > wrapper.getinterval() * 5 * 1000) { // 开始忘了乘以1000
wrapper.interrupt();
if (wrapper.getfiles() != null){
for (file file : wrapper.getfiles()){
file.delete();
}
}
system.out.println("going to shutdown thethread pool");
list<runnable> shutdownnow = pool.shutdownnow(); // 不等当前pool里的任务执行完,直接关闭线程池
for (runnable run : shutdownnow) {
system.out.println(run + " goingto be shutdown");
while (pool.awaittermination(1, timeunit.seconds)) {
system.out.println("the threadpool has been shutdown " + new date());
executetasks(runners);//重新执行
thread.sleep(200);
}
}
} catch(exception e1) {
e1.printstacktrace();
}
thread.sleep(500);
} catch(interruptedexception e) {
public static void main(string[] args) {
set<filetask> tasks =new hashset<filetask>();
filetask task = newfiletask();
task.setinterval(1);
task.setname("task-1");
tasks.add(task);
filetask task1 = newfiletask();
task1.setinterval(2);
task.setname("task-2");
tasks.add(task1);
taskmanager codemanager = new taskmanager (tasks);
newthread(codemanager).start();
}
成功!把整个的threadpoolexector里所有的worker全部停止,之后再向其队列里重新加入要执行的两个task(注意这里并没有清空,只是停止而已)。这样做虽然能够及时处理task,但是一个很致命的缺点在于,如果不能明确的知道threadpoolexecutor要执行的task,就没有办法重新执行这些任务。
定制线程池
好吧!停止钻研别人的东西!我们完全可以自己写一个自己的threadpoolexecutor,只要把worker暴露出来就可以了。这里是不是回想起前面的start问题来了,没错,我们即便能够直接针对thread进行interrupt, 但是不能再次start它了。那么clone一个同样的thread行不行呢?
thread
@override
protectedobject clone() throws clonenotsupportedexception{
throw newclonenotsupportedexception();
答案显而易见,线程是不支持clone 的。我们需要重新new 一个thread来重新运行。其实我们只需要将原来的worker里的runnable换成我们自己的task,然后将访问权限适当放开就可以了。还有,就是让我们的customthreadpoolexecutor继承thread,因为它需要定时监控自己的所有的worker里thread的运行状态。
customthreadpoolexecutor
public class customthreadpoolexecutor extendsthreadpoolexecutor implements runnable {
public voidexecute(testask command) {
….//将执行接口改为接收我们的业务类
…
private final class worker
extends abstractqueuedsynchronizer
implements runnable
{
…
testask firsttask; //将runnable改为我们的业务类,方便查看状态
…
worker(testask firsttask) {
…//同样将初始化参数改为我们的业务类
}
public staticvoid main(string[] args) {
customthreadpoolexecutor pool = new customthreadpoolexecutor(0, integer.max_value,
60l, timeunit.seconds,
newsynchronousqueue<runnable>());
testasktask = new testask();
task.setinterval(1);
pool.execute(task);
testasktask1 = new testask();
task1.setinterval(2);
pool.execute(task1);
newthread(pool).start();
}
@override
public voidrun() {
while(!thread.currentthread().isinterrupted()) {
try {
long current = system.currenttimemillis();
set<testask> toreexecute = new hashset<testask>();
system.out.println("\t number is " + number);
for(worker wrapper : workers) {
testask tt = wrapper.firsttask;
if (tt != null) {
if (current - tt.getlastexectime() > tt.getinterval() * 5 * 1000) {
wrapper.interruptifstarted();
remove(tt);
if (tt.getfiles() != null) {
for (file file: tt.getfiles()) {
file.delete();
}
}
system.out.println("thread is timeout : " + tt + " "+ new date());
toreexecute.add(tt);
}
}
}
if(toreexecute.size() > 0) {
mainlock.lock();
try {
for (testask tt :toreexecute) {
execute(tt); // execute this task again
}
} finally {
mainlock.unlock();
}
} catch(exception e1) {
system.out.println("error happens when we trying to interrupt andrestart a code task ");
}
thread.sleep(500);
} catch(interruptedexception e) {
testask
class testask implements runnable {
…..
lastexectime = system.currenttimemillis();
system.out.println(thread.currentthread().getname() + " is running-> " + new date());
customthreadpoolexecutor.number++;
thread.sleep(getinterval() * 6 * 1000);
system.out.println(thread.currentthread().getname() + " aftersleep");
thread.currentthread().interrupt();
system.out.println("interruptedexception happens");
system.out.println("going to die");
综上,最稳妥的就是使用jdk自带的threadpoolexecutor,如果需要对池里的task进行任意时间的控制,可以考虑全面更新,全方面,360度无死角的定制自己的线程池当然是最好的方案,但是一定要注意对于共享对象的处理,适当的处理好并发访问共享对象的方法。
鉴于我们的场景,由于时间紧,而且需要了解的task并不多,暂时选用全部重新更新的策略。上线后,抽时间把自己定制的threadpoolexecutor搞定,然后更新上去!