天天看點

Kotlin協程使用前言join()函數結構化并發(Structured concurrency)自定義作用域(Scope builder)取消協程

前言

現在還在學習階段,無法寫的詳細,玩寫一些自己需要記的東西。

join()函數

fun main() = runBlocking {
    val job = GlobalScope.launch {
        delay(1000)
        println("World!")
    }
    println("Hello,")
    delay(2000)
}
           

runBlocking { }會啟動一個協程,暫且稱之為runBlocking協程,GlobalScope.launch{ },暫且稱之為GlobalScope協程,runBlocking協程會阻塞調用它的線程,如main線程調用了它,則它就會阻塞main線程,直到runBlocking協程執行結束,main線程才會解除阻塞。而GlobalScope協程不會阻塞線程,是以在執行runBlocking時,執行遲延2000,就是為了讓main線程阻塞,如果不阻塞的話,main線程一瞬間就執行結束了,而GlobalScope的協程的作用域範圍是整個應用的生命周期,一但應用的生命結束了,它就會被取消,而main線程就是應用的生命。main執行結束就意味着GlobalScope協程全部要取消掉。GlobalScope協程是異步的,是以main線程可以比它先結束,是以了為預防main線程比它先結束,就在runBlocking這個阻塞線程上延遲了2秒以阻塞main線程兩秒,這樣異步的GlobalScope協程就有足夠的時間執行完成。

但是這樣做是不完美的,因為真實開發中,我們無法确定GlobalScope協程需要執行多少時間,然後我們就使用相同的時間來阻塞主線程,這個執行時間是無法提前知道的。是以可以使用協程的join()函數,代表等待此協程執行結束,如下:

fun main() = runBlocking {
    val job = GlobalScope.launch {
        delay(1000)
        println("World!")
    }
    println("Hello,")
    job.join()
}
           

線上程上也有一樣的函數join(),線上程上就代表等待該線程執行結束,示例如下:

fun main() {
    val threadA = thread {
        println("threadA start")

        val threadB = thread {
            println("threadB start")
            Thread.sleep(2000)
            println("threadB end")
        }
        
        println("threadA end")
    }
}
           

輸出結果如下:

threadA start
threadA end
threadB start
threadB end
           

這裡有兩個線程,分别為threadA、threadB,從輸出結果看線程A先執行,且先結束,然後線程B才開始執行,如果有需求要求線程A在結束前,必須等到線程B執行結束之後線程A才能結束,這如何實作呢?你可以線上程A上sleep(3000),這是OK,但這是基于你知道線程B的執行時間是多少了,如果你不知道線程B的執行時間那又怎麼辦呢?使用join()函數,代碼如下:

fun main() {
    val threadA = thread {
        println("threadA start")

        val threadB = thread {
            println("threadB start")
            Thread.sleep(2000)
            println("threadB end")
        }
        
        threadB.join() // 等待線程B執行結束。這行代碼會一直阻塞,直到線程B執行結束。
        println("threadA end")
    }
}
           

輸出結果如下:

threadA start
threadB start
threadB end
threadA end
           

OK,完美的實作了線程B先結束,然後線程A才能結束。這裡有一點可以注意一下,還一個main線程,main線程一瞬間就執行結束了,而子線程還能執行,這說明子線程它的作用域為程序,隻要程序不結束,子線程就能運作,main線程先結束也不會影響子線程的執行。這一點與協程不同。

結構化并發(Structured concurrency)

fun main() = runBlocking {
    val job = GlobalScope.launch {
        delay(1000)
        println("World!")
    }
    println("Hello,")
    job.join()
}
           

GlobalScope.launch { }啟動的是一個頂級協程,如果每次都用這種方式啟動頂級協程,它還是需要消耗一些資源的,而且為了這個協程能執行完成,你還需要記得每次都保持它的引用并調用join()函數,這有點累啊。是以,我們可以啟動子協程,runBlocing{ }也是啟動一個頂級協程,我們可以使用runBlocking的作用域來開啟子協程(不是頂級協程),這樣的子協程的作用域就和runBlocking使用的是同一個了,這樣的話子協程沒執行結束之前,就相當于runBloking沒執行結束,就會一直阻塞main線程,這樣的話,我們就不需要寫join()函數的調用了,而且子協程比頂級協程更便宜!示例如下:

fun main() = runBlocking { // this: CoroutineScope,runBlocking使用的作用域對象
    this.launch { // 使用runBlocking的作用域對象啟動一個新協程
        delay(1000L)
        println("World!")
    }
    println("Hello,")
}
           

