天天看点

Java并发编程高级内容介绍

计数器:CountDownLatch

CountDownLatch类似于一个计数器,和Atomic类比较相近,操作是原子的,即多个线程同时只能有一个可以去操作。CountDownLatch对象设置一个初始的数字作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程调用countDown()减为0为止。典型的应用场景就是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。例如在Zookeeper的使用过程中,由于客户端与服务器建立连接是异步调用的,因此主线程需要await()阻塞直至异步回调countDown()完成。

代码示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

<code>public</code> <code>class</code> <code>CountDownLatchTest {</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>

<code>        </code><code>final</code> <code>CountDownLatch countDownLatch = </code><code>new</code> <code>CountDownLatch(</code><code>2</code><code>);</code>

<code>        </code><code>Thread work1 = </code><code>new</code> <code>Thread(</code><code>new</code> <code>Runnable() {</code>

<code>            </code><code>@Override</code>

<code>            </code><code>public</code> <code>void</code> <code>run() {</code>

<code>                </code><code>System.out.println(Thread.currentThread() + </code><code>" doing work...start"</code><code>);</code>

<code>                </code><code>try</code> <code>{</code>

<code>                    </code><code>Thread.sleep(</code><code>200</code><code>);</code>

<code>                </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>

<code>                    </code><code>e.printStackTrace();</code>

<code>                </code><code>}</code>

<code>                </code><code>System.out.println(Thread.currentThread() + </code><code>" doing work...end "</code><code>);</code>

<code>                </code><code>countDownLatch.countDown();</code>

<code>            </code><code>}</code>

<code>        </code><code>},</code><code>"work1"</code><code>);</code>

<code>        </code><code>Thread work2 = </code><code>new</code> <code>Thread(</code><code>new</code> <code>Runnable() {</code>

<code>                </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" doing work...start"</code><code>);</code>

<code>                </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" doing work...end "</code><code>);</code>

<code>        </code><code>},</code><code>"work2"</code><code>);</code>

<code>        </code><code>work1.start();</code>

<code>        </code><code>work2.start();</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>countDownLatch.await();</code>

<code>            </code><code>System.out.println(</code><code>"all workers finish "</code><code>);</code>

<code>        </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>

<code>            </code><code>e.printStackTrace();</code>

<code>        </code><code>}</code>

<code>    </code><code>}</code>

<code>}</code>

齐步走:CyclicBarrier

Barrier的意思是栅栏,就是让一组线程相互等待,直至所有线程都到齐了,那么就可以齐步走。Cyclic是循环的意思,就是说Barrier可以循环使用。CyclicBarrier主要的方法就是await(),较CountDownLatch的await()虽然都是阻塞,但是CyclicBarrier.await()有返回值int,即当前线程是第几个到达这个Barrier的线程。

构造CyclicBarrier时指定计数值,await() 方法每被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始。在构造方法上还可以传递一个Runnable对象,阻塞解除时这个Runnable会得到运行。

CyclicBarrier有点“不见不散”的味道,想一想,如果某个成员因某种原因来不了Barrier这个地方,那么我们一直等待吗?实际中,如果来不了理应通知其他成员,别等了,回家吧!注意到CyclicBarrier.await()独有的BrokenBarrierException异常

<a href="http://s3.51cto.com/wyfs02/M02/8B/AB/wKioL1hU62jTIP18AAAgAV0NFVo139.png" target="_blank"></a>

