天天看點

哪個線程執行 CompletableFuture’s tasks 和 callbacks?

這是api的基礎部分,它有一個很實用的supplyasync()方法,這個方法和executorservice.submit()很像,但不同的是傳回completablefuture:

<code>1</code>

<code>completablefuture.supplyasync(() -&gt; {</code>

<code>2</code>

<code>3</code>

<code>                </code><code>log.info(</code><code>"downloading"</code><code>);</code>

<code>4</code>

<code>                </code><code>return</code> <code>ioutils.tostring(is, standardcharsets.utf_8);</code>

<code>5</code>

<code>            </code><code>} </code><code>catch</code> <code>(ioexception e) {</code>

<code>6</code>

<code>                </code><code>throw</code> <code>new</code> <code>runtimeexception(e);</code>

<code>7</code>

<code>            </code><code>}</code>

<code>8</code>

<code>        </code><code>});</code>

<code>executorservice pool = executors.newfixedthreadpool(</code><code>10</code><code>);</code>

<code>final</code> <code>completablefuture future =</code>

<code>        </code><code>completablefuture.supplyasync(() -&gt; {</code>

<code>            </code><code>//...</code>

<code>        </code><code>}, pool);</code>

這僅僅是開始…

假如你想轉換給定的completablefuture,例如提取string的長度:

<code>completablefuture intfuture =</code>

<code>    </code><code>future.thenapply(s -&gt; s.length());</code>

那麼是誰調用了<code>s.length()?坦白點,我一點也不在乎。隻要涉及到lambda表達式,那麼所有的執行者像thenapply這樣的就是廉價的,我們并不關心是誰調用了lambda表達式。但如果這樣的表達式會占用一點點的cpu來完成阻塞的網絡通信那又會如何呢?</code>

首先預設情況下會發生什麼?試想一下:我們有一個傳回string類型的背景任務,當結果完成時我們想要異步地去執行特定的變換。最容易的實作方法是通過包裝一個原始的任務(傳回string),任務完成時截獲它。當内部的task結束後,回調就開始執行,執行變換和傳回改進的值。就像有一個面介于我們的代碼和初始的計算結果之間(個人看法:這裡指的是下面的future裡面包含的task執行完畢傳回結果s,然後立馬執行callback也就是thenapply裡面的lambda表達式,這也就是為什麼作者說有一個面位于初始計算結果和回調執行代碼之間)。那就是說這應該相當明顯了,s.length()的變換會在和執行原始任務相同的線程裡完成,哈?并不完全是這樣!(這裡指的是有時候變換的線程和執行原始任務的線程不是同一個線程,看下面就知道)

<code>01</code>

<code>completablefuture future =</code>

<code>02</code>

<code>03</code>

<code>            </code><code>sleepseconds(</code><code>2</code><code>);</code>

<code>04</code>

<code>            </code><code>return</code> <code>"abc"</code><code>;</code>

<code>05</code>

<code>06</code>

<code>07</code>

<code>future.thenapply(s -&gt; {</code>

<code>08</code>

<code>    </code><code>log.info(</code><code>"first transformation"</code><code>);</code>

<code>09</code>

<code>    </code><code>return</code> <code>s.length();</code>

<code>10</code>

<code>});</code>

<code>11</code>

<code>12</code>

<code>future.get();</code>

<code>13</code>

<code>pool.shutdownnow();</code>

<code>14</code>

<code>pool.awaittermination(</code><code>1</code><code>, timeunit.minutes);</code>

<code>15</code>

<code>16</code>

<code>17</code>

<code>    </code><code>log.info(</code><code>"second transformation"</code><code>);</code>

<code>18</code>

<code>19</code>

如果future裡面的task還在運作,那麼包含first transformation的 thenapply()就會一直處于挂起狀态。而這個task完成後thenapply()會立即執行,執行的線程和執行task的線程是同一個。然而在注冊第二次變換之前(也就是執行第二個thenapply()),我們将一直等待直到task完成(和第一個變換是一樣的,都需要等待)。更壞的情況是,我們完全地關閉了線程池,保證其他的代碼将不會執行。那麼哪個線程将要執行二次變換呢?我們都知道當注冊了callback的future完成時,二次變換必定會立刻執行。這就是說它是使用預設的主線程(來完成callback),上面的代碼輸出如下:

