天天看點

java并發之線程同步(synchronized和鎖機制)

目錄

  • 使用synchronized實作同步方法
  • 使用非依賴屬性實作同步
  • 在同步塊中使用條件(wait(),notify(),notifyAll())
  • 使用鎖實作同步
  • 使用讀寫鎖實作同步資料通路
  • 修改鎖的公平性
  • 在鎖中使用多條件(Multri Condition)

正文

多個執行線程共享一個資源的情景,是并發程式設計中最常見的情景之一。多個線程讀或者寫相同的資料等情況時可能會導緻資料不一緻。為了解決這些問題,引入了臨界區概念。臨界區是一個用以通路共享資源的代碼塊,這個代碼塊在同一時間内隻允許一個線程執行。

Java提供了同步機制。當一個線程試圖通路一個臨界區時,它将使用一種同步機制來檢視是不是已有其他線程進入臨界區。如果沒有其他線程進入臨界區,它就可以進入臨界區;如果已有線程進入了臨界區,它就被同步機制挂起,直到進入的線程離開這個臨界區。如果在等待進入臨界區的線程不止一個,JVM會随機選擇其中的一個,其餘的将繼續等待。

概念比較好了解,具體在java程式中是如何展現的呢?臨界區對應的代碼是怎麼樣的?

回到頂部

每一個用synchronized關鍵字聲明的方法都是臨界區。在Java中,同一個對象的臨界區,在同一時間隻有一個允許被通路。

注意:用synchronized關鍵字聲明的靜态方法,同時隻能被一個執行線程通路,但是其他線程可以通路這個對象的非靜态方法。即:兩個線程可以同時通路一個對象的兩個不同的synchronized方法,其中一個是靜态方法,一個是非靜态方法。

知道了synchronized關鍵字的作用,再來看一下synchronized關鍵字的使用方式。

  • 在方法聲明中加入synchronized關鍵字
  • 1 public synchronized void addAmount(double amount) {
    2 }      
  • 在代碼塊中使用synchronized關鍵字,obj一般可以使用this關鍵字表示本類對象
  • 1 synchronized(obj){
    2 }      

需要注意的是:前面已經提到,引入synchronized關鍵字是為了聲明臨界區,解決在多線程環境下共享變量的資料更改安全問題。那麼,一般用到synchronized關鍵字的地方也就是 在對共享資料 通路或者修改的地方。下面舉一個例子,例子場景是這樣:公司定時會給賬戶打款,銀行對賬戶進行扣款。那麼款項對于銀行和公司來說就是一個共享資料。那麼synchronized關鍵字就應該在修改賬戶的地方使用。

聲明一個Account類:

1 public class Account {
 2     private double balance;
 3     public double getBalance() {
 4         return balance;
 5     }
 6     public void setBalance(double balance) {
 7         this.balance = balance;
 8     }
 9     public synchronized void addAmount(double amount) {
10         double tmp=balance;
11         try {
12             Thread.sleep(10);
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         }
16         tmp+=amount;
17         balance=tmp;
18     }
19     public synchronized void subtractAmount(double amount) {
20         double tmp=balance;
21         try {
22             Thread.sleep(10);
23         } catch (InterruptedException e) {
24             e.printStackTrace();
25         }
26         tmp-=amount;
27         balance=tmp;
28     }
29 }      
java并發之線程同步(synchronized和鎖機制)

Bank類扣款:

java并發之線程同步(synchronized和鎖機制)
1 public class Bank implements Runnable {
 2     private Account account;
 3     public Bank(Account account) {
 4         this.account=account;
 5     }
 6     public void run() {
 7         for (int i=0; i<100; i++){
 8             account.subtractAmount(1000);
 9         }
10     }
11 }      
java并發之線程同步(synchronized和鎖機制)

Company類打款:

java并發之線程同步(synchronized和鎖機制)
1 public class Company implements Runnable {
 2     private Account account;
 3     public Company(Account account) {
 4         this.account=account;
 5     }
 6 
 7     public void run() {
 8         for (int i=0; i<100; i++){
 9             account.addAmount(1000);
10         }
11     }
12 }      
java并發之線程同步(synchronized和鎖機制)

這裡需要注意的就是:在Bank和Company的構造函數裡面傳遞的參數是Account,就是一個共享資料。

Main函數:

