本章将介紹下列内容:
使用非阻塞式線程安全清單
使用阻塞式線程安全清單
使用按優先級排序的阻塞式線程安全清單
使用帶有延遲元素的線程安全清單
使用線程安全可周遊映射
生成并發随機數
使用原子變量
使用原子數組
資料結構(data structure)是程式設計中的基本元素,幾乎每個程式都使用一種或多種資料結構來存儲和管理資料。java api提供了包含接口、類和算法的java集合架構(java collection framework),它實作了可用在程式中的大量資料結構。
當需要在并發程式中使用資料集合時,必須要謹慎地選擇相應的實作方式。大多數集合類不能直接用于并發應用,因為它們沒有對本身資料的并發通路進行控制。如果一些并發任務共享了一個不适用于并發任務的資料結構,将會遇到資料不一緻的錯誤,并将影響程式的準确運作。這類資料結構的一個例子是arraylist類。
java提供了一些可以用于并發程式中的資料集合,它們不會引起任何問題。一般來說,java提供了兩類适用于并發應用的集合。
阻塞式集合(blocking collection):這類集合包括添加和移除資料的方法。當集合已滿或為空時,被調用的添加或者移除方法就不能立即被執行,那麼調用這個方法的線程将被阻塞,一直到該方法可以被成功執行。
非阻塞式集合(non-blocking collection):這類集合也包括添加和移除資料的方法。如果方法不能立即被執行,則傳回null或抛出異常,但是調用這個方法的線程不會被阻塞。
通過本章的各個小節,你将學會如何在并發應用中使用一些java集合。
非阻塞式清單對應的實作類:concurrentlinkeddeque類;
阻塞式清單對應的實作類:linkedblockingdeque 類;
用于資料生成或消費的阻塞式清單對應的實作類:linkedtransferqueue類;
按優先級排序清單元素的阻塞式清單對應的實作類:priorityblockingqueue類;
帶有延遲清單元素的阻塞式清單對應的實作類:delayqueue類;
非阻塞式可周遊映射對應的實作類:concurrentskiplistmap類;
随機數字對應的實作類:threadlocalrandom類;
原子變量對應的實作類:atomiclong和atomicintegerarray類。
最基本的集合類型是清單(list)。一個清單包含的元素數量不定,可以在任何位置添加、讀取或移除元素。并發清單允許不同的線程在同一時間添加或移除清單中的元素,而不會造成資料不一緻。
在本節,将會學到如何在并發程式中使用非阻塞式清單。非阻塞式清單提供了一些操作,如果被執行的操作不能夠立即運作(例如,在清單為空時,從清單取出一個元素),方法會抛出異常或傳回null。java 7引入了concurrentlinkeddeque類來實作非阻塞式并發清單。
将要實作的範例包括以下兩個不同的任務:
添加大量的資料到一個清單中;
從同一個清單中移除大量的資料。
本節的範例是在eclipse ide裡完成的。無論你使用eclipse還是其他的ide(比如netbeans),都可以打開這個ide并且建立一個新的java工程。
按照接下來的步驟實作本節的範例。
1.建立一個名為addtask的類,實作runnable接口。
<code>1</code>
<code>public</code> <code>class</code> <code>addtask </code><code>implements</code> <code>runnable {</code>
2.聲明一個私有的concurrentlinkeddeque屬性list,并指定它的泛型參數是string型的。
<code>private</code> <code>concurrentlinkeddeque list;</code>
3.實作類的構造器來初始化屬性。
<code>public</code> <code>addtask(concurrentlinkeddeque list) {</code>
<code>2</code>
<code>this</code><code>.list=list;</code>
<code>3</code>
<code>}</code>
4.實作run()方法。這個方法将10,000個字元串存放到清單中,這些字元串由目前執行任務的線程的名稱和數字組成。
<code>@override</code>
<code>public</code> <code>void</code> <code>run() {</code>
<code>string name=thread.currentthread().getname();</code>
<code>4</code>
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>10000</code><code>; i++){</code>
<code>5</code>
<code>list.add(name+</code><code>": element "</code><code>+i);</code>
<code>6</code>
<code>7</code>
5.建立名為polltask的類,并實作runnable接口。
<code>public</code> <code>class</code> <code>polltask </code><code>implements</code> <code>runnable {</code>
6.聲明一個私有的concurrentlinkeddeque屬性list,并指定它的泛型參數是string型的。
7.實作類的構造器來初始化屬性。
<code>public</code> <code>polltask(concurrentlinkeddeque list) {</code>
8.實作run()方法。這個方法将清單中的10,000個字元串取出,總共取5,000次,每次取兩個元素。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>5000</code><code>; i++) {</code>
<code>list.pollfirst();</code>
<code>list.polllast();</code>
9.建立範例的主類main,并添加main()方法。
<code>public</code> <code>class</code> <code>main {</code>
<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
10.建立concurrentlinkeddeque對象,并指定它的泛型參數是string型的。
<code>concurrentlinkeddeque list=</code><code>new</code>
<code>concurrentlinkeddeque&lt;&gt;();</code>
11.建立線程數組threads,它包含100個線程。
<code>thread threads[]=</code><code>new</code> <code>thread[</code><code>100</code><code>];</code>
12.建立100個addtask對象及其對應的運作線程。将每個線程存放到上一步建立的數組中,然後啟動線程。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i addtask task=</code><code>new</code> <code>addtask(list);</code>
<code>threads[i]=</code><code>new</code> <code>thread(task);</code>
<code>threads[i].start();</code>
<code>system.out.printf("main: %d addtask threads have been</code>
<code>launched\n",threads.length);</code>
13.使用join()方法等待線程完成。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;threads.length; i++) {</code>
<code>try</code> <code>{</code>
<code>threads[i].join();</code>
<code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code>e.printstacktrace();</code>
14.将清單的元素數量列印到控制台。
<code>system.out.printf(</code><code>"main: size of the list: %d\n"</code><code>,list.size());</code>
15.建立100個polltask對象及其對應的運作線程。将每個線程存放到上一步建立的數組中,然後啟動線程。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt; threads.length; i++){</code>
<code>polltask task=</code><code>new</code> <code>polltask(list);</code>
<code>system.out.printf("main: %d polltask threads have been</code>
16.使用join()方法等待線程完成。
17.将清單的元素數量列印到控制台。
<code>system.out.printf(</code><code>"main: size of the list: %d\n"</code><code>,list.size());</code>
本節使用的泛型參數是string類的concurrentlinkeddeque對象,用來實作一個非阻塞式并發資料清單。下面的截屏顯示了程式的運作結果。
首先,執行100個addtask任務将元素添加到concurrentlinkeddeque對象list中。每個任務使用add()方法向這個清單中插入10,000個元素。add()方法将新元素添加到清單尾部。當所有任務運作完畢,清單中的元素數量将被列印到控制台。在這一刻,清單中有1,000,000個元素。
接下來,執行100個polltask任務将元素從清單中移除。每個任務使用pollfirst()和polllast()方法從清單中移除10,000個元素。pollfirst()方法傳回并移除清單中的第一個元素,polllast()方法傳回并移除清單中的最後一個元素。如果清單為空,這些方法傳回null。當所有任務運作完畢,清單中的元素數量将被列印到控制台。在這一刻,清單中有0個元素。
使用size()方法輸出清單中的元素數量。需要注意的是,這個方法傳回的值可能不是真實的,尤其當有線程在添加資料或移除資料時,這個方法需要周遊整個清單來計算元素數量,而周遊過的資料可能已經改變。僅當沒有任何線程修改清單時,才能保證傳回的結果是準确的。
concurrentlinkeddeque類提供了其他從清單中讀取資料的方法。
getfirst()和getlast():分别傳回清單中第一個和最後一個元素,傳回的元素不會從清單中移除。如果清單為空,這兩個方法抛出nosuchelementexcpetion異常。
peek()、peekfirst()和peeklast():分别傳回清單中第一個和最後一個元素,傳回的元素不會從清單中移除。如果清單為空,這些方法傳回null。
remove()、removefirst()和removelast():分别傳回清單中第一個和最後一個元素,傳回的元素将會從清單中移除。如果清單為空,這些方法抛出nosuchelementexcpetion異常。
最基本的集合類型是清單。一個清單包含的元素數量不定,可以在任何位置添加、讀取或移除元素。并發清單允許不同的線程在同一時間添加或移除清單中的元素,而不會造成資料不一緻。
在本節,你會學到如何在并發程式中使用阻塞式清單。阻塞式清單與非阻塞式清單的主要差别是:阻塞式清單在插入和删除操作時,如果清單已滿或為空,操作不會被立即執行,而是将調用這個操作的線程阻塞隊列直到操作可以執行成功。java引入了linkedblocking deque類來實作阻塞式清單。
1.建立名為client的類,并實作runnable接口。
<code>public</code> <code>class</code> <code>client </code><code>implements</code> <code>runnable{</code>
2.聲明一個私有的linkedblockingdeque屬性requestlist,并指定它的泛型參數是string型的。
<code>private</code> <code>linkedblockingdeque requestlist;</code>
<code>public</code> <code>client (linkedblockingdeque requestlist) {</code>
<code>this</code><code>.requestlist=requestlist;</code>
4.實作run()方法。使用requestlist對象的put()方法,每兩秒向清單requestlist中插入5個字元串。重複3次。
<code>01</code>
<code>02</code>
<code>03</code>
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>3</code><code>; i++) {</code>
<code>04</code>
<code>for</code> <code>(</code><code>int</code> <code>j=</code><code>0</code><code>; j&lt;</code><code>5</code><code>; j++) {</code>
<code>05</code>
<code>stringbuilder request=</code><code>new</code> <code>stringbuilder();</code>
<code>06</code>
<code>request.append(i);</code>
<code>07</code>
<code>request.append(</code><code>":"</code><code>);</code>
<code>08</code>
<code>request.append(j);</code>
<code>09</code>
<code>10</code>
<code>requestlist.put(request.tostring());</code>
<code>11</code>
<code>12</code>
<code>13</code>
<code>14</code>
<code>system.out.printf(</code><code>"client: %s at %s.\n"</code><code>,request,</code><code>new</code>
<code>15</code>
<code>date());</code>
<code>16</code>
<code>17</code>
<code>18</code>
<code>timeunit.seconds.sleep(</code><code>2</code><code>);</code>
<code>19</code>
<code>20</code>
<code>21</code>
<code>22</code>
<code>23</code>
<code>system.out.printf(</code><code>"client: end.\n"</code><code>);</code>
<code>24</code>
5. 建立範例的主類main,并添加main()方法。
<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>
6. 聲明并建立linkedblockingdeque屬性list,并指定它的泛型參數是string型的。
<code>linkedblockingdeque list=</code><code>new</code> <code>linkedblockingdeque&lt;&gt;(</code><code>3</code><code>);</code>
7.将client作為傳入參數建立線程thread并啟動。
<code>client client=</code><code>new</code> <code>client(list);</code>
<code>thread thread=</code><code>new</code> <code>thread(client);</code>
<code>thread.start();</code>
8.使用list對象的take()方法,每300毫秒從清單中取出3個字元串對象,重複5次。在控制台輸出字元串。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i </code><code>for</code> <code>(</code><code>int</code> <code>j=</code><code>0</code><code>; j&lt;</code><code>3</code><code>; j++) {</code>
<code>string request=list.take();</code>
<code>system.out.printf("main: request: %s at %s. size:</code>
<code>%d\n",request,</code><code>new</code> <code>date(),list.size());</code>
<code>timeunit.milliseconds.sleep(</code><code>300</code><code>);</code>
9.輸出一條表示程式結束的消息。
<code>system.out.printf(</code><code>"main: end of the program.\n"</code><code>);</code>
本節使用的泛型參數是string的linkedblockingdeque對象,用來實作一個阻塞式并發資料清單。
client類使用put()方法将字元串插入到清單中。如果清單已滿(清單生成時指定了固定的容量),調用這個方法的線程将被阻塞直到清單中有了可用的空間。
main類使用take()方法從清單中取字元串。如果清單為空,調用這個方法的線程将被阻塞直到清單不為空(即有可用的元素)。
這個例子中使用了linkedblockingdeque對象的兩個方法,調用它們的線程可能會被阻塞,在阻塞時如果線程被中斷,方法會抛出interruptedexception異常,是以必須捕獲和處理這個異常。
linkedblockingdeque類也提供了其他存取元素的方法,這些方法不會引起阻塞,而是抛出異常或傳回null。
takefirst()和takelast():分别傳回清單中第一個和最後一個元素,傳回的元素會從清單中移除。如果清單為空,調用方法的線程将被阻塞直到清單中有可用的元素出現。
getfirst()和getlast():分别傳回清單中第一個和最後一個元素,傳回的元素不會從清單中移除。如果清單為空,則抛出nosuchelementexcpetinon異常。
peek()、peekfirst()和peeklast():分别傳回清單中第一個和最後一個元素,傳回的元素不會從清單中移除。如果清單為空,傳回null。
poll()、pollfirst()和polllast():分别傳回清單中第一個和最後一個元素,傳回的元素将會從清單中移除。如果清單為空,傳回null。
add()、addfirst()和addlast(): 分别将元素添加到清單中第一位和最後一位。如果清單已滿(清單生成時指定了固定的容量),這些方法将抛出illegalstateexception異常。