天天看點

探索 Java 同步機制

本文從典型的 Monitor Object 設計模式入手,從一個新的視角,來探讨 Java 語言的同步機制。

本文将從兩個方面進行闡述:

  1. 使 用 C++ 語言來描述 Monitor Object 設計模式。Java 對于這樣一個典型的模式做了很好的語言層面的封裝,是以對于 Java 的開發者來說,很多關于該模式本身的東西被屏蔽掉了。本文試圖使用 Native C++ 語言,幫助讀者從本質上對 Monitor object 設計模式有一個更全面的認識。
  2. 結合 C++ 版本的 Monitor Object 設計模式,引領讀者對于 Java 同步機制有一個更深刻的認識,幫助讀者正确有效地使用 Java 同步機制。

預備知識

在開始正式讨論之前,需要了解一些預備知識。

什麼是 RAII

資 源擷取即初始化(RAII, Resource Acquisition Is Initialization)是指,在一個對象的構造函數中獲得資源 , 并且在該對象的析構函數中釋放它。這個資源可以是對象、記憶體、檔案句柄或者其它類型。實作這種功能的類,我們就說它采用了資源擷取即初始化(RAII)的 方式。 RAII 是一種很典型的語言慣用法,被很多的 OO 語言所使用,下面是 C++ 的例子。

清單 1. RAII Using C++

使 用 RAII 的好處是:由于析構函數由系統自動調用,這樣可以幫助我們自動地隐式釋放我們所擷取的資源。事情上,我們熟知的很多 c++ 技術都用到了這一設計模式,比如:智能指針 (Smart Pointer),以及我們接下來要讨論的範圍鎖 (Scoped Lock) 。

不同于 C++,Java 對象沒有析構函數,Java System 提供了 GC 來管理記憶體資源。而對于像資料庫連接配接,Sockets 這樣類型的資源, Java 提供了 finalize() 來處理。但是,請注意,Java 的 finalizer 與 C++ 的析構函數是不同的,finalize() 函數由 GC 異步地在某個恰當的時候調用,我們不能等同地使用 finalize() 來實作 C++ 裡的 RAII 。通常的做法是使用 Java 提供的 finally 語句塊。

清單 2. RAII Using Java
MyResource res = null; 
try {
    res = new MyResource();
    // Use the resource 
} finally {
    //At exit point, close the resource.
    if (res != null) { res.close(); }
}
      

什麼是區域鎖 (Scoped Lock)

區域鎖是指線程執行進入一個區域時,一個鎖将自動被擷取,當該線程執行離開這個區域時,這個鎖将被自動釋放。 C++ 區域鎖的實作使用了 RAII 技術 , 實作如下。

清單 3. Scoped Lock Using C++
template <class LOCK>
class Guard {
public: 
	// Store a pointer to the lock and acquire the lock. 
	Guard (LOCK &lock)
	:m_pLlock (&lock), m_bOwner (false) {
		m_pLlock->acquire ();
		m_bOwner = true; 
	}
	// Release the lock when the guard goes out of scope, 
	// but only if <acquire> succeeded. 
	virtual ~Guard () {
		if (m_bOwner) m_pLlock->release ();
	}
private: 
	// Pointer to the lock we're managing. 
	LOCK *m_pLlock; 
	// Records if the lock is held by this object. 
	bool m_bOwner; 
	// ... maybe need disallow copying and assignment ... 
 };      

Guard 是一個模闆類,LOCK 類型指的是對作業系統提供的線程鎖的抽象,比如,在 Windows 平台上,LOCK 可以是對 CRITICAL_SECTION 的封裝。

那麼對于 Java,怎麼實作區域鎖呢?不必擔心,Java 對于區域鎖模式在語言層面上已經做了封裝,是以對于 Java 開發者來說,不必像 C++ 這樣來開發自己的區域鎖類,這就是我們所熟知的 synchronized 關鍵字。

清單 4. Scoped Lock Using Java
public int scopedLockSample() {
synchronized(this) {
		try {
			//do some work…
		} catch( MyException1 e) {
			//no need release lock explicitly
			return -1; 
		} catch( MyException2 e) {
			//no need release lock explicitly
			return -2; 
		}
		//other exceptions handling... 	
	}
	return 0; 
}      

