天天看點

加強版異步任務架構

一、前言

為了提高流暢性,耗時任務放背景線程運作,已是APP開發的常識了。

關于異步有很多方案,目前最流行的,莫過于RxJava了;

更早一些時候,還有AsyncTask(骨灰級的API)。

總的來說,AsyncTask構思精巧,代碼簡潔,使用友善,有不少地方值得借鑒。

當然問題也有不少,比如不能随Activity銷毀而銷毀導緻的記憶體洩漏,還有不适合做長時間的任務等。

筆者以AsyncTask為範本,寫了一個“加強版的異步任務架構”:

保留了AsyncTask的所有用法,解決了其中的一些問題,同時引入了一些新特性。

接下來給大家介紹一下這“加強版”的架構,希望對各位有所啟發。

二、任務排程

2.1 AsyncTask的Executor

AsyncTask的任務排程主要依賴兩個Executor:ThreadPoolExecutor 和 SerialExecutor。

代碼如下:

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

public static final Executor THREAD_POOL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(128), sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}           

關于線程池,估計大家都很熟悉了,參數就不多作解釋了。

如果不是很熟悉,推薦閱讀筆者的另一篇文章

《速讀Java線程池》

上面代碼中,通過巧用“裝飾者模式”,增加“串行排程”的功能。

裝飾者模式有以下特點:

  1. 裝飾對象和真實對象有相同的接口,這樣用戶端對象就能以和真實對象相同的方式和裝飾對象互動。
  2. 裝飾對象包含一個真實對象的引用。
  3. 裝飾對象接受所有來自用戶端的請求,它把這些請求轉發給真實的對象。
  4. 裝飾對象可以在轉發這些請求以前或以後增加一些附加功能。

SerialExecutor隻有二十來行代碼,卻用了兩次裝飾者模式:Runnable和Executor。

  • Runnable部分,往隊列添加的匿名Runnable對象(裝飾對象),當被Executor調用run()方法時,先執行“真實對象”的run()方法,然後再調用scheduleNext();
  • Executor部分,通過增加一個任務隊列,實作串行排程的功能,而具體的任務執行轉發給“真實對象”THREAD_POOL_EXECUTOR。

想要串行排程,為什麼不多加一個coreSize=1的ThreadPoolExecutor呢?

兩個ThreadPoolExecutor,彼此線程不可複用。

雖然SerialExecutor的方案很不錯,但是THREAD_POOL_EXECUTOR的coreSize太小了(不超過4),

這導緻AsyncTask不适合執行長時間運作的任務,否則多幾個任務就會堵塞。

是以,如果要改進AsyncTask,首先要改進Executor。

2.2 通用版Executor

實作思路和 SerialExecutor 差不多,加一個隊列, 實作另一層排程控制。

首先,把 Runnable 和 scheduleNext 兩部分都抽象出來:

interface Trigger {
    fun next()
}

class RunnableWrapper constructor(
        private val r: Runnable,
        private val trigger: Trigger) : Runnable {
    override fun run() {
        try {
            r.run()
        } finally {
            trigger.next()
        }
    }
}           

接下來的實作和SerialExecutor類似:

class PipeExecutor @JvmOverloads constructor(
        windowSize: Int,
        private val capacity: Int = -1,
        private val rejectedHandler: RejectedExecutionHandler = defaultHandler) : TaskExecutor {

    private val tasks = PriorityQueue<RunnableWrapper>()
    private val windowSize: Int = if (windowSize > 0) windowSize else 1
    private var count = 0

    private val trigger : Trigger = object : Trigger {
        override fun next() {
            scheduleNext()
        }
    }

    fun execute(r: Runnable, priority: Int) {
        schedule(RunnableWrapper(r, trigger), priority)
    }

    @Synchronized
    internal fun scheduleNext() {
        count--
        if (count < windowSize) {
            startTask(tasks.poll())
        }
    }

    @Synchronized
    internal fun schedule(r: RunnableWrapper, priority: Int) {
        if (capacity > 0 && tasks.size() >= capacity) {
            rejectedHandler.rejectedExecution(r, TaskCenter.poolExecutor)
        }
        if (count < windowSize || priority == Priority.IMMEDIATE) {
            startTask(r)
        } else {
            tasks.offer(r, priority)
        }
    }

    private fun startTask(active: Runnable?) {
        if (active != null) {
            count++
            TaskCenter.poolExecutor.execute(active)
        }
    }
}           

解析一下代碼中的參數和變量:

  • tasks:任務緩沖區
  • count:正在執行的任務的數量
  • windowSize:并發視窗,控制Executor的并發
  • capacity:任務緩沖區容量,小于等于0時為不限容量,超過容量觸發rejectedHandler
  • rejectedHandler:預設為AbortPolicy(抛出異常)
  • priority:排程優先級

