天天看點

ReferenceCountSet無鎖實作 ReferenceCountSet實作

記得很久以前有一次面試被問到如何編寫無鎖程式,我當時覺得那個面試官腦子進水了,我們确實可以在某些情況下減少鎖的使用,但是怎麼可能避免呢?當然我現在還是持這種觀點,在java中,你可以有很多方法減少鎖的使用(至少在你自己的代碼中看起來):

1.     比如常見的可以使用volatile關鍵字來保證某個字段在一個線程中的更新對其他線程的可見性;

2.     可以使用concurrent.atomic包中的各種atomic類來實作某些基本類型操作的,它主要采用忙等機制(cas,compare and swap方法)以及内部的volatile變量來實作不加鎖情況下對某個字段特定更新的線程安全;

3.     使用threadlocal,為每個線程儲存儲存自己的對象,保證對它的操作永遠隻有一個線程;

4.     使用concurrent包下已經實作的線程安全集合,如concurrenthashmap、concurrentskiplistmap、concurrentskiplistset、copyonwritearraylist、copyonwritearrayset等,在這些集合的實作中也會看到volatile和cas的身影,但是有些時候它們自身也不得不使用鎖;

5.     使用blockingqueue實作生産者消費者模式,這個blockingqueue有點類似eventbus,很多時候可以簡化多線程程式設計環境下的複雜性,在特定情況下還可以采用單個生産者或單個消費者的方式來避免多線程環境,但是在對一些臨界資源和類操作時,還是會需要使用鎖,而且在很多blockingqueue的内部實作中也使用了鎖;

6.     使用guava提供的striplock方式來将一些特定的操作、資料映射到固定的線程中,進而保證對它們的操作的線程安全。

然而很多時候java提供的那些所謂的線程安全類隻是一些小範圍的操作,比如atomicinteger中的incrementandget()/decrementandget(),concurrenthashmap中的get和put,當需要将一些操作組合在一起的時候,我們還是不得不使用鎖,比如更具一個key取出map中的值,對值進行一定操作,然後将該值寫入map中。以前我一直覺得這種操作不用鎖是不太可能解決的,但是最近做的一個項目需要從reuters中source資料,reuters中的很多資料tick都非常快,為了提高項目的處理能力,我們必須要減少鎖的使用,因而我開始嘗試着在不用鎖的情況下實作某些操作的組合而依然能保持線程安全,然後發現在組合使用cas和concurrentmap接口中提供的putifabsent、replace、remove等根據傳入的值再來更新狀态的方式還真的能實作不用鎖組合某些操作而依然保持線程安全(至少在自己的代碼中無鎖)。對于這種嘗試出廠了一個referencecountset。

從這個set的名字中應該已經能夠知道它的用途了,在我的這個項目中,我們需要自己維護對某些執行個體的引用計數,是以最基本的,必須有一個集合存儲一個執行個體到它的引用計數的映射關系,而一個執行個體在這個集合中必須是唯一存在,因而我自然想到了使用map來儲存;另外在對引用計數增加時,需要分以下幾個步驟實作:

1.     判斷一個新添加的執行個體是否已經在這個map中存在

2.     如果不存在,則将該執行個體添加到這個map中,并設定其引用計數為1

3.     如果已經存在,則需要取出已經存在的引用計數,對其加1,然後将新值寫入這個map中。

對remove方法來說也是類似的,即需要取出map中已經存在的計數值以後,對其引用減1,然後判斷是否為0來決定是否需要将目前執行個體真正從這個map中移除。

既然需要那麼多的組合操作,顯然沒有一個已經存在的map可以實作不加鎖情況下的線程安全。因而我起初的選擇是使用hashmap<e, integer>加鎖實作(以add為例):

synchronized(map) {

    integer refcount = map.get(element);

    if (refcount == null) {

        refcount = 1;

    } else {

        refcount += 1;

    }

    map.put(element, refcount);

}

但是如何不用鎖實作這些組合操作呢?秘訣就在于靈活的使用atomicinteger和concurrentmap接口。首先需要将這個map改成concurrentmap,如concurrenthashmap,然後将這個map的值改成atomicinteger,然後采用如下實作即可:    

public boolean add(e element) {

    atomicinteger refcount = data.get(element);

        // avoid to put atomicinteger(0) as during remove we need this value to compare

        atomicinteger newrefcount = new atomicinteger(1);

        refcount = data.putifabsent(element, newrefcount);

        if (refcount == null) {

            return true;

        }

    refcount.incrementandget();

    return true;

在這個add方法實作中,我們首先直接使用傳入的element擷取内部存在atomicinteger值,如果該值為null,表示目前還沒有對它有引用計數,因而我們初始化一個atomicinteger(1)對象,但是這時我們不能直接将這個1作為該對象的引用計數,因為另一個線程可能在這中間已經添加相同對象的引用計數了,這個時候如果我們還直接寫入會覆寫在這個中間添加的引用計數。是以我們需要使用concurrentmap中的putifabsent方法,即如果其他線程在這個還是沒有對這個對象有引用計數更新的情況下,我們才會真正寫入現在的引用計數值,進而不會覆寫在這中間寫入的值,該方法傳回原來已經在這個map中的值,因而如果傳回null,表示在這個過程中沒有其他線程對這個對象的計數值有改變,是以直接傳回;如果傳回不為null,表示在這個中間其他線程有對這個對象有做引用計數的改變,并且傳回更新的atomicinteger值,此時隻需要像已經存在引用計數執行個體的情況下對這個傳回的atomicinteger隻自增即可,由于atomicinteger是線程安全的,因而這個操作也是安全的。并且由于對每個線程都使用同一個atomicinteger引用執行個體,因而每個線程的自增都會正确的反映在map的值中,因而這個操作也是正确的。

這裡,我其實是将這個concurrentmap封裝在一個set中,因而我們還需要實作一些其他方法,如size、contains、iterator、remove等,由于其他方法的在concurrenthashmap中已經是線程安全的了,因而我們隻需要實作remove方法即可。這裡的remove方法也包含了多個操作的組合:先取出以存在的計數,對其減1,如果發現它的計數已經減到0,将它從這個map中移除。這裡需要使用concurrentmap中提供的條件remove方法來實作:    

public boolean remove(object obj) {

    atomicinteger refcount = data.get(obj);

        return false;

    if (refcount.decrementandget() <= 0) {

        return data.remove(obj, refcount);

    return false;

這裡需要解釋的就是這個條件remove方法,即目前在對這個object對象的引用計數已經減到0的情況下,我們不能直接将其移除,因為在這個中間可能有另一個線程又增加了對它的引用計數,是以我們需要使用條件remove,即隻有目前map中對這個object對象的值和傳入的值相等時才将它移除,這樣就保證了目前線程的操作不會覆寫中間其他線程的結果。

在這個remove的實作中,有人可能會說在data.get(data)這句話執行完成後,加入它傳回null,此時其他線程可能已經會添加了這個object對象的引用,此時這個方法的執行結果就不對了,但是在這種情況下,即使加鎖,也無法解決這個問題,而且很多情況下的線程安全隻能保證happen-before原則。關于這個類的實作也有一些其他細節的東西,具體可以檢視這裡:

https://github.com/dinglevin/levin-tools/blob/master/src/main/java/org/levin/tools/corejava/sets/referencecountset.java