天天看點

Spark:聚類算法之LDA主題模型算法Spark上實作LDA原理Spark實作LDA執行個體

http://blog.csdn.net/pipisorry/article/details/52912179

Spark上實作LDA原理

LDA主題模型算法

[主題模型TopicModel:隐含狄利克雷分布LDA ]

Spark實作LDA的GraphX基礎

在Spark 1.3中,MLlib現在支援最成功的主題模型之一,隐含狄利克雷分布(LDA)。LDA也是基于GraphX上建構的第一個MLlib算法,GraphX是實作它最自然的方式。

有許多算法可以訓練一個LDA模型。我們選擇EM算法,因為它簡單并且快速收斂。因為用EM訓練LDA有一個潛在的圖結構,在GraphX之上建構LDA是一個很自然的選擇。

LDA主要有兩類資料:詞和文檔。我們把這些資料存成一個偶圖(如下所示),左邊是詞節點,右邊是文檔節點。每個詞節點存儲一些權重值,表示這個詞語和哪個主題相關;類似的,每篇文章節點存儲目前文章讨論主題的估計。

Spark:聚類算法之LDA主題模型算法Spark上實作LDA原理Spark實作LDA執行個體

每當一個詞出現在一篇文章中,圖中就有一個邊連接配接對應的詞節點和文章節點。例如,在上圖中,文章1包含詞語“hockey” 和“system”

這些邊也展示了這個算法的流通性。每輪疊代中,每個節點通過收集鄰居資料來更新主題權重資料。下圖中,文章2通過從連接配接的詞節點收集資料來更新它的主題估計。

Spark:聚類算法之LDA主題模型算法Spark上實作LDA原理Spark實作LDA執行個體

GraphX是以是LDA自然的選擇。随着MLlib的成長,我們期望未來可以有更多圖結構的學習算法!

可擴充性

LDA的并行化并不直覺,已經有許多研究論文提出不同的政策來實作。關鍵問題是所有的方法都需要很大量的通訊。這在上圖中很明顯:詞和文檔需要在每輪疊代中用新資料更新相鄰節點,而相鄰節點太多了。

我們選擇了EM算法的部分原因就是它通過很少輪的疊代就能收斂。更少的疊代,更少的通訊。

Note: Spark的貢獻者正在開發更多LDA算法:線上變分貝葉斯(一個快速近似算法)和吉布斯采樣(一個更慢但是有時更準确的算法)。

[用 LDA 做主題模型:當 MLlib 邂逅 GraphX]

[Spark官網上關于LDA的解釋:Latent Dirichlet allocation (LDA)]

PySpark.ml庫中Clustering LDA簡介

LDA通過 setOptimizer 函數支援不同的推斷算法。EMLDAOptimizer 對于似然函數用 expectation-maximization 算法學習聚類,然後獲得一個合理的結果。OnlineLDAOptimizer使用疊代的mini-batch抽樣來進行 online variational inference,它通常對記憶體更友好。

LDA接收文檔集合表示的詞頻向量,和下列參數(使用builder模式進行設定):

  • k: 主題數(也就是聚類中心數)
  • optimizer: 優化計算方法,目前支援"em", "online"。學習LDA模型使用的優化器,EMLDAOptimizer 或者 OnlineLDAOptimizer。
  • docConcentration: 文檔-主題分布的先驗Dirichlet參數。值越大,推斷的分布越平滑。文章分布的超參數(Dirichlet分布的參數)。隻支援對稱的先驗,是以在提供的k維向量中所有值都相等。所有值也必須大于1.0。
  • topicConcentration: 主題-詞語分布的先驗Dirichlet參數。值越大,推斷的分布越平滑。主題分布的超參數(Dirichlet分布的參數),必需>1.0。
  • maxIterations: 疊代次數的限制
  • checkpointInterval: 疊代計算時檢查點的間隔。如果你使用checkpointing(在Spark配置中設定),該參數設定checkpoints建立的次數,如果maxIterations過大,使用checkpointing可以幫助減少磁盤上shuffle檔案的大小,然後幫助失敗恢複。
  • setSeed:随機種子

參數設定

Expectation Maximization

docConcentration: 提供Vector(-1)會導緻預設值 (uniform k dimensional vector with value (50/k))+1。

topicConcentration: 提供-1會導緻預設值0.1 加1。

Online Variational Bayes

docConcentration:Providing Vector(-1) results indefault behavior (uniform k dimensional vector with value (1.0/k) ).

topicConcentration: Providing -1 results in defaulting to a value of (1.0/k) .

[Latent Dirichlet allocation (LDA)]

[Asuncion, Welling, Smyth, and Teh. “On Smoothing and Inference for Topic Models.” UAI, 2009.]

