一.CountDownLatch
案例:對賬系統的對賬功能,使用者下單之後生成訂單記錄,物流會生成派送訂單,防止少發或者是多發,定時排程任務,查詢未對賬的訂單和派送訂單進行對賬,對于存在差異的生成插入差異庫中。
初始思路:
while(存在未對賬訂單){
// 查詢未對賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
顯然上訴是一個串行的功能,提升性能,首先想到利用多線程,如何拆分呢?
第一想法拆出兩個線程單獨去做查詢功能?圖檔位址

while(存在未對賬訂單){
// 查詢未對賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2結束
T1.join();
T2.join();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
上訴代碼存在問題,每次都需要建立線程和銷毀線程,浪費性能,使用線程池,并且使用CountDownLatch協調兩個線程的步調一緻性。
優化後:
// 建立2個線程的線程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 計數器初始化為2
CountDownLatch latch = new CountDownLatch(2);
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
latch.await();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
二.CyclicBarrier
可以了解為循環栅欄,相對于CountDownLatch來說,明顯優勢就是可以循環使用,并且在滿足栅欄的條件的時候支援同步執行回調函數。
在對賬的案例中,我們可以進一步優化,将對賬和儲存的功能也單獨的起一個線程,這樣在對賬的時候,也不會影響查詢待對賬訂單和查詢派送訂單。
查詢待對賬、派送訂單為生産者,對賬以及儲存作為消費者—生産者消費者模型
// 訂單隊列
Vector<P> pos;
// 派送單隊列
Vector<D> dos;
// 執行回調的線程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 執行對賬操作
diff = check(p, d);
// 差異寫入差異庫
save(diff);
}
void checkAll(){
// 循環查詢訂單庫
Thread T1 = new Thread(()->{
while(存在未對賬訂單){
// 查詢訂單庫
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循環查詢運單庫
Thread T2 = new Thread(()->{
while(存在未對賬訂單){
// 查詢運單庫
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
三.Phaser
Phaser翻譯為階段,也就是,我們可定義不同階段達到條件之後做什麼事情,更像是CyclicBarrier的加強版本。
而且Phaser比CyclicBarrier更加靈活,因為給出初始量之後,我們在每個階段都可以減少或者是新增線程數。
簡單介紹相關方法:想要使用好還是有難度的,以後有時間可以擴充
public class Test2 {
static Random r = new Random();
static MyPhaser phaser = new MyPhaser();
static void milliSleep(int milli){
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
phaser.bulkRegister(8);
for (int i = 0; i < 5; i++) {
new Thread(new Person("person"+i)).start();
}
new Thread(new Person("司儀")).start();
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();
}
static class Person implements Runnable{
String name;
public Person(String name){
this.name = name;
}
public void arrive(){
Test2.milliSleep(r.nextInt(1000));
System.out.printf("%s到達現場 ",name);
phaser.arriveAndAwaitAdvance();
}
public void eat(){
Test2.milliSleep(r.nextInt(1000));
System.out.printf("%s吃完 ", name);
phaser.arriveAndAwaitAdvance();
}
public void leave(){
Test2.milliSleep(r.nextInt(1000));
System.out.printf("%s離開 ",name);
phaser.arriveAndAwaitAdvance();
}
public void hug(){
if(name.equals("新郎") || name.equals("新娘")){
Test2.milliSleep(r.nextInt(1000));
System.out.printf("%s洞房 ",name);
phaser.arriveAndAwaitAdvance();
}else{
phaser.arriveAndDeregister();//到達登出
}
}
@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
static class MyPhaser extends Phaser{
/**
*
* @param phase 目前階段
* @param registeredParties 完成目前階段的人數
* @return 是否結束流程
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase){
case 0:
System.out.println("所有人都到齊了"+registeredParties);
return false;
case 1:
System.out.println("所有人都吃完了"+registeredParties);
return false;
case 2:
System.out.println("所有人都離開了"+registeredParties);
return false;
case 3:
System.out.println("婚禮介紹,新郎新娘入洞房"+registeredParties);
return true;
default:
return true;
}
}
}
}
輸出結果:
Phaser未在項目中應用,内部提供的功能很多的方法,感興趣的可以深入了解上訴隻是簡單介紹了,如何建立以及兩個方法arriveAndAwaitAdvance以及arriveAndDeregister方法。
四.Exchanger
兩個線程互相互動資料的。
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(()->{
String s = "T1";
try {
s=exchanger.exchange(s);//線程阻塞的狀态
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" : "+s);
},"t1").start();
new Thread(()->{
String s = "T2";
try {
s=exchanger.exchange(s);//線程阻塞的狀态
// 也提供了,逾時的方法
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" : "+s);
},"t2").start();
}
總結
CountDownLatch:多線程之間同步的,不可複用。
CyclicBarrier:相對于CountDownLatch,可以複用,并且達到"栅欄"的限制條件,可以執行回調方法(同步的);
Phaser:相對于CyclicBarrier,也同樣可以複用,但是是分階段的,不同階段的條件值是可以變化的,并且在達到不同階段的時候可以執行不同的回調方法,功能更加多樣化。
Exchanger:兩個線程阻塞式的進行資源互動功能。