核心概述:本篇我們将繼續學習Java中的多線程,其中有多線程的等待喚醒機制、Condition接口的使用、Java中的線程池、Timer定時器以及ConcurrentHashMap的使用。
第一章:等待喚醒機制
1.1-線程間的通信(了解)
什麼是線程之間的通信呢?
就是多個線程在處理同一個資源,但是處理的動作(線程的任務)卻不相同。
比如:線程A用來生成包子的,線程B用來吃包子的,包子可以了解為同一資源,線程A與線程B處理的動作,一個是生産,一個是消費,那麼線程A與線程B之間就完成了通信,其實就是一種協作關系。
為什麼要處理線程間的通信?
多個線程并發執行時, 在預設情況下CPU是随機切換線程的,當我們需要多個線程來共同完成一件任務,并且我們 希望他們有規律的執行, 那麼多線程之間需要一些協調通信,以此來幫我們達到多線程共同操作一份資料。
如何保證線程間通信有效利用資源?
多個線程在處理同一個資源,并且任務不同時,需要線程通信來幫助解決線程之間對同一個變量的使用或操作。 就是多個線程在操作同一份資料時, 避免對同一共享變量的争奪。也就是我們需要通過一定的手段使各個線程能有效的利用資源。而這種手段即—— 等待喚醒機制。
1.2-什麼是等待喚醒機制(了解)
這是多個線程間的一種協作機制。談到線程我們經常想到的是線程間的競争(race),比如去争奪鎖,但這并不是故事的全部,線程間也會有協作機制。就好比在公司裡你和你的同僚們,你們可能存在在晉升時的競争,但更多時候你們更多是一起合作以完成某些任務。
就是在一個線程進行了規定操作後,就進入等待狀态(wait()), 等待其他線程執行完他們的指定代碼過後 再将其喚醒(notify());在有多個線程進行等待時, 如果需要,可以使用 notifyAll()來喚醒所有的等待線程。
wait/notify 就是線程間的一種協作機制。
1.3-等待喚醒相關方法(重要)
線程等待和喚醒的方法定義在
java.lang.Object
類中。
wait方法
當調用wait方法後,線程不再活動,不再參與排程,進入 wait set 中,是以不會浪費 CPU 資源,也不會去競争鎖了,這時的線程狀态即是 WAITING。它還要等着别的線程執行一個特别的動作,也即是“通知(notify)”在這個對象上等待的線程從wait set 中釋放出來,重新進入到排程隊列(ready queue)中。
notify方法
當調用notify方法後,則選取所通知對象的 wait set 中的一個線程釋放;例如,餐館有空位置後,等候就餐最久的顧客最先入座。
notifyAll方法
當調用notifyAll方法後,則釋放所通知對象的 wait set 上的全部線程。
注意事項
注意事項1:
哪怕隻通知了一個等待的線程,被通知線程也不能立即恢複執行,因為它當國中斷的地方是在同步塊内,而此刻它已經不持有鎖,是以她需要再次嘗試去擷取鎖(很可能面臨其它線程的競争),成功後才能在當初調用 wait 方法之後的地方恢複執行。
總而言之,如果能擷取鎖,線程就從 WAITING 狀态變成 RUNNABLE 狀态;否則,從 wait set 出來,又進入 entry set,線程就從 WAITING 狀态又變成 BLOCKED 狀态
注意事項2:
- wait方法與notify方法必須要由同一個鎖對象調用。因為:對應的鎖對象可以通過notify喚醒使用同一個鎖對象調用的wait方法後的線程。
- wait方法與notify方法是屬于Object類的方法的。因為:鎖對象可以是任意對象,而任意對象的所屬類都是繼承了Object類的。
- wait方法與notify方法必須要在同步代碼塊或者是同步函數中使用。因為:必須要通過鎖對象調用這2個方 法。
1.4-案例(練習)
等待喚醒機制其實就是經典的“生産者與消費者”的問題。
就拿生産包子消費包子來說等待喚醒機制如何有效利用資源
需求
定義一個變量,包子鋪線程完成生産包子,包子進行++操作;吃貨線程完成購買包子,包子變量列印出來。
- 當包子沒有時(包子狀态為false),吃貨線程等待。
- 包子鋪線程生産包子(即包子狀态為true),并通知吃貨線程(解除吃貨的等待狀态)。
- 保證線程安全,必須生産一個消費一個,不能同時生産或者消費多個。
代碼
包子鋪類
public class BaoZiPu {
private int baoZiCount;
//标志位變量
//當包子沒有時(包子狀态為false),吃貨線程等待。
//包子鋪線程生産包子(即包子狀态為true),并通知吃貨線程(解除吃貨的等待狀态)。
private boolean flag;
public void setFlag(boolean flag){
this.flag = flag;
}
public boolean getFlag(){
return flag;
}
//消費者調用方法,變量輸出
public void get(){
System.out.println("消費第"+baoZiCount+"個包子");
}
//生産者調用方法,變量++
public void set(){
baoZiCount++;
System.out.println("生産第"+baoZiCount+"個包子");
}
}
生産者類
public class Product implements Runnable{
private BaoZiPu baoZiPu;
public Product(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//生産者線程判斷标志位變量,==true,已經生産還沒有消費
if(baoZiPu.getFlag() == true){
try {
//線程等待
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生産一個
baoZiPu.set();
//修改标志位
baoZiPu.setFlag(true);
//喚醒對方線程
notify();
}
}
}
}
消費者類
public class Customer implements Runnable {
private BaoZiPu baoZiPu;
public Customer(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//消費者線程判斷标志位,==false,沒有生産
if(baoZiPu.getFlag()==false) {
try {
//線程等待
wait();
} catch (InterruptedException ex) {
}
}
//調用消費方法
baoZiPu.get();
//修改标志位
baoZiPu.setFlag(false);
//喚醒對方線程
notify();
}
}
}
}
測試類
public class Test{
public static void main(String[] args) {
BaoZiPu baoZiPu = new BaoZiPu();
Product product = new Product(baoZiPu);
Customer customer = new Customer(baoZiPu);
new Thread(product).start();
new Thread(customer).start();
}
}
執行結果
異常分析
- 程式出現無效的螢幕狀态異常。
- wait()或者notify()方法會抛出此異常。
- 程式中,wait()或者notify()方法的調用者是this對象。
- 而this對象在同步中并不是鎖對象,隻有作為鎖的對象才能調用wait()或者notify()方法。
- 而鎖對象是生産者和消費者共享的包子鋪對象。
代碼改造
生産者類
public class Product implements Runnable{
private BaoZiPu baoZiPu;
public Product(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//生産者線程判斷标志位變量,==true,已經生産還沒有消費
if(baoZiPu.getFlag() == true){
try {
//線程等待
baoZiPu.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生産一個
baoZiPu.set();
//修改标志位
baoZiPu.setFlag(true);
//喚醒對方線程
baoZiPu.notify();
}
}
}
}
消費者類
public class Customer implements Runnable {
private BaoZiPu baoZiPu;
public Customer(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
synchronized (baoZiPu) {
//消費者線程判斷标志位,==false,沒有生産
if(baoZiPu.getFlag()==false) {
try {
//線程等待
baoZiPu.wait();
} catch (InterruptedException ex) {
}
}
//調用消費方法
baoZiPu.get();
//修改标志位
baoZiPu.setFlag(false);
//喚醒對方線程
baoZiPu.notify();
}
}
}
}
代碼優化
通過線程等待與喚醒,實作了生産者與消費者案例,但是代碼維護性差,閱讀性差,使用同步方法進行代碼的優化。在包子鋪類中的get(),set()方法進行同步方法的改進。
注意:一旦方法同步後,this就是鎖對象。
包子鋪類:變量flag隻在類中使用,是以可以去掉get/set方法。
包子鋪類
public class BaoZiPu {
private int baoZiCount;
//标志位變量
//當包子沒有時(包子狀态為false),吃貨線程等待。
//包子鋪線程生産包子(即包子狀态為true),并通知吃貨線程(解除吃貨的等待狀态)。
private boolean flag;
//消費者調用方法,使用同步
public synchronized void get(){
//判斷标志位 ==false,沒有生産,線程等待
if (flag == false)
try {
this.wait();
}catch (InterruptedException ex){}
System.out.println("消費第"+baoZiCount+"個包子");
//修改标志位
flag = false;
//喚醒對方線程
this.notify();
}
//生産者調用方法,變量++,使用同步
public synchronized void set(){
//判斷标志位,==true,沒有消費,線程等待
if(flag == true)
try {
this.wait();
}catch (InterruptedException ex){}
baoZiCount++;
System.out.println("生産第"+baoZiCount+"個包子");
//修改标志位
flag = true;
//喚醒對方線程
this.notify();
}
}
生産者類
public class Product implements Runnable{
private BaoZiPu baoZiPu;
public Product(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
baoZiPu.set();
}
}
}
消費者類
public class Customer implements Runnable {
private BaoZiPu baoZiPu;
public Customer(BaoZiPu baoZiPu){
this.baoZiPu = baoZiPu;
}
@Override
public void run() {
while (true){
baoZiPu.get();
}
}
}
1.5-sleep()方法和wait()方法的差別(了解)
- sleep()是Thread類靜态方法,不需要對象鎖。
- wait()方法是Object類的方法,被鎖對象調用,而且隻能出現在同步中。
- 執行sleep()方法的線程不會釋放同步鎖。
- 執行wait()方法的線程要釋放同步鎖,被喚醒後還需擷取鎖才能執行。
1.6-多生産者多消費者(了解)
概述
上一練習中,我們實作了生産者和消費者案例,但是如果我們開啟多個生産者線程和多個生産者線程會發生什麼現象呢,線程還會安全嗎?
線程安全原因分析
當開啟了多個線程後,資料出現了安全問題。問題就出現在等待和喚醒環節。我們将線程分成了生産者和消費者兩個部分,需要生産者線程喚醒消費者線程,而消費者線程要喚醒生産者線程。但是線程的喚醒是按照隊列形式進行,先等待的會先被喚醒。很可能出現生産者線程又喚醒了生産者線程,消費者線程喚醒了消費者線程。是以我們需要将線程全部喚醒,使用notifyAll()方法。
全部喚醒後,線程依然不安全,是因為線程判斷完标志位後就會等待,當被喚醒後,就不會再判斷标志位了,我們必須讓線程在喚醒後,還要繼續判斷标志位,允許生存才能生産,不運作生産就要繼續等待。
改造代碼實作多生産和多消費
包子鋪類
public class BaoZiPu {
private int baoZiCount;
//标志位變量
//當包子沒有時(包子狀态為false),吃貨線程等待。
//包子鋪線程生産包子(即包子狀态為true),并通知吃貨線程(解除吃貨的等待狀态)。
private boolean flag;
//消費者調用方法,使用同步
public synchronized void get(){
//判斷标志位 ==false,沒有生産,線程等待
while (flag == false)
try {
this.wait();
}catch (InterruptedException ex){}
System.out.println("消費第"+baoZiCount+"個包子");
//修改标志位
flag = false;
//喚醒對方線程
this.notifyAll();
}
//生産者調用方法,變量++,使用同步
public synchronized void set(){
//判斷标志位,==true,沒有消費,線程等待
while(flag == true)
try {
this.wait();
}catch (InterruptedException ex){}
baoZiCount++;
System.out.println("生産第"+baoZiCount+"個包子");
//修改标志位
flag = true;
//喚醒對方線程
this.notifyAll();
}
}
第二章:Condition接口
2.1-等待喚醒的弊端(了解)
多生産與多消費案例中,我們使用了線程通信的相關方法wait()和notify(),notifyAll()。
-
public final native void wait(long timeout) throws InterruptedException
-
public final native void notify()
-
public final native void notifyAll()
以上三個方法都是本地方法,要和作業系統進行互動,是以線程等待喚醒需要消耗系統資源,程式效率降低。另外我們一次喚醒所有的線程,也會浪費很多資源,為了解決這些弊端,JDK1.5版本的時候出現了Lock接口和Condition接口。
2.2-Condition接口(重點)
介紹
Condition
将
Object
螢幕方法(
wait
、
notify
和
notifyAll
)分解成截然不同的對象,以便通過将這些對象與任意
Lock
實作組合使用,為每個對象提供多個等待 set(wait-set)。其中,
Lock
替代了
synchronized
方法和語句的使用,
Condition
替代了
Object
螢幕方法的使用。
擷取Condition對象
Lock接口的方法newCondition()擷取
-
public Condition newCondition()
Condition對象常用方法
Condition接口方法和Object類方法比較
- Condition可以和任意的Lock組合,實作管理線程的阻塞隊列(直接在記憶體重操作)。
- 一個線程的案例中,可以使用多個Lock鎖,每個Lock鎖上可以結合Condition對象。
- synchronized同步中做不到将線程劃分到不同的隊列中(需要本地方法[c++編寫]和作業系統互動)。
- Object類wait()和notify()都要和作業系統互動,并通知CPU挂起線程,喚醒線程,效率低。
- Condition接口方法await()不和作業系統互動,而是讓釋放鎖,并存放到線程隊列容器中(記憶體總),當被signal()喚醒後,從隊列中出來,從新擷取鎖後在執行。
- 是以使用Lock和Condition的效率比Object要快很多。
生産者與消費者案例改進
包子鋪類
public class BaoZiPu {
private int baoZiCount;
//标志位變量
//當包子沒有時(包子狀态為false),吃貨線程等待。
//包子鋪線程生産包子(即包子狀态為true),并通知吃貨線程(解除吃貨的等待狀态)。
private boolean flag;
//建立Lock接口實作類,線程安全提供鎖定
private Lock lock = new ReentrantLock();
//Condition對象和生産者鎖結合
private Condition productCondition = lock.newCondition();
//Condition對象和消費者鎖結合
private Condition customerCondition = lock.newCondition();
public void setFlag(boolean flag){
this.flag = flag;
}
public boolean getFlag(){
return flag;
}
//消費者調用方法,消費者Lock對象鎖定
public void get(){
lock.lock();
//判斷标志位 ==false,沒有生産,線程等待
while (flag == false)
try {
customerCondition.await();
}catch (InterruptedException ex){}
System.out.println("消費第"+baoZiCount+"個包子");
//修改标志位
flag = false;
//喚醒對方線程
productCondition.signal();
lock.unlock();
}
//生産者調用方法,變量++,生産者Lock對象鎖定
public void set(){
lock.lock();
//判斷标志位,==true,沒有消費,線程等待
while(flag == true)
try {
productCondition.await();
}catch (InterruptedException ex){}
baoZiCount++;
System.out.println("生産第"+baoZiCount+"個包子");
//修改标志位
flag = true;
//喚醒對方線程
customerCondition.signal();
lock.unlock();
}
}
第三章:線程池
3.1-概述(了解)
線程池思想
我們使用線程的時候就去建立一個線程,這樣實作起來非常簡便,但是就會有一個問題:
如果并發的線程數量很多,并且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大降低系統的效率,因為頻繁建立線程和銷毀線程需要時間。
那麼有沒有一種辦法使得線程可以複用,就是執行完一個任務,并不被銷毀,而是可以繼續執行其他的任務?
在Java中可以通過線程池來達到這樣的效果。
什麼是線程池
其實就是一個容納多個線程的容器,其中的線程可以反複使用,省去了頻繁建立線程對象的操作,無需反複建立線程而消耗過多資源。
合理使用線程池的好處
- 降低資源消耗。減少了建立和銷毀線程的次數,每個工作線程都可以被重複利用,可執行多個任務。
- 提高響應速度。當任務到達時,任務可以不需要的等到線程建立就能立即執行。
- 提高線程的可管理性。可以根據系統的承受能力,調整線程池中工作線線程的數目,防止因為消耗過多的記憶體,而把伺服器累趴下(每個線程需要大約1MB記憶體,線程開的越多,消耗的記憶體也就越大,最後當機)。
3.2-使用線程池(重點)
java.util.concurrent
包中定義了線程池相關的類和接口。
Java裡面線程池的頂級接口是
java.util.concurrent.Executor
,但是嚴格意義上講 Executor 并不是一個線程 池,而隻是一個執行線程的工具。真正的線程池接口是
java.util.concurrent.ExecutorService
。
要配置一個線程池是比較複雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,是以在 java.util.concurrent.Executors 線程工廠類裡面提供了一些靜态工廠,生成一些常用的線程池。官方建議使用Executors工程類來建立線程池對象。
Executors類
建立線程池對象的工廠方法,使用此類可以建立線程池對象。
ExecutorService接口
線程池對象的管理接口,送出線程任務,關閉線程池等功能。
Callable接口
線程執行的任務接口,類似于Runnable接口。
- 接口方法
public V call()throw Exception
- 線程要執行的任務方法
- 比起run()方法,call()方法具有傳回值,可以擷取到線程執行的結果。
Future接口
異步計算結果,就是線程執行完成後的結果。
- 接口方法
擷取線程執行的結果,就是擷取call()方法傳回值。public V get()
示例代碼
需求:建立有2個線程的線程池,分别送出線程執行的任務,一個線程執行字元串切割,一個執行1+100的和。
實作Callable接口,字元串切割功能:
public class MyStringCallable implements Callable<String[]> {
private String str;
public MyStringCallable(String str ){
this.str = str;
}
@Override
public String[] call() throws Exception {
return str.split(" +");
}
}
實作Callable接口,1+100求和:
public class MySumCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int x = 1 ; x<= 100; x++){
sum+=x;
}
return sum;
}
}
測試類:
public static void main(String[] args) throws Exception {
//建立有2個線程的線程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
//送出執行字元串切割任務
Future<String[]> futureString = executorService.submit(new MyStringCallable("aa bbb cc d e"));
System.out.println(Arrays.toString(futureString.get()));
//送出執行求和任務
Future<Integer> futureSum = executorService.submit(new MySumCallable());
System.out.println(futureSum.get());
executorService.shutdown();
}
第四章:Timer定時器
4.1-概述(了解)
Java中的定時器,可以根據指定的時間來運作程式。
java.util.Timer
一種工具,線程用其安排以後在背景線程中執行的任務。可安排任務執行一次,或者定期重複執行。定時器是使用建立的線程來執行,這樣即使主線程main結束了,定時器也依然會繼續工作。
4.2-Timer定時器的使用
常用方法
- 構造方法:無參數。
- 定時方法:public void schedule(TimerTask task,Date firstTime,long period)
- TimerTask是定時器要執行的任務,一個抽象類,我們需要繼承并重寫方法run()
- firstTime定時器開始執行的時間
- period時間間隔,毫秒值
示例
public class Test{
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
int i = 0;
@Override
public void run() {
i++;
System.out.println(i);
}
},new Date(),1000);
}
}
第五章:ConcurrentHashMap
5.1-概述(了解)
java.util.concurrent.ConcurrentHashMap
支援擷取的完全并發和更新的所期望可調整并發的哈希表。
此集合實作Map接口,是以Map集合中的所有功能都可以直接使用。
- ConcurrentHashMap集合特點
- 底層是哈希表結構
- 此集合是線程安全的,但是某些功能不必鎖定。比如get()
- 不會抛出ConcurrentModificationException并發修改異常
- 此集合支援周遊過程中添加,删除元素。
- ConcurrentHashMap集合的鎖定特點
- 為了提高效率,不會将整個集合全部鎖定。
- 當添加或者移除元素時,是對連結清單進行操作,連結清單存儲在數組中,那麼就隻會針對這個連結清單進行鎖定。
5.2-疊代中添加元素(測試)
public static void main(String[] args) throws Exception {
Map<String,String> map = new ConcurrentHashMap<String, String>();
map.put("1","a");
map.put("2","b");
map.put("3","c");
System.out.println(map);
Set<Map.Entry<String,String>> set = map.entrySet();
Iterator<Map.Entry<String,String>> it = set.iterator();
while (it.hasNext()){
map.put("4","4");
Map.Entry<String, String> next = it.next();
System.out.println(next.getKey()+"="+next.getValue());
}
}
5.3-線程安全測試
public static void main(String[] args) throws Exception {
Map<String,Integer> map = new ConcurrentHashMap<String, Integer>();
Map<String,Integer> map = new HashMap<String, Integer>();
//存儲2000個鍵值對
for(int x = 0 ; x < 2000; x++){
map.put("count"+x,x);
}
//開啟線程,删除前500個
Runnable r1 = new Runnable() {
@Override
public void run() {
for(int i = 0 ; i < 500;i++){
map.remove("count"+i);
}
}
};
//開啟線程,删除1000-1500個
Runnable r2 = new Runnable() {
@Override
public void run() {
for(int i = 1000 ; i < 1500;i++){
map.remove("count"+i);
}
}
};
new Thread(r1).start();
new Thread(r2).start();
//等待2秒,讓2個線程全部運作完畢
Thread.sleep(2000);
//列印集合長度,線程安全集合應該是1000
System.out.println(map.size());
}