一、前面我們簡單的說了一下,Python中的協程原理。這裡補充Java的協程實作過程。有需要可以檢視python之協程。
二、Java協程,其實做Java這麼久我也沒有怎麼聽過Java協程的東西,但是一直有有聽到微線程/協程的概念,這不在學習Python的時候接觸到了協程一詞。然後傳回來去了解Java的協程問題,但是看了很多資料,發現官網以及很多地方都沒有涉及到協程的東西,沒有辦法,隻能通過強大的社群來學習協程的相關東西。
三、這裡主要關注的是:quasar。
1)協程的目的:當我們在使用多線程的時候,如果存在長時間的I/O操作。這個時候線程一直處于阻塞狀态,如果線程很多的時候,會存在很多線程處于空閑狀态,造成了資源應用不徹底。相對的協程不一樣了,在單線程中多個任務來回自行如果出現長時間的I/O操作,讓其讓出目前的協程排程,執行下一個任務。當然可能所有任務,全部卡在同一個點上,但是這隻是針對于單線程而言,當所有資料正常傳回時,會同時處理目前的I/O操作。
2)多線程測試(這裡使用100萬個線程,來測試記憶體占用)
for (int i = 0; i < 1000000; i++) {
new Thread(() -> {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
結果:
直接卡死了,記憶體溢出
可想而知,如果存在100萬個線程,開銷是有多大。
3)協程測試
a、阿裡雲搜尋到的依賴包
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.9</version>
<classifier>jdk8</classifier>
</dependency>
b、測試記憶體占用量
public static void main(String[] args) throws Exception {
//使用阻塞隊列來擷取結果。
LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 1000000; i++) {
int finalI = i;
//這裡的Fiber有點像Callable,可以傳回資料
Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
//這裡用于測試記憶體占用量
Fiber.sleep(100000);
System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
return finalI;
});
//開始執行
fiber.start();
//加入隊列
fiberQueue.add(fiber);
}
while (true) {
//阻塞
Fiber<Integer> fiber = fiberQueue.take();
System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
}
}
堆:
估計:1個G左右。
記憶體:
估計:1個G左右,也就是每一個fiber占用1Kb左右。
c、正常測試
修改一下參數:
public static void main(String[] args) throws Exception {
//使用阻塞隊列來擷取結果。
LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10; i++) {
int finalI = i;
//這裡的Fiber有點像Callable,可以傳回資料
Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
//這裡用于測試記憶體占用量
Fiber.sleep(1000);
System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
return finalI;
});
//開始執行
fiber.start();
//加入隊列
fiberQueue.add(fiber);
}
while (true) {
//阻塞
Fiber<Integer> fiber = fiberQueue.take();
System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
}
}
4)可以看出并發的狀态還是很不錯的,當然這隻是多個任務執行而已。
四、通過上面的測試,可以看出,quasar中Fiber,很像Callable的用法、而且在記憶體占用上面減少了很多,當然,堆的數量确實不少,但是可以接受。
還是要說明:協程的方式更多用來做I/O密集型的操作。計算密集型的還是使用線程更加合理。
五、原理,我估麼着看了一下源碼,複雜度很高,這裡不做深究。
原理參考:次時代Java程式設計(一):Java裡的協程
1、Quasar裡的Fiber其實是一個continuation,他可以被Quasar定義的scheduler排程,一個continuation記錄着運作執行個體的狀态,而且會被随時中斷,并且也會随後在他被中斷的地方恢複。Quasar其實是通過修改bytecode來達到這個目的,是以運作Quasar程式的時候,你需要先通過java-agent在運作時修改你的代碼,當然也可以在編譯期間這麼幹。golang的内置了自己的排程器,Quasar則預設使用ForkJoinPool這個JDK7以後才有的,具有work-stealing功能的線程池來當排程器。work-stealing非常重要,因為你不清楚哪個Fiber會先執行完,而work-stealing可以動态的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。
2、那這裡你會問了,Quasar怎麼知道修改哪些位元組碼呢,其實也很簡單,Quasar會通過java-agent在運作時掃描哪些方法是可以中斷的,同時會在方法被調用前和排程後的方法内插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會對調用該注解的方法做類似下面的事情。
3、這裡假設你在方法f上定義了@Suspendable,同時去調用了有同樣注解的方法g,那麼所有調用f的方法會插入一些位元組碼,這些位元組碼的邏輯就是記錄目前Fiber棧上的狀态,以便在未來可以動态的恢複。(Fiber類似線程也有自己的棧)。在suspendable方法鍊内Fiber的父類會調用Fiber.park,這樣會抛出SuspendExecution異常,進而來停止線程的運作,好讓Quasar的排程器執行排程。這裡的SuspendExecution會被Fiber自己捕獲,業務層面上不應該捕獲到。如果Fiber被喚醒了(排程器層面會去調用Fiber.unpark),那麼f會在被中斷的地方重新被調用(這裡Fiber會知道自己在哪裡被中斷),同時會把g的調用結果(g會return結果)插入到f的恢複點,這樣看上去就好像g的return是f的local variables了,進而避免了callback嵌套。
4、上面啰嗦了一大堆,其實簡單點講就是,想辦法讓運作中的線程棧停下來,好讓Quasar的排程器介入。JVM線程中斷的條件隻有兩個,一個是抛異常,另外一個就是return。這裡Quasar就是通過抛異常的方式來達到的,是以你會看到我上面的代碼會抛出SuspendExecution。但是如果你真捕獲到這個異常,那就說明有問題了,是以一般會這麼寫。