K-Means 算法聚類
非監督學習: 從資料中發現隐含的關系 對資料進行聚類 cluster
監督學習: 根據已有的曆史資料 對資料進行分類 classification
K-Means 算法
問題:如何對資料進行聚類?
-
假設資料集T中, 由K類的資料, 但是如何确定這些資料之間存在關系
損失函數: 平方誤差函數 我們可以以它們之間距離度量确定資料之間存在關系,越是相似,那麼距離度量就越近
算法思路:
确定K個質心, 将資料分為k類, 實質上質心的值就是資料集中所屬于第k個質心的樣本資料子集的均值
确定算法思路之後,嘗試實作算法
已知條件: 資料集 損失函數 需要選擇質心 周遊資料集與每個質心的距離 根據損失函數進行重新配置設定質心 周遊執行直到聚類效果達到最優
分解問題:
step1: 讀取資料集
step2: 建立損失函數 SSE sum of Square
step3: 初始化質心選擇 選擇k個質心
step4: k-means 算法實作
算法主體step4
step4 算法實作僞代碼;
資料集次元為mxn
建立K個點作為啟始質心
建立次元為(mx2)的矩陣clusterment 第一列記錄配置設定的質心 第二列記錄與該質心的損失函數度量
while True 循環 (有且僅當資料配置設定不改變時,停止循環)
周遊資料集中的每一個點
周遊每一個質心
計算每一個點與每一個質心的損失函數大小
比較大小,記錄min_error and best_k 最小損失函數 以及對應的質心
将資料點配置設定到距離最近的簇中
并且檢查資料配置設定是否仍然變動 沒有變動了就停止循環
同時更新每個質心的值
每個質心的都是資料集中所屬于該質心的樣本資料集的均值 (是以才是k-Means算法)
傳回 更新後的質心以及 clusterments
K-Means算法實作
import numpy as np
def load_data(filename):
fr = open(filename)
data_set = list()
for lines in fr.readlines():
line = lines.strip().split('\t')
line_list = list()
for i in line:
line_list.append(float(i))
data_set.append(line_list)
data_set = np.mat(data_set)
return data_set
def calc_dist_sse(data_set, centroids):
"""
k-means 距離度量方式 sum of square error
:param data_set:
:param centroids:
:return:
"""
# print('data set', type(data_set))
# print('centriods', type(centroids))
return np.sqrt(np.sum(np.power(data_set - centroids, )))
def create_centroids(data_set, k):
"""
k means 算法 随機選取質心
:param data_set: 資料集
:param k: k個質心
:return
centroids
"""
m, n = np.shape(data_set)
# print('shape data set', np.shape(data_set))
centroids = np.mat(np.zeros((k, n)))
# print('centroids', np.shape(centroids))
for row in range(k):
for index in range(n):
range_min = np.min(data_set[:, index])
range_of = np.max(data_set[:, index]) - range_min
centroids[row, index] = range_min + np.random.uniform(range_of)
# print('centroids\n', centroids)
return centroids
def k_means(data_set, k, calc_dist, create_centroids=create_centroids):
"""
k means 算法主體
計算質心——配置設定——重新計算
實作重點:
part1: 通過對每個點周遊所有質心并計算點到每個置信的距離
part2: 周遊所有質心并更新它們的取值
:param data_set:資料集
:param k:k個簇
:param calc_dist: 指向計算距離的函數
:param create_centroids: 指向建立質心的函數
:return:
centroids: 最終确定的質心
cluster_ment: 聚類的情況(class, dist) 配置設定到哪個質心附近 以及dist距離
"""
m, n = np.shape(data_set)
cluster_ment = np.mat(np.zeros((m, )))
centroids = create_centroids(data_set, k)
print('init centriods', centroids)
cluster_changed = True
count =
while cluster_changed:
count +=
cluster_changed = False
# step1 通過對每個點周遊所有質心并計算每個點到每個質心的距離
for i in range(m): # 循環每一個資料點并配置設定到最近的質心中去
# print(np.inf, -1)
min_dist = np.inf
min_index = -
for j in range(k):
dist_ji = calc_dist(data_set[i, :], centroids[j, :]) # 計算資料點到質心的最小距離
if dist_ji < min_dist: # 如果距離比最小距離還小, 就更新最小距離和最小質心的index
min_dist = dist_ji
min_index = j
if cluster_ment[i, ] != min_index: # 簇配置設定結果改變
cluster_changed = True
cluster_ment[i, :] = min_index, min_dist** # 更新簇配置設定結果為最小質心的索引,最小距離
# step2 周遊所有質心并更新它們的取值
for cent in range(k):
pts_in_cluster = data_set[np.nonzero(cluster_ment[:, ].A == cent)[]] # 擷取該簇中的所有點
centroids[cent, :] = np.mean(pts_in_cluster, axis=) # 将質心修改為簇中所有點的平均值
print('centriods changed', centroids)
print('count', count)
return centroids, cluster_ment
K-Means聚類算法缺陷
缺陷:當面對資料分布過于’散亂’的時候,以及初始質心選擇不合理(靠的太近), k取值不合理(不好把握資料聚類中到底幾類), 損失函數不合理
當出現上述情況的時候,k-means算法可能會陷入局部最優,而不是全局最優
解決問題的思路:
- 什麼時候才是全局最優?
- 已知損失函數 J(θ)=(y(i)−y^)2−−−−−−−−√ J ( θ ) = ( y ( i ) − y ^ ) 2
- 有且僅當存在 argmin∑i∈kNJ(θ) arg min ∑ i ∈ k N J ( θ )
- 即: 所有樣本所配置設定的質心與其損失函數之和 最小時才是達到了最優化
算法實作思路:
對生成後的簇進行處理
1.’大簇’,第k個質心點若是配置設定的樣本資料點較多,就嘗試将該簇分解為兩個小簇并計算最大SSE 與沒有改變之前相比較, 小就分解成功
2.對兩個’小簇’進行合并,并計算SEE之和, 與未合并之前比較, 若是小于就合并成功,反之不需要分解
針對這一思路: 處理生成後的簇較為複雜,是以出現了K-Means算法的優化算法: b-k-means算法 二分k-Means算法
二分K-Means算法
解決問題: 為克服K-Means算法收斂于局部最小值的問題
優化思路;
在上述解決K-Means算法缺陷的思路中,我們是對應生成後的簇進行優化
那麼如果在生成之前進行優化呢
即:算法一開始, 就将資料集看做一個大簇,然後再分解優化!!!
算法思路:
首先将所有點看做一個簇, 然後将該簇一分為二
之後選擇其中的一個簇,繼續劃分
問題? 如果選擇? 取決于對其劃分是否可以最大程度的降低SSE的值
疊代上述步驟, 直到得到使用者指定的簇數目
算法實作:
- 以矩陣形式處理資料集 次元為mxn
- 一開始将所有資料點看做一個簇, 是以初始化質心 該質心為所有資料點的均值
- 建立次元為mx2的矩陣 記錄配置設定的質心以及與相應質心的距離
- 計算所有資料點到初始質心的距離平方誤差
當質心數量小于k時: 周遊每一個質心 調用K-Means算法 算法參數為(與該質心對應的樣本資料集, k=2, 損失函數)====>二分K-Means算法 傳回本次二分的 質心與簇配置設定效果 計算本次被二分的質心對于資料集的損失函數之和 與沒有被二分的質心對應資料集的損失函數之和(2+1) ————————> 總的誤差和越小,越相似, 效果越優化, 劃分的效果更好 記錄本次優化的原質心index, 優化拆分出的質心集, 優化拆分出簇配置設定效果 #更新最好的簇配置設定結果 調用K-means配置設定結構 最好的簇配置設定預設為0,1 原本為1的更新為 len(centList) 原本為0的更新為最佳質心 # 更新質心清單 更新原質心List中的第i個質心為使用二分K-Means算法優化後的第1個質心 更新原質心List中的第2個質心為二分K-Means算法優化後的第2個質心 重新配置設定最好簇下的資料(質心)以及SSE 傳回 質心 與 簇配置設定結果
算法實作 以及測試
def biKMeans(data_set, k, calc_dist=calc_dist_sse):
m = np.shape(data_set)[]
clusterAssment = np.mat(np.zeros((m,))) # 儲存每個資料點的簇配置設定結果和平方誤差
centroid0 = np.mean(data_set, axis=).tolist()[] # 質心初始化為所有資料點的均值
centList =[centroid0] # 初始化隻有 1 個質心的 list
for j in range(m): # 計算所有資料點到初始質心的距離平方誤差
clusterAssment[j,] = calc_dist(np.mat(centroid0), data_set[j,:])**
while (len(centList) < k): # 當質心數量小于 k 時
lowestSSE = np.inf
for i in range(len(centList)): # 對每一個質心
ptsInCurrCluster = data_set[np.nonzero(clusterAssment[:,].A==i)[],:] # 擷取目前簇 i 下的所有資料點
centroidMat, splitClustAss = k_means(ptsInCurrCluster, , calc_dist) # 将目前簇 i 進行二分 kMeans 處理
sseSplit = sum(splitClustAss[:,]) # 将二分 kMeans 結果中的平方和的距離進行求和
sseNotSplit = sum(clusterAssment[np.nonzero(clusterAssment[:,].A!=i)[],]) # 将未參與二分 kMeans 配置設定結果中的平方和的距離進行求和
print("sseSplit, and notSplit: ",sseSplit,sseNotSplit)
if (sseSplit + sseNotSplit) < lowestSSE: # 總的(未拆分和已拆分)誤差和越小,越相似,效果越優化,劃分的結果更好(注意:這裡的了解很重要,不明白的地方可以和我們一起讨論)
bestCentToSplit = i
bestNewCents = centroidMat
bestClustAss = splitClustAss.copy()
lowestSSE = sseSplit + sseNotSplit
# 找出最好的簇配置設定結果
bestClustAss[np.nonzero(bestClustAss[:,].A == )[],] = len(centList) # 調用二分 kMeans 的結果,預設簇是 0,1. 當然也可以改成其它的數字
bestClustAss[np.nonzero(bestClustAss[:,].A == )[],] = bestCentToSplit # 更新為最佳質心
print('the bestCentToSplit is: ',bestCentToSplit)
print('the len of bestClustAss is: ', len(bestClustAss))
# 更新質心清單
centList[bestCentToSplit] = bestNewCents[,:].tolist()[] # 更新原質心 list 中的第 i 個質心為使用二分 kMeans 後 bestNewCents 的第一個質心
centList.append(bestNewCents[,:].tolist()[]) # 添加 bestNewCents 的第二個質心
clusterAssment[np.nonzero(clusterAssment[:,].A == bestCentToSplit)[],:]= bestClustAss # 重新配置設定最好簇下的資料(質心)以及SSE
return np.mat(centList), clusterAssment
def main():
filename = 'testSet.txt'
data_set = load_data(filename)
print('mean data set ', np.mean(data_set, axis=).tolist()[])
# print('*****data set*****\n', data_set)
# create_centroids(data_set, k=3)
#
# centriods, cluster_ment = k_means(data_set, k=3, calc_dist=calc_dist_sse, create_centroids=create_centroids)
# print('result centriods,', centriods)
# print("result cluster_ment", cluster_ment)
centroids, cluster_ment = biKMeans(data_set, , calc_dist=calc_dist_sse)
print('centroids', centroids)
print('cluster ment', cluster_ment)
if __name__ == '__main__':
main()
參考文獻
《機器學習實戰》