文章目錄
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object StreamingWordCount {

  /**
   * Streaming word count over a socket text stream.
   *
   * Expected program arguments: `--host <hostname> --port <port>`,
   * for example `--host note01 --port 7777`.
   *
   * @param args command-line arguments; must contain both `--host` and `--port`
   * @throws RuntimeException if `--host` or `--port` is missing
   * @throws NumberFormatException if the `--port` value is not an integer
   */
  def main(args: Array[String]): Unit = {
    // Read the connection settings from the command-line arguments.
    val params = ParameterTool.fromArgs(args)
    // getRequired/getInt fail with a message naming the missing key, rather than
    // handing a null host to the socket source or calling toInt on a null string.
    val host = params.getRequired("host")
    val port = params.getInt("port")

    // Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume lines of text from the socket.
    val textDstream = env.socketTextStream(host, port)

    // Implicit TypeInformation needed by flatMap / map / keyBy below.
    import org.apache.flink.api.scala._

    val rs = textDstream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      // Streams are partitioned with keyBy (not groupBy). A key-selector function is
      // used instead of the deprecated positional-index overload keyBy(0).
      .keyBy(_._1)
      // Rolling sum over tuple field 1 (the count) for each key.
      .sum(1)

    rs.print()

    // Nothing runs until the job is submitted.
    env.execute()
  }
}
[root@note01 ~]# nc -lk 7777