這樣,我們不需要調用join()函數,也能確定子協程中的輸出語句能執行,因為子協程和runBlocking協程的作用域對象是同一個。

回到标題:結構化并發,應該說的就是在一個協程裡面開啟多個子協程。

自定義作用域(Scope builder)

coroutineScope { }也可以啟動一個協程,但是它不能直接線上程上調用,隻能在協程上調用,除了這個限制外,它跟runBlocking很像,runBlocking會阻塞調用它的線程,而coroutineScope會阻塞調用它的協程,或許它不是阻塞,隻是挂起,但效果就像阻塞一樣,示例如下:

fun main() = runBlocking<Unit> {
    println("blocking協程執行開始")

    coroutineScope {
        println("自創作用域的協程執行開始")
        delay(1000L)
        println("自創作用域的協程執行結束")
    }

    println("blocking協程執行結束")
}
           

運作結果如下:

blocking協程執行開始
自創作用域的協程執行開始
自創作用域的協程執行結束
blocking協程執行結束
           

注意,這個阻塞隻會阻塞調用它的那個協程,而那個協程啟動的子協程并不會被阻塞,示例如下:

fun main() = runBlocking<Unit> { // this: CoroutineScope,runBlocking使用的作用域對象
    println("blocking協程執行開始")

    launch {
        println("blocking的子協程執行開始")
        delay(2000L)
        println("blocking的子協程執行結束")
    }

    coroutineScope {
        launch {
            println("自創作用域的子協程執行開始")
            delay(3000L)
            println("自創作用域的子協程執行結束")
        }

        println("自創作用域的協程執行開始")
        delay(1000L)
        println("自創作用域的協程執行結束")
    }

    println("blocking協程執行結束")
}
           

運作結果如下:

blocking協程執行開始
自創作用域的協程執行開始
blocking的子協程執行開始
自創作用域的子協程執行開始
自創作用域的協程執行結束
blocking的子協程執行結束
自創作用域的子協程執行結束
blocking協程執行結束
           

取消協程

取消線程

我們先試寫一下取消線程,友善了解,示例如下:

