前言
閱讀本文前,需要讀者對happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運用,和一些解決問題的小技巧,分析問題的方式。
背景介紹
原始需求為:本人當時在編寫一個正則替換工具,裡面會動态地顯示所有的比對結果(包括替換預覽),文本、正規表達式、參數,這些資料的其中一項發生了變化,結果就應該被更新,為了提供友好的互動體驗,資料變化時,應該是發起一個異步請求,由另一個獨立的線程來完成運算,完成後通知UI更新結果。由于是動态顯示,是以送出會非常頻繁。
需求描述
需要這樣一個工具類,允許使用者頻繁地送出資料(本文之後以“submit”表示該操作)和更新結果(本文之後以“update”表示該操作),submit時,如果目前有進行中的運算,則應該取消,使用新參數執行新的運算;update時,如果目前沒有進行中的運算(處于阻塞狀态),并且目前結果不是最新的,則喚醒該線程,使用目前的新資料,執行新的運算。此處之是以分為submit和update兩個方法,是為了支援手動更新,即點選更新按鈕時,才更新結果。
此外,出于練手的原因,也出于編寫一個功能全面,更實用的工具的目的,我還加入了一些額外的需求:
1、引入多線程場景,update和submit均可由多個線程同時發起,該工具類應設計成線程安全的。
2、允許延遲執行運算,如果延時内執行submit,僅重新計算延時。如果運算不友善取消,在短時間頻繁submit的場景下,延時會是一個很好的應對辦法。
3、允許設定一個最大延遲時間,作為延遲開啟運算的補充。當長時間頻繁submit時,會形成這樣的局面,一直未進入運算環節,新結果計算不出來,上一次計算結果卻是很早以前的。如果需要顯示一個較新但不是最新的結果,最大延遲時間将會很有用。
4、提供主動取消方法,主動取消正在進行的運算。
5、update時,允許等待運算完成,同時也可設定逾時時間。當主動取消、逾時、完成了目前或更(更加的意思)新的資料對應的運算時,結束等待。
需求交待完了,有興趣有精力的讀者,可以先試着思考下怎麼實作。
問題分析
該工具應該維護一個狀态字段,這樣才能在發起某個操作時,根據所處的狀态作出正确的動作,如:如果目前不處于停止狀态(或者主動取消狀态,原因見下文),執行update就不需要喚醒運算線程。簡單分析可知,至少應該有這樣幾種狀态:
1、停止狀态:目前沒有運算任務,線程進入阻塞狀态,主動取消和運算完成後,進入該狀态
2、延遲狀态:設定了延遲開啟運算時,進入運算前,處于該狀态
3、運算狀态:正在執行運算
4、主動取消狀态:當發起主動取消時,進入該狀态
5、新任務狀态:當時有新的運算任務時,進入該狀态,然後重新進入運算狀态
延遲
再來看一下延遲,如果延遲500毫秒,就每次sleep(500),那麼期間再submit怎麼辦?将它喚醒然後重新sleep(500)嗎?顯然不行,成本太大了。
我有一個小技巧:将500分成多個合适的等份,使用一個計數器,每次sleep一個等份,計數器加1,如果發起submit,僅把計數器置0即可,雖然看起來線程的狀态切換變多了,但應對頻繁重置時,它更穩定。雖然時間上會上下波動一個等份,但此處并不需要多麼精确。
現在還面臨這樣一個問題,如何知道目前是處于延遲狀态并計數器置0?取出狀态值進行判斷,然後置0,這方法顯然不行,因為置0的時候,可能狀态已經變了,是以你無法知道該操作是否生效了。
我想到的辦法是,再引入一個延遲重置狀态。如果處于該狀态,則下一次計數器加1時,将計數器重置,狀态變更是可以知道成功與否的。
狀态變更
有些狀态的變更是有條件的,比如說目前處于取消狀态,就不能把它轉為運算狀态,運算狀态隻能由新任務狀态、延遲狀态(延遲完成後執行運算)或延遲重置狀态轉入。這種場景正好跟CAS一緻,是以,使用一個AtomicInteger來表示狀态。
分析下各狀态之間的轉換,可以得出下面的狀态變更圖:
藍色的a(bcd)|(e)f線路為停止狀态下,發起一次update,運算完重新回到停止的過程,開啟延遲時是bcd,否則是e。
紅色的線j表示超過了最大延遲時間,退出延遲,進入運算狀态(也可以是d)。
綠色的線ghi(包括a)表示:如果發起了submit或update,狀态應該怎麼改變。如果處于延遲重置、新任務則不需要進行任何操作;如果處于延遲狀态,則轉為延遲重置即可;如果處于運算狀态,則可能使用了舊參數,應該轉為新任務;如果為主動取消或停止狀态,并且是調用update方法,則轉為新任務,并且可能處于阻塞狀态,應該喚醒該線程。
黑色的線l表示,可在任意狀态下發起主動取消,進入該狀态。然後通知等待線程後,轉入停止狀态,對應紫色的k,如果在停止狀态下發起主動取消,則僅轉為主動取消狀态,不會通知等待線程。是以當線程阻塞時,可能處于停止狀态或者主動取消狀态。
順序問題
上面已經分析到,當submit時,應該把延遲轉為延遲重置、或運算轉為新任務,這兩個嘗試的順序是不是也有講究呢?
是的,因為正常執行流程a(bcd)|(e)f中,運算狀态在延遲狀态之後,假如先嘗試運算轉為新任務,可能此時為延遲狀态,故失敗,再嘗試延遲轉為延遲重置時,狀态在這期間從剛才的延遲轉為了運算,故兩次嘗試都失敗了,本應該重置延遲的,卻什麼也沒幹,這是錯誤的。而将兩次嘗試順序調換一下,隻要狀态為延遲或運算,那麼兩次狀态轉換嘗試中,一定有一次會成功。
之後的代碼中還有多處類似的順序細節。
解決方案
下面給出完整的代碼,除去等待運算完成那部分,其它地方均為wait-free級别的實作。
calculateResult是具體執行運算的方法;上文中的submit對應代碼裡的updateParametersVersion方法,上文中的update對應剩餘幾個update方法。
updateAndWait方法中,使用了上一篇中講到的BoundlessCyclicBarrier,其維護的版本号就是參數的版本号ParametersVersion。
/**
* @author [email protected]
* @date 2013-2-2
*/
public abstract class LatestResultsProvider {
/** update return value */
public static final int UPDATE_FAILED = -1;
public static final int UPDATE_NO_NEED_TO_UPDATE = 0;
public static final int UPDATE_SUCCESS = 1;
public static final int UPDATE_COMMITTED = 2;
/** work states*/
private static final int WS_OFF = 0;
private static final int WS_NEW_TASK = 1;
private static final int WS_WORKING = 2;
private static final int WS_DELAYING = 3;
private static final int WS_DELAY_RESET = 4;
private static final int WS_CANCELED = 5;
private final AtomicInteger workState;
private int sleepPeriod = 30;
private final AtomicInteger parametersVersion;
private volatile int updateDelay;// updateDelay>=0
private volatile int delayUpperLimit;
private final BoundlessCyclicBarrier barrier;
private Thread workThread;
/**
*
* @param updateDelay unit: millisecond
* @param delayUpperLimit limit the sum of the delay, disabled
* while delayUpperLimit<0, unit: millisecond
*/
public LatestResultsProvider(int updateDelay, int delayUpperLimit) {
if (updateDelay < 0)
this.updateDelay = 0;
else
this.updateDelay = updateDelay;
this.delayUpperLimit = delayUpperLimit;
barrier = new BoundlessCyclicBarrier(0);
workState = new AtomicInteger(WS_OFF);
parametersVersion = new AtomicInteger(0);
initThread();
}
private void initThread() {
workThread = new Thread("trytocatch's worker") {
@Override
public void run() {
int sleepCount = 0;
for (;;) {
try {
while (!workState.compareAndSet(WS_NEW_TASK,
updateDelay > 0 ? WS_DELAY_RESET : WS_WORKING)) {
if (workState.compareAndSet(WS_CANCELED, WS_OFF)) {
barrier.cancel();
}
LockSupport.park();
interrupted();
}
if (workState.get() == WS_DELAY_RESET) {
int delaySum = 0;
for (;;) {
if (workState.compareAndSet(WS_DELAY_RESET,
WS_DELAYING)) {
sleepCount = (updateDelay + sleepPeriod - 1)
/ sleepPeriod;
}
sleep(sleepPeriod);
if (--sleepCount <= 0
&& workState.compareAndSet(WS_DELAYING,
WS_WORKING))
break;
if (delayUpperLimit >= 0) {
delaySum += sleepPeriod;
if (delaySum >= delayUpperLimit) {
if (!workState.compareAndSet(
WS_DELAYING, WS_WORKING))
workState.compareAndSet(
WS_DELAY_RESET, WS_WORKING);
break;
}
if (workState.get() != WS_DELAYING
&& workState.get() != WS_DELAY_RESET)
if (isWorking()) {
int workingVersion = parametersVersion.get();
try {
calculateResult();
if (workState.compareAndSet(WS_WORKING, WS_OFF))
barrier.nextCycle(workingVersion);
} catch (Throwable t) {
t.printStackTrace();
workState.set(WS_CANCELED);
} catch (InterruptedException e) {
workState.compareAndSet(WS_DELAYING, WS_CANCELED);
workState.compareAndSet(WS_DELAY_RESET, WS_CANCELED);
}
}// for(;;)
}// run()
};
workThread.setDaemon(true);
workThread.start();
public int getUpdateDelay() {
return updateDelay;
* @param updateDelay
* delay time. unit: millisecond
public void setUpdateDelay(int updateDelay) {
this.updateDelay = updateDelay < 0 ? 0 : updateDelay;
public int getDelayUpperLimit() {
return delayUpperLimit;
public void setDelayUpperLimit(int delayUpperLimit) {
public final void stopCurrentWorking() {
workState.set(WS_CANCELED);
* @return NO_NEED_TO_UPDATE, COMMITTED
public final int update() {
if (isResultUptodate())
return UPDATE_NO_NEED_TO_UPDATE;
if (workState.compareAndSet(WS_CANCELED, WS_NEW_TASK)
|| workState.compareAndSet(WS_OFF, WS_NEW_TASK))
LockSupport.unpark(workThread);
return UPDATE_COMMITTED;
* @param timeout
* unit:nanoseconds
* @return FAILED, NO_NEED_TO_UPDATE, SUCCESS
* @throws InterruptedException
public final int updateAndWait(long nanosTimeout)
throws InterruptedException {
int newVersion = parametersVersion.get();
if (update() == UPDATE_NO_NEED_TO_UPDATE)
barrier.awaitWithAssignedVersion(newVersion, nanosTimeout);
return barrier.getVersion() - newVersion >= 0 ? UPDATE_SUCCESS
: UPDATE_FAILED;
public final int updateAndWait() throws InterruptedException {
return updateAndWait(0);
public final boolean isResultUptodate() {
return parametersVersion.get() == barrier.getVersion();
* be used in calculateResult()
* @return true: the work state is working, worth to calculate the
* result absolutely, otherwise you can cancel the current calculation
protected final boolean isWorking() {
return workState.get()==WS_WORKING;
* you must call this after update the parameters, and before calling the
* update
protected final void updateParametersVersion() {
int pVersion = parametersVersion.get();
//CAS failed means that another thread do the same work already
if (parametersVersion.compareAndSet(pVersion, pVersion + 1))
if (!workState.compareAndSet(WS_DELAYING, WS_DELAY_RESET))
workState.compareAndSet(WS_WORKING, WS_NEW_TASK);
* implement this to deal with you task
protected abstract void calculateResult();
}
代碼中,我直接在構造方法裡開啟了新的線程,一般來說,是不推薦這樣做的,但在此處,除非在構造還未完成時就執行update方法,否則不會引發什麼問題。
最後,附上該正則替換工具的介紹和下載下傳位址:
http://www.cnblogs.com/trytocatch/p/RegexReplacer.html http://www.cnblogs.com/trytocatch/p/RegexReplacer.html小結
狀态變更非常适合使用非阻塞算法,并且還能夠達到wait-free級别。限于篇幅,有些沒講到的細節,請讀者借助代碼來了解吧,如有疑問,歡迎回複讨論。
系列總結
本實戰系列就到此結束了,簡單總結下。
非阻塞同步相對于鎖同步而言,由代碼塊,轉為了點,是另一種思考方式。
有時,無法做到一步完成,也許可以分成兩步完成,同樣可以解決問題,ConcurrentLinkedQueue就是這麼做的。
如果需要維護多個資料之間的某種一緻關系,則可以将它們封裝到一個類中,更新時采用更新該類對象的引用的方式。
衆所周知,鎖同步算法是難以測試的,非阻塞同步算法更加難以測試,我個人認為,其正确性主要靠慎密的推敲和論證。
非阻塞同步算法比鎖同步算法要顯得更複雜些,如果對性能要求不高,對非阻塞算法掌握得還不太熟練,建議不要使用非阻塞算法,鎖同步算法要簡潔得多,也更容易維護,如上面所說的,兩條看似沒有順序的語句,調換下順序,可能就會引發BUG。