目錄
- 使用synchronized實作同步方法
- 使用非依賴屬性實作同步
- 在同步塊中使用條件(wait(),notify(),notifyAll())
- 使用鎖實作同步
- 使用讀寫鎖實作同步資料通路
- 修改鎖的公平性
- 在鎖中使用多條件(Multri Condition)
正文
多個執行線程共享一個資源的情景,是并發程式設計中最常見的情景之一。多個線程讀或者寫相同的資料等情況時可能會導緻資料不一緻。為了解決這些問題,引入了臨界區概念。臨界區是一個用以通路共享資源的代碼塊,這個代碼塊在同一時間内隻允許一個線程執行。
Java提供了同步機制。當一個線程試圖通路一個臨界區時,它将使用一種同步機制來檢視是不是已有其他線程進入臨界區。如果沒有其他線程進入臨界區,它就可以進入臨界區;如果已有線程進入了臨界區,它就被同步機制挂起,直到進入的線程離開這個臨界區。如果在等待進入臨界區的線程不止一個,JVM會随機選擇其中的一個,其餘的将繼續等待。
概念比較好了解,具體在java程式中是如何展現的呢?臨界區對應的代碼是怎麼樣的?
回到頂部
每一個用synchronized關鍵字聲明的方法都是臨界區。在Java中,同一個對象的臨界區,在同一時間隻有一個允許被通路。
注意:用synchronized關鍵字聲明的靜态方法,同時隻能被一個執行線程通路,但是其他線程可以通路這個對象的非靜态方法。即:兩個線程可以同時通路一個對象的兩個不同的synchronized方法,其中一個是靜态方法,一個是非靜态方法。
知道了synchronized關鍵字的作用,再來看一下synchronized關鍵字的使用方式。
- 在方法聲明中加入synchronized關鍵字
-
1 public synchronized void addAmount(double amount) { 2 }
- 在代碼塊中使用synchronized關鍵字,obj一般可以使用this關鍵字表示本類對象
-
1 synchronized(obj){ 2 }
需要注意的是:前面已經提到,引入synchronized關鍵字是為了聲明臨界區,解決在多線程環境下共享變量的資料更改安全問題。那麼,一般用到synchronized關鍵字的地方也就是 在對共享資料 通路或者修改的地方。下面舉一個例子,例子場景是這樣:公司定時會給賬戶打款,銀行對賬戶進行扣款。那麼款項對于銀行和公司來說就是一個共享資料。那麼synchronized關鍵字就應該在修改賬戶的地方使用。
聲明一個Account類:
1 public class Account {
2 private double balance;
3 public double getBalance() {
4 return balance;
5 }
6 public void setBalance(double balance) {
7 this.balance = balance;
8 }
9 public synchronized void addAmount(double amount) {
10 double tmp=balance;
11 try {
12 Thread.sleep(10);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 tmp+=amount;
17 balance=tmp;
18 }
19 public synchronized void subtractAmount(double amount) {
20 double tmp=balance;
21 try {
22 Thread.sleep(10);
23 } catch (InterruptedException e) {
24 e.printStackTrace();
25 }
26 tmp-=amount;
27 balance=tmp;
28 }
29 }

Bank類扣款:

1 public class Bank implements Runnable {
2 private Account account;
3 public Bank(Account account) {
4 this.account=account;
5 }
6 public void run() {
7 for (int i=0; i<100; i++){
8 account.subtractAmount(1000);
9 }
10 }
11 }

Company類打款:

1 public class Company implements Runnable {
2 private Account account;
3 public Company(Account account) {
4 this.account=account;
5 }
6
7 public void run() {
8 for (int i=0; i<100; i++){
9 account.addAmount(1000);
10 }
11 }
12 }

這裡需要注意的就是:在Bank和Company的構造函數裡面傳遞的參數是Account,就是一個共享資料。
Main函數:

1 public class Main {
2 public static void main(String[] args) {
3 Account account=new Account();
4 account.setBalance(1000);
5 Company company=new Company(account);
6 Thread companyThread=new Thread(company);
7 Bank bank=new Bank(account);
8 Thread bankThread=new Thread(bank);
9
10 companyThread.start();
11 bankThread.start();
12 try {
13 companyThread.join();
14 bankThread.join();
15 System.out.printf("Account : Final Balance: %f\n",account.getBalance());
16 } catch (InterruptedException e) {
17 e.printStackTrace();
18 }
19 }
20 }

這個例子比較簡單,但是可以說明問題。
補充:
1、synchronized關鍵字會降低應用程式的性能,是以隻能在并發場景中修改共享資料的方法上使用它。
2、臨界區的通路應該盡可能的短。方法的其餘部分保持在synchronized代碼塊之外,以擷取更好的性能
非依賴屬性:例如在一個類中有兩個非依賴屬性,Object obj1,Object obj2;他們被多個線程共享,那麼同一時間隻允許一個線程通路其中的一個屬性變量,其他的某個線程通路另一個屬性變量。
舉例如下:兩個看電影的房間和兩個售票口,一個售票處賣出的一張票,隻能用于其中的一個電影院。不能同時作用于兩個電影房間。
Cinema類:

1 public class Cinema {
2 private long vacanciesCinema1;
3 private long vacanciesCinema2;
4
5 private final Object controlCinema1, controlCinema2;
6
7 public Cinema(){
8 controlCinema1=new Object();
9 controlCinema2=new Object();
10 vacanciesCinema1=20;
11 vacanciesCinema2=20;
12 }
13
14 public boolean sellTickets1 (int number) {
15 synchronized (controlCinema1) {
16 if (number<vacanciesCinema1) {
17 vacanciesCinema1-=number;
18 return true;
19 } else {
20 return false;
21 }
22 }
23 }
24
25 public boolean sellTickets2 (int number){
26 synchronized (controlCinema2) {
27 if (number<vacanciesCinema2) {
28 vacanciesCinema2-=number;
29 return true;
30 } else {
31 return false;
32 }
33 }
34 }
35
36 public boolean returnTickets1 (int number) {
37 synchronized (controlCinema1) {
38 vacanciesCinema1+=number;
39 return true;
40 }
41 }
42 public boolean returnTickets2 (int number) {
43 synchronized (controlCinema2) {
44 vacanciesCinema2+=number;
45 return true;
46 }
47 }
48 public long getVacanciesCinema1() {
49 return vacanciesCinema1;
50 }
51 public long getVacanciesCinema2() {
52 return vacanciesCinema2;
53 }
54 }

這樣的話,vacanciescinema1和vacanciescinema2(剩餘票數)是獨立的,因為他們屬于不同的對象。這種情況下,隻允許一個同時有一個線程修改vacanciescinema1或者vacanciescinema2,但是允許有兩個線程同時修改vacanciescinema1和vacanciescinema2。
首先需要明确:
- 上述三個方法都是Object 類的方法。
- 上述三個方法都必須在同步代碼塊中使用。
當一個線程調用wait()方法時,JVM将這個線程置入休眠,并且釋放控制這個同步代碼塊的對象,同時允許其他線程執行這個對象控制的其他同步代碼塊。為了喚醒這個線程,必須在這個對象控制的某個同步代碼塊中調用notify()或者notifyAll()方法。
上述一段話很重要!!!它說明了使用上述三個函數的方法以及方法的作用。
wait():将線程置入休眠狀态,并且釋放控制這個同步代碼塊的對象,釋放了以後其他線程就可以執行這個對象控制的其他代碼塊。也就是可以進入了。這個和Thread.sleep(millions)方法不同,sleep()方法是睡眠指定時間後自動喚醒。
notify()/notifyAll():使用wait()方法休眠的線程需要在該對象控制的某個同步代碼塊中 調用notify或者notifyAll()方法去喚醒,才能進入就緒狀态等待JVM的調用。否則一緻處于休眠狀态。
難點:線程休眠和喚醒的時機,就是說什麼時候調用notify()或者notifyAll()方法???
拿生産者和消費者的例子來說:生産者往隊列中塞資料,消費者從隊列中取資料,是以這個隊列是共享資料
資料存儲類 EventStorage
塞資料方法和取資料方法:set()、get()

1 public synchronized void set(){
2 while (storage.size()==maxSize){
3 try {
4 wait();
5 } catch (InterruptedException e) {
6 e.printStackTrace();
7 }
8 }
9 storage.add(new Date());
10 System.out.printf("Set: %d\n", storage.size());
11 notify();
12 }
13 public synchronized void get(){
14 while (storage.size()==0){
15 try {
16 wait();
17 } catch (InterruptedException e) {
18 e.printStackTrace();
19 }
20 }
21 System.out.printf("Get: %d: %s\n",storage.size(),((LinkedList<?>)storage).poll());
22 notify();
23 }

分析上面這個簡單的程式:
1、方法使用synchronized關鍵字聲明同步代碼塊。是以這個函數裡面可以使用同步條件。
2、首先判斷隊列是否已經滿了,這裡要使用while而不是if。為什麼呢?while是一緻查詢是否已經滿了,而if是判斷一次就完事了。
3、如果滿了,調用wait()方法釋放該對象,那麼其他方法(例如get())就可以使用這個對象了。get()方法進入後取出一個資料,然後喚醒上一個被休眠的線程。
4、雖然線程被喚醒了,但是由于get()方法線程占用對象鎖,是以set()方法處于阻塞狀态。直到get()方法取出所有的資料滿足休眠條件以後,set()方法重新執行
5、重複以上步驟
Java提供了同步代碼塊的另一種機制,它比synchronized關鍵字更強大也更加靈活。這種機制基于Lock接口及其實作類(例如:ReentrantLock)
它比synchronized關鍵字好的地方:
1、提供了更多的功能。tryLock()方法的實作,這個方法試圖擷取鎖,如果鎖已經被其他線程占用,它将傳回false并繼續往下執行代碼。
2、Lock接口允許分離讀和寫操作,允許多個線程讀和隻有一個寫線程。ReentrantReadWriteLock
3、具有更好的性能
一個鎖的使用執行個體:

1 public class PrintQueue {
2 private final Lock queueLock=new ReentrantLock();
3
4 public void printJob(Object document){
5 queueLock.lock();
6
7 try {
8 Long duration=(long)(Math.random()*10000);
9 System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n",Thread.currentThread().getName(),(duration/1000));
10 Thread.sleep(duration);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 } finally {
14 queueLock.unlock();
15 }
16 }
17 }

聲明一把鎖,其中ReentrantLock(可重入的互斥鎖)是Lock接口的一個實作
1 private final Lock queueLock=new ReentrantLock();
然後在函數裡面調用lock()方法聲明同步代碼塊(臨界區)
1 queueLock.lock();
最後在finally塊中釋放鎖,重要!!!
1 queueLock.unlock();
鎖機制最大的改進之一就是ReadWriteLock接口和他的唯一實作類ReentrantReadWriteLock.這個類有兩個鎖,一個是讀操作鎖,一個是寫操作鎖。使用讀操作鎖時可以允許多個線程同時通路,使用寫操作鎖時隻允許一個線程進行。在一個線程執行寫操作時,其他線程不能夠執行讀操作。
在調用寫操作鎖時,使用一個線程。
寫操作鎖的用法:

1 public void setPrices(double price1, double price2) {
2 lock.writeLock().lock();
3 this.price1=price1;
4 this.price2=price2;
5 lock.writeLock().unlock();
6 }

讀操作鎖:

1 public double getPrice1() {
2 lock.readLock().lock();
3 double value=price1;
4 lock.readLock().unlock();
5 return value;
6 }
7 public double getPrice2() {
8 lock.readLock().lock();
9 double value=price2;
10 lock.readLock().unlock();
11 return value;
12 }

ReentrantLock和ReetrantReadWriteLock構造函數都含有一個布爾參數fair。預設fair為false,即非公平模式。
公平模式:當有很多線程在等待鎖時,鎖将選擇一個等待時間最長的線程進入臨界區。
非公平模式:當有很多線程在等待鎖時,鎖将随機選擇一個等待區(就緒狀态)的線程進入臨界區。
這兩種模式隻适用于lock()和unlock()方。而Lock接口的tryLock()方法沒有将線程置于休眠,fair屬性并不影響這個方法。
鎖條件可以和synchronized關鍵字聲明的臨界區的方法(wait(),notify(),notifyAll())做類比。鎖條件通過Conditon接口聲明。Condition提供了挂起線程和喚醒線程的機制。
使用方法:

1 private Condition lines;
2 private Condition space;
3 */
4 public void insert(String line) {
5 lock.lock();
6 try {
7 while (buffer.size() == maxSize) {
8 space.await();
9 }
10 buffer.offer(line);
11 System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread()
12 .getName(), buffer.size());
13 lines.signalAll();
14 } catch (InterruptedException e) {
15 e.printStackTrace();
16 } finally {
17 lock.unlock();
18 }
19 }
20 public String get() {
21 String line=null;
22 lock.lock();
23 try {
24 while ((buffer.size() == 0) &&(hasPendingLines())) {
25 lines.await();
26 }
27
28 if (hasPendingLines()) {
29 line = buffer.poll();
30 System.out.printf("%s: Line Readed: %d\n",Thread.currentThread().getName(),buffer.size());
31 space.signalAll();
32 }
33 } catch (InterruptedException e) {
34 e.printStackTrace();
35 } finally {
36 lock.unlock();
37 }
38 return line;
39 }

