天天看点

Java并发-异步执行Future

Java并发-异步执行Future

在开发过程中很容易遇到这么一种情况,要执行一个耗时的任务,为了提高效率我们在执行耗时任务的时候同时执行其他暂时不需要耗时任务的代码,直到需要这个耗时任务的返回时才去获取任务结果,那么在Java中Future接口是专门用来异步执行,并在需要的时候获取执行的结果的一个接口。典型的我们有

/**
 * jdk 
 * /
public class Main {
    public static void main(String[] args) {
        ExecutorService exec = new ThreadPoolExecutor(, , , TimeUnit.SECONDS, new ArrayBlockingQueue<>());
        Future<String> future = exec.submit(Main::doSomethingNeedTime);
        //其他的操作
        try{
            String result = future.get();
        }catch (Exception ex){
            //log
        }
    }

    private static String doSomethingNeedTime(){
        try {
            TimeUnit.SECONDS.sleep();
        }catch (InterruptedException ex){
            //log
        }
        return "over";
    }
}
           

在这里线程池submit方法返回的是一个FutureTask类,接下来我们就从源码入手讲一讲这个FutureTask具体是怎么工作的。

FutureTask

FutureTask实现了FutureRunnable接口,而这个接口又继承了Future接口和Runnable接口。

正式因为FutureTask实现的接口FutureRunnable即继承了Future接口又继承了Runnable接口所以ThreadPoolExecutor的submit方法其实是将Callable用FutureTask适配成了Runnable最后调用了execute方法。

Future的主要字段如下

//代表当前FutureTask的状态
    private volatile int state;
    private static final int NEW          = ;
    private static final int COMPLETING   = ;
    private static final int NORMAL       = ;
    private static final int EXCEPTIONAL  = ;
    private static final int CANCELLED    = ;
    private static final int INTERRUPTING = ;
    private static final int INTERRUPTED  = ;
    //要执行的任务,最后get获取的就是这个任务的返回
    private Callable<V> callable;
    //callable的结果
    private Object outcome; 
    //运行Callable的线程
    private volatile Thread runner;
    //等待线程链表表头
    private volatile WaitNode waiters;
           

Future一共有如上6种状态,它的生命周期状态一般经历如下变化,所有FutureTask都由新建态New开始

//COMPLETING代表完成中,是调用set函数给outcome赋值过程中的状态
    NEW -> COMPLETING -> NORMAL
    NEW -> COMPLETING -> EXCEPTIONAL
    NEW -> CANCELLED
    NEW -> INTERRUPTING -> INTERRUPTED
           

WaitNode是一个封装了调用get函数想获取future返回值的线程,通过WaitNode组成的链表来park那些在FutureTask状态<=2时调用get想获取结果的线程,并在future运算完成后unpark线程让他们获取结果

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
           

接下来我们通过代码来看一看一个future整个的执行过程,首先是run方法,这个方法是Runnable接口run方法的实现,用来执行callable任务得到结果。

public void run() {
        //初始必须为新建态,任务只能由一个线程执行,通过cas设置执行线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
           

run方法的主要思想是首先一个FutureTask执行run方法时一定是New状态,通过cas来制定执行线程,如果cas失败说明有其他线程抢先执行了这个方法了,方法的执行很简单,就是执行Callable获取返回值,并根据try/catch来处理执行成功和抛出异常两种情况。

如果执行成功会调用set方法给outcome赋值,赋值过程为首先CAS将状态改为完成中(CAS失败说明有别的线程已经在执行或执行过这个方法了),然后赋值,赋值完毕后状态改为完成,完成后调用finishCompletion唤醒等待结果的线程

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
            finishCompletion();
        }
    }

    private void finishCompletion() {
        //遍历链表将节点移除并unpark线程
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //钩子函数
        done();

        callable = null;        // to reduce footprint
    }
           

这里的done()是钩子函数,本身是空实现,我们可以根据需要在继承FutureTask后重写这个方法做后置处理。

那么线程时何时被阻塞的呢?在unpark后线程又如何获取到结果?这就考get函数了,get函数首先判断FutureTask状态,如果还没有完成就会调用report函数,进入无限循环直到FutureTask状态大于2即大于完成中状态返回最终状态,最后会根据FutureTask状态调用report返回结果或抛出对应异常

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //调用get的线程被中断了,移除封装它的Node(如果存在)抛出异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //状态大于完成中返回最终状态
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //完成中状态则让出cpu
            else if (s == COMPLETING) 
                Thread.yield();
            //线程没有创建对应节点则创建
            else if (q == null)
                q = new WaitNode();
            //对应节点没有入队则入队
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //如果是带有超时的等待则调用parkNanos进行带有时限的挂起                                        
            else if (timed) {
                nanos = deadline - System.nanoTime();
                //超时移除节点,返回当前状态
                if (nanos <= L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
           

awaitDone方法思路是进入无线循环,每次循环查看状态如果状态大于完成中就返回最终状态否则,先判断是否在完成中如果是线程让出cpu,再判断是否为线程创建了节点,如果没有则创建,再判断节点是否入队没有通过CAS尝试入队,失败也没关系在之后的循环中还会继续尝试,最后状态<=2时,线程节点也入队了那么久挂起线程,线程会在set方法完成后调用的finishCompletion方法中被unpark,然后执行下一次循环,这时状态一定大于了完成中,本方法返回最终状态。最后report方法根据最终状态来返回结果

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
           

上述就是一个FutureTask基本的工作过程,其实FutureTask还提供了其他的额外功能,比如带时限等待的get,它的原理其实和上述内容大同小异

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        //对awaitDone方法增加了超时判断,超时返回时状态时<=2的
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
           

取消任务执行

public boolean cancel(boolean mayInterruptIfRunning) {
        //必须在新建态下取消
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //取消后唤醒等待get结果的线程
            finishCompletion();
        }
        return true;
    }
           

总结

Future是异步任务的一个基础接口,除了本文介绍了常用的FutureTask实现外,Java8还有CompletableFuture实现,从而方便的进行组合式异步编程,他配合流可以很方便执行io繁忙的任务并且能够指定在任务完成后要执行的顺序任务或者指定和他一起并发执行的任务并在所有任务完成后指定一个对所有任务结果处理的任务等等,感兴趣的小伙伴可以参考Java8实战的11章。