
06-flink-1.10.1 Flink Source API: 1. Reading Data from a Collection, 2. Reading Data from a File, 3. Reading Data from Kafka

Contents

1 Flink Reading Data from a Collection

1.1 Code

1.2 Execution Result

2 Flink Reading Data from a File

2.1 Data File

2.2 Code

2.3 Output

3 Flink Reading Data from Kafka

3.1 Add the flink-connector-kafka Dependency in pom.xml

3.2 Create a Kafka Producer

3.3 Read Data with the Flink Kafka API

3.4 Test Results

1 Flink Reading Data from a Collection

1.1 Code

package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */

case class SensorReding(id: String, timeStamp: Long, temperature: Double)

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // define a List that supplies the data
    val dataList = List(
      SensorReding("sensor_1", 1630851513, 36.1),
      SensorReding("sensor_2", 1630851512, 36.2),
      SensorReding("sensor_3", 1630851513, 36.3),
      SensorReding("sensor_4", 1630851514, 36.4),
      SensorReding("sensor_5", 1630851515, 36.5)
    )

    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // read the collection as a data source
    val ds: DataStream[SensorReding] = env.fromCollection(dataList)
    // print the result to the standard console
    ds.print()
    // submit and run the job
    env.execute("liucf collection source test")
  }
}
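
Besides fromCollection, the same records can be passed inline with fromElements. A minimal sketch, reusing the SensorReding case class and the env defined above:

// fromElements builds a bounded stream directly from the given values
val ds2: DataStream[SensorReding] = env.fromElements(
  SensorReding("sensor_1", 1630851513, 36.1),
  SensorReding("sensor_2", 1630851512, 36.2)
)
ds2.print()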
           

1.2 Execution Result

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
4> SensorReding(sensor_2,1630851512,36.2)
6> SensorReding(sensor_4,1630851514,36.4)
5> SensorReding(sensor_3,1630851513,36.3)
7> SensorReding(sensor_5,1630851515,36.5)
3> SensorReding(sensor_1,1630851513,36.1)

Process finished with exit code 0
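
The number in front of each record (for example 4>) is the index of the parallel subtask that printed it, which is why the records do not appear in insertion order. A minimal sketch, assuming you want a single ordered output stream for local testing:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// run every operator, including print(), in a single subtask
env.setParallelism(1)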
           

2 Flink Reading Data from a File

2.1 Data File

The file src\main\resources\sensor.txt contains one CSV record per line (id, timestamp, temperature):

sensor_1,1630851513,36.1
sensor_2,1630851512,36.2
sensor_3,1630851513,36.3
sensor_4,1630851514,36.4
sensor_5,1630851515,36.5

2.2 Code

package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */
object FileSource {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // read the data file line by line
    val ds = env.readTextFile("src\\main\\resources\\sensor.txt")
    // print the result to the standard console
    ds.print()
    // submit and run the job
    env.execute("liucf File source test")
  }
}
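
readTextFile yields one raw String per line. A minimal sketch of parsing each CSV line into the SensorReding case class from section 1 (assuming every line is well formed):

// split the CSV line and build a typed record
val sensorDs: DataStream[SensorReding] = ds.map { line =>
  val fields = line.split(",")
  SensorReding(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
}
sensorDs.print()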
           

2.3 Output

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6> sensor_4,1630851514,36.4
5> sensor_3,1630851513,36.3
2> sensor_1,1630851513,36.1
3> sensor_2,1630851512,36.2
8> sensor_5,1630851515,36.5

Process finished with exit code 0
           

3 Flink Reading Data from Kafka

3.1 Add the flink-connector-kafka Dependency in pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.10.1</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
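
The _2.12 suffix on the artifactId is the Scala binary version; it should match the Scala version used by the project and by the other Flink dependencies.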
           

3.2 Create a Kafka Producer

package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.189.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384) // 16 KB
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    // note: "topic" is not a producer config key; the topic is set on each ProducerRecord below
    kafkaProp.put("topic", "sensor_input_csv")
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor, producer)
    producer.close()
  }

  def send(str: String, producer: KafkaProducer[String, String]): Unit = {
    val record = new ProducerRecord[String, String]("sensor_input_csv", str)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null) {
          println("record sent successfully")
        }
        if (exception != null) {
          println("failed to send record: " + exception.getMessage)
        }
      }
    })
  }
}
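
The example above sends a single hard-coded record. A minimal sketch for pushing several test records in one run (the sensor_2 and sensor_3 values are made up for illustration; flush blocks until all buffered records are delivered, and close() would also wait):

val readings = Seq(
  "sensor_1,1617505482,36.6",
  "sensor_2,1617505483,36.7",
  "sensor_3,1617505484,36.8"
)
readings.foreach(r => send(r, producer))
// make sure everything buffered has actually reached the broker
producer.flush()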
           

3.3 Read Data with the Flink Kafka API

package com.study.liucf.unbounded.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * @Author liucf
 * @Date 2021/9/7
 */
object KafkaSource {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set the Kafka configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    // note: "topic" is not a consumer config key; the topic is passed to the FlinkKafkaConsumer constructor below
    props.setProperty("topic", "sensor_input_csv")
    // add the Kafka source
    val ds: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props))
    // print the result to the standard console
    ds.print()
    // submit and run the job
    env.execute("liucf kafka api test")
  }
}
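
A hedged sketch of a slightly fuller setup: the Flink Kafka consumer also expects a group.id, and the start position can be chosen explicitly (the group name sensor-group below is an assumption, not from the original):

props.setProperty("group.id", "sensor-group") // hypothetical consumer group name
val consumer = new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props)
// only read records produced after the job starts; the default is the committed group offsets
consumer.setStartFromLatest()
val kafkaDs: DataStream[String] = env.addSource(consumer)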
           

3.4 Test Results

Producing data:

(screenshot: producer console output after sending the record)

Flink consumes the data:

(screenshot: Flink job console output showing the consumed record)
