天天看点

python实现多线程下载_多线程分段下载研究的python实现(一)

我一直对下载文件比较感兴趣。现在我下载文件大部分是用迅雷,但迅雷也有一些不如意的地方,内存占用大,一些不必要的功能太多,不可定制。尤其是最后一点。现在有些下载对useragent,cookie,auth等要求很多,迅雷这时就无能为力了。最好能有一个在后台运行的download server,以较小的内存代价运行,界面通过访问本地的端口显示。下面对其中可能涉及的一些技术进行探讨。在python下实现下载比较容易。以下载http://nodejs.org/dist/v0.10.12/node.exe为例:

importurllib2

urllib2.urlopen('http://nodejs.org/dist/v0.10.12/node.exe')

但在网速不给力的情况下,这样下载就会花很长时间,分段下载是个可行的方法。

首先要得到文件的大小:

importhttpliubdef_getinfo(host,path):

conn=httplib.HTTPConnection(host)

conn.request('HEAD',path)

res=conn.getresponse()if res.status==httplib.OK:

result=res.getheader('content-length')

conn.close()return int(result)

然后分段下载:

CHUNK=16*1024

def_getpart(host,path,fname,start,end):

current=start

headers={'Range':'bytes=%s-%s' %(start, end)}

f=open(fname,'wb')

conn=httplib.HTTPConnection(host)

conn.request('GET',path,'',headers)print 'No. %d begin to download range bytes=%s-%s' %(start, end)

res=conn.getresponse()whileTrue :

chunk=res.read(CHUNK)if notchunk:

conn.close()

f.close()breakf.seek(current)

f.write(chunk)print 'No. %d write at %d'%(pid,current)

current=current+CHUNK

采取res.read(CHUNK)的方式可以减少内存的占用。

分段下载没有问题,但问题是依次分段下载根本没有解决问题,只有同时分段下载才能提高网速。“同时”意味着多线程,复杂的锁,资源分配实非我辈菜鸟所能掌握的。好在有大神给出了解决方案,用gevent。gevent引入了协程的概念,程序实际上要单线程运行,但在程序内部不停地切换,实现多线程的效果。以较低的代价实现多线程,又叫协程。这与nodejs较相似。但nodejs的学习难度太大,只好用gevent了。

说到gevent,我不得不吐糟一下,资料太混乱了。主页面上的是0.13.8,实际上在github上是1.0rc2。文档中介绍的是0.13.8的,1.0rc2变化巨大,与文档根本对不上。只能一点一点摸索。

引入gevent后的整个下载是这样的。

temp=urlparse.urlparse(url)

size=_getinfo(temp.netloc,temp.path)

num=20t=size//num

p=gevent.pool(15)

fname=url.split('/')[-1]for i inrange(num):

start=t*i

end=start+t-1

if i==num-1 :end=size

job=gevent.spawn(_getpart,temp.netloc,temp.path,fname,start,end)

p.add(job)

p.join()print 'downloaded'

同时也要对_getpart进行修改

def _getpart(self,pid,host,path,fname,start,end,**kwargs):

monkey.patch_socket()

current=start

headers={'Range':'bytes=%s-%s' %(start, end)}

f=self._open(fname)

conn=self._getconn(host)

conn.request('GET',path,'',headers)print 'No. %d begin to download range bytes=%s-%s' %(pid ,start, end)

gevent.sleep(0)

res=conn.getresponse()whileTrue :

gevent.sleep(0)

chunk=res.read(self.CHUNK)if notchunk:

self._returnconn(host,conn)

self._close(fname)print 'No. %d download range bytes=%s-%s successed' %(pid ,start, end)breakf.seek(current)

f.write(chunk)print 'No. %d write at %d'%(pid,current)

current=current+self.CHUNK

在凡是需要等待的地方加入gevent.sleep(0),这相当于通知系统这里要等待,可以干别的了。但这这里有个问题,在没有patch(加入monkey.patch_socket())时,

所有的协程总是以固定的顺序写入,并不像是并行运行。只有patch后才像是并行。找了许多文档也没有解释清楚。

这样就行了吗,还不行。同一文件被反复打开,关闭。如何解决,引入计数。就是这样:

#文件句柄记录,{文件名:[文件句柄,引用计数]}

self._filehandle={}

def_open(self,fn):if fn inself._filehandle:

self._filehandle[fn][1]+=1

returnself._filehandle[fn][0]else:

fp=open(fn,'wb')

self._filehandle[fn]=[fp,1]returnfpdef_close(self,fn):

