flink sink
1 file sink
package com.study.liucf.unbounded.sink

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/**
  * @Author liucf
  * @Date 2021/9/13
  */
object FileSink {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    // convert each String record into a LiucfSensorReding
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // print to the console
    ds.print()
    // write to a file
    // ds
    //   .writeAsCsv("src\\main\\resources\\sensor.csv")
    //   .setParallelism(1) // by default the sink runs in parallel and writes one file per subtask; parallelism 1 yields a single file
    ds.addSink(StreamingFileSink.forRowFormat(
      new Path("src\\main\\resources\\sensor2.csv"),
      new SimpleStringEncoder[LiucfSensorReding]()
    ).build())
    // writeAsCsv has been deprecated, which is why StreamingFileSink is used above
    // start the Flink job
    env.execute("liucf sink api")
  }
}
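The sink built above uses the connector's default rolling behaviour. As a hedged sketch (not part of the original code; the rollover interval, inactivity interval and part-file size are illustrative assumptions, using the millisecond/byte overloads of the Flink 1.11–1.14-era builder), the same row-format sink can also be given an explicit rolling policy:

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// roll a new part file every 15 minutes, after 5 minutes of inactivity,
// or once a part file reaches roughly 128 MB (illustrative values)
val rollingSink: StreamingFileSink[LiucfSensorReding] = StreamingFileSink
  .forRowFormat(
    new Path("src\\main\\resources\\sensor2.csv"),
    new SimpleStringEncoder[LiucfSensorReding]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(1024L * 1024 * 128)
      .build())
  .build()

// ds.addSink(rollingSink)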
2 kafka sink
This example demonstrates reading data from one Kafka topic, sensor_input_csv, and writing it out to another Kafka topic, sensor_out.
2.1 Producer writes to topic: sensor_input_csv
package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.109.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    // kafkaProp.put("batch.size", 16384) // 16 KB
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("topic", "sensor_input_csv") // note: "topic" is not a producer config; the topic is given on each ProducerRecord
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor, producer)
    producer.close()
  }

  def send(str: String, producer: KafkaProducer[String, String]): Unit = {
    val record = new ProducerRecord[String, String]("sensor_input_csv", str)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null) {
          println("send succeeded")
        }
        if (exception != null) {
          println("send failed")
        }
      }
    })
  }
}
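The producer above sends a single hard-coded record. A hedged usage sketch (the object name SensorFileProduce and the file path are assumptions, not part of the original post) that reuses the send() helper to publish every line of the sensor.txt file from section 1:

package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

import scala.io.Source

object SensorFileProduce {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.109.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](kafkaProp)
    // assumed path: the same sensor.txt used by the file-sink example in section 1
    val source = Source.fromFile("src\\main\\resources\\sensor.txt")
    try {
      // one Kafka record per line, via the send() helper defined in SensorProduce2
      source.getLines().foreach(line => SensorProduce2.send(line, producer))
    } finally {
      source.close()
      producer.close()
    }
  }
}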
2.2 Flink code
Data is read from one Kafka topic, sensor_input_csv, and written out to another Kafka topic, sensor_out.
package com.study.liucf.unbounded.sink

import java.util.Properties

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

/**
  * @Author liucf
  * @Date 2021/9/18
  * This example reads data from one Kafka topic, sensor_input_csv,
  * and writes it out to another Kafka topic, sensor_out.
  */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set the Kafka configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    props.setProperty("topic", "sensor_input_csv") // not required: the topic is passed to the FlinkKafkaConsumer constructor below
    // add the Kafka source
    val inputDs: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props))
    val transDs: DataStream[String] = inputDs.map(d => {
      val arr = d.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble).toString
    })
    // print the result to the console
    transDs.print()
    // write to another Kafka topic
    val outProps = new Properties()
    outProps.setProperty("bootstrap.servers", "192.168.109.151:9092")
    transDs.addSink(new FlinkKafkaProducer[String](
      "sensor_out", new SimpleStringSchema(), outProps))
    // start the Flink job
    env.execute("liucf kafka sink api test")
  }
}
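As a hedged sketch (the group id is a made-up example, and this is a drop-in fragment for KafkaSink.main rather than a complete program), the consumer can also be given an explicit group.id and pinned to the earliest offset, so that re-running the job replays the whole input topic:

// inside KafkaSink.main, replacing the props/source definition above (same imports)
val props = new Properties()
props.setProperty("bootstrap.servers", "192.168.109.151:9092")
props.setProperty("group.id", "liucf-kafka-sink-demo") // hypothetical group id

val kafkaSource = new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props)
kafkaSource.setStartFromEarliest() // ignore committed offsets and read the topic from the beginning

val inputDs: DataStream[String] = env.addSource(kafkaSource)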
2.3 Consumer reads from topic: sensor_out
Consume the records and print them.
package com.study.liucf.kafka

import java.util
import java.util.Properties
import java.util.UUID

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConversions._

/**
  * @author [email protected]
  * @date 2021/1/14 20:44
  * @version 1.0
  */
object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    /** check that a topic to consume was specified */
    if (args.length == 0) {
      println("Enter topic name")
      return
    }
    // Kafka consumer configuration settings
    /** the topic name is passed as an argument */
    val topicName = args(0)
    /** create a Properties instance to hold the consumer configuration */
    val props: Properties = new Properties()
    props.put("bootstrap.servers", "192.168.109.151:9092")
    props.put("acks", "all")   // producer-side setting; ignored by the consumer
    props.put("retries", "0")  // producer-side setting; ignored by the consumer
    /** assign this consumer to a group */
    props.put("group.id", "test")
    /** if true, offsets are committed automatically; otherwise they are not committed */
    props.put("enable.auto.commit", "true")
    /** how often the consumed offsets are auto-committed */
    props.put("auto.commit.interval.ms", "1000")
    /** how long the broker waits without hearing from this consumer before considering it dead */
    props.put("session.timeout.ms", "30000")
    /** deserializer for the record key */
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    /** deserializer for the record value */
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    /** if the group already has committed offsets for the partitions, consumption resumes from them
      * (i.e. from where the last consumer left off, whoever that consumer was);
      * if there are no committed offsets, consumption starts from the beginning of the topic */
    props.put("auto.offset.reset", "earliest")
    /** props.put("auto.offset.reset", "earliest") combined with a random group id,
      * props.put("group.id", UUID.randomUUID().toString), reproduces the --from-beginning behaviour */
    props.put("group.id", UUID.randomUUID.toString)
    /** with "latest": if the group has committed offsets, consumption resumes from them;
      * if there are none, only records produced after this consumer starts are consumed
      * (anything written to the topic before that is skipped) */
    // props.put("auto.offset.reset", "latest")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer(props)
    /** the Kafka consumer subscribes to the list of topics here */
    consumer.subscribe(util.Arrays.asList(topicName))
    println("Subscribed to topic " + topicName)
    while (true) {
      val records: ConsumerRecords[String, String] = consumer.poll(100)
      for (record <- records) {
        /** print the offset, key and value of each consumer record */
        printf("offset = %d, key = %s, value = %s\n", record.offset, record.key, record.value)
      }
    }
  }
}
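To read back what the Flink job in section 2.2 wrote, run the consumer with the sink topic as its only argument, for example (a usage sketch, assuming the broker and job above are running):

SimpleConsumer.main(Array("sensor_out"))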