今天分享的内容是 Java 的一個重量級功能:虛拟線程。
背景
2022-09-20,JDK 19 釋出了GA版本,備受矚目的協程功能也算塵埃落地,不過,此次 GA版本并不是以協程來命名,而是使用了 Virtual
Thread(虛拟線程),并且是 preview預覽版本。小編最早關注到協程功能是在 2020年,那時孵化項目叫做 Java project Loom,
使用的是 Fiber(直譯為:纖維,意譯為:輕量級線程,即協程),但是 GA版本為何最終被定義為 Virtual Thread(虛拟線程),原因不得而知。
GA: General Availability,正式釋出的版本,在國外都是用 GA來指代 release版本;
JEP: JDK Enhancement Proposal, JDK增強建議,JEP是一個JDK核心技術相關的增強建議文檔;
為什麼需要虛拟線程
既然 Java官方推出一個和線程這麼相近的概念,必定是要解決線程的某些問題,是以,我們先回顧下線程的一些特點:
- Java中的線程是對作業系統線程的一個簡單包裝,線程的建立,排程和銷毀等都是由作業系統完成;
- 線程切換需要消耗CPU時間,這部分時間是與業務無關的;
- 線程的性能直接受作業系統處理能力的影響;
是以,線程是一種重量級的資源,作為一名 Java程式員應該深有體會。是以,為了更好的管理線程,Java采用了池化(線程池)的方式進行管理線程,避免線程頻繁建立和銷毀帶來的開銷。但是,盡管線程池避免線程大部分建立和銷毀的開銷,但是線程的排程還是直接受作業系統的影響,那麼有沒有更好的方式來打破這種限制,是以,虛拟線程就孕育而生。
在 JDK 19源碼中,官方直接在 java.lang包下新增一個 VirtualThread類來表示虛拟線程,為了更好的區分虛拟線程和原有的 Thread線程,官方給 Thread類賦予了一個高大上的名字:平台線程。
下面給出了 JDK 19中虛拟線程的 Diagram截圖以及平台線程和系統線程的關系圖:
想了解更多關系線程的知識,可以參考往期的文章:深度剖析:Java線程運作機制,程式員必看的知識點!
如何建立虛拟線程
1.通過 Thread.startVirtualThread()建立
如下示例代碼,通過 Thread.startVirtualThread()可以建立一個新的并且已啟動的虛拟線程,該方法等價于 Thread.ofVirtual().start(task):
public class VirtualThreadTest {
public static void main(String[] args) {
CustomThread customThread = new CustomThread();
// 建立并且啟動虛拟線程
Thread.startVirtualThread(customThread);
}
}
class CustomThread implements Runnable {
@Override
public void run() {
System.out.println("CustomThread run");
}
}
2.通過 Thread.ofVirtual()建立
如下示例代碼,通過 Thread.ofVirtual().unstarted()方式可以建立一個新的未啟動的虛拟線程,然後通過 Thread.start()來啟動線程,也可以通過 Thread.ofVirtual().start()直接建立一個新的并已啟動的虛拟線程:
public class VirtualThreadTest {
public static void main(String[] args) {
CustomThread customThread = new CustomThread();
// 建立并且不啟動虛拟線程,然後 unStarted.start()方法啟動虛拟線程
Thread unStarted = Thread.ofVirtual().unstarted(customThread);
unStarted.start();
// 等同于
Thread.ofVirtual().start(customThread);
}
}
class CustomThread implements Runnable {
@Override
public void run() {
System.out.println("CustomThread run");
}
}
3.通過 ThreadFactory建立
如下示例代碼,通過 ThreadFactory.newThread()方式就能建立一個虛拟線程,然後通過 Thread.start()來啟動線程:
public class VirtualThreadTest {
public static void main(String[] args) {
CustomThread customThread = new CustomThread();
// 擷取線程工廠類
ThreadFactory factory = Thread.ofVirtual().factory();
// 建立虛拟線程
Thread thread = factory.newThread(customThread);
// 啟動線程
thread.start();
}
}
class CustomThread implements Runnable {
@Override
public void run() {
System.out.println("CustomThread run");
}
}
4.通過 Executors.newVirtualThreadPerTaskExecutor()建立
如下示例代碼,通過 JDK自帶的Executors工具類方式建立一個虛拟線程,然後通過 executor.submit()來啟動線程:
public class VirtualThreadTest {
public static void main(String[] args) {
CustomThread customThread = new CustomThread();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
executor.submit(customThread);
}
}
class CustomThread implements Runnable {
@Override
public void run() {
System.out.println("CustomThread run");
}
}
通過上述列舉的 4種建立虛拟線程的方式可以看出,官方為了降低虛拟線程的門檻,盡力複用原有的Thread線程類,這樣可以平滑的過渡到虛拟線程的使用。不過,在 Java 19中,虛拟線程還是一個預覽功能,預設關閉,需要使用參數 --enable-preview 來啟用該功能,預覽功能源碼和啟動虛拟線程指令如下:
// Thread 源碼,通過 @PreviewFeature 注解來标注 虛拟線程為 預覽功能
public class Thread implements Runnable {
/**
* Creates a virtual thread to execute a task and schedules it to execute.
This method is equivalent to: Thread.ofVirtual().start(task);
Params: task – the object to run when the thread executes
Returns: a new, and started, virtual thread
Throws: UnsupportedOperationException – if preview features are not enabled
Since: 19
See Also: Inheritance when creating threads
* @param task
* @return
*/
@PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
public static Thread startVirtualThread(Runnable task) {
Objects.requireNonNull(task);
// 判斷是否開啟虛拟線程功能
PreviewFeatures.ensureEnabled();
var thread = ThreadBuilders.newVirtualThread(null, null, 0, task);
thread.start();
return thread;
}
// 異常資訊提醒 可以通過 --enable-preview 開啟虛拟線程功能
public static void ensureEnabled() {
if (!isEnabled()) {
throw new UnsupportedOperationException(
"Preview Features not enabled, need to run with --enable-preview");
}
}
}
# 開啟虛拟線程功能
java --source 19 --enable-preview XXX.java
IDEA 中配置 --enable-preview 如下圖:
為了更好的感受虛拟線程的性能,我們模拟一個對比測試用例:分别使用虛拟線程和線程池執行10w個任務,每個線程任務睡眠10ms,統計各自的總耗時和建立的最大平台線程總數,示例代碼如下:
// 虛拟線程
public class VirtualThreadTest {
static List<Integer> list = new ArrayList<>();
public static void main(String[] args) {
// 開啟一個線程來監控目前的平台線程(系統線程)總數
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
saveMaxThreadNum(threadInfo.length);
}, 10, 10, TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
// 線程睡眠 10ms,可以等同于模拟業務耗時10ms
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
}
});
}
executor.close();
System.out.println("max:" + list.get(0) + " platform thread/os thread");
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);
}
}
public class ThreadTest {
static List<Integer> list = new ArrayList<>();
public static void main(String[] args) {
// 開啟一個線程來監控目前的平台線程(系統線程)總數
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
saveMaxThreadNum(threadInfo.length);
}, 1, 1, TimeUnit.SECONDS);
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(200);
for (int i = 0; i < 100000; i++) {
executor.submit(() -> {
try {
// 線程睡眠 10ms,可以等同于模拟業務耗時10ms
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
}
});
}
executor.close();
System.out.println("max:" + list.get(0) + " platform thread/os thread");
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);
}
}
// 儲存平台線程的建立的最大總數
public static List<Integer> saveMaxThreadNum(int num) {
if (list.isEmpty()) {
list.add(num);
} else {
Integer integer = list.get(0);
if (num > integer) {
list.add(0, num);
}
}
return list;
}
兩個示例的運作結果:
通過運作結果可以發現:
- 使用虛拟線程執行 10w個任務總耗時為:129ms,最大建立了 18個平台線程;
- 使用線程池執行 10w個任務總耗時為:6103 ms,最大建立了 207個平台線程;
- 兩者總耗時差50倍,最大建立的平台線程總數差 10倍,是以性能差可想而知;
核心源碼解析
首先從 VirtualThread類開始,源碼如下:
/**
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
/**
* Creates a new {@code VirtualThread} to run the given task with the given
* scheduler. If the given scheduler is {@code null} and the current thread
* is a platform thread then the newly created virtual thread will use the
* default scheduler. If given scheduler is {@code null} and the current
* thread is a virtual thread then the current thread's scheduler is used.
*
* @param scheduler the scheduler or null
* @param name thread name
* @param characteristics characteristics
* @param task the task to execute
*/
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER;
}
}
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
}
/**
* 建立預設的排程器
* Creates the default scheduler.
*/
@SuppressWarnings("removal")
private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
parallelism = Integer.parseInt(parallelismValue);
} else {
parallelism = Runtime.getRuntime().availableProcessors();
}
if (maxPoolSizeValue != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeValue);
parallelism = Integer.min(parallelism, maxPoolSize);
} else {
maxPoolSize = Integer.max(parallelism, 256);
}
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}
}
通過 VirtualThread類的源碼可以總結出:
- VirtualThread繼承 BaseVirtualThread類,BaseVirtualThread類繼承 Thread類;
- 虛拟線程是 JVM進行排程的,而不是作業系統;
- VirtualThread類是一個終态類,是以該類無法被繼承,無法被擴充;
VirtualThread類,隻提供了一個構造器,接收 4個參數:
- Executor scheduler:如果給定的排程器為空并且目前線程是平台線程,那麼新建立的虛拟線程将使用預設排程程式(底層采用 ForkJoinPool),如果給定的排程器為空并且目前線程是虛拟線程,則使用目前線程的排程程式
- String name:自定義線程名
- int characteristics:線程特征值
- Runnable task:需要執行的任務
然後我們看下 JDK中建立虛拟線程的源碼:
public class Thread implements Runnable {
/**
* Creates a virtual thread to execute a task and schedules it to execute.
This method is equivalent to: Thread.ofVirtual().start(task);
Params: task – the object to run when the thread executes
Returns: a new, and started, virtual thread
Throws: UnsupportedOperationException – if preview features are not enabled
Since: 19
See Also: Inheritance when creating threads
* @param task
* @return
*/
@PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
public static Thread startVirtualThread(Runnable task) {
Objects.requireNonNull(task);
// 判斷是否開啟虛拟線程功能
PreviewFeatures.ensureEnabled();
var thread = ThreadBuilders.newVirtualThread(null, null, 0, task);
thread.start();
return thread;
}
// 異常資訊提醒 可以通過 --enable-preview 開啟虛拟線程功能
public static void ensureEnabled() {
if (!isEnabled()) {
throw new UnsupportedOperationException(
"Preview Features not enabled, need to run with --enable-preview");
}
}
}
class ThreadBuilders {
static Thread newVirtualThread(Executor scheduler,
String name,
int characteristics,
Runnable task) {
if (ContinuationSupport.isSupported()) {
return new VirtualThread(scheduler, name, characteristics, task);
} else {
if (scheduler != null)
throw new UnsupportedOperationException();
return new BoundVirtualThread(name, characteristics, task);
}
}
/**
* Returns a builder for creating a virtual {@code Thread} or {@code ThreadFactory}
* that creates virtual threads.
*
* @apiNote The following are examples using the builder:
* {@snippet :
* // Start a virtual thread to run a task.
* Thread thread = Thread.ofVirtual().start(runnable);
*
* // A ThreadFactory that creates virtual threads
* ThreadFactory factory = Thread.ofVirtual().factory();
* }
*
* @return A builder for creating {@code Thread} or {@code ThreadFactory} objects.
* @throws UnsupportedOperationException if preview features are not enabled
* @since 19
*/
@PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
public static Builder.OfVirtual ofVirtual() {
PreviewFeatures.ensureEnabled();
return new ThreadBuilders.VirtualThreadBuilder();
}
}
Thread.startVirtualThread()建立虛拟線程,會調用ThreadBuilders.newVirtualThread(),最終調用 new VirtualThread()構造器來建立虛拟線程。
從上文我們在介紹虛拟線程建立的 4種方式也可以看出,虛拟線程建立的入口在 Thread 或者 Executors 類中,和以前使用線程或者線程池的習慣保持一緻。
final class VirtualThread extends BaseVirtualThread {
/**
* Mounts this virtual thread onto the current platform thread. On
* return, the current thread is the virtual thread.
*/
@ChangesCurrentThread
private void mount() {
// sets the carrier thread
Thread carrier = Thread.currentCarrierThread();
setCarrierThread(carrier);
// sync up carrier thread interrupt status if needed
if (interrupted) {
carrier.setInterrupt();
} else if (carrier.isInterrupted()) {
synchronized (interruptLock) {
// need to recheck interrupt status
if (!interrupted) {
carrier.clearInterrupt();
}
}
}
// set Thread.currentThread() to return this virtual thread
carrier.setCurrentThread(this);
}
/**
* Unmounts this virtual thread from the carrier. On return, the
* current thread is the current platform thread.
*/
@ChangesCurrentThread
private void unmount() {
// set Thread.currentThread() to return the platform thread
Thread carrier = this.carrierThread;
carrier.setCurrentThread(carrier);
// break connection to carrier thread, synchronized with interrupt
synchronized (interruptLock) {
setCarrierThread(null);
}
carrier.clearInterrupt();
}
}
mount() 和 unmount() 是虛拟線程兩個核心方法:
- mount(),可以将此虛拟線程挂載到目前平台線程上,傳回時,目前線程是虛拟線程;
- unmount(),從載體線程解除安裝此虛拟線程,傳回時,目前線程是平台線程
通過這兩個方式可以看出虛拟線程是搭載在平台線程上運作,運作結束後,從平台線程上解除安裝。
虛拟線程的狀态和轉換
下表總結了虛拟線程中的所有線程狀态以及狀态之間轉化的條件:
狀态 | 轉換條件 |
NEW -> STARTED | Thread.start |
STARTED -> TERMINATED | failed to start |
STARTED -> RUNNING | first run |
RUNNING -> PARKING | Thread attempts to park |
PARKING -> PARKED | cont.yield successful, thread is parked |
PARKING -> PINNED | cont.yield failed, thread is pinned |
PARKED -> RUNNABLE | unpark or interrupted |
PINNED -> RUNNABLE | unpark or interrupted |
RUNNABLE -> RUNNING | continue execution |
RUNNING -> YIELDING | Thread.yield |
YIELDING -> RUNNABLE | yield successful |
YIELDING -> RUNNING | yield failed |
RUNNING -> TERMINATED | done |
3種線程的關系
VirtualThread,Platform Thread,OS Thread 三者的關系如下圖:
說明:
在現有的線程模型下,一個 Java線程相當于一個作業系統線程,多個虛拟線程需要挂載在一個平台線程(載體線程)上,每個平台線程和系統線程一一對應。是以,VirtualThread是屬于 JVM級别的線程,由JVM排程,它是非常輕量級的資源,使用完後立即被銷毀,是以就不需要像平台線程一樣使用池化(線程池)。
虛拟線程在執行到 IO 操作或 Blocking操作時,會自動切換到其他虛拟線程執行,進而避免目前線程等待,可以高效通過少數線程去排程大量虛拟線程,最大化提升線程的執行效率。
總結
-
Virtual Thread将會在性能上帶來的巨大提高,不過,目前業界80~90%的代碼還跑在 Java 8上,等 JDK
19投入實際生産環境,可能需要一個漫長的過程;
- 虛拟線程高度複用了現有的 Thread線程的功能,友善現有方式平滑遷移到虛拟線程;
- 虛拟線程是将 Thread作為載體線程,它并沒有改變原來的線程模型;
- 虛拟線程是 JVM排程的,而不是作業系統排程;
- 使用虛拟線程可以顯著提高程式吞吐量;
- 虛拟線程适合 并發任務數量很高 或者 IO密集型的場景,對于 計算密集型任務還需通過過增加CPU核心解決,或者利用分布式計算資源來來解決;
- 虛拟線程目前隻是一個預覽功能,隻能從源碼和簡單的測試來分析,并無真實生産環境的驗證;
曾一段時間内,JDK一直緻力于 Reactor響應式程式設計,試圖從這條路子來提升 Java的性能,但是最終發現:響應式程式設計難了解,難調試,難使用, 是以又把焦點轉向了同步程式設計,為了改善性能,虛拟線程誕生了。或許虛拟線程很難在短時間内運用到實際生産中,但是通過官方的JDK版本釋出,我們可以看到:盡管是 Oracle這樣的科技型巨頭也會走彎路,了解 JDK的動态,可以幫助我們更好的把握學習 Java的重心以及後面的發展趨勢。
參考
Virtual Thread JEP
java-virtual-threads