當count>=windowSize時,priority高者先被排程;

優先級相同的任務,遵循先進先出(FIFO)的排程規則。

需要注意的是,排程優先級不同于線程優先級,線程優先級更底層一些。

比如AsyncTask的doInBackground()中就調用了:

Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)

這可以使得背景線程的線程優先級低于UI線程。

以下是PipeExecutor的流程圖:

定義了PipeExecutor了之後,我們可以實作多個執行個體。

例如,可以仿照 RxJava 的 Schedulers,定義适用于“IO密集型”任務和“計算密集型”任務的Executor。

val io = PipeExecutor(20, 512)
val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512)           

也可以定義串行排程的Executor:

val single = PipeExecutor(1)           

不過我們不建議定義全局的串行排程Executor,因為會有互相阻塞的風險。

但是可以根據場景定義專屬的串行排程Executor,比如給日志收集建立一個,給資料上報建立一個……

不同執行個體,猶如不同的水管,往同一個池子進水,故而命名為PipeExecutor。

2.3 去重版Executor

我們項目中,頁面更新用的是“釋出訂閱模式”:

資料層有變更,釋出更新消息;

上層收到消息,異步加載資料,重新整理頁面。

然後就碰到一個問題:若短時間内有多次資料更新,就會有多個消息發往上層。

不做特殊處理,就會幾乎同時啟動多個異步任務,浪費計算資源;

多個線程對并發讀取同一資料,多線程問題也随之而來,若處理不好,結果不可預知。

用串行執行器?所有任務串行的話,無法利用任務并發的優勢。

是以經過比較多種方案,最終的結論是:

  • 1、任務分組,不同組并行,同組串行
  • 2、同組的任務,如果有任務在執行,最多隻能有一個在等待,丢棄後面的任務

所謂分組,就是給任務打tag, 比如重新整理A資料的任務叫ATask, 重新整理B任務的叫BTask。

關于第2點,其實有考慮過其他一些方案,比如下面兩個:

  • 取消正在執行的任務
    • 首先不是所有任務都可以中斷的,可以不接收其結果,但是不一定能中斷其執行
    • 即使能取消(比如中斷網絡請求),也不是最佳方案。

      比方說目前線程或許已經快要下載下傳完了,在等一會後面的任務就可以讀緩存去結果了;

任務2取消任務1,任務3取消任務2……等到最後一個任務執行,使用者可能已經不耐煩了。

  • 如果有任務在執行,丢棄後面的任務

    比方說任務1讀取了資料,在計算的時候,資料源變更,然後發送事件,啟動任務2……

直接丢棄後面的任務,最終頁面顯示的是舊的資料。

我們定義了一個LaneExecutor來實作這個方案,示意圖如下:

各組任務就像一個個車道(Lane), 故而命名為LaneExecutor。

洋蔥似地一層包一層,很明顯,也是裝飾者模式。

職責配置設定:

LaneExecutor負責任務去重;

PipeExecutor負責任務并發控制和排程優先級;

ThreadPoolExecutor負責配置設定線程來執行任務。

但後來又遇到另一個問題:

有多個控件要加載同一個URL的資料,然後很自然地我們就以 URL作為tag了,以避免重複下載下傳(做有緩存,第一任務下載下傳完成之後,後面的任務可以讀取緩存)。

但是用LaneExecutor來執行時,隻保留一個任務在等待,然後最終隻有兩個控件能顯示資料。

查到問題後,筆者給LaneExecutor加了一種模式,該模式下,不丢棄任務。

如此,所有任務都會被執行,但是隻有第一個需要下載下傳資料,後面任務讀緩存就好了。

2.4 統一管理Executor

當項目複雜度到了一定程度,如果沒有統一的公共定義,可能會出現各種備援執行個體。

分散的Executor無法較好地控制并發;

如果各自建立的是ThreadPoolExecutor,則還要加上一條:降低線程複用。

故此,可以集中定義Executor,各子產品統一調用。

object TaskCenter {
    internal val poolExecutor: ThreadPoolExecutor = ThreadPoolExecutor(
            0, 256,
            60L, TimeUnit.SECONDS,
            SynchronousQueue(),
            threadFactory)
    
    // 正常的任務排程器,可控制任務并發,支援任務優先級
    val io = PipeExecutor(20, 512)
    val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512)

    // 帶去重政策的 Executor,可用于資料重新整理等任務
    val laneIO = LaneExecutor(io, true)
    val laneCP = LaneExecutor(computation, true)

    // 相同的tag的任務會被串行執行,相當于串行的Executor
    // 可用于寫日志,上報統計資訊等任務
    val serial = LaneExecutor(PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 1024))
}           