java并發之線程同步(synchronized和鎖機制)
1 public class Main {
 2     public static void main(String[] args) {
 3         Account    account=new Account();
 4         account.setBalance(1000);
 5         Company    company=new Company(account);
 6         Thread companyThread=new Thread(company);
 7         Bank bank=new Bank(account);
 8         Thread bankThread=new Thread(bank);
 9 
10         companyThread.start();
11         bankThread.start();
12         try {
13             companyThread.join();
14             bankThread.join();
15             System.out.printf("Account : Final Balance: %f\n",account.getBalance());
16         } catch (InterruptedException e) {
17             e.printStackTrace();
18         }
19     }
20 }      
java并發之線程同步(synchronized和鎖機制)

這個例子比較簡單,但是可以說明問題。

補充:

1、synchronized關鍵字會降低應用程式的性能,是以隻能在并發場景中修改共享資料的方法上使用它。

2、臨界區的通路應該盡可能的短。方法的其餘部分保持在synchronized代碼塊之外,以擷取更好的性能

非依賴屬性:例如在一個類中有兩個非依賴屬性,Object obj1,Object obj2;他們被多個線程共享,那麼同一時間隻允許一個線程通路其中的一個屬性變量,其他的某個線程通路另一個屬性變量。

舉例如下:兩個看電影的房間和兩個售票口,一個售票處賣出的一張票,隻能用于其中的一個電影院。不能同時作用于兩個電影房間。

Cinema類:

java并發之線程同步(synchronized和鎖機制)
1 public class Cinema {
 2     private long vacanciesCinema1;
 3     private long vacanciesCinema2;
 4 
 5     private final Object controlCinema1, controlCinema2;
 6 
 7     public Cinema(){
 8         controlCinema1=new Object();
 9         controlCinema2=new Object();
10         vacanciesCinema1=20;
11         vacanciesCinema2=20;
12     }
13     
14     public boolean sellTickets1 (int number) {
15         synchronized (controlCinema1) {
16             if (number<vacanciesCinema1) {
17                 vacanciesCinema1-=number;
18                 return true;
19             } else {
20                 return false;
21             }
22         }
23     }
24     
25     public boolean sellTickets2 (int number){
26         synchronized (controlCinema2) {
27             if (number<vacanciesCinema2) {
28                 vacanciesCinema2-=number;
29                 return true;
30             } else {
31                 return false;
32             }
33         }
34     }
35     
36     public boolean returnTickets1 (int number) {
37         synchronized (controlCinema1) {
38             vacanciesCinema1+=number;
39             return true;
40         }
41     }
42     public boolean returnTickets2 (int number) {
43         synchronized (controlCinema2) {
44             vacanciesCinema2+=number;
45             return true;
46         }
47     }
48     public long getVacanciesCinema1() {
49         return vacanciesCinema1;
50     }
51     public long getVacanciesCinema2() {
52         return vacanciesCinema2;
53     }
54 }      
java并發之線程同步(synchronized和鎖機制)

這樣的話,vacanciescinema1和vacanciescinema2(剩餘票數)是獨立的,因為他們屬于不同的對象。這種情況下,隻允許一個同時有一個線程修改vacanciescinema1或者vacanciescinema2,但是允許有兩個線程同時修改vacanciescinema1和vacanciescinema2。

首先需要明确:

  1. 上述三個方法都是Object 類的方法。
  2. 上述三個方法都必須在同步代碼塊中使用。

當一個線程調用wait()方法時,JVM将這個線程置入休眠,并且釋放控制這個同步代碼塊的對象,同時允許其他線程執行這個對象控制的其他同步代碼塊。為了喚醒這個線程,必須在這個對象控制的某個同步代碼塊中調用notify()或者notifyAll()方法。

上述一段話很重要!!!它說明了使用上述三個函數的方法以及方法的作用。

wait():将線程置入休眠狀态,并且釋放控制這個同步代碼塊的對象,釋放了以後其他線程就可以執行這個對象控制的其他代碼塊。也就是可以進入了。這個和Thread.sleep(millions)方法不同,sleep()方法是睡眠指定時間後自動喚醒。

notify()/notifyAll():使用wait()方法休眠的線程需要在該對象控制的某個同步代碼塊中 調用notify或者notifyAll()方法去喚醒,才能進入就緒狀态等待JVM的調用。否則一緻處于休眠狀态。

難點:線程休眠和喚醒的時機,就是說什麼時候調用notify()或者notifyAll()方法???

拿生産者和消費者的例子來說:生産者往隊列中塞資料,消費者從隊列中取資料,是以這個隊列是共享資料

