本章将介绍下列内容:
使用非阻塞式线程安全列表
使用阻塞式线程安全列表
使用按优先级排序的阻塞式线程安全列表
使用带有延迟元素的线程安全列表
使用线程安全可遍历映射
生成并发随机数
使用原子变量
使用原子数组
数据结构(data structure)是编程中的基本元素,几乎每个程序都使用一种或多种数据结构来存储和管理数据。java api提供了包含接口、类和算法的java集合框架(java collection framework),它实现了可用在程序中的大量数据结构。
当需要在并发程序中使用数据集合时,必须要谨慎地选择相应的实现方式。大多数集合类不能直接用于并发应用,因为它们没有对本身数据的并发访问进行控制。如果一些并发任务共享了一个不适用于并发任务的数据结构,将会遇到数据不一致的错误,并将影响程序的准确运行。这类数据结构的一个例子是arraylist类。
java提供了一些可以用于并发程序中的数据集合,它们不会引起任何问题。一般来说,java提供了两类适用于并发应用的集合。
阻塞式集合(blocking collection):这类集合包括添加和移除数据的方法。当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。
非阻塞式集合(non-blocking collection):这类集合也包括添加和移除数据的方法。如果方法不能立即被执行,则返回null或抛出异常,但是调用这个方法的线程不会被阻塞。
通过本章的各个小节,你将学会如何在并发应用中使用一些java集合。
非阻塞式列表对应的实现类:concurrentlinkeddeque类;
阻塞式列表对应的实现类:linkedblockingdeque 类;
用于数据生成或消费的阻塞式列表对应的实现类:linkedtransferqueue类;
按优先级排序列表元素的阻塞式列表对应的实现类:priorityblockingqueue类;
带有延迟列表元素的阻塞式列表对应的实现类:delayqueue类;
非阻塞式可遍历映射对应的实现类:concurrentskiplistmap类;
随机数字对应的实现类:threadlocalrandom类;
原子变量对应的实现类:atomiclong和atomicintegerarray类。
最基本的集合类型是列表(list)。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。
在本节,将会学到如何在并发程序中使用非阻塞式列表。非阻塞式列表提供了一些操作,如果被执行的操作不能够立即运行(例如,在列表为空时,从列表取出一个元素),方法会抛出异常或返回null。java 7引入了concurrentlinkeddeque类来实现非阻塞式并发列表。
将要实现的范例包括以下两个不同的任务:
添加大量的数据到一个列表中;
从同一个列表中移除大量的数据。
本节的范例是在eclipse ide里完成的。无论你使用eclipse还是其他的ide(比如netbeans),都可以打开这个ide并且创建一个新的java工程。
按照接下来的步骤实现本节的范例。
1.创建一个名为addtask的类,实现runnable接口。
<code>1</code>
<code>public</code> <code>class</code> <code>addtask </code><code>implements</code> <code>runnable {</code>
2.声明一个私有的concurrentlinkeddeque属性list,并指定它的泛型参数是string型的。
<code>private</code> <code>concurrentlinkeddeque list;</code>
3.实现类的构造器来初始化属性。
<code>public</code> <code>addtask(concurrentlinkeddeque list) {</code>
<code>2</code>
<code>this</code><code>.list=list;</code>
<code>3</code>
<code>}</code>
4.实现run()方法。这个方法将10,000个字符串存放到列表中,这些字符串由当前执行任务的线程的名称和数字组成。
<code>@override</code>
<code>public</code> <code>void</code> <code>run() {</code>
<code>string name=thread.currentthread().getname();</code>
<code>4</code>
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>10000</code><code>; i++){</code>
<code>5</code>
<code>list.add(name+</code><code>": element "</code><code>+i);</code>
<code>6</code>
<code>7</code>
5.创建名为polltask的类,并实现runnable接口。
<code>public</code> <code>class</code> <code>polltask </code><code>implements</code> <code>runnable {</code>
6.声明一个私有的concurrentlinkeddeque属性list,并指定它的泛型参数是string型的。
7.实现类的构造器来初始化属性。
<code>public</code> <code>polltask(concurrentlinkeddeque list) {</code>
8.实现run()方法。这个方法将列表中的10,000个字符串取出,总共取5,000次,每次取两个元素。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>5000</code><code>; i++) {</code>
<code>list.pollfirst();</code>
<code>list.polllast();</code>
9.创建范例的主类main,并添加main()方法。
<code>public</code> <code>class</code> <code>main {</code>
<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>
10.创建concurrentlinkeddeque对象,并指定它的泛型参数是string型的。
<code>concurrentlinkeddeque list=</code><code>new</code>
<code>concurrentlinkeddeque&lt;&gt;();</code>
11.创建线程数组threads,它包含100个线程。
<code>thread threads[]=</code><code>new</code> <code>thread[</code><code>100</code><code>];</code>
12.创建100个addtask对象及其对应的运行线程。将每个线程存放到上一步创建的数组中,然后启动线程。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i addtask task=</code><code>new</code> <code>addtask(list);</code>
<code>threads[i]=</code><code>new</code> <code>thread(task);</code>
<code>threads[i].start();</code>
<code>system.out.printf("main: %d addtask threads have been</code>
<code>launched\n",threads.length);</code>
13.使用join()方法等待线程完成。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;threads.length; i++) {</code>
<code>try</code> <code>{</code>
<code>threads[i].join();</code>
<code>} </code><code>catch</code> <code>(interruptedexception e) {</code>
<code>e.printstacktrace();</code>
14.将列表的元素数量打印到控制台。
<code>system.out.printf(</code><code>"main: size of the list: %d\n"</code><code>,list.size());</code>
15.创建100个polltask对象及其对应的运行线程。将每个线程存放到上一步创建的数组中,然后启动线程。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt; threads.length; i++){</code>
<code>polltask task=</code><code>new</code> <code>polltask(list);</code>
<code>system.out.printf("main: %d polltask threads have been</code>
16.使用join()方法等待线程完成。
17.将列表的元素数量打印到控制台。
<code>system.out.printf(</code><code>"main: size of the list: %d\n"</code><code>,list.size());</code>
本节使用的泛型参数是string类的concurrentlinkeddeque对象,用来实现一个非阻塞式并发数据列表。下面的截屏显示了程序的运行结果。
首先,执行100个addtask任务将元素添加到concurrentlinkeddeque对象list中。每个任务使用add()方法向这个列表中插入10,000个元素。add()方法将新元素添加到列表尾部。当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有1,000,000个元素。
接下来,执行100个polltask任务将元素从列表中移除。每个任务使用pollfirst()和polllast()方法从列表中移除10,000个元素。pollfirst()方法返回并移除列表中的第一个元素,polllast()方法返回并移除列表中的最后一个元素。如果列表为空,这些方法返回null。当所有任务运行完毕,列表中的元素数量将被打印到控制台。在这一刻,列表中有0个元素。
使用size()方法输出列表中的元素数量。需要注意的是,这个方法返回的值可能不是真实的,尤其当有线程在添加数据或移除数据时,这个方法需要遍历整个列表来计算元素数量,而遍历过的数据可能已经改变。仅当没有任何线程修改列表时,才能保证返回的结果是准确的。
concurrentlinkeddeque类提供了其他从列表中读取数据的方法。
getfirst()和getlast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这两个方法抛出nosuchelementexcpetion异常。
peek()、peekfirst()和peeklast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,这些方法返回null。
remove()、removefirst()和removelast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,这些方法抛出nosuchelementexcpetion异常。
最基本的集合类型是列表。一个列表包含的元素数量不定,可以在任何位置添加、读取或移除元素。并发列表允许不同的线程在同一时间添加或移除列表中的元素,而不会造成数据不一致。
在本节,你会学到如何在并发程序中使用阻塞式列表。阻塞式列表与非阻塞式列表的主要差别是:阻塞式列表在插入和删除操作时,如果列表已满或为空,操作不会被立即执行,而是将调用这个操作的线程阻塞队列直到操作可以执行成功。java引入了linkedblocking deque类来实现阻塞式列表。
1.创建名为client的类,并实现runnable接口。
<code>public</code> <code>class</code> <code>client </code><code>implements</code> <code>runnable{</code>
2.声明一个私有的linkedblockingdeque属性requestlist,并指定它的泛型参数是string型的。
<code>private</code> <code>linkedblockingdeque requestlist;</code>
<code>public</code> <code>client (linkedblockingdeque requestlist) {</code>
<code>this</code><code>.requestlist=requestlist;</code>
4.实现run()方法。使用requestlist对象的put()方法,每两秒向列表requestlist中插入5个字符串。重复3次。
<code>01</code>
<code>02</code>
<code>03</code>
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i&lt;</code><code>3</code><code>; i++) {</code>
<code>04</code>
<code>for</code> <code>(</code><code>int</code> <code>j=</code><code>0</code><code>; j&lt;</code><code>5</code><code>; j++) {</code>
<code>05</code>
<code>stringbuilder request=</code><code>new</code> <code>stringbuilder();</code>
<code>06</code>
<code>request.append(i);</code>
<code>07</code>
<code>request.append(</code><code>":"</code><code>);</code>
<code>08</code>
<code>request.append(j);</code>
<code>09</code>
<code>10</code>
<code>requestlist.put(request.tostring());</code>
<code>11</code>
<code>12</code>
<code>13</code>
<code>14</code>
<code>system.out.printf(</code><code>"client: %s at %s.\n"</code><code>,request,</code><code>new</code>
<code>15</code>
<code>date());</code>
<code>16</code>
<code>17</code>
<code>18</code>
<code>timeunit.seconds.sleep(</code><code>2</code><code>);</code>
<code>19</code>
<code>20</code>
<code>21</code>
<code>22</code>
<code>23</code>
<code>system.out.printf(</code><code>"client: end.\n"</code><code>);</code>
<code>24</code>
5. 创建范例的主类main,并添加main()方法。
<code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) </code><code>throws</code> <code>exception {</code>
6. 声明并创建linkedblockingdeque属性list,并指定它的泛型参数是string型的。
<code>linkedblockingdeque list=</code><code>new</code> <code>linkedblockingdeque&lt;&gt;(</code><code>3</code><code>);</code>
7.将client作为传入参数创建线程thread并启动。
<code>client client=</code><code>new</code> <code>client(list);</code>
<code>thread thread=</code><code>new</code> <code>thread(client);</code>
<code>thread.start();</code>
8.使用list对象的take()方法,每300毫秒从列表中取出3个字符串对象,重复5次。在控制台输出字符串。
<code>for</code> <code>(</code><code>int</code> <code>i=</code><code>0</code><code>; i </code><code>for</code> <code>(</code><code>int</code> <code>j=</code><code>0</code><code>; j&lt;</code><code>3</code><code>; j++) {</code>
<code>string request=list.take();</code>
<code>system.out.printf("main: request: %s at %s. size:</code>
<code>%d\n",request,</code><code>new</code> <code>date(),list.size());</code>
<code>timeunit.milliseconds.sleep(</code><code>300</code><code>);</code>
9.输出一条表示程序结束的消息。
<code>system.out.printf(</code><code>"main: end of the program.\n"</code><code>);</code>
本节使用的泛型参数是string的linkedblockingdeque对象,用来实现一个阻塞式并发数据列表。
client类使用put()方法将字符串插入到列表中。如果列表已满(列表生成时指定了固定的容量),调用这个方法的线程将被阻塞直到列表中有了可用的空间。
main类使用take()方法从列表中取字符串。如果列表为空,调用这个方法的线程将被阻塞直到列表不为空(即有可用的元素)。
这个例子中使用了linkedblockingdeque对象的两个方法,调用它们的线程可能会被阻塞,在阻塞时如果线程被中断,方法会抛出interruptedexception异常,所以必须捕获和处理这个异常。
linkedblockingdeque类也提供了其他存取元素的方法,这些方法不会引起阻塞,而是抛出异常或返回null。
takefirst()和takelast():分别返回列表中第一个和最后一个元素,返回的元素会从列表中移除。如果列表为空,调用方法的线程将被阻塞直到列表中有可用的元素出现。
getfirst()和getlast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,则抛出nosuchelementexcpetinon异常。
peek()、peekfirst()和peeklast():分别返回列表中第一个和最后一个元素,返回的元素不会从列表中移除。如果列表为空,返回null。
poll()、pollfirst()和polllast():分别返回列表中第一个和最后一个元素,返回的元素将会从列表中移除。如果列表为空,返回null。
add()、addfirst()和addlast(): 分别将元素添加到列表中第一位和最后一位。如果列表已满(列表生成时指定了固定的容量),这些方法将抛出illegalstateexception异常。