前言
最近遇到一个问题:有一个几十万行的数据文件,需要提取每一行中的特定字段去请求数据。每一次请求数据可能花费100毫秒的时间,这样的话,如果我们单线程顺序读取文件,发送请求,时间开销很大。如何缩短时间呢?自然想到了多线程处理。可惜自己多线程程序写的并不多,昨晚折腾了许久,终于有所收获!
单线程处理
直接看代码吧:
<a href="http://s3.51cto.com/wyfs02/M01/70/C0/wKioL1W9flSCpKdqAAFiH8w4feg489.jpg" target="_blank"></a>
为了方便,用Thread.sleep的方式进行模拟发送请求获取数据。
运行结果如下:
单线程读取文件并发请求耗时:506047
在单线程处理的情况下,仅仅5000多条数据,就花费了8分多钟!
队列+多线程处理
思路:
用一个线程A去读取文件,将每一行放入到队列Q中;
开启多个线程B去同步的读取队列Q中的数据并发送请求;
线程A和线程B之间通过队列Q进行了交互,队列Q应该是阻塞队列;
线程A的结束,很好说,就是文件读取完毕;那么线程B什么时候结束呢?而且线程B是一组线程,又如何确保他们都正常结束呢?
下面我们先看看线程A的代码吧:
<a href="http://s3.51cto.com/wyfs02/M02/70/C0/wKioL1W9g42ymrYcAAIsgS8ufGo972.jpg" target="_blank"></a>
线程A是一个读取文件形成队列Q的任务,注意下面几点:
构造方法中,队列是外部传入的
定义了一个public属性END,在文件读取最后被放入到了队列中,这是为了通知线程B如果读到了END就可以结束了
放入队列中的方法用的是阻塞式的put
我们在看看线程B的代码:
首先看看属性定义以及构造方法
<a href="http://s3.51cto.com/wyfs02/M00/70/C4/wKiom1W9g52QJvXEAADdQvnE6TI311.jpg" target="_blank"></a>
同样的,队列是由外部传入构造方法;为了方便,给每一个线程B一个标示。
我们重点关注下线程B的run方法:
<a href="http://s3.51cto.com/wyfs02/M00/70/C1/wKioL1W9hijz2dUoAAHqWTVDo50480.jpg" target="_blank"></a>
说明:
首先是一个while循环,如果读取到了队列中的END标示,那么while结束,线程结束
这里取出队首的方法是阻塞式的take
由于队列中的一行,应该只能被一个线程B处理,因此加锁处理取出队首的过程
如果一个线程B读取到了END标示,由于END是读取并删除了,为了其他线程B尽快结束,应该将END继续放入到队列中
主程序部分:
<a href="http://s3.51cto.com/wyfs02/M02/70/C1/wKioL1W9iBGDfOKoAAILDd0fSJg242.jpg" target="_blank"></a>
在主程序部分,我们定义了阻塞队列及其大小,启动一个线程A,以及多个线程B。
运行结果:
线程[13] 处理167
线程[22] 处理167
线程[28] 处理167
线程[1] 处理167
线程[29] 处理167
线程[27] 处理167
线程[25] 处理167
线程[21] 处理167
线程[18] 处理167
线程[20] 处理167
线程[0] 处理168
线程[4] 处理168
线程[19] 处理168
线程[7] 处理168
线程[15] 处理168
线程[23] 处理168
线程[14] 处理168
线程[3] 处理168
线程[17] 处理168
线程[8] 处理168
线程[5] 处理168
线程[2] 处理168
线程[6] 处理168
线程[24] 处理168
线程[12] 处理168
线程[9] 处理168
线程[11] 处理168
线程[16] 处理168
线程[26] 处理168
线程[10] 处理168
队列 + 多线程同步 读取文件并发请求耗时:16921
可以发现,现在5000多行文件的处理,从原来的8分多钟,变成了现在的16S,多线程实在是太恐怖了,哈哈!
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1680939,如需转载请自行联系原作者