天天看點

java.util.concurrent包API學習筆記

newFixedThreadPool

建立一個固定大小的線程池。

shutdown():用于關閉啟動線程,如果不調用該語句,jvm不會關閉。

awaitTermination():用于等待子線程結束,再繼續執行下面的代碼。該例中我設定一直等着子線程結束。

public class Test {

	public static void main(String[] args) throws IOException, InterruptedException {
		ExecutorService service = Executors.newFixedThreadPool(2);
		for (int i = 0; i < 4; i++) {
			Runnable run = new Runnable() {
				@Override
				public void run() {
					System.out.println("thread start");
				}
			};
			service.execute(run);
		}
		service.shutdown();
		service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
		System.out.println("all thread complete");
	}
}
           

 輸出: thread start

thread start

thread start

thread start

all thread complete

newScheduledThreadPool

這個先不說,我喜歡用spring quartz.

CyclicBarrier

假設有隻有的一個場景:每個線程代表一個跑步運動員,當運動員都準備好後,才一起出發,隻要有一個人沒有準備好,大家都等待.

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Runner implements Runnable {

	private CyclicBarrier barrier;

	private String name;

	public Runner(CyclicBarrier barrier, String name) {
		super();
		this.barrier = barrier;
		this.name = name;
	}

	@Override
	public void run() {
		try {
			Thread.sleep(1000 * (new Random()).nextInt(8));
			System.out.println(name + " 準備OK.");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
		System.out.println(name + " Go!!");
	}
}

public class Race {

	public static void main(String[] args) throws IOException, InterruptedException {
		CyclicBarrier barrier = new CyclicBarrier(3);

		ExecutorService executor = Executors.newFixedThreadPool(3);
		executor.submit(new Thread(new Runner(barrier, "zhangsan")));
		executor.submit(new Thread(new Runner(barrier, "lisi")));
		executor.submit(new Thread(new Runner(barrier, "wangwu")));

		executor.shutdown();
	}

}
           

輸出: wangwu 準備OK.

zhangsan 準備OK.

lisi 準備OK.

lisi Go!!

zhangsan Go!!

wangwu Go!!

ThreadPoolExecutor

newFixedThreadPool生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即使它是Idle。這就會産生性能問題,比如如果線程池的大小為200,當全部使用完畢後,所有的線程會繼續留在池中,相應的記憶體和線程切換(while(true)+sleep循環)都會增加。如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像Tomcat的線程池一樣設定“最大線程數”、“最小線程數”和“空閑線程keepAlive的時間”。

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize:池中所儲存的線程數,包括空閑線程(非最大同時幹活的線程數)。如果池中線程數多于 corePoolSize,則這些多出的線程在空閑時間超過 keepAliveTime 時将會終止。

maximumPoolSize:線程池中最大線程數

keepAliveTime:線程空閑回收的時間

unit:keepAliveTime的機關

workQueue:儲存任務的隊列,可以如下選擇:

  •   無界隊列: new LinkedBlockingQueue<Runnable>();
  •   有界隊列: new ArrayBlockingQueue<Runnable>(8);你不想讓用戶端無限的請求吃光你的CPU和記憶體吧,那就用有界隊列

handler:

當送出任務數大于隊列size會抛出

RejectedExecutionException,可選的值為:

  • ThreadPoolExecutor.CallerRunsPolicy 等待隊列空閑
  • ThreadPoolExecutor.DiscardPolicy:丢棄要插入隊列的任務
  • ThreadPoolExecutor.DiscardOldestPolicy:删除隊頭的任務

關于corePoolSize和maximumPoolSize:

 Java官方Docs寫道: 當新任務在方法 execute(java.lang.Runnable) 中送出時,如果運作的線程少于 corePoolSize,則建立新線程來處理請求(即使存在空閑線程)。如果運作的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當隊列(queue)滿時才建立新線程。如果設定的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。如果将 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許池适應任意數量的并發任務。

public class Test {

	public static void main(String[] args) {
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
		ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue);

		for (int i = 0; i < 20; i++) {
			final int index = i;
			executor.execute(new Runnable() {
				public void run() {
					try {
						Thread.sleep(4000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(String.format("thread %d finished", index));
				}
			});
		}
		executor.shutdown();
	}
}
           

原子變量(Atomic )

并發庫中的BlockingQueue是一個比較好玩的類,顧名思義,就是阻塞隊列。該類主要提供了兩個方法put()和take(),前者将一個對象放到隊列中,如果隊列已經滿了,就等待直到有空閑節點;後者從head取一個對象,如果沒有對象,就等待直到有可取的對象。

下面的例子比較簡單,一個讀線程,用于将要處理的檔案對象添加到阻塞隊列中,另外四個寫線程用于取出檔案對象,為了模拟寫操作耗時長的特點,特讓線程睡眠一段随機長度的時間。另外,該Demo也使用到了線程池和原子整型(AtomicInteger),AtomicInteger可以在并發情況下達到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞隊列的put和take操作會阻塞,為了使線程退出,在隊列中添加了一個“辨別”,算法中也叫“哨兵”,當發現這個哨兵後,寫線程就退出。

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {

	static long randomTime() {
		return (long) (Math.random() * 1000);
	}

	public static void main(String[] args) {
		// 能容納100個檔案
		final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
		// 線程池
		final ExecutorService exec = Executors.newFixedThreadPool(5);
		final File root = new File("D:\\dist\\blank");
		// 完成标志
		final File exitFile = new File("");
		// 讀個數
		final AtomicInteger rc = new AtomicInteger();
		// 寫個數
		final AtomicInteger wc = new AtomicInteger();
		// 讀線程
		Runnable read = new Runnable() {
			public void run() {
				scanFile(root);
				scanFile(exitFile);
			}

			public void scanFile(File file) {
				if (file.isDirectory()) {
					File[] files = file.listFiles(new FileFilter() {
						public boolean accept(File pathname) {
							return pathname.isDirectory() || pathname.getPath().endsWith(".log");
						}
					});
					for (File one : files)
						scanFile(one);
				} else {
					try {
						int index = rc.incrementAndGet();
						System.out.println("Read0: " + index + " " + file.getPath());
						queue.put(file);
					} catch (InterruptedException e) {
					}
				}
			}
		};
		exec.submit(read);
		// 四個寫線程
		for (int index = 0; index < 4; index++) {
			// write thread
			final int num = index;
			Runnable write = new Runnable() {
				String threadName = "Write" + num;

				public void run() {
					while (true) {
						try {
							Thread.sleep(randomTime());
							int index = wc.incrementAndGet();
							File file = queue.take();
							// 隊列已經無對象
							if (file == exitFile) {
								// 再次添加"标志",以讓其他線程正常退出
								queue.put(exitFile);
								break;
							}
							System.out.println(threadName + ": " + index + " " + file.getPath());
						} catch (InterruptedException e) {
						}
					}
				}

			};
			exec.submit(write);
		}
		exec.shutdown();
	}

}
           

CountDownLatch

從名字可以看出,CountDownLatch是一個倒數計數的鎖,當倒數到0時觸發事件,也就是開鎖,其他人就可以進入了。在一些應用場合中,需要等待某個條件達到要求後才能做後面的事情;同時當線程都完成後也會觸發事件,以便進行後面的操作。 

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,後者是等待倒數到0,如果沒有到達0,就隻有阻塞等待了。

一個CountDouwnLatch執行個體是不能重複使用的,也就是說它是一次性的,鎖一經被打開就不能再關閉使用了,如果想重複使用,請考慮使用CyclicBarrier。

下面的例子簡單的說明了CountDownLatch的使用方法,模拟了100米賽跑,10名選手已經準備就緒,隻等裁判一聲令下。當所有人都到達終點時,比賽結束。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

	public static void main(String[] args) throws InterruptedException {

		// 開始的倒數鎖
		final CountDownLatch begin = new CountDownLatch(1);

		// 結束的倒數鎖
		final CountDownLatch end = new CountDownLatch(10);

		// 十名選手
		final ExecutorService exec = Executors.newFixedThreadPool(10);

		for (int index = 0; index < 10; index++) {
			final int NO = index + 1;
			Runnable run = new Runnable() {
				public void run() {
					try {
						begin.await();
						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("No." + NO + " arrived");
					} catch (InterruptedException e) {
					} finally {
						end.countDown();
					}
				}
			};
			exec.submit(run);
		}
		System.out.println("Game Start");
		begin.countDown();
		end.await();
		System.out.println("Game Over");
		exec.shutdown();
	}

}
           

使用Callable和Future實作線程等待和多線程傳回值

   假設在main線程啟動一個線程,然後main線程需要等待子線程結束後,再繼續下面的操作,我們會通過join方法阻塞main線程,代碼如下:  

Runnable runnable = ...;
    Thread t = new Thread(runnable);
    t.start();
    t.join();
    ......

           

 通過JDK1.5線程池管理的線程可以使用Callable和Future實作(join()方法無法應用到線上程池線程)  

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		System.out.println("start main thread");
		final ExecutorService exec = Executors.newFixedThreadPool(5);
		
		Callable<String> call = new Callable<String>() {
			public String call() throws Exception {
				System.out.println("  start new thread.");
				Thread.sleep(1000 * 5);
				System.out.println("  end new thread.");
				return "some value.";
			}
		};
		Future<String> task = exec.submit(call);
		Thread.sleep(1000 * 2);
		task.get(); // 阻塞,并待子線程結束,
		exec.shutdown();
		exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
		System.out.println("end main thread");
	}

}
           
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* 多線程傳回值測試
*/
public class ThreadTest {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		System.out.println("start main thread");
		int threadCount = 5;
		final ExecutorService exec = Executors.newFixedThreadPool(threadCount);

