天天看點

線程同步工具(七)在并發任務間交換資料

java 并發 api 提供了一種允許2個并發任務間互相交換資料的同步應用。更具體的說,exchanger 類允許在2個線程間定義同步點,當2個線程到達這個點,他們互相交換資料類型,使用第一個線程的資料類型變成第二個的,然後第二個線程的資料類型變成第一個的。

這個類在遇到類似生産者和消費者問題時,是非常有用的。來一個非常經典的并發問題:你有相同的資料buffer,一個或多個資料生産者,和一個或多個資料消費者。隻是exchange類隻能同步2個線程,是以你隻能在你的生産者和消費者問題中隻有一個生産者和一個消費者時使用這個類。

在這個指南,你将學習如何使用 exchanger 類來解決隻有一個生産者和一個消費者的生産者和消費者問題。

準備

這個指南的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。

怎麼做呢<b>…</b>

按照這些步驟來實作下面的例子:

<code>01</code>

<code>package</code> <code>tool;</code>

<code>02</code>

<code>import</code> <code>java.util.list;</code>

<code>03</code>

<code>import</code> <code>java.util.concurrent.exchanger;</code>

<code>04</code>

<code>05</code>

<code>//1. 首先,從實作producer開始吧。建立一個類名為producer并一定實作 runnable 接口。</code>

<code>06</code>

