天天看點

Java之協程(quasar)

  一、前面我們簡單的說了一下,Python中的協程原理。這裡補充Java的協程實作過程。有需要可以檢視python之協程。

  二、Java協程,其實做Java這麼久我也沒有怎麼聽過Java協程的東西,但是一直有有聽到微線程/協程的概念,這不在學習Python的時候接觸到了協程一詞。然後傳回來去了解Java的協程問題,但是看了很多資料,發現官網以及很多地方都沒有涉及到協程的東西,沒有辦法,隻能通過強大的社群來學習協程的相關東西。

  三、這裡主要關注的是:quasar。

  1)協程的目的:當我們在使用多線程的時候,如果存在長時間的I/O操作。這個時候線程一直處于阻塞狀态,如果線程很多的時候,會存在很多線程處于空閑狀态,造成了資源應用不徹底。相對的協程不一樣了,在單線程中多個任務來回自行如果出現長時間的I/O操作,讓其讓出目前的協程排程,執行下一個任務。當然可能所有任務,全部卡在同一個點上,但是這隻是針對于單線程而言,當所有資料正常傳回時,會同時處理目前的I/O操作。

  2)多線程測試(這裡使用100萬個線程,來測試記憶體占用) 

for (int i = 0; i < 1000000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }          

  結果:

  直接卡死了,記憶體溢出

  

Java之協程(quasar)

  可想而知,如果存在100萬個線程,開銷是有多大。

  3)協程測試

  a、阿裡雲搜尋到的依賴包

    <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>      

  b、測試記憶體占用量

  public static void main(String[] args) throws Exception {
        //使用阻塞隊列來擷取結果。
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 1000000; i++) {
            int finalI = i;
            //這裡的Fiber有點像Callable,可以傳回資料
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                //這裡用于測試記憶體占用量
                Fiber.sleep(100000);
                System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
                return finalI;
            });
            //開始執行
            fiber.start();
            //加入隊列
            fiberQueue.add(fiber);
        }
        while (true) {
            //阻塞
            Fiber<Integer> fiber = fiberQueue.take();
            System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
        }
    }      

  堆:

Java之協程(quasar)

   估計:1個G左右。

  記憶體:

Java之協程(quasar)

  估計:1個G左右,也就是每一個fiber占用1Kb左右。

   c、正常測試

  修改一下參數:

public static void main(String[] args) throws Exception {
        //使用阻塞隊列來擷取結果。
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            //這裡的Fiber有點像Callable,可以傳回資料
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                //這裡用于測試記憶體占用量
                Fiber.sleep(1000);
                System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
                return finalI;
            });
            //開始執行
            fiber.start();
            //加入隊列
            fiberQueue.add(fiber);
        }
        while (true) {
            //阻塞
            Fiber<Integer> fiber = fiberQueue.take();
            System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
        }
    }      
Java之協程(quasar)

  4)可以看出并發的狀态還是很不錯的,當然這隻是多個任務執行而已。

  四、通過上面的測試,可以看出,quasar中Fiber,很像Callable的用法、而且在記憶體占用上面減少了很多,當然,堆的數量确實不少,但是可以接受。

  還是要說明:協程的方式更多用來做I/O密集型的操作。計算密集型的還是使用線程更加合理。

  五、原理,我估麼着看了一下源碼,複雜度很高,這裡不做深究。

  原理參考:次時代Java程式設計(一):Java裡的協程

  1、Quasar裡的Fiber其實是一個continuation,他可以被Quasar定義的scheduler排程,一個continuation記錄着運作執行個體的狀态,而且會被随時中斷,并且也會随後在他被中斷的地方恢複。Quasar其實是通過修改bytecode來達到這個目的,是以運作Quasar程式的時候,你需要先通過java-agent在運作時修改你的代碼,當然也可以在編譯期間這麼幹。golang的内置了自己的排程器,Quasar則預設使用ForkJoinPool這個JDK7以後才有的,具有work-stealing功能的線程池來當排程器。work-stealing非常重要,因為你不清楚哪個Fiber會先執行完,而work-stealing可以動态的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。

  2、那這裡你會問了,Quasar怎麼知道修改哪些位元組碼呢,其實也很簡單,Quasar會通過java-agent在運作時掃描哪些方法是可以中斷的,同時會在方法被調用前和排程後的方法内插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會對調用該注解的方法做類似下面的事情。

  3、這裡假設你在方法f上定義了@Suspendable,同時去調用了有同樣注解的方法g,那麼所有調用f的方法會插入一些位元組碼,這些位元組碼的邏輯就是記錄目前Fiber棧上的狀态,以便在未來可以動态的恢複。(Fiber類似線程也有自己的棧)。在suspendable方法鍊内Fiber的父類會調用Fiber.park,這樣會抛出SuspendExecution異常,進而來停止線程的運作,好讓Quasar的排程器執行排程。這裡的SuspendExecution會被Fiber自己捕獲,業務層面上不應該捕獲到。如果Fiber被喚醒了(排程器層面會去調用Fiber.unpark),那麼f會在被中斷的地方重新被調用(這裡Fiber會知道自己在哪裡被中斷),同時會把g的調用結果(g會return結果)插入到f的恢複點,這樣看上去就好像g的return是f的local variables了,進而避免了callback嵌套。

  4、上面啰嗦了一大堆,其實簡單點講就是,想辦法讓運作中的線程棧停下來,好讓Quasar的排程器介入。JVM線程中斷的條件隻有兩個,一個是抛異常,另外一個就是return。這裡Quasar就是通過抛異常的方式來達到的,是以你會看到我上面的代碼會抛出SuspendExecution。但是如果你真捕獲到這個異常,那就說明有問題了,是以一般會這麼寫。

繼續閱讀