天天看点

Java张孝祥视频 学习笔记 多线程

此博客主要是在观看张孝祥老师的教学视频的过程中,自己所做的学习笔记,在此以博客的形式分享出来,方便大家学习 ,建议大家观看视频,以此笔记作为回顾资料。 参考资料 传智播客_张孝祥_Java多线程与并发库高级应用视频教程下载 视频下载

创建线程的两种方式: 1,创建Thread的子类,重写run方法 2,给Thread类传入Runnable接口

两种方式的区别: 第二种方式可以实现数据共享,而且更倾向于面向对象的编程思想。一般都是采用第二种方式。

new Thread().start(); 调用了start方法后,就会运行Thread类的run方法,如下 public void run(){     if(target!=null){        targe.run();     } } 如果target为空,就什么也不做

new Thread(         new Runnable(){              public void run() { //1                }         }   ){        public void run() { //2         }  }.start();

执行的是2run方法 执行的步骤: 先运行子类的run方法,如果子类没有重写run方法,就去运行父类的run方法,上述代码中子类重写了run方法,所以就不会运行Runnable中的run方法。

 1秒后,炸一次  new Timer().schedule(new TimerTask() {            @Override        public void run() {             System.out.println("bombing!");         }   }, 1000); 每隔两秒炸一次<一方式>

new Timer().schedule(new TimerTask() {            @Override        public void run() {             System.out.println("bombing!");         } }, 1000,2000);

 每隔两秒钟炸一次 <二方式>  new Timer().schedule(new MyTimerTask(), 2000);

 class MyTimerTask extends TimerTask{        @Override    public void run() {          System.out.println("bombing!");         new Timer().schedule(new MyTimerTask() ,2000);      }   } 注意:每个TimerTask()只能运行一次 先隔一秒炸一次,再隔两秒钟炸一次,再搁一秒钟炸一次,。。。。 private static int count =0; new Timer().schedule(new MyTimerTask(), 1000); class MyTimerTask extends TimerTask{       @Override    public void run() {          count = (count+1)%2;         System.out.println("bombing!");         new Timer().schedule(new TimerTask() ,1000+count*1000);      } }

在静态方法中,不能new内部类的实例对象 原因: 内部类,可以访问外部类的成员变量,调用静态方法的时候,没有创建对象,此时没有可以访问的成员变量,所以会报错。

回顾需要重新看视频 

回顾需要重新看视频

线程内部共享数据,线程间数据独立

package cn.itcast.heima2; import java.util.HashMap; import java.util.Map; import java.util.Random; public class ThreadScopeShareData {    private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>();  public static void main(String[] args) {       for(int i=0;i<2;i++){            new Thread(new Runnable(){            @Override             public void run() {                  int data = new Random().nextInt();                  System.out.println(Thread.currentThread().getName()  + " has put data :" + data);                   threadData.put(Thread.currentThread(), data);                  new A().get();                  new B().get();             }        }).start();    }  }     static class A{       public void get(){            int data = threadData.get(Thread.currentThread());            System.out.println("A from " + Thread.currentThread().getName()  + " get data :" + data);       }  }    static class B{       public void get(){       int data = threadData.get(Thread.currentThread());       System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data);     }  } }

ThreadLocal类,实现了线程内部共享数据,线程间数据独立,比05节视频中的更加简化方便

《1》 import java.util.HashMap; import java.util.Map; import java.util.Random;

public class ThreadLocalTest {

       public static void main(String[] args) {           new ThreadLocalTest().init();      }    //init      private void init(){           for(int i =0;i<2;i++){                new Thread(new Runnable() {                         public void run() {                                int data = new Random().nextInt();                          Person.getThreadInstance().setName(Thread.currentThread().getName());                           Person.getThreadInstance().setAge(data);                          new A().get();                           new B().get();                 }             }).start();         }     }   //A  class A {       Person person = Person.getThreadInstance();       public void get(){            System.out.println("A:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge());          }  }  //B  class B {       Person person = Person.getThreadInstance();       public void get(){            System.out.println("B:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge());       } }  //Person   将跟线程相关的绑定,放在共享的数据类的内部实现      static class Person{           private static ThreadLocal<Person>  threadLocal = new ThreadLocal<ThreadLocalTest.Person>();              private Person(){               }            public static Person  getThreadInstance(){                Person person = threadLocal.get();                if(person==null){                     person = new Person();                     threadLocal.set(person);                }                return person;.          }               private String name;           private int age;             public String getName() {                return name;           }           public void setName(String name) {                 this.name = name;           }           public int getAge() {                return age;           }           public void setAge(int age) {                this.age = age;           }       }    }

《2》

import java.util.HashMap; import java.util.Map; import java.util.Random;

public class ThreadLocalTest {

     public static final ThreadLocal<Person>  threadlocal = new  ThreadLocal(){

              @Override               protected Object initialValue() {                        return new Person();               }             };        public static void main(String[] args) {           new ThreadLocalTest().init();      }         private void init(){           for(int i =0;i<2;i++){                new Thread(new Runnable() {                         public void run() {                                int data = new Random().nextInt();                          threadlocal.get().setName(Thread.currentThread().getName());                          threadlocal.get().setAge(data);                          new A().get();                          new B().get();                 }            }).start();         }     }      //A      class A {           Person person = threadlocal.get();           public void get(){                System.out.println("A:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge());              }      }    //B     class B {        Person person = threadlocal.get();        public void get(){            System.out.println("B:-"+Thread.currentThread().getName()+":name:"+person.getName()+":age:"+person.getAge());         }     }       //Person        static class Person{                   public Person(){                }             private String name;            private int age;             public String getName() {                return name;           }           public void setName(String name) {                 this.name = name;           }           public int getAge() {                return age;           }           public void setAge(int age) {                this.age = age;           }       }   }

如果 每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做。

如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,有如下两种方式来实现这些Runnable对象之间的数据共享:

第一种: 将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信。 

第二种:将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量,每个线程对共享数据的操作方法也分配给外部类,以便实现对共享数据进行的各个操作的互斥和通信,作为内部类的各个Runnable对象调用外部类的这些方法。

上面两种方式的组合:将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成,对象作为这个外部类中的成员变量或方法中的局部变量,每个线程Runnable对象作为外部类中的成员内部类或局部内部类。

总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信。  极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享。

设计四个线程,其中两个线程每次对j加一,另外两个线程每次对j减一

第一种示例代码   public class MultiThreadShareData {    private static ShareData shareData = new ShareData();     public static void main(String[] args) {       MyRunnable1 runNable1 = new MyRunnable1(shareData);       MyRunnable2 runNable2 = new MyRunnable2(shareData);       new Thread(runNable1).start();       new Thread(runNable2).start();   } }  class ShareData{         private int j =0;         public ShareData(){         }       public void increment(){           j++;      }      public void decrement(){           j--;      } } class MyRunnable1 implements Runnable{      private ShareData shareData;      public MyRunnable1(ShareData shareData){           this.shareData = shareData;      }      public void run() {           this.shareData.increment();      } }  class MyRunnable2 implements Runnable{     private ShareData shareData;     public MyRunnable2(ShareData shareData){           this.shareData = shareData;      }     public void run() {           this.shareData.decrement();      } }  或者 public class MultiThreadShareData {         public static void main(String[] args) {

      MultiThreadShareData multiThreadShareData = new MultiThreadShareData();

       ShareData shareData = multiThreadShareData.new ShareData(); 

      MyRunnable1 runNable1 = multiThreadShareData.new MyRunnable1(shareData);       MyRunnable2 runNable2 = multiThreadShareData.new MyRunnable2(shareData);       new Thread(runNable1).start();       new Thread(runNable2).start();   }       class ShareData{         private int j =0;         public ShareData(){         }       public void increment(){           j++;      }      public void decrement(){           j--;      } } class MyRunnable1 implements Runnable{      private ShareData shareData;      public MyRunnable1(ShareData shareData){           this.shareData = shareData;      }      public void run() {           this.shareData.increment();      } }  class MyRunnable2 implements Runnable{     private ShareData shareData;     public MyRunnable2(ShareData shareData){           this.shareData = shareData;      }     public void run() {           this.shareData.decrement();      } } 

   } 

第二种示例代码

public class MultiThreadShareData {

      public static void main(String[] args) {            final ShareData shareData = new ShareData();                    new Thread(new Runnable() {               public void run() {                      shareData.increment();              }         }).start();                    new Thread(new Runnable() {             public void run() {                   shareData.decrement();             }        }).start();  }     }

class ShareData{       private int j =0;        public ShareData(){         }       public void increment(){           j++;      }      public void decrement(){           j--;      } }

线程池的概念与Executors类的应用 创建固定大小的线程池 创建缓存线程池 创建单一线程池(如何实现线程死亡后重新启动) 关闭线程池  shutdown与shutdownNow的比较 用线程池启动定时器  调用ScheduledExecutorService的schedule方法,返回的ScheduleFuture对象可以取消任务。 支持间隔重复任务的定时方式,不直接支持绝对定时方式,需要转换成相对时间方式。   

创建一个固定线程数量的线程池,内有3个线程,分配给了10个任务,3个线程执行这10个任务,当一个线程执行完一个任务之后,再去执行另一个任务,直到所有的任务执行完毕,但线程池中线程不会销毁。     ExecutorService executorService = Executors.newFixedThreadPool(3);        for(int i=1;i<=10;i++){             final int taskId = i;             executorService.execute(new Runnable() {                  public void run() {                       for(int j=1;j<=10;j++){                            System.out.println(Thread.currentThread().getName()+"----"+j+"次"+"execute task"+taskId);                       }                  }             });      }

创建一个缓存线程池,缓存线程池中线程的数量是不固定的,动态变化,刚开始有3个任务,就只有3个线程,后来又来了6个任务,那就又增加了6个线程,任务执行完后,超时一段时间,多余线程销毁。

   ExecutorService executorService = Executors.newCachedThreadPool();        for(int i=1;i<=10;i++){             final int taskId = i;             executorService.execute(new Runnable() {                  public void run() {                       for(int j=1;j<=10;j++){                            System.out.println(Thread.currentThread().getName()+"----"+j+"次"+"execute task"+taskId);                       }                  }             });     } executorService.shutdown();        //当所有线程都空闲的时候,杀死线程,终止程序。 executorService.shutdownNow();//不管线程中的任务有没有执行完,都杀死线程。

创建一个只含有一个线程的线程池,该线程池只含有一个线程,当线程池里的线程被销毁后,线程池又会创建一个线程,替代原来的线程  ExecutorService executorService = Executors.newSingleThreadExecutor();        for(int i=1;i<=10;i++){             final int taskId = i;             executorService.execute(new Runnable() {                  public void run() {                       for(int j=1;j<=10;j++){                            System.out.println(Thread.currentThread().getName()+"----"+j+"次"+"execute task"+taskId);                       }                  }             });     }

创建一个调度线程池,内含有3个线程,实现10秒定时执行功能 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);         scheduledExecutorService.schedule(new Runnable() {     public void run() {      System.out.println("bomb!!!");     }    },10, TimeUnit.SECONDS);

创建一个调度线程池,内含有3个线程,实现10秒定时执后,以后每隔2秒执行一次的功能。 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);     scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         public void run() {       System.out.println("bomb!!!");     }    },10, 2, TimeUnit.SECONDS);

Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。

Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。

   System.out.println("主线程::::"+Thread.currentThread().getName());   ExecutorService executorService = Executors.newSingleThreadExecutor();   Future future = executorService.submit(new Callable() {        public Object call() throws Exception {             Thread.sleep(2000);             return Thread.currentThread().getName();            }   });    String string = null;   try {        System.out.println("等待开始");        string = (String) future.get();//没有结果会一直等待,知道有结果为止     //string = (String) future.get(10, TimeUnit.SECONDS);//等待10s,没有有结果报异常        System.out.println("等待结束");   } catch (Exception e) {        e.printStackTrace();   }   System.out.println("Callable线程::::"+string);

CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。 好比我同时种了几块地的麦子,然后就等待收割。收割时,则是那块先成熟了,则先去收割哪块麦子。

       ExecutorService executorService = Executors.newFixedThreadPool(10);     CompletionService completionService = new ExecutorCompletionService(executorService);     for(int i=1;i<=10;i++){            final int taskId = i;            completionService.submit(new Callable() {                 public Object call() throws Exception {                        Thread.sleep(new Random().nextInt(5000));                       return "执行完的任务的ID::::"+taskId;                 }             });       }     for(int i=1;i<=10;i++){       try {          String string = (String) completionService.take().get();         System.out.println(string);     } catch (Exception e) {         e.printStackTrace();     }  }

Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。

public static void main(String[] args) {           new LockTest().action();    }     private void action(){         final Outputer outputer = new Outputer();         new Thread(new Runnable() {            public void run() {                  for(int i=0;i<10;i++){                       outputer.output("zhangxiaoxiang\n");                 }            }      }).start();     new Thread(new Runnable() {        public void run() {              for(int i=0;i<10;i++){                  outputer.output("lihuoming\n");             }        }      }).start(); }    private class Outputer{       private Lock lock = null;       public Outputer(){            lock = new ReentrantLock();       }       public void output(String name){                lock.lock();            try{                 for(int i = 0;i<name.length();i++){                      System.out.print(name.charAt(i));             };            }finally{                 lock.unlock();            }           } }

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

package cn.itcast.heima2;  import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest {  public static void main(String[] args) {       final Queue3 q3 = new Queue3();       for(int i=0;i<3;i++)       {            new Thread(){                 public void run(){                      while(true){                            q3.get();                 }              } }.start();             new Thread(){                 public void run(){                         while(true){                               q3.put(new Random().nextInt(10000));                          }                 }            }.start();         }       } }  class Queue3{      private Object data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。      ReadWriteLock rwl = new ReentrantReadWriteLock();      public void get(){               rwl.readLock().lock();               try {                    System.out.println(Thread.currentThread().getName() + " be ready to read data!");                    Thread.sleep((long)(Math.random()*1000));                    System.out.println(Thread.currentThread().getName() + "have read data :" + data);               } catch (InterruptedException e) {                    e.printStackTrace();               }finally{                   rwl.readLock().unlock();               }     }      public void put(Object data){           rwl.writeLock().lock();             try {                System.out.println(Thread.currentThread().getName() + " be ready to write data!");                Thread.sleep((long)(Math.random()*1000));                this.data = data;                System.out.println(Thread.currentThread().getName() + " have write data: " + data);             } catch (InterruptedException e) {                e.printStackTrace();          }finally{                rwl.writeLock().unlock();           }    }  }

############################缓存系统示例代码##############################

import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class CacheDemo {          private Map cacheMap = new HashMap<String,Object>();            public static void main(String[] args) {          }            private ReadWriteLock rwl = new ReentrantReadWriteLock();

         public Object get(int key) throws Exception{               rwl.readLock().lock();               Object value = null;               try{                    value = cacheMap.get(key);                    if(value==null){                         rwl.readLock().unlock();                         rwl.writeLock().lock();                         try{                              value = "aaaa";//实际上是queryDB()                              if(value == null){                                   throw new Exception();                              }                               cacheMap.put(key, value);                         }finally{                              rwl.writeLock().unlock();                         }                         rwl.readLock().lock();                 }         }finally{               rwl.readLock().unlock();         }             return value;      }   }

阻塞队列

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;         public class BoundedBuffer {      final Lock lock = new ReentrantLock();  final Condition notFull = lock.newCondition();// notFull  缓存不满  final Condition notEmpty = lock.newCondition();//notEmpty 缓存非空      final Object[] items = new Object[100];  int putptr,takeptr,count;     public void put(Object x) throws InterruptedException{       lock.lock();       try{                while(count==items.length)                     notFull.await();//缓存不满这个条件是假的 及意思是 缓存是满的                items[putptr]=x;            if(++putptr==items.length) putptr=0;            ++count;                notEmpty.signal();//缓存非空这个条件是真的           }finally{            lock.unlock();       }  }      public Object take() throws InterruptedException{       lock.lock();      try{           while(count==0)                notEmpty.await();//缓存非空这个条件是假的   及意思是  现在缓存是空的                 Object x = items[takeptr];           if(++takeptr==items.length) takeptr=0;           --count;                 notFull.signal();//缓存不满这个条件是真的           return x;                }finally{               lock.unlock();          }     } } i

Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。 Semaphore实现的功能就类似厕所有5个坑,假如有十个人要上厕所,那么同时能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中在等待的另外5个人中又有一个可以占用了。 另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。 单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

package cn.itcast.heima2; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class TwoTest {          public static void main(String[] args) {                 ExecutorService executorService = Executors.newCachedThreadPool();                final Semaphore semaphore = new Semaphore(3);                for(int i=0;i<10;i++){                      Runnable runnable = new Runnable() {                               public void run() {                                        try {                                                semaphore.acquire();                                                System.out.println("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3-semaphore.availablePermits()) + "个并发");                                                      Thread.sleep((long) (Math.random()*10000));                                                     System.out.println("线程" + Thread.currentThread().getName() + "即将离开");                                                  semaphore.release();                                                     //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元                                               System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3-semaphore.availablePermits()) + "个并发");                                                  } catch (InterruptedException e) {                                                    e.printStackTrace();                                           }                        }             };             executorService.execute(runnable);        }   } } 

表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面,这就好比整个公司的人员利用周末时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始就餐,…。   

import java.util.Random; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest {        public static void main(String[] args) {                ExecutorService executorService = Executors.newCachedThreadPool();                final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);                     for(int i=1;i<=3;i++){                                 Runnable runnable = new Runnable() {                                    public void run() {                                                 try {                                                    Thread.sleep((long) (Math.random()*10000));                                                    System.out.println("线程"+Thread.currentThread().getName()+"即将到达集合点1" +                                                      ",当前已有"+(cyclicBarrier.getNumberWaiting()+1)+"个到达集合点," +                                                    (cyclicBarrier.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));                                                    cyclicBarrier.await();                                                             Thread.sleep((long) (Math.random()*10000));                                                             System.out.println("线程" + Thread.currentThread().getName() +                                                  "即将到达集合地点2,当前已有" + (cyclicBarrier.getNumberWaiting()+1) + "个已经到达," + (cyclicBarrier.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));                                                              cyclicBarrier.await();                                                    Thread.sleep((long)(Math.random()*10000));                                                    System.out.println("线程" + Thread.currentThread().getName() +                                                    "即将到达集合地点3,当前已有" + (cyclicBarrier.getNumberWaiting() + 1) + "个已经到达," + (cyclicBarrier.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));                                                              cyclicBarrier.await();                                       } catch (Exception e) {                                            e.printStackTrace();                                       }                            }             };                    executorService.execute(runnable);         }       executorService.shutdown();    }  }

犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。这直接通过代码来说明CountDownLatch的作用,这样学员的理解效果更直接。  可以实现一个人(也可以是多个人)等待其他所有人都来通知他,这犹如一个计划需要多个领导都签字后才能继续向下实施。还可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑。用这个功能做百米赛跑的游戏程序不错哦!

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;

public class CountDownLatch {

         public static void main(String[] args) {                ExecutorService executorService = Executors.newCachedThreadPool();                final java.util.concurrent.CountDownLatch orderCount = new java.util.concurrent.CountDownLatch(1);                final java.util.concurrent.CountDownLatch ansCount = new java.util.concurrent.CountDownLatch(3);                          for(int i=1;i<=3;i++){                       Runnable runnable = new Runnable() {                                        public void run() {                                           System.out.println("线程" + Thread.currentThread().getName() +"正准备接受命令");                                              try {                                                orderCount.await();                                                System.out.println("线程" + Thread.currentThread().getName() +  "已接受命令");                                                  Thread.sleep((long)(Math.random()*10000));                                                System.out.println("线程" + Thread.currentThread().getName() +   "回应命令处理结果");                                                 ansCount.countDown();                                                    } catch (InterruptedException e) {                                                 e.printStackTrace();                                           }                              }                     };                        executorService.execute(runnable);              }                   try {                           Thread.sleep((long)(Math.random()*10000));               System.out.println("线程" + Thread.currentThread().getName() +  "即将发布命令");                orderCount.countDown();               System.out.println("线程" + Thread.currentThread().getName() +"已发送命令,正在等待结果");                 ansCount.await();               System.out.println("线程" + Thread.currentThread().getName() +"已收到所有响应结果");  

         } catch (InterruptedException e) {             e.printStackTrace();          }          executorService.shutdown();              }  } 

用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿着数据到来时,才能彼此交换数据。

import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;

public class ExchangerTest {

        public static void main(String[] args) {                   ExecutorService executorService = Executors.newCachedThreadPool();                   final Exchanger exchanger = new Exchanger();                       executorService.execute(new Runnable() {                                public void run() {                                         try {                                             String data1 = "毒品";                                             System.out.println("线程" + Thread.currentThread().getName() +"正在把" + data1 +"换出去");                                             Thread.sleep((long) (Math.random()*10000));                                             String data2 = (String) exchanger.exchange(data1);                                             System.out.println("线程" + Thread.currentThread().getName() + "换回" + data2);                                       } catch (InterruptedException e) {                                             e.printStackTrace();                                 }                            }                   });                    executorService.execute(new Runnable() {                                  public void run() {                                         try {                                              String data1 = "美金";                                              System.out.println("线程" + Thread.currentThread().getName() +"正在把" + data1 +"换出去");                                              Thread.sleep((long) (Math.random()*10000));                                              String data2 = (String) exchanger.exchange(data1);                                              System.out.println("线程" + Thread.currentThread().getName() + "换回" + data2);                                         } catch (InterruptedException e) {                                              // TODO Auto-generated catch block                                              e.printStackTrace();                                         }                                }                     });         } }

什么是可阻塞队列,阻塞队列的作用与实际应用,阻塞队列的实现原理。 阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,另一方释放数据,Semaphore通常则是由同一方设置和释放信号量。

ArrayBlockingQueue 只有put方法和take方法才具有阻塞功能

用3个空间的队列来演示阻塞队列的功能和效果。

import java.util.concurrent.ArrayBlockingQueue;  import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; public class BlockQueueTest {    public static void main(String[] args) {            final BlockingQueue blockingQueue = new ArrayBlockingQueue(3);            for(int i=1;i<=2;i++){                 new Thread(new Runnable() {                                public void run() {                                       while(true){                                    try {                                         Thread.sleep((long) (Math.random()*10000));                                         System.out.println(Thread.currentThread().getName()+"准备放数据");                                         blockingQueue.put(1);                                         System.out.println(Thread.currentThread().getName()+"放数据成功"+"当前队列有"+blockingQueue.size()+"个数据");                                            } catch (Exception e) {                                         e.printStackTrace();                                    }                               }                            }              }).start();         }                new Thread(new Runnable() {                         public void run() {                              while(true){                                       try {                                            Thread.sleep((long) (Math.random()*10000));                                            System.out.println(Thread.currentThread().getName() + "准备取数据!");                                            blockingQueue.take();                                            System.out.println(Thread.currentThread().getName()+"取数据成功"+"当前队列有"+blockingQueue.size()+"个数据");                                       } catch (InterruptedException e) {                                            e.printStackTrace();                                        }                              }                     }         }).start();    } }

用两个具有1个空间的队列来实现同步通知的功能。 

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueCommunicationTest {      public static void main(String[] args) {               new BlockingQueueCommunicationTest().execute();      }      private void execute(){               final Business business = new Business();                 new Thread(new Runnable() {                        public void run() {                         for(int j=1;j<=100;j++){                              business.sub(j);                         }                    }               }).start();                 for(int j=1;j<=100;j++){                    business.main(j);               }        }        private class Business{             BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1);           BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1);                    //匿名构造方法,先于非匿名构造方法执行           {                    try {                         blockingQueue2.put(1);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }           }             public void sub(int j){                 try {                     blockingQueue1.put(1);                } catch (InterruptedException e) {                     e.printStackTrace();                }                for(int i=1;i<=10;i++){                     System.out.println("sub thread sequece of " + i + ",loop of " + j);                }                try {                     blockingQueue2.take();                } catch (InterruptedException e) {                     e.printStackTrace();                }          }       public void main(int j){            try {                 blockingQueue2.put(1);            } catch (InterruptedException e) {                 e.printStackTrace();            }            for(int i=1;i<=10;i++){                 System.out.println("main thread sequece of " + i + ",loop of " + j);            }            try {                 blockingQueue1.take();            } catch (InterruptedException e) {                 e.printStackTrace();            }     }   } }