synchronized 保證在進入該區域後,獲得對象鎖,不管最終該函數從哪裡退出,該對象鎖都會被正确釋放。

什麼是條件變量 (Condition Variables)

條 件變量通常被一個線程用于使自己等待,直到一個涉及共享資料的條件表達式到達特定的狀态。當另外的協作線程訓示共享資料的狀态已發生變化,排程器就喚醒在 該條件變量上挂起的線程。于是新喚醒的線程重新對它的條件表達式進行求值,如果共享資料已到達合适狀态,就恢複處理。以下是條件變量的 C++ 實作。

清單 5. Thread Condition Using C++
class Thread_Condition {
public:
	// Initialize the condition variable and associate it with the specified lock.
	Thread_Condition (const Thread_Mutex &m) 
	:m_obMutex(m) {
		cond_init (&cond_, USYNC_THREAD, 0); 
	}
	// Destroy the condition variable. 
	virtual ~Thread_Condition () {
		cond_destroy (&cond_);
	}
	// Wait for the <Thread_Condition> to be notified
	// or until <timeout> has elapsed. If <timeout> == 0 then wait indefinitely. 
	void wait (Time_Value *timeout = 0) {
		cond_timedwait(&cond_, &m_obMutex.m_lock,timeout == 0?0:timeout->msec ());
	}
	// Notify one thread waiting on <Thread_Condition>.
	void notify () { cond_signal (&cond_); }
	// Notify all threads waiting on <Thread_Condition>.
	void notify_all () { cond_broadcast (&cond_);
}
private: 
	// Solaris condition variable. 
	cond_t cond_;
	// Reference to mutex lock. 
	const Thread_Mutex &m_obMutex; 
};      

Thread_Condition 的實作與作業系統提供的 API 密切相關,以上的例子是基于 Solaris condition variable API 的面向對象的封裝。另外,這裡的 Thread_Mutex 類型是對作業系統提供的線程鎖的面向對象的封裝 (Thread_Mutex 類型就是 Guard 模闆參數 LOCK 所指向的類型 ) 。

而對于 Java,問題就變得簡單很多,你不需要去封裝自己的條件變量類,Java 的根類 Object 提供了 wait/notify/notifyAll 方法給開發者,很容易使用,這個我們在後面的讨論中會看到。

探索 Java 同步機制
探索 Java 同步機制

Monitor Object 設計模式 C++ 描述

我們将從以下幾個方面來讨論 Monitor Object 模式。

問題描述

我們在開發并發的應用時,經常需要設計這樣的對象,該對象的方法會在多線程的環境下被調用,而這些方法的執行都會改變該對象本身的狀态。為了防止競争條件 (race condition) 的出現,對于這類對象的設計,需要考慮解決以下問題:

  • 在任一時間内,隻有唯一的公共的成員方法,被唯一的線程所執行。
  • 對于對象的調用者來說,如果總是需要在調用方法之前進行拿鎖,而在調用方法之後進行放鎖,這将會使并發應用程式設計變得更加困難。合理的設計是,該對象本身確定任何針對它的方法請求的同步被透明的進行,而不需要調用者的介入。
  • 如果一個對象的方法執行過程中,由于某些條件不能滿足而阻塞,應該允許其它的用戶端線程的方法調用可以通路該對象。

我 們使用 Monitor Object 設計模式來解決這類問題:将被客戶線程并發通路的對象定義為一個 monitor 對象。客戶線程僅僅通過 monitor 對象的同步方法才能使用 monitor 對象定義的服務。為了防止陷入競争條件,在任一時刻隻能有一個同步方法被執行。每一個 monitor 對象包含一個 monitor 鎖,被同步方法用于串行通路對象的行為和狀态。此外,同步方法可以根據一個或多個與 monitor 對象相關的 monitor conditions 來決定在何種情況下挂起或恢複他們的執行。

結構