資料存儲類 EventStorage

塞資料方法和取資料方法:set()、get()

java并發之線程同步(synchronized和鎖機制)
1 public synchronized void set(){
 2             while (storage.size()==maxSize){
 3                 try {
 4                     wait();
 5                 } catch (InterruptedException e) {
 6                     e.printStackTrace();
 7                 }
 8             }
 9             storage.add(new Date());
10             System.out.printf("Set: %d\n", storage.size());
11             notify();
12     }    
13    public synchronized void get(){
14             while (storage.size()==0){
15                 try {
16                     wait();
17                 } catch (InterruptedException e) {
18                     e.printStackTrace();
19                 }
20             }
21             System.out.printf("Get: %d: %s\n",storage.size(),((LinkedList<?>)storage).poll());
22             notify();
23     }      
java并發之線程同步(synchronized和鎖機制)

分析上面這個簡單的程式:

1、方法使用synchronized關鍵字聲明同步代碼塊。是以這個函數裡面可以使用同步條件。

2、首先判斷隊列是否已經滿了,這裡要使用while而不是if。為什麼呢?while是一緻查詢是否已經滿了,而if是判斷一次就完事了。

3、如果滿了,調用wait()方法釋放該對象,那麼其他方法(例如get())就可以使用這個對象了。get()方法進入後取出一個資料,然後喚醒上一個被休眠的線程。

4、雖然線程被喚醒了,但是由于get()方法線程占用對象鎖,是以set()方法處于阻塞狀态。直到get()方法取出所有的資料滿足休眠條件以後,set()方法重新執行

5、重複以上步驟

Java提供了同步代碼塊的另一種機制,它比synchronized關鍵字更強大也更加靈活。這種機制基于Lock接口及其實作類(例如:ReentrantLock)

它比synchronized關鍵字好的地方:

1、提供了更多的功能。tryLock()方法的實作,這個方法試圖擷取鎖,如果鎖已經被其他線程占用,它将傳回false并繼續往下執行代碼。

2、Lock接口允許分離讀和寫操作,允許多個線程讀和隻有一個寫線程。ReentrantReadWriteLock

3、具有更好的性能

一個鎖的使用執行個體:

java并發之線程同步(synchronized和鎖機制)
1 public class PrintQueue {
 2     private final Lock queueLock=new ReentrantLock();
 3 
 4     public void printJob(Object document){
 5         queueLock.lock();
 6         
 7         try {
 8             Long duration=(long)(Math.random()*10000);
 9             System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n",Thread.currentThread().getName(),(duration/1000));
10             Thread.sleep(duration);
11         } catch (InterruptedException e) {
12             e.printStackTrace();
13         } finally {
14             queueLock.unlock();
15         }
16     }
17 }      
java并發之線程同步(synchronized和鎖機制)

聲明一把鎖,其中ReentrantLock(可重入的互斥鎖)是Lock接口的一個實作

1 private final Lock queueLock=new ReentrantLock();      

然後在函數裡面調用lock()方法聲明同步代碼塊(臨界區)

1 queueLock.lock();      

最後在finally塊中釋放鎖,重要!!!

1 queueLock.unlock();      

鎖機制最大的改進之一就是ReadWriteLock接口和他的唯一實作類ReentrantReadWriteLock.這個類有兩個鎖,一個是讀操作鎖,一個是寫操作鎖。使用讀操作鎖時可以允許多個線程同時通路,使用寫操作鎖時隻允許一個線程進行。在一個線程執行寫操作時,其他線程不能夠執行讀操作。

在調用寫操作鎖時,使用一個線程。

寫操作鎖的用法:

java并發之線程同步(synchronized和鎖機制)
1 public void setPrices(double price1, double price2) {
2         lock.writeLock().lock();
3         this.price1=price1;
4         this.price2=price2;
5         lock.writeLock().unlock();
6     }      
java并發之線程同步(synchronized和鎖機制)

讀操作鎖:

java并發之線程同步(synchronized和鎖機制)
1   public double getPrice1() {
 2         lock.readLock().lock();
 3         double value=price1;
 4         lock.readLock().unlock();
 5         return value;
 6     }
 7     public double getPrice2() {
 8         lock.readLock().lock();
 9         double value=price2;
10         lock.readLock().unlock();
11         return value;
12     }      
java并發之線程同步(synchronized和鎖機制)

ReentrantLock和ReetrantReadWriteLock構造函數都含有一個布爾參數fair。預設fair為false,即非公平模式。

