0x00 文檔路徑:
- Python » 3.6.2 Documentation » The Python Standard Library » 17. Concurrent Execution »
0x01 子產品簡述:
添加于python3.2
提供更加高效的接口來實作異步執行
通過具體實作來剖析
0x02 具體實作
參考官方文檔給出的例子
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=) as executor:
future = executor.submit(pow, , )
print(future.result())
-
通過ThreadPoolExecutor來生成一個Executor對象
源碼位置Lib\concurrent\futures\thread.py 83行
一共兩個參數max_workers=None、thread_name_profix=”
max_workers
用來指定最大線程數
if max_workers is None: # Use this number because ThreadPoolExecutor is often # used to overlap I/O instead of CPU work. max_workers = (os.cpu_count() or ) *
先判斷是否是None,即未指定時,預設是cpu數量*5,比如你是四核的cpu,那麼預設最大線程20
再判斷是否小于0,若小于則抛出ValueError
-
調用ThreadPoolExecutor對象的submit方法
ThreadPoolExecutor繼承了Executor對象,并實作了部分重寫
源碼位置
Lib\concurrent\futures\thread.py 106行
一共三個參數 fn , *rags , **kwargs
submit方法執行 fn(*args,**kwargs) , 然後傳回一個Future對象
-
調用Future對象的result方法
傳回被執行函數的結果
源碼位置
Lib\concurrent\futures\_base.py 378行
一個參數timeout = None
不指定時,将不會限制等待時間,即一直等到函數完成
如果在指定時間内函數未完成,則抛出異常TimtoutError
-
因為with上下文管理器的原因,自動調用Executor對象的shutdown方法來釋放資源
直接使用with 語句即可
另一個例子
from telnetlib import Telnet
def detect_port(port):
try:
print('{} testing .. '.format(port))
Telnet('127.0.0.1',port,timeout=)
print('{} opened '.format(port))
except:
pass
with ThreadPoolExecutor()as executor:
executor.map(detect_port,range())
map方法
完全繼承于Executor
源碼位置
Lib\concurrent\futures\_base.py
一共四個參數 fn , *iterables , timeout = None , chunksize = 1
chunsize用于ProcessPoolExecutor,其他的參數同上面的submit
def map(self, fn, *iterables, timeout=None, chunksize=):
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
def result_iterator():
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()
map裡面調用了内置函數zip,zip函數的特點是
In []: print(*zip('abc',[,,,]))
('a', ) ('b', ) ('c', )
舉例說
def detect_port(ip,port):
檢測端口
with ThreadPoolExecutor()as ecexutor:
executor.map(detect_port,['127.0.0.1','192.168.1.10'],['21','22','23'])
原本是打算逐個ip掃描這三個端口是否開放
但是此時隻會掃描127.0.0.1的21端口,以及192.168.1.10的23端口
def detect_port(ip,port):
檢測端口
def detect_all_ip(ip):
ports = [xx,xx,..]
with ThreadPoolExecutor()as executor:
[executor.submit(detect_port,ip,ports) for port in ports]
def main():
ips = ['xxx','xxx',..]
with ThreadPoolExecutor()as executor:
executor.map(detect_all_ip,ips)
0x03 myself
如下是之前寫的通過telnet掃描端口的腳本,便是利用了ThreadPoolExecutor,可供參考
detectPortByTelnet.py下載下傳
新搭了部落格,有興趣的朋友們可以去踩
樓蘭’s Blog