在 Monitor Object 模式中,主要有四種類型的參與者:

  • 監視者對象 (Monitor Object): 負責定義公共的接口方法,這些公共的接口方法會在多線程的環境下被調用執行。
  • 同步方法:這些方法是監視者對象所定義。為了防止競争條件,無論是否同時有多個線程并發調用同步方法,還是監視者對象含有多個同步方法,在任一時間内隻有監視者對象的一個同步方法能夠被執行。
  • 監視鎖 (Monitor Lock): 每一個監視者對象都會擁有一把監視鎖。
  • 監視條件 (Monitor Condition): 同步方法使用監視鎖和監視條件來決定方法是否需要阻塞或重新執行。

執行序列圖

在監視者對象模式中,在參與者之間将發生如下的協作過程:

1、 同步方法的調用和串行化。當客戶線程調用監視者對象的同步方法時,必須首先擷取它的監視鎖。隻要該監視者對象有其他同步方法正在被執行,擷取操作便不會成 功。在這種情況下,客戶線程将被阻塞直到它擷取監視鎖。當客戶線程成功擷取監視鎖後,進入臨界區,執行方法實作的服務。一旦同步方法完成執行,監視鎖會被 自動釋放,目的是使其他客戶線程有機會調用執行該監視者對象的同步方法。

2、同步方法線程挂起。如果調用同步方法的客戶線程必須被阻塞或是有其他原因不能立刻進行,它能夠在一個監視條件上等待,這将導緻該客戶線程暫時釋放監視鎖,并被挂起在監視條件上。

3、監視條件通知。一個客戶線程能夠通知一個監視條件,目的是為了讓一個前期使自己挂起在一個監視條件上的同步方法線程恢複運作。

4、同步方法線程恢複。一旦一個早先被挂起在監視條件上的同步方法線程擷取通知,它将繼續在最初的等待監視條件的點上執行。在被通知線程被允許恢複執行同步方法之前,監視鎖将自動被擷取。圖 1 描述了

監視者

對象的動态特性。

圖 1. Monitor Object Sequence Diagram.
探索 Java 同步機制

示例

在本節中,我們将使用監視者對象設計模式來解決一個實際的問題。

這是一個典型的生産者 / 消費者模式問題。假定我們有一個固定長度的消息隊列,該隊列會被多個生産者 / 消費者線程所操作,生産者線程負責将消息放入該隊列,而消費者線程負責從該對列中取出消息。