		List<Future<Integer>> tasks = new ArrayList<Future<Integer>>();
		for (int i = 0; i < threadCount; i++) {
			Callable<Integer> call = new Callable<Integer>() {
				public Integer call() throws Exception {
					Thread.sleep(1000);
					return 1;
				}
			};
			tasks.add(exec.submit(call));
		}
		long total = 0;
		for (Future<Integer> future : tasks) {
			total += future.get();
		}
		exec.shutdown();
		System.out.println("total: " + total);
		System.out.println("end main thread");
	}
}
           

CompletionService

這個東西的使用上很類似上面的example,不同的是,它會首先取完成任務的線程。下面的參考文章裡,專門提到這個,大家有興趣可以看下,例子:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
	public static void main(String[] args) throws InterruptedException,
	ExecutionException {
		ExecutorService exec = Executors.newFixedThreadPool(10);
		CompletionService<String> serv =
		new ExecutorCompletionService<String>(exec);
		for (int index = 0; index < 5; index++) {
			final int NO = index;
			Callable<String> downImg = new Callable<String>() {
				public String call() throws Exception {
					Thread.sleep((long) (Math.random() * 10000));
					return "Downloaded Image " + NO;
				}
			};
			serv.submit(downImg);
		}
		Thread.sleep(1000 * 2);
		System.out.println("Show web content");
		for (int index = 0; index < 5; index++) {
			Future<String> task = serv.take();
			String img = task.get();
			System.out.println(img);
		}
		System.out.println("End");
		// 關閉線程池
		exec.shutdown();
	}
}
           