2.5 Executor的使用

TaskCenter.io.execute{
        // do something
    }

    TaskCenter.laneIO.execute("laneIO", {
        // do something
    }, Priority.HIGH)

    val serialExecutor = PipeExecutor(1)
    serialExecutor.execute{
        // do something
    }

    TaskCenter.serial.execute ("your tag", {
        // do something
    })           
  • PipeExecutor的使用和正常的Executor是一樣的,execute中傳入Runnable即可,

    然後由于Runnable隻有一個方法,也沒有參數,lambda的形式就顯得更加簡潔了。

  • LaneExecutor由于要給任務打tag, 是以要傳入tag參數;

    如果不傳,則沒有分組的效果,也就是回退到PipeExecutor的特性;

  • 兩種Executor都可以傳入優先級。

很多開源項目都設計了API來使用外部的Executor,比如RxJava可以這樣用:

object TaskSchedulers {
    val io: Scheduler by lazy { Schedulers.from(TaskCenter.io) }
    val computation: Scheduler by lazy { Schedulers.from(TaskCenter.computation) }
    val single by lazy { Schedulers.from(PipeExecutor(1)) }
}           
Observable.range(1, 8)
       .subscribeOn(TaskSchedulers.computation)
       .subscribe { Log.d(tag, "number:$it") }           

這樣有一個好處,各種任務都在一個線程池上執行任務,可複用彼此建立的線程。

三、流程控制

3.1 AsyncTask的執行流

上一章我們分析了任務排程,構造了一系列Executor,增強任務處理方面的通用性。

不過任務排程隻是AsyncTask的一部分,AsyncTask的精髓其實在于流程控制:在任務執行的不同階段,回調相應的方法。

下面是AsyncTask的流程圖:

通過使用FutureTask和Callable,使得AsyncTask具備對任務執行更強的控制力,比如cancel任務。

有的文章說cancel()不一定的立即中斷任務,但其實Futuret.cancel()确實已經是最好的方案了,

如果強行調用Thread.stop(),則猶如關掉空中飛機的引擎,後果不堪設想。

通過與Handler的配合,AsyncTask可以在任務執行過程中和執行結束後釋出資料到UI線程,

這使得AsyncTask尤其适用于“資料加載+界面重新整理”的場景。

而這類場景在APP開發中較為常見,這也是AsyncTask一度被廣泛使用的原因之一。

3.2 生命周期

AsyncTask其中一個廣為诟病的問題就是記憶體洩漏:

若AsyncTask持有Activity引用,且生命周期比Activity的長,則Activity無法被及時回收。

這個問題其實不是AsyncTask獨有,Handler,RxJava等都存在類似問題。

解決方案有多種,靜态類、弱引用、Activity銷毀時取消等。

RxJava提供了dispose方法來取消任務,同時也有很多內建生命周期的開源方案,比如

RxLifecycle

AutoDispose

等。

AsyncTask也提供了cancel方法,但是比較命苦,吐槽者衆,助力者寡。

其實要實作自動cancel不難,建立和Activity/Fragment的關系即可,可通過觀察者模式來實作。

UITask是參考AsyncTask寫的一個類, 使用了上一章介紹的Executor。

結構上,UITask為觀察者,Activity/Fragment為被觀察者,LifecycleManager為 UITask 和 Activity/Fragment 建構關系的橋梁。

實作上需要兩個資料結構:一個SparseArray,一個List。

SparseArray的key為被觀察者的identityHashCode, value為觀察者清單。

UITask提供了host()方法,方法中擷取宿主(也就是Activity/Fragment)的identityHashCode,

通過register()方法,添加 “Activity->UITask” 到SparseArray中。

abstract class UITask<Params, Progress, Result> : LifeListener {
    fun host(host: Any): UITask<Params, Progress, Result> {
        LifecycleManager.register(System.identityHashCode(host), this)
        return this
    }

    override fun onEvent(event: Int) {
        if (event == LifeEvent.DESTROY) {
            cancel(true)
        } else if (event == LifeEvent.SHOW) {
            changePriority(+1)
        } else if (event == LifeEvent.HIDE) {
            changePriority(-1)
        }
    }
}           
override fun onCreate(savedInstanceState: Bundle?) {
    TestTask().host(this).execute("hello")
}           

需要在BaseActivity中通知事件:

abstract class BaseActivity : Activity() {
    override fun onDestroy() {
        super.onDestroy()
        LifecycleManager.notify(this, LifeEvent.DESTROY)
    }