-
1 public synchronized void addAmount(double amount) { 2 }
-
1 synchronized(obj){ 2 }

1 public class Account {
2 private double balance;
3 public double getBalance() {
4 return balance;
5 }
6 public void setBalance(double balance) {
7 this.balance = balance;
8 }
9 public synchronized void addAmount(double amount) {
10 double tmp=balance;
11 try {
12 Thread.sleep(10);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 tmp+=amount;
17 balance=tmp;
18 }
19 public synchronized void subtractAmount(double amount) {
20 double tmp=balance;
21 try {
22 Thread.sleep(10);
23 } catch (InterruptedException e) {
24 e.printStackTrace();
25 }
26 tmp-=amount;
27 balance=tmp;
28 }
29 }


1 public class Bank implements Runnable {
2 private Account account;
3 public Bank(Account account) {
4 this.account=account;
5 }
6 public void run() {
7 for (int i=0; i<100; i++){
8 account.subtractAmount(1000);
9 }
10 }
11 }


1 public class Company implements Runnable {
2 private Account account;
3 public Company(Account account) {
4 this.account=account;
5 }
6
7 public void run() {
8 for (int i=0; i<100; i++){
9 account.addAmount(1000);
10 }
11 }
12 }


1 public class Main {
2 public static void main(String[] args) {
3 Account account=new Account();
4 account.setBalance(1000);
5 Company company=new Company(account);
6 Thread companyThread=new Thread(company);
7 Bank bank=new Bank(account);
8 Thread bankThread=new Thread(bank);
9
10 companyThread.start();
11 bankThread.start();
12 try {
13 companyThread.join();
14 bankThread.join();
15 System.out.printf("Account : Final Balance: %f\n",account.getBalance());
16 } catch (InterruptedException e) {
17 e.printStackTrace();
18 }
19 }
20 }


1 public class Cinema {
2 private long vacanciesCinema1;
3 private long vacanciesCinema2;
4
5 private final Object controlCinema1, controlCinema2;
6
7 public Cinema(){
8 controlCinema1=new Object();
9 controlCinema2=new Object();
10 vacanciesCinema1=20;
11 vacanciesCinema2=20;
12 }
13
14 public boolean sellTickets1 (int number) {
15 synchronized (controlCinema1) {
16 if (number<vacanciesCinema1) {
17 vacanciesCinema1-=number;
18 return true;
19 } else {
20 return false;
21 }
22 }
23 }
24
25 public boolean sellTickets2 (int number){
26 synchronized (controlCinema2) {
27 if (number<vacanciesCinema2) {
28 vacanciesCinema2-=number;
29 return true;
30 } else {
31 return false;
32 }
33 }
34 }
35
36 public boolean returnTickets1 (int number) {
37 synchronized (controlCinema1) {
38 vacanciesCinema1+=number;
39 return true;
40 }
41 }
42 public boolean returnTickets2 (int number) {
43 synchronized (controlCinema2) {
44 vacanciesCinema2+=number;
45 return true;
46 }
47 }
48 public long getVacanciesCinema1() {
49 return vacanciesCinema1;
50 }
51 public long getVacanciesCinema2() {
52 return vacanciesCinema2;
53 }
54 }


1 public synchronized void set(){
2 while (storage.size()==maxSize){
3 try {
4 wait();
5 } catch (InterruptedException e) {
6 e.printStackTrace();
7 }
8 }
9 storage.add(new Date());
10 System.out.printf("Set: %d\n", storage.size());
11 notify();
12 }
13 public synchronized void get(){
14 while (storage.size()==0){
15 try {
16 wait();
17 } catch (InterruptedException e) {
18 e.printStackTrace();
19 }
20 }
21 System.out.printf("Get: %d: %s\n",storage.size(),((LinkedList<?>)storage).poll());
22 notify();
23 }


1 public class PrintQueue {
2 private final Lock queueLock=new ReentrantLock();
3
4 public void printJob(Object document){
5 queueLock.lock();
6
7 try {
8 Long duration=(long)(Math.random()*10000);
9 System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n",Thread.currentThread().getName(),(duration/1000));
10 Thread.sleep(duration);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 } finally {
14 queueLock.unlock();
15 }
16 }
17 }

1 private final Lock queueLock=new ReentrantLock();
1 queueLock.lock();
1 queueLock.unlock();

1 public void setPrices(double price1, double price2) {
2 lock.writeLock().lock();
3 this.price1=price1;
4 this.price2=price2;
5 lock.writeLock().unlock();
6 }


1 public double getPrice1() {
2 lock.readLock().lock();
3 double value=price1;
4 lock.readLock().unlock();
5 return value;
6 }
7 public double getPrice2() {
8 lock.readLock().lock();
9 double value=price2;
10 lock.readLock().unlock();
11 return value;
12 }


1 private Condition lines;
2 private Condition space;
3 */
4 public void insert(String line) {
5 lock.lock();
6 try {
7 while (buffer.size() == maxSize) {
8 space.await();
9 }
10 buffer.offer(line);
11 System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread()
12 .getName(), buffer.size());
13 lines.signalAll();
14 } catch (InterruptedException e) {
15 e.printStackTrace();
16 } finally {
17 lock.unlock();
18 }
19 }
20 public String get() {
21 String line=null;
22 lock.lock();
23 try {
24 while ((buffer.size() == 0) &&(hasPendingLines())) {
25 lines.await();
26 }
27
28 if (hasPendingLines()) {
29 line = buffer.poll();
30 System.out.printf("%s: Line Readed: %d\n",Thread.currentThread().getName(),buffer.size());
31 space.signalAll();
32 }
33 } catch (InterruptedException e) {
34 e.printStackTrace();
35 } finally {
36 lock.unlock();
37 }
38 return line;
39 }