self._filehandle[fn][1]-=1

if self._filehandle[fn][1] ==0:

self._filehandle[fn][0].close()del self._filehandle[fn]

以后打开文件时调用_open(),关闭时调用_close()。这样避免了重复打开,重复关闭。

再改进下吧,connection也可以重用。下载时大量的联接都是针对同一网站,重用可以加快下载。策略是req完成后,联接并不马上关闭,而是保存的“联接池”中,当需用联接时,先在联接池中查找,有就重用,没有就先新建一条联接。没用的联接在经过一段时间再关闭。所以联接池并不对联接数量进行控制。

既然是延迟关闭,那么就用到了timer,python自带的timer就可以啊。最终还是没用,因为timer是threading下的,属新开线程,消耗太大。另外仅管timer在文档中说是class但实际上根本不是,坑爹啊,是function,_Timer才是class。

gevent的timer也不太好用,文档没有,不行自造一个吧。要求:在固定时隔内运行一系列函数,函数可动态增减,定时器可开关。,结果就是这个,这是网上抄来的,向大侠表示感谢。

classTimer(object):def __init__(self,interval):

self._interval=interval

self._callbacks={}

self._app=None

self._flag=Falsedefstart(self):

self._flag=Trueif self._app==None:

self._app=gevent.spawn( self._loop )defstop(self):

self._flag=False

self._app==Nonedef_loop( self ):while notself._flag:for callback inself._callbacks.value():

callback()

gevent.sleep( self.interval )defstatus(self):returnself._flagdef add(self, funcname,func, *args, **kwargs ):if not funcname inself._callbacks:

self._callbacks[ funcname ]= callback(func, *args, **kwargs)defremove(funcname):if funcname inself._callbacks:del self._callbackes[funcname]

严格来说这个定时器还是有些隐患,因为在_callbacks中的所有函数执行完成后,才开始计时,如果执行时间较长,或有异常就会出问题。上面提到的callback是对函数的封装,也是向同一位大侠抄来的:

classcallback:"""可执行对象"""

def __init__( self, func, *args, **kwargs ):

self.func=func

self.args=list(args)

self.kwargs=kwargsdef __call__( self, *args, **kwargs ):for k inself.kwargs:if notkwargs.has_key(k):

kwargs[k]=self.kwargs[k]

args= self.args +list(args)return self.func( *args, **kwargs )def __str__( self ):return ""%(str(self.func),str(self.args),str(self.kwargs))

这样复用联接就好办了

def_getconn(self,host):

result=Nonefor item inself._conn:if item[0]==host:

result=item[1]

self._conn.remove(item)break

if result==None:

result=httplib.HTTPConnection(host)

result.connect()returnresultdef_returnconn(self,host,conn):

self._conn.append([host,conn,time.time()])

self._timer.add('closeconn',self._closeconn)def_closeconn(self):

limit=time.time()-self.CONNDELAY

temp=[]for item inself._conn:if item[2]

item.close()else:

temp.append(item)

self._conn=tempif len(self._conn)==0:

self._timer.remove('closeconn')

三个函数分别是取联接,归还联接,及定时器中的关闭联接。这样还不行,有的重用联接还有抛出异常,responsenotready。问题出在第一个head request上,request一定要read只后联接才能重用。

def _getinfo(self,host,path, **kwargs):

conn=self._getconn(host)

conn.request('HEAD',path)

res=conn.getresponse()res.read()if res.status==httplib.OK:

result=res.getheader('content-length')

self._returnconn(host,conn)return int(result)

最后再把整个封装成一个类,可以随时增加下载。下面是完整的代码:

1 from gevent importmonkey2 from gevent.pool importPool3 from gevent importgreenlet4 importurlparse,httplib,gevent,time5 from gevent.core importtimer6

7 #封装callback

8 classcallback:9

12 def __init__( self, func, *args, **kwargs ):13 self.func =func14 self.args =list(args)15 self.kwargs =kwargs16

17 def __call__( self, *args, **kwargs ):18 for k inself.kwargs:19 if notkwargs.has_key(k):20 kwargs[k] =self.kwargs[k]21 args = self.args +list(args)22 return self.func( *args, **kwargs )23

24 def __str__( self ):25 return ""%(str(self.func),str(self.args),str(self.kwargs))26

27 #定时器类

28 classTimer(object):29

30 def __init__(self,interval):31 self._interval=interval32 self._callbacks ={}33 self._app=None34 self._flag=False35

