天天看點

大資料ClickHouse(十四):Integration系清單引擎

大資料ClickHouse(十四):Integration系清單引擎

文章目錄

​​Integration系清單引擎​​

​​一、HDFS​​

​​二、MySQL​​

​​三、Kafka​​

Integration系清單引擎

ClickHouse提供了許多與外部系統內建的方法,包括一些表引擎。這些表引擎與其他類型的表引擎類似,可以用于将外部資料導入到ClickHouse中,或者在ClickHouse中直接操作外部資料源。

一、HDFS

HDFS引擎支援ClickHouse 直接讀取HDFS中特定格式的資料檔案,目前檔案格式支援Json,Csv檔案等,ClickHouse通過HDFS引擎建立的表,不會在ClickHouse中産生資料,讀取的是HDFS中的資料,将HDFS中的資料映射成ClickHouse中的一張表,這樣就可以使用SQL操作HDFS中的資料。

ClickHouse并不能夠删除HDFS上的資料,當我們在ClickHouse用戶端中删除了對應的表,隻是删除了表結構,HDFS上的檔案并沒有被删除,這一點跟Hive的外部表十分相似。

  • 文法:
ENGINE = HDFS(URI, format)      

注意:URI是HDFS檔案路徑,format指定檔案格式。HDFS檔案路徑中檔案為多個時,可以指定成some_file_?,或者當資料映射的是HDFS多個檔案夾下資料時,可以指定somepath/* 來指定URI

  • 其他配置:

由于HDFS配置了HA 模式,有叢集名稱,是以URI使用mycluster HDFS叢集名稱時,ClickHouse不識别,這時需要做以下配置:

  1. 将hadoop路徑下$HADOOP_HOME/etc/hadoop下的hdfs-site.xml檔案複制到/etc/clickhouse-server目錄下。
  2. 修改/etc/init.d/clickhouse-server 檔案,加入一行 “export LIBHDFS3_CONF=/etc/clickhouse-server/hdfs-site.xml”
  3. 重新開機ClickHouse-server 服務
serveice clickhouse-server restart      

當然,這裡也可以不做以上配置,在寫HDFS URI時,直接寫成對應的節點+端口即可。

  • 示例:
#在HDFS路徑 hdfs://mycluster/ch/路徑下,建立多個csv檔案,寫入一些資料
c1.csv檔案内容:
  1,張三,19
2,李四,20
c2.csv檔案内容:
  3,王五,21
4,馬六,22

#建立表 t_hdfs,使用HDFS引擎
node1 :) create table t_hdfs(id UInt8,name String,age UInt8) engine = HDFS('hdfs://mycluster/ch/*.csv','CSV')

#查詢表 t_hdfs中的資料
node1 :) select * from t_hdfs;
┌─id─┬─name─┬─age─┐
│  3 │ 王五 │  21 │
│  4 │ 馬六 │  22 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  19 │
│  2 │ 李四 │  20 │
└────┴──────┴─────┘      

注意:這裡表t_hdfs不會在clickhouse對應的節點路徑下建立資料目錄,同時這種表映射的是HDFS路徑中的csv檔案,不能插入資料,t_hdfs是隻讀表。

#建立表 t_hdfs2 檔案 ,使用HDFS引擎
node1 :) create table t_hdfs2(id UInt8,name String,age UInt8) engine = HDFS('hdfs://mycluster/chdata','CSV');

#向表 t_hdfs2中寫入資料
node1 :) insert into t_hdfs2 values(5,'田七',23),(6,'趙八',24);

#查詢表t_hdfs2中的資料
node1 :) select * from t_hdfs2;
┌─id─┬─name─┬─age─┐
│  5 │ 田七 │  23 │
│  6 │ 趙八 │  24 │
└────┴──────┴─────┘      

注意:t_hdfs2表沒有直接映射已經存在的HDFS檔案,這種表允許查詢和插入資料。

二、MySQL

ClickHouse MySQL資料庫引擎可以将MySQL某個庫下的表映射到ClickHouse中,使用ClickHouse對資料進行操作。ClickHouse同樣支援MySQL表引擎,即映射一張MySQL中的表到ClickHouse中,使用ClickHouse進行資料操作,與MySQL資料庫引擎一樣,這裡映射的表隻能做查詢和插入操作,不支援删除和更新操作。

  • 文法:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);      

 以上文法的解釋如下:

  • host:port - MySQL伺服器名稱和端口
  • database - MySQL 資料庫。
  • table - 映射的MySQL中的表
  • user - 登入mysql的使用者名
  • password - 登入mysql的密碼
  • replace_query  - 将INSERT INTO 查詢是否替換為 REPLACE INTO 的标志,預設為0,不替換。當設定為1時,所有的insert into 語句更改為 replace into 語句。當插入的資料有重複主鍵資料時,此值為0預設報錯,此值為1時,主鍵相同這條資料,預設替換成新插入的資料。
  • on_duplicate_clause - 預設不使用。當插入資料主鍵相同時,可以指定隻更新某列的資料為新插入的資料,對應于on duplicate key 後面的語句,其他的值保持不變,需要replace_query 設定為0。
  • 示例:
#在mysql 中建立一張表 t_ch,指定id為主鍵
CREATE TABLE t_ch (
  id INT,
  NAME VARCHAR (255),
  age INT,
  PRIMARY KEY (id)
)

#向表中增加一些資料
insert into  t_ch values (1,"張三",18),(2,"李四",19),(3,"王五",20)

#在ClickHouse中建立MySQL引擎表 t_mysql_engine
node1 :) create table t_mysql_engine (
:-]  id UInt8,
:-]  name String,
:-]  age UInt8
:-] )engine = MySQL('node2:3306','test','t_ch','root','123456');

#查詢ClickHouse表 t_mysql_engine 中的資料:
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中插入一條資料
node1 :) insert into t_mysql_engine values (4,'馬六','21');
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 馬六 │  21 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中再插入一條資料,這裡主鍵重複,報錯。
node1 :) insert into t_mysql_engine values (4,'田七','22');
Exception: mysqlxx::BadQuery: Duplicate entry '4' for key
 'PRIMARY' (node2:3306).      

注意:在clickhouse 中 t_mysql_engine表不會在ClickHouse伺服器節點上建立資料目錄。

  • 測試 replace_query  :
#在mysql 中删除表 t_ch,重新建立,指定id為主鍵
CREATE TABLE t_ch (
  id INT,
  NAME VARCHAR (255),
  age INT,
  PRIMARY KEY (id)
)

#向表中增加一些資料
insert into  t_ch values (1,"張三",18),(2,"李四",19),(3,"王五",20)

#在ClickHouse中删除MySQL引擎表 t_mysql_engine,重建
node1 :) create table t_mysql_engine (
:-]  id UInt8,
:-]  name String,
:-]  age UInt8
:-] )engine = MySQL('node2:3306','test','t_ch','root','123456',1);

#查詢ClickHouse表 t_mysql_engine 中的資料:
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中插入一條資料,主鍵重複。這裡由于指定了replace_query = 1 ,是以目前主鍵資料會被替換成新插入的資料。
node1 :) insert into t_mysql_engine values (3,'馬六','21');

#查詢ClichHouse t_mysql_engine表資料
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 馬六 │  21 │
└────┴──────┴─────┘      
  • 測試 on_duplicate_clause:
#在mysql 中删除表 t_ch,重新建立,指定id為主鍵
CREATE TABLE t_ch (
  id INT,
  NAME VARCHAR (255),
  age INT,
  PRIMARY KEY (id)
)

#向表中增加一些資料
insert into  t_ch values (1,"張三",18),(2,"李四",19),(3,"王五",20)

#在ClickHouse中删除MySQL引擎表 t_mysql_engine,重建
node1 :) create table t_mysql_engine (
:-]  id UInt8,
:-]  name String,
:-]  age UInt8
:-] )engine = MySQL('node2:3306','test','t_ch','root','123456',0,'update age = values(age)');

#查詢ClickHouse表 t_mysql_engine 中的資料:
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

#在ClickHouse 中向表 t_mysql_engine中插入一條資料
node1 :) insert into t_mysql_engine values (4,'馬六','21');
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 馬六 │  21 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中插入一條資料,主鍵重複。
node1 :) insert into t_mysql_engine values (4,'田七','100');

#查詢ClichHouse t_mysql_engine表資料
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 馬六 │ 100 │
└────┴──────┴─────┘      

三、Kafka

ClickHouse中還可以建立表指定為Kafka為表引擎,這樣建立出的表可以查詢到Kafka中的流資料。對應建立的表不會将資料存入ClickHouse中,這裡這張kafka引擎表相當于一個消費者,消費Kafka中的資料,資料被查詢過後,就不會再次被查詢到。

  • 文法:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]      

對以上參數的解釋:

  • kafka_broker_list: 以逗号分隔的Kafka Broker節點清單
  • kafka_topic_list : topic清單
  • kafka_group_name : kafka消費者組名稱
  • kafka_format : Kafka中消息的格式,例如:JSONEachRow、CSV等等,具體參照https://clickhouse.tech/docs/en/interfaces/formats/。這裡一般使用JSONEachRow格式資料,需要注意的是,json字段名稱需要與建立的Kafka引擎表中字段的名稱一樣,才能正确的映射資料。
  • 示例:
#建立表 t_kafka_consumer ,使用Kafka表引擎
node1 :) create table t_kafka_consumer (
:-] id UInt8,
:-] name String,
:-] age UInt8
:-] ) engine = Kafka()
:-] settings 
:-] kafka_broker_list='node1:9092,node2:9092,node3:9092',
:-] kafka_topic_list='ck-topic',
:-] kafka_group_name='group1',
:-] kafka_format='JSONEachRow';

#啟動kafka,在kafka中建立ck-topic topic,并向此topic中生産以下資料:
建立topic:
kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic ck-topic --partitions 3 --replication-factor 3

生産資料:
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic ck-topic

生産資料如下:
{"id":1,"name":"張三","age":18}
{"id":2,"name":"李四","age":19}
{"id":3,"name":"王五","age":20}
{"id":4,"name":"馬六","age":21}
{"id":5,"name":"田七","age":22}

#在ClickHouse中查詢表 t_kafka_consumer資料,可以看到生産的資料
node1 :) select * from t_kafka_consumer;
┌─id─┬─name─┬─age─┐
│  2 │ 李四 │  19 │
│  5 │ 田七 │  22 │
│  1 │ 張三 │  18 │
│  4 │ 馬六 │  21 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘      

注意:再次檢視表 t_kafka_consumer資料 ,我們發現讀取不到任何資料,這裡對應的ClikcHouse中的Kafka引擎表,隻是相當于是消費者,消費讀取Kafka中的資料,資料被消費完成之後,不能再次查詢到對應的資料。

以上在ClickHouse中建立的Kafka引擎表 t_kafka_consumer 隻是一個資料管道,當查詢這張表時就是消費Kafka中的資料,資料被消費完成之後,不能再次被讀取到。如果想将Kafka中topic中的資料持久化到ClickHouse中,我們可以通過物化視圖方式通路Kafka中的資料,可以通過以下三個步驟完成将Kafka中資料持久化到ClickHouse中:

  1. 建立Kafka 引擎表,消費kafka中的資料。
  2. 再建立一張ClickHouse中普通引擎表,這張表面向終端使用者查詢使用。這裡生産環境中經常建立MergeTree家族引擎表。
  3. 建立物化視圖,将Kafka引擎表資料實時同步到終端使用者查詢表中。
#在ClickHouse中建立 t_kafka_consumer2 表,使用Kafka引擎
node1 :) create table t_kafka_consumer2 (
:-] id UInt8,
:-] name String,
:-] age UInt8
:-] ) engine = Kafka()
:-] settings 
:-] kafka_broker_list='node1:9092,node2:9092,node3:9092',
:-] kafka_topic_list='ck-topic',
:-] kafka_group_name='group1',
:-] kafka_format='JSONEachRow';

#在ClickHouse中建立一張終端使用者查詢使用的表,使用MergeTree引擎
node1 :) create table t_kafka_mt(
:-] id UInt8,
:-] name String,
:-] age UInt8
:-] ) engine = MergeTree()
:-] order by id;

#建立物化視圖,同步表t_kafka_consumer2資料到t_kafka_mt中
node1 :) create materialized view  view_consumer to t_kafka_mt 
:-] as select id,name,age from t_kafka_consumer2;      
#向Kafka ck-topic中生産以下資料:
生産資料:
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic ck-topic

生産資料如下:
{"id":1,"name":"張三","age":18}
{"id":2,"name":"李四","age":19}
{"id":3,"name":"王五","age":20}
{"id":4,"name":"馬六","age":21}
{"id":5,"name":"田七","age":22}


#查詢表 t_kafka_mt中的資料,資料同步完成。
node1 :) select * from t_kafka_mt;
┌─id─┬─name─┬─age─┐
│  1 │ 張三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 馬六 │  21 │
│  5 │ 田七 │  22 │
└────┴──────┴─────┘      
  • 📢歡迎點贊 👍 收藏 ⭐留言 📝 如有錯誤敬請指正!
  • 📢本文由 Lansonli 原創,

繼續閱讀