前言
最近遇到一個問題:有一個幾十萬行的資料檔案,需要提取每一行中的特定字段去請求資料。每一次請求資料可能花費100毫秒的時間,這樣的話,如果我們單線程順序讀取檔案,發送請求,時間開銷很大。如何縮短時間呢?自然想到了多線程處理。可惜自己多線程程式寫的并不多,昨晚折騰了許久,終于有所收獲!
單線程處理
直接看代碼吧:
<a href="http://s3.51cto.com/wyfs02/M01/70/C0/wKioL1W9flSCpKdqAAFiH8w4feg489.jpg" target="_blank"></a>
為了友善,用Thread.sleep的方式進行模拟發送請求擷取資料。
運作結果如下:
單線程讀取檔案并發請求耗時:506047
在單線程處理的情況下,僅僅5000多條資料,就花費了8分多鐘!
隊列+多線程處理
思路:
用一個線程A去讀取檔案,将每一行放入到隊列Q中;
開啟多個線程B去同步的讀取隊列Q中的資料并發送請求;
線程A和線程B之間通過隊列Q進行了互動,隊列Q應該是阻塞隊列;
線程A的結束,很好說,就是檔案讀取完畢;那麼線程B什麼時候結束呢?而且線程B是一組線程,又如何確定他們都正常結束呢?
下面我們先看看線程A的代碼吧:
<a href="http://s3.51cto.com/wyfs02/M02/70/C0/wKioL1W9g42ymrYcAAIsgS8ufGo972.jpg" target="_blank"></a>
線程A是一個讀取檔案形成隊列Q的任務,注意下面幾點:
構造方法中,隊列是外部傳入的
定義了一個public屬性END,在檔案讀取最後被放入到了隊列中,這是為了通知線程B如果讀到了END就可以結束了
放入隊列中的方法用的是阻塞式的put
我們在看看線程B的代碼:
首先看看屬性定義以及構造方法
<a href="http://s3.51cto.com/wyfs02/M00/70/C4/wKiom1W9g52QJvXEAADdQvnE6TI311.jpg" target="_blank"></a>
同樣的,隊列是由外部傳入構造方法;為了友善,給每一個線程B一個标示。
我們重點關注下線程B的run方法:
<a href="http://s3.51cto.com/wyfs02/M00/70/C1/wKioL1W9hijz2dUoAAHqWTVDo50480.jpg" target="_blank"></a>
說明:
首先是一個while循環,如果讀取到了隊列中的END标示,那麼while結束,線程結束
這裡取出隊首的方法是阻塞式的take
由于隊列中的一行,應該隻能被一個線程B處理,是以加鎖處理取出隊首的過程
如果一個線程B讀取到了END标示,由于END是讀取并删除了,為了其他線程B盡快結束,應該将END繼續放入到隊列中
主程式部分:
<a href="http://s3.51cto.com/wyfs02/M02/70/C1/wKioL1W9iBGDfOKoAAILDd0fSJg242.jpg" target="_blank"></a>
在主程式部分,我們定義了阻塞隊列及其大小,啟動一個線程A,以及多個線程B。
運作結果:
線程[13] 處理167
線程[22] 處理167
線程[28] 處理167
線程[1] 處理167
線程[29] 處理167
線程[27] 處理167
線程[25] 處理167
線程[21] 處理167
線程[18] 處理167
線程[20] 處理167
線程[0] 處理168
線程[4] 處理168
線程[19] 處理168
線程[7] 處理168
線程[15] 處理168
線程[23] 處理168
線程[14] 處理168
線程[3] 處理168
線程[17] 處理168
線程[8] 處理168
線程[5] 處理168
線程[2] 處理168
線程[6] 處理168
線程[24] 處理168
線程[12] 處理168
線程[9] 處理168
線程[11] 處理168
線程[16] 處理168
線程[26] 處理168
線程[10] 處理168
隊列 + 多線程同步 讀取檔案并發請求耗時:16921
可以發現,現在5000多行檔案的處理,從原來的8分多鐘,變成了現在的16S,多線程實在是太恐怖了,哈哈!
本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1680939,如需轉載請自行聯系原作者