天天看點

基本線程同步(八)在Lock中使用多個條件

在lock中使用多個條件

一個鎖可能伴随着多個條件。這些條件聲明在condition接口中。 這些條件的目的是允許線程擁有鎖的控制并且檢查條件是否為true,如果是false,那麼線程将被阻塞,直到其他線程喚醒它們。condition接口提供一種機制,阻塞一個線程和喚醒一個被阻塞的線程。

在并發程式設計中,生産者與消費者是經典的問題。我們有一個資料緩沖區,一個或多個資料生産者往緩沖區存儲資料,一個或多個資料消費者從緩沖區中取出資料,正如在這一章中前面所解釋的一樣。

在這個指南中,你将學習如何通過使用鎖和條件來實作生産者與消費者問題。

準備工作…

如何做…

按以下步驟來實作的這個例子:

1.首先,讓我們建立一個類來模拟文本檔案。建立filemock類,包括兩個屬性:一個字元串數組類型,名叫content,另一個int類型,名叫index。它們将存儲檔案内容和被檢索到的模拟檔案的行數。

<code>1</code>

<code>public</code> <code>class</code> <code>filemock {</code>

<code>2</code>

<code>private</code> <code>string content[];</code>

<code>3</code>

<code>private</code> <code>int</code> <code>index;</code>

2.實作filemock類的構造器,用随機字元初始化檔案的内容。

<code>01</code>

<code>public</code> <code>filemock(</code><code>int</code> <code>size, </code><code>int</code> <code>length){</code>

<code>02</code>

<code>content=</code><code>new</code> <code>string[size];</code>

<code>03</code>

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;size; i++){</code>

<code>04</code>

<code>stringbuilder buffer=</code><code>new</code> <code>stringbuilder(length);</code>

<code>05</code>

<code>for</code> <code>(</code><code>int</code> <code>j=</code><code>0</code><code>; j&lt;length; j++){</code>

<code>06</code>

<code>int</code> <code>indice=(</code><code>int</code><code>)math.random()*</code><code>255</code><code>;</code>

<code>07</code>

<code>buffer.append((</code><code>char</code><code>)indice);</code>

<code>08</code>

<code>}</code>

<code>09</code>

<code>content[i]=buffer.tostring();</code>

<code>10</code>

<code>11</code>

<code>index=</code><code>0</code><code>;</code>

<code>12</code>

3.實作hasmorelines()方法,如果檔案有更多的行來處理,則傳回true,如果我們已經取到了模拟檔案的尾部,則傳回false。

<code>public</code> <code>boolean</code> <code>hasmorelines(){</code>

<code>return</code> <code>index&lt;content.length;</code>

4.實作getline()方法,傳回index屬性所确定的行數并增加其值。

<code>public</code> <code>string getline(){</code>

<code>if</code> <code>(</code><code>this</code><code>.hasmorelines()) {</code>

<code>system.out.println(</code><code>"mock: "</code><code>+(content.length-index));</code>

<code>4</code>

<code>return</code> <code>content[index++];</code>

<code>5</code>

<code>6</code>

<code>return</code> <code>null</code><code>;</code>

<code>7</code>

5.現在,實作buffer類,用來實作在生産者與消費者之間共享的緩沖區。

<code>public</code> <code>class</code> <code>buffer {</code>

6.buffer類,有6個屬性:

一個類型為linkedlist&lt;string&gt;,名為buffer的屬性,用來存儲共享資料

一個類型為int,名為maxsize的屬性,用來存儲緩沖區的長度

一個名為lock的reentrantlock對象,用來控制修改緩沖區代碼塊的通路

兩個名分别為lines和space,類型為condition的屬性

一個boolean類型,名為pendinglines的屬性,表明如果緩沖區中有行

<code>private</code> <code>linkedlist&lt;string&gt; buffer;</code>

<code>private</code> <code>int</code> <code>maxsize;</code>

<code>private</code> <code>reentrantlock lock;</code>

<code>private</code> <code>condition lines;</code>

<code>private</code> <code>condition space;</code>

<code>private</code> <code>boolean</code> <code>pendinglines;</code>

7.實作buffer類的構造器,初始化前面描述的所有屬性。

<code>public</code> <code>buffer(</code><code>int</code> <code>maxsize) {</code>

<code>this</code><code>.maxsize=maxsize;</code>

<code>buffer=</code><code>new</code> <code>linkedlist&lt;&gt;();</code>

<code>lock=</code><code>new</code> <code>reentrantlock();</code>

<code>lines=lock.newcondition();</code>

<code>space=lock.newcondition();</code>

<code>pendinglines=</code><code>true</code><code>;</code>

<code>8</code>

8. 實作insert()方法,接收一個string類型參數并試圖将它存儲到緩沖區。首先,它獲得鎖的控制。當它有鎖的控制,它将檢查緩沖區是否有空閑空 間。如果緩沖區已滿,它将調用await()方法在space條件上等待釋放空間。如果其他線程在space條件上調用signal()或 signalall()方法,這個線程将被喚醒。當這種情況發生,這個線程在緩沖區上存儲行并且在lines條件上調用signallall()方法,稍 後我們将會看到,這個條件将會喚醒所有在緩沖行上等待的線程。

<code>public</code> <code>void</code> <code>insert(string line) {</code>

<code>lock.lock();</code>

<code>try</code> <code>{</code>

<code>while</code> <code>(buffer.size() == maxsize) {</code>

<code>space.await();</code>

<code>buffer.offer(line);</code>

<code>system.out.printf(</code><code>"%s: inserted line: %d\n"</code><code>, thread.</code>

<code>currentthread().getname(),buffer.size());</code>

<code>lines.signalall();</code>

<code>} </code><code>catch</code> <code>(interruptedexception e) {</code>

<code>e.printstacktrace();</code>

<code>13</code>

<code>} </code><code>finally</code> <code>{</code>

<code>14</code>

<code>lock.unlock();</code>

<code>15</code>

<code>16</code>

9. 實作get()方法,它傳回存儲在緩沖區上的第一個字元串。首先,它擷取鎖的控制。當它擁有鎖的控制,它檢查緩沖區是否有行。如果緩沖區是空的,它調用 await()方法在lines條件上等待緩沖區中的行。如果其他線程在lines條件上調用signal()或signalall()方法,這個線程将 被喚醒。當它發生時,這個方法擷取緩沖區的首行,并且在space條件上調用signalall()方法,然後傳回string。

<code>public</code> <code>string get() {</code>

<code>string line=</code><code>null</code><code>;</code>

<code>while</code> <code>((buffer.size() == </code><code>0</code><code>) &amp;&amp;(haspendinglines())) {</code>

<code>lines.await();</code>

<code>if</code> <code>(haspendinglines()) {</code>

<code>line = buffer.poll();</code>

<code>system.out.printf(</code><code>"%s: line readed: %d\n"</code><code>,thread.</code>

<code>space.signalall();</code>

<code>17</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>return</code> <code>line;</code>

<code>21</code>

10.實作setpendinglines()方法,用來設定pendinglines的值。當沒有更多的行生産時,它将被生産者調用。

<code>public</code> <code>void</code> <code>setpendinglines(</code><code>boolean</code> <code>pendinglines) {</code>

<code>this</code><code>.pendinglines=pendinglines;</code>

11.實作haspendinglines()方法,如果有更多的行被處理時,傳回true,否則傳回false。

<code>public</code> <code>boolean</code> <code>haspendinglines() {</code>

<code>return</code> <code>pendinglines || buffer.size()&gt;</code><code>0</code><code>;</code>

12.現在輪到生産者,實作producer類,并指定其實作runnable接口。

<code>public</code> <code>class</code> <code>producer </code><code>implements</code> <code>runnable {</code>

13.聲明兩個屬性:一個filemock類對象,另一個buffer類對象。

<code>private</code> <code>filemock mock;</code>

<code>private</code> <code>buffer buffer;</code>

14.實作producer類的構造器,初始化這兩個屬性。

<code>public</code> <code>producer (filemock mock, buffer buffer){</code>

<code>this</code><code>.mock=mock;</code>

<code>this</code><code>.buffer=buffer;</code>

15.實作run()方法,讀取在filemock對象中建立的所有行,并使用insert()方法将它們存儲到緩沖區。一旦這個過程結束,使用setpendinglines()方法警告緩沖區,不會再産生更多的行。

<code>@override</code>

<code>public</code> <code>void</code> <code>run() {</code>

<code>buffer.setpendinglines(</code><code>true</code><code>);</code>

<code>while</code> <code>(mock.hasmorelines()){</code>

<code>string line=mock.getline();</code>

<code>buffer.insert(line);</code>

<code>buffer.setpendinglines(</code><code>false</code><code>);</code>

<code>9</code>

16.接下來輪到消費者,實作consumer類,并指定它實作runnable接口。

<code>public</code> <code>class</code> <code>consumer </code><code>implements</code> <code>runnable {</code>

17.聲明buffer對象,實作consumer構造器來初始化這個對象。

<code>public</code> <code>consumer (buffer buffer) {</code>

18.實作run()方法,當緩沖區有等待的行,它将擷取一個并處理它。

<code>while</code> <code>(buffer.haspendinglines()) {</code>

<code>string line=buffer.get();</code>

<code>processline(line);</code>

19.實作輔助方法processline(),它隻睡眠10毫秒,用來模拟某種行的處理。

<code>private</code> <code>void</code> <code>processline(string line) {</code>

<code>random random=</code><code>new</code> <code>random();</code>

<code>thread.sleep(random.nextint(</code><code>100</code><code>));</code>

20.通過建立類名為main,且包括main()方法來實作這個示例的主類。

<code>public</code> <code>class</code> <code>main {</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

21.建立一個filemock對象。

<code>filemock mock=</code><code>new</code> <code>filemock(</code><code>100</code><code>, </code><code>10</code><code>);</code>

22.建立一個buffer對象。

<code>buffer buffer=</code><code>new</code> <code>buffer(</code><code>20</code><code>);</code>

23.建立producer對象,并且用10個線程運作它。

<code>producer producer=</code><code>new</code> <code>producer(mock, buffer);</code>

<code>thread threadproducer=</code><code>new</code> <code>thread(producer,</code><code>"producer"</code><code>);</code>

24.建立consumer對象,并且用10個線程運作它。

<code>consumer consumers[]=</code><code>new</code> <code>consumer[</code><code>3</code><code>];</code>

<code>thread threadconsumers[]=</code><code>new</code> <code>thread[</code><code>3</code><code>];</code>

<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>3</code><code>; i++){</code>

<code>consumers[i]=</code><code>new</code> <code>consumer(buffer);</code>

<code>threadconsumers[i]=</code><code>new</code> <code>thread(consumers[i],</code><code>"consumer "</code><code>+i);</code>

25.啟動producer和3個consumers。

<code>threadproducer.start();</code>

<code>threadconsumers[i].start();</code>

它是如何工作的…

所 有condition對象都與鎖有關,并且使用聲明在lock接口中的newcondition()方法來建立。使用condition做任何操作之前, 你必須擷取與這個condition相關的鎖的控制。是以,condition的操作一定是在以調用lock對象的lock()方法為開頭,以調用相同 lock對象的unlock()方法為結尾的代碼塊中。

當一個線程在一個condition上調用await()方法時,它将自動釋放鎖的控制,是以其他線程可以擷取這個鎖的控制并開始執行相同操作,或者由同個鎖保護的其他臨界區。

注釋:當一個線程在一個condition上調用signal()或signallall()方法,一個或者全部在這個condition上等待的線程将被喚醒。這并不能保證的使它們現在睡眠的條件現在是true,是以你必須在while循環内部調用await()方法。你不能離開這個循環,直到 condition為true。當condition為false,你必須再次調用 await()方法。

你必須十分小心 ,在使用await()和signal()方法時。如果你在condition上調用await()方法而卻沒有在這個condition上調用signal()方法,這個線程将永遠睡眠下去。

在調用await()方法後,一個線程可以被中斷的,是以當它正在睡眠時,你必須處理interruptedexception異常。

不止這些…

condition接口提供不同版本的await()方法,如下:

await(long time, timeunit unit):這個線程将會一直睡眠直到:

(1)它被中斷

(2)其他線程在這個condition上調用singal()或signalall()方法

(3)指定的時間已經過了

(4)timeunit類是一個枚舉類型如下的常量:

days,hours, microseconds, milliseconds, minutes, nanoseconds,seconds

awaituninterruptibly():這個線程将不會被中斷,一直睡眠直到其他線程調用signal()或signalall()方法

awaituntil(date date):這個線程将會一直睡眠直到:

(3)指定的日期已經到了

你可以在一個讀/寫鎖中的readlock和writelock上使用conditions。

參見