清單 6. Message_Queue.h
class Message_Queue {
public:
	enum { MAX_MESSAGES = 100/* ... */ };
	// The constructor defines the maximum number
	// of messages in the queue. This determines when the queue is 'full.'
	Message_Queue(size_t max_messages = MAX_MESSAGES); 
	virtual ~Message_Queue();
	// Put the <Message> at the tail of the queue. 
	// If the queue is full, block until the queue is not full. 
	/* synchronized */ 
	void put (const Message &msg); 
	// Get the <Message> from the head of the queue
	// and remove it. If the queue is empty, block until the queue is not empty. 
	/* synchronized */ 
	Message get();
	// True if the queue is empty, else false. 
	/* synchronized */ 
	bool empty () const; 
	// True if the queue is full, else false. 
	/* synchronized */ 
	bool full () const; 
private:
	// Put the <Message> at the tail of the queue, and
	// get the <Message> at its head, respectively. 
	// Note that, the internal methods are not synchronized. 
	void put_i (const Message &msg); 
	Message get_i ();
	// True if the queue is empty, else false. 
	bool empty_i () const; 
	// True if the queue is full, else false. 
	bool full_i () const; 
private: 
	// Internal Queue representation omitted, could be a
	// circular array or a linked list, etc.. ... 
	// Current number of <Message>s in the queue. 
	size_t message_count_; 
	// The maximum number <Message>s that can be
	// in a queue before it's considered 'full.'
	size_t max_messages_; 
	// Monitor lock that protects the queue's
	// internal state from race conditions during concurrent access. 
	mutable Thread_Mutex monitor_lock_; 
	// Condition variable used in conjunction with <monitor_lock_> to make
	// synchronized method threads wait until the queue is no longer empty. 
	Thread_Condition not_empty_;
	// Condition variable used in conjunction with <monitor_lock_> to make
	// synchronized method threads wait until the queue is no longer full. 
	Thread_Condition not_full_;
};      
清單 7. Message_Queue.cpp
#include "Message_Queue.h"
Message_Queue::Message_Queue (size_t max_messages) 
:not_full_(monitor_lock_),
 not_empty_(monitor_lock_),
 max_messages_(max_messages), 
 message_count_(0) {
}
bool Message_Queue::empty () const {
	Guard<Thread_Mutex> guard (monitor_lock_); 
	return empty_i ();
}
bool Message_Queue::full () const {
	Guard<Thread_Mutex> guard (monitor_lock_);
	return full_i ();
}
void Message_Queue::put (const Message &msg) {
	// Use the Scoped Locking idiom to acquire/release the < monitor_lock_> upon
	// entry/exit to the synchronized method. 
	Guard<Thread_Mutex> guard (monitor_lock_);
	// Wait while the queue is full. 
	while (full_i ()) {
		// Release < monitor_lock_> and suspend the
		// calling thread waiting for space in the queue. 
		// The <monitor_lock_> is reacquired automatically when <wait> returns. 
		not_full_.wait ();
	}
	// Enqueue the <Message> at the tail. 
	put_i (msg); 
	// Notify any thread waiting in <get> that the queue has at least one <Message>.
	not_empty_.notify ();
} // Destructor of <guard> releases <monitor_lock_>.
Message Message_Queue::get () {
// Use the Scoped Locking idiom to acquire/release the <monitor_lock_> upon
// entry/exit to the synchronized method. 
Guard<Thread_Mutex> guard (monitor_lock_);
// Wait while the queue is empty. 
while (empty_i ()) {
	// Release <monitor_lock_> and suspend the
	// calling thread waiting for a new <Message> to
	// be put into the queue. The <monitor_lock_> is
	// reacquired automatically when <wait> returns. 
	not_empty_.wait ();
}
// Dequeue the first <Message> in the queue and update the <message_count_>.
	Message m = get_i ();
	// Notify any thread waiting in <put> that the
	// queue has room for at least one <Message>.
	not_full_.notify ();
	return m; 
} // Destructor of <guard> releases <monitor_lock_>.
bool Message_Queue::empty_i () const {
	return message_count_ == 0; 
}
bool Message_Queue::full_i () const {
	return message_count_ == max_messages_; 
}
Message_Queue::~Message_Queue() {
}
      
探索 Java 同步機制
探索 Java 同步機制

Monitor Object Java 實踐

認識 Java Monitor Object

Java Monitor 從兩個方面來支援線程之間的同步,即:互斥執行與協作。 Java 使用對象鎖 ( 使用 synchronized 獲得對象鎖 ) 保證工作在共享的資料集上的線程互斥執行 , 使用 notify/notifyAll/wait 方法來協同不同線程之間的工作。這些方法在 Object 類上被定義,會被所有的 Java 對象自動繼承。

實質上,Java 的 Object 類本身就是監視者對象,Java 語言對于這樣一個典型并發設計模式做了内建的支援。不過,在 Java 裡,我們已經看不到了我們在 C++ 一節所讨論的區域鎖與條件變量的概念。下圖很好地描述了 Java Monitor 的工作機理。

圖 2. Java Monitor
探索 Java 同步機制

線程如果獲得監視鎖成功,将成為該監視者對象的擁有者。在任一時刻内,監視者對象隻屬于一個活動線程 (Owner) 。擁有者線程可以調用 wait 方法自動釋放監視鎖,進入等待狀态。

示例

在本節,我們将用 Java Monitor 來重新解決用 C++ 實作的生産者 / 消費者模式問題。

清單 8. Message Class
public class Message {
    private static int OBJ_COUNT = 0;
    public int obj_index_;
    Message(){
        synchronized(Message.class) {
            OBJ_COUNT++;
            obj_index_ = OBJ_COUNT;
        }
    }
    
    @Override
    public String toString() {
        return "message["+obj_index_+"]";
    }
}
      
清單 9. MessageQueue Class
public class MessageQueue {
private int message_count_;
	private int max_messages_;
	private Message[] buffer_;
		
	private int in_ = 0, out_ = 0;
	public MessageQueue(int max_messages) {
		max_messages_   = max_messages;
		message_count_  = 0;
		buffer_          = new Message[max_messages_];
	}
	