    override fun onPause() {
        super.onPause()
        LifecycleManager.notify(this, LifeEvent.HIDE)
    }

    override fun onResume() {
        super.onResume()
        LifecycleManager.notify(this, LifeEvent.SHOW)
    }
}           

調用notify()方法時,會根據Activity索引到對應觀察者清單,然後周遊清單,回調觀察者onEvent()方法。

其中,當通知的事件為DESTROY時,UITask執行cancel()方法,進而取消任務。

3.3 動态調整優先級

上一節,我們看到UITask除了關注DESTROY事件,還關注 Activity/Fragment 的HIDE和SHOW,

并根據可見狀态調整優先級。

調整優先級有什麼用呢? 下面先看兩張圖感受一下。

為了凸顯效果,我們把加載任務的并發量控制為1(串行)。

第一張是不會自動調整優先級的,完全的先進先出:

可以看到,切換到第二個頁面,由于上一頁的任務還沒執行完,

是以要一直等到上一頁的任務都完成了才輪到第二個頁面加載。

很顯然這樣體驗不太好。

接下來我們看下動态調整優先級是什麼效果:

切換到第二個頁面之後,第一個頁面的任務的“排程優先級”被降低了,是以會優先加載第二個頁面的圖檔;

再次切換回第一個頁面,第二個頁面的優先級被降低,第一個頁面的優先級恢複,是以優先加載第一個頁面的圖檔。

那可否進入第二個頁面的時暫停第一個頁面的任務?

暫停的方案不太友好,比方說使用者在第二個頁面停留很久,第二個頁面的任務都完成了,然後切換回第一個頁面,發現隻有部分圖檔(其他被暫停了)。

而如果隻是調整優先級,則第二個頁面的任務都執行完之後,會接着執行第一個頁面的任務,傳回第一個頁面時就能夠看到所有圖檔了。

這就好比趕車,讓其他人給插個隊,沒有問題,但是不能不給别人排隊了吧。

3.4 鍊式調用

UITask的用法和AsyncTask大同小異,回調方法和參數泛型都是一樣的,是以就不多作介紹了。

如今很多開源庫都提供了鍊式API,使用起來确實靈活友善,視覺上也比較連貫。

喜歡冰糖葫蘆一樣的鍊式調用?

我們提供了一個ChainTask類,拓展了UITask,提供鍊式調用的API。

override fun onCreate(savedInstanceState: Bundle?) {
    val task = ChainTask<Double, Int, String>()
    task.tag("ChainTest")
        .preExecute { result_tv.text = "running" }
        .background { params ->
            for (i in 0..100 step 2) {
                // do something
                task.publishProgress(i)
            }
            "result is:" + (params[0] * 100)
        }
        .progressUpdate { values ->
            val progress = values[0]
            progress_bar.progress = progress
            progress_tv.text = "$progress%"
        }
        .postExecute { result_tv.text = it }
        .cancel { showTips("ChainTask cancel") }
        .priority(Priority.IMMEDIATE)
        .host(this)
        .execute(3.14)
}           

四、總結

最後,可能會這樣的疑問:

既然已經有 RxJava 這樣好用的開源庫來實作異步了, 為什麼還要寫這個項目呢?

首先,RxJava 不僅僅是異步而已:“ReactiveX是一個通過使用可觀察序列來編寫異步和基于事件的程式的庫。”

“可觀察序列 - 事件 - 異步”加起來才使得 RxJava 如此富有魅力。

有所得,必有所付出,為了實作這些豐富的特性,代碼量也是比較可觀的(目前版本jar包約2.2M)。

AsyncTask則比較簡單,除去注釋隻有三百多行代碼;

功能也比較純粹:執行異步任務,在任務執行的不同階段,回調相應的方法。

Task參考了AsyncTask,功能類似,隻是做了一些完善;

jar包大小45K,也算是比較輕量的。

這個年頭,apk動辄幾十M甚至上百M,2.2M的庫并非不可接受。

但是也有一些場景,比方說給第三方寫SDK的時候,對包大小和依賴比較敏感,而且也不需要這麼大而全的特性,這時一些輕量級的方案就比較合适了。

而且,除了包大小之外,Task所實作的功能和RxJava也不盡相同。

如果說AsyncTask是自行車,RxJava是汽車,則Task是機車。

各有各的用途,各有各的靈魂。

五、下載下傳

項目已經上傳到maven和github, 歡迎大家下載下傳 & star

dependencies {
    implementation 'com.horizon.task:task:1.0.4'
}           

項目位址:

https://github.com/No89757/Task