在实际并发编程中,可以利用synchronized来同步线程对于共享对象的访问,用户需要显示的定义synchronized代码块或者方法。为了加快开发,可以使用Java平台一些并发基础模块来开发。
一 同步容器类
同步容器类主要包括Vector和Hashtable,都是通过Collections.synchronizedXxx工厂方法创建的。这些类都是线程安全的。一些修改删除添加方法都利用了synchronized来进行同步。将这些容器类的状态封装,然后对于其所有的public方法都进行同步,这样就保证了每次只能允许一个线程访问这些容器类。对于每个public方法都保证了原子性操作。
当对于同步容器类进行多个synchronized方法组成的复合操作,为了保证正确性,必须要确保这些复合操作为一个原子操作。
对于这些同步容器里进行迭代处理中,需要注意多个线程访问的问题。当利用for来迭代处理,容器的大小可能会在迭代中修改,则就会抛出ArrayIndexOutOfBoundsException异常。而在利用迭代器foreach运用interator过程中,可能会抛出ConcurrentModificationException。所以在对于这种同步容器类迭代过程中一定要进行加锁处理。
在对同步容器类调用toString,hashCode,equals,containsAll,removeAll,retainAll以及容器作为参数传递都会对容器进行迭代操作。
二 并发容器类
由于同步容器类,只允许同时一个外部线程访问该集合,则降低了它的并发能力。就引出了并发容器类。并发容器类主要就是针对多线程访问设计的。
常见的有ConcurrentHashMap以代替同步且基于散列的Map。CopyOnWriteArrayList用于以迭代为主要操作来代替List。Queue和BlockingQueue,其中BlockingQueue尤其在生产者与消费者模式中作为缓冲得到了很大的运用。
ConcurrentHashMap,这是一种并发性容器类,这里并没有针对每一个方法使用同一个锁进行同步,而是在内部用一种分段锁来实现并发性操作。可以允许同时多个读线程操作,允许同时多个写线程操作,多个读线程与写线程同时操作。迭代过程不需要加锁,但是在迭代过程可能容量大小会发生变化。这种最重要的是用于针对get,put,containsKey和remove等操作频繁的多线程中。
它增加了几个复合型的原子性操作,从而可以直接使用不用加锁。
CopyOnWriteArrayList,这是一种并发性容器类,迭代过程不需要加锁。它是“写入时复制”的容器,当在每次修改时候,都会复制底层数组,创建并发布一个新的容器副本。这种最重要用于需要频繁的进行迭代操作,且迭代操作远远大于修改操作的时候,多个线程可以同时对这个容器进行迭代操作,而不会彼此干扰且与修改容器的线程不相干。
Queue,是一种用来保存临时的数据,包括ConcurrentLinkedQueue,Queue上的操作不会阻塞,如果队列为空,会立即返回。
BlockingQueue,它主要用于并发操作方面。这是一种基于阻塞的队列,当从队列中获取元素时候,如果队列为空则等待,当向队列插入元素时候,如果队列满了则等待。它利用put和take来获取对象,这种操作都是在内部加锁机制实现的。常见的几个插入与移除对象操作:
put,将object放入队列中,如果无空间,则一直等待到有空间,会阻塞调用该方法的线程。
offer,如果可以放入object,则返回true,否则false。一会阻塞调用该方法的线程。
poll,获取首位对象,若立即得不到,可以等待一定时间后再返回值或者null。
take,获取首位对象,若队列为空,则一直等待有元素添加,会阻塞调用该方法的线程。
常见的子类包括ArrayBlockingQueue和LinkedBlockingQueue,分别用于替代LinkedList和ArrayList,提高并发性能。SynchronousQueue,这是一种同步移除与添加的队列,每个插入操作必须等待另一个线程的对应移除操作。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
利用ArrayBlockingQueue可以很容易实现生产者与消费者模式,不用进行额外的同步处理,因为它内部都已经实现了同步处理,并且进行了并发性能的提高。代码示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<code>package</code> <code>whut.producer;</code>
<code>import</code> <code>java.util.Random;</code>
<code>import</code> <code>java.util.concurrent.ArrayBlockingQueue;</code>
<code>import</code> <code>java.util.concurrent.BlockingQueue;</code>
<code>//利用BlockingQueue实现消费者与生产者</code>
<code>class</code> <code>Producer </code><code>implements</code> <code>Runnable {</code>
<code> </code><code>private</code> <code>final</code> <code>BlockingQueue queue;</code>
<code> </code><code>public</code> <code>Producer(BlockingQueue q) {</code>
<code> </code><code>queue = q;</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>int</code> <code>i=</code><code>0</code><code>;</code>
<code> </code><code>while</code> <code>(i<</code><code>10</code><code>)</code>
<code> </code><code>{</code>
<code> </code><code>queue.put(i);</code>
<code> </code><code>i++;</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>catch</code> <code>(InterruptedException ex) {</code>
<code> </code><code>}</code>
<code> </code><code>private</code> <code>Object produce()</code>
<code> </code><code>{</code>
<code> </code><code>Random rd=</code><code>new</code> <code>Random();</code>
<code> </code><code>int</code> <code>res=rd.nextInt(</code><code>10</code><code>);</code>
<code> </code><code>return</code> <code>res;</code>
<code>}</code>
<code>class</code> <code>Consumer </code><code>extends</code> <code>Thread {</code>
<code> </code><code>public</code> <code>Consumer(String name,BlockingQueue q) {</code>
<code> </code><code>super</code><code>(name);</code>
<code> </code><code>while</code> <code>(</code><code>true</code><code>) {</code>
<code> </code><code>consume(queue.take());</code>
<code> </code><code>private</code> <code>void</code> <code>consume(Object x) {</code>
<code> </code><code>System.out.println(Thread.currentThread().getName()+</code><code>" = "</code><code>+x);</code>
<code>class</code> <code>BlockingQueueDemo {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>
<code> </code><code>BlockingQueue<Integer> q = </code><code>new</code> <code>ArrayBlockingQueue<Integer>(</code><code>10</code><code>);</code>
<code> </code><code>Producer p = </code><code>new</code> <code>Producer(q);</code>
<code> </code><code>Consumer c1 = </code><code>new</code> <code>Consumer(</code><code>"Apple"</code><code>,q);</code>
<code> </code><code>Consumer c2 = </code><code>new</code> <code>Consumer(</code><code>"Hawk"</code><code>,q);</code>
<code> </code><code>new</code> <code>Thread(p).start();</code>
<code> </code><code>c1.start();</code>
<code> </code><code>c2.start();</code>
三 同步工具类
常见使用的同步工具类有信号量(Semaphore),栅栏(Barrier),闭锁(Latch).
闭锁(Latch),可以延迟线程进度直到闭锁到达最终状态。闭锁主要用来确保某些活动直到其他活动都执行完毕后才能继续执行,这里执行的形如递减操作,初始化等待活动的数目,当递减到0,则该活动才得以继续执行。
CountDownLatch是一种灵活的闭锁实现,有个状态为计数器,利用构造器设置。通过countDown方法递减计数器,表示一个活动已经执行完毕,而await方法等待计数器为0,如果不为0则阻塞等待,或者线程被中断,或者等待超时。这两个方法必须成对使用。
CountDownLatch的简单实例:
<code>package</code> <code>whut.concurrentmodel;</code>
<code>import</code> <code>java.util.concurrent.CountDownLatch;</code>
<code>//利用闭锁来实现,闭锁可以用于线程之间的协作,</code>
<code>//即一个线程必须等待其余所有活动完后执行</code>
<code>public</code> <code>class</code> <code>CountDownLatchClient {</code>
<code> </code><code>public</code> <code>void</code> <code>timeTasks(</code><code>int</code> <code>nThreads, </code><code>final</code> <code>Runnable task)</code>
<code> </code><code>throws</code> <code>InterruptedException {</code>
<code> </code><code>// 工作线程等待其他活动执行完毕的{闭锁}</code>
<code> </code><code>final</code> <code>CountDownLatch startGate = </code><code>new</code> <code>CountDownLatch(</code><code>1</code><code>);</code>
<code> </code><code>// 主线程等待所有工作线程执行完毕的{闭锁}</code>
<code> </code><code>final</code> <code>CountDownLatch endGate = </code><code>new</code> <code>CountDownLatch(nThreads);</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < nThreads; i++) {</code>
<code> </code><code>Thread t = </code><code>new</code> <code>Thread() {</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>// 工作线程先等待其他活动执行完毕</code>
<code> </code><code>startGate.await();</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>task.run();</code>
<code> </code><code>} </code><code>finally</code> <code>{</code>
<code> </code><code>System.out.println(Thread.currentThread().getName()</code>
<code> </code><code>+ </code><code>" work finished..."</code><code>);</code>
<code> </code><code>// 工作线程执行完毕后,递减闭锁值</code>
<code> </code><code>endGate.countDown();</code>
<code> </code><code>}</code>
<code> </code><code>} </code><code>catch</code> <code>(InterruptedException ie) {</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>};</code>
<code> </code><code>t.start();</code>
<code> </code><code>// 这里具体是任务,不过直接模拟了活动执行完毕了</code>
<code> </code><code>startGate.countDown();</code>
<code> </code><code>// 主线程先等待工作线程执行到0</code>
<code> </code><code>endGate.await();</code>
<code> </code><code>System.out.println(</code><code>"All workthread have finished..."</code><code>);</code>
<code> </code><code>// 主线程</code>
<code> </code><code>// TODO Auto-generated method stub</code>
<code> </code><code>Runnable task = </code><code>new</code> <code>Runnable() {</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>int</code> <code>i = </code><code>0</code><code>;</code>
<code> </code><code>while</code> <code>(i < </code><code>100</code><code>) {</code>
<code> </code><code>i++;</code>
<code> </code><code>};</code>
<code> </code><code>CountDownLatchClient cdl = </code><code>new</code> <code>CountDownLatchClient();</code>
<code> </code><code>cdl.timeTasks(</code><code>10</code><code>, task);</code>
<code> </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>
<code> </code><code>System.out.println(</code><code>"do another thing ...."</code><code>);</code>
栅栏(Barrier),可以阻塞一组线程直到某个事件发生。它和闭锁一样。闭锁用于等待其他活动,栅栏用于等待其他线程。这里执行的形如递增操作,初始化等待线程的数目,当递增到目标线程数目时候,则该线程才得以继续执行。
CyclicBarrier,是一种常用的栅栏类,可以使得一定数量的参与者反复在栅栏处汇集。通常用于并行迭代计算中。这种将一个复杂的大问题,分解成多个子问题,为每一个子问题创建线程来处理。当线程到达栅栏位置时候将调用await,这个将一直阻塞,直到所有的线程都到达了栅栏位置。在使用它的时候,可以传递一个Runnable,用于当成功通过栅栏后执行的操作或任务。一般每个线程执行的时候先利用barrier.hasCoverged判断,然后执行任务,最后利用barrier.await(),来阻塞所有都到达栅栏位置。
一般使用代码如下:
<code>import</code> <code>java.util.concurrent.BrokenBarrierException;</code>
<code>import</code> <code>java.util.concurrent.CyclicBarrier;</code>
<code>//栅栏实例</code>
<code>public</code> <code>class</code> <code>BarrierClient {</code>
<code> </code><code>BarrierClient bc=</code><code>new</code> <code>BarrierClient();</code>
<code> </code><code>//获取可以同时并行处理的数目</code>
<code> </code><code>int</code> <code>count=Runtime.getRuntime().availableProcessors();</code>
<code> </code><code>CyclicBarrier barrier=</code><code>new</code> <code>CyclicBarrier(count);</code>
<code> </code><code>for</code><code>(</code><code>int</code> <code>i=</code><code>0</code><code>;i<count;i++)</code>
<code> </code><code>{</code>
<code> </code><code>Worker work=bc.</code><code>new</code> <code>Worker(barrier);</code>
<code> </code><code>new</code> <code>Thread(work).start();</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>private</code> <code>class</code> <code>Worker </code><code>implements</code> <code>Runnable</code>
<code> </code><code>private</code> <code>final</code> <code>CyclicBarrier bar;</code>
<code> </code><code>public</code> <code>Worker(CyclicBarrier bar)</code>
<code> </code><code>{</code>
<code> </code><code>this</code><code>.bar=bar;</code>
<code> </code><code>public</code> <code>void</code> <code>run()</code>
<code> </code><code>//dosome work</code>
<code> </code><code>//...........</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>bar.await();</code>
<code> </code><code>}</code><code>catch</code><code>(InterruptedException e)</code>
<code> </code><code>}</code><code>catch</code><code>(BrokenBarrierException e)</code>
<code> </code>
栅栏与闭锁,可以使得任务线程同时开始同时结束,利用栅栏也可以实现与闭锁一样的效果。在并发测试中很有用
信号量(Semaphore),主要是用来控制同时访问某个特定资源的操作数目,或者执行某个指定操作的数目。可以用来实现资源池,对容器施加边界。Semaphore管理者一组虚拟许可,许可的数目由构造器指定。执行操作必须先获取许可,使用完毕后释放许可。如果没有许可,则acquire一直阻塞直到有许可。release方法将返回一个许可信号量。当初始值为1,则可以用作互斥体。
代码示例如下:
<code>import</code> <code>java.util.Collections;</code>
<code>import</code> <code>java.util.HashSet;</code>
<code>import</code> <code>java.util.Set;</code>
<code>import</code> <code>java.util.concurrent.Semaphore;</code>
<code>//利用Semaphore来实现集合的边界处理</code>
<code>public</code> <code>class</code> <code>SemaphoreTest<T> {</code>
<code> </code><code>private</code> <code>final</code> <code>Set<T> set;</code>
<code> </code><code>private</code> <code>final</code> <code>Semaphore sem;</code>
<code> </code>
<code> </code><code>public</code> <code>SemaphoreTest(</code><code>int</code> <code>bound)</code>
<code> </code><code>this</code><code>.set=Collections.synchronizedSet(</code><code>new</code> <code>HashSet<T>());</code><code>//同步处理</code>
<code> </code><code>//设置Semaphore的大小,用于设置set的边界,控制同时多少个访问</code>
<code> </code><code>sem=</code><code>new</code> <code>Semaphore(bound);</code>
<code> </code><code>//add操作成功则会返回true,否则返回false</code>
<code> </code><code>public</code> <code>boolean</code> <code>add(T o)</code><code>throws</code> <code>InterruptedException</code>
<code> </code><code>sem.acquire();</code><code>//获取信号量</code>
<code> </code><code>boolean</code> <code>wasAdded=</code><code>false</code><code>;</code>
<code> </code><code>try</code><code>{</code>
<code> </code><code>wasAdded=set.add(o);</code><code>//同步访问这些方法</code>
<code> </code><code>return</code> <code>wasAdded;</code>
<code> </code><code>}</code><code>finally</code><code>{</code>
<code> </code><code>if</code><code>(!wasAdded)</code>
<code> </code><code>sem.release();</code><code>//释放信号量,如果没有添加成功</code>
<code> </code><code>public</code> <code>boolean</code> <code>remove(Object o)</code>
<code> </code><code>boolean</code> <code>wasRemoved=set.remove(o);</code><code>//成功移除返回true</code>
<code> </code><code>if</code><code>(wasRemoved)</code>
<code> </code><code>sem.release();</code><code>//释放信号量</code>
<code> </code><code>return</code> <code>wasRemoved;</code>
FutureTask,这是一种可以获取长时间执行任务的快照,可以执行任何返回该对象,从而继续做其他工作,当需要获取任务的执行结果的时候,再利用Future.get获取任务的处理结果。如果任务已经完成,则get会立即返回,否则则会阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到了获取这个结果的线程。
FutureTask可以包装Callable和Runnable作为其构造器参数,同时FutureTask是实现了Runnable接口,故可以作为Executor.execute的参数传递。
使用FutureTask的方式有两种,一种是将其作为Thread的构造器参数或者execute()的参数在新的线程中执行。一种是直接运行其run方法,在主线程中串行运行。
代码示例:
<code>package</code> <code>whut.future;</code>
<code>import</code> <code>java.util.concurrent.Callable;</code>
<code>import</code> <code>java.util.concurrent.ExecutionException;</code>
<code>import</code> <code>java.util.concurrent.FutureTask;</code>
<code>//利用FutureTask来实现future设计模式</code>
<code>public</code> <code>class</code> <code>FutureTaskDemo {</code>
<code> </code><code>MyCallale mc = </code><code>new</code> <code>MyCallale();</code>
<code> </code><code>FutureTask<String> myfuture = </code><code>new</code> <code>FutureTask<String>(mc);</code>
<code> </code><code>new</code> <code>Thread(myfuture).start();</code>
<code> </code><code>System.out.println(</code><code>"operate other thing"</code><code>);</code>
<code> </code><code>System.out.println(</code><code>"data1="</code> <code>+ myfuture.get());</code>
<code> </code><code>} </code><code>catch</code> <code>(ExecutionException e) {</code>
<code>class</code> <code>MyCallale </code><code>implements</code> <code>Callable<String> {</code>
<code> </code><code>@Override</code>
<code> </code><code>public</code> <code>String call() </code><code>throws</code> <code>Exception {</code>
<code> </code><code>int</code> <code>i = </code><code>0</code><code>;</code>
<code> </code><code>Random r = </code><code>new</code> <code>Random();</code>
<code> </code><code>StringBuilder sb = </code><code>new</code> <code>StringBuilder();</code>
<code> </code><code>int</code> <code>res = </code><code>0</code><code>;</code>
<code> </code><code>while</code> <code>(i < </code><code>100000000</code><code>) {</code>
<code> </code><code>i++;</code>
<code> </code><code>res = r.nextInt(i);</code>
<code> </code><code>sb.append(res);</code>
<code> </code><code>return</code> <code>sb.toString();</code>
四 阻塞对象池的几种方式
在实现阻塞对象池中,可以自定义同步,有数组方式和LinkedList,由于这些都不是线程安全的,所以需要显示的进行synchronized同步处理。但是都是串行化访问的,不利于并行处理。
还可以利用基础构建模块,利用BlockingQueue和Semaphore来实现,这些内部都实现了加锁机制,更便于并发与同步的效率。
本文转自 zhao_xiao_long 51CTO博客,原文链接:http://blog.51cto.com/computerdragon/1211712