天天看點

并發筆記(六)多線程步調一緻,CountDownLatch、CyclicBarrier、Phaser、Exchanger

一.CountDownLatch

案例:對賬系統的對賬功能,使用者下單之後生成訂單記錄,物流會生成派送訂單,防止少發或者是多發,定時排程任務,查詢未對賬的訂單和派送訂單進行對賬,對于存在差異的生成插入差異庫中。

初始思路:

while(存在未對賬訂單){
  // 查詢未對賬訂單
  pos = getPOrders();
  // 查詢派送單
  dos = getDOrders();
  // 執行對賬操作
  diff = check(pos, dos);
  // 差異寫入差異庫
  save(diff);
} 
           

顯然上訴是一個串行的功能,提升性能,首先想到利用多線程,如何拆分呢?

第一想法拆出兩個線程單獨去做查詢功能?圖檔位址

并發筆記(六)多線程步調一緻,CountDownLatch、CyclicBarrier、Phaser、Exchanger
while(存在未對賬訂單){
  // 查詢未對賬訂單
  Thread T1 = new Thread(()->{
    pos = getPOrders();
  });
  T1.start();
  // 查詢派送單
  Thread T2 = new Thread(()->{
    dos = getDOrders();
  });
  T2.start();
  // 等待T1、T2結束
  T1.join();
  T2.join();
  // 執行對賬操作
  diff = check(pos, dos);
  // 差異寫入差異庫
  save(diff);
} 
           

上訴代碼存在問題,每次都需要建立線程和銷毀線程,浪費性能,使用線程池,并且使用CountDownLatch協調兩個線程的步調一緻性。

優化後:

// 建立2個線程的線程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
	// 計數器初始化為2 
	CountDownLatch latch = new CountDownLatch(2);
	// 查詢未對賬訂單
	executor.execute(()-> {
	  pos = getPOrders();
	  latch.countDown();
	});
	// 查詢派送單
	executor.execute(()-> {
	  dos = getDOrders();
	  latch.countDown();
	});
	latch.await();
	// 執行對賬操作
	diff = check(pos, dos);
	// 差異寫入差異庫
	save(diff);
}   
           

二.CyclicBarrier

可以了解為循環栅欄,相對于CountDownLatch來說,明顯優勢就是可以循環使用,并且在滿足栅欄的條件的時候支援同步執行回調函數。

在對賬的案例中,我們可以進一步優化,将對賬和儲存的功能也單獨的起一個線程,這樣在對賬的時候,也不會影響查詢待對賬訂單和查詢派送訂單。

查詢待對賬、派送訂單為生産者,對賬以及儲存作為消費者—生産者消費者模型

// 訂單隊列
Vector<P> pos;
// 派送單隊列
Vector<D> dos;
// 執行回調的線程池 
Executor executor = 
  Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
  
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 執行對賬操作
  diff = check(p, d);
  // 差異寫入差異庫
  save(diff);
}
  
void checkAll(){
  // 循環查詢訂單庫
  Thread T1 = new Thread(()->{
    while(存在未對賬訂單){
      // 查詢訂單庫
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循環查詢運單庫
  Thread T2 = new Thread(()->{
    while(存在未對賬訂單){
      // 查詢運單庫
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}
           

三.Phaser

Phaser翻譯為階段,也就是,我們可定義不同階段達到條件之後做什麼事情,更像是CyclicBarrier的加強版本。

而且Phaser比CyclicBarrier更加靈活,因為給出初始量之後,我們在每個階段都可以減少或者是新增線程數。

簡單介紹相關方法:想要使用好還是有難度的,以後有時間可以擴充

public class Test2 {
    static Random r = new Random();
    static MyPhaser phaser = new MyPhaser();
    static void milliSleep(int milli){
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        phaser.bulkRegister(8);
        for (int i = 0; i < 5; i++) {
            new Thread(new Person("person"+i)).start();
        }
        new Thread(new Person("司儀")).start();
        new Thread(new Person("新郎")).start();
        new Thread(new Person("新娘")).start();

    }
    static class Person implements Runnable{
        String name;
        public Person(String name){
            this.name = name;
        }
        public void arrive(){
            Test2.milliSleep(r.nextInt(1000));
            System.out.printf("%s到達現場   ",name);
            phaser.arriveAndAwaitAdvance();
        }
        public void eat(){
            Test2.milliSleep(r.nextInt(1000));
            System.out.printf("%s吃完   ", name);
            phaser.arriveAndAwaitAdvance();
        }
        public void leave(){
            Test2.milliSleep(r.nextInt(1000));
            System.out.printf("%s離開  ",name);
            phaser.arriveAndAwaitAdvance();
        }
        public void hug(){
            if(name.equals("新郎") || name.equals("新娘")){
                Test2.milliSleep(r.nextInt(1000));
                System.out.printf("%s洞房  ",name);
                phaser.arriveAndAwaitAdvance();
            }else{
                phaser.arriveAndDeregister();//到達登出
            }

        }
        @Override
        public void run() {
            arrive();
            eat();
            leave();
            hug();
        }
    }
    static class MyPhaser extends Phaser{
        /**
         *
         * @param phase 目前階段
         * @param registeredParties 完成目前階段的人數
         * @return 是否結束流程
         */
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase){
                case 0:
                    System.out.println("所有人都到齊了"+registeredParties);
                    return false;
                case 1:
                    System.out.println("所有人都吃完了"+registeredParties);
                    return false;
                case 2:
                    System.out.println("所有人都離開了"+registeredParties);
                    return false;
                case 3:
                    System.out.println("婚禮介紹,新郎新娘入洞房"+registeredParties);
                    return true;
                default:
                    return true;
            }
        }
    }
}
           

輸出結果:

并發筆記(六)多線程步調一緻,CountDownLatch、CyclicBarrier、Phaser、Exchanger

Phaser未在項目中應用,内部提供的功能很多的方法,感興趣的可以深入了解上訴隻是簡單介紹了,如何建立以及兩個方法arriveAndAwaitAdvance以及arriveAndDeregister方法。

四.Exchanger

兩個線程互相互動資料的。

public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        new Thread(()->{
            String s = "T1";
            try {
                s=exchanger.exchange(s);//線程阻塞的狀态
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" : "+s);
        },"t1").start();
        new Thread(()->{
            String s = "T2";
            try {
                s=exchanger.exchange(s);//線程阻塞的狀态
                // 也提供了,逾時的方法
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" : "+s);
        },"t2").start();
    }
           

總結

CountDownLatch:多線程之間同步的,不可複用。

CyclicBarrier:相對于CountDownLatch,可以複用,并且達到"栅欄"的限制條件,可以執行回調方法(同步的);

Phaser:相對于CyclicBarrier,也同樣可以複用,但是是分階段的,不同階段的條件值是可以變化的,并且在達到不同階段的時候可以執行不同的回調方法,功能更加多樣化。

Exchanger:兩個線程阻塞式的進行資源互動功能。