天天看點

java多線程(九) 之 同步工具類

同步工具類,可以是任何一個對象,隻要它根據自身的狀态來協調線程的控制流,阻塞隊列可以作為同步工具類,其他類型的同步工具類還包括信号量,栅欄以及閉鎖.

所有的同步工具類都包含了一些特定的結構化屬性: 它們封裝了一些狀态,這些特定狀态将決定執行同步工具類的線程是繼續執行還是等待,此外還提供了一些方法對狀态進行操作,以及一些方法用于高效地等待同步工具類進入預期狀态.

一. 閉鎖

閉鎖是一種同步工具類,可以延遲線程的進度直到其到達終止狀态.

例如: A線程要執行,需要先執行B,C,D線程進行初始化完畢,閉鎖就是A線程的等待過程

閉鎖的常用場所:

1. 確定某個計算所需的資源初始化

2. 確定某個服務啟動所需的其他服務已啟動

3. 遊戲中用得比較多的是: 等待所有玩家都準備就緒,再執行遊戲線程.

CountDownLatch是一個常用的閉鎖工具類

示例:

這個代碼展示了閉鎖的兩大用處:

1. startGate保證了for循環中建立的線程同步執行

2. endGate保證了擷取所有線程執行完的準确性

import java.util.concurrent.CountDownLatch;

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task){
        final CountDownLatch startGate = new CountDownLatch();
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        for(int i = ;i<nThreads;i++){
            Thread t = new Thread(){
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try{
                            task.run();
                        }finally{
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {}
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        try {
            endGate.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();
        return end-start;
    }
}
           

FutureTask做閉鎖

future.get()的行為取決于任務的狀态,如果任務完成,那麼get會立即放回結果.

如果沒有完成,那麼調用get()的所有線程,都将要阻塞到任務進入完成标志.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Preloader {
    private final FutureTask<ProductInfo> future = 
            new FutureTask<ProductInfo> (new Callable<ProductInfo>(){
                public ProductInfo call() throws Exception {
                    return new ProductInfo();
                }
            });

    private final Thread thread = new Thread(future);

    public void start(){
        thread.start();
    }

    public ProductInfo get() throws InterruptedException, ExecutionException{
        return future.get();
    }

    class ProductInfo{}
}
           

二. 信号量

計數信号量,用來控制同時通路某個特定資源的操作數量,或者同時執行某個指定的操作數量,計數信号量還可以用來實作某種資源池,或者對容器施加邊界.

使用Semaphore為容器設定邊界

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;
    public BoundedHashSet(int bound){
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException{
        sem.acquire();
        boolean wasAdded = false;
        try{
            wasAdded = set.add(o);
            return wasAdded;
        }finally{
            if(!wasAdded){
                sem.release();
            }
        }
    }
    public boolean remove(Object o){
        boolean wasRemove = set.remove(o);
        if(wasRemove)
            sem.release();
        return wasRemove;
    }
}
           

三. 栅欄

栅欄類似于閉鎖, 它能阻塞一組線程直到某個事件的發生,與閉鎖的關鍵差別在于: 閉鎖用于等待事件,而栅欄用于等待其他線程.

CyclicBarrier協調細胞自動衍生系統中的計算

為每個元素配置設定一個獨立的線程是不現實的,因為這将産生過多的線程,而在協調這些線程上導緻的開銷将降低計算性能.

合理的做法是: 将問題分解為一定數量的子問題,為每個子問題配置設定一個線程來求解.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CellularAutomato {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomato(Board board){
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,new Runnable(){
            @Override
            public void run() {
                mainBoard.commitNewValues();
            }

        });
        this.workers = new Worker[count];
        for(int i = ;i<count;i++){
            workers[i] = new Worker(mainBoard.getSubBoard(count,i));
        }
    }

    public void start(){
        for(int i = ;i<workers.length;i++){
            new Thread(workers[i]).start();
        }
        mainBoard.waitForCovergence();
    }

    private class Worker implements Runnable{
        private final Board board;

        public Worker(Board board){
            this.board = board;
        }

        @Override
        public void run() {
            while(!board.hasConverged()){
                for(int x = ; x<board.getMaxX();x++){
                    for(int y = ;y<board.getMaxY();y++){
                        board.setNewValue(x,y,computeValue(x,y));
                    }
                }
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public Object computeValue(int x, int y) {
        // TODO Auto-generated method stub
        return null;
    }
}

class Board{

    public boolean hasConverged() {
        // TODO Auto-generated method stub
        return false;
    }

    public void waitForCovergence() {
        // TODO Auto-generated method stub

    }

    public Board getSubBoard(int count, int i) {
        // TODO Auto-generated method stub
        return null;
    }

    public void commitNewValues() {
        // TODO Auto-generated method stub

    }

    public void setNewValue(int x, int y, Object computeValue) {
        // TODO Auto-generated method stub

    }

    public int getMaxY() {
        // TODO Auto-generated method stub
        return ;
    }

    public int getMaxX() {
        // TODO Auto-generated method stub
        return ;
    }

}
           

Exchanger 兩方栅欄

線程各方在栅欄位置上交換資料,當兩方執行不對稱操作時,Exchanger會非常有用.

例如: 當一個線程向緩沖區寫入資料,另一個線程從緩沖區讀取資料,這些線程可以使用Exchanger來彙合,并将滿的緩沖區與空的緩沖區交換.當兩個線程通過Exchanger交換對象的時候,這種交換就把這兩個對象安全的釋出給另一方.

繼續閱讀