同步工具類,可以是任何一個對象,隻要它根據自身的狀态來協調線程的控制流,阻塞隊列可以作為同步工具類,其他類型的同步工具類還包括信号量,栅欄以及閉鎖.
所有的同步工具類都包含了一些特定的結構化屬性: 它們封裝了一些狀态,這些特定狀态将決定執行同步工具類的線程是繼續執行還是等待,此外還提供了一些方法對狀态進行操作,以及一些方法用于高效地等待同步工具類進入預期狀态.
一. 閉鎖
閉鎖是一種同步工具類,可以延遲線程的進度直到其到達終止狀态.
例如: A線程要執行,需要先執行B,C,D線程進行初始化完畢,閉鎖就是A線程的等待過程
閉鎖的常用場所:
1. 確定某個計算所需的資源初始化
2. 確定某個服務啟動所需的其他服務已啟動
3. 遊戲中用得比較多的是: 等待所有玩家都準備就緒,再執行遊戲線程.
CountDownLatch是一個常用的閉鎖工具類
示例:
這個代碼展示了閉鎖的兩大用處:
1. startGate保證了for循環中建立的線程同步執行
2. endGate保證了擷取所有線程執行完的準确性
import java.util.concurrent.CountDownLatch;
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task){
final CountDownLatch startGate = new CountDownLatch();
final CountDownLatch endGate = new CountDownLatch(nThreads);
for(int i = ;i<nThreads;i++){
Thread t = new Thread(){
@Override
public void run() {
try {
startGate.await();
try{
task.run();
}finally{
endGate.countDown();
}
} catch (InterruptedException e) {}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
try {
endGate.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.nanoTime();
return end-start;
}
}
FutureTask做閉鎖
future.get()的行為取決于任務的狀态,如果任務完成,那麼get會立即放回結果.
如果沒有完成,那麼調用get()的所有線程,都将要阻塞到任務進入完成标志.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Preloader {
private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo> (new Callable<ProductInfo>(){
public ProductInfo call() throws Exception {
return new ProductInfo();
}
});
private final Thread thread = new Thread(future);
public void start(){
thread.start();
}
public ProductInfo get() throws InterruptedException, ExecutionException{
return future.get();
}
class ProductInfo{}
}
二. 信号量
計數信号量,用來控制同時通路某個特定資源的操作數量,或者同時執行某個指定的操作數量,計數信号量還可以用來實作某種資源池,或者對容器施加邊界.
使用Semaphore為容器設定邊界
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException{
sem.acquire();
boolean wasAdded = false;
try{
wasAdded = set.add(o);
return wasAdded;
}finally{
if(!wasAdded){
sem.release();
}
}
}
public boolean remove(Object o){
boolean wasRemove = set.remove(o);
if(wasRemove)
sem.release();
return wasRemove;
}
}
三. 栅欄
栅欄類似于閉鎖, 它能阻塞一組線程直到某個事件的發生,與閉鎖的關鍵差別在于: 閉鎖用于等待事件,而栅欄用于等待其他線程.
CyclicBarrier協調細胞自動衍生系統中的計算
為每個元素配置設定一個獨立的線程是不現實的,因為這将産生過多的線程,而在協調這些線程上導緻的開銷将降低計算性能.
合理的做法是: 将問題分解為一定數量的子問題,為每個子問題配置設定一個線程來求解.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CellularAutomato {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomato(Board board){
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,new Runnable(){
@Override
public void run() {
mainBoard.commitNewValues();
}
});
this.workers = new Worker[count];
for(int i = ;i<count;i++){
workers[i] = new Worker(mainBoard.getSubBoard(count,i));
}
}
public void start(){
for(int i = ;i<workers.length;i++){
new Thread(workers[i]).start();
}
mainBoard.waitForCovergence();
}
private class Worker implements Runnable{
private final Board board;
public Worker(Board board){
this.board = board;
}
@Override
public void run() {
while(!board.hasConverged()){
for(int x = ; x<board.getMaxX();x++){
for(int y = ;y<board.getMaxY();y++){
board.setNewValue(x,y,computeValue(x,y));
}
}
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
public Object computeValue(int x, int y) {
// TODO Auto-generated method stub
return null;
}
}
class Board{
public boolean hasConverged() {
// TODO Auto-generated method stub
return false;
}
public void waitForCovergence() {
// TODO Auto-generated method stub
}
public Board getSubBoard(int count, int i) {
// TODO Auto-generated method stub
return null;
}
public void commitNewValues() {
// TODO Auto-generated method stub
}
public void setNewValue(int x, int y, Object computeValue) {
// TODO Auto-generated method stub
}
public int getMaxY() {
// TODO Auto-generated method stub
return ;
}
public int getMaxX() {
// TODO Auto-generated method stub
return ;
}
}
Exchanger 兩方栅欄
線程各方在栅欄位置上交換資料,當兩方執行不對稱操作時,Exchanger會非常有用.
例如: 當一個線程向緩沖區寫入資料,另一個線程從緩沖區讀取資料,這些線程可以使用Exchanger來彙合,并将滿的緩沖區與空的緩沖區交換.當兩個線程通過Exchanger交換對象的時候,這種交換就把這兩個對象安全的釋出給另一方.