	synchronized boolean full () {	
		return full_i ();
	}
	synchronized  void put (Message msg) {
		while (full_i ()) {
			try { 
			    System.out.println("thread["+
			      Thread.currentThread().getId()+
			      "]"+
			      "release monitor lock, wait for space in the queue");
                wait(); 
            } catch (InterruptedException e) {
                //do something.
            } finally { 
                //do something.
            } 
		}//end while.
		put_i(msg);
		notifyAll();
	}
	synchronized  Message get() {
		while (empty_i ()) {
	  		try { 
	  		    System.out.println("thread["+
	  		      Thread.currentThread().getId()+
	  		      "]"+
	  		      "release monitor lock, wait for message in the queue");
                			wait(); 
                		} catch (InterruptedException e) {
                			//do something.
                		} finally { 
                			//do something.
                		} 
		}//end while.
		Message m = get_i ();
		notifyAll();
		return m;
	}
	private boolean empty_i () {
		return message_count_ == 0;
	}
	private boolean full_i () {
		return message_count_ == max_messages_;
	}
	private void put_i (Message msg) {
	    System.out.println("thread ["+
	        Thread.currentThread().getId()+
	        "] put message <"+
	        msg+
	        ">" +
	        "to the queue");
		buffer_[in_] = msg; 
            	in_ = (in_ + 1) % max_messages_;  
            	message_count_++; 
	}
	private Message get_i() {
         	Message msg = buffer_[out_]; 
            	out_= (out_ + 1) % max_messages_;
            	message_count_--;
            	System.out.println("thread ["+
            	    Thread.currentThread().getId()+
            	    "] get message <"+
            	    msg+
            	    ">" +
            	    "from the queue");
	    	return msg;
	}
}
      

在 Java 的示例中,沒有放更多的注釋,希望讀者通過對照 C++ 的示例,來閱讀了解這裡的 Java 代碼。可以看到,使用 Java 的版本代碼簡潔了很多。另外,這裡提供的 Java 代碼,稍作修改,是直接可以作為獨立的 Java 程式運作的。

探索 Java 同步機制
探索 Java 同步機制

總結

我們對比一下 Monitor Object 設計模式的 C++ 版本與 Java 版本,做出如下的總結。

在 Java 的版本中,我們不需要親自開發 Scoped Lock,Thread Condition 類,Java 語言給我們提供了内建的支援,我們很容易使用 synchronized, wait/notify 這些 Java 特性來建構基于 Monitor Object 模式的應用。而缺點是:缺乏一些必要的靈活性。比如 : 在 Java 的版本中,我們并不能區分出 not empty 與 not full 這兩個條件變量,是以我們隻能使用 notifyAll 來通知所有等待者線程,而 C++ 版本使用了不同的通知喚醒:not_full_.notify 與 not_empty_.notify 。同樣,在 Java 中對于 synchrnonized 的使用,後面一定要跟 {} 語句塊,這在代碼的書寫上有些不靈活,而在 C++ 中的,Scoped Lock 預設就是保護目前的語句塊,當然你也可以選擇使用 {} 來顯式聲明。而且,使用 synchroninzed 所獲得的對象鎖,無法細粒度地區分是獲得讀鎖還是寫鎖。

不過總的來說,Java 的确簡化了基于 Monitor Object 并發模式的開發。不過,我們應該意識到,并發的實際應用開發決不會像 Java 文法這麼展現出來的簡單,簡潔。我們更應該看到并發應用程式本質的一些東西,這有利于幫助我們建構更加健壯的并發應用。

參考資料

  • “Java單例對象同步問題探讨”(developerWorks,2003 年 12 月):本文将探讨一下在多線程環境下,使用單例對象作配置資訊管理時可能會帶來的幾個同步問題,并針對每個問題給出可選的解決辦法。
  • “Java 理論與實踐: Mustang 中的同步優化”(developerWorks,2005 年 11 月):本文介紹一些為 Mustang 安排的同步優化。
  • developerWorks Java 技術專區:這裡有數百篇關于 Java 程式設計的文章。

關于作者

探索 Java 同步機制
探索 Java 同步機制
李三紅任職于 IBM CDL,負責 Lotus Notes 産品研發。