36 defstart(self):37 self._flag=True38 if self._app==None:39 self._app =gevent.spawn( self._loop )40

41

42 defstop(self):43 self._flag=False44 self._app==None45

46 def_loop( self ):47 while notself._flag:48 for callback inself._callbacks.value():49 callback()50 gevent.sleep( self.interval )51

52 defstatus(self):53 returnself._flag54

55 def add(self, funcname,func, *args, **kwargs ):56 if not funcname inself._callbacks:57 self._callbacks[ funcname ]= callback(func, *args, **kwargs)58

59 defremove(funcname):60 if funcname inself._callbacks:61 delself._callbackes[funcname]62

63 #服务器类,

64 classserver(object):65

66 def __init__(self):67

68 #读写块大小

69 self.CHUNK=16*1024

70 #下载协程池大小

71 self.POOLSIZE=15

72 #下载超时(S)

73 self.TIMEOUT=10

74 #下载重试次数

75 self.RETRY=10

76 #连接空闲后,关闭前的时间

77 self.CONNDELAY=3

78 #计时器扫描间隔

79 self.INTERVAL=1.0

80 #文件句柄记录,{文件名:[文件句柄,引用计数]}

81 self._filehandle={}82 #联接记录,[host,conn,time.time()]

83 self._conn=[]84 self._pool=Pool(self.POOLSIZE)85 self._timer=Timer(self.INTERVAL)86

87

88 #建立联接

89 def_getconn(self,host):90 result=None91 for item inself._conn:92 if item[0]==host:93 result=item[1]94 self._conn.remove(item)95 break

96 if result==None:97 result=httplib.HTTPConnection(host)98 result.connect()99 returnresult100

101 #将富余连接置入self._conn

102 def_returnconn(self,host,conn):103 self._conn.append([host,conn,time.time()])104 self._timer.add('closeconn',self._closeconn)105

106 #关闭超时连接

107 def_closeconn(self):108 limit=time.time()-self.CONNDELAY109 temp=[]110 for item inself._conn:111 if item[2]<112 item.close else:114 temp.append self._conn="temp116" if len self._timer.remove>

119

120 #记录文件句柄

121 def_open(self,fn):122 if fn inself._filehandle:123 self._filehandle[fn][1]+=1

124 returnself._filehandle[fn][0]125 else:126 fp=open(fn,'wb')127 self._filehandle[fn]=[fp,1]128 returnfp129

130 #按引用计数关闭文件句柄

131 def_close(self,fn):132 self._filehandle[fn][1]-=1

133 if self._filehandle[fn][1] ==0:134 self._filehandle[fn][0].close()135 delself._filehandle[fn]136

137 #取得下载文件信息

138 def _getinfo(self,host,path, **kwargs):139 conn=self._getconn(host)140 conn.request('HEAD',path)141 res=conn.getresponse()142 res.read()144 if res.status==httplib.OK:145 result=res.getheader('content-length')146 self._returnconn(host,conn)147 returnint(result)148

149 #分段下载

150 def _getpart(self,pid,host,path,fname,start,end,**kwargs):151 monkey.patch_socket()152 current=start153 headers={'Range':'bytes=%s-%s' %(start, end)}154 f=self._open(fname)155 conn=self._getconn(host)156 conn.request('GET',path,'',headers)157 print 'No. %d begin to download range bytes=%s-%s' %(pid ,start, end)158 gevent.sleep(0)159 res=conn.getresponse()160 whileTrue :161 gevent.sleep(0)162 chunk=res.read(self.CHUNK)163

164 if notchunk:165 self._returnconn(host,conn)166 self._close(fname)167 print 'No. %d download range bytes=%s-%s successed' %(pid ,start, end)168 break

169 f.seek(current)170 f.write(chunk)171 print 'No. %d write at %d'%(pid,current)172 current=current+self.CHUNK173

174 #添加下载任务

175 defadd(self,url):176 temp=urlparse.urlparse(url)177 size=self._getinfo(temp.netloc,temp.path)178 num=20

179 t=size//num180 fname=url.split('/')[-1]181 for i inrange(num):182 start=t*i183 end=start+t-1

184 if i==num-1 :end=size185 job=gevent.spawn(self._getpart,i,temp.netloc,temp.path,fname,start,end)186 self._pool.add(job)187 self._pool.join()188 print 'downloaded'

这个东西还存在很多问题,没有实现持久化,不能控制具体下载线程,没有处理重定向、超时等,下一步进一步研究。欢迎各位达人指正,拍砖。

112>