天天看点

Java多线程学习笔记12之线程间通信

详细代码见:github代码地址

本节内容:

1) wait()锁释放与notify()锁不释放

2) 当interrupt()方法遇到wait()方法

3) notify()与notifyAll()使用

4) 方法wait(long)的作用

5) notify()方法通知过早

6) 调用wait()方法的条件变化,造成逻辑混乱

7) 生产者消费者模式, 一个生产者和一个消费者

(5) 方法wait()锁释放与notify()锁不释放

验证wait()释放锁举例:

package chapter03.section1.thread_3_1_4.project_1_waitReleaseLock;

public class Service {
	
	public void testMethod(Object lock) {
		try {
			synchronized(lock) {
				System.out.println("begin wait()");
				lock.wait();
				System.out.println("  end wait()");
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_4.project_1_waitReleaseLock;

public class ThreadA extends Thread {

	private Object lock;

	public ThreadA(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_4.project_1_waitReleaseLock;

public class ThreadB extends Thread {
	private Object lock;

	public ThreadB(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_4.project_1_waitReleaseLock;

public class Test {
	public static void main(String[] args) {

		Object lock = new Object();

		ThreadA a = new ThreadA(lock);
		a.start();

		ThreadB b = new ThreadB(lock);
		b.start();
	}
}
/*
result:
begin wait()
begin wait()
 */
           

结果分析:

wait方法自动释放锁

验证notify()被执行后,不释放锁。

package chapter03.section1.thread_3_1_4.project_2_notifyHoldLock;

public class Service {
	
	public void testMethod(Object lock) {
		try {
			synchronized(lock) {
				System.out.println("begin wait() ThreadName="
						+ Thread.currentThread().getName());
				lock.wait();
				System.out.println("  end wait() ThreadName="
						+ Thread.currentThread().getName());
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	public void synNotifyMethod(Object lock) {
		try {
			synchronized (lock) {
				System.out.println("begin notify() ThreadName="
						+ Thread.currentThread().getName() + " time="
						+ System.currentTimeMillis());
				lock.notify();
				Thread.sleep(5000);
				System.out.println("  end notify() ThreadName="
						+ Thread.currentThread().getName() + " time="
						+ System.currentTimeMillis());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_4.project_2_notifyHoldLock;

public class NotifyThread extends Thread{
	private Object lock;

	public NotifyThread(Object lock) {
		super();
		this.lock = lock;
	}
	
	@Override
	public void run() {
		Service service = new Service();
		service.synNotifyMethod(lock);
	}
}


package chapter03.section1.thread_3_1_4.project_2_notifyHoldLock;

public class synNotifyMethodThread extends Thread{
	private Object lock;

	public synNotifyMethodThread(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.synNotifyMethod(lock);
	}
}


package chapter03.section1.thread_3_1_4.project_2_notifyHoldLock;

public class ThreadA extends Thread {
	private Object lock;

	public ThreadA(Object lock) {
		super();
		this.lock = lock;
	}
	
	@Override
	public void run() {
		Service service =new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_4.project_2_notifyHoldLock;

public class Test {
	public static void main(String[] args) throws InterruptedException {

		Object lock = new Object();

		ThreadA a = new ThreadA(lock);
		a.start();

		NotifyThread notifyThread = new NotifyThread(lock);
		notifyThread.start();

		synNotifyMethodThread c = new synNotifyMethodThread(lock);
		c.start();
	}
}
/*
result:
begin wait() ThreadName=Thread-0
begin notify() ThreadName=Thread-1 time=1540283462655
  end notify() ThreadName=Thread-1 time=1540283467660
  end wait() ThreadName=Thread-0
begin notify() ThreadName=Thread-2 time=1540283467660
  end notify() ThreadName=Thread-2 time=154028347266
 */
           

结果分析:

b线程执行notify()之后执行完run方法中代码才释放锁,c才开始获得lock执行

(6) 当interrupt()方法遇到wait方法

当线程呈wait状态时,调用线程对象的interrupt方法会出现InterruptedException异常

举个例子:

package chapter03.section1.thread_3_1_5.project_1_waitInterruptException;

public class Service {

	public void testMethod(Object lock) {
		try {
			synchronized (lock) {
				System.out.println("begin wait()");
				lock.wait();
				System.out.println("  end wait()");
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			System.out.println("出现异常了,因为呈wait状态的线程被interrupt了!");
		}
	}
}


package chapter03.section1.thread_3_1_5.project_1_waitInterruptException;

public class ThreadA extends Thread {

	private Object lock;

	public ThreadA(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_5.project_1_waitInterruptException;

public class Test {
	public static void main(String[] args) {

		try {
			Object lock = new Object();

			ThreadA a = new ThreadA(lock);
			a.start();

			Thread.sleep(5000);

			a.interrupt();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
/*
result:
begin wait()
java.lang.InterruptedException
出现异常了,因为呈wait状态的线程被interrupt了!
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Object.wait(Unknown Source)
	at chapter03.section1.thread_3_1_5.project_1_waitInterruptException.Service.testMethod(Service.java:9)
	at chapter03.section1.thread_3_1_5.project_1_waitInterruptException.ThreadA.run(ThreadA.java:15)
 */
           

由上面的实验总结如下:

1) 执行完同步代码块就会释放对象的锁

2) 在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放

2) 在执行同步代码块的过程中,执行了锁所属对象的wait()方法,这个线程会释放对象锁

,而此线程对象会进入线程等待池中,等待被唤醒。

(7) 只通知一个线程

调用方法notify()一次只随机通知一个线程进行唤醒。notify多次调用可以唤醒多个线程。

另外多次调用notify()方法可以替换成调用notifyAll()方法唤醒所有等待此对象锁的线程

举例:

package chapter03.section1.thread_3_1_6.project_1_notifyOne;


public class Service {

	public void testMethod(Object lock) {
		try {
			synchronized (lock) {
				System.out.println("begin wait() ThreadName="
						+ Thread.currentThread().getName());
				lock.wait();
				System.out.println("  end wait() ThreadName="
						+ Thread.currentThread().getName());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_6.project_1_notifyOne;

public class ThreadA extends Thread {
	private Object lock;

	public ThreadA(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_6.project_1_notifyOne;

public class ThreadB extends Thread {
	private Object lock;

	public ThreadB(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_6.project_1_notifyOne;

public class ThreadC extends Thread {
	private Object lock;

	public ThreadC(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		Service service = new Service();
		service.testMethod(lock);
	}
}


package chapter03.section1.thread_3_1_6.project_1_notifyOne;

public class NotifyThread extends Thread {
	private Object lock;

	public NotifyThread(Object lock) {
		super();
		this.lock = lock;
	}

	@Override
	public void run() {
		synchronized (lock) {
			lock.notify();
			lock.notify();
			lock.notify();
			lock.notify();
			lock.notify();
			lock.notify();
			lock.notify();
			lock.notify();
			lock.notify();
			//可以调用lock.notifyAll()唤醒所有等待lock锁的线程
		}
	}
}


package chapter03.section1.thread_3_1_6.project_1_notifyOne;

public class Test {

	public static void main(String[] args) throws InterruptedException {

		Object lock = new Object();

		ThreadA a = new ThreadA(lock);
		a.start();

		ThreadB b = new ThreadB(lock);
		b.start();

		ThreadC c = new ThreadC(lock);
		c.start();

		Thread.sleep(1000);

		NotifyThread notifyThread = new NotifyThread(lock);
		notifyThread.start();
	}
}
/*
result:
begin wait() ThreadName=Thread-0
begin wait() ThreadName=Thread-1
begin wait() ThreadName=Thread-2
  end wait() ThreadName=Thread-0
  end wait() ThreadName=Thread-2
  end wait() ThreadName=Thread-1
 */
           

 (8) 方法wait(long)的作用

 带一个参数的wait(long)方法的功能是等待某一段时间内是否有线程对锁进行唤醒,如果

 超过这个时间则自动唤醒

 举个例子:

package chapter03.section1.thread_3_1_8.project_1_waitHasParamMethod;

public class MyRunnable {
	static private Object lock = new Object();
	static private Runnable runnable1 = new Runnable() {
		@Override
		public void run() {
			try {
				synchronized(lock) {
					System.out.println("wait begin timer="
							+ System.currentTimeMillis());
					lock.wait(5000);
					System.out.println("wait   end timer="
							+ System.currentTimeMillis());
				}
			} catch (InterruptedException e) {
				// TODO: handle exception
				e.printStackTrace();
			}
		}
	};
	
	static private Runnable runnable2 = new Runnable() {
		@Override
		public void run() {
			synchronized(lock) {
				System.out.println("notify begin timer="
						+ System.currentTimeMillis());
				lock.notify();
				System.out.println("notify   end timer="
						+ System.currentTimeMillis());
			}
		}
	};
	
	public static void main(String[] args) throws InterruptedException{
		Thread t1 = new Thread(runnable1);
		t1.start();
		Thread.sleep(3000);
		Thread t2 = new Thread(runnable2);
		t2.start();
	}
}
/*
带上注释的结果result:
wait begin timer=1540343153258
wait   end timer=1540343158275
可以看到过了大约5秒钟线程t1被自动唤醒
不带注释的结果result:
wait begin timer=1540343363801
notify begin timer=1540343366809
notify   end timer=1540343366809
wait   end timer=1540343366809
可以看到3秒后,对呈WAITING状态的线程进行了唤醒
*/
           

(9) 通知过早

如果通知过早,则会打乱程序正常的逻辑

举个例子:

package chapter03.section1.thread_3_1_9.project_1_firstNotify;

public class MyRun {
	
	private String lock = new String("");
	private boolean isFirstRunB = false;
	
	private Runnable runnableA = new Runnable(){
		@Override
		public void run() {
			try {
				synchronized(lock) {
					while(isFirstRunB == false) {
						System.out.println("begin wait");
						lock.wait();
						System.out.println("  end wait");
					}
				}
			} catch (InterruptedException e) {
				// TODO: handle exception
				e.printStackTrace();
			}
		}
	};
	
	private Runnable runnableB = new Runnable() {
		@Override
		public void run() {
			synchronized(lock) {
				System.out.println("begin notify");
				lock.notify();
				System.out.println("  end notify");
				isFirstRunB = true;
			}
		}
	};
	
	public static void main(String[] args) throws InterruptedException {
		MyRun run = new MyRun();
		
//		Thread a = new Thread(run.runnableA);
//		a.start();
//		
//		Thread.sleep(100);
//		
//		Thread b = new Thread(run.runnableB);
//		b.start();
		
		Thread b = new Thread(run.runnableB);
		b.start();
		
		Thread.sleep(100);
		
		Thread a = new Thread(run.runnableA);
		a.start();
	}
}
/*
不替换成注释内容result:
begin notify
  end notify
这是因为notify提早通知了,如果发出notify操作时没有
处于阻塞状态中的线程,那么该命令将被忽略
result:
begin wait
begin notify
  end notify
  end wait
*/
           

(10) 等待wait的条件发生变化

在使用wait/notify模式时,还需要注意另一种情况,也就是wait等待的条件发生了变化

,也容易造成程序逻辑的混乱

举个例子:

package chapter03.section1.tread_3_1_10.project_1_waitOld;

import java.util.ArrayList;
import java.util.List;

public class ValueObject {
	public static List<String> list = new ArrayList<>();
}


package chapter03.section1.tread_3_1_10.project_1_waitOld;

public class Add {
	
	private String lock;
	
	public Add(String lock) {
		super();
		this.lock = lock;
	}
	
	public void add() {
		synchronized(lock) {
			ValueObject.list.add("anyString");
			lock.notifyAll();
		}
	}
}


package chapter03.section1.tread_3_1_10.project_1_waitOld;

public class Substract {
	private String lock;
	
	public Substract(String lock) {
		super();
		this.lock = lock;
	}
	
	public void substract() {
		try {
			synchronized(lock) {
//				while(ValueObject.list.size() == 0) {
				if(ValueObject.list.size() == 0) {
					System.out.println("wait begin ThreadName="
							+ Thread.currentThread().getName());
					lock.wait();
					System.out.println("wait   end ThreadName="
							+ Thread.currentThread().getName());
				}
				ValueObject.list.remove(0);
				System.out.println("list size=" + ValueObject.list.size());
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.tread_3_1_10.project_1_waitOld;

public class ThreadAdd extends Thread{

	private Add p;
	
	public ThreadAdd(Add p) {
		super();
		this.p = p;
	}
	
	@Override
	public void run() {
		p.add();
	}
}


package chapter03.section1.tread_3_1_10.project_1_waitOld;

public class ThreadSubstract extends Thread{
	private Substract r;
	
	public ThreadSubstract(Substract r) {
		super();
		this.r = r;
	}
	
	@Override 
	public void run() {
		r.substract();
	}
}


package chapter03.section1.tread_3_1_10.project_1_waitOld;

public class Run {
	public static void main(String[] args) throws InterruptedException {
		String lock = new String("");

		Add add = new Add(lock);
		Substract subtract = new Substract(lock);

		ThreadSubstract subtract1Thread = new ThreadSubstract(subtract);
		subtract1Thread.setName("subtract1Thread");
		subtract1Thread.start();

		ThreadSubstract subtract2Thread = new ThreadSubstract(subtract);
		subtract2Thread.setName("subtract2Thread");
		subtract2Thread.start();

		Thread.sleep(1000);

		ThreadAdd addThread = new ThreadAdd(add);
		addThread.setName("addThread");
		addThread.start();
	}
}

/*
不加注释result:
wait begin ThreadName=subtract1Thread
wait begin ThreadName=subtract2Thread
wait   end ThreadName=subtract1Thread
list size=0
wait   end ThreadName=subtract2Thread
Exception in thread "subtract2Thread" java.lang.IndexOutOfBoundsException: Index 0 out-of-bounds for length 0
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
	at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
	at java.base/java.util.Objects.checkIndex(Unknown Source)
	at java.base/java.util.ArrayList.remove(Unknown Source)
	at chapter03.section1.tread_3_1_10.project_1_waitOld.Substract.substract(Substract.java:21)
	at chapter03.section1.tread_3_1_10.project_1_waitOld.ThreadSubstract.run(ThreadSubstract.java:13)
加上注释带上while循环result:
wait begin ThreadName=subtract2Thread
wait begin ThreadName=subtract1Thread
wait   end ThreadName=subtract2Thread
list size=0
wait   end ThreadName=subtract1Thread
wait begin ThreadName=subtract1Thread
*/
           

结果分析:

不带注释出现异常的原因是由两个实现删除remove()操作的线程,当都被唤醒之后,第

一个可以删除增加的一个元素,第二个删除就越界了,抛出IndexOutOfBoundsException

异常。加入while以后,保证增加一个元素,只有一个线程来负责删除,其余的就进入while

循环调用lock.wait()阻塞等待了。

2. 生产者/消费者模式实现

等待/通知模式最经典的案例就是"生产者/消费者"模式

生产者消费者问题是线程模型中的经典问题: 生产者和消费者在同一时间段内共用同一存储

空间,生产者向空间里生产数据,而消费者取走数据。

(1) 一个生产者,一个消费者

举个例子:

package chapter03.section1.thread_3_1_11.project_1_p_r_test;

public class ValueObject {
	public static String value = "";
}


package chapter03.section1.thread_3_1_11.project_1_p_r_test;

//生产者
public class P {
	private String lock;
	
	public P(String lock) {
		super();
		this.lock = lock;
	}
	
	public void setValue() {
		try {
			synchronized(lock) {
				if(!ValueObject.value.equals("")) {
					lock.wait(); //等待消费者消费
				}
				String value = System.currentTimeMillis() + "_"
						+ System.nanoTime();
				System.out.println("set的值是" + value);
				ValueObject.value = value;
				lock.notify();	
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_1_p_r_test;
//消费者
public class C {
	
	private String lock;
	
	public C(String lock) {
		super();
		this.lock = lock;
	}
	
	public void getValue() {
		try {
			synchronized(lock) {
				if(ValueObject.value.equals("")) {
					lock.wait(); //等待生产者生产
				}
				System.out.println("get的值是" + ValueObject.value);
				ValueObject.value = "";
				lock.notify(); //唤醒生产者来生产
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_1_p_r_test;

public class ThreadC extends Thread{
	private C r;
	
	public ThreadC(C r) {
		super();
		this.r = r;
	}
	
	@Override
	public void run() {
		while(true) {
			r.getValue();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_1_p_r_test;

public class ThreadP extends Thread{
	private P p;
	
	public ThreadP(P p) {
		super();
		this.p = p;
	}
	
	@Override
	public void run() {
		while(true) {
			p.setValue();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_1_p_r_test;

public class Run {
	public static void main(String[] args) {
		String lock = new String("");
		P p = new P(lock);
		C r= new C(lock);
		
		ThreadP pThread = new ThreadP(p);
		ThreadC rThread = new ThreadC(r);
		
		pThread.start();
		rThread.start();
	}
}
/*
result:
......................................
set的值是1540348717246_1290335084879336
get的值是1540348717246_1290335084879336
set的值是1540348717246_1290335084899247
get的值是1540348717246_1290335084899247
set的值是1540348717246_1290335084915744
get的值是1540348717246_1290335084915744
set的值是1540348717246_1290335084936793
......................................
*/
           

结果分析:

可以看到消费者和生产者轮流消费和生产。但是在此基础上设计出多个生产者和多个消费

者,在运行的时候可能出现"假死"的情况,也就是所有的线程呈WAITING等待状态。