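Contents
1 Reading data from a collection with Flink
1.1 Code
1.2 Execution result
2 Reading data from a file with Flink
2.1 Data file
2.2 Code
2.3 Output
3 Reading data from Kafka with Flink
3.1 Add the flink-connector-kafka dependency to the pom
3.2 Create a producer
3.3 Read data with the Flink Kafka API
3.4 Test result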
1 Reading data from a collection with Flink
1.1 Code
package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */
case class SensorReding(id: String, timeStamp: Long, temperature: Double)

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // Define a List that supplies the data
    val dataList = List(
      SensorReding("sensor_1", 1630851513, 36.1),
      SensorReding("sensor_2", 1630851512, 36.2),
      SensorReding("sensor_3", 1630851513, 36.3),
      SensorReding("sensor_4", 1630851514, 36.4),
      SensorReding("sensor_5", 1630851515, 36.5)
    )
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the collection as the data source
    val ds: DataStream[SensorReding] = env.fromCollection(dataList)
    // Print the result to standard output
    ds.print()
    // Start the job
    env.execute("liucf collection source test")
  }
}
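1.2 Execution result
The "N>" prefix on each line is the index of the parallel subtask that printed it, so the order of the lines can differ between runs.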
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
4> SensorReding(sensor_2,1630851512,36.2)
6> SensorReding(sensor_4,1630851514,36.4)
5> SensorReding(sensor_3,1630851513,36.3)
7> SensorReding(sensor_5,1630851515,36.5)
3> SensorReding(sensor_1,1630851513,36.1)
Process finished with exit code 0
2 Reading data from a file with Flink
2.1 Data file
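The data file itself is not reproduced here. Judging from the output in section 2.3, sensor.txt holds one comma-separated record per line (id, timestamp, temperature). The sketch below recreates such a file; the object name SensorFileWriter, its write method, and the order of the lines are assumptions, not part of the original project.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper: recreates a sensor.txt like the one read in section 2.2.
object SensorFileWriter {
  // Records inferred from the output in section 2.3;
  // their order in the real file is an assumption.
  val lines: List[String] = List(
    "sensor_1,1630851513,36.1",
    "sensor_2,1630851512,36.2",
    "sensor_3,1630851513,36.3",
    "sensor_4,1630851514,36.4",
    "sensor_5,1630851515,36.5"
  )

  // Writes one record per line and returns the path that was written.
  def write(target: Path): Path = {
    // Create the parent directories if they do not exist yet
    Option(target.getParent).foreach(p => Files.createDirectories(p))
    Files.write(target, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
  }

  def main(args: Array[String]): Unit = {
    val written = write(Paths.get("src", "main", "resources", "sensor.txt"))
    println(s"Wrote ${lines.size} records to $written")
  }
}
```

Running main from the project root places the file where the code in section 2.2 expects it.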
2.2 Code
package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */
object FileSource {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read data from the file (forward slashes work on Windows and Unix alike)
    val ds = env.readTextFile("src/main/resources/sensor.txt")
    // Print the result to standard output
    ds.print()
    // Start the Flink job
    env.execute("liucf File source test")
  }
}
2.3 Output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6> sensor_4,1630851514,36.4
5> sensor_3,1630851513,36.3
2> sensor_1,1630851513,36.1
3> sensor_2,1630851512,36.2
8> sensor_5,1630851515,36.5
Process finished with exit code 0
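readTextFile yields each line as a plain String. To work with typed records like those in section 1, each line can be split on commas and mapped to the SensorReding case class. A minimal sketch: the object SensorParser and its parse method are hypothetical names, and the case class is repeated here only to keep the block self-contained.

```scala
// Same shape as the case class defined in section 1.1 (spelling kept as in the original).
case class SensorReding(id: String, timeStamp: Long, temperature: Double)

// Hypothetical helper; not part of the original project.
object SensorParser {
  // Parses a line such as "sensor_1,1630851513,36.1".
  // Returns None when the line does not split into three fields
  // or when a number cannot be parsed.
  def parse(line: String): Option[SensorReding] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) =>
        try Some(SensorReding(id, ts.toLong, temp.toDouble))
        catch { case _: NumberFormatException => None }
      case _ => None
    }
}
```

In the job from section 2.2 this could be applied along the lines of ds.flatMap(line => SensorParser.parse(line).toList), which silently drops malformed lines; whether dropping or failing is the better choice depends on the use case.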
3 Reading data from Kafka with Flink
3.1 Add the flink-connector-kafka dependency to the pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.10.1</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
3.2 Create a producer
package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.189.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384)//16k
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    // The topic is not a producer config key; it is set on each ProducerRecord in send()
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor, producer)
    // close() waits for pending sends to complete, so the callback runs before exit
    producer.close()
  }

  def send(str: String, producer: KafkaProducer[String, String]): Unit = {
    val record = new ProducerRecord[String, String]("sensor_input_csv", str)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        // exception is null when the send succeeded
        if (exception == null) {
          println("Message sent successfully")
        } else {
          println("Failed to send message")
        }
      }
    })
  }
}
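The producer above sends a single hard-coded record. For a longer test run, several records in the same CSV format can be generated and passed to send one by one. A sketch under assumptions: SensorLines and its sample method are hypothetical, and the ids, timestamps, and temperatures it produces are made-up test values.

```scala
// Hypothetical helper for generating test input; not part of the original project.
object SensorLines {
  // Builds n lines of the form "sensor_<i>,<epoch seconds>,<temperature>",
  // the same layout as the "sensor_1,1617505482,36.6" record sent above.
  def sample(n: Int, startSeconds: Long): Seq[String] =
    (1 to n).map { i =>
      val timestamp = startSeconds + i - 1 // records are one second apart
      val tenths = 360 + i                 // temperature in tenths of a degree: 36.1, 36.2, ...
      s"sensor_$i,$timestamp,${tenths / 10}.${tenths % 10}"
    }
}
```

Inside main, something like SensorLines.sample(5, 1617505482L).foreach(line => send(line, producer)) placed before producer.close() would send five records instead of one.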
3.3 Read data with the Flink Kafka API
package com.study.liucf.unbounded.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * @Author liucf
 * @Date 2021/9/7
 */
object KafkaSource {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the Kafka configuration
    val props = new Properties()
    // Must point to the same Kafka cluster the producer in 3.2 writes to
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    // The topic is passed to FlinkKafkaConsumer below; it is not a consumer config key
    // Add the Kafka data source
    val ds: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props))
    // Print the result to standard output
    ds.print()
    // Start the Flink job
    env.execute("liucf kafka api test")
  }
}
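The consumer above only sets bootstrap.servers. The Flink Kafka connector documentation also lists group.id as a consumer property, and auto.offset.reset controls where reading starts when the group has no committed offset. A sketch of a helper that builds such a Properties object; KafkaProps, its consumer method, and the group name in the usage note are hypothetical.

```scala
import java.util.Properties

// Hypothetical helper; the names are illustrative, not from the original project.
object KafkaProps {
  // Builds consumer properties for the given broker list and consumer group.
  def consumer(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Consumer group used for offset tracking
    props.setProperty("group.id", groupId)
    // Start from the latest records when the group has no committed offset
    props.setProperty("auto.offset.reset", "latest")
    props
  }
}
```

In KafkaSource this could replace the manual setup, for example val props = KafkaProps.consumer("192.168.109.151:9092", "sensor-consumer-group"), where the group name is a made-up value.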
3.4 Test result
Producing data
Data consumed by Flink