此博客主要是在观看张孝祥老师的教学视频的过程中,自己所做的学习笔记,在此以博客的形式分享出来,方便大家学习 ,建议大家观看视频,以此笔记作为回顾资料。 参考资料 传智播客_张孝祥_Java多线程与并发库高级应用视频教程下载 视频下载
创建线程的两种方式: 1,创建Thread的子类,重写run方法 2,给Thread类传入Runnable接口
两种方式的区别: 第二种方式可以实现数据共享,而且更倾向于面向对象的编程思想。一般都是采用第二种方式。
new Thread().start(); 调用了start方法后,就会运行Thread类的run方法,如下 public void run(){ if(target!=null){ targe.run(); } } 如果target为空,就什么也不做
new Thread( new Runnable(){ public void run() { //1 } } ){ public void run() { //2 } }.start();
执行的是2run方法 执行的步骤: 先运行子类的run方法,如果子类没有重写run方法,就去运行父类的run方法,上述代码中子类重写了run方法,所以就不会运行Runnable中的run方法。
1秒后,炸一次 new Timer().schedule(new TimerTask() { @Override public void run() { System.out.println("bombing!"); } }, 1000); 每隔两秒炸一次<一方式>
new Timer().schedule(new TimerTask() { @Override public void run() { System.out.println("bombing!"); } }, 1000,2000);
每隔两秒钟炸一次 <二方式> new Timer().schedule(new MyTimerTask(), 2000);
class MyTimerTask extends TimerTask{ @Override public void run() { System.out.println("bombing!"); new Timer().schedule(new MyTimerTask() ,2000); } } 注意:每个TimerTask()只能运行一次 先隔一秒炸一次,再隔两秒钟炸一次,再搁一秒钟炸一次,。。。。 private static int count =0; new Timer().schedule(new MyTimerTask(), 1000); class MyTimerTask extends TimerTask{ @Override public void run() { count = (count+1)%2; System.out.println("bombing!"); new Timer().schedule(new TimerTask() ,1000+count*1000); } }
在静态方法中,不能new内部类的实例对象 原因: 内部类,可以访问外部类的成员变量,调用静态方法的时候,没有创建对象,此时没有可以访问的成员变量,所以会报错。
回顾需要重新看视频
回顾需要重新看视频
线程内部共享数据,线程间数据独立
package cn.itcast.heima2; import java.util.HashMap; import java.util.Map; import java.util.Random; public class ThreadScopeShareData { private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>(); public static void main(String[] args) { for(int i=0;i<2;i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data :" + data); threadData.put(Thread.currentThread(), data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("A from " + Thread.currentThread().getName() + " get data :" + data); } } static class B{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data); } } }
ThreadLocal类,实现了线程内部共享数据,线程间数据独立,比05节视频中的更加简化方便
《1》 import java.util.HashMap; import java.util.Map; import java.util.Random;
public class ThreadLocalTest {
public static void main(String[] args) { new ThreadLocalTest().init(); } //init private void init(){ for(int i =0;i<2;i++){ new Thread(new Runnable() { public void run() { int data = new Random().nextInt(); Person.getThreadInstance().setName(Thread.currentThread().getName()); Person.getThreadInstance().setAge(data); new A().get(); new B().get(); } }).start(); } } //A class A { Person person = Person.getThreadInstance(); public void get(){ System.out.println("A:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge()); } } //B class B { Person person = Person.getThreadInstance(); public void get(){ System.out.println("B:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge()); } } //Person 将跟线程相关的绑定,放在共享的数据类的内部实现 static class Person{ private static ThreadLocal<Person> threadLocal = new ThreadLocal<ThreadLocalTest.Person>(); private Person(){ } public static Person getThreadInstance(){ Person person = threadLocal.get(); if(person==null){ person = new Person(); threadLocal.set(person); } return person;. } private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } }
《2》
import java.util.HashMap; import java.util.Map; import java.util.Random;
public class ThreadLocalTest {
public static final ThreadLocal<Person> threadlocal = new ThreadLocal(){
@Override protected Object initialValue() { return new Person(); } }; public static void main(String[] args) { new ThreadLocalTest().init(); } private void init(){ for(int i =0;i<2;i++){ new Thread(new Runnable() { public void run() { int data = new Random().nextInt(); threadlocal.get().setName(Thread.currentThread().getName()); threadlocal.get().setAge(data); new A().get(); new B().get(); } }).start(); } } //A class A { Person person = threadlocal.get(); public void get(){ System.out.println("A:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge()); } } //B class B { Person person = threadlocal.get(); public void get(){ System.out.println("B:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge()); } } //Person static class Person{ public Person(){ } private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } }
如果 每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做。
如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,有如下两种方式来实现这些Runnable对象之间的数据共享:
第一种: 将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信。
第二种:将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量,每个线程对共享数据的操作方法也分配给外部类,以便实现对共享数据进行的各个操作的互斥和通信,作为内部类的各个Runnable对象调用外部类的这些方法。
上面两种方式的组合:将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成,对象作为这个外部类中的成员变量或方法中的局部变量,每个线程Runnable对象作为外部类中的成员内部类或局部内部类。
总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信。 极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享。
设计四个线程,其中两个线程每次对j加一,另外两个线程每次对j减一
第一种示例代码 public class MultiThreadShareData { private static ShareData shareData = new ShareData(); public static void main(String[] args) { MyRunnable1 runNable1 = new MyRunnable1(shareData); MyRunnable2 runNable2 = new MyRunnable2(shareData); new Thread(runNable1).start(); new Thread(runNable2).start(); } } class ShareData{ private int j =0; public ShareData(){ } public void increment(){ j++; } public void decrement(){ j--; } } class MyRunnable1 implements Runnable{ private ShareData shareData; public MyRunnable1(ShareData shareData){ this.shareData = shareData; } public void run() { this.shareData.increment(); } } class MyRunnable2 implements Runnable{ private ShareData shareData; public MyRunnable2(ShareData shareData){ this.shareData = shareData; } public void run() { this.shareData.decrement(); } } 或者 public class MultiThreadShareData { public static void main(String[] args) {
MultiThreadShareData multiThreadShareData = new MultiThreadShareData();
ShareData shareData = multiThreadShareData.new ShareData();
MyRunnable1 runNable1 = multiThreadShareData.new MyRunnable1(shareData); MyRunnable2 runNable2 = multiThreadShareData.new MyRunnable2(shareData); new Thread(runNable1).start(); new Thread(runNable2).start(); } class ShareData{ private int j =0; public ShareData(){ } public void increment(){ j++; } public void decrement(){ j--; } } class MyRunnable1 implements Runnable{ private ShareData shareData; public MyRunnable1(ShareData shareData){ this.shareData = shareData; } public void run() { this.shareData.increment(); } } class MyRunnable2 implements Runnable{ private ShareData shareData; public MyRunnable2(ShareData shareData){ this.shareData = shareData; } public void run() { this.shareData.decrement(); } }
}
第二种示例代码
public class MultiThreadShareData {
public static void main(String[] args) { final ShareData shareData = new ShareData(); new Thread(new Runnable() { public void run() { shareData.increment(); } }).start(); new Thread(new Runnable() { public void run() { shareData.decrement(); } }).start(); } }
class ShareData{ private int j =0; public ShareData(){ } public void increment(){ j++; } public void decrement(){ j--; } }
线程池的概念与Executors类的应用 创建固定大小的线程池 创建缓存线程池 创建单一线程池(如何实现线程死亡后重新启动) 关闭线程池 shutdown与shutdownNow的比较 用线程池启动定时器 调用ScheduledExecutorService的schedule方法,返回的ScheduleFuture对象可以取消任务。 支持间隔重复任务的定时方式,不直接支持绝对定时方式,需要转换成相对时间方式。
创建一个固定线程数量的线程池,内有3个线程,分配给了10个任务,3个线程执行这10个任务,当一个线程执行完一个任务之后,再去执行另一个任务,直到所有的任务执行完毕,但线程池中线程不会销毁。 ExecutorService executorService = Executors.newFixedThreadPool(3); for(int i=1;i<=10;i++){ final int taskId = i; executorService.execute(new Runnable() { public void run() { for(int j=1;j<=10;j++){ System.out.println(Thread.currentThread().getName()+"----"+j+"次"+"execute task"+taskId); } } }); }
创建一个缓存线程池,缓存线程池中线程的数量是不固定的,动态变化,刚开始有3个任务,就只有3个线程,后来又来了6个任务,那就又增加了6个线程,任务执行完后,超时一段时间,多余线程销毁。
ExecutorService executorService = Executors.newCachedThreadPool(); for(int i=1;i<=10;i++){ final int taskId = i; executorService.execute(new Runnable() { public void run() { for(int j=1;j<=10;j++){ System.out.println(Thread.currentThread().getName()+"----"+j+"次"+"execute task"+taskId); } } }); } executorService.shutdown(); //当所有线程都空闲的时候,杀死线程,终止程序。 executorService.shutdownNow();//不管线程中的任务有没有执行完,都杀死线程。
创建一个只含有一个线程的线程池,该线程池只含有一个线程,当线程池里的线程被销毁后,线程池又会创建一个线程,替代原来的线程 ExecutorService executorService = Executors.newSingleThreadExecutor(); for(int i=1;i<=10;i++){ final int taskId = i; executorService.execute(new Runnable() { public void run() { for(int j=1;j<=10;j++){ System.out.println(Thread.currentThread().getName()+"----"+j+"次"+"execute task"+taskId); } } }); }
创建一个调度线程池,内含有3个线程,实现10秒定时执行功能 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(new Runnable() { public void run() { System.out.println("bomb!!!"); } },10, TimeUnit.SECONDS);
创建一个调度线程池,内含有3个线程,实现10秒定时执后,以后每隔2秒执行一次的功能。 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("bomb!!!"); } },10, 2, TimeUnit.SECONDS);
Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。
Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。
System.out.println("主线程::::"+Thread.currentThread().getName()); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Callable() { public Object call() throws Exception { Thread.sleep(2000); return Thread.currentThread().getName(); } }); String string = null; try { System.out.println("等待开始"); string = (String) future.get();//没有结果会一直等待,知道有结果为止 //string = (String) future.get(10, TimeUnit.SECONDS);//等待10s,没有有结果报异常 System.out.println("等待结束"); } catch (Exception e) { e.printStackTrace(); } System.out.println("Callable线程::::"+string);
CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。 好比我同时种了几块地的麦子,然后就等待收割。收割时,则是那块先成熟了,则先去收割哪块麦子。
ExecutorService executorService = Executors.newFixedThreadPool(10); CompletionService completionService = new ExecutorCompletionService(executorService); for(int i=1;i<=10;i++){ final int taskId = i; completionService.submit(new Callable() { public Object call() throws Exception { Thread.sleep(new Random().nextInt(5000)); return "执行完的任务的ID::::"+taskId; } }); } for(int i=1;i<=10;i++){ try { String string = (String) completionService.take().get(); System.out.println(string); } catch (Exception e) { e.printStackTrace(); } }
Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。
public static void main(String[] args) { new LockTest().action(); } private void action(){ final Outputer outputer = new Outputer(); new Thread(new Runnable() { public void run() { for(int i=0;i<10;i++){ outputer.output("zhangxiaoxiang\n"); } } }).start(); new Thread(new Runnable() { public void run() { for(int i=0;i<10;i++){ outputer.output("lihuoming\n"); } } }).start(); } private class Outputer{ private Lock lock = null; public Outputer(){ lock = new ReentrantLock(); } public void output(String name){ lock.lock(); try{ for(int i = 0;i<name.length();i++){ System.out.print(name.charAt(i)); }; }finally{ lock.unlock(); } } }
读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!
package cn.itcast.heima2; import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for(int i=0;i<3;i++) { new Thread(){ public void run(){ while(true){ q3.get(); } } }.start(); new Thread(){ public void run(){ while(true){ q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3{ private Object data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。 ReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ rwl.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + " be ready to read data!"); Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "have read data :" + data); } catch (InterruptedException e) { e.printStackTrace(); }finally{ rwl.readLock().unlock(); } } public void put(Object data){ rwl.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + " be ready to write data!"); Thread.sleep((long)(Math.random()*1000)); this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } catch (InterruptedException e) { e.printStackTrace(); }finally{ rwl.writeLock().unlock(); } } }
############################缓存系统示例代码##############################
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class CacheDemo { private Map cacheMap = new HashMap<String,Object>(); public static void main(String[] args) { } private ReadWriteLock rwl = new ReentrantReadWriteLock();
public Object get(int key) throws Exception{ rwl.readLock().lock(); Object value = null; try{ value = cacheMap.get(key); if(value==null){ rwl.readLock().unlock(); rwl.writeLock().lock(); try{ value = "aaaa";//实际上是queryDB() if(value == null){ throw new Exception(); } cacheMap.put(key, value); }finally{ rwl.writeLock().unlock(); } rwl.readLock().lock(); } }finally{ rwl.readLock().unlock(); } return value; } }
阻塞队列
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition();// notFull 缓存不满 final Condition notEmpty = lock.newCondition();//notEmpty 缓存非空 final Object[] items = new Object[100]; int putptr,takeptr,count; public void put(Object x) throws InterruptedException{ lock.lock(); try{ while(count==items.length) notFull.await();//缓存不满这个条件是假的 及意思是 缓存是满的 items[putptr]=x; if(++putptr==items.length) putptr=0; ++count; notEmpty.signal();//缓存非空这个条件是真的 }finally{ lock.unlock(); } } public Object take() throws InterruptedException{ lock.lock(); try{ while(count==0) notEmpty.await();//缓存非空这个条件是假的 及意思是 现在缓存是空的 Object x = items[takeptr]; if(++takeptr==items.length) takeptr=0; --count; notFull.signal();//缓存不满这个条件是真的 return x; }finally{ lock.unlock(); } } } i
Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。 Semaphore实现的功能就类似厕所有5个坑,假如有十个人要上厕所,那么同时能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中在等待的另外5个人中又有一个可以占用了。 另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。 单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
package cn.itcast.heima2; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class TwoTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for(int i=0;i<10;i++){ Runnable runnable = new Runnable() { public void run() { try { semaphore.acquire(); System.out.println("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3-semaphore.availablePermits()) + "个并发"); Thread.sleep((long) (Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将离开"); semaphore.release(); //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元 System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3-semaphore.availablePermits()) + "个并发"); } catch (InterruptedException e) { e.printStackTrace(); } } }; executorService.execute(runnable); } } }
表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面,这就好比整个公司的人员利用周末时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始就餐,…。
import java.util.Random; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); final CyclicBarrier cyclicBarrier = new CyclicBarrier(3); for(int i=1;i<=3;i++){ Runnable runnable = new Runnable() { public void run() { try { Thread.sleep((long) (Math.random()*10000)); System.out.println("线程"+Thread.currentThread().getName()+"即将到达集合点1" + ",当前已有"+(cyclicBarrier.getNumberWaiting()+1)+"个到达集合点," + (cyclicBarrier.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cyclicBarrier.await(); Thread.sleep((long) (Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + (cyclicBarrier.getNumberWaiting()+1) + "个已经到达," + (cyclicBarrier.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cyclicBarrier.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + (cyclicBarrier.getNumberWaiting() + 1) + "个已经到达," + (cyclicBarrier.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }; executorService.execute(runnable); } executorService.shutdown(); } }
犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。这直接通过代码来说明CountDownLatch的作用,这样学员的理解效果更直接。 可以实现一个人(也可以是多个人)等待其他所有人都来通知他,这犹如一个计划需要多个领导都签字后才能继续向下实施。还可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑。用这个功能做百米赛跑的游戏程序不错哦!
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class CountDownLatch {
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); final java.util.concurrent.CountDownLatch orderCount = new java.util.concurrent.CountDownLatch(1); final java.util.concurrent.CountDownLatch ansCount = new java.util.concurrent.CountDownLatch(3); for(int i=1;i<=3;i++){ Runnable runnable = new Runnable() { public void run() { System.out.println("线程" + Thread.currentThread().getName() +"正准备接受命令"); try { orderCount.await(); System.out.println("线程" + Thread.currentThread().getName() + "已接受命令"); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果"); ansCount.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; executorService.execute(runnable); } try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令"); orderCount.countDown(); System.out.println("线程" + Thread.currentThread().getName() +"已发送命令,正在等待结果"); ansCount.await(); System.out.println("线程" + Thread.currentThread().getName() +"已收到所有响应结果");
} catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); } }
用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿着数据到来时,才能彼此交换数据。
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); executorService.execute(new Runnable() { public void run() { try { String data1 = "毒品"; System.out.println("线程" + Thread.currentThread().getName() +"正在把" + data1 +"换出去"); Thread.sleep((long) (Math.random()*10000)); String data2 = (String) exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回" + data2); } catch (InterruptedException e) { e.printStackTrace(); } } }); executorService.execute(new Runnable() { public void run() { try { String data1 = "美金"; System.out.println("线程" + Thread.currentThread().getName() +"正在把" + data1 +"换出去"); Thread.sleep((long) (Math.random()*10000)); String data2 = (String) exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回" + data2); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }
什么是可阻塞队列,阻塞队列的作用与实际应用,阻塞队列的实现原理。 阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,另一方释放数据,Semaphore通常则是由同一方设置和释放信号量。
ArrayBlockingQueue 只有put方法和take方法才具有阻塞功能
用3个空间的队列来演示阻塞队列的功能和效果。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; public class BlockQueueTest { public static void main(String[] args) { final BlockingQueue blockingQueue = new ArrayBlockingQueue(3); for(int i=1;i<=2;i++){ new Thread(new Runnable() { public void run() { while(true){ try { Thread.sleep((long) (Math.random()*10000)); System.out.println(Thread.currentThread().getName()+"准备放数据"); blockingQueue.put(1); System.out.println(Thread.currentThread().getName()+"放数据成功"+"当前队列有"+blockingQueue.size()+"个数据"); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } new Thread(new Runnable() { public void run() { while(true){ try { Thread.sleep((long) (Math.random()*10000)); System.out.println(Thread.currentThread().getName() + "准备取数据!"); blockingQueue.take(); System.out.println(Thread.currentThread().getName()+"取数据成功"+"当前队列有"+blockingQueue.size()+"个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
用两个具有1个空间的队列来实现同步通知的功能。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueCommunicationTest { public static void main(String[] args) { new BlockingQueueCommunicationTest().execute(); } private void execute(){ final Business business = new Business(); new Thread(new Runnable() { public void run() { for(int j=1;j<=100;j++){ business.sub(j); } } }).start(); for(int j=1;j<=100;j++){ business.main(j); } } private class Business{ BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1); BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1); //匿名构造方法,先于非匿名构造方法执行 { try { blockingQueue2.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } public void sub(int j){ try { blockingQueue1.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for(int i=1;i<=10;i++){ System.out.println("sub thread sequece of " + i + ",loop of " + j); } try { blockingQueue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } } public void main(int j){ try { blockingQueue2.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for(int i=1;i<=10;i++){ System.out.println("main thread sequece of " + i + ",loop of " + j); } try { blockingQueue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }