天天看點

sparkmllib關聯規則算法(FPGrowth,Apriori)

關聯規則算法的思想就是找頻繁項集,通過頻繁項集找強關聯。

介紹下基本概念:

對于A->B

1、置信度:P(B|A),在A發生的事件中同時發生B的機率 p(AB)/P(A) 例如購物籃分析:牛奶 ⇒ 面包

2、支援度:P(A ∩ B),既有A又有B的機率

假如支援度:3%,置信度:40%

支援度3%:意味着3%顧客同時購買牛奶和面包

置信度40%:意味着購買牛奶的顧客40%也購買面包

3、如果事件A中包含k個元素,那麼稱這個事件A為k項集事件A滿足最小支援度門檻值的事件稱為頻繁k項集。

4、同時滿足最小支援度門檻值和最小置信度門檻值的規則稱為強規則

apriori算法的思想

(得出的的強規則要滿足給定的最小支援度和最小置信度)

apriori算法的思想是通過k-1項集來推k項集。首先,找出頻繁“1項集”的集合,該集合記作L1。L1用于找頻繁“2項集”的集合L2,而L2用于找L3。如此下去,直到不能找到“K項集”。找每個Lk都需要一次資料庫掃描(這也是它最大的缺點)。

核心思想是:連接配接步和剪枝步。連接配接步是自連接配接,原則是保證前k-2項相同,并按照字典順序連接配接。剪枝步,是使任一頻繁項集的所有非空子集也必須是頻繁的。反之,如果某個候選的非空子集不是頻繁的,那麼該候選肯定不是頻繁的,進而可以将其從CK(頻繁項集)中删除。

下面一個比較經典的例子來說明apriori算法的執行步驟:

sparkmllib關聯規則算法(FPGrowth,Apriori)

上面隻計算了頻繁項集的支援度,沒有計算它的置信度。

基本概念

1. 項與項集
這是一個集合的概念,在一籃子商品中的一件消費品即一項(item),則若幹項的集合為項集,如{啤酒,尿布}構成一個二進制項集。
2、關聯規則
關聯規則用亍表示資料内隐含的關聯性,例如表示購買了尿布的消費者往往也會購買啤酒。關聯性強度如何,由3 個概念,即支援度、置信度、提升度來控制和評價。
3、支援度(support)
支援度是指在所有項集中{X, Y}出現的可能性,即項集中同時含有X 和Y 的機率:
設定最小門檻值為5%,由亍{尿布,啤酒}的支援度為800/10000=8%,滿足最小門檻值要求,成為頻繁項集,保留規則;而{尿布,面包}的支援度為100/10000=1%,則被剔除。
4、置信度(confidence)
置信度表示在先決條件X 發生的條件下,關聯結果Y 發生的機率:這是生成強關聯規則的第二個門檻,衡量了所考察的關聯規則在“質”上的可靠性。相似地,我們需要對置信度設定最小門檻值(mincon)來實作進一步篩選。
    當設定置信度的最小門檻值為70%時,例如{尿布,啤酒}中,購買尿布時會購買啤酒的置信度為800/1000=80%,保留規則;而購買啤酒時會購買尿布的置信度為800/2000=40%,則被剔除。
5. 提升度(lift)
提升度表示在含有X 的條件下同時含有Y 的可能性與沒有X 這個條件下項集中含有Y 的可能性之比:公式為置信度(artichok=>cracker)/支援度(cracker)。該名額與置信度同樣衡量規則的可靠性,可以看作是置信度的一種互補名額。
           

FPGrowth 算法

)掃描事務資料庫D 一次。收集頻繁項的集合F 和它們的支援度。對F 按支援度降序排序,結果為頻繁項
表L。
)建立FP 樹的根節點,以“null”标記它。對亍D 中的每個事務Trans,執行:選擇 Trans
中的頻繁項,并按L 中的次序排序。設排序後的頻繁項表為[p | P],其中,p 是第一個元素,而P 是剩餘元素的表。調用insert_tree([p | P], T)。該過程執行情況如下。如果T 有子節點N 使得N.item-name = p.item-name,則N 的計數增加;否則建立一個新節點N 将其計數設定為,連結到它的父節點T,并且通過節點的鍊結構将其連結到具有相同item-name 的節點中。如果P非空,則遞歸地調用insert_tree(P, N)。
           

分析執行個體

sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)

源碼分析

sparkmllib關聯規則算法(FPGrowth,Apriori)
sparkmllib關聯規則算法(FPGrowth,Apriori)
def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()
    val minCount = math.ceil(minSupport * count).toLong
    val numParts = if (numPartitions > ) numPartitions else data.partitions.length
    val partitioner = new HashPartitioner(numParts)
    val freqItems = genFreqItems(data, minCount, partitioner)
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    new FPGrowthModel(freqItemsets)
  }
           
private def genFreqItems[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      partitioner: Partitioner): Array[Item] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.length != uniq.size) {
        throw new SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
      }
      t
    }.map(v => (v, ))
      .reduceByKey(partitioner, _ + _)
      .filter(_._2 >= minCount)
      .collect()
      .sortBy(-_._2)
      .map(_._1)
  }
           
private def genFreqItemsets[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      freqItems: Array[Item],
      partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap
    data.flatMap { transaction =>
      genCondTransactions(transaction, itemToRank, partitioner)
    }.aggregateByKey(new FPTree[Int], partitioner.numPartitions)(
      (tree, transaction) => tree.add(transaction, L),
      (tree1, tree2) => tree1.merge(tree2))
    .flatMap { case (part, tree) =>
      tree.extract(minCount, x => partitioner.getPartition(x) == part)
    }.map { case (ranks, count) =>
      new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
    }
  }
           
def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
    val associationRules = new AssociationRules(confidence)
    associationRules.run(freqItemsets)
  }
           
def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
    // For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
    val candidates = freqItemsets.flatMap { itemset =>
      val items = itemset.items
      items.flatMap { item =>
        items.partition( == item) match {
          case (consequent, antecedent) if !antecedent.isEmpty =>
            Some((antecedent.toSeq, (consequent.toSeq, itemset.freq)))
          case _ => None
        }
      }
    }

    // Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
    candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
      .map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
      new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
    }.filter(.confidence >= minConfidence)
  }
           

執行個體

FP-growth:

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport()
  .setNumPartitions()
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

val minConfidence = 
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)
}
           

Association Rules:

import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

val freqItemsets = sc.parallelize(Seq(
  new FreqItemset(Array("a"), L),
  new FreqItemset(Array("b"), L),
  new FreqItemset(Array("a", "b"), L)
))

val ar = new AssociationRules()
  .setMinConfidence()
val results = ar.run(freqItemsets)

results.collect().foreach { rule =>
  println("[" + rule.antecedent.mkString(",")
    + "=>"
    + rule.consequent.mkString(",") + "]," + rule.confidence)
}
           

繼續閱讀