newFixedThreadPool
建立一個固定大小的線程池。
shutdown():用于關閉啟動線程,如果不調用該語句,jvm不會關閉。
awaitTermination():用于等待子線程結束,再繼續執行下面的代碼。該例中我設定一直等着子線程結束。
public class Test {
public static void main(String[] args) throws IOException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 4; i++) {
Runnable run = new Runnable() {
@Override
public void run() {
System.out.println("thread start");
}
};
service.execute(run);
}
service.shutdown();
service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("all thread complete");
}
}
輸出: thread start
thread start
thread start
thread start
all thread complete
newScheduledThreadPool
這個先不說,我喜歡用spring quartz.
CyclicBarrier
假設有隻有的一個場景:每個線程代表一個跑步運動員,當運動員都準備好後,才一起出發,隻要有一個人沒有準備好,大家都等待.
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + " 準備OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
public class Race {
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}
輸出: wangwu 準備OK.
zhangsan 準備OK.
lisi 準備OK.
lisi Go!!
zhangsan Go!!
wangwu Go!!
ThreadPoolExecutor
newFixedThreadPool生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即使它是Idle。這就會産生性能問題,比如如果線程池的大小為200,當全部使用完畢後,所有的線程會繼續留在池中,相應的記憶體和線程切換(while(true)+sleep循環)都會增加。如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像Tomcat的線程池一樣設定“最大線程數”、“最小線程數”和“空閑線程keepAlive的時間”。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize:池中所儲存的線程數,包括空閑線程(非最大同時幹活的線程數)。如果池中線程數多于 corePoolSize,則這些多出的線程在空閑時間超過 keepAliveTime 時将會終止。
maximumPoolSize:線程池中最大線程數
keepAliveTime:線程空閑回收的時間
unit:keepAliveTime的機關
workQueue:儲存任務的隊列,可以如下選擇:
- 無界隊列: new LinkedBlockingQueue<Runnable>();
- 有界隊列: new ArrayBlockingQueue<Runnable>(8);你不想讓用戶端無限的請求吃光你的CPU和記憶體吧,那就用有界隊列
handler:
當送出任務數大于隊列size會抛出
RejectedExecutionException,可選的值為:
- ThreadPoolExecutor.CallerRunsPolicy 等待隊列空閑
- ThreadPoolExecutor.DiscardPolicy:丢棄要插入隊列的任務
- ThreadPoolExecutor.DiscardOldestPolicy:删除隊頭的任務
關于corePoolSize和maximumPoolSize:
Java官方Docs寫道: 當新任務在方法 execute(java.lang.Runnable) 中送出時,如果運作的線程少于 corePoolSize,則建立新線程來處理請求(即使存在空閑線程)。如果運作的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當隊列(queue)滿時才建立新線程。如果設定的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。如果将 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許池适應任意數量的并發任務。
public class Test {
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue);
for (int i = 0; i < 20; i++) {
final int index = i;
executor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("thread %d finished", index));
}
});
}
executor.shutdown();
}
}
原子變量(Atomic )
并發庫中的BlockingQueue是一個比較好玩的類,顧名思義,就是阻塞隊列。該類主要提供了兩個方法put()和take(),前者将一個對象放到隊列中,如果隊列已經滿了,就等待直到有空閑節點;後者從head取一個對象,如果沒有對象,就等待直到有可取的對象。
下面的例子比較簡單,一個讀線程,用于将要處理的檔案對象添加到阻塞隊列中,另外四個寫線程用于取出檔案對象,為了模拟寫操作耗時長的特點,特讓線程睡眠一段随機長度的時間。另外,該Demo也使用到了線程池和原子整型(AtomicInteger),AtomicInteger可以在并發情況下達到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞隊列的put和take操作會阻塞,為了使線程退出,在隊列中添加了一個“辨別”,算法中也叫“哨兵”,當發現這個哨兵後,寫線程就退出。
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
static long randomTime() {
return (long) (Math.random() * 1000);
}
public static void main(String[] args) {
// 能容納100個檔案
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
// 線程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("D:\\dist\\blank");
// 完成标志
final File exitFile = new File("");
// 讀個數
final AtomicInteger rc = new AtomicInteger();
// 寫個數
final AtomicInteger wc = new AtomicInteger();
// 讀線程
Runnable read = new Runnable() {
public void run() {
scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isDirectory() || pathname.getPath().endsWith(".log");
}
});
for (File one : files)
scanFile(one);
} else {
try {
int index = rc.incrementAndGet();
System.out.println("Read0: " + index + " " + file.getPath());
queue.put(file);
} catch (InterruptedException e) {
}
}
}
};
exec.submit(read);
// 四個寫線程
for (int index = 0; index < 4; index++) {
// write thread
final int num = index;
Runnable write = new Runnable() {
String threadName = "Write" + num;
public void run() {
while (true) {
try {
Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = queue.take();
// 隊列已經無對象
if (file == exitFile) {
// 再次添加"标志",以讓其他線程正常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ": " + index + " " + file.getPath());
} catch (InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}
CountDownLatch
從名字可以看出,CountDownLatch是一個倒數計數的鎖,當倒數到0時觸發事件,也就是開鎖,其他人就可以進入了。在一些應用場合中,需要等待某個條件達到要求後才能做後面的事情;同時當線程都完成後也會觸發事件,以便進行後面的操作。
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,後者是等待倒數到0,如果沒有到達0,就隻有阻塞等待了。
一個CountDouwnLatch執行個體是不能重複使用的,也就是說它是一次性的,鎖一經被打開就不能再關閉使用了,如果想重複使用,請考慮使用CyclicBarrier。
下面的例子簡單的說明了CountDownLatch的使用方法,模拟了100米賽跑,10名選手已經準備就緒,隻等裁判一聲令下。當所有人都到達終點時,比賽結束。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) throws InterruptedException {
// 開始的倒數鎖
final CountDownLatch begin = new CountDownLatch(1);
// 結束的倒數鎖
final CountDownLatch end = new CountDownLatch(10);
// 十名選手
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int index = 0; index < 10; index++) {
final int NO = index + 1;
Runnable run = new Runnable() {
public void run() {
try {
begin.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + NO + " arrived");
} catch (InterruptedException e) {
} finally {
end.countDown();
}
}
};
exec.submit(run);
}
System.out.println("Game Start");
begin.countDown();
end.await();
System.out.println("Game Over");
exec.shutdown();
}
}
使用Callable和Future實作線程等待和多線程傳回值
假設在main線程啟動一個線程,然後main線程需要等待子線程結束後,再繼續下面的操作,我們會通過join方法阻塞main線程,代碼如下:
Runnable runnable = ...;
Thread t = new Thread(runnable);
t.start();
t.join();
......
通過JDK1.5線程池管理的線程可以使用Callable和Future實作(join()方法無法應用到線上程池線程)
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("start main thread");
final ExecutorService exec = Executors.newFixedThreadPool(5);
Callable<String> call = new Callable<String>() {
public String call() throws Exception {
System.out.println(" start new thread.");
Thread.sleep(1000 * 5);
System.out.println(" end new thread.");
return "some value.";
}
};
Future<String> task = exec.submit(call);
Thread.sleep(1000 * 2);
task.get(); // 阻塞,并待子線程結束,
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("end main thread");
}
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 多線程傳回值測試
*/
public class ThreadTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("start main thread");
int threadCount = 5;
final ExecutorService exec = Executors.newFixedThreadPool(threadCount);
List<Future<Integer>> tasks = new ArrayList<Future<Integer>>();
for (int i = 0; i < threadCount; i++) {
Callable<Integer> call = new Callable<Integer>() {
public Integer call() throws Exception {
Thread.sleep(1000);
return 1;
}
};
tasks.add(exec.submit(call));
}
long total = 0;
for (Future<Integer> future : tasks) {
total += future.get();
}
exec.shutdown();
System.out.println("total: " + total);
System.out.println("end main thread");
}
}
CompletionService
這個東西的使用上很類似上面的example,不同的是,它會首先取完成任務的線程。下面的參考文章裡,專門提到這個,大家有興趣可以看下,例子:
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Test {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
ExecutorService exec = Executors.newFixedThreadPool(10);
CompletionService<String> serv =
new ExecutorCompletionService<String>(exec);
for (int index = 0; index < 5; index++) {
final int NO = index;
Callable<String> downImg = new Callable<String>() {
public String call() throws Exception {
Thread.sleep((long) (Math.random() * 10000));
return "Downloaded Image " + NO;
}
};
serv.submit(downImg);
}
Thread.sleep(1000 * 2);
System.out.println("Show web content");
for (int index = 0; index < 5; index++) {
Future<String> task = serv.take();
String img = task.get();
System.out.println(img);
}
System.out.println("End");
// 關閉線程池
exec.shutdown();
}
}
Semaphore信号量
拿到信号量的線程可以進入代碼,否則就等待。通過acquire()和release()擷取和釋放通路許可。下面的例子隻允許5個線程同時進入執行acquire()和release()之間的代碼
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Test {
public static void main(String[] args) {
// 線程池
ExecutorService exec = Executors.newCachedThreadPool();
// 隻能5個線程同時通路
final Semaphore semp = new Semaphore(5);
// 模拟20個用戶端通路
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 擷取許可
semp.acquire();
System.out.println("Accessing: " + NO);
Thread.sleep((long) (Math.random() * 10000));
// 通路完後,釋放
semp.release();
} catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
// 退出線程池
exec.shutdown();
}
}
參考:
jdk1.5中的線程池使用簡介
http://www.java3z.com/cwbwebhome/article/article2/2875.html
CAS原理
http://www.blogjava.net/syniii/archive/2010/11/18/338387.html?opt=admin
jdk1.5中java.util.concurrent包編寫多線程
http://hi.baidu.com/luotoo/blog/item/b895c3c2d650591e0ef47731.html
ExecutorSerive vs CompletionService
http://www.coderanch.com/t/491704/threads/java/ExecutorSerive-vs-CompletionService
-- end --