所有spark.mllib的 LDA 模型都支援:

  • describeTopics: 傳回主題,它是最重要的term組成的數組和term對應的權重組成的數組。
  • topicsMatrix: 傳回一個 vocabSize*k 維的矩陣,每一個列是一個topic。

注意:LDA仍然是一個正在開發的實驗特性。某些特性隻在兩種優化器/由優化器生成的模型中的一個提供。目前,分布式模型可以轉化為本地模型,反過來不可以。

LDA求解的優化器/模型

Expectation Maximization

Implemented in EMLDAOptimizer and DistributedLDAModel.

提供給LDA的參數有:

  • docConcentration: 隻支援對稱的先驗,是以在提供的k維向量中所有值都相等。所有值也必須大于1.0。提供Vector(-1)會導緻預設值 (uniform k dimensional vector with value (50/k))+1。
  • topicConcentration: 隻支援對稱的先驗,所有值也必須大于1.0。提供-1會導緻預設值0.1 加1。
  • maxIterations: EM疊代的最大次數。

注意:做足夠多次疊代是重要的。在早期的疊代中,EM經常會有一些無用的topics,但是這些topics經過更多次的疊代會有改善。依賴你的資料集,如果使用至少20個topic,可能需要50-100次的疊代。

EMLDAOptimizer 會産生 DistributedLDAModel, 它不隻存儲推斷的主題,還有所有的訓練語料,以及訓練語料庫中每個文檔的主題分布:

  • topTopicsPerDocument: 訓練語料庫中每個文檔的前若幹個主題和對應的權重
  • topDocumentsPerTopic: 每個主題下的前若幹個文檔和文檔中對應的主題的權重
  • logPrior: 基于超參doc Concentration 和 topic Concentration,估計的主題和文檔-主題分布的對數機率。
  • logLikelihood: 基于推斷的主題和文檔-主題分布,訓練語料的對數似然。

Online Variational Bayes

Implemented in OnlineLDAOptimizer and LocalLDAModel.

提供給LDA的參數有:

  • docConcentration: 通過傳遞每個次元值都等于Dirichlet參數的向量使用不對稱的先驗,值應該大于等于0 。提供 Vector(-1) 會使用預設值(uniform k dimensional vector with value (1.0/k) )。
  • topicConcentration: 隻支援對稱的先驗,值必須大于等于0。提供值-1會使用預設值 (1.0/k) 。
  • maxIterations: 送出的minibatches的最大次數。
  • 此外,OnlineLDAOptimizer 接收下列參數:
    • miniBatchFraction: 每次疊代使用的語料庫抽樣比例。
    • optimizeDocConcentration: 如果設定為true,每次 minibatch 之後計算參數 docConcentration (aka alpha) 的最大似然估計,然後在傳回的 LocalLDAModel 使用優化了的docConcentration。
    • τ0 和 κ : 用作學習率衰減,用 (τ0+iter)−κ 計算,這裡 iter 是目前的疊代次數。
    OnlineLDAOptimizer 生成 LocalLDAModel,它隻存儲了推斷的主題。LocalLDAModel支援:
    • logLikelihood(documents): 給定推斷的主題,計算提供的文檔的下界。
    • logPerplexity(documents): 給定推斷的主題,計算提供的文檔的複雜度的上界。

[ PySpark.ml庫中的Clustering]

皮皮blog

Spark實作LDA執行個體

步驟

1)加載資料

傳回的資料格式為:documents: RDD[(Long, Vector)],其中:Long為文章ID,Vector為文章分詞後的詞向量;使用者可以讀取指定目錄下的資料,通過分詞以及資料格式的轉換,轉換成RDD[(Long, Vector)]即可。

2)建立模型

模型參數設定說明見上面的簡介

3)結果輸出

topicsMatrix以及topics(word,topic))輸出。

注意事項

從Pyspark LDA model中擷取document-topic matrix

mllib上的lda不是分布式的,目前好像隻存儲topic的資訊,而不存儲doc的資訊,是以是沒法擷取doc-topic矩陣的。

要擷取的話,隻能使用ml中的lda,

或者使用scala版本的lda:

val ldaModel = lda.run(documents)

val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

distLDAModel.topicDistributions

[Extract document-topic matrix from Pyspark LDA Model]

使用pyspark實作LDA

# -*- coding: utf-8 -*-
[dev_pipi]
corpus_filename = /home/pipi/files/DATASETS/SparkMLlib/sample_lda_data.txt
;corpus_filename = hdfs://...
SPARK_HOME = /home/pipi/ENV/spark
PYSPARK_PYTHON = /home/pipi/ENV/ubuntu_env/bin/python
SPARK_LOCAL_IP = 127.0.0.1
JAVA_HOME = /home/pipi/ENV/jdk

