天天看點

《Hadoop與大資料挖掘》一2.5.4 Hadoop K-Means程式設計實作

本節書摘來華章計算機《hadoop與大資料挖掘》一書中的第2章 ,第2.5.4節,張良均 樊 哲 位文超 劉名軍 許國傑 周 龍 焦正升 著 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

在下面的實作過程中,會進行簡單實作思路介紹,針對一些實作會有動手實踐給讀者練習。一般情況下我們建議讀者自己全部實作,對于實作起來有難度的讀者,我們提供了參考程式,但是需要注意,參考程式不是完整的,裡面設定了todo提示,這些地方是需要讀者去完善的。

《Hadoop與大資料挖掘》一2.5.4 Hadoop K-Means程式設計實作

思路1

不管是思路1還是思路2,hadoop實作k-means算法都包含4個步驟:①初始化聚類中心向量;②進行聚類并更新聚類中心向量;③判斷是否達到循環條件,如果是則循環;④判斷是否需要對原始資料進行分類,如果是則進行分類操作。下面就針對這4個步驟分别進行分析。

(1)初始化聚類中心向量:蓄水池抽樣

初始化聚類中心其實和單機算法類似,可以有多種方法,比如随機取出k個聚類中心向量、直接取出前k個聚類中心向量等。在hadoop的程式設計架構mapreduce限制下,如果是随機取k個聚類中心向量,那麼實作起來就是這樣的:周遊一次所有資料,統計資料個數n,再次周遊,按照k/n機率抽取k個資料。這樣不是不可以,但是效率太低,并且如果真要實作起來,還是要考慮多個問題的,比如如果有多個mapper怎麼處理?

這裡提出一種效率高,并且還能達到随機取數的算法—蓄水池抽樣。

什麼是蓄水池抽樣呢?簡單描述:先選中第1~k個元素,作為被選中的元素。然後依次對第k+1至第n個元素做如下操作:每個元素都有k/x的機率被選中,然後等機率地(1/k)替換掉被選中的元素(其中x是元素的序号)。其算法僞代碼描述如代碼清單2-33所示。

蓄水池抽樣同樣可以使用driver、mapper、reducer來進行分析。driver部分可以參考mapreduce程式的固定模式,但是需要注意,需要傳入聚類中心向量的個數,即k值。其代碼參考代碼清單2-34。

mapper就是蓄水池抽樣算法的具體實作了,這裡需要注意,map函數針對每條記錄進行篩選,并不輸出,是以這裡在cleanup進行輸出。這樣就需要在setup裡面初始化一個變量來存儲目前已經被選為聚類中心向量的值。其各個函數描述如下。

setup():讀取傳入的參數值selectedrecordsnum,初始化目前處理的行數周遊row、存儲已經選擇的selectedrecordsnum個資料變量selectedrecords。

map():每次map函數讀取一行記錄,判斷目前行數row是否小于selectedre-cordsnum,如果小于則直接把目前記錄加入selectedrecords;否則,以機率selectedrecordsnum/row使用目前記錄來對selectedrecords中的任一記錄進行替換。其部分代碼如代碼清單2-35所示。

cleanup():直接輸出selectedrecords的内容即可。

在設計reducer的時候需要考慮的一個問題是,如果有多個mapper怎麼辦?多個mapper就會發送k×n個聚類中心向量到reducer中(其中n為mapper的個數),是以在reducer端需要對k×n個記錄再次篩選,選出其中的k個聚類中心向量。這裡當然也有多種方法,其實這裡的選擇和最開始我們在mapper中針對所有資料随機選取k條記錄的選擇一樣,這裡所有資料隻是“變”小了而已。因為是在reducer中處理(一個reducer可以了解為單機),是以其實也可以了解為單機的随機選擇k條記錄的算法。這裡随機選擇k條記錄的算法也可以,不過我們這裡還是選擇使用蓄水池抽樣。

這裡隻能使用一個reducer,為什麼?請讀者思考。

動手實踐:蓄水池抽樣hadoop實作

首先了解上面蓄水池抽樣算法的hadoop實作的描述及分析,接着建立工程,并參考上節完善工程代碼功能。

實驗步驟:

1)打開eclipse,建立工程2.5_002_sample;

2)添加相關環境(如jdk路徑、hadoop路徑等);

3)參考上節蓄水池抽樣hadoop實作原理實作編寫源代碼;

4)把工程編譯,并導出jar包,然後上傳jar包到master節點上,使用yarn jar的方式運作,檢視輸出及相關日志。

思考:

1)還有其他方式實作蓄水池抽樣嗎?

2)如何檢視蓄水池抽樣抽取出來的結果?

(2)更新聚類中心向量

更新聚類中心向量其實就是整個k-means算法的核心所在,k-means算法的每次循環其實就是一個不斷更新聚類中心向量的過程。那麼具體怎麼更新呢?我們在單機算法中已經知道怎麼更新了,怎麼把其轉換為hadoop的mapreduce代碼呢?其實,可以把每個mapper了解為一個單機算法,因為其處理的資料其實是所有資料的一部分(一個檔案塊)。下面來看具體涉及的driver、mapper和reducer。

