天天看點

python 分布式程序

文章目錄

  • ​​簡介​​
python 分布式程式

簡介

分布式程序指的是将Process程序分布到多台機器上,充分利用多台機器的性能完成複雜的任務。我們可以将這一點應用到分布式爬蟲的開發中。

分布式程序在Python中依然要用到 ​​multiprocessing​​ 子產品。multiprocessing子產品不但支援多程序,其中​

​managers​

​子子產品還支援把多程序分布到多台機器上。可以寫一個服務程序作為排程者,将任務分布到其他多個程序中,依靠網絡通信進行管理。舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站

的所有圖檔,如果使用多程序的話,一般是一個程序負責抓取圖檔的連結位址,将連結位址存放到​

​Queue​

​​中,另外的程序負責從​

​Queue​

​中讀取連結位址進行下載下傳和存儲到本地。現在把這個過程做成分布式,一台機器上的程序負責抓取連結,其他機器上的程序負責下載下傳存儲。那麼遇到的主要問題是将Queue暴露到網絡中,讓其他機器程序都可以通路,分布式程序就是将這一個過程進行了封裝,我們可以将這個過程稱為本地隊列的網絡化。整體過程如圖1-24所示。

python 分布式程式

要實作上面例子的功能,建立分布式程序需要分為六個步驟:

  1. 建立隊列​

    ​Queue​

    ​,用來進行程序間的通信。服務程序建立任務隊列​

    ​task_queue​

    ​,用來作為傳遞任務給任務程序的通道;服務程序建立結果隊列​

    ​result_queue​

    ​,作為任務程序完成任務後回複服務程序的通道。在分布式多程序環

    境下,必須通過由​

    ​Queuemanager​

    ​獲得的Queue接口來添加任務。
  2. 把第一步中建立的隊列在網絡上注冊,暴露給其他程序(主機),注冊後獲得網絡隊列,相當于本地隊列的映像。
  3. 建立一個對象(Queuemanager(BaseManager))執行個體​

    ​manager​

    ​,綁定端口和驗證密碼。
  4. 啟動第三步中建立的執行個體,即啟動管理​

    ​manager​

    ​,監管資訊通道。
  5. 通過管理執行個體的方法獲得通過網絡通路的​

    ​Queue​

    ​對象,即再把網絡隊列實體化成可以使用的本地隊列。
  6. 建立任務到“本地”隊列中,自動上傳任務到網絡隊列中,配置設定給任務程序進行處理。接下來通過程式實作上面的例子(Linux版),首先編寫的是服務程序(​

    ​taskManager.py​

    ​),代碼如下:
import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class Queuemanager(BaseManager):
  pass
# 第二步:把建立的兩個隊列注冊在網絡上,利用register方法,callable參數關聯了Queue對象,
# 将Queue對象在網絡中暴露

Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
# 第三步:綁定端口8001,設定驗證密碼‘qiye’。這個相當于對象的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
# 第四步:啟動管理,監聽資訊通道
manager.start()
# 第五步:通過管理執行個體的方法獲得通過網絡通路的Queue對象
task=manager.get_task_queue()
result=manager.get_result_queue()
# 第六步:添加任務
for url in ["ImageUrl_"+i for i in range(10)]:
  print 'put task %s ...' %url
  task.put(url)
# 擷取傳回結果
print 'try get result...'
for i in range(10):
  print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()      

任務程序已經編寫完成,接下來編寫任務程序(​

​taskWorker.py​

​),建立任務程序的步驟相對較少,需要四個步驟:

  1. 使用​

    ​QueueManager​

    ​​注冊用于擷取​

    ​Queue​

    ​​的方法名稱,任務程序隻能通過名稱來在網絡上擷取​

    ​Queue​

    ​。
  2. 連接配接伺服器,端口和驗證密碼注意保持與服務程序中完全一緻。
  3. 從網絡上擷取​

    ​Queue​

    ​,進行本地化。
  4. 從​

    ​task​

    ​​隊列擷取任務,并把結果寫入​

    ​result​

    ​隊列

程式​

​taskWorker.py​

​代碼(win/linux版)如下:

# coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 建立類似的QueueManager:
class QueueManager(BaseManager):
  pass
# 第一步:使用QueueManager注冊用于擷取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連接配接到伺服器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證密碼注意保持與服務程序完全一緻:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接配接:
m.connect()
# 第三步:擷取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task隊列擷取任務,并把結果寫入result隊列:
while(not task.empty()):
  image_url = task.get(True,timeout=5)
  print('run task download %s...' % image_url)
  time.sleep(1)
  result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')      

最後開始運作程式,先啟動服務程序​

​taskManager.py​

​,運作結果如下:

put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...      

接着再啟動任務程序taskWorker.py,運作結果如下:

Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.      

當任務程序運作結束後,服務程序運作結果如下:

result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success      

其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,實作大規模的分布式爬蟲。

注意 由于平台的特性,建立服務程序的代碼在​

​Linux​

​​和​

​Windows​

​上有一些不同,建立工作程序的代碼是一緻的。
# coding:utf-8
# taskManager.py for windows
import queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數
task_number = 10
# 定義收發隊列
task_queue = queue.Queue(task_number);
result_queue = queue.Queue(task_number);
def get_task():
  return task_queue

def get_result():
  return result_queue
# 建立類似的QueueManager:
class QueueManager(BaseManager):
  pass

def win_run():
  # Windows下綁定調用接口不能使用lambda,是以隻能先定義函數再綁定
  QueueManager.register('get_task_queue',callable = get_task)
  QueueManager.register('get_result_queue',callable = get_result)
  # 綁定端口并設定驗證密碼,Windows下需要填寫IP位址,Linux下不填預設為本地
  manager = QueueManager(address = ('127.0.0.1',8001))
  # 啟動
  manager.start()
  try:
  # 通過網絡擷取任務隊列和結果隊列
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # 添加任務
    for url in ["ImageUrl_"+str(i) for i in range(10)]:
      print('put task %s ...' %url)
      task.put(url)
    print('try get result...')
    for i in range(10):
      print('result is %s' %result.get(timeout=10))
  except:
    print('Manager error')
  finally:
    # 一定要關閉,否則會報管道未關閉的錯誤
    manager.shutdown()

if __name__ == '__main__':
# Windows下多程序可能會有問題,添加這句可以緩解
  freeze_support()
  win_run()      
$ python taskManager_win.py 
put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...