公平模式:當有很多線程在等待鎖時,鎖将選擇一個等待時間最長的線程進入臨界區。

非公平模式:當有很多線程在等待鎖時,鎖将随機選擇一個等待區(就緒狀态)的線程進入臨界區。

這兩種模式隻适用于lock()和unlock()方。而Lock接口的tryLock()方法沒有将線程置于休眠,fair屬性并不影響這個方法。

鎖條件可以和synchronized關鍵字聲明的臨界區的方法(wait(),notify(),notifyAll())做類比。鎖條件通過Conditon接口聲明。Condition提供了挂起線程和喚醒線程的機制。

使用方法:

java并發之線程同步(synchronized和鎖機制)
1 private Condition lines;
 2     private Condition space;
 3      */
 4     public void insert(String line) {
 5         lock.lock();
 6         try {
 7             while (buffer.size() == maxSize) {
 8                 space.await();
 9             }
10             buffer.offer(line);
11             System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread()
12                     .getName(), buffer.size());
13             lines.signalAll();
14         } catch (InterruptedException e) {
15             e.printStackTrace();
16         } finally {
17             lock.unlock();
18         }
19     }
20 public String get() {
21         String line=null;
22         lock.lock();        
23         try {
24             while ((buffer.size() == 0) &&(hasPendingLines())) {
25                 lines.await();
26             }
27             
28             if (hasPendingLines()) {
29                 line = buffer.poll();
30                 System.out.printf("%s: Line Readed: %d\n",Thread.currentThread().getName(),buffer.size());
31                 space.signalAll();
32             }
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         } finally {
36             lock.unlock();
37         }
38         return line;
39     }      
java并發之線程同步(synchronized和鎖機制)

  • 1 public synchronized void addAmount(double amount) {
    2 }      
  • 1 synchronized(obj){
    2 }      
java并發之線程同步(synchronized和鎖機制)
1 public class Account {
 2     private double balance;
 3     public double getBalance() {
 4         return balance;
 5     }
 6     public void setBalance(double balance) {
 7         this.balance = balance;
 8     }
 9     public synchronized void addAmount(double amount) {
10         double tmp=balance;
11         try {
12             Thread.sleep(10);
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         }
16         tmp+=amount;
17         balance=tmp;
18     }
19     public synchronized void subtractAmount(double amount) {
20         double tmp=balance;
21         try {
22             Thread.sleep(10);
23         } catch (InterruptedException e) {
24             e.printStackTrace();
25         }
26         tmp-=amount;
27         balance=tmp;
28     }
29 }      
java并發之線程同步(synchronized和鎖機制)
java并發之線程同步(synchronized和鎖機制)
1 public class Bank implements Runnable {
 2     private Account account;
 3     public Bank(Account account) {
 4         this.account=account;
 5     }
 6     public void run() {
 7         for (int i=0; i<100; i++){
 8             account.subtractAmount(1000);
 9         }
10     }
11 }      
java并發之線程同步(synchronized和鎖機制)
java并發之線程同步(synchronized和鎖機制)
1 public class Company implements Runnable {
 2     private Account account;
 3     public Company(Account account) {
 4         this.account=account;
 5     }
 6 
 7     public void run() {
 8         for (int i=0; i<100; i++){
 9             account.addAmount(1000);
10         }
11     }
12 }      
java并發之線程同步(synchronized和鎖機制)
java并發之線程同步(synchronized和鎖機制)
1 public class Main {
 2     public static void main(String[] args) {
 3         Account    account=new Account();
 4         account.setBalance(1000);
 5         Company    company=new Company(account);
 6         Thread companyThread=new Thread(company);
 7         Bank bank=new Bank(account);
 8         Thread bankThread=new Thread(bank);
 9 
10         companyThread.start();
11         bankThread.start();
12         try {
13             companyThread.join();
14             bankThread.join();
15             System.out.printf("Account : Final Balance: %f\n",account.getBalance());
16         } catch (InterruptedException e) {
17             e.printStackTrace();
18         }
19     }
20 }      
java并發之線程同步(synchronized和鎖機制)

