天天看點

多線程并發工具類

1.等待多線程完成的CountDownLatch

    CountDownLatch允許一個或多個線程等待其他線程完成操作。即他可以實作與join()方法相同的功能,而且比join的功能更多。可以在初始化CountDownLatch時傳入一個int參數來設定初始計數值,任何在CountDownLatch對象上調用wait()的方法都将被阻塞,直到這個CountDownLatch對象的計數值為0。CountDownLatch被設計為隻能觸發一次,計數值不能被重置。

    當我們調用CountDownLatch的countDown方法時,計數值N就會減1,CountDownLatch的await方法 會阻塞目前線程,直到N變成零。由于countDown方法可以用在任何地方,是以這裡說的N個 點,可以是N個線程,也可以是1個線程裡的N個執行步驟。用在多個線程時,隻需要把這個 CountDownLatch的引用傳遞到線程裡即可。

    注意:計數器必須大于等于0,隻是等于0時候,計數器就是零,則此時調用await方法時不會阻塞目前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的内部計數器的值。一個線程調用countDown方法發生之前,另外一個線程調用await方法。一個線程調用countDown方法并不會被阻塞,隻有調用await()方法的線程才會被阻塞。

public class TestCountDownLatch {
	static CountDownLatch c=new CountDownLatch(8);
	public static void main(String[] args) throws InterruptedException {
		for(int i=1;i<=8;i++){
			Thread t=new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName()+"完成");
						c.countDown();//計數器減1
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
			t.start();
		}
		c.await();//主線程會在此處阻塞,直到CountDownLatch的計數器為0才會恢複
		System.out.println("完成所有準備任務");
		System.out.println("主程式開始執行");
	}
}
           

2.同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運作。

    CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)即可解除阻塞
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();
		try {
			c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)即可解除阻塞
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}
           

    注意:如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠等待, 因為沒有第三個線程執行await方法,即沒有第三個線程到達屏障,是以之前到達屏障的兩個線程都不會繼續執行。

    CyclicBarrier還提供一個更進階的構造函數CyclicBarrier(int parties,Runnable barrierAction),用于線上程到達屏障時,保證會優先執行barrierAction,友善處理更複雜的業務場景。

public class TestCyclicBarrier1 {
	static CyclicBarrier c = new CyclicBarrier(2, new A());
	public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					c.await();
					System.out.println(2);
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		}).start();
		c.await();
		System.out.println(1);
	}
	static class A implements Runnable {
		@Override
		public void run() {
			System.out.println(3);
		}
	}
}
/*
 * 輸出結果:
 * 3
 * 2
 * 1
 */           

    因為CyclicBarrier設定了攔截線程的數量是2,是以必須等代碼中的第一個線程和線程A 都執行完之後,才會繼續執行主線程,是以輸出結果為3 2 1。那麼此時有一個問題,如果阻塞的線程數大于CyclicBarrier的計數器會怎樣?

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		for(int i=1;i<=3;i++){
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		}).start();
		}
		try {
			c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}
           

根據結果可以知道,CyclicBarrier可以自動重置計數器數量,當攔截線程數量為2時會把從阻塞隊列中任意取出兩個解除阻塞并執行,如果還有剩餘的阻塞隊列則會重置計數器,如果剩餘阻塞隊列數量小于計數器則會阻塞運作,也就是說,如果有阻塞隊列數X與計數器N,X%N==0,那麼所有線程都會執行,如果X%N!=0,那麼會有部分線程處于阻塞狀态無法執行。也可以手動調用 reset()方法來進行重置計數器。

    CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier 阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		Thread t=new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		});
		t.start();
		t.interrupt();
		try {
			c.await();//在此處阻塞,等待剩餘的所有任務進入阻塞(到達屏障)
			System.out.println(2);
		} catch (BrokenBarrierException|InterruptedException e) {
			System.out.println(c.isBroken());
		}
		
	}

}
/*
true
*/           

3.控制并發線程數的Semaphore

    Semaphore(信号量)是用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。換句話說,鎖(Lock鎖或synchronized鎖)在任何時刻隻允許一個任務通路被加鎖的資源,而計數信号量允許n個任務同時通路這個資源,還可以将信号量看作是向外分發使用資源的“許可證”,盡管内部沒有這種許可證對象。

    “許可證”的數量是有限的,是以當執有“許可證”的線程數量與“許可證”數量相同時,就會阻止其他線程對共享資源的使用,如果某一個或多個線程使用完共享資源後,就會歸還“許可證”,此時Semaphore(信号量)就會将這些歸還的“許可證”再次分發給阻塞中的線程。通過這種方式就實作了控制線程并發數。

    3.1 API

  • Semaphore(int permits):構造器,接受一個整型的數字,表示可用的許可證數量。
  • acquire():線程調用該方法擷取一個許可證來擷取使用共享資源的資格。
  • release():線程使用完共享資源之後調用方法歸還許可證。
  • tryAcquire():線程調用該方法嘗試擷取許可證。
  • int availablePermits():傳回此信号量中目前可用的許可證數。
  • int getQueueLength():傳回正在等待擷取許可證的線程數。
  • boolean hasQueuedThreads():是否有線程正在等待擷取許可證。
  • void reducePermits(int reduction):減少reduction個許可證,是個protected方法。
  • Collection getQueuedThreads():傳回所有等待擷取許可證的線程集合,是個protected方 法。

    3.2 應用

    Semaphore可以用于做流量控制,特别是公用資源有限的應用場景,比如資料庫連接配接。假如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,我們可以啟動幾十個線程 并發地讀取,但是如果讀到記憶體後,還需要存儲到資料庫中,而資料庫的連接配接數隻有10個,這 時我們必須控制隻有10個線程同時擷取資料庫連接配接儲存資料,否則會報錯無法擷取資料庫連 接。這個時候,就可以使用Semaphore來做流量控制。

public class TestSemaphore {
	static Semaphore s=new Semaphore(10);
	static int threadNum=30;
	public static void main(String[] args) {
		for(int i=1;i<=30;i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						s.acquire();
						System.out.println(Thread.currentThread().getName()+"do some work");
						s.release();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
			}).start();
		}
	}
}
           

4. 線程間交換資料的Exchanger

    Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger用于進行線程間的資料交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的資料。這兩個線程通過exchange方法交換資料,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也 執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換資料,将本線程生産出來的資料傳遞給對方。是以由此可見,Exchanger将會與 生産者-消費者模型相關。

    其應用場景有:Exchanger可以用于遺傳算法,遺傳算法裡需要選出兩個人作為交配對象,這時候會交換 兩人的資料,并使用交叉規則得出2個交配結果。Exchanger也可以用于校對工作,比如我們需 要将紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行 錄入,錄入到Excel之後,系統需要加載這兩個Excel,并對兩個Excel資料進行校對,看看是否錄入一緻。如果兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設定最大等待時長。

public class TestExchanger {
	static Exchanger<String> ex=new Exchanger<String>();
	static ExecutorService service=Executors.newFixedThreadPool(2);
	public static void main(String[] args) {
		service.execute(new Runnable() {
			@Override
			public void run() {
				String a="A";
				try {
					ex.exchange(a);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.execute(new Runnable() {
			@Override
			public void run() {
				String b="B";
				try {
					String a=ex.exchange(b);
					System.out.println("a錄入的是"+a);
					System.out.println("b錄入的是"+b);
					System.out.println("a與b是否一緻:"+a.equalsIgnoreCase(b));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.shutdown();
	}
}