问题:我给你10个URL,你帮我把这10个URL对应的网页下载下来。
传统方式
# 传统串行方式
# Traditional serial approach: each download blocks until it finishes,
# so total time is roughly the sum of all individual request times.
import requests
import time

urls = ['https://github.com/' for _ in range(10)]

start = time.time()
for url in urls:
    # FIX: a timeout so one hung server cannot stall the whole script forever
    # (requests.get has no default timeout).
    response = requests.get(url, timeout=30)
    response.close()  # release the connection instead of leaking it
    # print(response)
spend_time = time.time() - start
print(spend_time)
# 12.493084907531738
一、多进程和多线程实现并发
# Approach 1: a process pool fans the downloads out over worker processes.
import time
from concurrent.futures import ProcessPoolExecutor

import requests

start = time.time()


def task(url):
    """Download *url* and return the requests Response object."""
    # FIX: timeout so a stuck URL cannot hang a worker process indefinitely.
    response = requests.get(url, timeout=30)
    return response


def done(future, *args, **kwargs):
    """Completion callback: print the final URL of the finished download."""
    response = future.result()
    print(response.url)


if __name__ == '__main__':
    url_list = ['https://www.douban.com/' for _ in range(100)]
    # The with-block waits for every submitted task before exiting.
    with ProcessPoolExecutor(max_workers=10) as pool:
        for url in url_list:
            v = pool.submit(task, url)
            v.add_done_callback(done)
    print(time.time() - start)
# 11.862671136856079
花费时间 11.862671136856079秒
#########编写方式二#########
######### Approach 2: thread pool #########
# FIX: the original used time.time() and os.cpu_count() without importing
# either module, and created a module-level ThreadPoolExecutor() that was
# never used and never shut down (leaking its threads).
import os
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def task(url):
    """Download *url* and return the requests Response object."""
    response = requests.get(url, timeout=30)
    return response


def done(future, *args, **kwargs):
    """Completion callback: print the final URL of the finished download."""
    response = future.result()
    print(response.url)


if __name__ == '__main__':
    start = time.time()
    # One worker per CPU; for I/O-bound work a larger pool is usually fine too.
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
        url_list = ['https://www.douban.com/' for _ in range(100)]
        for url in url_list:
            v = pool.submit(task, url)
            v.add_done_callback(done)
    print(time.time() - start)
# 8.904985904693604
花费时间 8.904985904693604秒
二、基于事件循环的异步IO
1. asyncio + aiohttp
import time
import asyncio
import aiohttp


async def start_request(session, url, sem=None):
    """Fetch *url* through *session*, bounded by the shared semaphore *sem*.

    BUG FIX: the original created a brand-new ``asyncio.Semaphore(10)`` on
    every call, so the limit of 10 concurrent requests was never enforced.
    The caller now passes ONE shared semaphore (a fresh one is created only
    as a backward-compatible fallback).
    """
    if sem is None:
        sem = asyncio.Semaphore(10)
    async with sem:
        print(f'make request to {url}')
        # The 60 s budget lives on the session's ClientTimeout (see run());
        # the original referenced async_timeout without ever importing it.
        async with session.get(url, ssl=False) as response:
            if response.status == 200:
                print(response.status)


async def run(urls):
    """Download every URL concurrently and report the failed ones."""
    # BUG FIX: the call site passes a generator; asyncio.gather consumed it,
    # so the enumerate() loop below iterated an exhausted iterator and the
    # failure report never ran. Materialize once up front.
    urls = list(urls)
    sem = asyncio.Semaphore(10)  # shared: at most 10 requests in flight
    conn = aiohttp.TCPConnector(ssl=False,
                                limit=60,  # 连接池在windows下不能太大, <500
                                use_dns_cache=True)
    timeout = aiohttp.ClientTimeout(total=60)  # replaces async_timeout.timeout(60)
    async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
        datas = await asyncio.gather(
            *[start_request(session, url, sem) for url in urls],
            return_exceptions=True)
        for ind, url in enumerate(urls):
            if isinstance(datas[ind], Exception):
                print(f"{ind}, {url}: 下载失败 请重新下载:")


if __name__ == '__main__':
    start = time.time()
    urls = (('http://www.baidu.com/') for _ in range(100))
    # asyncio.run replaces the deprecated get_event_loop()/run_until_complete
    # pair; the removed loop= arguments (gone in Python 3.10) made it possible.
    asyncio.run(run(urls))
    print(time.time() - start)
# 1.4860568046569824
2. Twisted
import time

from twisted.internet import defer
from twisted.internet import reactor
# NOTE(review): getPage is deprecated since Twisted 16.7 (treq / Agent is the
# modern API) — confirm the Twisted version this is meant to run on.
from twisted.web.client import getPage

start = time.time()


def one_done(content, arg):
    """Fires when one page has finished downloading."""
    body = content.decode('utf-8')
    # print(body)
    print(arg)


def all_done(arg):
    """Fires when the DeferredList has collected every download."""
    reactor.stop()
    print(time.time() - start)


@defer.inlineCallbacks
def task(url):
    """Issue the HTTP request and chain the per-page callback."""
    d = getPage(bytes(url, encoding='utf8'))
    d.addCallback(one_done, url)
    yield d


