import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.util.Properties;
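/**
 * Reads string records from the Kafka topic "test" with FlinkKafkaConsumer010
 * and writes them to date-bucketed files on HDFS via a BucketingSink.
 */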
public class ReadingToKafka {
public static void main(String[] args) throws Exception {
//String outPath = "/user/storm/test";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.getConfig().enableSysoutLogging();
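// Checkpoint every second; BucketingSink relies on completed checkpoints to
// finalize its part files, so without checkpointing output would stay in the
// "pending" state and never become visible as finished files.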
env.enableCheckpointing(1000);
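// Note: event time is declared here, but this job performs no event-time
// operations and assigns no timestamps/watermarks, so the setting has no
// visible effect on the pipeline below.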
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxxxxxxxxxxxxxxxxxxxxxx");
properties.setProperty("zookeeper.connect", "xxxxxxxxxxxxxxxxxxxxxxxx");
properties.setProperty("group.id", "test");
//properties.setProperty("fs.default-scheme", "hdfs://hostname:8020");
// Consume the "test" topic, deserializing each record as a UTF-8 string
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(),
properties);
// Read only from the latest offsets (ignore any backlog in the topic)
myConsumer.setStartFromLatest();
// Add the Kafka consumer as the streaming source
DataStreamSource<String> stream = env.addSource(myConsumer);
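// If event-time windows or joins were added later, the source would also need
// a timestamp/watermark assigner. A minimal sketch (not part of the original
// job; the 5-second out-of-orderness bound and the parsing helper are assumed):
//
// stream.assignTimestampsAndWatermarks(
//         new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
//             @Override
//             public long extractTimestamp(String element) {
//                 return parseTimestampFrom(element); // hypothetical helper
//             }
//         });
//
// BucketingSink (from flink-connector-filesystem) groups records into
// per-bucket directories under the given base path; in newer Flink releases
// it is deprecated in favor of StreamingFileSink.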
BucketingSink<String> hdfs_sink = new BucketingSink<String>(
"hdfs:///user/storm/data/");
hdfs_sink.setBatchSize(1024 * 1024 * 400);
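// One bucket (subdirectory) per day, e.g. hdfs:///user/storm/data/2019-01-01/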
hdfs_sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));
//hdfs_sink.setWriter(new SequenceFileWriter<IntWritable,Text>());
// Roll over the in-progress part file after at most one hour (ms), even if
// the 400 MB batch size has not been reached. (Closing buckets that receive
// no data is a separate setting: setInactiveBucketThreshold.)
hdfs_sink.setBatchRolloverInterval(3600000);
// Write the stream to HDFS
stream.addSink(hdfs_sink);
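// The pipeline above is only a plan; execute() submits it to the cluster
// and blocks until the (unbounded) job terminates.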
env.execute("flink to hdfs");
// Alternative pipeline: streaming word count (left disabled)
/* DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter())
.keyBy(0).sum(1);*/
//counts.writeAsCsv(outPath).setParallelism(1);
//counts.print();
//env.execute("WordCount from Kafka data");
}
/* public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}*/
}