文章目錄
- 簡介
簡介
分布式程序指的是将Process程序分布到多台機器上,充分利用多台機器的性能完成複雜的任務。我們可以将這一點應用到分布式爬蟲的開發中。
分布式程序在Python中依然要用到 multiprocessing 子產品。multiprocessing子產品不但支援多程序,其中
managers
子子產品還支援把多程序分布到多台機器上。可以寫一個服務程序作為排程者,将任務分布到其他多個程序中,依靠網絡通信進行管理。舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站
的所有圖檔,如果使用多程序的話,一般是一個程序負責抓取圖檔的連結位址,将連結位址存放到
Queue
中,另外的程序負責從
Queue
中讀取連結位址進行下載下傳和存儲到本地。現在把這個過程做成分布式,一台機器上的程序負責抓取連結,其他機器上的程序負責下載下傳存儲。那麼遇到的主要問題是将Queue暴露到網絡中,讓其他機器程序都可以通路,分布式程序就是将這一個過程進行了封裝,我們可以将這個過程稱為本地隊列的網絡化。整體過程如圖1-24所示。
要實作上面例子的功能,建立分布式程序需要分為六個步驟:
- 建立隊列
,用來進行程序間的通信。服務程序建立任務隊列Queue
,用來作為傳遞任務給任務程序的通道;服務程序建立結果隊列task_queue
result_queue
,作為任務程序完成任務後回複服務程序的通道。在分布式多程序環
境下,必須通過由
獲得的Queue接口來添加任務。Queuemanager
- 把第一步中建立的隊列在網絡上注冊,暴露給其他程序(主機),注冊後獲得網絡隊列,相當于本地隊列的映像。
- 建立一個對象(Queuemanager(BaseManager))執行個體
,綁定端口和驗證密碼。manager
- 啟動第三步中建立的執行個體,即啟動管理
,監管資訊通道。manager
- 通過管理執行個體的方法獲得通過網絡通路的
對象,即再把網絡隊列實體化成可以使用的本地隊列。Queue
- 建立任務到“本地”隊列中,自動上傳任務到網絡隊列中,配置設定給任務程序進行處理。接下來通過程式實作上面的例子(Linux版),首先編寫的是服務程序(
),代碼如下:taskManager.py
import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class Queuemanager(BaseManager):
pass
# 第二步:把建立的兩個隊列注冊在網絡上,利用register方法,callable參數關聯了Queue對象,
# 将Queue對象在網絡中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
# 第三步:綁定端口8001,設定驗證密碼‘qiye’。這個相當于對象的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
# 第四步:啟動管理,監聽資訊通道
manager.start()
# 第五步:通過管理執行個體的方法獲得通過網絡通路的Queue對象
task=manager.get_task_queue()
result=manager.get_result_queue()
# 第六步:添加任務
for url in ["ImageUrl_"+i for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
# 擷取傳回結果
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()
任務程序已經編寫完成,接下來編寫任務程序(
taskWorker.py
),建立任務程序的步驟相對較少,需要四個步驟:
- 使用
注冊用于擷取QueueManager
的方法名稱,任務程序隻能通過名稱來在網絡上擷取Queue
。Queue
- 連接配接伺服器,端口和驗證密碼注意保持與服務程序中完全一緻。
- 從網絡上擷取
,進行本地化。Queue
- 從
隊列擷取任務,并把結果寫入task
隊列result
程式
taskWorker.py
代碼(win/linux版)如下:
# coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 建立類似的QueueManager:
class QueueManager(BaseManager):
pass
# 第一步:使用QueueManager注冊用于擷取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連接配接到伺服器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證密碼注意保持與服務程序完全一緻:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接配接:
m.connect()
# 第三步:擷取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task隊列擷取任務,并把結果寫入result隊列:
while(not task.empty()):
image_url = task.get(True,timeout=5)
print('run task download %s...' % image_url)
time.sleep(1)
result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')
最後開始運作程式,先啟動服務程序
taskManager.py
,運作結果如下:
put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...
接着再啟動任務程序taskWorker.py,運作結果如下:
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.
當任務程序運作結束後,服務程序運作結果如下:
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success
其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,實作大規模的分布式爬蟲。
注意 由于平台的特性,建立服務程序的代碼在和
Linux
上有一些不同,建立工作程序的代碼是一緻的。
Windows
# coding:utf-8
# taskManager.py for windows
import queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數
task_number = 10
# 定義收發隊列
task_queue = queue.Queue(task_number);
result_queue = queue.Queue(task_number);
def get_task():
return task_queue
def get_result():
return result_queue
# 建立類似的QueueManager:
class QueueManager(BaseManager):
pass
def win_run():
# Windows下綁定調用接口不能使用lambda,是以隻能先定義函數再綁定
QueueManager.register('get_task_queue',callable = get_task)
QueueManager.register('get_result_queue',callable = get_result)
# 綁定端口并設定驗證密碼,Windows下需要填寫IP位址,Linux下不填預設為本地
manager = QueueManager(address = ('127.0.0.1',8001))
# 啟動
manager.start()
try:
# 通過網絡擷取任務隊列和結果隊列
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任務
for url in ["ImageUrl_"+str(i) for i in range(10)]:
print('put task %s ...' %url)
task.put(url)
print('try get result...')
for i in range(10):
print('result is %s' %result.get(timeout=10))
except:
print('Manager error')
finally:
# 一定要關閉,否則會報管道未關閉的錯誤
manager.shutdown()
if __name__ == '__main__':
# Windows下多程序可能會有問題,添加這句可以緩解
freeze_support()
win_run()
$ python taskManager_win.py
put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...