天天看點

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

前言

上周在内部分享會上大佬同僚分享了關于 Kotlin 協程的知識,之前有看過 Kotlin 協程的一些知識,以為自己還挺了解協程的,結果…

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

在這一次分享中,發現 Flow 和 Channel 這一塊兒知識是自己不怎麼了解的,本文也将着重和大家聊一聊這一塊兒的内容,協程部分将分為三篇,本文是第一篇:

《即學即用Kotlin - 協程》 《抽絲剝繭Kotlin - 協程基礎篇》 《抽絲剝繭Kotlin - 協程Flow篇》

目錄

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

一、基礎

1. 概念

相信大家或多或少的都了解過,協程是什麼,官網上這麼說:

Essentially, coroutines are light-weight threads.

協程是輕量級的線程,為什麼是輕量的?可以先告訴大家結論,因為它基于線程池API,是以在處理并發任務這件事上它真的遊刃有餘。

有可能有的同學問了,既然它基于線程池,那我直接使用線程池或者使用 Android 中其他的異步任務解決方式,比如 Handler、RxJava等,不更好嗎?

協程可以使用阻塞的方式寫出非阻塞式的代碼,解決并發中常見的回調地獄,這是其最大的優點,後面介紹。

2. 使用

GlobalScope.launch(Dispatchers.Main) {
    val res = getResult(2)
    mNumTv.text = res.toString()
}
           

啟動協程的代碼就是如此的簡單。上面的代碼中可以分為三部分,分别是 GlobalScope、Dispatcher 和 launch,他們分别對應着協程的作用域、排程器和協程建構器,我們挨個兒介紹。

協程作用域

協程的作用域有三種,他們分别是:

  • runBlocking:頂層函數,它和 coroutineScope 不一樣,它會阻塞目前線程來等待,是以這個方法在業務中并不适用 。
  • GlobalScope:全局協程作用域,可以在整個應用的聲明周期中操作,且不能取消,是以仍不适用于業務開發。
  • 自定義作用域:自定義協程的作用域,不會造成記憶體洩漏。

顯然,我們不能在 Activity 中調用 GlobalScope,這樣可能會造成記憶體洩漏,看一下如何自定義作用域,具體的步驟我在注釋中已給出:

class MainActivity : AppCompatActivity() {
    // 1\. 建立一個 MainScope
    val scope = MainScope()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 2\. 啟動協程
        scope.launch(Dispatchers.Unconfined) {
            val one = getResult(20)
            val two = getResult(40)
            mNumTv.text = (one + two).toString()
        }
    }

    // 3\. 銷毀的時候釋放
    override fun onDestroy() {
        super.onDestroy()

        scope.cancel()
    }

    private suspend fun getResult(num: Int): Int {
        delay(5000)
        return num * num
    }
}
           

排程器

排程器的作用是将協程限制在特定的線程執行。主要的排程器類型有:

  • Dispatchers.Main:指定執行的線程是主線程,如上面的代碼。
  • Dispatchers.IO:指定執行的線程是 IO 線程。
  • Dispatchers.Default:預設的排程器,适合執行 CPU 密集性的任務。
  • Dispatchers.Unconfined:非限制的排程器,指定的線程可能會随着挂起的函數的發生變化。

什麼是挂起?我們就以九心吃飯為例,如果到公司對面的廣場吃飯,九心得經過:

  • 走到廣場 10min > 點餐 5min > 等待上餐 10min > 就餐 30min > 回來 10 min

如果九心點廣場的外賣呢?

  • 九心:下單 5min > 等待(等待的時候可以工作) 30min > 就餐 30min
  • 外賣騎手:到店 > 取餐 > 送外賣

從九心吃飯的例子可以看出,如果點了外賣,九心花費的時間較少了,可以空閑出更多的時間做自己的事。再仔細分析一下,其實從公司到廣場和等待取餐這個過程并沒有省去,隻是九心把這個過程交給了外賣員。

協程的原理跟九心點外賣的原理是一緻的,耗時阻塞的操作并沒有減少,隻是交給了其他線程:

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

launch

launch 的作用從它的名稱就可以看的出來,啟動一個新的協程,它傳回的是一個 Job對象,我們可以調用 Job#cancel() 取消這個協程。

除了 launch,還有一個方法跟它很像,就是 async,它的作用是建立一個協程,之後傳回一個 Deferred對象,我們可以調用 Deferred#await()去擷取傳回的值,有點類似于 Java 中的 Future,稍微改一下上面的代碼:

class MainActivity : AppCompatActivity() {
    // 1\. 建立一個 MainScope
    val scope = MainScope()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 2\. 啟動協程
        scope.launch(Dispatchers.Unconfined) {
            val one = async { getResult(20) }
            val two = async { getResult(40) }
            mNumTv.text = (one.await() + two.await()).toString()
        }
    }

    // 3\. 銷毀的時候釋放
    override fun onDestroy() {
        super.onDestroy()

        scope.cancel()
    }

    private suspend fun getResult(num: Int): Int {
        delay(5000)
        return num * num
    }
}
           

與修改前的代碼相比,async 能夠并發執行任務,執行任務的時間也是以縮短了一半。

除了上述的并發執行任務,async 還可以對它的 start 入參設定成懶加載

val one = async(start = CoroutineStart.LAZY) { getResult(20) }
           

這樣系統就可以在調用它的時候再為它配置設定資源了。

suspend

suspend 是修飾函數的關鍵字,意思是目前的函數是可以挂起的,但是它僅僅起着提醒的作用,比如,當我們的函數中沒有需要挂起的操作的時候,編譯器回給我們提醒 Redudant suspend modifier,意思是目前的 suspend 是沒有必要的,可以把它删除。

那我們什麼時候需要使用挂起函數呢?常見的場景有:

  • 耗時操作:使用 withContext 切換到指定的 IO 線程去進行網絡或者資料庫請求。
  • 等待操作:使用delay方法去等待某個事件。

withContext 的代碼:

private suspend fun getResult(num: Int): Int {
    return withContext(Dispatchers.IO) {
        num * num
    }
}
           

delay 的代碼:

private suspend fun getResult(num: Int): Int {
    delay(5000)
    return num * num
}
           

結合 Android Jetpack

在介紹自定義協程作用域的時候,我們需要主動在 Activity 或者 Fragment 中的 onDestroy 方法中調用 job.cancel(),忘記處理可能是程式員經常會犯的錯誤,如何避免呢?

Google 總是能夠解決程式員的痛點,在 Android Jetpack 中的 lifecycle、LiveData 和 ViewModel 已經內建了快速使用協程的方法,如果我們已經引入了 Android Jetpack,可以引入依賴:

dependencies {
        def lifecycle_version = "2.2.0"

        // ViewModel
        implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"
        // LiveData
        implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version"
        // Lifecycles only (without ViewModel or LiveData)
        implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version"
    }
           

使用可以結合具體的場景,比如結合 Lifecycle,需要使用 lifecycleScope 協程作用域:

lifecycleScope.launch {
    // 代表目前生命周期處于 Resumed 的時候才會執行(選擇性使用)
    whenResumed { 
        // ... 具體的協程代碼
    }
}
           

即使你不使用 Android Jetpack 元件,由于 Lifecycles 在很早之前就内置在 Android 系統的代碼中,是以你仍然可以僅僅引入 Lifecycle 的協程擴充庫,因為它會幫助你很好的處理 Activity 或者 Fragment 的生命周期。

引入 Android Jetpack 協程擴充庫官方文檔:點我打開

二、流

長期以來,在 Android 中響應式程式設計的首選方案是 RxJava,我們今天就來了解一下 Kotlin中的響應式程式設計 Flow。如果你能熟練使用 RxJava,那你肯定能快速上手 Flow。

曾經我在**《即學即用Android Jetpack - ViewModel & LiveData》**一文中說過,LiveData 的使用類似于 RxJava,現在我收回這句話,事實上,LiveData 更加簡單和純粹,它建立單一的生産消費模型,Flow 才是類似于 RxJava 的存在。

1. 基礎

先上一段代碼:

lifecycleScope.launch {
    // 建立一個協程 Flow<T>
    createFlow()
        .collect {num->
            // 具體的消費處理
            // ...
        }
    }
}
           

我在 createFlow 這個方法中,傳回了 Flow 的對象,是以我們可以這樣對比。

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

建立 Flow 對象

我們暫不考慮 RxJava中的背壓和非背壓,直接先将 Flow 對标 RxJava 中的 Observable。

和 RxJava 一樣,在建立 Flow 對象的時候我們也需要調用 emit 方法發射資料:

fun createFlow(): Flow<Int> = flow {
    for (i in 1..10)
        emit(i)
}
           

一直調用 emit 可能不便捷,因為 RxJava 提供了 Observable.just() 這類的操作符,顯然,Flow 也為我們提供了快速建立操作:

  • flowof(vararg elements: T):幫助可變數組生成 Flow 執行個體
  • 擴充函數 .asFlow():面向數組、清單等集合

比如可以使用 (1…10).asFlow() 代替上述的 Flow 對象的建立。

消費資料

collect 方法和 RxJava 中的 subscribe 方法一樣,都是用來消費資料的。

除了簡單的用法外,這裡有兩個問題得注意一下:

  • collect 函數是一個 suspend 方法,是以它必須發生在協程或者帶有 suspend 的方法裡面,這也是我為什麼在一開始的時候啟動了 lifecycleScope.launch。
  • lifecycleScope 是我使用的 Lifecycle 的協程擴充庫當中的,你可以替換成自定義的協程作用域。

2. 線程切換

我們學習 RxJava 的時候,大佬們都會說,RxJava 牛逼,牛逼在哪兒呢?

切換線程,同樣的,Flow 的協程切換也很牛逼。Flow 是這麼切換協程的:

lifecycleScope.launch {
    // 建立一個協程 Flow<T>
    createFlow()
        // 将資料發射的操作放到 IO 線程中的協程
        .flowOn(Dispatchers.IO)
        .collect { num ->
            // 具體的消費處理
            // ...
        }
    }
}
           

和 RxJava 對比:

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

改變資料發射的線程

flowOn 使用的參數是協程對應的排程器,它實質改變的是協程對應的線程。

改變消費資料的線程

我在上面的表格中并沒有寫到在 Flow 中如何改變消費線程,并不意味着 Flow 不可以指定消費線程?

Flow 的消費線程在我們啟動協程指定排程器的時候就确認好了,對應着啟動協程的排程器。比如在上面的代碼中 lifecycleScope 啟動的排程器是 Dispatchers.Main,那麼 collect 方法就消費在主線程。

3. 異常和完成

異常捕獲

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

Flow 中的 catch 對應着 RxJava 中的 onError,catch 操作:

lifecycleScope.launch {
    flow {
        //...
    }.catch {e->

    }.collect(

    )
}
           

除此以外,你可以使用聲明式捕獲 try { } catch (e: Throwable) { } 去捕獲異常,不過 catch 本質上是一個擴充方法,它是對聲明式捕獲的封裝。

完成

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

Flow 中的 onCompletion 對應這 RxJava 中的 onComplete 回調,onCompletion操作:

lifecycleScope.launch {
    createFlow()
        .onCompletion {
            // 處理完成操作
        }
        .collect {

        }
}
           

除此以外,我們還可以通過捕獲式 try {} finally {} 去擷取完成情況。

4. Flow的特點

我們在對 Flow 已經有了一些基礎的認知了,再來聊一聊 Flow 的特點,Flow 具有以下特點:

  • 冷流
  • 有序
  • 協作取消

如果你對 Kotlin 中的 Sequence 有一些認識,那麼你應該可以輕松的 Get 到前兩個點。

冷流

有點類似于懶加載,當我們觸發 collect 方法的時候,資料才開始發射。

lifecycleScope.launch {
    val flow = (1..10).asFlow().flowOn(Dispatchers.Main)

    flow.collect { num ->
            // 具體的消費處理
            // ...
        }
    }
}
           

也就是說,在第2行的時候,雖然流建立好了,但是資料一直到第四行發生 collect 才開始發射。

有序

看代碼比較容易了解:

lifecycleScope.launch {
    flow {
        for(i in 1..3) {
            Log.e("Flow","$i emit")
            emit(i)
        }
    }.filter {
        Log.e("Flow","$it filter")
        it % 2 != 0
    }.map {
        Log.e("Flow","$it map")
        "${it * it} money"
    }.collect {
        Log.e("Flow","i get $it")
    }
}
           

得到的日志:

E/Flow: 1 emit
E/Flow: 1 filter
E/Flow: 1 map
E/Flow: i get 1 money
E/Flow: 2 emit
E/Flow: 2 filter
E/Flow: 3 emit
E/Flow: 3 filter
E/Flow: 3 map
E/Flow: i get 9 money
           

