天天看點

通過Spark Streaming作業處理Kafka資料

本節介紹如何使用阿裡雲E-MapReduce部署Hadoop叢集和Kafka叢集,并運作Spark Streaming作業消費Kafka資料。

前提條件

  • 已注冊阿裡雲賬号,詳情請參見 注冊雲賬号
  • 已開通E-MapReduce服務。
  • 已完成雲賬号的授權,詳情請參見 角色授權

背景資訊

在開發過程中,通常會遇到消費Kafka資料的場景。在阿裡雲E-MapReduce中,您可通過運作Spark Streaming作業來消費Kafka資料。

步驟一 建立Hadoop叢集和Kafka叢集

推薦您将Hadoop叢集和Kafka叢集建立在同一個安全組下。如果Hadoop叢集和Kafka叢集不在同一個安全組下,則兩者的網絡預設是不互通的,您需要對兩者的安全組分别進行相關配置,以使兩者的網絡互通。

  1. 登入阿裡雲 E-MapReduce 控制台
  2. 建立Hadoop叢集,詳情請參見 建立叢集
    通過Spark Streaming作業處理Kafka資料
  3. 建立Kafka叢集,詳情請參見
    通過Spark Streaming作業處理Kafka資料

步驟二 擷取JAR包并上傳到Hadoop叢集

本例中的JAR包:對E-MapReduce的

Demo

進行了一定的修改後,編譯生成的JAR包。JAR包需要上傳到Hadoop叢集的emr-header-1主機中。

  1. 擷取JAR包( 本例JAR下載下傳位址 )。
  2. 傳回到 阿裡雲 E-MapReduce 控制台
  3. 在叢集管理頁面,單擊Hadoop叢集的叢集ID,進入Hadoop叢集。
  4. 在左側導航樹中選擇主機清單,然後在右側檢視Hadoop叢集中emr-header-1主機的IP資訊。
  5. 通過SSH用戶端登入emr-header-1主機。
  6. 上傳JAR包到emr-header-1主機的某個目錄。
說明: 後續步驟中的代碼有涉及到此路徑,本例上傳路徑為/home/hadoop。上傳JAR包,請保留該登入視窗,後續步驟仍将使用。

步驟三 在Kafka叢集上建立Topic

您可直接在E-MapReduce上以可視化的方式來建立Topic(詳情請參見

Kafka 中繼資料管理

),也可登入Kafka叢集的emr-header-1主機後以指令行的方式來建立Topic。本例以指令行方式建立一個分區數為10、副本數為2、名稱為test的Topic。

  1. 在叢集管理頁面,單擊Kafka叢集的叢集ID,進入Kafka叢集。
  2. 在左側導航樹中選擇主機清單,然後在右側檢視Kafka叢集中emr-header-1主機的IP資訊。
  3. 在SSH用戶端中建立一個指令視窗,登入Kafka叢集的emr-header-1主機。
  4. 通過以下指令建立Topic。
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic test --create
           
說明 :建立Topic後,請保留該登入視窗,後續步驟仍将使用。

步驟四 運作Spark Streaming作業

完成上述操作後,您即可在Hadoop叢集上運作Spark Streaming作業。本例将運作一個作業進行流式單詞統計(WordCount)。

  1. 傳回到Hadoop叢集的emr-header-1主機登入視窗。

    如果誤關閉了此視窗,請重新登入,詳情請參見步驟二 擷取JAR包并上傳到Hadoop叢集中的相關步驟。

  2. 通過如下作業指令來進行流式單詞統計(WordCount)。
spark-submit --class com.aliyun.emr.example.spark.streaming.KafkaSample  /home/hadoop/examples-1.2.0-shaded-2.jar 192.168.xxx.xxx:9092 test 5
           

指令中JAR包後面的三個關鍵參數說明如下:

  • 192.168.xxx.xxx:Kafka叢集中任一Kafka Broker元件的内網或外網IP位址,示例如圖 1所示。
  • test:Topic名稱。
  • 5:時間間隔。

圖 1. Kafka叢集元件

通過Spark Streaming作業處理Kafka資料

步驟五 使用Kafka釋出消息

進行本步驟操作時,需要保持Spark Streaming作業一直處于運作狀态。運作Kafka的生産者(producer)後,在Kafka用戶端的指令行中輸入文本時,在Hadoop叢集用戶端的指令行中會實時顯示單詞統計結果。

  1. 傳回到Kafka叢集的emr-header-1主機登入視窗。

    如果誤關閉了此視窗,請重新登入,詳情請參見步驟三 在Kafka叢集上建立Topic中的相關步驟。

  2. 在Kafka叢集的登入視窗中,通過如下指令來運作生産者(producer)。
/usr/lib/kafka-current/ /bin/kafka-console-producer.sh --topic test --broker-list emr-worker-1:9092
           
  1. 在Kafka登入視窗的指令行中不斷輸入文本,則在Hadoop叢集登入視窗中實時顯示文本的統計資訊。
    通過Spark Streaming作業處理Kafka資料

步驟六 檢視Spark Streaming作業的進展

Spark Streaming作業開始運作後,您可在E-MapReduce上檢視作業的狀态。

  1. 在Hadoop叢集的通路連結與端口頁面中,單擊Spark History Server UI後的連結,檢視Spark Streaming作業的狀态。詳情請參見 通路連結與端口
通過Spark Streaming作業處理Kafka資料
通過Spark Streaming作業處理Kafka資料

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區數個Spark技術同學每日線上答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

通過Spark Streaming作業處理Kafka資料

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

通過Spark Streaming作業處理Kafka資料