針對driver類,除了一些固定寫法外,還需傳入聚類初始中心向量路徑、聚類中心個數、列分隔符(考慮是否需要?),其示例代碼如代碼清單2-36所示。

reducer設定多個會有什麼問題?可以設定多個嗎?設定多個有什麼好處?

mapper的工作主要包括兩個:其一,讀取首次hdfs上的聚類中心;其二,根據聚類中心對每個鍵值對記錄進行距離計算,輸出距離最小的聚類中心id以及該條鍵值對記錄。下面針對具體實作做分析。

1)setup():讀取傳入的初始聚類中心向量路徑,根據路徑讀取對應的資料,利用分隔符來對初始聚類中心向量進行初始化(初始化為數組和清單)。

2)map():在map階段根據初始化的聚類中心向量對目前記錄進行分類,輸出其對應的聚類中心id、目前記錄,如代碼清單2-37所示。

reducer要做的工作就是針對每個組的所有資料計算其平均值(該平均值就是新的聚類中心向量)。其函數描述如下。

1)reduce():每個reduce函數針對同一個聚類中心id的資料進行處理;具體處理過程為,把每條記錄對應列的值加起來,同時記錄目前的記錄數;接着,使用每列和除以記錄數,即可得到每列平均值,也就是目前聚類中心id新的聚類中心,如代碼清單2-38所示。

3)cleanup():輸出每個類别新的聚類中心。

動手實踐:hadoop實作更新聚類中心向量

實驗步驟如下:

1)打開eclipse,打開上一小節完成的工程;

2)根據上節hadoop實作更新聚類中心實作思路,編寫對應源代碼;

3)把工程編譯并導出jar包,然後上傳jar包到master節點上,使用yarn jar的方式運作,檢視輸出及相關日志。

思考:如何測試代碼?

(3)是否循環

是否循環其實就是檢查前後兩次聚類中心向量是否滿足給定門檻值。這裡使用的是方差,其描述如圖2-51所示。

《Hadoop與大資料挖掘》一2.5.4 Hadoop K-Means程式設計實作

還需要注意的問題是,如果不滿足delta門檻值,那麼再次循環需初始化對應參數,主要包括下一個mapreduce程式的輸入聚類中心向量及輸出路徑等。

動手實踐:hadoop實作更新聚類中心向量循環

2)參考上述描述完成對應的代碼;

3)編譯工程并導出jar包,然後上傳jar包到master節點上,使用yarn jar的方式運作,檢視輸出及相關日志。

(4)是否分類

分類是針對原始資料進行的,這個工作其實在更新聚類中心向量的mapper已經做了這個工作,是以分類可以參考前面的mapper。這裡不給出其具體代碼,讀者隻需要完成動手實踐即可(分類動手實踐)。

動手實踐:hadoop實作最終分類

1)打開eclipse,并打開已經完成的工程;

2)使用kmeansmapper的實作,編輯driver主類,分類原始資料;

3)編譯工程,并導出jar包,然後上傳jar包到master節點上,使用hadoop jar的方式運作,檢視輸出及相關日志。

思路2

思路2其實和思路1裡面的大部分步驟都是一樣的邏輯流程,隻是在更新聚類中心向量環節做了優化。下面隻針對優化的環節做分析,其他部分請讀者參考思路1。

(1)更新聚類中心向量

更新聚類中心向量的driver部分直接參考思路1對應内容即可,這裡直接分析其mapper實作。結合前面内容,我們知道這裡需要實作自定義值類型。

由于mapper輸出的類型包含列和、個數,是以這裡可以自定義一個值類型,該值類型需包含一個double的數組,用于存儲某個類别的所有列和;一個long變量,用于存儲目前類别的資料個數,如代碼清單2-39所示。

同時,需要覆寫readfields、write函數,在這裡針對數組類型還需要做些額外的處理。其處理過程為存儲數組的長度,在執行個體化類的時候傳入數組的長度,否則會報nullpointer的異常,如代碼清單2-40所示。

寫入或者讀取時,注意順序,順序重要嗎?如果亂序會有什麼影響?請讀者思考。

下面針對mapper進行分析。

setup():在setup函數中,除了需要參考思路1把初始聚類中心讀取出來外,還需要初始化“列和”;由于每個類别都有一個“列和”,是以可以定義一個“列和”數組;然後根據聚類中心數來初始化該“列和”數組;同時,根據初始聚類中心的列個數類初始化每個類别的“列和”的double數組,如代碼清單<code>`</code>javascript

2-41所示。

代碼清單2-41 更新聚類中心向量mapper的setup函數示例

private sumnumwritable[] sumnums = null;

@override

protected void setup(mapper.context context)

}

代碼清單2-42 更新聚類中心向量mapper輸出自定義值類型構造函數

public sumnumwritable(int size) {

this.sum = new double [size];

this.num =0;

代碼清單2-44 更新聚類中心向量mapper的cleanup函數示例

/**

輸出

*/

protected void cleanup(context context)

代碼清單2-45 更新聚類中心向量reducer reduce示例代碼

protected void reduce(intwritable key, iterable values,

log.info("id:{},聚類中心是:[{}]",new object[]{key.get(),vec.tostring ()});

繼續閱讀