網絡程式設計實驗課程要求必須寫一個套接字的應用程式,考慮到之前寫過的單詞發音抓取程式的效率比較低下,就順便結合套接字做一個分布式的抓取軟體。其中涉及到動态任務領取,負載均衡,多線程,加鎖解鎖,簡單的HTML代碼解析,檔案讀寫等功能。程式還是使用Python完成,對于學習Python、套接字程式設計、分布式程式設計甚至叢集程式設計都有一定的意義。
另外,此軟體具有一定的攻擊性,如果啟動的從節點數量過多,并且每個節點上啟動的線程數量過大,那麼提供單詞發音的伺服器可能承受不住壓力。是以,此軟體對于編寫web伺服器的負載壓力測試有一定的參考意義。
下面是系統結構圖:
<a href="http://blog.51cto.com/attachment/201205/163813333.png" target="_blank"></a>
圖1 系統結構圖
圖1為系統結構圖,整個系統由主節點和從節點兩大部分構成,主節點上有四種不同類型的線程,分别為:
l 任務偵聽線程:負責接受從節點上的任務請求線程,需要注意的是,任務偵聽線程并不負責和請求線程通信,而是啟動一個新的單詞分發線程負責和請求線程通信。
l 單詞分發線程池:為從節點上的請求線程服務的主節點上的線程的集合。
l 檔案偵聽線程:負責接受從節點上的資料回傳請求,需要注意的是,檔案偵聽線程并不負責和請求線程通信,而是啟動一個新的寫磁盤線程和請求線程通信。
l 寫磁盤線程:為從節點上的請求線程服務的主節點上的線程的集合。
用戶端也是使用并發的方式實作,一共有以下三類線程:
l 請求線程:當單詞隊列小于一定的門檻值時,請求線程負責向主節點的任務偵聽線程請求任務。
l 下載下傳線程池,負責現在單詞音頻檔案的線程的集合
l 檔案回傳線程:負責将寫入本地磁盤上的檔案回傳到主節點上的線程
另外,系統中還有幾個關鍵的資料結構:
l 主節點上的全部單詞清單,其中存儲了所有的需要下載下傳發音檔案對應的單詞,該資料結構是一個隊列。
l 從節點上的部分單詞清單,其中存儲了從節點上需要下載下傳的部分單詞,該資料結構是一個隊列。
l 從節點上的已下載下傳的mp3檔案名稱清單,其中存儲了已經下載下傳的mp3檔案的路徑,該資料結構為一個隊列。 <b></b>
下面為具體的編碼實作,僅為核心代碼沒有貼所有代碼。如果有需要可以給我發郵件索取~
class WordDispatchThread(threading.Thread):
def __init__(self,wordsList):
threading.Thread.__init__(self)
self.wordSocket = socket(AF_INET,SOCK_STREAM)
self.wordSocket.bind(('',wordSocketPort))
self.wordSocket.listen(maxConnectNum)
self.wordsList = wordsList
def run(self):
while True:
clientSock,address = self.wordSocket.accept()
print 'connect from %s is established.' % str(address)
wordWorker = WordServiceThread(clientSock,self.wordsList)
wordWorker.start()
說明:在該類的構造函數中,将建立偵聽套接字,并将全部單詞清單作為類的參數成員。線程運作為一個死循環,其中啟動accept方法,收到任務請求線程的請求之後,将啟動任務分發線程為其服務。
該線程的狀态轉換圖如圖2所示:
<a href="http://blog.51cto.com/attachment/201205/202750401.png" target="_blank"></a>
圖2 任務偵聽線程狀态轉換圖
class FileStoreThread(threading.Thread):
def __init__(self,wordsDownloaded):
self.fileSocket = socket(AF_INET,SOCK_STREAM)
self.fileSocket.bind(('',fileSocketPort))
self.fileSocket.listen(maxConnectNum)
self.wordsList = wordsDownloaded
clientSock,address = self.fileSocket.accept()
fileWorker = FileServiceThread(clientSock,self.wordsList)
fileWorker.start()
說明:在該類的構造函數中,将建立偵聽套接字,并将已下載下傳的單詞清單作為類的參數成員。線程運作為一個死循環,其中啟動accept方法,收到任務請求線程的請求之後,将啟動寫磁盤線程為其服務。
檔案偵聽線程狀态轉換圖如圖3所示:
<a href="http://blog.51cto.com/attachment/201205/202930389.png" target="_blank"></a>
圖3 檔案偵聽線程的狀态轉換圖
class WordServiceThread(threading.Thread):
def __init__(self,clientSocket,wordsList):
self.clientSocket = clientSocket
global wordsListMutex
request = self.clientSocket.recv(maxWordLength)
requestStr = request.decode('ascii')
if(requestStr == 'please'):
tmpWordsList = ''
wordsListMutex.acquire()
for i in range(0,taskGrain):
tmpWord = self.wordsList.front()
tmpWordsList = tmpWordsList + ' ' + tmpWord
wordsListMutex.release()
self.clientSocket.send(tmpWordsList)
else:
self.clientSocket.close()
break
說明:在該類的構造函數中得到為請求線程服務的套接字,以及全部單詞清單的隊列。在run中執行任務發送協定。首先受到一個請求字元串,應該為‘please’,否則表示應該在伺服器端關閉為其服務的套接字。
收到please之後,從所有單詞隊列中取出一定數量的單詞拼成一個字元串發給送從節點的請求線程。
該線程的執行邏輯對應的流程圖如圖4所示:
<a href="http://blog.51cto.com/attachment/201205/203050857.png" target="_blank"></a>
圖4 任務分發線程
class FileServiceThread(threading.Thread):
global soundQueueMutex
def __init__(self,clientSocket,wordsDownloaded):
self.socket = clientSocket
self.wordsDownloaded = wordsDownloaded
rawSaveName = self.socket.recv(maxWordLength)
word = rawSaveName.decode('ascii')
if(word == ''):
continue
if(word == '0'):
saveName = word + '.mp3'
outfd = open(saveName,'wb')
helloPacket = 'OK'
helloPacketStr = helloPacket.encode('ascii')
self.socket.send(helloPacketStr)
while True:
data = self.socket.recv(dataBlockSize)
outfd.write(data)
query = 'any more?'
self.socket.send(query.encode('ascii'))
answer = self.socket.recv(maxWordLength)
answer = answer.decode('ascii')
if(answer == 'yes'):
continue
else:
break
print 'received %s successfully...' % saveName
outfd.close()
說明:在構造函數中擷取到為用戶端檔案回寫線程服務的套接字,通過該套接字執行資料回寫協定,将資料寫回到磁盤上。
寫磁盤線程的執行邏輯對應的流程圖如圖圖5所示:
<a href="http://blog.51cto.com/attachment/201205/203211307.png" target="_blank"></a>
圖5 寫磁盤線程
class TaskRequestThread(threading.Thread):
global wordQueueMutex
def __init__(self,wordQueue):
self.requestSocket = socket(AF_INET,SOCK_STREAM)
self.taskQueue = wordQueue
self.requestSocket.connect(('localhost',wordSocketPort))
wordQueueMutex.acquire()
if(len(self.taskQueue) < downloadWokerNum ):
requestStr = 'please'
self.requestSocket.send(requestStr.encode('ascii'))
rawStr = self.requestSocket.recv(rawStrLen) #group 10 words once
wordsStr = rawStr.decode('ascii')
wordsList = wordsStr.split()
for word in wordsList:
self.taskQueue.append(word)
exit = True
for word in self.taskQueue:
if word != '0':
exit = False
if(exit == True):
self.requestSocket.send('0')
self.requestSocket.close()
wordQueueMutex.release()
wordQueueMutex.release()
time.sleep(taskRequestWorkerSleep)
說明:請求線程定期掃描任務隊列中任務的數量,如果小于一定的門檻值,則從主節點上請求新的單詞清單任務。否則,線程進入睡眠。如果目前隊列中的所有單詞都為‘0’,說明主節點上已經沒有單詞任務了,這時可以退出了。
任務請求線程對應的執行邏輯如圖6所示:
<a href="http://blog.51cto.com/attachment/201205/203317113.png" target="_blank"></a>
圖6 任務請求線程
class DownloadThread(threading.Thread):
global fileQueueMutex
def __init__(self,wordQueue,fileQueue):
self.wordQueue = wordQueue
self.fileQueue = fileQueue
time.sleep(2) #delete this when presenting a demostration
word = ''
wordQueueMutex.acquire()
while (len(self.wordQueue) == 0):
time.sleep(downloadWorkerSleep)
wordQueueMutex.acquire()
if(self.wordQueue[0]=='0'):
fileQueueMutex.acquire()
self.fileQueue.append('0')
fileQueueMutex.release()
break;
word = self.wordQueue.pop(0)
url = "http://www.dwds.de/?qu="+word
urlContent = urllib2.urlopen(url).read()
#print urlContent
urlList = re.findall('filename=http://media.dwds.de/dwds/media/sound/dwdswb_aussprache/.*\.mp3', urlContent)
try:
finalUrl = urlList[0][9:]
#print finalUrl
soundData = urllib2.urlopen(finalUrl).read()
saveName=word+'.mp3'+'.local'
#print saveName
outfd = open(saveName,'wb')
outfd.write(soundData)
outfd.close()
fileQueueMutex.acquire()
self.fileQueue.append(word)
fileQueueMutex.release()
print '%s: OK' % word
except:
print '%s: FAILED' % word
finally:
pass
<b></b>
說明:下載下傳線程在構造函數中就獲得了任務清單隊列和已下載下傳檔案隊列,其處理過程前面已經叙述過了。首先從目前任務隊列中取出一個單詞,如果單詞為’0’,表示已經沒有任務了,這時線程退出。否則,就需要去建構單詞頁面的URL,然後分析頁面的HTML代碼,使用正規表達式找到單詞音頻檔案的URL,接着将資料讀入記憶體并寫入磁盤。另外注意的是,下載下傳線程在退出的時候會給已下載下傳單詞隊列中寫入’0’,以通知回傳線程退出。
下載下傳線程的執行邏輯對應的流程圖如圖7所示:
<a href="http://blog.51cto.com/attachment/201205/203414555.png" target="_blank"></a>
圖7 下載下傳線程
class FileTransferThread(threading.Thread):
def __init__(self,fileQueue):
self.fileQueue = fileQueue;
self.fileSocket.connect(('localhost',fileSocketPort))
self.exitCounter = 0
fileQueueMutex.acquire()
while(len(self.fileQueue)==0):
time.sleep(fileWorkerSleep)
word = self.fileQueue.pop(0)
fileQueueMutex.release()
self.exitCounter = self.exitCounter + 1
if (self.exitCounter == downloadWokerNum):
self.fileSocket.send(word.encode('ascii'))
self.fileSocket.close()
self.fileSocket.send(word.encode('ascii'))
response = self.fileSocket.recv(helloLength)
responseStr = response.decode('ascii')
if(responseStr != 'OK'):
self.fileSocket.close()
saveName = word + '.mp3' + '.local'
infd = open(saveName,'rb')
data = infd.read(dataBlockSize)
while True:
self.fileSocket.send(data)
query = self.fileSocket.recv(maxWordLength)
answer = ''
data = infd.read(dataBlockSize)
if not data:
answer = 'no'
self.fileSocket.send(answer.encode('ascii'))
answer = 'yes'
infd.close()
說明:回傳線程在構造函數中獲得了已下載下傳的單詞的隊列。在運作的過程中,首先判斷目前的已下載下傳單詞隊列中是否有檔案名,如果有則立即回傳資料。下載下傳線程在退出的時候會給已下載下傳單詞隊列中寫入’0’,以通知回傳線程退出。回傳線程會統計獲得的0的數量,如果統計的數量等于下載下傳線程的數量,表示下載下傳線程全部退出,同時檔案已經回傳完畢。這時,回傳線程也可以退出了。
檔案回傳線程的執行邏輯對應的流程圖如圖8所示:
<a href="http://blog.51cto.com/attachment/201205/203524498.png" target="_blank"></a>
圖8 檔案回傳線程
class WordsList:
def __init__(self):
if(len(sys.argv)<2):
print 'Usage: %s filename' % sys.argv[0]
sys.exit(-1)
filePath = sys.argv[1]
self.t = []
try:
for line in fileinput.input(filePath):
wordLen = len(line)
if( wordLen > 1 and line[wordLen-1] == '\n'):
word = line[0:wordLen-1]
self.t.append(word)
self.t.append(line)
self.t.append('0')
except:
print 'constructing words list error.'
print 'maybe the provided file path is wrong.Check it twice.'
sys.exit(-2)
finally:
pass
print 'constructing words successfully...'
def front(self):
if(self.t[0]!='0'):
return self.t.pop(0)
else:
return self.t[0]
說明:該類在構造函數中,從指令行上提供的檔案名中解析出所有的單詞,并存放在一個清單中,在最後加入一個’0’表示已經沒有單詞了。此類中還提供一個方法--front,用于從單詞隊列中擷取隊首的單詞,如果沒有單詞的話,傳回為’0’。這一點非常重要,用于控制從節點線程的結束。
今年一月份的時候,準備背誦德語四級單詞,但是很多單詞發音記不清楚,是以找到了一個單詞發音網站。當時,想将這些音頻檔案存放在我的mp3播放器中,這樣我可以随時随地的背誦記憶了,是以用Python寫了一個簡單的單詞發音抓取程式,但是抓取效率不是很高,速度比較慢。而我的研究方向為高性能計算,現在主要是叢集計算,是以結合網絡課程上要求的套接字網絡程式設計實驗和我的研究方向,完成了此軟體的開發。
該軟體的目标很明确,就是以伺服器能否提供的最大負載來下載下傳單詞對應的音頻檔案。此軟體有一定的攻擊性,如果這個軟體可以通過網際網路傳播,并在背景隐秘的運作,則完全可以使提供單詞下載下傳的網站癱瘓。
在實際程式設計中,遇到了一些比較棘手的問題。特别是檔案傳輸過程,雖然可以使用ftp協定傳輸,但是為了保持軟體的最小依賴性,還是實作了一個簡單的檔案傳輸協定。因為Karn算法的原因,有些位元組被緩沖在緩存中導緻接受過程混亂等問題都是之前沒有遇到過的。這些問題通過自定義的一些簡單的協定得以解決。
之前還想添加一個語音提示的功能,在某個單詞下載下傳完畢之後,語音提示該單詞已經下載下傳完成。但考慮到多節點下載下傳時單詞發音反而會成為瓶頸,是以放棄了這個功能的實作。
此軟體僅僅作為一個示範軟體,并沒有在實際的叢集上運作過。軟體的實際運作環境還是做了很多的假設,并不能保證深度測試的時候不出現故障。
關于軟體的并發,由于希望此軟體能夠運作在叢集上,是以就必須考慮負載均衡的問題。這裡使用主從模式,伺服器作為主節點,其他節點作為從節點。為了做到負載均衡,使用動态任務配置設定的并行模式,每個從節點在沒有任務的時候向主節點索取,而不是被動的從主節點接受。動态任務配置設定的模式可以保證各個節點都處于繁忙狀态,最小化負載不均帶來的問題。
本文轉自hipercomer 51CTO部落格,原文連結:http://blog.51cto.com/hipercomer/874996