本文節選自 Effective Java by Joshua Bloch 和 Concurrent Programming in Java by Doug Lea.
1.6 避免過多的同步
1.6.1是否需要同步
過多的同步可能會導緻性能降低、死鎖,甚至不确定行為。通常,在同步區域内應該做盡可能少的工作。同步區域之外被調用的外來方法被稱為“開放調用(open call)”。除了可以避免死鎖之外,開放調用還可以極大地增加并發性。
考慮StringBuffer類和BufferedInputStream類,這些類都是線程安全(thread-safe)的,但是它們往往被用于單個線程中,是以它們所做的鎖操作往往是不必要的,雖然同步的開銷自java平台早期開始就一直在下降,但是它永遠也不會消失。一個給定的類是否應該執行内部同步并不總是很清楚,下面是一些指導性的原則。
如果你正在編寫的類主要被用于同步環境中,同時也被用于不要求同步的環境中,那麼一個合理的方法是,同時提供同步版本和和未同步版本。這也正是Collections Framework采用的方法。還有,java.util.Random也是采用這一種做法是提供一個包裝類(wrapper class),它實作一個描述該類的接口,同時在将方法調用轉發給内部對象中對應的方法之前執行适當的同步操作。種方法。第二種方法适用于那些不是被設計用來擴充或者重新實作的類,它提供一個未同步的類和一個子類,在子類中包含一些被同步的方法,它們依次調用到超類中對應的方法上。
關于是否對一個用于存取成員變量的方法進行同步,需要考慮兩點:合法性和陳舊性。如果成員變量不總是合法的,那麼可以的選擇是:
- 同步所有存取方法
- 確定使用者在得到非法值的時候能得到通知
- 省略存取方法。在并發程式中,對象的屬性可以被異步修改,客戶通過某行代碼得到的值可能在下一行代碼中就改變了。是以需要仔細評估存取方法存在的必要性。
如果成員變量的值總是合法的,但是不能是陳舊資料,那麼可以的選擇是:
- 把成員變量定義為volatile,并去掉存取方法的同步。
1.6.2 分解同步和分解鎖
另外一種增加程式并發性的方法是分解同步,如果一個類的行為可以分解為互相獨立、互不幹擾或者不沖突的子部分,那麼就值得用細粒度的輔助對象來重新構造類。普遍的原則是,把類的内部同步操作分得越細,在大多數情況下,它的活性就越高。但是這一點是以更加複雜和潛在的錯誤為代價的。例如:
public class Shape
{
public synchronized vodi adjustLocation(){ /*Long time operation*/ }
public synchronized vodi adjustDimensions(){ /*Long time operation*/ }
}
我們假設adjustLocation不處理次元資訊,adjustDimensions不處理位置資訊,那麼可以考慮把次元和位置資訊分解到兩個類中, 例如:
public class Shape
{
private final Location location = new Location();
private final Dimensions dimensions = new Dimensions();
public void adjustLocation(){ location.adjustLocation(); }
public void adjustDimensions(){ dimensions.adjustDimensions(); }
}
public class Location
{
public synchronized void adjustLocation(){ /*Long time operation*/ }
}
public class Dimensions
{
public synchronized void adjustDimensions(){ /*Long time operation*/ }
}
如果你不能或者不想分解類,則可以分解每個子功能相關的同步鎖。例如
public class Shape
{
private final Object locationLock = new Object();
private final Object dimensionsLock = new Object();
public void adjustLocation()
{
synchronized(locationLock)
{
/*Long time operation*/
}
}
public void adjustDimensions()
{
synchronized(dimensionsLock)
{
/*Long time operation*/
}
}
}
1.6.3 沖突集合
設想有一個Inventory類,它有store和retrieve方法來存取對象。以下的例子中使用了Hashtable來示範,雖然這種完全同步的Hashtable允許Inventory類的實作無需考慮底層的實作細節。但是,我們仍然想store和retrieve方法添加一些語義上的限制,如下:
- retrieve操作不應該和store操作并發執行。
- 兩個或者兩個以上的retrieve方法不應該同時執行。
- 兩個或者兩個以上的store方法可以同時執行。
以下的非正規符号描述了沖突集合,即不能并發的方法對的集合.
{(store, retrieve), (retrieve, retrieve)}
基于沖突集合的類可以使用before/after這種模式,即基本操作被那些維護者獨占關系的代碼所環繞。首先,對于每個方法,定義一個計數變量,用以表示該方法是否在執行中。其次,把每個基本操作都隔離入非公共方法中。最後,編寫那些基本操作的公共版本,即在那些基本操作的前後添加上before/after的控制。以下是個示例代碼:
public class Inventory
{
protected final Hashtable items = new Hashtable();
protected final Hashtable suppliers = new Hashtable();
protected int storing = 0;
protected int retrieving = 0;
public void store(String desc, Object item, String supplier)
throws InterruptedException
{
synchronized(this)
{
while(retrieving != 0)
{
wait();
++storing;
}
}
try
{
doStore(desc, item, supplier);
}
finally
{
synchronized(this)
{
if(--storing == 0)
{
notifyAll();
}
}
}
}
public Object retrieve(String desc)
throws InterruptedException
{
synchronized(this)
{
while(storing != 0 || retrieving != 0)
{
wait();
++retrieving;
}
}
try
{
return doRetrieve(desc);
}
finally
{
synchronized(this)
{
if(--retrieving == 0)
{
notifyAll();
}
}
}
}
protected void doStore(String desc, Object item, String supplier)
{
items.put(desc, item);
suppliers.put(supplier, desc);
}
protected Object doRetrieve(String desc)
{
Object x = items.get(desc);
if(x != null)
{
items.remove(desc);
}
return x;
}
}
接下來考慮一個更複雜的例子,一個讀出者和寫入者模型,與Inventroy不同,讀出者和寫入者政策不僅應用于特定方法,而是控制所有具有讀出和寫入語義的方法。假設我們需要進行有目的的鎖定(intention lock),比如,要求按照write,read,write,read,write的順序等。這時候我們需要考慮的有以下幾點:
- 如果目前已經存在一個或者多個活動(執行中)的讀出者,而且有一個寫入者正在等待的時候,一個新的讀出者是否能否立即加入?如果答案是肯定的話,那麼不斷增加的讀出者将會使寫入者無法執行;如果答案為否,那麼讀出者的吞吐量就會下降。
- 如果某些讀出者與寫入者同時在等待一個活動的寫入者完成操作,那麼你的處理政策會偏向讀出者還是寫入者?先到者優先?随意?輪流?
雖然以上政策沒有明确的答案,但是一些标準的解決方案和相關的實作還是存在的,以下一個通用的實作,使用了模闆類和before/after這種模式,其子類版本不需要做過多的修改。而且可以通過讓allowReader和allowWriter方法中的謂詞依賴與這個值,來調整控制政策。以下是示例代碼:
public abstract class ReadWrite
{
protected int activeReaders = 0;
protected int activeWriters = 0;
protected int waitingReaders = 0;
protected int waitingWriters = 0;
protected abstract void doRead();
protected abstract void doWrite();
public void read() throws InterruptedException
{
beforeRead();
try { doRead(); }
finally { afterRead(); }
}
public void write() throws InterruptedException
{
beforeWrite();
try { doWrite(); }
finally { afterWrite(); }
}
protected boolean allowReader()
{
return waitingWriters == 0 && activeWriters == 0;
}
protected boolean allowWriter()
{
return activeReaders == 0 && activeWriters == 0;
}
protected synchronized void beforeRead() throws InterruptedException
{
++waitingReaders;
while(!allowReader())
{
try { wait(); }
catch(InterruptedException ie)
{
--waitingReaders;
throw ie;
}
}
--waitingReaders;
++activeReaders;
}
protected synchronized void afterRead()
{
--activeReaders;
notifyAll();
}
protected synchronized void beforeWrite() throws InterruptedException
{
++waitingWriters;
while(!allowWriter())
{
try { wait(); }
catch(InterruptedException ie)
{
--waitingWriters;
throw ie;
}
}
--waitingWriters;
++activeWriters;
}
protected synchronized void afterWrite()
{
--activeWriters;
notifyAll();
}
}