一.說明
在大資料平台的業務場景中,處理實時kafka資料流資料,以成為必要的能力;此篇将嘗試通過Apache NiFi來接入Kafka資料然後處理後存儲之HBase
二.開拔
Ⅰ).配置ConsumeKafka_0_10
測試使用了kafka0.10版本,是以NiFi中也選擇對于版本
a).選擇ConsumeKafka_0_10
在Processor中搜尋ConsumeKafka_0_10
b).配置ConsumeKafka_0_10
1.Kafka Brokers: hostname1:9092,hostname2:9092:hostname3:9092
2.Topic Name(s): entry_index_nifi
3.Group ID: entry_index_nifi
Ⅱ).配置PutHBaseJSON
a).選擇PutHBaseJSON
在Processor中搜尋PutHBaseJSON
b).配置PutHBaseJSON
1.HBase Client Service: 選擇比對版本的HBaseClient
2.Table Name: 配置入庫HBase表名
3.Row Identifier Field Name: 配置RowKey值
4.Column Family: 配置列簇
c).選擇HBase_1_1_2_ClientService
在Processor中搜尋HBase_1_1_2_ClientService
d).配置HBase_1_1_2_ClientService
1.Zookeeper Quorum: hostname1:2181,hostname2:2181,hostname3:2181
2.Zookeeper Client Port: 2181
3.Zookeeper ZNode Parent: /hbase
e).激活HBase_1_1_2_ClientService
Ⅲ).啟動服務
可以點選選擇單個Processor啟動,也可以在空白處點選流程啟動
Ⅳ).驗證結果
a).Kafka源資料
./bin/kafak-console-consumer.sh --zookeeper hostname1:2181,hostname2:2181,hostname3:2181 ---topic entry_index_nifi
b).HBase入庫資料
scan 'kafka.entry_index_nifi',{LIMIT=>10}
三.HBase指令
## 建立表空間
create_namespace 'kafka'
## 檢視表空間
list_namespace_tables 'kafka'
## 建立表
create 'kafka.entry_index_nifi','cf1'
## 檢視表資料
scan 'kafka.entry_index_nifi',{LIMIT=>10}