天天看點

關于Kotlin中“協程一種輕量級線程”的解釋

文章目錄

      • 前言
      • 問題在哪
      • 協程是什麼
      • Kt協程和線程
      • 協程與線程
      • 結論

前言

相信很多人都聽過或者看到過這樣一種說法“協程是一種輕量級的線程”。以下文檔中都有類似的描述:

  1. Kotlin中文網-協程-基礎-第一個協程程式
    本質上,協程是輕量級的線程。
  2. Kotlin英文官網對應位置
    Essentially, coroutines are light-weight threads.

顯然,翻譯很準确

問題在哪

相信很多人了解過Kt協程都看過上線官方的這個描述。

因而很多人覺得協程比線程牛逼,因為是”輕量級“。是以有人覺得協程是另一種方式實作的類似線程的功能,是以産生了以下對話:

A:Kotlin協程和線程有啥差別

B:協程是對線程操作的封裝,通過狀态機和回調實作以同步樣式代碼書寫異步邏輯,消除回調低于,更便于了解,巴拉巴拉。。。

A:記憶體消耗上有啥差別?

B:記憶體消耗上如果線程和協程比的話協程更占優勢,因為協程底層實作是類似線程池,有線程複用,是以更省記憶體,如果是線程池和協程比較,實際上并無太大差別。

A:這和我的了解不太一樣呢,我的了解是協程是通過編譯器實作的,是以比線程更節省記憶體,比如我開100w個線程和100w個協程,線程會oom,協程不會

協程是什麼

廣義上的協程是一個**概念(協程是一種非搶占式或者說協作式的計算機程式并發排程)**而不是一個具體的架構。

Kt中的協程是Kt對協程概念的一種具體實作。

Kt協程和線程

Kt協程被官方描述為輕量級線程。

那它跟線程到底是什麼關系呢,是包含封裝關系還是功能類似的兩種不同的實作方式呢?

康康源碼?show me the fucking code!!

協程的切換線程必然要使用到Dispatchers中的幾個變量

public actual object Dispatchers {
   
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

   
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

   
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

   
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
           

分别看看具體實作

  1. Main
internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}
           
  1. IO和Default
    /**
     * Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutines
     * over worker threads, including both CPU-intensive and blocking tasks, is the most efficient manner.
     ..............
     **/
     
    @Suppress("NOTHING_TO_INLINE")
    internal class CoroutineScheduler(
        @JvmField val corePoolSize: Int,
        @JvmField val maxPoolSize: Int,
        @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
        @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : Executor, Closeable {
        fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
            val task = createTask(block, taskContext)
         
            if (task.mode == TASK_NON_BLOCKING) {
                if (skipUnpark) return
                signalCpuWork()
            } else {
                signalBlockingWork(skipUnpark = skipUnpark)
            }
        }
    
    
        private fun signalBlockingWork(skipUnpark: Boolean) {
            if (tryCreateWorker(stateSnapshot)) return
        }
    
        internal fun signalCpuWork() {
            if (tryCreateWorker()) return
        }
        private fun tryCreateWorker(state: Long = controlState.value): Boolean {
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            /*
             * We check how many threads are there to handle non-blocking work,
             * and create one more if we have not enough of them.
             */
            if (cpuWorkers < corePoolSize) {
                val newCpuWorkers = createNewWorker()
                // If we've created the first cpu worker and corePoolSize > 1 then create
                // one more (second) cpu worker, so that stealing between them is operational
                if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
                if (newCpuWorkers > 0) return true
            }
            return false
        }
        /*
         * Returns the number of CPU workers after this function (including new worker) or
         * 0 if no worker was created.
         */
        private fun createNewWorker(): Int {
            synchronized(workers) {
                // Make sure we're not trying to resurrect terminated scheduler
                if (isTerminated) return -1
                val state = controlState.value
                val created = createdWorkers(state)
                val blocking = blockingTasks(state)
                val cpuWorkers = (created - blocking).coerceAtLeast(0)
                // Double check for overprovision
                if (cpuWorkers >= corePoolSize) return 0
                if (created >= maxPoolSize) return 0
                // start & register new worker, commit index only after successful creation
                val newIndex = createdWorkers + 1
                require(newIndex > 0 && workers[newIndex] == null)
                /*
                 * 1) Claim the slot (under a lock) by the newly created worker
                 * 2) Make it observable by increment created workers count
                 * 3) Only then start the worker, otherwise it may miss its own creation
                 */
                val worker = Worker(newIndex)
                workers[newIndex] = worker
                require(newIndex == incrementCreatedWorkers())
                worker.start()
                return cpuWorkers + 1
            }
        }
    
    }
               
    IO和Default最後都會走到CoroutineScheduler的dispatch方法,上述代碼為精簡後的代碼。
  2. 第一行注釋就說明了CoroutineScheduler是一個共享線程池。
  3. 熟悉線程池的構造方法和邏輯都能在CoroutineScheduler中看到線程池的影子。
  4. 看看調用邏輯:dispatch --> signalBlockingWork/signalCpuWork --> tryCreateWorker --> createNewWorker–> Worker.start()
    internal inner class Worker private constructor() : Thread() 
               
  5. 扔物線

    Kotlin 的協程用力瞥一眼

    從 Android 開發者的角度去了解它們的關系:
    • 我們所有的代碼都是跑線上程中的,而線程是跑在程序中的。
    • 協程沒有直接和作業系統關聯,但它不是空中樓閣,它也是跑線上程中的,可以是單線程,也可以是多線程。
    • 單線程中的協程總的執行時間并不會比不用協程少。
    • Android 系統上,如果在主線程進行網絡請求,會抛出

      NetworkOnMainThreadException

      ,對于在主線程上的協程也不例外,這種場景使用協程還是要切線程的。
    【碼上開學】到底什麼是「非阻塞式」挂起?協程真的更輕量級嗎?

    視訊中講了一個網絡 IO 的例子,IO 阻塞更多是反映在「等」這件事情上,它的性能瓶頸是和網絡的資料交換,你切多少個線程都沒用,該花的時間一點都少不了。

    而這跟協程半毛錢關系沒有,切線程解決不了的事情,協程也解決不了。

    協程與線程

    協程我們講了 3 期,Kotlin 協程和線程是無法脫離開講的。

    别的語言我不說,在 Kotlin 裡,協程就是基于線程來實作的一種更上層的工具 API,類似于 Java 自帶的 Executor 系列 API 或者 Android 的 Handler 系列 API。

    隻不過呢,協程它不僅提供了友善的 API,在設計思想上是一個基于線程的上層架構,你可以了解為新造了一些概念用來幫助你更好地使用這些 API,僅此而已。

  6. 官方證明輕量級的方式

    運作以下代碼:

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        repeat(100_000) { // 啟動大量的協程
            launch {
                delay(5000L)
                print(".")
            }
        }
    }
               
    可以在這裡擷取完整代碼。

    它啟動了 10 萬個協程,并且在 5 秒鐘後,每個協程都輸出一個點。

    現在,嘗試使用線程來實作。會發生什麼?(很可能你的代碼會産生某種記憶體不足的錯誤)

    **用協程和線程比這不是欺負人嗎?咋不用協程和線程池比呢 **

結論

綜上所述,表達一下我自己的結論:Kotlin協程底層是以線程為基礎實作的,複用方面類似線程池。所謂的輕量級更多是指使用上面而不是指性能上面,性能上面約等于線程池。