
At Long Last: Java Finally Ships "Coroutines" as Virtual Threads!

Author: A Coder About to Retire

Today's topic is one of Java's heavyweight features: virtual threads.

Background

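On 2022-09-20, JDK 19 reached GA, and the much-anticipated coroutine feature finally landed. The GA release does not call it coroutines, though: it is named Virtual Thread, and it ships as a preview feature (JEP 425). I first followed the coroutine work in 2020, when the incubating project was Project Loom and the abstraction was called Fiber (literally "fiber", meaning a lightweight thread, i.e. a coroutine). Why the GA release ultimately settled on the name Virtual Thread, I don't know.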

GA: General Availability, the official release; outside China, "GA" is the usual term for a release version;

JEP: JDK Enhancement Proposal, a document proposing an enhancement to the JDK's core technology;

Why virtual threads are needed

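Since Java has introduced a concept so close to threads, it must be meant to solve some problems with threads. So let's first review a few characteristics of threads:

  • A Java thread is a thin wrapper around an operating system thread; creating, scheduling, and destroying it are all done by the OS;
  • Switching between threads costs CPU time that does nothing for the business logic;
  • Thread performance is directly bounded by how well the OS handles threads;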

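Threads are therefore a heavyweight resource, as any Java programmer knows from experience. To manage them better, Java uses pooling (thread pools) to avoid the overhead of frequently creating and destroying threads. But although a thread pool removes most of the creation and destruction cost, thread scheduling is still governed directly by the OS. Is there a better way to break through this limitation? That question is where virtual threads came from.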

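In the JDK 19 source, a new VirtualThread class was added to the java.lang package to represent virtual threads. To distinguish them clearly from the existing Thread, the JDK gave classic threads a grander name: platform threads.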
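A minimal sketch of telling the two kinds apart at runtime with Thread.isVirtual(). It assumes JDK 21 or later, where virtual threads are a final feature; on JDK 19/20 compile and run it with --enable-preview. ThreadKindDemo and its join helper are hypothetical names for illustration.

```java
// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class ThreadKindDemo {

    // Returns {isVirtual() seen inside a platform thread, isVirtual() seen inside a virtual thread}.
    static boolean[] kinds() {
        boolean[] result = new boolean[2];
        // Classic constructor: a platform thread, backed 1:1 by an OS thread
        Thread platform = new Thread(() -> result[0] = Thread.currentThread().isVirtual());
        // Builder API: a virtual thread, scheduled by the JVM
        Thread virtual = Thread.ofVirtual().unstarted(() -> result[1] = Thread.currentThread().isVirtual());
        platform.start();
        virtual.start();
        join(platform);
        join(virtual); // join() also makes the writes to result visible to this thread
        return result;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] k = kinds();
        System.out.println("platform: " + k[0] + ", virtual: " + k[1]); // platform: false, virtual: true
    }
}
```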

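Below are the class diagram of VirtualThread in JDK 19 and a diagram of how platform threads relate to OS threads:

[Figure: JDK 19 VirtualThread class diagram]
[Figure: platform threads vs. OS threads]
For more background on threads, see my earlier article "In-Depth: How Java Threads Work, Essential Knowledge for Programmers!".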

How to create virtual threads

1. Via Thread.startVirtualThread()

In the example below, Thread.startVirtualThread() creates a new virtual thread that is already started. The method is equivalent to Thread.ofVirtual().start(task):

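public class VirtualThreadTest {

    public static void main(String[] args) throws InterruptedException {
        CustomThread customThread = new CustomThread();
        // Create and start a virtual thread
        Thread thread = Thread.startVirtualThread(customThread);
        // Virtual threads are daemon threads: wait for it, or main may exit before it runs
        thread.join();
    }
}

class CustomThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CustomThread run");
    }
}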
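Because a virtual thread is cheap, starting one per task is practical in a way it usually is not for platform threads. The sketch below (a hypothetical ManyVirtualThreadsDemo class, assuming JDK 21+ or JDK 19/20 with --enable-preview) starts 10,000 of them with Thread.startVirtualThread() and waits for all of them with join().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class ManyVirtualThreadsDemo {

    // Starts n virtual threads, one per task, and waits until all have finished.
    static int runTasks(int n) {
        AtomicInteger completed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // Each call creates and starts a brand-new virtual thread
            threads.add(Thread.startVirtualThread(completed::incrementAndGet));
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(10_000)); // completed: 10000
    }
}
```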
           

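2. Via Thread.ofVirtual()

In the example below, Thread.ofVirtual().unstarted() creates a new virtual thread that has not been started yet, which you then start with Thread.start(). Alternatively, Thread.ofVirtual().start() creates and starts a new virtual thread in one step: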

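public class VirtualThreadTest {

    public static void main(String[] args) throws InterruptedException {
        CustomThread customThread = new CustomThread();
        // Create an unstarted virtual thread, then start it with unStarted.start()
        Thread unStarted = Thread.ofVirtual().unstarted(customThread);
        unStarted.start();

        // Equivalent shortcut: create and start in one call
        Thread started = Thread.ofVirtual().start(customThread);

        // Wait for both virtual threads (they are daemon threads)
        unStarted.join();
        started.join();
    }
}

class CustomThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CustomThread run");
    }
}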
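Thread.ofVirtual() returns a Thread.Builder.OfVirtual that can be configured once and reused. A minimal sketch, assuming JDK 21+ (or --enable-preview on JDK 19/20), uses the builder's name(prefix, start) option so that every thread it creates gets a numbered name; VirtualBuilderDemo is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class VirtualBuilderDemo {

    // Creates `count` unstarted virtual threads from one builder and returns their names.
    static List<String> threadNames(int count) {
        // name(prefix, start): each new thread is named prefix + counter, then the counter increments
        Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread t = builder.unstarted(() -> { }); // stays in NEW state until start() is called
            names.add(t.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNames(3)); // [worker-0, worker-1, worker-2]
    }
}
```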
           

3. Via a ThreadFactory

In the example below, ThreadFactory.newThread() creates a virtual thread, which is then started with Thread.start():

import java.util.concurrent.ThreadFactory;

public class VirtualThreadTest {

    public static void main(String[] args) throws InterruptedException {
        CustomThread customThread = new CustomThread();
        // Get a factory that produces virtual threads
        ThreadFactory factory = Thread.ofVirtual().factory();
        // Create an (unstarted) virtual thread
        Thread thread = factory.newThread(customThread);
        // Start it and wait for it to finish
        thread.start();
        thread.join();
    }
}

class CustomThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CustomThread run");
    }
}
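A ThreadFactory is mostly useful because many existing APIs accept one. The sketch below (hypothetical VirtualFactoryDemo, assuming JDK 21+ or --enable-preview on JDK 19/20) checks that newThread() hands back an unstarted thread, then plugs the same kind of factory into Executors.newThreadPerTaskExecutor(ThreadFactory).

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class VirtualFactoryDemo {

    // newThread() returns an unstarted thread: NEW before start(), TERMINATED after join().
    static Thread.State[] lifecycle() {
        ThreadFactory factory = Thread.ofVirtual().factory();
        Thread t = factory.newThread(() -> { });
        Thread.State before = t.getState();
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Thread.State[] { before, t.getState() };
    }

    // A virtual-thread factory plugged into an API that accepts any ThreadFactory.
    static boolean ranOnVirtualThread() {
        ThreadFactory factory = Thread.ofVirtual().factory();
        AtomicBoolean virtual = new AtomicBoolean();
        try (ExecutorService executor = Executors.newThreadPerTaskExecutor(factory)) {
            executor.submit(() -> virtual.set(Thread.currentThread().isVirtual()));
        } // close() waits for the submitted task to finish
        return virtual.get();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(lifecycle())); // [NEW, TERMINATED]
        System.out.println(ranOnVirtualThread());         // true
    }
}
```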
           

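4. Via Executors.newVirtualThreadPerTaskExecutor()

In the example below, the JDK's Executors utility class creates an executor that starts a new virtual thread for each task; executor.submit() hands a task to the executor, which runs it on a fresh virtual thread: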

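import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadTest {

    public static void main(String[] args) {
        CustomThread customThread = new CustomThread();
        // Each submitted task runs on its own new virtual thread;
        // try-with-resources calls close(), which waits for the task to finish
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(customThread);
        }
    }
}

class CustomThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CustomThread run");
    }
}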
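The executor creates a new virtual thread for every submitted task instead of reusing pooled threads. A sketch, assuming JDK 21+ (or --enable-preview on JDK 19/20), that submits Callables and collects their results: since ExecutorService is AutoCloseable from JDK 19 on, the try-with-resources block waits for all tasks, after which Future.resultNow() (also added in JDK 19) reads each result without checked exceptions. PerTaskExecutorDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class PerTaskExecutorDemo {

    // Computes 1^2 + 2^2 + ... + n^2, one virtual thread per term.
    static long sumOfSquares(int n) {
        List<Future<Long>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 1; i <= n; i++) {
                long x = i;
                futures.add(executor.submit(() -> x * x)); // a Callable<Long>
            }
        } // close() blocks until every submitted task has completed
        long sum = 0;
        for (Future<Long> f : futures) {
            sum += f.resultNow(); // safe: all futures are already complete here
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```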
           

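These four approaches show that, to lower the barrier to entry, the JDK reuses the existing Thread class as much as possible, so moving to virtual threads is smooth. In Java 19, however, virtual threads are still a preview feature and are disabled by default; you must enable them with the --enable-preview flag. The preview-feature source and the command for running with virtual threads enabled are as follows: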

// Thread source: the @PreviewFeature annotation marks virtual threads as a preview feature
public class Thread implements Runnable {
    /**
     * Creates a virtual thread to execute a task and schedules it to execute.
     * This method is equivalent to: Thread.ofVirtual().start(task);
     *
     * @param task the object to run when the thread executes
     * @return a new, and started, virtual thread
     * @throws UnsupportedOperationException if preview features are not enabled
     * @see "Inheritance when creating threads"
     * @since 19
     */
    @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
    public static Thread startVirtualThread(Runnable task) {
        Objects.requireNonNull(task);
        // Check whether preview features (virtual threads) are enabled
        PreviewFeatures.ensureEnabled();
        var thread = ThreadBuilders.newVirtualThread(null, null, 0, task);
        thread.start();
        return thread;
    }
}

// jdk.internal.misc.PreviewFeatures: the exception message says to enable
// virtual threads with --enable-preview
public class PreviewFeatures {
    public static void ensureEnabled() {
        if (!isEnabled()) {
            throw new UnsupportedOperationException(
                "Preview Features not enabled, need to run with --enable-preview");
        }
    }
}
           
# Enable preview features (including virtual threads)
java --source 19 --enable-preview XXX.java
           

To configure --enable-preview in IntelliJ IDEA:

[Figure: configuring --enable-preview in IntelliJ IDEA]

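To get a feel for the performance of virtual threads, let's simulate a comparison: run 100,000 tasks once with virtual threads and once with a thread pool, where each task sleeps 10 ms, and record each run's total time and the maximum number of platform threads created. The example code: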

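// VirtualThreadTest.java: 100,000 tasks on virtual threads
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadTest {
    // Largest number of platform (OS) threads observed so far
    static final AtomicInteger maxThreads = new AtomicInteger();

    public static void main(String[] args) {
        // Sample the current number of platform threads every 10 ms
        // (ThreadMXBean only reports platform threads, never virtual threads)
        ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
        monitor.scheduleAtFixedRate(() -> {
            ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
            ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
            maxThreads.accumulateAndGet(threadInfo.length, Math::max);
        }, 10, 10, TimeUnit.MILLISECONDS);

        long start = System.currentTimeMillis();
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        for (int i = 0; i < 100_000; i++) {
            executor.submit(() -> {
                // Sleep 10 ms to simulate 10 ms of blocking business logic
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.close(); // waits until every submitted task has finished
        System.out.println("max:" + maxThreads.get() + " platform thread/os thread");
        System.out.printf("totalMillis:%dms%n", System.currentTimeMillis() - start);
        monitor.shutdown(); // otherwise the monitor's non-daemon thread keeps the JVM alive
    }
}

// ThreadTest.java: the same 100,000 tasks on a fixed pool of 200 platform threads
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadTest {
    static final AtomicInteger maxThreads = new AtomicInteger();

    public static void main(String[] args) {
        // Sample the current number of platform threads every second
        ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
        monitor.scheduleAtFixedRate(() -> {
            ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
            ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
            maxThreads.accumulateAndGet(threadInfo.length, Math::max);
        }, 1, 1, TimeUnit.SECONDS);

        long start = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(200);
        for (int i = 0; i < 100_000; i++) {
            executor.submit(() -> {
                try {
                    // Sleep 10 ms to simulate 10 ms of blocking business logic
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.close(); // shuts the pool down and waits for all tasks
        System.out.println("max:" + maxThreads.get() + " platform thread/os thread");
        System.out.printf("totalMillis:%dms%n", System.currentTimeMillis() - start);
        monitor.shutdown();
    }
}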
           

Results of the two runs:

[Figure: console output of the two tests]

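From the results:

  • With virtual threads, 100,000 tasks took 129 ms in total, and at most 18 platform threads were created;
  • With the thread pool, 100,000 tasks took 6103 ms in total, and at most 207 platform threads were created;
  • The total times differ by roughly 50× (6103 / 129 ≈ 47) and the peak platform thread counts by more than 10× (207 / 18 ≈ 11.5), so the performance gap speaks for itself;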
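The pool's time sits just above a simple floor: a fixed pool of 200 threads can have at most 200 tasks sleeping at once, so 100,000 tasks of 10 ms each need at least 100,000 / 200 = 500 rounds of 10 ms, i.e. 5000 ms. Virtual threads are not capped by a pool size, so their sleeps can all overlap. A small hypothetical helper that makes the arithmetic explicit, ignoring scheduling overhead:

```java
// Hypothetical helper: lower bound on wall-clock time for a bounded pool.
final class PoolLowerBound {

    // Minimum wall-clock time when `tasks` blocking tasks of `taskMillis` each
    // can run at most `concurrency` at a time (scheduling overhead ignored).
    static long minTotalMillis(long tasks, long taskMillis, long concurrency) {
        long rounds = (tasks + concurrency - 1) / concurrency; // ceiling division
        return rounds * taskMillis;
    }

    public static void main(String[] args) {
        System.out.println(minTotalMillis(100_000, 10, 200) + " ms"); // 5000 ms
    }
}
```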

Core source code analysis

Let's start with the VirtualThread class:

// java.lang.VirtualThread (JDK 19, excerpt)
/**
 * A thread that is scheduled by the Java virtual machine rather than the operating system.
 */
final class VirtualThread extends BaseVirtualThread {

    /**
     * Creates a new {@code VirtualThread} to run the given task with the given
     * scheduler. If the given scheduler is {@code null} and the current thread
     * is a platform thread then the newly created virtual thread will use the
     * default scheduler. If given scheduler is {@code null} and the current
     * thread is a virtual thread then the current thread's scheduler is used.
     *
     * @param scheduler the scheduler or null
     * @param name thread name
     * @param characteristics characteristics
     * @param task the task to execute
     */
    VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
        super(name, characteristics, /*bound*/ false);
        Objects.requireNonNull(task);

        // choose scheduler if not specified
        if (scheduler == null) {
            Thread parent = Thread.currentThread();
            if (parent instanceof VirtualThread vparent) {
                scheduler = vparent.scheduler;
            } else {
                scheduler = DEFAULT_SCHEDULER;
            }
        }

        this.scheduler = scheduler;
        this.cont = new VThreadContinuation(this, task);
        this.runContinuation = this::runContinuation;
    }

    /**
     * Creates the default scheduler.
     */
    @SuppressWarnings("removal")
    private static ForkJoinPool createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<ForkJoinPool> pa = () -> {
            int parallelism, maxPoolSize, minRunnable;
            String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
            String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
            String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
            if (parallelismValue != null) {
                parallelism = Integer.parseInt(parallelismValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            if (maxPoolSizeValue != null) {
                maxPoolSize = Integer.parseInt(maxPoolSizeValue);
                parallelism = Integer.min(parallelism, maxPoolSize);
            } else {
                maxPoolSize = Integer.max(parallelism, 256);
            }
            if (minRunnableValue != null) {
                minRunnable = Integer.parseInt(minRunnableValue);
            } else {
                minRunnable = Integer.max(parallelism / 2, 1);
            }
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }
}
           

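From the VirtualThread source we can conclude:

  • VirtualThread extends BaseVirtualThread, and BaseVirtualThread extends Thread;
  • Virtual threads are scheduled by the JVM, not by the operating system;
  • VirtualThread is a final class, so it cannot be subclassed or extended;

VirtualThread provides only one constructor, which takes 4 parameters:

  • Executor scheduler: if the given scheduler is null and the current thread is a platform thread, the new virtual thread uses the default scheduler (backed by a ForkJoinPool); if the given scheduler is null and the current thread is a virtual thread, the current thread's scheduler is used;
  • String name: a custom thread name;
  • int characteristics: the thread's characteristics flags;
  • Runnable task: the task to execute;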
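The sizing rules in createDefaultScheduler() above can be restated as a pure function, which makes the defaults easier to read: parallelism defaults to the number of available processors, maxPoolSize to max(parallelism, 256), and minRunnable to max(parallelism / 2, 1). This is a sketch that mirrors the JDK 19 code shown, not part of the JDK; SchedulerDefaults and its Config record are hypothetical names, and a null argument stands for "system property not set".

```java
// Hypothetical sketch mirroring the default-scheduler sizing logic shown above.
final class SchedulerDefaults {

    record Config(int parallelism, int maxPoolSize, int minRunnable) { }

    // A null argument means the corresponding system property is not set.
    static Config compute(Integer parallelismProp, Integer maxPoolSizeProp,
                          Integer minRunnableProp, int availableProcessors) {
        int parallelism = (parallelismProp != null) ? parallelismProp : availableProcessors;
        int maxPoolSize;
        if (maxPoolSizeProp != null) {
            maxPoolSize = maxPoolSizeProp;
            parallelism = Integer.min(parallelism, maxPoolSize); // parallelism never exceeds the cap
        } else {
            maxPoolSize = Integer.max(parallelism, 256);
        }
        int minRunnable = (minRunnableProp != null)
                ? minRunnableProp
                : Integer.max(parallelism / 2, 1);
        return new Config(parallelism, maxPoolSize, minRunnable);
    }

    public static void main(String[] args) {
        // An 8-core machine with no properties set
        System.out.println(compute(null, null, null, 8)); // Config[parallelism=8, maxPoolSize=256, minRunnable=4]
    }
}
```

In a real run the inputs come from the jdk.virtualThreadScheduler.parallelism, jdk.virtualThreadScheduler.maxPoolSize and jdk.virtualThreadScheduler.minRunnable system properties read in the source above, e.g. java -Djdk.virtualThreadScheduler.parallelism=4 ...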

Next, let's look at the JDK source that creates a virtual thread:

// java.lang.Thread (excerpt)
public class Thread implements Runnable {

    @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
    public static Thread startVirtualThread(Runnable task) {
        Objects.requireNonNull(task);
        // Throws UnsupportedOperationException unless --enable-preview is set
        PreviewFeatures.ensureEnabled();
        var thread = ThreadBuilders.newVirtualThread(null, null, 0, task);
        thread.start();
        return thread;
    }

    /**
     * Returns a builder for creating a virtual {@code Thread} or {@code ThreadFactory}
     * that creates virtual threads.
     *
     * @apiNote The following are examples using the builder:
     * {@snippet :
     * // Start a virtual thread to run a task.
     * Thread thread = Thread.ofVirtual().start(runnable);
     *
     * // A ThreadFactory that creates virtual threads
     * ThreadFactory factory = Thread.ofVirtual().factory();
     * }
     *
     * @return A builder for creating {@code Thread} or {@code ThreadFactory} objects.
     * @throws UnsupportedOperationException if preview features are not enabled
     * @since 19
     */
    @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
    public static Builder.OfVirtual ofVirtual() {
        PreviewFeatures.ensureEnabled();
        return new ThreadBuilders.VirtualThreadBuilder();
    }
}

// java.lang.ThreadBuilders (excerpt)
class ThreadBuilders {
    static Thread newVirtualThread(Executor scheduler,
                                   String name,
                                   int characteristics,
                                   Runnable task) {
        if (ContinuationSupport.isSupported()) {
            // Normal path: a continuation-based virtual thread
            return new VirtualThread(scheduler, name, characteristics, task);
        } else {
            // Fallback when continuations are unsupported: custom schedulers are rejected
            if (scheduler != null)
                throw new UnsupportedOperationException();
            return new BoundVirtualThread(name, characteristics, task);
        }
    }
}
           

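Thread.startVirtualThread() calls ThreadBuilders.newVirtualThread(), which ultimately invokes the new VirtualThread() constructor to create the virtual thread. If the platform does not support continuations, ThreadBuilders falls back to a BoundVirtualThread instead, and rejects a custom scheduler with UnsupportedOperationException.

As the four creation methods above also show, the entry points for creating virtual threads live in Thread and Executors, consistent with how we already use threads and thread pools.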

final class VirtualThread extends BaseVirtualThread {
    /**
     * Mounts this virtual thread onto the current platform thread. On
     * return, the current thread is the virtual thread.
     */
    @ChangesCurrentThread
    private void mount() {
        // sets the carrier thread
        Thread carrier = Thread.currentCarrierThread();
        setCarrierThread(carrier);

        // sync up carrier thread interrupt status if needed
        if (interrupted) {
            carrier.setInterrupt();
        } else if (carrier.isInterrupted()) {
            synchronized (interruptLock) {
                // need to recheck interrupt status
                if (!interrupted) {
                    carrier.clearInterrupt();
                }
            }
        }

        // set Thread.currentThread() to return this virtual thread
        carrier.setCurrentThread(this);
    }

    /**
     * Unmounts this virtual thread from the carrier. On return, the
     * current thread is the current platform thread.
     */
    @ChangesCurrentThread
    private void unmount() {
        // set Thread.currentThread() to return the platform thread
        Thread carrier = this.carrierThread;
        carrier.setCurrentThread(carrier);

        // break connection to carrier thread, synchronized with interrupt
        synchronized (interruptLock) {
            setCarrierThread(null);
        }
        carrier.clearInterrupt();
    }
}
           

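mount() and unmount() are the two core methods of a virtual thread:

  • mount() mounts this virtual thread onto the current platform thread; on return, the current thread is the virtual thread;
  • unmount() unmounts this virtual thread from its carrier thread; on return, the current thread is the platform thread;

These two methods show that a virtual thread runs on top of a platform thread (its carrier) and is unmounted from it when it parks or finishes, which frees the carrier to run other virtual threads.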
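mount() makes Thread.currentThread() return the virtual thread, and a blocking call such as Thread.sleep() parks it, so it is unmounted and later mounted again, possibly on a different carrier. A minimal sketch, assuming JDK 21+ (or --enable-preview on JDK 19/20), that observes this from inside the task: the current-thread identity is the same virtual thread object before and after the sleep. MountDemo is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class MountDemo {

    // Inside a virtual thread, Thread.currentThread() is the virtual thread itself,
    // both before and after a sleep that unmounts it and mounts it again.
    static boolean identityPreserved() {
        AtomicReference<Thread> beforeSleep = new AtomicReference<>();
        AtomicReference<Thread> afterSleep = new AtomicReference<>();
        Thread vt = Thread.ofVirtual().unstarted(() -> {
            beforeSleep.set(Thread.currentThread());
            try {
                Thread.sleep(10); // parks the virtual thread: unmount now, mount again on wake-up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            afterSleep.set(Thread.currentThread());
        });
        vt.start();
        try {
            vt.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return beforeSleep.get() == vt && afterSleep.get() == vt;
    }

    public static void main(String[] args) {
        System.out.println(identityPreserved()); // true
    }
}
```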

Virtual thread states and transitions

The table below summarizes all the virtual thread states and the conditions for moving between them:

Transition               Condition
NEW -> STARTED           Thread.start
STARTED -> TERMINATED    failed to start
STARTED -> RUNNING       first run
RUNNING -> PARKING       Thread attempts to park
PARKING -> PARKED        cont.yield successful, thread is parked
PARKING -> PINNED        cont.yield failed, thread is pinned
PARKED -> RUNNABLE       unpark or interrupted
PINNED -> RUNNABLE       unpark or interrupted
RUNNABLE -> RUNNING      continue execution
RUNNING -> YIELDING      Thread.yield
YIELDING -> RUNNABLE     yield successful
YIELDING -> RUNNING      yield failed
RUNNING -> TERMINATED    done
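The states in this table are internal to VirtualThread; application code only sees the public Thread.State. A minimal sketch, assuming JDK 21+ (or --enable-preview on JDK 19/20), that parks a virtual thread on a CountDownLatch and samples getState(): the internal PARKED state is reported as WAITING. VirtualThreadStatesDemo and the 5-second timeout are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class VirtualThreadStatesDemo {

    // Samples the public Thread.State of one virtual thread: before start, while parked, after exit.
    static List<Thread.State> observe() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread vt = Thread.ofVirtual().unstarted(() -> {
            try {
                latch.await(); // internally, normally RUNNING -> PARKING -> PARKED
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        List<Thread.State> states = new ArrayList<>();
        states.add(vt.getState()); // NEW

        vt.start();
        long deadline = System.nanoTime() + 5_000_000_000L; // assumed 5 s timeout
        while (vt.getState() != Thread.State.WAITING) { // a parked virtual thread reports WAITING
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("virtual thread never parked");
            }
            Thread.onSpinWait();
        }
        states.add(Thread.State.WAITING);

        latch.countDown(); // unpark: PARKED -> RUNNABLE -> RUNNING -> TERMINATED
        try {
            vt.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        states.add(vt.getState()); // TERMINATED
        return states;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [NEW, WAITING, TERMINATED]
    }
}
```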

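How the 3 kinds of threads relate

The relationship among VirtualThread, platform threads, and OS threads is shown below:

[Figure: virtual threads, platform (carrier) threads, and OS threads]

Notes:

In the existing thread model, one Java (platform) thread corresponds to one OS thread. Many virtual threads are mounted onto a platform thread (the carrier thread), and each platform thread maps one-to-one to an OS thread. A VirtualThread is therefore a JVM-level thread, scheduled by the JVM. It is a very lightweight resource that is simply discarded once its task ends, so unlike platform threads it does not need pooling (thread pools).

When a virtual thread hits an IO or other blocking operation, it is unmounted and its carrier switches to running another virtual thread, so the carrier is not left waiting. This lets a small number of platform threads efficiently schedule a very large number of virtual threads, maximizing thread utilization. One exception in JDK 19: a virtual thread that blocks inside a synchronized block or a native method stays pinned to its carrier, which corresponds to the PINNED state in the table above.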
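A quick way to see the unmount-on-block behaviour: run 10,000 virtual threads that each sleep 50 ms. If every sleep held on to its carrier, the run would need at least 10,000 × 50 / N ms on N carriers (62.5 s with 8 cores); because sleeping virtual threads are unmounted, the sleeps overlap and the run should finish far faster, with the exact time depending on the machine. A sketch assuming JDK 21+ (or --enable-preview on JDK 19/20); BlockingDemo is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; assumes JDK 21+ (or JDK 19/20 with --enable-preview).
final class BlockingDemo {

    // Runs `tasks` virtual threads that each sleep `sleepMillis`; returns elapsed wall-clock ms.
    static long elapsedMillis(int tasks, long sleepMillis) {
        long start = System.nanoTime();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    Thread.sleep(sleepMillis); // unmounts the virtual thread, freeing its carrier
                    return null;               // a Callable, so the checked InterruptedException is allowed
                });
            }
        } // close() waits for every task to finish
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("elapsed ms: " + elapsedMillis(10_000, 50));
    }
}
```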

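Summary

  • Virtual threads will bring big performance gains for highly concurrent, blocking workloads. However, an estimated 80 to 90% of industry code still runs on Java 8, so it may take a long time before JDK 19 reaches real production environments;
  • Virtual threads reuse most of the existing Thread functionality, so existing code can migrate to them smoothly;
  • Virtual threads use platform threads as carrier threads; they do not change the original thread model;
  • Virtual threads are scheduled by the JVM, not by the operating system;
  • Using virtual threads can significantly improve program throughput;
  • Virtual threads suit workloads with a very large number of concurrent tasks or IO-intensive workloads; CPU-bound tasks still call for more CPU cores or distributed computing resources;
  • Virtual threads are currently only a preview feature, so this analysis rests on the source code and simple tests, without validation in a real production environment;

For a while, the Java world invested heavily in reactive programming (for example Reactor), trying to improve Java performance that way. But reactive code turned out to be hard to understand, hard to debug, and hard to use, so the focus shifted back to synchronous programming, and virtual threads were born to make that style perform well. Virtual threads may not reach production use any time soon, but the JDK releases show that even a tech giant like Oracle takes detours. Following JDK developments helps us judge where to focus our Java learning and where the platform is heading.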

References

JEP 425: Virtual Threads (Preview)

java-virtual-threads