Table of contents
- 1. Introduction to `Future`
- 2. Introduction to `Promise`
- (1) An example
- (2) Common `Promise` classes
- 3. A brief look at the `Promise` source
- (1) `get()`
- (2) `await()`
- (3) `addListener()`
- (3) `notifyListeners()`
- (4) `setSuccess()`
- (5) `cancel()`
Netty version: 4.1.12.Final
1. Introduction to `Future`
Netty's `Future` builds on the `Future` from the JUC (`java.util.concurrent`) package.
The main additions are:
- adding listeners
- removing listeners
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();
    boolean isCancellable();
    // Returns the cause if the I/O operation failed, otherwise null
    Throwable cause();
    // Adds a listener to this Future; it is invoked when the Future completes
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // Removes a listener
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // Blocks until the Future completes
    Future<V> sync() throws InterruptedException;
    // Blocks until the Future completes; cannot be interrupted
    Future<V> syncUninterruptibly();
    // Waits for the Future to complete
    Future<V> await() throws InterruptedException;
    // Waits for the Future to complete; cannot be interrupted
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    // Returns the result immediately, or null if the Future has not completed yet
    V getNow();
    // If cancellation succeeds, the Future fails with a CancellationException
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}
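Common classes: `SucceededFuture` and `FailedFuture`.
Neither needs any business logic; you only supply the value to return on success or the exception to throw on failure.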
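2. Introduction to `Promise`
`Future`: the state (success or failure) of the task that runs the business logic is determined inside the `Future` implementation.
`Promise`: the business logic itself can control the result of the task.
The interface is defined as follows: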
public interface Promise<V> extends Future<V> {
    // Marks the future as succeeded
    Promise<V> setSuccess(V result);
    // Tries to mark the future as succeeded; returns whether that worked
    boolean trySuccess(V result);
    // Marks the future as failed
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // Marks the future as uncancellable
    boolean setUncancellable();
    // The methods overridden from Future are omitted here
}
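The difference between the `set*` and `try*` pairs can be illustrated with a small stand-alone sketch. `MiniResult` is a hypothetical class written for this article, not Netty code. It assumes non-null values and models only the contract that a result can be set once, that `setSuccess` throws `IllegalStateException` when the promise is already complete, and that `trySuccess` reports the same situation by returning `false`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone model of the set/try contract; not Netty's implementation.
// Assumes non-null values (Netty itself uses a sentinel object for null results).
public class MiniResult<V> {
    private final AtomicReference<V> result = new AtomicReference<V>();

    // Returns false instead of throwing when a result is already present.
    public boolean trySuccess(V value) {
        return result.compareAndSet(null, value);
    }

    // Throws when a result is already present.
    public MiniResult<V> setSuccess(V value) {
        if (!trySuccess(value)) {
            throw new IllegalStateException("complete already");
        }
        return this;
    }

    public V getNow() {
        return result.get();
    }

    public static void main(String[] args) {
        MiniResult<String> r = new MiniResult<String>();
        System.out.println(r.trySuccess("first"));   // true
        System.out.println(r.trySuccess("second"));  // false
        System.out.println(r.getNow());              // first
        try {
            r.setSuccess("third");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: complete already
        }
    }
}
```

The `try*` form suits code where several parties may race to complete the same promise; the `set*` form suits code where a second completion would indicate a bug.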
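(1) An example
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.TimeUnit;

public class PromiseTest {
    public static void main(String[] args) {
        PromiseTest test = new PromiseTest();
        final NioEventLoopGroup loop = new NioEventLoopGroup();
        Promise<String> promise = test.search(loop, "Netty In Action");
        promise.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                System.out.println("Listener 1, price is " + future.get());
                // Shut down only once the result has been delivered,
                // so the scheduled task is not lost to an early shutdown
                loop.shutdownGracefully();
            }
        });
    }

    private Promise<String> search(NioEventLoopGroup loop, final String prod) {
        final DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
        loop.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(String.format(" >>search price of %s from internet!", prod));
                promise.setSuccess("$33.33"); // after the 5-second delay, mark the future as succeeded
            }
        }, 5, TimeUnit.SECONDS);
        return promise;
    }
}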
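With a `Promise` object you can register what should happen once the asynchronous call completes, and carry on with other work in the meantime.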
(2) Common `Promise` classes
The common `Promise` classes are `DefaultPromise` and `DefaultChannelPromise`.
`SingleThreadEventLoop.java`, for instance, uses a `ChannelPromise`:
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
3. A brief look at the `Promise` source
This section looks at `DefaultPromise`.
(1) `get()`
`get()` uses the implementation in the parent class `AbstractFuture`:
@Override
public V get() throws InterruptedException, ExecutionException {
    // Wait for the task to complete
    await();
    // Get the failure cause, if any
    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}
(2) `await()`
@Override
public Promise<V> await() throws InterruptedException {
    // Already complete?
    if (isDone()) {
        return this;
    }
    // Already interrupted?
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // Check for deadlock
    checkDeadLock();
    // Synchronize on this promise
    synchronized (this) {
        // Loop until complete
        while (!isDone()) {
            // One more waiter
            incWaiters();
            try {
                // Object.wait() blocks the calling thread
                wait();
            } finally {
                // One fewer waiter
                decWaiters();
            }
        }
    }
    return this;
}
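The `synchronized` block here guards the increments and decrements of the waiter count. It is also required for another reason: `Object.wait()` may only be called while holding the object's monitor.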
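The wait loop in `await()` only makes sense together with a matching `notifyAll()` on completion. The following stand-alone sketch shows that pairing. `MiniLatch` is a hypothetical class, not Netty code, and it simplifies things by setting the completion flag inside the synchronized block; it keeps the idea of a `waiters` counter so that `notifyAll()` is called only when some thread is actually waiting.

```java
// Hypothetical sketch of the wait/notify pairing; not Netty's implementation.
public class MiniLatch {
    private volatile boolean done;
    private int waiters; // guarded by synchronized (this)

    public MiniLatch await() throws InterruptedException {
        if (done) {
            return this;
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            // Re-check the condition after every wake-up.
            while (!done) {
                waiters++;
                try {
                    wait(); // releases the monitor while blocked
                } finally {
                    waiters--;
                }
            }
        }
        return this;
    }

    public void complete() {
        synchronized (this) {
            done = true;
            // Skip the notification when nobody is waiting.
            if (waiters > 0) {
                notifyAll();
            }
        }
    }

    public boolean isDone() {
        return done;
    }

    public static void main(String[] args) throws Exception {
        final MiniLatch latch = new MiniLatch();
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                latch.complete();
            }
        });
        worker.start();
        latch.await(); // returns once complete() has run
        worker.join();
        System.out.println(latch.isDone()); // true
    }
}
```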
(3) `addListener()`
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    // Add the listener under the lock
    synchronized (this) {
        addListener0(listener);
    }
    // If already complete, notify right away
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}
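It is worth looking at `DefaultFutureListeners.java` here. This part does not feel particularly well designed (Netty version: 4.1.12.Final).
It maintains an array to hold the listeners, and its add and remove operations resemble those of `ArrayList`.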
final class DefaultFutureListeners {
    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners

    @SuppressWarnings("unchecked")
    DefaultFutureListeners(
            GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size] = l;
        this.size = size + 1;
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                listeners[-- size] = null;
                this.size = size;
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }
    // ...
}
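The storage scheme in `addListener0()` and `DefaultFutureListeners` avoids allocating an array in the common case of a single listener. Here is a rough stand-alone sketch of the same idea. `ListenerSlot` is a hypothetical class, and `Runnable` stands in for `GenericFutureListener`; neither is Netty's type.

```java
import java.util.Arrays;

// Hypothetical sketch: one field holds either a single listener or a growable array.
public class ListenerSlot {
    private Object listeners; // null, a single Runnable, or a Runnable[]
    private int size;

    public void add(Runnable l) {
        if (listeners == null) {
            listeners = l; // first listener: no array needed
        } else if (listeners instanceof Runnable[]) {
            Runnable[] arr = (Runnable[]) listeners;
            if (size == arr.length) {
                arr = Arrays.copyOf(arr, size << 1); // double the capacity when full
                listeners = arr;
            }
            arr[size] = l;
        } else {
            // second listener: switch from the single object to an array
            listeners = new Runnable[] {(Runnable) listeners, l};
        }
        size++;
    }

    public int size() {
        return size;
    }

    // Runs every stored listener in insertion order.
    public void notifyAllListeners() {
        if (listeners instanceof Runnable[]) {
            Runnable[] arr = (Runnable[]) listeners;
            for (int i = 0; i < size; i++) {
                arr[i].run();
            }
        } else if (listeners != null) {
            ((Runnable) listeners).run();
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        ListenerSlot slot = new ListenerSlot();
        slot.add(() -> sb.append("a"));
        slot.add(() -> sb.append("b"));
        slot.add(() -> sb.append("c"));
        slot.notifyAllListeners();
        System.out.println(sb); // abc
    }
}
```

The price of this trick is the `instanceof` branching on every access, which may be part of why the original code feels awkward to read.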
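(3) `notifyListeners()`
For example:
- `addListener()` is called on the `main` thread
- `notifyListeners()` runs the callbacks; in this case they are submitted to the executor's thread for execution
Internally, a `notifyingListeners` flag records whether the listeners are already being notified. Only when they are not, and the listener list is non-empty, are the listeners traversed in order and `operationComplete` called on each.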
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        // Get the current thread's InternalThreadLocalMap
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        // Current listener stack depth
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}
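The stack-depth check exists because a listener may complete another promise, whose listener completes yet another, and so on; running all of these inline could overflow the stack. The sketch below shows the guard in isolation. `DepthGuard`, its `MAX_DEPTH` value of 8, and the `deferred` queue (standing in for handing the work to the executor) are all assumptions made for this illustration, not Netty's code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a per-thread recursion guard; not Netty's implementation.
public class DepthGuard {
    private static final int MAX_DEPTH = 8; // assumed limit for this sketch
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);

    private final Queue<Runnable> deferred = new ArrayDeque<>(); // stands in for the executor
    private int maxObserved;

    public void run(Runnable task) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            DEPTH.set(depth + 1);
            maxObserved = Math.max(maxObserved, depth + 1);
            try {
                task.run(); // shallow enough: run inline
            } finally {
                DEPTH.set(depth); // restore the depth on the way out
            }
            return;
        }
        deferred.add(task); // too deep: defer instead of recursing further
    }

    // Runs deferred tasks from a fresh (shallow) stack.
    public void drain() {
        Runnable r;
        while ((r = deferred.poll()) != null) {
            run(r);
        }
    }

    // Chains 100 nested tasks and returns {tasks executed, deepest nesting seen}.
    public static int[] demo() {
        DepthGuard guard = new DepthGuard();
        int[] remaining = {100};
        int[] executed = {0};
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            executed[0]++;
            if (--remaining[0] > 0) {
                guard.run(self[0]); // each task triggers the next one
            }
        };
        guard.run(self[0]);
        guard.drain();
        return new int[] {executed[0], guard.maxObserved};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0]); // 100
        System.out.println(r[1]); // 8
    }
}
```

All 100 tasks run, yet the nesting never exceeds the limit, because every task that would go deeper is deferred and later restarted from a shallow stack.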
(4) `setSuccess()`
private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters(); // calls Object.notifyAll() to wake all waiting threads
        return true;
    }
    return false;
}
(5) `cancel()`
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
        // Wake any threads blocked waiting on this promise
        checkNotifyWaiters();
        // Notify all listeners
        notifyListeners();
        return true;
    }
    return false;
}
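Both `setValue0()` and `cancel()` rely on compare-and-set transitions of a single `result` field, with sentinel objects marking special states. A simplified stand-alone sketch of that state machine follows. `MiniState` and its `CANCELLED` sentinel are hypothetical names, and `setUncancellable()` is reduced here to the single transition from "no result" to "uncancellable".

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical sketch of a CAS-driven result field with sentinels; not Netty's implementation.
public class MiniState {
    private static final AtomicReferenceFieldUpdater<MiniState, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(MiniState.class, Object.class, "result");

    private static final Object SUCCESS = new Object();       // "succeeded with a null value"
    private static final Object UNCANCELLABLE = new Object(); // not done, but can no longer be cancelled
    private static final Object CANCELLED = new Object();     // assumed sentinel for this sketch

    private volatile Object result; // null means "not done, still cancellable"

    public boolean trySuccess(Object value) {
        Object v = value == null ? SUCCESS : value;
        // A result may replace either the initial null or the UNCANCELLABLE marker.
        return RESULT_UPDATER.compareAndSet(this, null, v)
                || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, v);
    }

    public boolean setUncancellable() {
        return RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE);
    }

    public boolean cancel() {
        // Cancellation only wins from the initial state.
        return RESULT_UPDATER.compareAndSet(this, null, CANCELLED);
    }

    public boolean isDone() {
        Object r = result;
        return r != null && r != UNCANCELLABLE;
    }

    public static void main(String[] args) {
        MiniState a = new MiniState();
        System.out.println(a.setUncancellable()); // true
        System.out.println(a.cancel());           // false
        System.out.println(a.trySuccess(null));   // true
        System.out.println(a.isDone());           // true
    }
}
```

Because every transition is a single compare-and-set, exactly one of several racing calls to complete or cancel can win, without taking a lock.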