Semaphore信号量

拿到信号量的線程可以進入代碼,否則就等待。通過acquire()和release()擷取和釋放通路許可。下面的例子隻允許5個線程同時進入執行acquire()和release()之間的代碼

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Test {

	public static void main(String[] args) {
		// 線程池
		ExecutorService exec = Executors.newCachedThreadPool();
		// 隻能5個線程同時通路
		final Semaphore semp = new Semaphore(5);
		// 模拟20個用戶端通路
		for (int index = 0; index < 20; index++) {
			final int NO = index;
			Runnable run = new Runnable() {
				public void run() {
					try {
						// 擷取許可
						semp.acquire();
						System.out.println("Accessing: " + NO);
						Thread.sleep((long) (Math.random() * 10000));
						// 通路完後,釋放
						semp.release();
					} catch (InterruptedException e) {
					}
				}
			};
			exec.execute(run);
		}
		// 退出線程池
		exec.shutdown();
	}

}
           

參考:

jdk1.5中的線程池使用簡介

http://www.java3z.com/cwbwebhome/article/article2/2875.html

CAS原理

http://www.blogjava.net/syniii/archive/2010/11/18/338387.html?opt=admin

jdk1.5中java.util.concurrent包編寫多線程

http://hi.baidu.com/luotoo/blog/item/b895c3c2d650591e0ef47731.html

ExecutorSerive vs CompletionService

http://www.coderanch.com/t/491704/threads/java/ExecutorSerive-vs-CompletionService

-- end --