上次的例子https://blog.csdn.net/xxkalychen/article/details/117149540?spm=1001.2014.3001.5502将Flink的資料源設定為Socket,隻是為了測試提供流式資料。生産中一般不會這麼用,标準模型是從消息隊列擷取流式資料。Flink提供了跟kafka連接配接的封裝,我們隻需要一點小小的改動就可以實作從Kafka擷取資料。
不過修改之前,需要搭建一個Kafka伺服器。具體搭建過程這裡不做詳述。現在我們來修改程式。
一、添加pom依賴。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
其版本要和flink保持一緻。
二、修改測試類。我們另外建立一個測試類。
package com.chris.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Properties;
/**
* @author Chris Chan
* Create on 2021/5/22 7:23
* Use for:
* Explain: Flink流式處理從Kafka擷取的資料
*/
public class KafkaStreamTest {
public static void main(String[] args) throws Exception {
new KafkaStreamTest().execute(args);
}
private void execute(String[] args) throws Exception {
//擷取執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//配置kafka
Properties properties = new Properties();
properties.put("bootstrap.servers", "flink.chris.com:9092");
properties.put("group.id", "flink_group_1");
//從socket擷取資料
DataStreamSource<String> streamSource = env.addSource(new FlinkKafkaConsumer<String>("topic_flink", new SimpleStringSchema(), properties));
//wordcount計算
SingleOutputStreamOperator<Tuple2<String, Integer>> operator = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* map計算
* @param value 輸入資料 用空格分隔的句子
* @param out map計算之後的收集器
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//用空格分隔為單詞
String[] words = value.split(" ");
//統計單詞使用頻次,放入收集器
Arrays.stream(words)
//洗去前後空格
.map(String::trim)
//過濾掉空字元串
.filter(word -> !"".equals(word))
//加入收集器
.forEach(word -> out.collect(new Tuple2<>(word, 1)));
}
})
//按照二進制組第一個字段word分組,把第二個字段統計出來
.keyBy(0).sum(1);
operator.print();
env.execute();
}
}
修改的部分也是四行。
//配置kafka
Properties properties = new Properties();
properties.put("bootstrap.servers", "flink.chris.com:9092");
properties.put("group.id", "flink_group_1");
//從socket擷取資料
DataStreamSource<String> streamSource = env.addSource(new FlinkKafkaConsumer<String>("topic_flink", new SimpleStringSchema(), properties));
配置一下Kafka的伺服器和消費者分組,然後建立消費者,從Kafka擷取資料。
三、測試。
測試之前,首先啟動zookeeper和kafka,并且讓kafka處于主題消息生産發送狀态。
nohup /var/app/kafka_2.13-2.8.0/bin/zookeeper-server-start.sh /var/app/kafka_2.13-2.8.0/config/zookeeper.properties > /var/app/kafka_2.13-2.8.0/logs/zookeeper.log 2>&1 &
nohup /var/app/kafka_2.13-2.8.0/bin/kafka-server-start.sh /var/app/kafka_2.13-2.8.0/config/server.properties > /var/app/kafka_2.13-2.8.0/logs/kafka.log 2>&1 &
/var/app/kafka_2.13-2.8.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_flink
運作程式,在kafka輸入資料。
檢視背景。
送出到叢集執行效果。
可是并不成功,原因很簡單,Flink叢集隻包含Flink基本必須的包,對于工程中使用的其他包是不包含的,是以會出現找不到相關類的異常。是以我們還需要配置maven的assmbly插件,采用完全打包的方式。
在pom中添加
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.chris.flink.KafkaStreamTest</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
這個配置指定了一個主類。
然後打包。
mvn clean package assembly:assembly
打包過程耗時比較長,因為這種方法需要把依賴包全都打包進去。打包完成之後,在target下有個
flink-demo-20210522-1.0.0-SNAPSHOT-jar-with-dependencies.jar
這就是我們需要的完整jar包了。
把這個包從頁面傳上去,建立任務時發現主類已經填好了,就是因為我們已經在清單中配置了一個主類。如果要使用其他主類,就修改。
送出之後,稍等一會,就處于運作狀态,等待資料。
我們在kafka輸入資料。
檢視線上标準輸出。
測完之後,發現之前有個小擔心不存在。原來在測試storm時,本地需要的基礎包線上上不能有,否則會沖突,是以不停在改pom中的作用域,很是折騰人。原以為flink也會出現這種問題,結果沒有,輕松好多。