天天看點

【原創】Python處理海量資料的實戰研究

最近看了July的一些關于Java處理海量資料的問題研究,深有感觸,連結:http://blog.csdn.net/v_july_v/article/details/6685962

感謝July ^_^

他用的是Java的Hash Map等方法做了處理,講解的非常深刻入骨

我也一時興起,想拿Python試試刀,看看Python對于海量資料的處理能力如何。無奈在百度和Google輸入“Python 海量資料”都無果。可能是國内使用python的不多,用python處理海量資料的就更少了。不過這澆滅不了我的欲望,哈哈

打算拿July的其中一個問題來試驗一下:

July給出的解決方案:

方案1:首先是這一天,并且是通路百度的日志中的IP取出來,逐個寫入到一個大檔案中。注意到IP是32位的,最多有2^32個IP。同樣可以采用映射的方法,比如模1000,把整個大檔案映射為1000個小檔案,再找出每個小文中出現頻率最大的IP(可以采用hash_map進行頻率統計,然後再找出頻率最大的幾個)及相應的頻率。然後再在這1000個最大的IP中,找出那個頻率最大的IP,即為所求。
           

下手吧!

(一)生成資料

我首先構造1億個IP,這些IP前兩段都是“10.197”,後兩段為0-255的随機數

把這1億個IP存入一個文本檔案中

Python代碼如下:

__author__ = "Wally Yu ([email protected])"
__date__ = "$Date: 2012/04/09 $"

def generateRandom(rangeFrom, rangeTo):
    import random
    return random.randint(rangeFrom,rangeTo)

def generageMassiveIPAddr(fileLocation,numberOfLines):
    IP = []
    file_handler = open(fileLocation, 'a+')
    for i in range(numberOfLines):
        IP.append('10.197.' + str(generateRandom(0,255))+'.'+ str(generateRandom(0,255)) + '\n')

    file_handler.writelines(IP)
    file_handler.close()

if __name__ == '__main__':
    from time import ctime
    print ctime()
    for i in range(10):
        print '  ' + str(i) + ": " + ctime()
        generageMassiveIPAddr('d:\\massiveIP.txt', 10000000)
    print ctime()
           

這裡插一下,我的軟體硬體環境:

硬體:

 - ThinkPad T420(CPU: i7, 記憶體16G)

軟體:

 -OS: WinXP32位 (隻認出3.6G記憶體)

 - Python:2.7

從Python的print日志中基本可以看出,生成一千萬條IP位址大概需要1分鐘,生成1億條記錄需要10分鐘

占據硬碟空間:1.4G

日志:

Mon Apr 09 16:52:28 2012
  0: Mon Apr 09 16:52:28 2012
  1: Mon Apr 09 16:53:28 2012
  2: Mon Apr 09 16:54:29 2012
  3: Mon Apr 09 16:55:30 2012
  4: Mon Apr 09 16:56:32 2012
  5: Mon Apr 09 16:57:33 2012
  6: Mon Apr 09 16:58:36 2012
  7: Mon Apr 09 16:59:36 2012
  8: Mon Apr 09 17:00:36 2012
  9: Mon Apr 09 17:01:35 2012
Mon Apr 09 17:02:36 2012
           

(二)處理思路

假設現在可用記憶體僅為128M,而每行IP經計算需要14Byte

因為資料太大,把1億條資料載入記憶體,再做排序會導緻記憶體溢出。July的思想:“以大化小,分而治之”非常合适,我轉化後的操作思路:

1. 每行IP需要14B空間,那麼128M記憶體最多可以處理 128M / 14B = 9142857個IP

    把每36571429個IP拆成一個小檔案儲存起來,每個小檔案的大小小于等于128M,共将生成11個檔案

2. 對每個小檔案用Hash Table處理,Python有自己非常高效的Hash Table:Dictionary!

    具體處理如下:

    1). 建構名為“Result”的Dictionary,“key”為IP位址,“value”為該IP位址出現的次數,用來記錄11個檔案每一個的最多出現的IP

    2). 建構名為“IP”的Dictionary,“key”為IP位址,“value”為該IP位址出現的次數,用來記錄每一個小檔案的所有IP位址

    3). 讀入每一條IP位址到“IP” Dictionary,如果該IP位址出現過,把相應的value的值加1;如果該IP位址沒有出現過,則key=IP位址,value=1

    4). 對“IP” Dictionary進行内排序,傳回最大的IP位址(如果有若幹個最大值是一樣的,就都傳回)

    5). 将傳回值存入“Result” Dictionary

    6). 對“Result”進行内排序,傳回最大的IP位址(如果有若幹個最大值是一樣的,就都傳回)

