天天看點

Condition與Lock配合完成 java 語言級别的 等待通知機制

作者:夢幻随風的網際網路筆記

java并發程式設計

Condition與Lock配合完成 java 語言級别的 等待通知機制

任何一個java對象都繼承于Object類,線上程間實作通信的往往會用到Object的幾個方法,如:wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法

實作等待/通知機制

Object的wait和notify/notify是與對象螢幕配合完成線程間的等待/通知機制,

而Condition與Lock配合完成等待通知機制,前者是java底層級别的,後者是語言級别的,具有更高的可控制性和擴充性。

Condition:

await() 線程進入等待狀态

signal() 喚醒一個等待在condition上的線程

下面是 Condition 基于非公平鎖的獨占鎖實作 代碼示例:

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import java.util.Date;
import java.util.concurrent.locks.Condition;
public class SimplifyReentrantLockDemo {
    
   
   /* 是基于非公平鎖的獨占鎖實作。
    * 在獲得同步鎖時,同步器維護一個同步隊列,擷取狀态失敗的線程都會被加入到隊列中并在隊列中進行自旋;
    * 移出隊列(或停止自旋)的條件是前驅節點為頭節點且成功擷取了同步狀态。
    * 在釋放同步狀态時,同步器調用tryRelease(int arg)方法釋放同步狀态,然後喚醒頭節點的後繼節點。*/
   
       public static void main(String[] args) {
          
          
          final SimplifyReentrantLock lock = new SimplifyReentrantLock();
          
          //一個AQS中可以有多個條件隊列,但是隻有一個同步隊列。
          
          //ConditionObject是實作條件隊列的關鍵,
         // 每個ConditionObject對象都維護一個單獨的條件等待對列
          //每個ConditionObject對應一個條件隊列,它記錄該隊列的頭節點和尾節點。
          //條件隊列是單向的,而同步隊列是雙向的,會有前驅指針。
           final Condition condition = lock.newCondition();
           
           new Thread(new Runnable() {
            public void run() {
               lock.lock();
                  try {
                      System.out.println("A-----------進入等待!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
                  /*
                  condition.await()方法後會使得目前擷取lock的線程進入到等待隊列,
                  如果該線程能夠從await()方法傳回的話一定是該線程擷取了與condition相關聯的lock

                  目前線程調用condition.await()方法後,會使得目前線程釋放lock然後加入到等待隊列中,
                  直至被signal/signalAll後會使得目前線程從等待隊列中移至到同步隊列中去,直到獲得了lock後才會從await方法傳回,或者在等待時被中斷會做中斷處理。
                  */
                      condition.await(); // 釋放鎖
                      System.out.println("A-----------接收到B的通知!繼續執行!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  lock.unlock();
            }
         }).start();




/*
         void signal():喚醒一個等待在condition上的線程,将該線程從等待隊列中轉移到同步隊列中,如果在同步隊列中能夠競争到Lock則可以從等待方法中傳回。
         void signalAll():與1的差別在于能夠喚醒所有等待在condition上的線程*/
           new Thread(new Runnable() {
            public void run() {
               try {
                      System.out.println("B-----------模拟3秒後發送通知過!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
                      Thread.sleep(3000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  lock.lock();
                  System.out.println("B----------發送通知!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
               //signal():喚醒一個等待在condition上的線程,将該線程從等待隊列中轉移到同步隊列中,如果在同步隊列中能夠競争到Lock則可以從等待方法中傳回。
                  condition.signal();
                  lock.unlock();
            }
         }).start();
       }


}
           
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class SimplifyReentrantLock implements Lock {
    //AQS子類的對象   互斥鎖用它來工作
    private final Sync sync = new Sync();

    /**
     * AQS的子類Sync
     * AQS全稱為AbstractQueuedSynchronizer
     * AQS是一個抽象類,主要是通過繼承的方式來使用,它本身沒有實作任何的同步接口,僅僅是定義了同步狀态的擷取以及釋放的方法來提供自定義的同步元件。
     *
     * 一個重要的卻别就是沒有ReentrantLock中的NonfairSync和FairSync
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            //是否處于占用狀态
            return getState() == 1;
        }

        @Override
        protected boolean tryAcquire(int arg) {
            //當狀态為0是擷取鎖
           //這裡做一下判斷,如果state的值為等于0,立馬将state設定為1
           //通過cas操作來修改state狀态,表示争搶鎖的操作
           int state = this.getState();
           /*
            * compareAndSetState(0, 1)
            * 
           這段代碼其實邏輯很簡單,就是通過cas樂觀鎖的方式來做比較并替換。
           上面這段代碼的意思是,如果目前記憶體中的state的值和預期值expect相等,則替換為update。
           更新成功傳回true,否則傳回false.
           這個操作是原子的,不會出現線程安全問題,這裡面涉及到Unsafe這個類的操作,*/
           if(state == 0) {
              if (compareAndSetState(0, arg)) {
                   //若直接修改成功了,則将占用鎖的線程設定為目前線程
                   
                   //exclusiveOwnerThread屬性是AQS從父類AbstractOwnableSynchronizer中繼承的屬性,用來儲存目前占用同步狀态的線程。
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
           }else { //如果是第二個線程進來
            Thread currentThread = Thread.currentThread();//目前進來的線程
            Thread ownerThread = this.getExclusiveOwnerThread();//已經儲存進去的獨占式線程
            if(currentThread == ownerThread) { //判斷一下進來的線程和儲存進去的線程是同一線程麼?如果是,則擷取鎖成功,如果不是則擷取鎖失敗
               
               //state屬性的總結:
               //1. 因為存在多線程競争的情形,使用CAS設定值
               //2. state初始值為0,線程加一次鎖,state加1,獲得鎖的線程再次加鎖,state值再次加1。是以state表示已獲得鎖線程進行lock操作的次數
               //3. 是volatile修飾的變量,線程直接從主存中讀
               
               this.setState(state+arg); //設定state狀态
               return true;
            }
         }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
           //鎖的擷取和鎖的釋放是一一對應的,擷取過多少次鎖就釋放多少次鎖
         if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
            //如果釋放鎖的不是目前線程,則抛出異常
            throw new RuntimeException();
         }
            //釋放鎖,将狀态設定為0
            if (getState() == 0) {
               //如果state狀态為0,則說明鎖還沒有被占用
                throw new IllegalMonitorStateException();
            }
            int state = this.getState()-arg;
         //接下來判斷state是否已經歸零,隻有state歸零的時候才真正的釋放鎖
         if(state == 0) {
            //state已經歸零,做掃尾工作
            this.setState(0);
            this.setExclusiveOwnerThread(null);
            return true;
         }
         this.setState(state);
         return false;
        }

        Condition newCondition() {
            return new ConditionObject();
        }

    }

    @Override
    public void lock() { 
       //if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
       sync.acquire(1); 
    }

    @Override
    public void unlock() { 
       sync.release(1); 
    }

    @Override
    public Condition newCondition() { 
       return sync.newCondition(); 
    }

    @Override
    public boolean tryLock() { 
       return sync.tryAcquire(1); 
    }

    @Override
    public void lockInterruptibly() throws InterruptedException { }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }
}
           
Condition與Lock配合完成 java 語言級别的 等待通知機制