天天看點

聊聊并發(七)——Java中的阻塞隊列

阻塞隊列(blockingqueue)是一個支援兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,擷取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生産者和消費者的場景,生産者是往隊列裡添加元素的線程,消費者是從隊列裡拿元素的線程。阻塞隊列就是生産者存放元素的容器,而消費者也隻從容器裡拿元素。

阻塞隊列提供了四種處理方法:

方法\處理方式

抛出異常

傳回特殊值

一直阻塞

逾時退出

插入方法

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除方法

remove()

poll()

take()

poll(time,unit)

檢查方法

element()

peek()

不可用

抛出異常:是指當阻塞隊列滿時候,再往隊列裡插入元素,會抛出illegalstateexception(“queue full”)異常。當隊列為空時,從隊列裡擷取元素時會抛出nosuchelementexception異常 。

傳回特殊值:插入方法會傳回是否成功,成功則傳回true。移除方法,則是從隊列裡拿出一個元素,如果沒有則傳回null

一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到拿到資料,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裡take元素,隊列也會阻塞消費者線程,直到隊列可用。

逾時退出:當阻塞隊列滿時,隊列會阻塞生産者線程一段時間,如果超過一定的時間,生産者線程就會退出。

jdk7提供了7個阻塞隊列。分别是

arrayblockingqueue :一個由數組結構組成的有界阻塞隊列。

linkedblockingqueue :一個由連結清單結構組成的有界阻塞隊列。

priorityblockingqueue :一個支援優先級排序的無界阻塞隊列。

delayqueue:一個使用優先級隊列實作的無界阻塞隊列。

synchronousqueue:一個不存儲元素的阻塞隊列。

linkedtransferqueue:一個由連結清單結構組成的無界阻塞隊列。

linkedblockingdeque:一個由連結清單結構組成的雙向阻塞隊列。

arrayblockingqueue是一個用數組實作的有界阻塞隊列。此隊列按照先進先出(fifo)的原則對元素進行排序。預設情況下不保證通路者公平的通路隊列,所謂公平通路隊列是指阻塞的所有生産者線程或消費者線程,當隊列可用時,可以按照阻塞的先後順序通路隊列,即先阻塞的生産者線程,可以先往隊列裡插入元素,先阻塞的消費者線程,可以先從隊列裡擷取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼建立一個公平的阻塞隊列:

<code>1</code>

<code>arrayblockingqueue fairqueue =</code><code>new</code>  <code>arrayblockingqueue(</code><code>1000</code><code>,</code><code>true</code><code>);</code>

通路者的公平性是使用可重入鎖實作的,代碼如下:

<code>public</code> <code>arrayblockingqueue(</code><code>int</code> <code>capacity,</code><code>boolean</code> <code>fair) {</code>

<code>2</code>

<code>        </code><code>if</code> <code>(capacity &lt;=</code><code>0</code><code>)</code>

<code>3</code>

<code>            </code><code>throw</code> <code>new</code> <code>illegalargumentexception();</code>

<code>4</code>

<code>        </code><code>this</code><code>.items =</code><code>new</code> <code>object[capacity];</code>

<code>5</code>

<code>        </code><code>lock =</code><code>new</code> <code>reentrantlock(fair);</code>

<code>6</code>

<code>        </code><code>notempty = lock.newcondition();</code>

<code>7</code>

<code>        </code><code>notfull =  lock.newcondition();</code>

<code>8</code>

<code>}</code>

linkedblockingqueue是一個用連結清單實作的有界阻塞隊列。此隊列的預設和最大長度為integer.max_value。此隊列按照先進先出的原則對元素進行排序。

priorityblockingqueue是一個支援優先級的無界隊列。預設情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。

delayqueue是一個支援延時擷取元素的無界阻塞隊列。隊列使用priorityqueue來實作。隊列中的元素必須實作delayed接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素。我們可以将delayqueue運用在以下應用場景:

緩存系統的設計:可以用delayqueue儲存緩存元素的有效期,使用一個線程循環查詢delayqueue,一旦能從delayqueue中擷取元素時,表示緩存有效期到了。

定時任務排程。使用delayqueue儲存當天将會執行的任務和執行時間,一旦從delayqueue中擷取到任務就開始執行,從比如timerqueue就是使用delayqueue實作的。

隊列中的delayed必須實作compareto來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實作代碼如下:

<code>01</code>