<code>public</code> <code>class</code> <code>producer</code><code>implements</code> <code>runnable {</code>

<code>07</code>

<code>08</code>

<code>// 2. 聲明 list&lt;string&gt;對象,名為 buffer。這是等等要被互相交換的資料類型。</code>

<code>09</code>

<code>private</code> <code>list&lt;string&gt; buffer;</code>

<code>10</code>

<code>11</code>

<code>// 3. 聲明 exchanger&lt;list&lt;string&gt;&gt;; 對象,名為exchanger。這個 exchanger 對象是用來同步producer和consumer的。</code>

<code>12</code>

<code>private</code> <code>final</code> <code>exchanger&lt;list&lt;string&gt;&gt; exchanger;</code>

<code>13</code>

<code>14</code>

<code>// 4. 實作類的構造函數,初始化這2個屬性。</code>

<code>15</code>

<code>public</code> <code>producer(list&lt;string&gt; buffer, exchanger&lt;list&lt;string&gt;&gt; exchanger) {</code>

<code>16</code>

<code>this</code><code>.buffer = buffer;</code>

<code>17</code>

<code>this</code><code>.exchanger = exchanger;</code>

<code>18</code>

<code>}</code>

<code>19</code>

<code>20</code>

<code>// 5. 實作 run() 方法. 在方法内,實作10次交換。</code>

<code>21</code>

<code>@override</code>

<code>22</code>

<code>public</code> <code>void</code> <code>run() {</code>

<code>23</code>

<code>int</code> <code>cycle =</code><code>1</code><code>;</code>

<code>24</code>

<code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10</code><code>; i++) {           system.out.printf(</code><code>"producer: cycle %d\n"</code><code>, cycle);</code>

<code>25</code>

<code>26</code>

<code>// 6. 在每次循環中,加10個字元串到buffer。</code>

<code>27</code>

<code>for</code> <code>(</code><code>int</code> <code>j =</code><code>0</code><code>; j &lt;</code><code>10</code><code>; j++) {</code>

<code>28</code>

<code>string message =</code><code>"event "</code> <code>+ ((i *</code><code>10</code><code>) + j);</code>

<code>29</code>

<code>system.out.printf(</code><code>"producer: %s\n"</code><code>, message);</code>

<code>30</code>

<code>buffer.add(message);</code>

<code>31</code>

<code>32</code>

<code>33</code>

<code>// 7. 調用 exchange() 方法來與consumer交換資料。此方法可能會抛出interruptedexception 異常, 加上處理代碼。</code>

<code>34</code>

<code>try</code> <code>{</code>

<code>35</code>

<code>buffer = exchanger.exchange(buffer);</code>

<code>36</code>

<code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>37</code>

<code>e.printstacktrace();</code>

<code>38</code>

<code>39</code>

<code>system.out.println(</code><code>"producer: "</code> <code>+ buffer.size());</code>

<code>40</code>

<code>cycle++;</code>

<code>41</code>

<code>42</code>

<code>43</code>

<code>//8. 現在, 來實作consumer。建立一個類名為consumer并一定實作 runnable 接口。</code>

<code>public</code> <code>class</code> <code>consumer</code><code>implements</code> <code>runnable {</code>

<code>// 9. 聲明名為buffer的 list&lt;string&gt;對象。這個對象類型是用來互相交換的。</code>

<code>// 10. 聲明一個名為exchanger的 exchanger&lt;list&lt;string&gt;&gt; 對象。用來同步 producer和consumer。</code>

<code>// 11. 實作類的構造函數,并初始化2個屬性。</code>

<code>public</code> <code>consumer(list&lt;string&gt;buffer, exchanger&lt;list&lt;string&gt;&gt; exchanger) {</code>

<code>// 12. 實作 run() 方法。在方法内,實作10次交換。</code>

<code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10</code><code>; i++) {</code>

<code>system.out.printf(</code><code>"consumer: cycle %d\n"</code><code>, cycle);</code>

<code>// 13. 在每次循環,首先調用exchange()方法來與producer同步。consumer需要消耗資料。此方法可能會抛出interruptedexception異常, 加上處理代碼。</code>

<code>}</code><code>catch</code> <code>(interruptedexception e) {              e.printstacktrace();</code>

<code>// 14. 把producer發來的在buffer裡的10字元串寫到操控台并從buffer内删除,留白。system.out.println("consumer: " + buffer.size());</code>

<code>string message = buffer.get(</code><code>0</code><code>);</code>

<code>system.out.println(</code><code>"consumer: "</code> <code>+ message);</code>

<code>buffer.remove(</code><code>0</code><code>);</code>

<code>//15.現在,實作例子的主類通過建立一個類,名為core并加入 main() 方法。</code>

<code>import</code> <code>java.util.arraylist;</code>

<code>mport java.util.list;</code>

<code>public</code> <code>class</code> <code>core {</code>

<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>// 16. 建立2個buffers。分别給producer和consumer使用.</code>

<code>list&lt;string&gt; buffer1 =</code><code>new</code> <code>arraylist&lt;string&gt;();</code>

<code>list&lt;string&gt; buffer2 =</code><code>new</code> <code>arraylist&lt;string&gt;();</code>

<code>// 17. 建立exchanger對象,用來同步producer和consumer。</code>

<code>exchanger&lt;list&lt;string&gt;&gt; exchanger =</code><code>new</code> <code>exchanger&lt;list&lt;string&gt;&gt;();</code>

<code>// 18. 建立producer對象和consumer對象。</code>

<code>producer producer =</code><code>new</code> <code>producer(buffer1, exchanger);</code>

<code>consumer consumer =</code><code>new</code> <code>consumer(buffer2, exchanger);</code>

<code>// 19. 建立線程來執行producer和consumer并開始線程。</code>

<code>thread threadproducer =</code><code>new</code> <code>thread(producer);</code>

<code>thread threadconsumer =</code><code>new</code> <code>thread(consumer); threadproducer.start();</code>

<code>threadconsumer.start();</code>

它是怎麼工作的…

消費者開始時是空白的buffer,然後調用exchanger來與生産者同步。因為它需要資料來消耗。生産者也是從空白的buffer開始,然後建立10個字元串,儲存到buffer,并使用exchanger與消費者同步。

在這兒,2個線程(生産者和消費者線程)都是在exchanger裡并交換了資料類型,是以當消費者從exchange() 方法傳回時,它有10個字元串在buffer内。當生産者從 exchange() 方法傳回時,它有空白的buffer來重新寫入。這樣的操作會重複10遍。

如你執行例子,你會發現生産者和消費者是如何并發的執行任務和在每個步驟它們是如何交換buffers的。與其他同步工具一樣會發生這種情況,第一個調用 exchange()方法會進入休眠直到其他線程的達到。

更多…

exchanger 類有另外一個版本的exchange方法:

exchange(v data, long time, timeunit unit):v是聲明phaser的參數種類(例子裡是 list)。 此線程會休眠直到另一個線程到達并中斷它,或者特定的時間過去了。timeunit類有多種常量:days, hours, microseconds, milliseconds, minutes, nanoseconds, 和 seconds。

繼續閱讀