天天看點

flink入門案例之WordCount

flink

入門案例之

WordCount

,以下測試代碼都是在本地執行的

添加依賴

添加maven依賴

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
           

build資訊

<build>
    <plugins>
    <!-- 該插件用于将Scala代碼編譯成class檔案 -->
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
            <execution>
                <!-- 聲明綁定到maven的compile階段 -->
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
           

批處理

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, createTypeInformation}

object WordCount {
  def main(args: Array[String]): Unit = {
    //擷取執行環境
    //建立一個執行環境,該環境代表目前在其中執行程式的上下文。
    // 如果程式是獨立調用的,則此方法傳回本地執行環境。
    // 如果從指令行用戶端内部調用該程式以将其送出給叢集,則此方法将傳回該叢集的執行環境。
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //建立通過逐行讀取給定檔案而産生的字元串資料集。
    val inputDS: DataSet[String] = env.readTextFile("D:/test/a.txt")
    val wordCounts: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    //将DataSet中的元素列印到調用print()方法的JVM的标準輸出流System.out中。
    // 對于在群集中執行的程式,此方法需要将DataSet的内容收集回用戶端,以在用戶端列印。
    //為每個元素編寫的字元串由AnyRef.toString方法定義。
    //與()和()方法類似,此方法立即觸發程式執行。
    wordCounts.print()
  }
}
           

流處理

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //替換為你自己的IP
    val host = "192.168.xx.1x1"
    val port = 9000

    // 建立流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 接收socket文本流
    val textDstream: DataStream[String] = env.socketTextStream(host, port)

    // flatMap和Map需要引用的隐式轉換
    import org.apache.flink.api.scala._

    val value: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter((_.nonEmpty)).map((_, 1)).keyBy(0).sum(1)
    value.print().setParallelism(1)
    env.execute("Socket WordCountTest")

  }
}
           

之後在

linux

上用如下指令進行測試

總結

  • 導入包的時候一定要留意導入的是

    java

    的包還是

    scala

    的包,很多函數是兩個包都有的,導入錯了讓你懷疑人生

繼續閱讀