天天看點

【Netty】之 Future(Promise)

文章目錄

  • ​​一、`Future`簡介​​
  • ​​二、`Promise`簡介​​
  • ​​(1)舉個栗子​​
  • ​​(2)常用`Promise`類​​
  • ​​三、`Promise`源碼淺析​​
  • ​​(1)`get()`​​
  • ​​(2)`await()`​​
  • ​​(3)`addListener()`​​
  • ​​(3)`notifyListeners()`​​
  • ​​(4)`setSuccess()`​​
  • ​​(5)`cancel()`​​
Netty 版本:4.1.12.Final

一、​

​Future​

​簡介

​Netty​

​​的​

​Future​

​​是在​

​JUC​

​​包下的​

​Future​

​基礎上。

主要增加功能:

  1. 添加監聽事件
  2. 删除監聽事件
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();
   
    boolean isCancellable();

    // 但IO操作異常時,傳回原因,否則為null
    Throwable cause();
    
    // 向 Future添加事件,Future完成時,會執行這些事件
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    
    // 移除監聽事件
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);


    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 阻塞,等待Future完成
    Future<V> sync() throws InterruptedException;
    
    // 阻塞,不可被打斷
    Future<V> syncUninterruptibly();
    
    // 等待 Future 完成
    Future<V> await() throws InterruptedException;
    
    // 等待,不可被打斷
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;


    boolean await(long timeoutMillis) throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
  
    boolean awaitUninterruptibly(long timeoutMillis);

    // 立即獲得結果,如果沒有完成,傳回null
    V getNow();
    
    // 如果成功取消,Future失敗,導緻 CancellationException
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}      
【Netty】之 Future(Promise)

常用類:​

​SucceededFuture​

​​ ​

​FailedFuture​

不需要設定業務邏輯代碼,隻需要設定成功後的傳回和抛出的異常

二、​

​Promise​

​簡介

Future: 業務邏輯所在任務執行的狀态(成功或失敗)是在 Future 中實作的

Promise:可以在業務邏輯控制任務的執行結果

接口定義如下:

public interface Promise<V> extends Future<V> {
    // 設定future執行結果為成功
    Promise<V> setSuccess(V result);
    
    // 嘗試設定future執行結果為成功,傳回是否設定成功
    boolean trySuccess(V result);
    
    // 設定失敗
    Promise<V> setFailure(Throwable cause);
    
    boolean tryFailure(Throwable cause);

    // 設定為不能取消
    boolean setUncancellable();
    
    
    // 以下省略了覆寫Future的一些方法
}      

(1)舉個栗子

public class PromiseTest {

    public static void main(String[] args) {
        
        PromiseTest test = new PromiseTest();
        NioEventLoopGroup loop = new NioEventLoopGroup();
        Promise<String> promise = test.search(loop, "Netty In Action");

        promise.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                System.out.println("Listener 1, price is " + future.get());
            }

        });

        loop.shutdownGracefully();
    }


    private Promise<String> search(NioEventLoopGroup loop, String prod) {
        
        DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
        
        loop.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(String.format("   >>search price of %s from internet!",prod));
                promise.setSuccess("$33.33"); // 等待5S後設定future為成功,

            }
        },0, TimeUnit.SECONDS);

        return promise;
    }
}      

可用​

​Promise​

​對象,為其設定異步調用完成後的操作。

同時可以繼續去做其他任務。

(2)常用​

​Promise​

​類

有:​

​DefaultPromise​

​​,​

​DefaultChannelPromise​

在​

​SingleThreadEventLoop.java​

​中運用

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}      

三、​

​Promise​

​源碼淺析

檢視​

​DefaultPromise​

【Netty】之 Future(Promise)

(1)​

​get()​

調用的是父類 ​

​AbstractFuture​

@Override
public V get() throws InterruptedException, ExecutionException {
    // 等待任務完成
    await();

    // 擷取異常原因
    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}      

(2)​

​await()​

@Override
public Promise<V> await() throws InterruptedException {
    // 是否完成
    if (isDone()) {
        return this;
    }

    // 是否中斷
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    
    // 檢查死鎖
    checkDeadLock();

    // 同步
    synchronized (this) {
        // 是否完成
        while (!isDone()) {
            // 增加等待者
            incWaiters();
            try {
                // 調用Object.wait(), 阻塞
                wait();
            } finally {
                // 減少等待者
                decWaiters();
            }
        }
    }
    return this;
}      

這裡的 ​

​sychronized​

​ 隻是為了同步 增減 waiter

(3)​

​addListener()​

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    
    // 同步添加
    synchronized (this) {
        addListener0(listener);
    }

    // 若完成,則通知
    if (isDone()) {
        notifyListeners();
    }

    return this;
}

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}      

這裡可以看下 ​

​DefaultFutureListeners.java​

感覺這塊設計的并不是很好的

Netty版本:4.1.12.Final

它維護了一個數組來儲存 ​

​listener​

添加/删除操作類似​

​ArrayList​

final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners

    @SuppressWarnings("unchecked")
    DefaultFutureListeners(
            GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size] = l;
        this.size = size + 1;

        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                listeners[-- size] = null;
                this.size = size;

                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }

    // 。。。
}      

(3)​

​notifyListeners()​

比如:

  1. 在​

    ​main​

    ​​線程中調用​

    ​addListener()​

  2. ​notifyListeners()​

    ​執行回調,會送出到線程池中執行
内部維護了 ​

​notifiyingListeners​

​​用來記錄是否已經觸發過監聽事件,隻有未觸發過且t監聽清單不為空,才會依次周遊并調用​

​operatingComplete​

private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        // 擷取目前線程 map
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        // 棧深度
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}      

(4)​

​setSuccess()​

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters(); // 調用 Object.notfiyAll() 喚醒所有線程
        return true;
    }
    return false;
}      

(5)​

​cancel()​

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
        // 檢查通知等待者,喚醒等待線程
        checkNotifyWaiters();
        // 通知所有監聽者
        notifyListeners();
        return true;
    }
    return false;
}