天天看點

SeaTunnel 實踐 | SeaTunnel 幫你快速玩轉 Spark 資料處理

作者:燈惉

Databricks 開源的 Apache Spark 對于分布式資料處理來說是一個偉大的進步。我們在使用 Spark 時發現了很多可圈可點之處,我們在此與大家分享一下我們在簡化 Spark 使用和程式設計以及加快 Spark 在生産環境落地上做的一些努力。

01

一個 Spark Streaming 讀取 Kafka 的案例

以一個線上案例為例,介紹如何使用 Spark Streaming 統計 Nginx 後端日志中每個域名下每個狀态碼每分鐘出現的次數,并将結果資料輸出到外部資料源 Elasticsearch 中。其中原始資料已經通過 Rsyslog 傳輸到了 Kafka 中。

02

流資料讀取

從 Kafka 中每隔一段時間讀取資料,生成 DStream。

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
           

具體方法請參考

Spark Streaming + Kafka Integration Guide:

03

資料清洗

日志案例

192.168.0.1 seatunnel.apache.org 127.0.0.1 0.001s [22/Feb/2021:22:12:15 +0800] "GET /seatunnel HTTP/1.1" 200 8938 "http://github.com/" - "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"

通過 Split 方法從非結構化的原始資料 message 中擷取域名以及狀态碼字段,并組成友善聚合的結構化資料格式 Map(key -> value)

val splitList = message.split(" ")
val domain = splitList(1)
val httpCode = splitList(9)
val item = Map((domain, httpCode) -> 1L)
           

04

資料聚合

利用 Spark 提供的 reduceByKey 方法對資料進行聚合計算,統計每分鐘每個域名下的每個錯誤碼出現的次數,其中 mapRdd 是在清洗資料階段組成的 RDD。

val reduceRdd = mapRdd.reduceByKey((a:Long, b:Long) => (a + b))
           

05

資料輸出

利用 Spark 提供的 foreachRDD 方法将結果資料 reduceRdd 輸出到外部資料源 Elasticsearch。

reduceRdd.foreachRDD(rdd => {
    rdd.saveToEs("es_index" + "/es_type", esCfg)
})
           

06

不可避免的麻煩

我們的确可以利用 Spark 提供的 API 對資料進行自由處理,但是整套邏輯的開發調試是個不小的工程,需要一定的 Spark 基礎以及使用經驗才能開發出穩定高效的 Spark 處理流程。

除了業務邏輯開發方面的問題,任務釋出上線時可能還會遇到以下不可逃避的麻煩:

  • 應用到生産環境調試周期長
  • 邏輯變更無法快速修改上線
  • 可能存在的資料丢失與重複問題
  • 如何最大化提升程式效率
  • 缺少應用運作狀态監控

是以我們開始嘗試更加簡單高效的 Spark 方案,并試着解決以上問題。

07

一種簡單高效的方式 - SeaTunnel

Apache SeaTunnel 是下一代高性能、分布式、海量資料內建架構。通過我們的努力讓 Spark 的使用更簡單,更高效,并将業界和廣大使用者使用 Spark 的優質經驗固化到 SeaTunnel 這個産品中,明顯減少學習成本,加快分布式資料處理能力在生産環境落地。

SeaTunnel 實踐 | SeaTunnel 幫你快速玩轉 Spark 資料處理

SeaTunnel 項目位址:

https://github.com/apache/incubator-seatunnel

SeaTunnel 原名 Waterdrop,2021 年 10 月 12 日起更名為 SeaTunnel。

08

SeaTunnel 的特性

  • 内置豐富插件,支援各種資料産品友善快捷的傳輸和內建資料,批流一體;
  • 基于子產品化和插件化設計,支援熱插拔,帶來更好的擴充性和定制能力;
  • 特有的架構設計下,使得開發配置更簡單,幾乎零代碼,無使用成本;
  • 經曆多家企業,大規模生産環境使用和海量資料的洗禮,穩定健壯。

09

SeaTunnel 的原理和工作流程

SeaTunnel 2.X 版本尚在孵化中, 這裡主要介紹 SeaTunnle 1.X 版本内容。SeaTunnel 利用了 Spark 的 Streaming, SQL, DataFrame 等技術,結合 Java 的反射機制、Service Loader 等技術實作了一套完整的可插拔的資料處理工作流,如下:

SeaTunnel 實踐 | SeaTunnel 幫你快速玩轉 Spark 資料處理

SeaTunnel pipeline

多個 Transform(Filter) 建構了資料處理的 Pipeline,滿足各種各樣的資料處理需求,如果您熟悉 SQL,也可以直接通過 SQL 插件建構資料處理的 Pipeline,簡單高效。

以下是一個啟動配置檔案展示:

spark {
  # Seatunnel defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
}

input {
  socket {}
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
    result_table_name = "tmp1"
  }
  
  sql {
    sql = "select * from tmp1"
  }
}

output {
  stdout {}
}
           

整個配置由4個部分組成:

  • spark 是 Spark 相關的配置,可配置的 Spark 參數見:Spark Configuration;
  • input 可配置任意的 input 插件及其參數,具體參數随不同的插件而變化。input 支援包括 File, Hive, Kafka, ES, Jdbc 等插件;
  • filter 可配置任意的 filter 插件及其參數,具體參數随不同的 filter 插件而變化。filter 中的多個插件按配置順序形成了資料處理的pipeline, 預設上一個 filter 的輸出是下一個filter 的輸入;
  • output 可配置任意的 output 插件及其參數,具體參數随不同的插件而變化;
  • input 和 filter 以及 output 的随意組合,建構了多種多樣的資料同步場景。

10

如何使用 SeaTunnel

Step 1 : 使用 SeaTunnel 前請先準備好 Spark 和 Java 運作環境。

Step 2 : 下載下傳 SetTunnel 安裝包并解壓:

https://github.com/apache/incubator-seatunnel/releases

# 以 SeaTunnel 1.5.7 為例:
wget https://github.com/apache/incubator-seatunnel/releases/download/v1.5.7/seatunnel-1.5.7.zip
unzip seatunnel-1.5.7.zip
ln -s seatunnel-1.5.7 seatunnel
cd seatunnel           

Step 3 : 配置 SeaTunnel(從 kafka 消費資料,做字元串分割,輸出到終端), 編輯 config/application.conf。

spark {
  # Waterdrop defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  kafka {
    topics = "mytopic"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.zookeeper.connect = "localhost:2181"
    consumer.group.id = "waterdrop_group"
  }
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
  }
}

output {
  stdout {}
}           

Step 4 : 啟動 SeaTunnel

./bin/start-seatunnel.sh --master yarn --deploy-mode client --config ./config/application.conf           

通過這樣一個配置檔案啟動的方式即可快速實作文章開始部分介紹的手寫 Spark Streaming 讀取 Kafka 進行處理後寫入 ElasticSearch 的邏輯。

更詳細的使用方法見 Seatunnel Quick Start:

https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/quick-start

11

SeaTunnel RoadMap

SeaTunnel 後續規劃主要按以下兩個方面詳細展開:

  1. 提供更多 Conncetor 和資料處理插件,提高易用性、可靠性、資料一緻性,也歡迎各位開發者來貢獻 Idea。
  2. 提升核心能力,包括但不限于相容 Flink 最新版本、支援多插件版本、提供運作 Metrics 以及執行報告等。

詳情參考 SeaTunnel RoadMap

https://github.com/orgs/apache/projects/28/views/1

上一篇: 波斯菊少女