文章目錄
-
-
- 前言
- 問題在哪
- 協程是什麼
- Kt協程和線程
- 協程與線程
- 結論
-
前言
相信很多人都聽過或者看到過這樣一種說法“協程是一種輕量級的線程”。以下文檔中都有類似的描述:
- Kotlin中文網-協程-基礎-第一個協程程式
本質上,協程是輕量級的線程。
- Kotlin英文官網對應位置
Essentially, coroutines are light-weight threads.
顯然,翻譯很準确
問題在哪
相信很多人了解過Kt協程都看過上線官方的這個描述。
因而很多人覺得協程比線程牛逼,因為是”輕量級“。是以有人覺得協程是另一種方式實作的類似線程的功能,是以産生了以下對話:
A:Kotlin協程和線程有啥差別
B:協程是對線程操作的封裝,通過狀态機和回調實作以同步樣式代碼書寫異步邏輯,消除回調低于,更便于了解,巴拉巴拉。。。
A:記憶體消耗上有啥差別?
B:記憶體消耗上如果線程和協程比的話協程更占優勢,因為協程底層實作是類似線程池,有線程複用,是以更省記憶體,如果是線程池和協程比較,實際上并無太大差別。
A:這和我的了解不太一樣呢,我的了解是協程是通過編譯器實作的,是以比線程更節省記憶體,比如我開100w個線程和100w個協程,線程會oom,協程不會
協程是什麼
廣義上的協程是一個**概念(協程是一種非搶占式或者說協作式的計算機程式并發排程)**而不是一個具體的架構。
Kt中的協程是Kt對協程概念的一種具體實作。
Kt協程和線程
Kt協程被官方描述為輕量級線程。
那它跟線程到底是什麼關系呢,是包含封裝關系還是功能類似的兩種不同的實作方式呢?
康康源碼?show me the fucking code!!
協程的切換線程必然要使用到Dispatchers中的幾個變量
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
分别看看具體實作
- Main
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
- IO和Default
IO和Default最後都會走到CoroutineScheduler的dispatch方法,上述代碼為精簡後的代碼。/** * Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutines * over worker threads, including both CPU-intensive and blocking tasks, is the most efficient manner. .............. **/ @Suppress("NOTHING_TO_INLINE") internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { val task = createTask(block, taskContext) if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { signalBlockingWork(skipUnpark = skipUnpark) } } private fun signalBlockingWork(skipUnpark: Boolean) { if (tryCreateWorker(stateSnapshot)) return } internal fun signalCpuWork() { if (tryCreateWorker()) return } private fun tryCreateWorker(state: Long = controlState.value): Boolean { val created = createdWorkers(state) val blocking = blockingTasks(state) val cpuWorkers = (created - blocking).coerceAtLeast(0) /* * We check how many threads are there to handle non-blocking work, * and create one more if we have not enough of them. */ if (cpuWorkers < corePoolSize) { val newCpuWorkers = createNewWorker() // If we've created the first cpu worker and corePoolSize > 1 then create // one more (second) cpu worker, so that stealing between them is operational if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker() if (newCpuWorkers > 0) return true } return false } /* * Returns the number of CPU workers after this function (including new worker) or * 0 if no worker was created. */ private fun createNewWorker(): Int { synchronized(workers) { // Make sure we're not trying to resurrect terminated scheduler if (isTerminated) return -1 val state = controlState.value val created = createdWorkers(state) val blocking = blockingTasks(state) val cpuWorkers = (created - blocking).coerceAtLeast(0) // Double check for overprovision if (cpuWorkers >= corePoolSize) return 0 if (created >= maxPoolSize) return 0 // start & register new worker, commit index only after successful creation val newIndex = createdWorkers + 1 require(newIndex > 0 && workers[newIndex] == null) /* * 1) Claim the slot (under a lock) by the newly created worker * 2) Make it observable by increment created workers count * 3) Only then start the worker, otherwise it may miss its own creation */ val worker = Worker(newIndex) workers[newIndex] = worker require(newIndex == incrementCreatedWorkers()) worker.start() return cpuWorkers + 1 } } }
- 第一行注釋就說明了CoroutineScheduler是一個共享線程池。
- 熟悉線程池的構造方法和邏輯都能在CoroutineScheduler中看到線程池的影子。
- 看看調用邏輯:dispatch --> signalBlockingWork/signalCpuWork --> tryCreateWorker --> createNewWorker–> Worker.start()
internal inner class Worker private constructor() : Thread()
-
扔物線
Kotlin 的協程用力瞥一眼
從 Android 開發者的角度去了解它們的關系:
【碼上開學】到底什麼是「非阻塞式」挂起?協程真的更輕量級嗎?- 我們所有的代碼都是跑線上程中的,而線程是跑在程序中的。
- 協程沒有直接和作業系統關聯,但它不是空中樓閣,它也是跑線上程中的,可以是單線程,也可以是多線程。
- 單線程中的協程總的執行時間并不會比不用協程少。
- Android 系統上,如果在主線程進行網絡請求,會抛出
,對于在主線程上的協程也不例外,這種場景使用協程還是要切線程的。NetworkOnMainThreadException
視訊中講了一個網絡 IO 的例子,IO 阻塞更多是反映在「等」這件事情上,它的性能瓶頸是和網絡的資料交換,你切多少個線程都沒用,該花的時間一點都少不了。
而這跟協程半毛錢關系沒有,切線程解決不了的事情,協程也解決不了。
協程與線程
協程我們講了 3 期,Kotlin 協程和線程是無法脫離開講的。
别的語言我不說,在 Kotlin 裡,協程就是基于線程來實作的一種更上層的工具 API,類似于 Java 自帶的 Executor 系列 API 或者 Android 的 Handler 系列 API。
隻不過呢,協程它不僅提供了友善的 API,在設計思想上是一個基于線程的上層架構,你可以了解為新造了一些概念用來幫助你更好地使用這些 API,僅此而已。
-
官方證明輕量級的方式
運作以下代碼:
import kotlinx.coroutines.* fun main() = runBlocking { repeat(100_000) { // 啟動大量的協程 launch { delay(5000L) print(".") } } }
可以在這裡擷取完整代碼。
它啟動了 10 萬個協程,并且在 5 秒鐘後,每個協程都輸出一個點。
現在,嘗試使用線程來實作。會發生什麼?(很可能你的代碼會産生某種記憶體不足的錯誤)
**用協程和線程比這不是欺負人嗎?咋不用協程和線程池比呢 **
結論
綜上所述,表達一下我自己的結論:Kotlin協程底層是以線程為基礎實作的,複用方面類似線程池。所謂的輕量級更多是指使用上面而不是指性能上面,性能上面約等于線程池。