;topic model設定
K = 3
alpha = 5
beta = 5
max_iter = 20
seed = 0
checkin_point_interval = 10
optimizer = em
      
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
__title__ = 'Spark MLlib LDA執行個體代碼'
__author__ = 'pipi'
__mtime__ = '16-10-24'
__email__ = '[email protected]'
# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
              ┏┓      ┏┓
            ┏┛┻━━━┛┻┓
            ┃      ☃      ┃
            ┃  ┳┛  ┗┳  ┃
            ┃      ┻      ┃
            ┗━┓      ┏━┛
                ┃      ┗━━━┓
                ┃  神獸保佑    ┣┓
                ┃ 永無BUG!   ┏┛
                ┗┓┓┏━┳┓┏┛
                  ┃┫┫  ┃┫┫
                  ┗┻┛  ┗┻┛
"""


def config():
    '''
    運作前的參數配置
    '''
    import configparser, os
    SECTION = 'dev_pipi'
    conf = configparser.ConfigParser()
    conf.read(os.path.join(os.path.split(os.path.realpath(__file__))[0], 'config.ini'))

    global corpus_filename, K, alpha, beta, max_iter, seed, checkin_point_interval, optimizer
    corpus_filename = conf.get(SECTION, 'corpus_filename')
    K = conf.getint(SECTION, 'K')
    alpha = conf.getfloat(SECTION, 'alpha')
    beta = conf.getfloat(SECTION, 'beta')
    max_iter = conf.getint(SECTION, 'max_iter')
    seed = conf.getint(SECTION, 'seed')
    checkin_point_interval = conf.getint(SECTION, 'checkin_point_interval')
    optimizer = conf.get(SECTION, 'optimizer')

    # spark environment settings
    import sys, os
    os.environ['SPARK_HOME'] = conf.get(SECTION, 'SPARK_HOME')
    sys.path.append(os.path.join(conf.get(SECTION, 'SPARK_HOME'), 'python'))
    os.environ["PYSPARK_PYTHON"] = conf.get(SECTION, 'PYSPARK_PYTHON')
    os.environ['SPARK_LOCAL_IP'] = conf.get(SECTION, 'SPARK_LOCAL_IP')
    os.environ['JAVA_HOME'] = conf.get(SECTION, 'JAVA_HOME')

    import logging
    logging.basicConfig(filename=os.path.join(os.path.split(os.path.realpath(__file__))[0], 'log.txt'), level=logging.DEBUG)


config()

from pyspark import SparkContext
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

if __name__ == '__main__':
    sc = SparkContext(master='local[4]', appName='lda')

    data = sc.textFile(corpus_filename).map(lambda line: Vectors.dense([float(i) for i in line.strip().split()]))
    corpus = data.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
    # print(corpus.take(5))

    lda_model = LDA.train(rdd=corpus, maxIterations=max_iter, seed=seed, checkpointInterval=checkin_point_interval, k=K,
                          optimizer=optimizer, docConcentration=alpha, topicConcentration=beta)
    topics = lda_model.topicsMatrix()
    for tid in range(3):
        print('Topic' + str(tid) + ':')
        for wid in range(0, lda_model.vocabSize()):
            print(' ' + str(topics[wid, tid]))
    lda_model.describeTopics(4)
    sc.stop()

    # df = pyspark.createDataFrame([[1, Vectors.dense([0.0, 1.0])], [2, SparseVector(2, {0: 1.0})],], ["id", "features"])
      

資料及結果:

1 2 6 0 2 3 1 1 0 0 3

1 3 0 1 3 0 0 2 0 0 1

1 4 1 0 0 4 9 0 1 2 0

2 1 0 3 0 0 5 0 2 3 9

3 1 1 9 3 0 2 0 0 1 3

4 2 0 3 4 5 1 1 1 4 0

2 1 0 3 0 0 5 0 2 2 9

1 1 1 9 2 1 2 0 0 1 3

4 4 0 3 4 2 1 3 0 0 0

2 8 2 0 3 0 2 0 2 7 2

1 1 1 9 0 2 2 0 0 3 3

4 1 0 0 4 5 1 3 0 1 0

Topic0:        Topic1:      Topic2:       
 7.37834974184 9.73045219375 8.89119806441
 6.59081862005 11.1175108178 11.2916705621
 3.49398022369 4.20302495549 4.30299482082
 22.1867881493 8.95779840996 8.8554134407 
 5.66785332714 10.4148634185 8.91728325435
 4.66999543003 9.0609229138  8.26908165618
 12.0788314276 8.65705135654 10.2641172159
 2.15391819    4.20420496512 3.64187684489
 2.92593942578 2.42997556379 2.64408501043
 7.77320999456 7.84974291061 8.37704709483
 19.3787362983 6.39079305857 7.2304706431 
      

使用scala的spark實作LDA

[spark的python, scala, java示例代碼:spark/examples/src]

[FatherAbraham1/MLlibMachineLearning]

from: http://blog.csdn.net/pipisorry/article/details/52912179

ref:

繼續閱讀