(三)實作

1)拆分成小檔案

代碼如下:

__author__ = "Wally Yu ([email protected])"
__date__ = "$Date: 2012/04/10 $"

from time import ctime
def splitFile(fileLocation, targetFoler):
    file_handler = open(fileLocation, 'r')
    block_size = 9142857
    line = file_handler.readline()
    temp = []
    countFile = 1
    while line:
        for i in range(block_size):
            if i == (block_size-1):
                # write block to small files
                file_writer = open(targetFoler + "\\file_"+str(countFile)+".txt", 'a+')
                file_writer.writelines(temp)
                file_writer.close()
                temp = []
                print "  file " + str(countFile) + " generated at: " + str(ctime())
                countFile = countFile + 1
            else:
                temp.append(file_handler.readline())
    
    file_handler.close()

if __name__ == '__main__':
    print "Start At: " + str(ctime())
    splitFile("d:\\massiveIP.txt", "d:\\massiveData")
           

運作後的log如下:

Start At: Tue Apr 10 10:56:25 2012
  file 1 generated at: Tue Apr 10 10:56:37 2012
  file 2 generated at: Tue Apr 10 10:56:47 2012
  file 3 generated at: Tue Apr 10 10:57:00 2012
  file 4 generated at: Tue Apr 10 10:57:14 2012
  file 5 generated at: Tue Apr 10 10:57:26 2012
  file 6 generated at: Tue Apr 10 10:57:42 2012
  file 7 generated at: Tue Apr 10 10:57:53 2012
  file 8 generated at: Tue Apr 10 10:58:04 2012
  file 9 generated at: Tue Apr 10 10:58:16 2012
  file 10 generated at: Tue Apr 10 10:58:27 2012
  file 11 generated at: Tue Apr 10 10:58:38 2012
           

可見拆分一個檔案需要費時10-15秒,拆分檔案總共耗時2分14秒

2). 找出出現次數最大的IP:

代碼如下:

__author__ = "Wally Yu ([email protected])"
__date__ = "$Date: 2012/04/10 $"

import os
from time import ctime

def findIP(targetFolder):
    Result = {}
    IP = {}
    for root, dirs, files in os.walk(targetFolder):
        for f in files:
            # read small files
            file_handler = open(os.path.join(targetFolder, f), 'r')
            lines = file_handler.readlines()
            file_handler.close()
            # get IP in file, store to IP Dictionary
            for line in lines:
                if line in IP:
                    IP[line] = IP[line] + 1
                else:
                    IP[line] = 1
            # sort Dictionary
            IP = sorted(IP.items(), key=lambda d: d[1])
            # get max item(s), store to Result Dictionary
            maxItem = IP[-1][1]
            print '  File ' + str(f) + ":"
            print "    maxItem: " + str(IP[-1])
            tempTuple = IP.pop()
            while tempTuple[1] == maxItem:
                if tempTuple[0] in Result:
                    Result[tempTuple[0]] = Result[tempTuple[0]] + 1
                else:
                    Result[tempTuple[0]] = tempTuple[1]
                tempTuple = IP.pop()
            IP = {}
            print '    Finished: ' + ctime()
                    
    print sorted(Result.items(), key=lambda d: d[1])

if __name__ == '__main__':
    print 'Start: ' + ctime()
    findIP("d:\\massiveData")
    print 'End: ' + ctime()
           

運作後的log如下:

