
10-flink-1.10.1 - Flink Sink API output operators

1 Flink sink

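In Flink, a sink is the operator that writes a DataStream out to an external system (console, files, Kafka, databases, and so on). Every sink, whether built in or custom, is attached to a stream with addSink(), which takes a SinkFunction. A minimal sketch, assuming the Flink 1.10 SinkFunction interface (the class name PrintLikeSink is only an illustrative example, not part of the code in this article):

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// hypothetical custom sink: invoke() is called once for every record of the stream
class PrintLikeSink[T] extends SinkFunction[T] {
  override def invoke(value: T, context: SinkFunction.Context[_]): Unit =
    println(value)
}

// usage: someDataStream.addSink(new PrintLikeSink[String])

The sections below use two ready-made sinks: StreamingFileSink for files and FlinkKafkaProducer for Kafka.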

2 File sink

package com.study.liucf.unbounded.sink

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/13
 */
object FileSink {
  def main(args: Array[String]): Unit = {
    //create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    //convert each String record into a LiucfSensorReding
    val ds = inputStream.map(r=>{
      val arr = r.split(",")
      LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
    })
    //print to the console
    ds.print()
    //write to a file
//    ds
//      .writeAsCsv("src\\main\\resources\\sensor.csv")
//      .setParallelism(1)//by default each parallel subtask writes its own file; parallelism 1 forces a single output file
    ds.addSink(StreamingFileSink.forRowFormat(
      new Path("src\\main\\resources\\sensor2.csv"),
      new SimpleStringEncoder[LiucfSensorReding]()
    ).build())
    //writeAsCsv is deprecated; StreamingFileSink is the recommended replacement
    //start the Flink job
    env.execute("liucf sink api")
  }
}
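By default, forRowFormat rolls part files with DefaultRollingPolicy. When explicit control over rolling is needed, the builder also accepts a rolling policy. A sketch assuming the Flink 1.10.1 StreamingFileSink API; the interval and size values below are purely illustrative:

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val rollingSink = StreamingFileSink
  .forRowFormat(new Path("src\\main\\resources\\sensor2.csv"),
    new SimpleStringEncoder[LiucfSensorReding]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 minutes
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 minutes without new records
      .withMaxPartSize(128 * 1024 * 1024)                   // or once a part file reaches 128 MB
      .build())
  .build()
// ds.addSink(rollingSink)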
           

3 Kafka sink

This example demonstrates reading data from one Kafka topic, sensor_input_csv, and writing it out to another Kafka topic, sensor_out.

3.1 Producer writing to topic sensor_input_csv

package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.109.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384)//16k
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("topic","sensor_input_csv")//note: "topic" is not a standard producer config; the target topic is set on each ProducerRecord
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor,producer)
    producer.close()
  }


  def send(str:String,producer: KafkaProducer[String, String]): Unit ={
    val record = new ProducerRecord[String, String]("sensor_input_csv", str )
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null) {
          println("send succeeded")
        }
        if (exception != null) {
          println("message send failed")
        }
      }
    })
  }
}
           

3.2 Flink code

Data is read from the Kafka topic sensor_input_csv and written out to another Kafka topic, sensor_out.

package com.study.liucf.unbounded.sink

import java.util.Properties

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
/**
 * @Author liucf
 * @Date 2021/9/18
 *      This example demonstrates reading data from the Kafka topic sensor_input_csv
 *      and writing it out to another Kafka topic: sensor_out
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    //create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //set the Kafka consumer properties
    val props = new Properties()
    props.setProperty("bootstrap.servers","192.168.109.151:9092")
    //the topic is passed to the FlinkKafkaConsumer constructor below, so it does not need to be set as a property;
    //the Flink Kafka consumer does expect a consumer group id (the value here is just an example)
    props.setProperty("group.id","liucf-flink-kafka-sink")
    //add the Kafka source
    val inputDs: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("sensor_input_csv",new SimpleStringSchema(),props))
    val transDs: DataStream[String] = inputDs.map(d=>{
      val arr = d.split(",")
      LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble).toString
    })
    //print results to the console
    transDs.print()
    //write to another Kafka topic
    val outProps = new Properties()
    outProps.setProperty("bootstrap.servers","192.168.109.151:9092")
    transDs.addSink(new FlinkKafkaProducer[String](
      "sensor_out",new SimpleStringSchema(),outProps))
    //start the Flink job
    env.execute("liucf kafka sink api test")
  }
}
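The two-argument FlinkKafkaProducer constructor used above gives at-least-once delivery once checkpointing is enabled. For end-to-end exactly-once, the universal Kafka connector can write with Kafka transactions. A sketch under the following assumptions: checkpointing is enabled, the broker's transaction.max.timeout.ms is at least the producer's transaction.timeout.ms, and SensorOutKafkaSchema is a hypothetical helper class, not part of the original code:

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}

// hypothetical serialization schema: each String becomes the value of a record in topic sensor_out
class SensorOutKafkaSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

// inside main, replacing the addSink call above:
env.enableCheckpointing(60000) // Kafka transactions are committed when checkpoints complete
outProps.setProperty("transaction.timeout.ms", "900000") // must not exceed the broker's transaction.max.timeout.ms
transDs.addSink(new FlinkKafkaProducer[String](
  "sensor_out",
  new SensorOutKafkaSchema("sensor_out"),
  outProps,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE))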
           

3.3 Consumer reading from topic sensor_out

Consume the data and print it out.

package com.study.liucf.kafka

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConversions._

/**
 * @author [email protected]
 * @date 2021/1/14 20:44
 * @version 1.0
 */
object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    /** check that a topic to consume from was specified */
    if (args.length == 0) {
      println("Enter topic name")
      return
    }
    //Kafka consumer configuration settings
    /** the topic name is passed in as a command-line argument */
    val topicName = args(0)
    /** create a Properties instance for the consumer configuration */
    val props: Properties = new Properties()
    import java.util.UUID
    props.put("bootstrap.servers", "192.168.109.151:9092")
    //acks and retries are producer-side settings; the consumer client ignores them
    props.put("acks", "all")
    props.put("retries", "0")

    /** assign this consumer to a consumer group */
    props.put("group.id", "test")

    /** if true, offset auto-commit is enabled; otherwise offsets are not committed automatically */
    props.put("enable.auto.commit", "true")

    /** how often consumed offsets are auto-committed */
    props.put("auto.commit.interval.ms", "1000")

    /** how long the broker waits without a heartbeat before considering this consumer dead and rebalancing */
    props.put("session.timeout.ms", "30000")

    /** deserializer class for record keys */
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /** deserializer class for record values */
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /** if the group already has a committed offset for a partition, consumption resumes from that offset
     * (i.e. from where this group last stopped, regardless of which consumer committed it);
     * if there is no committed offset, consumption starts from the beginning of the topic */
    props.put("auto.offset.reset", "earliest")

    /** combining props.put("auto.offset.reset", "earliest") with props.put("group.id", UUID.randomUUID().toString)
     * reproduces the --from-beginning behaviour of the console consumer */
    props.put("group.id", UUID.randomUUID.toString)

    /** with "latest": if the group already has a committed offset for a partition, consumption resumes from it;
     * if there is no committed offset, only records produced to that partition after this consumer starts
     * are consumed, and earlier records are skipped */
    //        props.put("auto.offset.reset", "latest");
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer(props)
    /** the Kafka consumer subscribes to the list of topics here */
    consumer.subscribe(util.Arrays.asList(topicName))
    println("Subscribed to topic " + topicName)
    while (true){

      val records: ConsumerRecords[String, String] = consumer.poll(100)

      for (record <- records) {
        /** print the offset, key and value of each consumer record */
        printf("offset = %d, key = %s, value = %s\n", record.offset, record.key, record.value)
      }
    }
  }
}
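Run SimpleConsumer with the topic name as its only argument (sensor_out for this example) to see the records written by the Flink job printed with their offsets, keys and values.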
           