pool-1-thread-1 | first transformation      main | second transformation

二次變換在注冊的時候就意識到completablefuture已經完成了(指的是future裡面的task已經傳回結果,其實在第一次調用thenapply()之前就已經傳回了,是以這一次不用等待task),是以它立刻執行了變換。由于此時已經沒有其他的線程,是以thenapply()就隻能在目前的main線程環境中被調用。最主要的原因還是因為這種行為機制在實際的變換成本很高時(如很耗時)很容易出錯。想象一下thenapply()内部的lambda表達式在進行一些繁重的計算或者阻塞的網絡調用,突然我們的異步 <code>completablefuture阻塞了調用者線程!</code>

有兩種技術去控制執行回調和變換的線程,需要注意的是這些方法僅僅适用你的變換需要很高成本的時候,其他情況下可以忽略。那麼第一個方法可以選擇使用操作者的 <code>*async方法,例如:</code>

<code>future.thenapplyasync(s -&gt; {</code>

這一次second transformation被自動地解除安裝到了我們的老朋友線程forkjoinpool.commonpool()中去了:

<code>pool-</code><code>1</code><code>-thread-</code><code>1</code>                  <code>| first transformation</code>

<code>forkjoinpool.commonpool-worker-</code><code>1</code> <code>| second transformation</code>

但我們并不喜歡commonpool,是以我們提供自己的:

<code>}, pool2);</code>

注意到這裡使用的是不同的線程池(pool-1 vs. pool-2):

<code>pool-</code><code>1</code><code>-thread-</code><code>1</code> <code>| first transformation</code>

<code>pool-</code><code>2</code><code>-thread-</code><code>1</code> <code>| second transformation</code>

我相信如果你在處理一些長時間運作的callbacks和transformations上有些麻煩(記住這篇文章同樣也适用于completablefuture的其他大部分方法),你應該簡單地使用其他表意明确的completablefuture,就像這樣:

<code>//imagine this is slow and costly</code>

<code>completablefuture&lt;integer&gt; strlen(string s) {</code>

<code>    </code><code>return</code> <code>completablefuture.supplyasync(</code>

<code>            </code><code>() -&gt; s.length(),</code>

<code>            </code><code>pool2);</code>

<code>}</code>

<code>//...</code>

<code>completablefuture&lt;integer&gt; intfuture =</code>

<code>        </code><code>future.thencompose(s -&gt; strlen(s));</code>

這種方法更加明确,知道我們的變換有很大的開銷,我們不會将它運作在一些随意的不可控的線程上。取而代之的是我們會将string到completablefuture&lt;integer&gt;的變換封裝為一個異步操作。然而,我們必須用thencompose()取代thenapply(),否則的話我們會得到completablefuture&lt;completablefuture&lt;integer&gt;&gt;.

但如果我們的transformation 沒有一個能夠很好地處理嵌套completablefuture的形式怎麼辦,如applytoeither()會等待第一個<code>future完成然後執行transformation.</code>

<code>completablefuture&lt;completablefuture&lt;integer&gt;&gt; poor =</code>

<code>        </code><code>future1.applytoeither(future2, s -&gt; strlen(s));</code>

這裡有個很實用的技巧,用來“展開”這類難以了解的資料結構,這種技巧叫flatten,通過使用<code>flatmap(identity)</code> (or <code>flatmap(x -&gt; x)</code>)。在我們的例子中flatmap()就叫做thencompose:

<code>completablefuture&lt;integer&gt; good =</code>

<code>        </code><code>poor.thencompose(x -&gt; x);</code>

我把它留給你,去弄懂它是怎樣和為什麼這樣工作的。我想這篇文章已經盡量清楚地闡述了線程是如何參與到completablefuture中去的。