java并發之線程同步(synchronized和鎖機制)
1 public class Cinema {
 2     private long vacanciesCinema1;
 3     private long vacanciesCinema2;
 4 
 5     private final Object controlCinema1, controlCinema2;
 6 
 7     public Cinema(){
 8         controlCinema1=new Object();
 9         controlCinema2=new Object();
10         vacanciesCinema1=20;
11         vacanciesCinema2=20;
12     }
13     
14     public boolean sellTickets1 (int number) {
15         synchronized (controlCinema1) {
16             if (number<vacanciesCinema1) {
17                 vacanciesCinema1-=number;
18                 return true;
19             } else {
20                 return false;
21             }
22         }
23     }
24     
25     public boolean sellTickets2 (int number){
26         synchronized (controlCinema2) {
27             if (number<vacanciesCinema2) {
28                 vacanciesCinema2-=number;
29                 return true;
30             } else {
31                 return false;
32             }
33         }
34     }
35     
36     public boolean returnTickets1 (int number) {
37         synchronized (controlCinema1) {
38             vacanciesCinema1+=number;
39             return true;
40         }
41     }
42     public boolean returnTickets2 (int number) {
43         synchronized (controlCinema2) {
44             vacanciesCinema2+=number;
45             return true;
46         }
47     }
48     public long getVacanciesCinema1() {
49         return vacanciesCinema1;
50     }
51     public long getVacanciesCinema2() {
52         return vacanciesCinema2;
53     }
54 }      
java并發之線程同步(synchronized和鎖機制)

java并發之線程同步(synchronized和鎖機制)
1 public synchronized void set(){
 2             while (storage.size()==maxSize){
 3                 try {
 4                     wait();
 5                 } catch (InterruptedException e) {
 6                     e.printStackTrace();
 7                 }
 8             }
 9             storage.add(new Date());
10             System.out.printf("Set: %d\n", storage.size());
11             notify();
12     }    
13    public synchronized void get(){
14             while (storage.size()==0){
15                 try {
16                     wait();
17                 } catch (InterruptedException e) {
18                     e.printStackTrace();
19                 }
20             }
21             System.out.printf("Get: %d: %s\n",storage.size(),((LinkedList<?>)storage).poll());
22             notify();
23     }      
java并發之線程同步(synchronized和鎖機制)

java并發之線程同步(synchronized和鎖機制)
1 public class PrintQueue {
 2     private final Lock queueLock=new ReentrantLock();
 3 
 4     public void printJob(Object document){
 5         queueLock.lock();
 6         
 7         try {
 8             Long duration=(long)(Math.random()*10000);
 9             System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n",Thread.currentThread().getName(),(duration/1000));
10             Thread.sleep(duration);
11         } catch (InterruptedException e) {
12             e.printStackTrace();
13         } finally {
14             queueLock.unlock();
15         }
16     }
17 }      
java并發之線程同步(synchronized和鎖機制)
1 private final Lock queueLock=new ReentrantLock();      
1 queueLock.lock();      
1 queueLock.unlock();      

java并發之線程同步(synchronized和鎖機制)
1 public void setPrices(double price1, double price2) {
2         lock.writeLock().lock();
3         this.price1=price1;
4         this.price2=price2;
5         lock.writeLock().unlock();
6     }      
java并發之線程同步(synchronized和鎖機制)
java并發之線程同步(synchronized和鎖機制)
1   public double getPrice1() {
 2         lock.readLock().lock();
 3         double value=price1;
 4         lock.readLock().unlock();
 5         return value;
 6     }
 7     public double getPrice2() {
 8         lock.readLock().lock();
 9         double value=price2;
10         lock.readLock().unlock();
11         return value;
12     }      
java并發之線程同步(synchronized和鎖機制)

java并發之線程同步(synchronized和鎖機制)
1 private Condition lines;
 2     private Condition space;
 3      */
 4     public void insert(String line) {
 5         lock.lock();
 6         try {
 7             while (buffer.size() == maxSize) {
 8                 space.await();
 9             }
10             buffer.offer(line);
11             System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread()
12                     .getName(), buffer.size());
13             lines.signalAll();
14         } catch (InterruptedException e) {
15             e.printStackTrace();
16         } finally {
17             lock.unlock();
18         }
19     }
20 public String get() {
21         String line=null;
22         lock.lock();        
23         try {
24             while ((buffer.size() == 0) &&(hasPendingLines())) {
25                 lines.await();
26             }
27             
28             if (hasPendingLines()) {
29                 line = buffer.poll();
30                 System.out.printf("%s: Line Readed: %d\n",Thread.currentThread().getName(),buffer.size());
31                 space.signalAll();
32             }
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         } finally {
36             lock.unlock();
37         }
38         return line;
39     }      
java并發之線程同步(synchronized和鎖機制)