天天看點

【Java并發程式設計的藝術】【學習筆記】Java并發集合8、阻塞隊列

8、阻塞隊列

​ 阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列。這兩個附加操作支援阻塞的插入和移除方法。

​ 1)支援阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。

​ 2)支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空。

​ 阻塞隊列常用于生産者和消費者的場景。

插入和移除操作的4中處理方式

方法/處理方法 抛出異常 傳回特殊值 一直阻塞 逾時退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove(e) poll() take() poll(time, unit)
檢查方法 element() peek() 不可用 不可用

8.1、ArrayBlockingQueue

​ 一個由數組結構組成的有限阻塞隊列。此隊列按照FIFO的原則對元素進行排序。

​ ArrayBlockingQueue有界且固定,在構造函數時确認大小,然後不支援改變。

​ 在多線程環境下不保證“公平性”。所謂的公平通路隊列,即先阻塞線程先通路隊列。非公平性是當隊列可用時,阻塞的線程都可以争奪隊列的通路資格。

​ 為了保證公平性,通常會降低吞吐量。建立公平的阻塞隊列

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(,true);
           

​ 通路的公平性是使用重入鎖實作的。

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= )
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
           

8.2、LinkedBlockingQueue

​ 一個由連結清單結構組成的有界阻塞隊列。此隊列的預設和最大長度為Integer.MAX_VALUE。此隊列按照先FIFO原則對元素進行排序。

8.3、PriorityBlockingQueue

​ 一個支援優先級排序的無界阻塞隊列。預設情況下元素采取自然順序升序排列。

​ 也可以自定義類實作compareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparartor來對元素進行排序。

​ 需要注意的是不能保證同優先級元素的排序。

​ PriorityBlockingQueue低層采用二叉堆和重入鎖來實作的。

二叉堆:

​ 二叉樹是一種特殊的堆,結果性而言就是完全二叉樹或者是近似完全二叉樹。

​ 最大堆:父節點的鍵值總是大于或等于任何一個子節點的鍵值。

​ 最小堆:父節點的鍵值總是小于或等于任何一個子節點的鍵值。

​ 二叉堆一般用數組表示,那麼其左孩子節點為:2 * n + 1,其右孩子節點為2 * (n+1),其父節點為(n-1)/2。

​ 因為是二叉堆,是以添加操作就是不斷“上冒”,而删除操作則是不斷“下掉”。

8.4、DelayQueue

​ 一個支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊列中的元素必須實作Delayed接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素。

​ DelayQueue應用場景:

緩存系統的設計:可以用DelayQueue保持緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了。

定時任務排程:使用DelayQueue儲存當天将會執行的任何和執行時間,一旦從DelayQueue中擷取到任務就開始執行。

​ (1)如何實作Delayed接口:

DelayQueue隊列的元素必須實作Delayed接口。可以參考ScheduledThreadPoolExecutor裡ScheduledFutureTask類的實作。

第一步:在對象建立的時候,初始化基本資料。使用time記錄目前對象延遲到什麼時候可以使用,使用sequenceNumber來辨別元素在隊列中的先後順序。
private static final AtomicLong sequencer = new AtomicLong();

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  super(r, result);
  this.time = ns;
  this.period = period;
  this.sequenceNumber = sequencer.getAndIncrement();
}
           
第二步:實作getDelay方法,該方法傳回目前元素還需要延時多長時間,機關是納秒
public long getDelay(TimeUnit unit) {
  return unit.convert(time - now(), NANOSECONDS);
}
           
第三步:實作compareTo方法來指定元素的順序。
public int compareTo(Delayed other) {
  if (other == this) // compare zero if same object
    return ;
  if (other instanceof ScheduledFutureTask) {
    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
    long diff = time - x.time;
    if (diff < )
      return -;
    else if (diff > )
      return ;
    else if (sequenceNumber < x.sequenceNumber)
      return -;
    else
      return ;
  }
  long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  return (diff < ) ? - : (diff > ) ?  : ;
}
           

​ (2)如何實作延時阻塞隊列

​ 當消費者從隊列裡擷取元素時,如果元素沒有達到延時時間,就阻塞目前線程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= )
  return q.poll();
else if (leader != null)
  available.await();
else {
  Thread thisThread = Thread.currentThread();
  leader = thisThread;
  try {
    available.awaitNanos(delay);
  } finally {
    if (leader == thisThread)
      leader = null;
  }
}
           

​ 代碼中變量leader是一個等待擷取隊列頭部元素的線程。如果leader不等于空,表示已經有線程在等待擷取隊列的頭元素。是以,使用await()方法讓目前線程等待信号。如果leader等于空,則把目前線程設定成leader,并使用awaitNanos()方法讓目前線程等待接收信号或等待delay時間。

8.5、SynchronousQueue

​ 一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。

​ 它支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
           

​ SynchronousQueue可以看成是一個傳球手,負責把生産者線程處理的資料直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常适合傳遞性場景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

8.6、LinkedTransferQueue

​ 一個由連結清單結構組成的無界阻塞TransferQueue隊列。相當于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

​ (1) transfer方法

​ 如果目前有消費者正在等待接受元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生産者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會将元素存放在隊列的tail節點,并等到該元素被消費者消費了才傳回。

transfer方法的關鍵代碼:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
           

​ 第一行代碼是試圖把存放目前元素的s節點作為tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,是以自旋一定的次數後使用Thread.yield()方法來暫停目前正在執行的線程,并執行其他線程。

​ (2) tryTransfer方法

​ tryTransfer方法是用來試探生産者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則傳回false。和transfer方法的差別是tryTransfer方法無論消費者是否接收,方法立即傳回,而transfer方法是必須等待消費者消費了才傳回。

​ 對于帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,試圖把生産者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間在傳回,如果逾時還沒消費元素,則傳回false,如果在逾時時間内消費了元素,則傳回true。

​ 相當于ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、無界的LinkedBlockingDeque等的超集。

8.7、LinkedBlockingDeque

​ 一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列兩端插入和移出元素。

​ 在初始化LinkedBlockingDeque時可以設定容量防止其過度碰撞。

​ 另外,雙向阻塞隊列可以運用在“工作竊取”模式中。

繼續閱讀