天天看點

Java整合Flink流式處理從Kafka擷取的資料

上次的例子https://blog.csdn.net/xxkalychen/article/details/117149540?spm=1001.2014.3001.5502将Flink的資料源設定為Socket,隻是為了測試提供流式資料。生産中一般不會這麼用,标準模型是從消息隊列擷取流式資料。Flink提供了跟kafka連接配接的封裝,我們隻需要一點小小的改動就可以實作從Kafka擷取資料。

不過修改之前,需要搭建一個Kafka伺服器。具體搭建過程這裡不做詳述。現在我們來修改程式。

一、添加pom依賴。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
           

其版本要和flink保持一緻。

二、修改測試類。我們另外建立一個測試類。

package com.chris.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author Chris Chan
 * Create on 2021/5/22 7:23
 * Use for:
 * Explain: Flink流式處理從Kafka擷取的資料
 */
public class KafkaStreamTest {
    public static void main(String[] args) throws Exception {
        new KafkaStreamTest().execute(args);
    }

    private void execute(String[] args) throws Exception {
        //擷取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //配置kafka
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "flink.chris.com:9092");
        properties.put("group.id", "flink_group_1");
        //從socket擷取資料
        DataStreamSource<String> streamSource = env.addSource(new FlinkKafkaConsumer<String>("topic_flink", new SimpleStringSchema(), properties));

        //wordcount計算
        SingleOutputStreamOperator<Tuple2<String, Integer>> operator = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             * map計算
             * @param value 輸入資料 用空格分隔的句子
             * @param out map計算之後的收集器
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //用空格分隔為單詞
                String[] words = value.split(" ");
                //統計單詞使用頻次,放入收集器
                Arrays.stream(words)
                        //洗去前後空格
                        .map(String::trim)
                        //過濾掉空字元串
                        .filter(word -> !"".equals(word))
                        //加入收集器
                        .forEach(word -> out.collect(new Tuple2<>(word, 1)));
            }
        })
                //按照二進制組第一個字段word分組,把第二個字段統計出來
                .keyBy(0).sum(1);
        operator.print();

        env.execute();
    }
}
           

修改的部分也是四行。

//配置kafka
Properties properties = new Properties();
properties.put("bootstrap.servers", "flink.chris.com:9092");
properties.put("group.id", "flink_group_1");
//從socket擷取資料
DataStreamSource<String> streamSource = env.addSource(new FlinkKafkaConsumer<String>("topic_flink", new SimpleStringSchema(), properties));
           

配置一下Kafka的伺服器和消費者分組,然後建立消費者,從Kafka擷取資料。

三、測試。

測試之前,首先啟動zookeeper和kafka,并且讓kafka處于主題消息生産發送狀态。

nohup /var/app/kafka_2.13-2.8.0/bin/zookeeper-server-start.sh /var/app/kafka_2.13-2.8.0/config/zookeeper.properties > /var/app/kafka_2.13-2.8.0/logs/zookeeper.log 2>&1 &
nohup /var/app/kafka_2.13-2.8.0/bin/kafka-server-start.sh /var/app/kafka_2.13-2.8.0/config/server.properties > /var/app/kafka_2.13-2.8.0/logs/kafka.log 2>&1 &
           
/var/app/kafka_2.13-2.8.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_flink
           

運作程式,在kafka輸入資料。

Java整合Flink流式處理從Kafka擷取的資料

檢視背景。

Java整合Flink流式處理從Kafka擷取的資料

送出到叢集執行效果。

可是并不成功,原因很簡單,Flink叢集隻包含Flink基本必須的包,對于工程中使用的其他包是不包含的,是以會出現找不到相關類的異常。是以我們還需要配置maven的assmbly插件,采用完全打包的方式。

在pom中添加

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.chris.flink.KafkaStreamTest</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>
           

這個配置指定了一個主類。

然後打包。

mvn clean package assembly:assembly
           

打包過程耗時比較長,因為這種方法需要把依賴包全都打包進去。打包完成之後,在target下有個

flink-demo-20210522-1.0.0-SNAPSHOT-jar-with-dependencies.jar
           

這就是我們需要的完整jar包了。

把這個包從頁面傳上去,建立任務時發現主類已經填好了,就是因為我們已經在清單中配置了一個主類。如果要使用其他主類,就修改。

送出之後,稍等一會,就處于運作狀态,等待資料。

我們在kafka輸入資料。

Java整合Flink流式處理從Kafka擷取的資料

檢視線上标準輸出。

Java整合Flink流式處理從Kafka擷取的資料

測完之後,發現之前有個小擔心不存在。原來在測試storm時,本地需要的基礎包線上上不能有,否則會沖突,是以不停在改pom中的作用域,很是折騰人。原以為flink也會出現這種問題,結果沒有,輕松好多。