天天看點

隊列在多線程中的應用

前言

最近遇到一個問題:有一個幾十萬行的資料檔案,需要提取每一行中的特定字段去請求資料。每一次請求資料可能花費100毫秒的時間,這樣的話,如果我們單線程順序讀取檔案,發送請求,時間開銷很大。如何縮短時間呢?自然想到了多線程處理。可惜自己多線程程式寫的并不多,昨晚折騰了許久,終于有所收獲!

單線程處理

直接看代碼吧:

<a href="http://s3.51cto.com/wyfs02/M01/70/C0/wKioL1W9flSCpKdqAAFiH8w4feg489.jpg" target="_blank"></a>

為了友善,用Thread.sleep的方式進行模拟發送請求擷取資料。

運作結果如下:

單線程讀取檔案并發請求耗時:506047

在單線程處理的情況下,僅僅5000多條資料,就花費了8分多鐘!

隊列+多線程處理

思路:

用一個線程A去讀取檔案,将每一行放入到隊列Q中;

開啟多個線程B去同步的讀取隊列Q中的資料并發送請求;

線程A和線程B之間通過隊列Q進行了互動,隊列Q應該是阻塞隊列;

線程A的結束,很好說,就是檔案讀取完畢;那麼線程B什麼時候結束呢?而且線程B是一組線程,又如何確定他們都正常結束呢?

下面我們先看看線程A的代碼吧:

<a href="http://s3.51cto.com/wyfs02/M02/70/C0/wKioL1W9g42ymrYcAAIsgS8ufGo972.jpg" target="_blank"></a>

線程A是一個讀取檔案形成隊列Q的任務,注意下面幾點:

構造方法中,隊列是外部傳入的

定義了一個public屬性END,在檔案讀取最後被放入到了隊列中,這是為了通知線程B如果讀到了END就可以結束了

放入隊列中的方法用的是阻塞式的put

我們在看看線程B的代碼:

首先看看屬性定義以及構造方法

<a href="http://s3.51cto.com/wyfs02/M00/70/C4/wKiom1W9g52QJvXEAADdQvnE6TI311.jpg" target="_blank"></a>

同樣的,隊列是由外部傳入構造方法;為了友善,給每一個線程B一個标示。

我們重點關注下線程B的run方法:

<a href="http://s3.51cto.com/wyfs02/M00/70/C1/wKioL1W9hijz2dUoAAHqWTVDo50480.jpg" target="_blank"></a>

說明:

首先是一個while循環,如果讀取到了隊列中的END标示,那麼while結束,線程結束

這裡取出隊首的方法是阻塞式的take

由于隊列中的一行,應該隻能被一個線程B處理,是以加鎖處理取出隊首的過程

如果一個線程B讀取到了END标示,由于END是讀取并删除了,為了其他線程B盡快結束,應該将END繼續放入到隊列中

主程式部分:

<a href="http://s3.51cto.com/wyfs02/M02/70/C1/wKioL1W9iBGDfOKoAAILDd0fSJg242.jpg" target="_blank"></a>

在主程式部分,我們定義了阻塞隊列及其大小,啟動一個線程A,以及多個線程B。

運作結果:

線程[13] 處理167

線程[22] 處理167

線程[28] 處理167

線程[1] 處理167

線程[29] 處理167

線程[27] 處理167

線程[25] 處理167

線程[21] 處理167

線程[18] 處理167

線程[20] 處理167

線程[0] 處理168

線程[4] 處理168

線程[19] 處理168

線程[7] 處理168

線程[15] 處理168

線程[23] 處理168

線程[14] 處理168

線程[3] 處理168

線程[17] 處理168

線程[8] 處理168

線程[5] 處理168

線程[2] 處理168

線程[6] 處理168

線程[24] 處理168

線程[12] 處理168

線程[9] 處理168

線程[11] 處理168

線程[16] 處理168

線程[26] 處理168

線程[10] 處理168

隊列 + 多線程同步 讀取檔案并發請求耗時:16921

可以發現,現在5000多行檔案的處理,從原來的8分多鐘,變成了現在的16S,多線程實在是太恐怖了,哈哈!

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1680939,如需轉載請自行聯系原作者

繼續閱讀