Start: Thu Apr 12 10:20:01 2012
  File file_1.txt:
    maxItem: ('10.197.223.85\n', 190)
    Finished: Thu Apr 12 10:20:23 2012
  File file_10.txt:
    maxItem: ('10.197.44.215\n', 194)
    Finished: Thu Apr 12 10:20:37 2012
  File file_11.txt:
    maxItem: ('10.197.251.171\n', 181)
    Finished: Thu Apr 12 10:20:48 2012
  File file_2.txt:
    maxItem: ('10.197.181.190\n', 191)
    Finished: Thu Apr 12 10:21:00 2012
  File file_3.txt:
    maxItem: ('10.197.135.27\n', 193)
    Finished: Thu Apr 12 10:21:14 2012
  File file_4.txt:
    maxItem: ('10.197.208.113\n', 192)
    Finished: Thu Apr 12 10:21:24 2012
  File file_5.txt:
    maxItem: ('10.197.120.160\n', 190)
    Finished: Thu Apr 12 10:21:34 2012
  File file_6.txt:
    maxItem: ('10.197.69.155\n', 193)
    Finished: Thu Apr 12 10:21:44 2012
  File file_7.txt:
    maxItem: ('10.197.88.144\n', 192)
    Finished: Thu Apr 12 10:21:55 2012
  File file_8.txt:
    maxItem: ('10.197.103.234\n', 193)
    Finished: Thu Apr 12 10:22:08 2012
  File file_9.txt:
    maxItem: ('10.197.117.46\n', 192)
    Finished: Thu Apr 12 10:22:20 2012
[('10.197.251.171\n', 181), ('10.197.120.160\n', 190), ('10.197.223.85\n', 190), ('10.197.181.190\n', 191), ('10.197.117.46\n', 192), ('10.197.208.113\n', 192), ('10.197.88.144\n', 192), ('10.197.147.29\n', 193), ('10.197.68.183\n', 193), ('10.197.69.155\n', 193), ('10.197.103.234\n', 193), ('10.197.135.27\n', 193), ('10.197.44.215\n', 194)]
End: Thu Apr 12 10:22:21 2012
           

由此可見,最大的IP位址為:“10.197.44.215”,共出現194次!

而Python的計算時間為2分20秒,非常快

【原創】Python處理海量資料的實戰研究

(四)引申測試

以上是在假設記憶體僅為128M下的計算時間,為了測試Python真正的執行效率,打算再寫一算法,将所有1.4G的資料一次性導入記憶體,并作内排序,看看它的執行效率

代碼如下:

__author__ = "Wally Yu ([email protected])"
__date__ = "$Date: 2012/04/10 $"

import os
from time import ctime

def findIPAtOnce(targetFile):
    print "Started At: " + ctime()
    Result = {}
    file_handler = open(targetFile, 'r')
    lines = file_handler.readlines()
    file_handler.close()
    print "File Read Finished At: " + ctime()

    for line in lines:
        if line in Result:
            Result[line] = Result[line] + 1
        else:
            Result[line] = 1
    print "Write to Dic Finished At: " + ctime()

    Result = sorted(Result.items(), key=lambda d: d[1])
    print "Sorting Finished At: " + ctime()

    print 'Result:'
    for i in range(10):
        print '  ' + str(Result.pop())

if __name__ == '__main__':
    findIPAtOnce("d:\\massiveIP.txt")
           

最後得到了Memory Error:

Traceback (most recent call last):
  File "C:/Documents and Settings/wally-yu/Desktop/findIPAtOnce.py", line 30, in <module>
    findIPAtOnce("d:\\massiveIP.txt")
  File "C:/Documents and Settings/wally-yu/Desktop/findIPAtOnce.py", line 11, in findIPAtOnce
    lines = file_handler.readlines()
MemoryError
           

哈哈哈!

為了測試Python的處理速度,重新生成一個小一點的Txt檔案,重新運作generageMassiveIPAddr函數,生成一千萬個IP位址

if __name__ == '__main__':
    from time import ctime
    print ctime()
    for i in range(1):
        print '  ' + str(i) + ": " + ctime()
        generageMassiveIPAddr('d:\\massiveIP_small.txt', 10000000)
    print ctime()
           

生成後的Txt占據144M的空間

再次運作

if __name__ == '__main__':
    findIPAtOnce("d:\\massiveIP_small.txt")
           

得到Log如下:

Started At: Thu Apr 12 11:03:35 2012
File Read Finished At: Thu Apr 12 11:03:41 2012
Write to Dic Finished At: Thu Apr 12 11:03:44 2012
Sorting Finished At: Thu Apr 12 11:03:45 2012
Result:
  ('10.197.222.105\n', 215)
  ('10.197.68.118\n', 210)
  ('10.197.229.152\n', 206)
  ('10.197.22.46\n', 202)
  ('10.197.98.83\n', 201)
  ('10.197.53.43\n', 201)
  ('10.197.169.65\n', 200)
  ('10.197.225.22\n', 200)
  ('10.197.146.78\n', 200)
  ('10.197.57.101\n', 200)
           