從日志中,我們很容易得出這樣的結論,每個資料都是經過 emit、filter 、map和 collect 這一套完整的處理流程後,下個資料才會開始處理,而不是所有的資料都先統一 emit,完了再統一 filter,接着 map,最後再 collect。

協作取消

Flow 采用和協程一樣的協作取消,也就是說,Flow 的 collect 隻能在可取消的挂起函數中挂起的時候取消,否則不能取消。

如果我們想取消 Flow 得借助 withTimeoutOrNull 之類的頂層函數,不妨猜一下,下面的代碼最終會列印出什麼?

lifecycleScope.launch {
    val f = flow {
        for (i in 1..3) {
            delay(500)
            Log.e(TAG, "emit $i")
            emit(i)
        }
    }
    withTimeoutOrNull(1600) {
        f.collect {
            delay(500)
            Log.e(TAG, "consume $it")
        }
    }
    Log.e(TAG, "cancel")
}
           

5. 操作符對比

限于篇幅,我僅介紹一下 Flow 中操作符的作用,就不一一介紹每個操作符具體怎麼使用了。

普通操作符:

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

特殊的操作符

總會有一些特殊的情況,比如我隻需要取前幾個,我隻要最新的資料等,不過在這些情況下,資料的發射就是并發執行的。

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

組合操作符

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

展平流操作符

展平流有點類似于 RxJava 中的 flatmap,将你發射出去的資料源轉變為另一種資料源。

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

末端操作符

顧名思義,就是幫你做 collect 處理,collect 是最基礎的末端操作符。

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

其他還有一些操作符,我這裡就不一一介紹了,感興趣可以檢視 API。

三、通道

Channel是一個面向多協程之間資料傳輸的 BlockQueue。它的使用方式超級簡單:

lifecycleScope.launch {
    // 1. 生成一個 Channel
    val channel = Channel<Int>()

    // 2. Channel 發送資料
    launch {
        for(i in 1..5){
            delay(200)
            channel.send(i * i)
        }
        channel.close()
    }

    // 3. Channel 接收資料
    launch {
        for( y in channel)
            Log.e(TAG, "get $y")
    }
}
           

實作協程之間的資料傳輸需要三步:

1.建立 Channel

建立的 Channel的方式可以分為兩種:

  • 直接建立對象:方式跟上述代碼一緻。
  • 擴充函數 produce

如果使用了擴充函數,代碼就變成了:

lifecycleScope.launch {
    // 1\. 生成一個 Channel
    val channel = produce<Int> {
        for(i in 1..5){
            delay(200)
            send(i * i)
        }
        close()
    }

    // 2\. 接收資料
    // ... 省略 跟之前代碼一緻
}
           

直接将第一步和第二步合并了。

2. 發送資料

發送資料使用的 Channel#send() 方法,當我們資料發送完畢的時候,可以使用 Channel#close() 來表明通道已經結束資料的發送。

3. 接收資料

正常情況下,我們僅需要調用 Channel#receive() 擷取資料,但是該方法隻能擷取一次傳遞的資料,如果我們僅需擷取指定次數的資料,可以這麼操作:

repeat(4){
    Log.e(TAG, "get ${channel.receive()}")
}
           

但如果發送的資料不可以預估呢?這個時候我們就需要疊代 Channel 了

for( y in channel)
    Log.e(TAG, "get $y")
           

四、多協程資料處理

多協程處理并發資料的時候,原子性同樣也得不到保證,協程中出了一種叫 Mutex 的鎖,差別是它的 lock 操作是挂起的,非阻塞的,感興趣的同學可以自行檢視。

總結

個人感覺協層的主要作用是簡化代碼的邏輯,減少了代碼的回調地獄,結合 Kotlin,既可以寫出優雅的代碼,還能降低我們犯錯的機率。至于提升多協程開發的性能?

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

如果覺得本文不錯,「三連」是對我最大的鼓勵。

分享一份大佬收錄整理的Kotlin知識點,如果你有需要的話,可以私信我【提升】擷取

你也可以拿去查漏補缺,提升自身的競争力

即學即用Kotlin - 協程前言目錄一、基礎二、流4. Flow的特點5. 操作符對比三、通道四、多協程資料處理總結

【加入】資源豐富,學習氛圍濃厚的圈一起學習交流吧!