sun在java5中引入了concurrent包,它對java的并發程式設計提供了強大的支援。首先,它提供了lock接口,可用了更細粒度的控制鎖的區域,它的實作類有reentrantlock,readlock,writelock,其中readlock和writelock共同用于實作reetrantreadwritelock(它繼承自readwritelock,但是沒有實作lock接口,readwritelock接口也沒有繼承lock接口)。而且,它還提供了一些常用并發場景下的類工具:semaphore、countdownlatch和cyclicbarrier。它們個字的應用場景:
semaphore(信号量)
有n個非線程安全的資源(資源池),這些資源使用一個semaphore(計數信号量)保護,每個線程在使用這些資源時需要首先獲得一個信号量(acquire)表示目前資源池還有可用資源,然後線程從該資源池中擷取并移除一個資源,在使用完後,将該資源交回給資源池,并釋放已經獲得信号量(release)(這裡的“移除”、“交回”并不一定需要顯示操作,隻是一種形象的描述,之是以這麼描述是應為這裡的各個資源是一樣的,因而對一個線程它每次拿到的資源不一定是同一個資源,用于區分stripe的使用場景),其中pool是一種典型的應用。
countdownlatch(閉鎖)
有n個task,它們執行完成後需要執行另外一個收尾的task(aggregated task),比如在做report計算中,有n個report要計算,而在所有report計算完成後需要生成一個基于所有report結果的一個總的report,而這個總的report需要等到所有report計算出結果後才能開始,此時就可以定義一個countdownlatch,其初始值是n,在總的report計算前調用countdownlatch的await方法等待其他report執行完成,而其他report在完成後都會調用countdownlatch中的countdown方法。
cyclicbarrier(關卡)
每個線程執行完成後需要等待,直到n個線程都執行完成後,才能繼續執行,在n個線程執行完成之後,而下一次執行開始之前可以添加自定義邏輯(通過建構cyclicbarrier執行個體時傳入一個runnable執行個體自定義邏輯),即在每個線程執行完成後調用cyclicbarrier的await方法并等待(即所謂的關卡),當n個線程都完成後,自定義的runnable執行個體會自動被執行(如果存在這樣的runnable執行個體的話),然後所有線程繼續下一次執行。這個現實中的例子沒有想到比較合适的。。。。
exchanger(交換者)
exchanger是一種特殊的cyclicbarrier,它隻有兩個線程參與,一個生産者,一個消費者,有兩個隊列共同參與,生産者和消費者各自有一個隊列,其中生産者向它的隊列添加資料,而消費者從它包含的隊列中拿資料,當生産者中的隊列滿時調用exchange方法,傳入自己原有的隊列,期待交換得到消費者中空的隊列;而當消費者中的隊列滿時同樣調用exchange方法,傳入自己的原有隊列,期待擷取到生産者中已經填滿的隊列。這樣,生産者和消費者可以和諧的生産消費,并且它們的步驟是一緻的(不管哪一方比另一方快都會等待另一方)。
最後,java5中還提供了一些atomic類以實作簡單場景下高效非lock方式的線程安全,以及blockingqueue、synchronizer、completionservice、concurrenthashmap等工具類。
在實際應用中,我們有一個cache系統,它包含key和payload的鍵值對(map),在cache中map的實作已經是線程安全了,然而我們不僅僅是向cache中寫資料要保證線程安全,在操作payload時,也需要保證線程安全。因為我們在cache中的資料量很大,為每個payload配置一個單獨的鎖顯然不現實,也不需要因為它們沒有那麼高的并發行,因而我們需要一種機制将key分成不同的group,而每個group共享一個鎖(這就是concurrenthashmap的實作思路)。通過key即可獲得一個鎖,并且每個相同的key獲得的鎖執行個體是相同的(獲得相同鎖執行個體的key它們不一定相等,因為這是一對多的關系)。
根據以上應用場景,stripe的實作很簡單,隻需要内部儲存一個lock數組,對每個給定的key,計算其hash值,根據hash值計算其鎖對應的數組下标,而該下标下的lock執行個體既是和該key關聯的lock執行個體。這裡通過hash值把key和lock執行個體關聯起來,為了擴充性,在實作時還可以把計算數組下标的邏輯抽象成一個接口,使用者可以通過傳入自定義該接口的實作類執行個體加入使用者自定義的關聯邏輯,預設采用hash值關聯方式。
在guava中,stripe以抽象類的形式存在,它定義了通過給定key或index獲得相應lock/semaphore/readwritelock執行個體:
public abstract class striped<l> {
/**
* returns the stripe that corresponds to the passed key. it is always guaranteed that if
* {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
*
* @param key an arbitrary, non-null key
* @return the stripe that the passed key corresponds to
*/
public abstract l get(object key);
* returns the stripe at the specified index. valid indexes are 0, inclusively, to
* {@code size()}, exclusively.
* @param index the index of the stripe to return; must be in {@code [0
size())}
* @return the stripe at the specified index
public abstract l getat(int index);
* returns the index to which the given key is mapped, so that getat(indexfor(key)) == get(key).
abstract int indexfor(object key);
* returns the total number of stripes in this instance.
public abstract int size();
* returns the stripes that correspond to the passed objects, in ascending (as per
* {@link #getat(int)}) order. thus, threads that use the stripes in the order returned
* by this method are guaranteed to not deadlock each other.
* <p>it should be noted that using a {@code striped<l>} with relatively few stripes, and
* {@code bulkget(keys)} with a relative large number of keys can cause an excessive number
* of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
* are needed for a pair of them to match). please consider carefully the implications of the
* number of stripes, the intended concurrency level, and the typical number of keys used in a
* {@code bulkget(keys)} operation. see <a href="http://www.mathpages.com/home/kmath199.htm">balls
* in bins model</a> for mathematical formulas that can be used to estimate the probability of
* collisions.
* @param keys arbitrary non-null keys
* @return the stripes corresponding to the objects (one per each object, derived by delegating
* to {@link #get(object)}; may contain duplicates), in an increasing index order.
public iterable<l> bulkget(iterable<?> keys);
}
可以使用一下幾個靜态工廠方法建立相應的striped執行個體,其中lazyweakxxx建立的striped執行個體中鎖以弱引用的方式存在(在什麼樣的場景中使用呢?):
/**
* creates a {@code striped<lock>} with eagerly initialized, strongly referenced locks.
* every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code striped<lock>}
*/
public static striped<lock> lock(int stripes);
* creates a {@code striped<lock>} with lazily initialized, weakly referenced locks.
public static striped<lock> lazyweaklock(int stripes);
* creates a {@code striped<semaphore>} with eagerly initialized, strongly referenced semaphores,
* with the specified number of permits.
* @param stripes the minimum number of stripes (semaphores) required
* @param permits the number of permits in each semaphore
* @return a new {@code striped<semaphore>}
public static striped<semaphore> semaphore(int stripes, final int permits);
* creates a {@code striped<semaphore>} with lazily initialized, weakly referenced semaphores,
public static striped<semaphore> lazyweaksemaphore(int stripes, final int permits);
* creates a {@code striped<readwritelock>} with eagerly initialized, strongly referenced
* read-write locks. every lock is reentrant.
* @return a new {@code striped<readwritelock>}
public static striped<readwritelock> readwritelock(int stripes);
* creates a {@code striped<readwritelock>} with lazily initialized, weakly referenced
public static striped<readwritelock> lazyweakreadwritelock(int stripes);
striped有兩個具體實作類,compactstriped和lazystriped,他們都繼承自poweroftwostriped(用于表達内部儲存的stripes值是2的指數值)。poweroftwostriped實作了indexfor()方法,它使用hash值做映射函數:
private abstract static class poweroftwostriped<l> extends striped<l> {
/** capacity (power of two) minus one, for fast mod evaluation */
final int mask;
@override final int indexfor(object key) {
int hash = smear(key.hashcode());
return hash & mask;
}
}
private static int smear(int hashcode) {
hashcode ^= (hashcode >>> 20) ^ (hashcode >>> 12);
return hashcode ^ (hashcode >>> 7) ^ (hashcode >>> 4);
compactstriped類使用一個數組儲存所有的lock/semaphore/readwritelock執行個體,在初始化時就建立所有的鎖執行個體;而lazystriped類使用一個值為weakreference的concurrentmap做為資料結構,index值為key,lock/semaphore/readwritelock的weakreference為值,所有鎖執行個體在用到時動态建立。在compactstriped中建立鎖執行個體時對reentrantlock/semaphore建立采用paddedxxx版本,不知道為何要做pad。
striped類實作的類圖如下: