天天看點

(3)flink的實時wordCount

文章目錄

      • 代碼
      • linux

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Streaming word count over a socket text source.
 *
 * Expected program arguments: `--host note01 --port 7777`
 */
object StreamingWordCount {

  /**
   * Reads text lines from the socket given by `--host` / `--port`, splits each
   * line on single spaces, and prints a running count for every non-empty word.
   *
   * @param args command-line arguments; must contain `--host <name>` and `--port <int>`
   */
  def main(args: Array[String]): Unit = {
    // Read the connection parameters from the command line.
    val tool = ParameterTool.fromArgs(args)
    // getRequired / getInt fail immediately with a message naming the missing key,
    // instead of handing a null host to the socket source or failing on null.toInt.
    val host = tool.getRequired("host")
    val port = tool.getInt("port")

    // Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Consume the socket as a stream of text lines.
    val textDstream = env.socketTextStream(host, port)
    // Implicit conversions (TypeInformation) required by flatMap and map.
    import org.apache.flink.api.scala._
    val rs = textDstream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      // Streaming uses keyBy rather than groupBy. Key on the word itself with a
      // typed selector instead of the positional tuple index.
      .keyBy(_._1)
      // Running sum of the count field (tuple position 1) per key.
      .sum(1)

    rs.print()

    // Nothing runs until the job is submitted.
    env.execute()
  }
}

           

[root@note01 ~]# nc -lk 7777