最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.代碼在本地可以直接跑通.
添加依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
Structured Streaming将實時流抽象成一張無邊界的表,輸入的每一條資料當成輸入表的一個新行,同時将流式計算的結果映射為另外一張表,完全以結構化的方式去操作流式資料。我們通過下面的代碼看一下就明白了.
package spark
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.functions._
/**
* structredstreaming消費kafka的資料,實作exactly-once的語義;
*/
object StructuredStreaming {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").se