天天看點

并發程式設計 Promise, Future 和 Callback

在并發程式設計中,我們通常會用到一組非阻塞的模型:promise,future 和 callback。其中的 future 表示一個可能還沒有實際完成的異步任務的結果,針對這個結果可以添加 callback 以便在任務執行成功或失敗後做出對應的操作,而 promise 交由任務執行者,任務執行者通過 promise 可以标記任務完成或者失敗。 可以說這一套模型是很多異步非阻塞架構的基礎。

這一套經典的模型在 scala、c# 中得到了原生的支援,但 jdk 中暫時還隻有無 callback 的 future 出現,當然也并非在 java 界就沒有發展了,比如 guava 就提供了listenablefuture 接口,而 netty 4+ 更是提供了完整的 promise、future 和 listener 機制,在 netty 的官方文檔 using as a generic library 中也介紹了将 netty 作為一個 lib 包依賴,并且使用 listenable futures 的示例。在實際的項目使用中,發現 netty 的 eventloop 機制不一定适用其他場景,是以想去除對 eventloop 的依賴,實作一個簡化版本。

參考 scala 和 netty 的代碼重新定義了接口和實作,先介紹下和 netty 版本的差別:

去除了對 eventloop 的依賴,callback 的執行政策不同:任務未完成時添加的 callback,會在結束任務的線程執行;任務完成後添加的 callback 會在添加 callback 線程立即執行

一個 callback 執行後會立即被清理

callback 可以根據任務結果添加,支援添加以下三種 callback: oncomplete, onsuccess, onfailure, 不需要和 netty 的 futurelistener 一樣大部分場景下都需要檢查 future.issuccess 等

支援 callback 的組合,callback 包含一些函數式的方法,比如 compose 和 andthen 可以用來組合

使用 countdownlatch 替換掉了 netty 的 wait/notify 實作

去掉 netty future 一些不常使用的方法,同時補充一些模型間關聯的方法,比如 promise.getfuture

然後再介紹幾個使用這個 commons-future 的示例:

異步執行任務,獲得 future 後添加 callback

<code>01</code>

<code>final</code> <code>taskpromise promise =</code><code>new</code> <code>defaulttaskpromise();</code>

<code>02</code>

<code>final</code> <code>taskfuture future = promise.getfuture();</code>

<code>03</code>

<code>final</code> <code>countdownlatch latch =</code><code>new</code> <code>countdownlatch(</code><code>1</code><code>);</code>

<code>04</code>

<code>future.oncomplete(</code><code>new</code> <code>taskcallback() {</code><code>// 添加結束 callback</code>

<code>05</code>

<code>    </code><code>@override</code>

<code>06</code>

<code>    </code><code>public</code> <code>taskfuture apply(taskfuture f) {</code>

<code>07</code>

<code>        </code><code>latch.countdown();</code>

<code>08</code>

<code>        </code><code>return</code> <code>f;</code>

<code>09</code>

<code>    </code><code>}</code>

<code>10</code>

<code>});</code>

<code>11</code>

<code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>

<code>12</code>

<code>13</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>14</code>

<code>        </code><code>promise.setsuccess(</code><code>null</code><code>);</code>

<code>15</code>

<code>16</code>

<code>}).start();</code>

<code>17</code>

<code>latch.await();</code>

異步執行任務,獲得 future 後添加成功結束的 callback

<code>future.onsuccess(</code><code>new</code> <code>taskcallback() {</code><code>// 添加成功結束 callback</code>

異步執行任務,獲得 future 後,添加失敗結束的組合 callback

<code>final</code> <code>countdownlatch latch =</code><code>new</code> <code>countdownlatch(</code><code>2</code><code>);</code>

<code>future.onfailure(</code><code>new</code> <code>taskcallback() {</code>

<code>}.andthen(</code><code>new</code> <code>taskcallback() {</code>

<code>    </code><code>public</code> <code>taskfuture apply(taskfuture f2) {</code>

<code>        </code><code>return</code> <code>f2;</code>

<code>}));</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>        </code><code>promise.setfailure(</code><code>new</code> <code>illegalstateexception(</code><code>"cm"</code><code>));</code>

<code>21</code>

<code>22</code>

<code>23</code>

異步執行任務,獲得 future 後阻塞等待任務完成

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>timeunit.seconds.sleep(</code><code>2</code><code>);</code>

<code>        </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>        </code><code>}</code>

<code>future.await();</code>