Apriori算法是Agrawl和R.Srikant于1994年提出的,為布爾關聯規則挖掘頻繁項集的原創性算法[AS94b]。該算法使用了頻繁項集性質的先驗知識,使用了一種稱為逐層疊代方法。為了提高頻繁項集逐層産生的效率,該算法使用了先驗性質 用于壓縮搜尋空間。
先驗性質 :頻繁項集的所有非空子集也一定是頻繁的。
Apriori算法挖掘頻繁項集主要由兩步組成——連接配接步 和剪枝步 。
參考連結:關聯規則,Apriori算法及python實作
原文中沒有進行剪枝的操作,本文主要添加了剪枝部分的代碼,通過周遊候選項集的子集,加入該項的子集不在頻繁項集中,則該項不是頻繁項,将其删去。
# -*- coding: utf-8 -*-
import copy
def PowerSetsBinary(items):
"""
找出集合的所有子集
"""
#generate all combination of N items
N = len(items)
#enumerate the 2**N possible combinations
for i in range(**N):
combo = []
for j in range(N):
#test jth bit of integer i
if(i >> j ) % == :
combo.append(items[j])
yield combo
def loadDataSet():
"""
建立一個用于測試的簡單的資料集
"""
D = [[, , ], [, ], [, ], [, , ], [, ], [, ], [, ], [, , , ], [, , ]]
return D
def createC1(dataSet):
"""
建構初始候選項集的清單,即所有候選項集隻包含一個元素,
C1是大小為1的所有候選項集的集合
"""
C1 = []
for transaction in dataSet:
for item in transaction:
if [item] not in C1:
C1.append([item])
C1.sort()
# return map( frozenset, C1 )
# return [var for var in map(frozenset,C1)]
return [frozenset(var) for var in C1]
def scanDataSet(D, Ck, minSupport):
"""
計算Ck中的項集在資料集合D(記錄或者transactions)中的支援度,
傳回滿足最小支援度的項集的集合,和所有項集支援度資訊的字典。
"""
subSetCount = {}
# D=[{},{},{}] tid.type==set
for tid in D:
# Ck = [{},{},{}],can.type==frozenset
for can in Ck:
# 檢查候選k項集中的每一項的所有元素是否都出現在每一個事務中,若true,則加1
if can.issubset(tid):
# subSetCount為候選支援度計數,get()傳回值,如果值不在字典中則傳回預設值0。
subSetCount[can] = subSetCount.get(can, ) +
numItems = float(len(D))
returnList = []
# 選擇出來的頻繁項集,未使用先驗性質
supportData = {}
for key in subSetCount:
# 計算絕對支援度。
support = subSetCount[key] / numItems # 每個項集的支援度
if support >= minSupport: # 将滿足最小支援度的項集,加入returnList
returnList.insert(, key)
supportData[key] = support # 彙總支援度資料
return returnList, supportData
def aprioriGen(Lk, k): # Aprior算法
"""
由初始候選項集的集合Lk生成新的生成候選項集,
k表示生成的新項集中所含有的元素個數
"""
returnList = []
for i in range(len(Lk)):
L1 = list(Lk[i])[: k - ]
for j in range(i + , len(Lk)):
# Lk[i].type == frozenset
# 隻需取前k-2個元素相等的候選頻繁項集即可組成元素個數為k+1的候選頻繁項集
L2 = list(Lk[j])[: k - ]
L1.sort()
L2.sort()
if L1 == L2:
# print("k:{}---L1:{}---L2:{}".format(k, Lk[i], Lk[j]))
# 傳回一個包含Lk[i]和Lk[j]中每一個元素的集合set,相當于集合的union方法
returnList.append(Lk[i] | Lk[j])
# print("returnList:{}".format(returnList))
return returnList
def has_infrequent_subset(L, Ck, k):
# 這裡涉及到深拷貝、淺拷貝的知識
Ckc = copy.deepcopy(Ck)
for i in Ck:
p = [t for t in i]
i_subset = PowerSetsBinary(p)
subsets = [i for i in i_subset]
# print(subsets)
for each in subsets:
# print(each)
if each!=[] and each!=p and len(each)<k:
# [t for z in L for t in z]将清單中的frozenset全部移到一層中
if frozenset(each) not in [t for z in L for t in z]:
Ckc.remove(i)
break
return Ckc
def apriori(dataSet, minSupport):
# 建構初始候選項集C1
C1 = createC1(dataSet)
# 将dataSet集合化,以滿足scanDataSet的格式要求
D = [set(var) for var in dataSet]
# 建構初始的頻繁項集,即所有項集隻有一個元素
L1, suppData = scanDataSet(D, C1, minSupport)
# 最初的L1中的每個項集含有一個元素,新生成的
L = [L1]
# 項集應該含有2個元素,是以 k=2
k =
while (len(L[k - ]) > ):
Ck = aprioriGen(L[k - ], k)
# 剪枝
Ck2 = has_infrequent_subset(L, Ck, k)
# 候選支援度計數和min_sup進行比較
Lk, supK = scanDataSet(D, Ck2, minSupport)
# 将新的項集的支援度資料加入原來的總支援度字典中
suppData.update(supK)
# 将符合最小支援度要求的項集加入L
L.append(Lk)
# 新生成的項集中的元素個數應不斷增加
k +=
# 傳回所有滿足條件的頻繁項集的清單,和所有候選項集的支援度資訊
return L[:-], suppData
if __name__ == '__main__':
myDat = loadDataSet()
L, suppData = apriori(myDat, )
print("頻繁項集L:", L)
挖掘出頻繁項集之後,接下來進行第二步關聯規則的挖掘。請參看第二篇文章由頻繁項集産生關聯規則