雲智慧(北京)科技有限公司 陳鑫
一個排程器,兩個排程任務,分别處理兩個目錄下的txt檔案,某個排程任務應對某些複雜問題的時候會持續特别長的時間,甚至有一直阻塞的可能。我們需要一個manager來管理這些task,當這個task的上一次執行時間距離現在超過5個排程周期的時候,就直接停掉這個線程,然後再重新開機它,保證兩個目标目錄下沒有待處理的txt檔案堆積。
直接使用java預設的線程池排程task1和task2.由于外部txt的種種不可控原因,導緻task2線程阻塞。現象就是task1和線程池排程器都正常運作着,但是task2遲遲沒有動作。
當然,找到具體的阻塞原因并進行針對性解決是很重要的。但是,這種措施很可能并不能完全、徹底、全面的處理好所有未知情況。我們需要保證任務線程或者排程器的健壯性!
線程池排程器并沒有原生的針對被排程線程的業務運作狀态進行監控處理的api。因為task2是阻塞在我們的業務邏輯裡的,是以最好的方式是寫一個taskmanager,所有的任務線程在執行任務前全部到這個taskmanager這裡來注冊自己。這個taskmanager就負責對于每個自己管轄範圍内的task進行實時全程監控!
後面的重點就是如何處理超過5個執行周期的task了。
方案如下:
一旦發現這個task線程,立即中止它,然後再次重新開機;
一旦發現這個task線程,直接将整個pool清空并停止,重新放入這兩個task ——【task明确的情況下】;
中止後重新開機
task實作類
classfiletask extends thread {
private long lastexectime = 0;
protected long interval = 10000;
public long getlastexectime() {
returnlastexectime;
}
public void setlastexectime(longlastexectime) {
this.lastexectime =lastexectime;
public long getinterval() {
return interval;
public void setinterval(long interval) {
this.interval = interval;
public file[] getfiles() {
return null;
@override
public void run() {
while(!thread.currentthread().isinterrupted()) {
lastexectime = system.currenttimemillis();
system.out.println(thread.currentthread().getname() + " is running ->" + new date());
try {
thread.sleep(getinterval() * 6 * 1000);
} catch(interruptedexception e) {
thread.currentthread().interrupt();
e.printstacktrace(); // 當線程池shutdown之後,這裡就會抛出exception了
}
}
}
taskmanager
public class taskmanager implements runnable {
private final static log logger = logfactory.getlog(taskmanager.class);
public set<filetask> runners = newcopyonwritearrayset<filetask>();
executorservice pool =executors.newcachedthreadpool();
public voidregistercoderunnable(filetask process) {
runners.add(process);
publictaskmanager (set<filetask>runners) {
this.runners = runners;
long current = system.currenttimemillis();
for (filetask wrapper : runners) {
if (current - wrapper.getlastexectime() >wrapper.getinterval()* 5) {
wrapper.interrupt();
for (file file : wrapper.getfiles()) {
file.delete();
}
wrapper.start();
}
}
} catch(exception e1) {
logger.error("error happens when we trying to interrupt and restart a task");
exceptioncollector.registerexception(e1);
thread.sleep(500);
這段代碼會報錯java.lang.thread illegalthreadstateexception。為什麼呢?其實這是一個很基礎的問題,您應該不會像我一樣馬虎。檢視thread.start()的注釋, 有這樣一段:
it is never legal to start a thread more thanonce. in particular, a thread may not be restarted once it has completedexecution.