天天看點

Windows平台下kafka+flink開發環境搭建

1.準備工作

安裝JDK1.8(注意最好設定環境變量JAVA_HOME)

安裝SCALA 2.11

下載下傳kafka

下載下傳flink1.7

2.kafka安裝

由于kafka自帶zookeeper是以下載下傳下來就能使用。

1.進入目前目錄的DOS界面:在 kafka 目錄,按住shift+滑鼠右鍵->在此處打開指令視窗(W)

Windows平台下kafka+flink開發環境搭建

2.開啟zookeeper服務

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Windows平台下kafka+flink開發環境搭建
Windows平台下kafka+flink開發環境搭建

3.啟動第二個DOS界面開啟kafka服務

bin\windows\kafka-server-start.bat config\server.properties

Windows平台下kafka+flink開發環境搭建
Windows平台下kafka+flink開發環境搭建

4.啟動第三個DOS界面開啟kafka生産者(produce)

  • 建立一個主題test

    bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    Windows平台下kafka+flink開發環境搭建
  • 檢視主題test

    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

    Windows平台下kafka+flink開發環境搭建
  • 啟動生産者(consumer)

    bin\windows\kafka-console-producer.bat --broker-list {本機ip}:9092 --topic test

    由于是本地{本機ip}=localhost

    Windows平台下kafka+flink開發環境搭建

    5.啟動第4個DOS界面建立消費者

    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

    Windows平台下kafka+flink開發環境搭建
    6.驗證生産者和消費者之間的聯通性
    Windows平台下kafka+flink開發環境搭建
    7.idea開發的pom檔案添加下面的依賴
    Windows平台下kafka+flink開發環境搭建

    8.編寫代碼

    public static void main(String[] args) throws Exception {

    // create execution environment

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();

    properties.setProperty(“bootstrap.servers”, “localhost:9092”);

    properties.setProperty(“group.id”, “flink_consumer”);

    DataStream stream = env.addSource(new FlinkKafkaConsumer<>(

    “demo”, new SimpleStringSchema(), properties) );

    stream.map(new MapFunction<String, String>() {

    private static final long serialVersionUID = -6867736771747690202L;

    @Override

    public String map(String value) throws Exception {

    return "Stream Value: " + value;

    }}).print();

    env.execute();

    }

    9.IDEA執行結果

    Windows平台下kafka+flink開發環境搭建

    10.啟動flink進入bin目錄下執行start-cluster.bat

    此時出現兩個Java的dos界面,同時可以進入http://localhost:8081檢視

    Windows平台下kafka+flink開發環境搭建
    Windows平台下kafka+flink開發環境搭建
    11送出程式檢視結果
    Windows平台下kafka+flink開發環境搭建
    Windows平台下kafka+flink開發環境搭建

繼續閱讀