可見時間消耗如下:

檔案資料讀取:6秒

寫入Dictionary:3秒

排序:1秒

總共耗時不過10秒,而且大多時間都是I/O的開銷!!!

(五)小節

由以上種種可見Python對于海量資料處理的高效率,也為Python的同行處理海量資料提供了一些思路

有興趣的朋友可以拿其他語言做同樣的測試,共同進步

(六)修改

注:

1. 以上完成于2012年4月10日,本節及以下完成于2012年4月18日

2. 六、七節的增加是由于lidguan兄發現的一個大漏洞而做的修改,非常感謝!具體評論見下:

4樓 lidguan昨天 16:23發表[回複] [引用][舉報][删除]
【原創】Python處理海量資料的實戰研究

我有個問題,請不吝賜教:

将一個大檔案分割成許多的小檔案,但樓主分割方式好像是根據檔案大小來分割,然後分别排序,得出一系列的最大值,最後在最大值中比較,得出一個最後結果....

但每個ip可能在不同檔案中都有記錄,也許這個倒黴的ip是第一個檔案的第二名,在第二個檔案也是第二名,你用最大值進行比較,就會把這個倒黴的ip忽略掉,但其實這個ip才是真正的最大值..

我不懂python,當僞代碼看的....如有不對的地方,請多多原諒

我确實也是考慮不周,才導緻了以上算法的巨大漏洞,今天做如下修改:

思路:

1. 不對大檔案進行拆分,否則會産生lidguan兄提到的問題

2. 假設這個一億個IP位址的重複率比較高,重複後的IP可以一次性記錄入Python Dictionary (Java Hash Map),那麼就直接從大檔案中一條一條讀取IP位址,記錄入Dictionary

3. 對Dictionary進行排序并輸出

代碼:

__author__ = "Wally Yu ([email protected])"
__date__ = "$Date: 2012/04/18 $"

import os
from time import ctime

def findIPAtOnce(targetFile):
    print "Started At: " + ctime()
    Result = {}
    file_handler = open(targetFile, 'r')

    for line in file_handler:
        if line in Result:
            Result[line] = Result[line] + 1
        else:
            Result[line] = 1
    print "Write to Dic Finished At: " + ctime()

    file_handler.close()

    Result = sorted(Result.items(), key=lambda d: d[1])
    print "Sorting Finished At: " + ctime()

    print 'Result:'
    for i in range(10):
        print '  ' + str(Result.pop())

if __name__ == '__main__':
    findIPAtOnce("d:\\massiveIP.txt")
           

Log:

>>> 
Started At: Wed Apr 18 13:20:34 2012
Write to Dic Finished At: Wed Apr 18 13:21:34 2012
Sorting Finished At: Wed Apr 18 13:21:34 2012
Result:
  ('10.197.200.159\n', 1713)
  ('10.197.143.163\n', 1707)
  ('10.197.68.193\n', 1693)
  ('10.197.136.119\n', 1692)
  ('10.197.71.24\n', 1692)
  ('10.197.132.242\n', 1690)
  ('10.197.4.219\n', 1688)
  ('10.197.113.84\n', 1684)
  ('10.197.204.142\n', 1681)
  ('10.197.78.110\n', 1675)
           

由此可見,出現最多的IP為“10.197.200.159”,出現次數1713次!

執行時間:

  • 讀取檔案并寫入Dictionary:60秒
  • 内排序:小于1秒!!!

經過這次的修改,運算結果相信是可靠的

(七)總結

1. 修改後的代碼是在假設IP位址的重複性比較高,可以一次性導入記憶體中操作的前提下展開的;如果重複性低,或者更大量的資料,導緻無法一次導入記憶體的話,就需要使用外排序來找出IP位址了

2. 希望大家多多探讨,也幸虧lidguan兄的指出,否則險些釀成大錯

【原創】Python處理海量資料的實戰研究

3. 最近在讨論的純粹的QA是否有必要存在,我也來多嘴幾句,我相信這些代碼如果是經過了QA的測試後多多少少會降低風險,尤其是此類明顯的邏輯性的錯誤是肯定可以避免的,是以QA人員的重要性不言而喻。說要消滅QA,鄙人覺得是軟體工程思維的一種倒退

版權所有。轉載本BLOG内任何文章,請以超連結形式注明出處。