<code>public</code> <code>class</code> <code>CyclicBarrierTest {</code>

<code>        </code><code>final</code> <code>CyclicBarrier cyclicBarrier = </code><code>new</code> <code>CyclicBarrier(</code><code>2</code><code>, </code><code>new</code> <code>Runnable() {</code>

<code>                </code><code>System.out.println(</code><code>"都准备好啦!"</code><code>);</code>

<code>        </code><code>});</code>

<code>        </code><code>Thread runman1 = </code><code>new</code> <code>Thread(</code><code>new</code> <code>Runnable() {</code>

<code>                    </code><code>cyclicBarrier.await();</code>

<code>                    </code><code>System.out.println(Thread.currentThread().getName() + </code><code>"i am ok"</code><code>);</code>

<code>                </code><code>} </code><code>catch</code> <code>(BrokenBarrierException e) {</code>

<code>        </code><code>},</code><code>"runman1"</code><code>);</code>

<code>        </code><code>Thread runman2 = </code><code>new</code> <code>Thread(</code><code>new</code> <code>Runnable() {</code>

<code>        </code><code>},</code><code>"runman2"</code><code>);</code>

<code>        </code><code>runman1.start();</code>

<code>        </code><code>runman2.start();</code>

Callable And Future

<code>public</code> <code>class</code> <code>FutureTest {</code>

<code>        </code><code>FutureTask&lt;String&gt; futureTask = </code><code>new</code> <code>FutureTask&lt;String&gt;(</code><code>new</code> <code>Callable&lt;String&gt;() {</code>

<code>            </code><code>public</code> <code>String call() </code><code>throws</code> <code>Exception {</code>

<code>                </code><code>Thread.sleep(</code><code>2000</code><code>);</code>

<code>                </code><code>return</code> <code>"ok"</code><code>;</code>

<code>        </code><code>ExecutorService es = Executors.newFixedThreadPool(</code><code>1</code><code>);</code>

<code>        </code><code>es.submit(futureTask);</code>

<code>        </code><code>System.out.println(</code><code>"开启线程去异步处理,主线程继续往下执行!"</code><code>);</code>

<code>            </code><code>System.out.println(</code><code>"取得异步处理结果:"</code> <code>+ futureTask.get());</code>

<code>        </code><code>} </code><code>catch</code> <code>(ExecutionException e) {</code>

注意到线程池执行任务,可以利用2个方法:

<a href="http://s3.51cto.com/wyfs02/M01/8B/B2/wKiom1hVO5KTzT65AAAF84kvGAc039.png" target="_blank"></a>

<a href="http://s3.51cto.com/wyfs02/M00/8B/B3/wKiom1hVPBaj2Sh-AAAGyfavKEw459.png" target="_blank"></a>

<a href="http://s5.51cto.com/wyfs02/M00/8B/B3/wKiom1hVO-jTO8zFAAAGw9T46oo296.png" target="_blank"></a>

submit和execute有什么区别呢?从入参和结果类型就知道了。

信号量:Semaphore

Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么显然同时只能有5个人占用厕所,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的fair参数选项。

Semaphore可以控制某个资源可被同时访问的个数(构造方法传入),通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

<code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>

<code>    </code><code>final</code> <code>Semaphore semaphore = </code><code>new</code> <code>Semaphore(</code><code>5</code><code>);</code>

<code>    </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i &lt; </code><code>6</code> <code>; i++){</code>

<code>        </code><code>new</code> <code>Thread(</code><code>new</code> <code>Runnable() {</code>

<code>                    </code><code>semaphore.acquire();</code>

<code>                    </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 运行..."</code><code>);</code>

<code>                    </code><code>Thread.sleep(</code><code>1000</code><code>);</code>

<code>                    </code><code>semaphore.release();</code>

<code>                    </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 结束..."</code><code>);</code>

<code>        </code><code>},String.valueOf(i)).start();</code>

Condition

JDK由原始的synchronized发展到Lock,以类的方式提供锁机制,发展出重入锁、读写锁,以类的形式存在自然功能更加强大灵活,比如可以tryLock进行锁的嗅探。在synchronized代码块中我们可以使用wait/notify/notifyAll来进行线程的协同工作,那么JDK也发展了这一块,即Condition。Condition.await类似于wait,Condition.signal/signalAll类似于notify/nofityAll。下面我们简单实现一个Condition版的生产者/消费者。

处理核心:Handler

58

59

60

<code>public</code> <code>class</code> <code>Handler {</code>

<code>    </code><code>//容器</code>

<code>    </code><code>private</code> <code>LinkedList&lt;String&gt; linkedList = </code><code>new</code> <code>LinkedList&lt;String&gt;();</code>

<code>    </code><code>//限制</code>

<code>    </code><code>private</code> <code>int</code> <code>MAX_SIZE = </code><code>3</code><code>;</code>

<code>    </code><code>//锁</code>

<code>    </code><code>private</code> <code>Lock lock = </code><code>new</code> <code>ReentrantLock();</code>

<code>    </code><code>//condition  实际上,可以new多个condition,这里暂且只是用给一个</code>

<code>    </code><code>private</code> <code>Condition condition = lock.newCondition();</code>

<code>    </code><code>public</code> <code>void</code> <code>put(String bread){</code>

<code>        </code><code>try</code><code>{</code>

<code>            </code><code>lock.lock();</code>

<code>            </code><code>if</code><code>(linkedList.size() == MAX_SIZE){</code>

<code>                </code><code>System.out.println(</code><code>"容器已满"</code><code>);</code>

<code>                </code><code>condition.await();</code>

<code>            </code><code>linkedList.add(bread);</code>

<code>            </code><code>System.out.println(</code><code>"放入面包"</code> <code>+ bread);</code>

<code>            </code><code>condition.signalAll();</code>

<code>        </code><code>}</code><code>catch</code> <code>(Exception e){</code>

<code>        </code><code>}</code><code>finally</code> <code>{</code>

<code>            </code><code>lock.unlock();</code>

<code>    </code><code>public</code> <code>void</code> <code>eat(){</code>

<code>            </code><code>if</code><code>(linkedList.size() == </code><code>0</code><code>){</code>

<code>                </code><code>System.out.println(</code><code>"容器为空"</code><code>);</code>

<code>            </code><code>String bread = linkedList.removeFirst();</code>

<code>            </code><code>System.out.println(</code><code>"吃掉一个面包"</code> <code>+ bread);</code>

<code>        </code><code>}</code><code>catch</code><code>(Exception e){</code>

生产者:Produce

<code>public</code> <code>class</code> <code>Produce </code><code>implements</code> <code>Runnable{</code>

<code>    </code><code>private</code> <code>Handler handler;</code>

<code>    </code><code>public</code> <code>Produce(Handler handler) {</code>

<code>        </code><code>this</code><code>.handler = handler;</code>

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>        </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i &lt; </code><code>10</code> <code>; i++){</code>

<code>            </code><code>try</code> <code>{</code>

<code>                </code><code>Thread.sleep(</code><code>new</code> <code>Random().nextInt(</code><code>1000</code><code>));</code>

<code>            </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>

<code>                </code><code>e.printStackTrace();</code>

<code>            </code><code>handler.put(String.valueOf(i));</code>

消费者:Consume

<code>public</code> <code>class</code> <code>Consume </code><code>implements</code> <code>Runnable{</code>

<code>    </code><code>public</code> <code>Consume(Handler handler) {</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i &lt; </code><code>10</code> <code>; i++){</code>

<code>            </code><code>handler.eat();</code>

Main:

<code>public</code> <code>class</code> <code>Main {</code>

<code>        </code><code>Handler handler = </code><code>new</code> <code>Handler();</code>

<code>        </code><code>Produce produce = </code><code>new</code> <code>Produce(handler);</code>

<code>        </code><code>Consume consume = </code><code>new</code> <code>Consume(handler);</code>

<code>        </code><code>new</code> <code>Thread(consume).start();</code>

<code>        </code><code>new</code> <code>Thread(produce).start();</code>

本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1883655,如需转载请自行联系原作者