1. The main steps of a Flink program
From the WordCount example in the previous post, you can see that a Flink job consists of the following three steps:
1.1 Add a data source with addSource. The source can be a file, a network stream, an MQ, MySQL, ...
1.2 Transform (i.e. process) the data. In WordCount, this step splits each line of text into words by whitespace and counts each word.
1.3 Output (or "sink") the processed result to the target system, which can be the console, an MQ, Redis, MySQL, ElasticSearch, ... A minimal sketch of these three steps is shown right below.
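Here is a minimal Java sketch of the three steps. The class name ThreeStepSketch, the socket source on localhost:9999 and the upper-casing transform are only illustrative assumptions, not part of the original WordCount example:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ThreeStepSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. add a source (here: a socket text stream, purely for illustration)
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // 2. transform the data (here: upper-case every line)
        DataStream<String> upper = lines.map(String::toUpperCase);

        // 3. sink the result (here: print to stdout; could equally be Kafka, Redis, MySQL, ...)
        upper.print();

        env.execute("three-step sketch");
    }
}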
Some readers may wonder: in the previous WordCount post, the final result only called a single print() method, and there did not seem to be any addSink step at all?
Take a look at the source of org.apache.flink.streaming.api.datastream.DataStream#print():
/**
* Writes a DataStream to the standard output stream (stdout).
*
* <p>For each element of the DataStream the result of {@link Object#toString()} is written.
*
* <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
* worker.
*
* @return The closed DataStream.
*/
@PublicEvolving
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}
In fact, the last line already calls addSink.
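In other words, print() is just a thin convenience wrapper. A minimal sketch of the equivalent explicit wiring, assuming an existing DataStream<String> named text:

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

// equivalent to text.print(): attach a PrintSinkFunction as the sink explicitly
text.addSink(new PrintSinkFunction<>()).name("Print to Std. Out");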
2. Basic Kafka operations
Next, the streaming version of WordCount from the previous post will be modified so that its Source and Sink use the commonly used Kafka:
Note: readers not yet familiar with Kafka can follow the steps below to practise the common command-line operations (Kafka veterans can skip this).
Download the latest release from the Kafka website and unpack it on your machine.
2.1 Start ZooKeeper
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2.2 Start Kafka
.\kafka-server-start.bat ..\..\config\server.properties
2.3 List the topics
.\kafka-topics.bat --list --zookeeper localhost:2181
2.4 Start a producer
.\kafka-console-producer.bat --broker-list localhost:9092 --topic test1
2.5 Start a consumer
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1
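The topics test1 (input) and test2 (output) used in the rest of this post can also be created up front. A sketch using the same Windows scripts; the partition and replication-factor values are only example settings, and newer Kafka releases take --bootstrap-server localhost:9092 instead of --zookeeper:

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2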
Adding the Source
First, the Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.11.2</version>
</dependency>
Code:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test-read-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
DataStreamSource<String> text = env.addSource(new FlinkKafkaConsumer011<>(
        "test1",
        new SimpleStringSchema(),
        props));
Adding the Transform
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        // split each line into words
        String[] tokens = value.toLowerCase().split("\\b+");
        // collect (similar to the map-reduce idea)
        for (String token : tokens) {
            if (token.trim().length() > 0) {
                out.collect(new Tuple2<>(token.trim(), 1));
            }
        }
    }
})
        // group by field 0 of the Tuple2, i.e. the word
        .keyBy(value -> value.f0)
        // then sum field 1 of the Tuple2
        .sum(1);
Adding the Sink
counts.addSink(new FlinkKafkaProducer010<>("localhost:9092", "test2",
        (SerializationSchema<Tuple2<String, Integer>>) element -> ("(" + element.f0 + "," + element.f1 + ")").getBytes()));
Once the program is running, type a few words into the Kafka producer terminal; this effectively sends messages to Flink, and the word counts are printed in IDEA's console.
If you then start another Kafka consumer, you can see that the counts are also published to the test2 topic, i.e. the Kafka sink is working.
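For example, a consumer on the output topic, in the same style as step 2.5 (the --from-beginning flag is optional and just replays earlier output):

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test2 --from-beginning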
Appendix:
Full pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cnblogs.yjmyzz</groupId>
    <artifactId>flink-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.11.2</flink.version>
    </properties>

    <dependencies>

        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <repository>
            <id>bintray-streamnative-maven</id>
            <name>bintray</name>
            <url>https://dl.bintray.com/streamnative/maven</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <jvmArgs>
                        <jvmArg>-Xms128m</jvmArg>
                        <jvmArg>-Xmx512m</jvmArg>
                    </jvmArgs>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>
Full Java source
package com.cnblogs.yjmyzz.flink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * @author 菩提樹下的楊過
 */
public class KafkaStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. define the source (a Kafka consumer reading from topic test1)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-read-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> text = env.addSource(new FlinkKafkaConsumer011<>(
                "test1",
                new SimpleStringSchema(),
                props));

        // 3. processing logic
        DataStream<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // split each line into words
                String[] tokens = value.toLowerCase().split("\\b+");
                // collect (similar to the map-reduce idea)
                for (String token : tokens) {
                    if (token.trim().length() > 0) {
                        out.collect(new Tuple2<>(token.trim(), 1));
                    }
                }
            }
        })
                // group by field 0 of the Tuple2, i.e. the word
                .keyBy(value -> value.f0)
                // then sum field 1 of the Tuple2
                .sum(1);

        // 4. output the results (to a Kafka sink on topic test2, and to stdout)
        counts.addSink(new FlinkKafkaProducer010<>("localhost:9092", "test2",
                (SerializationSchema<Tuple2<String, Integer>>) element -> ("(" + element.f0 + "," + element.f1 + ")").getBytes()));
        counts.print();

        // execute program
        env.execute("Kafka Streaming WordCount");
    }
}
Author: 菩提樹下的楊過
The copyright of this article belongs to the author. Reposting is welcome, but this statement must be kept without the author's consent, and a clearly visible link to the original article must be provided on the page; otherwise the author reserves the right to pursue legal liability.