1.準備工作
安裝JDK1.8(注意最好設定環境變量JAVA_HOME)
安裝SCALA 2.11
下載下傳kafka
下載下傳flink1.7
2.kafka安裝
由于kafka自帶zookeeper是以下載下傳下來就能使用。
1.進入目前目錄的DOS界面:在 kafka 目錄,按住shift+滑鼠右鍵->在此處打開指令視窗(W)
2.開啟zookeeper服務
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
3.啟動第二個DOS界面開啟kafka服務
bin\windows\kafka-server-start.bat config\server.properties
4.啟動第三個DOS界面開啟kafka生産者(produce)
-
建立一個主題test
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
-
檢視主題test
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
-
啟動生産者(consumer)
bin\windows\kafka-console-producer.bat --broker-list {本機ip}:9092 --topic test
由于是本地{本機ip}=localhost
5.啟動第4個DOS界面建立消費者
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
6.驗證生産者和消費者之間的聯通性 7.idea開發的pom檔案添加下面的依賴8.編寫代碼
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty(“bootstrap.servers”, “localhost:9092”);
properties.setProperty(“group.id”, “flink_consumer”);
DataStream stream = env.addSource(new FlinkKafkaConsumer<>(
“demo”, new SimpleStringSchema(), properties) );
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}}).print();
env.execute();
}
9.IDEA執行結果
10.啟動flink進入bin目錄下執行start-cluster.bat
此時出現兩個Java的dos界面,同時可以進入http://localhost:8081檢視
11送出程式檢視結果