<code>public</code> <code>int</code> <code>compareto(delayed other) {</code>

<code>02</code>

<code>           </code><code>if</code> <code>(other ==</code><code>this</code><code>)</code><code>// compare zero only if same object</code>

<code>03</code>

<code>                </code><code>return</code> <code>0</code><code>;</code>

<code>04</code>

<code>            </code><code>if</code> <code>(other</code><code>instanceof</code> <code>scheduledfuturetask) {</code>

<code>05</code>

<code>                </code><code>scheduledfuturetask x = (scheduledfuturetask)other;</code>

<code>06</code>

<code>                </code><code>long</code> <code>diff = time - x.time;</code>

<code>07</code>

<code>                </code><code>if</code> <code>(diff &lt;</code><code>0</code><code>)</code>

<code>08</code>

<code>                    </code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code>09</code>

<code>                </code><code>else</code> <code>if</code> <code>(diff &gt;</code><code>0</code><code>)</code>

<code>10</code>

<code>                    </code><code>return</code> <code>1</code><code>;</code>

<code>11</code>

<code>       </code><code>else</code> <code>if</code> <code>(sequencenumber &lt; x.sequencenumber)</code>

<code>12</code>

<code>13</code>

<code>                </code><code>else</code>

<code>14</code>

<code>15</code>

<code>            </code><code>}</code>

<code>16</code>

<code>            </code><code>long</code> <code>d = (getdelay(timeunit.nanoseconds) -</code>

<code>17</code>

<code>                      </code><code>other.getdelay(timeunit.nanoseconds));</code>

<code>18</code>

<code>            </code><code>return</code> <code>(d ==</code><code>0</code><code>) ?</code><code>0</code> <code>: ((d &lt;</code><code>0</code><code>) ? -</code><code>1</code> <code>:</code><code>1</code><code>);</code>

<code>19</code>

<code>        </code><code>}</code>

<b>如何實作delayed接口</b>

我們可以參考scheduledthreadpoolexecutor裡scheduledfuturetask類。這個類實作了delayed接口。首先:在對象建立的時候,使用time記錄前對象什麼時候可以使用,代碼如下:

<code>scheduledfuturetask(runnable r, v result,</code><code>long</code> <code>ns,</code><code>long</code> <code>period) {</code>

<code>            </code><code>super</code><code>(r, result);</code>

<code>            </code><code>this</code><code>.time = ns;</code>

<code>            </code><code>this</code><code>.period = period;</code>

<code>            </code><code>this</code><code>.sequencenumber = sequencer.getandincrement();</code>

然後使用getdelay可以查詢目前元素還需要延時多久,代碼如下:

通過構造函數可以看出延遲時間參數ns的機關是納秒,自己設計的時候最好使用納秒,因為getdelay時可以指定任意機關,一旦以納秒作為機關,而延時的時間又精确不到納秒就麻煩了。使用時請注意當time小于目前時間時,getdelay會傳回負數。

<b>如何實作延時隊列</b>

延時隊列的實作很簡單,當消費者從隊列裡擷取元素時,如果元素沒有達到延時時間,就阻塞目前線程。

<code>long</code> <code>delay = first.getdelay(timeunit.nanoseconds);</code>

<code>                    </code><code>if</code> <code>(delay &lt;=</code><code>0</code><code>)</code>

<code>                        </code><code>return</code> <code>q.poll();</code>

<code>                    </code><code>else</code> <code>if</code> <code>(leader !=</code><code>null</code><code>)</code>

<code>                        </code><code>available.await();</code>

synchronousqueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。synchronousqueue可以看成是一個傳球手,負責把生産者線程處理的資料直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常适合于傳遞性場景,比如在一個線程中使用的資料,傳遞給另外一個線程使用,synchronousqueue的吞吐量高于linkedblockingqueue 和 arrayblockingqueue。

linkedtransferqueue是一個由連結清單結構組成的無界阻塞transferqueue隊列。相對于其他阻塞隊列linkedtransferqueue多了trytransfer和transfer方法。

transfer方法。如果目前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生産者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會将元素存放在隊列的tail節點,并等到該元素被消費者消費了才傳回。transfer方法的關鍵代碼如下:

<code>node pred = tryappend(s, havedata);</code>

<code>return</code> <code>awaitmatch(s, pred, e, (how == timed), nanos);</code>

第一行代碼是試圖把存放目前元素的s節點作為tail節點。第二行代碼是讓cpu自旋等待消費者消費元素。因為自旋會消耗cpu,是以自旋一定的次數後使用thread.yield()方法來暫停目前正在執行的線程,并執行其他線程。

trytransfer方法。則是用來試探下生産者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則傳回false。和transfer方法的差別是trytransfer方法無論消費者是否接收,方法立即傳回。而transfer方法是必須等到消費者消費了才傳回。

對于帶有時間限制的trytransfer(e e, long timeout, timeunit unit)方法,則是試圖把生産者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再傳回,如果逾時還沒消費元素,則傳回false,如果在逾時時間内消費了元素,則傳回true。

linkedblockingdeque是一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競争。相比其他的阻塞隊列,linkedblockingdeque多了addfirst,addlast,offerfirst,offerlast,peekfirst,peeklast等方法,以first單詞結尾的方法,表示插入,擷取(peek)或移除雙端隊列的第一個元素。以last單詞結尾的方法,表示插入,擷取或移除雙端隊列的最後一個元素。另外插入方法add等同于addlast,移除方法remove等效于removefirst。但是take方法卻等同于takefirst,不知道是不是jdk的bug,使用時還是用帶有first和last字尾的方法更清楚。在初始化linkedblockingdeque時可以初始化隊列的容量,用來防止其再擴容時過渡膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。

如果隊列是空的,消費者會一直等待,當生産者添加元素時候,消費者是如何知道目前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生産者和消費者能夠高效率的進行通訊呢?讓我們先來看看jdk是如何實作的。

使用通知模式實作。所謂通知模式,就是當生産者往滿的隊列裡添加元素時會阻塞住生産者,當消費者消費了一個隊列中的元素後,會通知生産者目前隊列可用。通過檢視jdk源碼發現arrayblockingqueue使用了condition來實作,代碼如下:

<code>private</code> <code>final</code> <code>condition notfull;</code>

<code>private</code> <code>final</code> <code>condition notempty;</code>

<code>        </code><code>//省略其他代碼</code>

<code>    </code><code>}</code>