url_list = ('http://www.cnblogs.com' for _ in range(100))
pending = []  # one Deferred per URL (each request already in flight)
for url in url_list:
    pending.append(task(url))

dl = defer.DeferredList(pending)
dl.addBoth(all_done)
reactor.run()  # blocks until reactor.stop() in all_done
# 5.039534091949463
3. tornado
import time

from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest

COUNT = 0  # number of responses still outstanding
start = time.time()


def handle_response(response):
    """Per-response callback; stops the IOLoop once every fetch is done."""
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        # print(response.body)
        print(response.request)
    # 方法同twisted
    # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()


def func():
    """Queue every fetch on the current IOLoop."""
    global COUNT
    url_list = ['http://www.baidu.com' for _ in range(100)]
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        # AsyncHTTPClient is a per-IOLoop singleton, so this returns the
        # same client object on every iteration.
        client = AsyncHTTPClient()
        client.fetch(HTTPRequest(url), handle_response)


if __name__ == '__main__':
    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start()  # runs until stop() fires above
    print(time.time() - start)
# 3.0621743202209473

"""
########http请求本质,IO阻塞########
sk = socket.socket()
#1.连接
sk.connect(('www.baidu.com',80,)) #阻塞
print('连接成功了')
#2.连接成功后发送消息
sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
#3.等待服务端响应
data = sk.recv(8096)#阻塞
print(data) #\r\n\r\n区分响应头和响应体
#关闭连接
sk.close()
"""
"""
########http请求本质,IO非阻塞########
sk = socket.socket()
sk.setblocking(False)
#1.连接
try:
sk.connect(('www.baidu.com',80,)) #非阻塞,但会报错
print('连接成功了')
except BlockingIOError as e:
print(e)
#2.连接成功后发送消息
sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
#3.等待服务端响应
data = sk.recv(8096)#阻塞
print(data) #\r\n\r\n区分响应头和响应体
#关闭连接
sk.close()
"""
异步非阻塞请求的本质

class HttpRequest:
    """Pairs a non-blocking socket with its target host and done-callback."""

    def __init__(self, sk, host, callback):
        self.socket = sk          # the (non-blocking) socket object
        self.host = host          # host this request targets
        self.callback = callback  # invoked with the HttpResponse when done

    def fileno(self):
        # select.select() duck-types on fileno(), so delegate to the socket;
        # this lets HttpRequest objects be passed to select() directly.
        return self.socket.fileno()
class HttpResponse:
    """Minimal parser for a raw HTTP response byte string.

    Splits the received bytes at the blank line separating headers from the
    body and indexes the headers into ``header_dict``.
    """

    def __init__(self, recv_data):
        self.recv_data = recv_data  # full raw bytes as received
        self.header_dict = {}       # header name -> value (whitespace stripped)
        self.body = None            # payload bytes after the header block
        self.initialize()

    def initialize(self):
        """Parse ``recv_data`` into ``header_dict`` and ``body``.

        FIXES vs. the original:
        - a response without the CRLF-CRLF separator no longer raises
          ValueError on unpacking; it is treated as headers-only;
        - header names/values are stripped, so 'Content-Type: text/html'
          yields 'text/html' instead of ' text/html' (leading space).
        The status line has no ':' before its value, so the len==2 guard
        skips it, as in the original.
        """
        if b'\r\n\r\n' in self.recv_data:
            headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        else:
            headers, body = self.recv_data, b''
        self.body = body
        for line in headers.split(b'\r\n'):
            h_str = str(line, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0].strip()] = v[1].strip()
class AsyncRequest:
    """Drive many non-blocking HTTP GET requests through one select() loop.

    NOTE(review): this relies on the ``socket`` and ``select`` modules, which
    are not imported anywhere in the visible snippet — confirm the real
    module imports them at the top of the file.
    """

    def __init__(self):
        self.conn = []        # all in-flight requests, watched for readability
        self.connection = []  # requests whose connect() is not yet confirmed

    def add_request(self, host, callback):
        """Begin a non-blocking connection to *host*:80 and register it."""
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host, 80))
        except BlockingIOError:
            # Expected: a non-blocking connect() raises while still in progress.
            pass
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        """Event loop: send each request once its socket turns writable,
        then read the reply once it turns readable; exits when all done."""
        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                # Writable socket => the TCP handshake has completed.
                print(w.host, '连接成功...')
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # Readable socket => drain whatever the server has sent.
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                    except Exception:
                        # Non-blocking recv raises once the buffer is drained.
                        break
                    if not chunk:
                        # BUG FIX: recv() returning b'' means EOF; the original
                        # looped forever on a cleanly closed connection.
                        break
                    recv_data += chunk
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break
def f1(resp):
    """Completion callback: pretend to persist the response to a file."""
    print('保存到文件', resp.header_dict)


def f2(resp):
    """Completion callback: pretend to persist the response to a database."""
    print('保存到数据库', resp.header_dict)
# host -> completion-callback table driving the demo below.
url_list = [
    {'host': 'www.youku.com', 'callback': f1},
    {'host': 'v.qq.com', 'callback': f2},
    {'host': 'www.cnblogs.com', 'callback': f2},
]

if __name__ == '__main__':
    client = AsyncRequest()
    for entry in url_list:
        client.add_request(entry['host'], entry['callback'])
    client.run()
自定义异步非阻塞IO
作者:张亚飞
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明。