fun main() {
    val thread = thread {
        val format = SimpleDateFormat("HH:mm:ss")
        while (true) {
            Thread.sleep(5000)
            val current = System.currentTimeMillis()
            println("目前時間:${format.format(current)}")
        }
    }

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

執行結果如下:

目前時間:16:19:07
目前時間:16:19:12
目前時間:16:19:17
目前時間:16:19:22
           

示例代碼的功能為:每5秒列印一下目前時間,并且我們添加了一個監聽器,可以監聽到程序什麼時候結束,當然了,上面代碼是看不到程序結束的,因為子線程中的代碼是一個死循環。下面我們來結束一下這個死循環的線程:

fun main() {
    val thread = thread {
        val format = SimpleDateFormat("HH:mm:ss")
        while (true) {
            Thread.sleep(5000)
            val current = System.currentTimeMillis()
            println("目前時間:${format.format(current)}")
        }
    }

    Thread.sleep(16000)
    thread.interrupt()

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

我們在主線程睡了16秒鐘,然後中斷子線程,是以子線程可以列印3次目前時間,因為5秒鐘列印一次,列印3次隻需要15秒,運作結果如下:

目前時間:16:27:49
目前時間:16:27:54
目前時間:16:27:59
Exception in thread "Thread-0" java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at TestKt$main$thread$1.invoke(Test.kt:11)
	at TestKt$main$thread$1.invoke(Test.kt)
	at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
程序結束了!
           

可以看到,代碼抛出了一個異常,而且也監聽到程序結束了,異常是在子線程的sleep()函數抛出來的,那我們try一下,如下:

fun main() {
    val thread = thread {
        val format = SimpleDateFormat("HH:mm:ss")
        while (true) {
            try {
                Thread.sleep(5000)
            } catch (e: InterruptedException) {
                println("哎呀媽呀,發生異常了!")
            }
            val current = System.currentTimeMillis()
            println("目前時間:${format.format(current)}")
        }
    }

    Thread.sleep(16000)
    thread.interrupt()

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

執行效果如下:

目前時間:16:32:10
目前時間:16:32:16
目前時間:16:32:21
哎呀媽呀,發生異常了!
目前時間:16:32:21
目前時間:16:32:26
目前時間:16:32:32
目前時間:16:32:37
目前時間:16:32:42
目前時間:16:32:47
           

可以看到,發生中斷異常之後,因為我們catch了這個異常,是以程序沒有被結束,子線程繼續執行,是以啊,以前我們隻知道Thread.sleep()就try catch就完事了,其實不能簡單的隻try catch,當了生中斷異常的時候,說明使用者希望線程結束了,你就不要執行了嘛,修正後如下:

try {
    Thread.sleep(5000)
} catch (e: InterruptedException) {
    println("哎呀媽呀,發生異常了!")
    break
}
           

這裡我隻粘貼了核心代碼,加入了break語句,這樣一但發生中斷異常,我們就退出while循環,線程就能結束了,執行效果如下:

目前時間:16:36:21
目前時間:16:36:26
目前時間:16:36:31
哎呀媽呀,發生異常了!
程序結束了!
           

可以發現,thread.interrupt()隻會讓那些可中斷的函數抛出中斷異常,也僅僅是抛出一個異常而已,并不會正真的結束線程,是以我在想,那假如線程中沒有中斷函數呢?示例如下:

fun main() {
    val thread = thread {
        var start = System.currentTimeMillis()
        val format = SimpleDateFormat("HH:mm:ss")
        while (true) {
            if (System.currentTimeMillis() - start >= 5000) {
                start = System.currentTimeMillis()
                println("目前時間:${format.format(start)}")
            }
        }
    }

    Thread.sleep(16000)
    thread.interrupt()

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

實作的功能還是和之前一樣的,也是每5秒列印一行資訊。執行結果如下:

目前時間:16:41:08
目前時間:16:41:13
目前時間:16:41:18
目前時間:16:41:23
目前時間:16:41:28
           

這裡隻是複制部分輸出,子線程永遠不會結束,是以我們不要想當然的認為調用了thread.interrupt()之後線程就結束了,那這種情況下如何讓線程結束呢?想必大家都能很輕松實作,因為我們在最開始學習線程時就學了,應該使用變量控制線程的結束,示例如下:

fun main() {
    var run = true

    thread {
        var start = System.currentTimeMillis()
        val format = SimpleDateFormat("HH:mm:ss")
        while (run) {
            if (System.currentTimeMillis() - start >= 5000) {
                start = System.currentTimeMillis()
                println("目前時間:${format.format(start)}")
            }
        }
    }

    Thread.sleep(16000)
    run = false
    println("子線程該結束了")

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

執行結果如下:

目前時間:16:52:44
目前時間:16:52:49
目前時間:16:52:54
子線程該結束了
程序結束了!
           

你以為這樣的代碼就萬事大吉了嗎?我們再看下面的示例:

fun main() {
    var run = true
    var count = 0

    thread {
        var start = System.currentTimeMillis()
        val format = SimpleDateFormat("HH:mm:ss")
        while (run) {
            if (System.currentTimeMillis() - start >= 5000) {
                start = System.currentTimeMillis()
                println("目前時間:${format.format(start)}")
                if (++count == 3) {
                    Thread.sleep(5000)
                    start = System.currentTimeMillis()
                    println("我睡醒了!目前時間:${format.format(start)}")
                }
            }
        }
    }

    Thread.sleep(16000)
    run = false
    println("子線程該結束了")

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

執行結果如下:

目前時間:16:59:31
目前時間:16:59:36
目前時間:16:59:41
子線程該結束了
我睡醒了!目前時間:16:59:46
程序結束了!
           

可以看到,當子線程該結束的時候,并沒有馬上結束,而是過了5秒之後才結束的,真實開發中,如果代碼中有函數一直阻塞呢(不一定是sleep()函數)?那你想當然的以為把控制while循環的變量設定為false就萬事大吉了,其實你又錯了,修正這個錯誤,示例如下:

fun main() {
    var run = true
    var count = 0
    var thread: Thread? = null

    thread = thread {
        var start = System.currentTimeMillis()
        val format = SimpleDateFormat("HH:mm:ss")
        while (run && thread?.isInterrupted != true) {
            if (System.currentTimeMillis() - start >= 5000) {
                start = System.currentTimeMillis()
                println("目前時間:${format.format(start)}")
                if (++count == 3) {
                    try {
                        Thread.sleep(5000)
                    } catch (e: InterruptedException) {
                        start = System.currentTimeMillis()
                        println("呀,發生中斷異常了,目前時間:${format.format(start)}")
                        run = false
                        break
                    }
                }
            }
        }
    }

    Thread.sleep(16000)
    run = false
    thread.interrupt()
    println("子線程該結束了")

    // 監聽程序結束
    Runtime.getRuntime().addShutdownHook(thread(false) {
        println("程序結束了!")
    })
}
           

執行結果如下:

目前時間:17:07:53
目前時間:17:07:58
目前時間:17:08:03
子線程該結束了
呀,發生中斷異常了,目前時間:17:08:04
程序結束了!
           

OK,在需要結束的時候,立馬就能結束,變量控制和interrup()調用雙管齊下。當然這種方式僅用在你需要立馬結束線程的情況,不能說什麼情況都這樣用的。比如一部手機,裝了一個錄像App,錄像需要儲存視訊,這肯定是耗時操作啊,需要開子線程,使用者可以随時停止錄制,是以當使用者按下停止錄制的時候,你不能調用儲存錄像的線程的intterp()函數,因為儲存錄像是耗時的,又或者在儲存之前還有加濾鏡操作呢!比如皮膚美白,當使用者按下停止的時候,最後的視訊資料要經過美白處理,然後寫入到檔案,是需要一些時間的,如果你直接中斷線程,那資料就不完整了。是以,在真實開發中,我們一般很少調用interrup()函數來結束線程的,這裡舉出這個例子,是為了讓大家有這樣一個了解:并不是把控制while循環的變量設定為false,線程就馬上結束了,并不是調用了interrup()函數,線程就馬上結束了。

取消協程

講完了線程的取消方式,那接下來就開始講解如何取消協程。

fun main() = runBlocking {
    val job = launch {
        repeat(10) { i ->
            delay(2000L)
            println("i = $i")
        }
    }
    delay(5000L) // 延遲一段時間
    val format = SimpleDateFormat("mm:ss.SSS")
    println("準備取消子協程,目前時間:${format.format(System.currentTimeMillis())}")
    job.cancel() // 取消該作業
    job.join()   // 等待作業執行結束
    println("取消子協程成功,目前時間:${format.format(System.currentTimeMillis())}")
}
           

執行結果如下:

i = 0
i = 1
準備取消子協程,目前時間:31:13.966
取消子協程成功,目前時間:31:13.966
           

官方教程取消協程就是要調用兩個函數的,如下:

job.cancel() // 取消該作業
job.join()   // 等待作業執行結束
           

不要問為什麼,照抄就是了。這兩個函數可以使用一個函數代替,如下:

接下來,我們使用和取消線程時差不多的代碼,我們不用delay()這種虛虛的挂起函數,我們要模拟真正的耗時操作,代碼如下:

fun main() = runBlocking {
    val format = SimpleDateFormat("HH:mm:ss.SSS")
    val job = launch {
        while (true) {
            Thread.sleep(5000)
            val current = System.currentTimeMillis()
            println("目前時間:${format.format(current)}")
        }
    }
    delay(16000L) // 延遲16秒
    println("準備取消子協程,目前時間:${format.format(System.currentTimeMillis())}")
    job.cancelAndJoin()
    println("取消子協程成功,目前時間:${format.format(System.currentTimeMillis())}")
}
           

代碼功能為:子協程每5秒列印一下目前時間,主協程在16秒之後取消子協程,執行結果如下:

目前時間:17:40:14.485
目前時間:17:40:19.485
目前時間:17:40:24.485
目前時間:17:40:29.500
           

這裡隻複制了4行的輸出,實際結果是這個子協程沒有被取消,官方教程說需要使用isActive變量來判斷協程是否取消了,如下:

while (isActive) {
    。。。
}
           

經實驗,還是取消不了的,再仔細對比官方Demo,發現他在開子協程是切換到了其它的線程上了,如下:

val job = launch(Dispatchers.Default) {
	。。。
}
           

再次運作,結果如下:

目前時間:18:01:50.503
目前時間:18:01:55.513
準備取消子協程,目前時間:18:02:00.503
目前時間:18:02:00.523
取消子協程成功,目前時間:18:02:00.523
           

OK,成功取消了,為什麼不切線程就取消不了呢?不得而知,很多知識都還不會,也沒時間去找這個原因了。這裡想說的是,這裡的cancel()操作不就相當于之前取消線程的interrupt()函數嗎?isActive不就相當于isInterrupted()函數嗎,是以,有了取消線程的那些了解,再來了解協程就比較容易。

繼續閱讀