天天看点

SeaTunnel 实践 | SeaTunnel 帮你快速玩转 Spark 数据处理

作者:灯惉

Databricks 开源的 Apache Spark 对于分布式数据处理来说是一个伟大的进步。我们在使用 Spark 时发现了很多可圈可点之处,我们在此与大家分享一下我们在简化 Spark 使用和编程以及加快 Spark 在生产环境落地上做的一些努力。

01

一个 Spark Streaming 读取 Kafka 的案例

以一个线上案例为例,介绍如何使用 Spark Streaming 统计 Nginx 后端日志中每个域名下每个状态码每分钟出现的次数,并将结果数据输出到外部数据源 Elasticsearch 中。其中原始数据已经通过 Rsyslog 传输到了 Kafka 中。

02

流数据读取

从 Kafka 中每隔一段时间读取数据,生成 DStream。

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
           

具体方法请参考

Spark Streaming + Kafka Integration Guide:

03

数据清洗

日志案例

192.168.0.1 seatunnel.apache.org 127.0.0.1 0.001s [22/Feb/2021:22:12:15 +0800] "GET /seatunnel HTTP/1.1" 200 8938 "http://github.com/" - "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"

通过 Split 方法从非结构化的原始数据 message 中获取域名以及状态码字段,并组成方便聚合的结构化数据格式 Map(key -> value)

val splitList = message.split(" ")
val domain = splitList(1)
val httpCode = splitList(9)
val item = Map((domain, httpCode) -> 1L)
           

04

数据聚合

利用 Spark 提供的 reduceByKey 方法对数据进行聚合计算,统计每分钟每个域名下的每个错误码出现的次数,其中 mapRdd 是在清洗数据阶段组成的 RDD。

val reduceRdd = mapRdd.reduceByKey((a:Long, b:Long) => (a + b))
           

05

数据输出

利用 Spark 提供的 foreachRDD 方法将结果数据 reduceRdd 输出到外部数据源 Elasticsearch。

reduceRdd.foreachRDD(rdd => {
    rdd.saveToEs("es_index" + "/es_type", esCfg)
})
           

06

不可避免的麻烦

我们的确可以利用 Spark 提供的 API 对数据进行自由处理,但是整套逻辑的开发调试是个不小的工程,需要一定的 Spark 基础以及使用经验才能开发出稳定高效的 Spark 处理流程。

除了业务逻辑开发方面的问题,任务发布上线时可能还会遇到以下不可逃避的麻烦:

  • 应用到生产环境调试周期长
  • 逻辑变更无法快速修改上线
  • 可能存在的数据丢失与重复问题
  • 如何最大化提升程序效率
  • 缺少应用运行状态监控

因此我们开始尝试更加简单高效的 Spark 方案,并试着解决以上问题。

07

一种简单高效的方式 - SeaTunnel

Apache SeaTunnel 是下一代高性能、分布式、海量数据集成框架。通过我们的努力让 Spark 的使用更简单,更高效,并将业界和广大用户使用 Spark 的优质经验固化到 SeaTunnel 这个产品中,明显减少学习成本,加快分布式数据处理能力在生产环境落地。

SeaTunnel 实践 | SeaTunnel 帮你快速玩转 Spark 数据处理

SeaTunnel 项目地址:

https://github.com/apache/incubator-seatunnel

SeaTunnel 原名 Waterdrop,2021 年 10 月 12 日起更名为 SeaTunnel。

08

SeaTunnel 的特性

  • 内置丰富插件,支持各种数据产品方便快捷的传输和集成数据,批流一体;
  • 基于模块化和插件化设计,支持热插拔,带来更好的扩展性和定制能力;
  • 特有的架构设计下,使得开发配置更简单,几乎零代码,无使用成本;
  • 经历多家企业,大规模生产环境使用和海量数据的洗礼,稳定健壮。

09

SeaTunnel 的原理和工作流程

SeaTunnel 2.X 版本尚在孵化中, 这里主要介绍 SeaTunnle 1.X 版本内容。SeaTunnel 利用了 Spark 的 Streaming, SQL, DataFrame 等技术,结合 Java 的反射机制、Service Loader 等技术实现了一套完整的可插拔的数据处理工作流,如下:

SeaTunnel 实践 | SeaTunnel 帮你快速玩转 Spark 数据处理

SeaTunnel pipeline

多个 Transform(Filter) 构建了数据处理的 Pipeline,满足各种各样的数据处理需求,如果您熟悉 SQL,也可以直接通过 SQL 插件构建数据处理的 Pipeline,简单高效。

以下是一个启动配置文件展示:

spark {
  # Seatunnel defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
}

input {
  socket {}
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
    result_table_name = "tmp1"
  }
  
  sql {
    sql = "select * from tmp1"
  }
}

output {
  stdout {}
}
           

整个配置由4个部分组成:

  • spark 是 Spark 相关的配置,可配置的 Spark 参数见:Spark Configuration;
  • input 可配置任意的 input 插件及其参数,具体参数随不同的插件而变化。input 支持包括 File, Hive, Kafka, ES, Jdbc 等插件;
  • filter 可配置任意的 filter 插件及其参数,具体参数随不同的 filter 插件而变化。filter 中的多个插件按配置顺序形成了数据处理的pipeline, 默认上一个 filter 的输出是下一个filter 的输入;
  • output 可配置任意的 output 插件及其参数,具体参数随不同的插件而变化;
  • input 和 filter 以及 output 的随意组合,构建了多种多样的数据同步场景。

10

如何使用 SeaTunnel

Step 1 : 使用 SeaTunnel 前请先准备好 Spark 和 Java 运行环境。

Step 2 : 下载 SetTunnel 安装包并解压:

https://github.com/apache/incubator-seatunnel/releases

# 以 SeaTunnel 1.5.7 为例:
wget https://github.com/apache/incubator-seatunnel/releases/download/v1.5.7/seatunnel-1.5.7.zip
unzip seatunnel-1.5.7.zip
ln -s seatunnel-1.5.7 seatunnel
cd seatunnel           

Step 3 : 配置 SeaTunnel(从 kafka 消费数据,做字符串分割,输出到终端), 编辑 config/application.conf。

spark {
  # Waterdrop defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  kafka {
    topics = "mytopic"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.zookeeper.connect = "localhost:2181"
    consumer.group.id = "waterdrop_group"
  }
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
  }
}

output {
  stdout {}
}           

Step 4 : 启动 SeaTunnel

./bin/start-seatunnel.sh --master yarn --deploy-mode client --config ./config/application.conf           

通过这样一个配置文件启动的方式即可快速实现文章开始部分介绍的手写 Spark Streaming 读取 Kafka 进行处理后写入 ElasticSearch 的逻辑。

更详细的使用方法见 Seatunnel Quick Start:

https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/quick-start

11

SeaTunnel RoadMap

SeaTunnel 后续规划主要按以下两个方面详细展开:

  1. 提供更多 Conncetor 和数据处理插件,提高易用性、可靠性、数据一致性,也欢迎各位开发者来贡献 Idea。
  2. 提升核心能力,包括但不限于兼容 Flink 最新版本、支持多插件版本、提供运行 Metrics 以及执行报告等。

详情参考 SeaTunnel RoadMap

https://github.com/orgs/apache/projects/28/views/1

上一篇: 波斯菊少女