<code>public</code> <code>void</code> <code>put(e e)</code><code>throws</code> <code>interruptedexception {</code>

<code>        </code><code>checknotnull(e);</code>

<code>        </code><code>final</code> <code>reentrantlock lock =</code><code>this</code><code>.lock;</code>

<code>        </code><code>lock.lockinterruptibly();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>while</code> <code>(count == items.length)</code>

<code>                </code><code>notfull.await();</code>

<code>            </code><code>insert(e);</code>

<code>        </code><code>}</code><code>finally</code> <code>{</code>

<code>            </code><code>lock.unlock();</code>

<code>20</code>

<code>21</code>

<code>22</code>

<code>23</code>

<code>public</code> <code>e take()</code><code>throws</code> <code>interruptedexception {</code>

<code>24</code>

<code>25</code>

<code>26</code>

<code>27</code>

<code>            </code><code>while</code> <code>(count ==</code><code>0</code><code>)</code>

<code>28</code>

<code>                </code><code>notempty.await();</code>

<code>29</code>

<code>            </code><code>return</code> <code>extract();</code>

<code>30</code>

<code>  </code><code>}</code><code>finally</code> <code>{</code>

<code>31</code>

<code>32</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>private</code> <code>void</code> <code>insert(e x) {</code>

<code>36</code>

<code>        </code><code>items[putindex] = x;</code>

<code>37</code>

<code>        </code><code>putindex = inc(putindex);</code>

<code>38</code>

<code>        </code><code>++count;</code>

<code>39</code>

<code>        </code><code>notempty.signal();</code>

<code>40</code>

當我們往隊列裡插入一個元素時,如果隊列不可用,阻塞生産者主要通過locksupport.park(this);來實作

<code>public</code> <code>final</code> <code>void</code> <code>await()</code><code>throws</code> <code>interruptedexception {</code>

<code>            </code><code>if</code> <code>(thread.interrupted())</code>

<code>                </code><code>throw</code> <code>new</code> <code>interruptedexception();</code>

<code>            </code><code>node node = addconditionwaiter();</code>

<code>            </code><code>int</code> <code>savedstate = fullyrelease(node);</code>

<code>            </code><code>int</code> <code>interruptmode =</code><code>0</code><code>;</code>

<code>            </code><code>while</code> <code>(!isonsyncqueue(node)) {</code>

<code>                </code><code>locksupport.park(</code><code>this</code><code>);</code>

<code>                </code><code>if</code> <code>((interruptmode = checkinterruptwhilewaiting(node)) !=</code><code>0</code><code>)</code>

<code>                    </code><code>break</code><code>;</code>

<code>            </code><code>if</code> <code>(acquirequeued(node, savedstate) &amp;&amp; interruptmode != throw_ie)</code>

<code>                </code><code>interruptmode = reinterrupt;</code>

<code>            </code><code>if</code> <code>(node.nextwaiter !=</code><code>null</code><code>)</code><code>// clean up if cancelled</code>

<code>                </code><code>unlinkcancelledwaiters();</code>

<code>            </code><code>if</code> <code>(interruptmode !=</code><code>0</code><code>)</code>

<code>reportinterruptafterwait(interruptmode);</code>

繼續進入源碼,發現調用setblocker先儲存下将要阻塞的線程,然後調用unsafe.park阻塞目前線程。

<code>public</code> <code>static</code> <code>void</code> <code>park(object blocker) {</code>

<code>        </code><code>thread t = thread.currentthread();</code>

<code>        </code><code>setblocker(t, blocker);</code>

<code>        </code><code>unsafe.park(</code><code>false</code><code>, 0l);</code>

<code>        </code><code>setblocker(t,</code><code>null</code><code>);</code>

unsafe.park是個native方法,代碼如下:

<code>public</code> <code>native</code> <code>void</code> <code>park(</code><code>boolean</code> <code>isabsolute,</code><code>long</code> <code>time);</code>

park這個方法會阻塞目前線程,隻有以下四種情況中的一種發生時,該方法才會傳回。

與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,然後再執行的park。

線程被中斷時。

如果參數中的time不是零,等待了指定的毫秒數時。

發生異常現象時。這些異常事先無法确定。

我們繼續看一下jvm是如何實作park方法的,park在不同的作業系統使用不同的方式實作,在linux下是使用的是系統方法pthread_cond_wait實作。實作代碼在jvm源碼路徑src/os/linux/vm/os_linux.cpp裡的 os::platformevent::park方法,代碼如下:

<code>void</code> <code>os::platformevent::park() {</code>

<code>             </code><code>int</code> <code>v ;</code>

<code>         </code><code>for</code> <code>(;;) {</code>

<code>        </code><code>v = _event ;</code>

<code>         </code><code>if</code> <code>(atomic::cmpxchg (v-1, &amp;_event, v) == v)</code><code>break</code> <code>;</code>

<code>         </code><code>}</code>

<code>         </code><code>guarantee (v &gt;= 0,</code><code>"invariant"</code><code>) ;</code>

<code>         </code><code>if</code> <code>(v == 0) {</code>

<code>         </code><code>// do this the hard way by blocking ...</code>

<code>         </code><code>int</code> <code>status = pthread_mutex_lock(_mutex);</code>

<code>         </code><code>assert_status(status == 0, status,</code><code>"mutex_lock"</code><code>);</code>

<code>         </code><code>guarantee (_nparked == 0,</code><code>"invariant"</code><code>) ;</code>

<code>         </code><code>++ _nparked ;</code>

<code>         </code><code>while</code> <code>(_event &lt; 0) {</code>

<code>         </code><code>status = pthread_cond_wait(_cond, _mutex);</code>

<code>         </code><code>// for some reason, under 2.7 lwp_cond_wait() may return etime ...</code>

<code>         </code><code>// treat this the same as if the wait was interrupted</code>

<code>         </code><code>if</code> <code>(status == etime) { status = eintr; }</code>

<code>         </code><code>assert_status(status == 0 || status == eintr, status,</code><code>"cond_wait"</code><code>);</code>

<code>         </code><code>-- _nparked ;</code>

<code>         </code><code>// in theory we could move the st of 0 into _event past the unlock(),</code>

<code>         </code><code>// but then we'd need a membar after the st.</code>

<code>         </code><code>_event = 0 ;</code>

<code>         </code><code>status = pthread_mutex_unlock(_mutex);</code>

<code>         </code><code>assert_status(status == 0, status,</code><code>"mutex_unlock"</code><code>);</code>

<code>         </code><code>guarantee (_event &gt;= 0,</code><code>"invariant"</code><code>) ;</code>

<code>     </code><code>}</code>

pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思可以了解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實作的。park 在windows下則是使用waitforsingleobject實作的。

當隊列滿時,生産者往阻塞隊列裡插入一個元素,生産者線程會進入waiting (parking)狀态。我們可以使用jstack dump阻塞的生産者線程看到這點:

<code>"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]</code>

<code>   </code><code>java.lang.thread.state: waiting (parking)</code>

<code>        </code><code>at sun.misc.unsafe.park(native method)</code>

<code>        </code><code>- parking to wait for  &lt;0x0000000140559fe8&gt; (a java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobject)</code>

<code>        </code><code>at java.util.concurrent.locks.locksupport.park(locksupport.java:186)</code>

<code>        </code><code>at java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobject.await(abstractqueuedsynchronizer.java:2043)</code>

<code>        </code><code>at java.util.concurrent.arrayblockingqueue.put(arrayblockingqueue.java:324)</code>

<code>        </code><code>at blockingqueue.arrayblockingqueuetest.main(arrayblockingqueuetest.java:11)</code>