
【Flink】Flink ClickHouseSink -- Writing Data to ClickHouse

1. Overview

Reposted from: Flink custom ClickHouseSink -- writing data to ClickHouse

The functionality described here was introduced in Flink 1.11.

Faced with the common need to write Kafka data into ClickHouse, this article shows how to do it with the Flink JDBC Connector.

Flink JDBC Connector

The relevant Flink JDBC source:

/**
 * Default JDBC dialects.
 */
public final class JdbcDialects {

	private static final List<JdbcDialect> DIALECTS = Arrays.asList(
		new DerbyDialect(),
		new MySQLDialect(),
		new PostgresDialect()
	);

	// ... lookup methods omitted

It ships with three dialects (Derby, MySQL, PostgreSQL), but no ClickHouse support.

So we implement a ClickHouse sink ourselves with a custom dialect.

Step 1: Download the Flink source code and add a ClickHouseDialect file


Below is the code of the ClickHouseDialect file.

Note: because ClickHouse does not support DELETE, the getDeleteStatement and getUpdateStatement methods in this file both delegate to getInsertIntoStatement, i.e. every operation becomes an insert. If you need real delete/update semantics, you can implement them yourself; one approach is to add a flag column to each row (1 for an insert, -1 for a delete, and an update written as a -1 followed by a 1). A CollapsingMergeTree sketch of that idea follows the dialect code below.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.dialect;

import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;


/**
 * JDBC dialect for ClickHouse.
 */
public class ClickHouseDialect extends AbstractDialect {

	private static final long serialVersionUID = 1L;

	@Override
	public String dialectName() {
		return "ClickHouse";
	}

	@Override
	public boolean canHandle(String url) {
		return url.startsWith("jdbc:clickhouse:");
	}

	@Override
	public JdbcRowConverter getRowConverter(RowType rowType) {
		return new ClickHouseRowConverter(rowType);
	}

	@Override
	public Optional<String> defaultDriverName() {
		return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
	}

	@Override
	public String quoteIdentifier(String identifier) {
		return "`" + identifier + "`";
	}

	@Override
	public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
		return Optional.of(getInsertIntoStatement(tableName, fieldNames));
	}

	@Override
	public String getRowExistsStatement(String tableName, String[] conditionFields) {
		// Not needed here: upserts are rewritten as plain inserts above,
		// so no existence check is ever issued.
		return null;
	}


	@Override
	public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
		return getInsertIntoStatement(tableName, fieldNames);
	}

	@Override
	public String getDeleteStatement(String tableName, String[] fieldNames) {
		return getInsertIntoStatement(tableName, fieldNames);
	}

	@Override
	public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
		// Not implemented: this dialect is only used as a sink.
		return null;
	}

	// Precision bounds are left at 0, which effectively rejects DECIMAL
	// columns and limits TIMESTAMP columns to precision 0 during validation;
	// widen these bounds if you need such types.
	@Override
	public int maxDecimalPrecision() {
		return 0;
	}

	@Override
	public int minDecimalPrecision() {
		return 0;
	}

	@Override
	public int maxTimestampPrecision() {
		return 0;
	}

	@Override
	public int minTimestampPrecision() {
		return 0;
	}

	@Override
	public List<LogicalTypeRoot> unsupportedTypes() {
		// The data types supported by ClickHouse are listed at:
		// https://clickhouse.com/docs/en/sql-reference/data-types
		// This list is adapted from the MySQL dialect. BINARY stays unsupported
		// because it cannot be converted to PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
		// in LegacyTypeInfoDataTypeConverter.
		return Arrays.asList(
			LogicalTypeRoot.BINARY,
			LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
			LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
			LogicalTypeRoot.INTERVAL_YEAR_MONTH,
			LogicalTypeRoot.INTERVAL_DAY_TIME,
			LogicalTypeRoot.ARRAY,
			LogicalTypeRoot.MULTISET,
			LogicalTypeRoot.MAP,
			LogicalTypeRoot.ROW,
			LogicalTypeRoot.DISTINCT_TYPE,
			LogicalTypeRoot.STRUCTURED_TYPE,
			LogicalTypeRoot.NULL,
			LogicalTypeRoot.RAW,
			LogicalTypeRoot.SYMBOL,
			LogicalTypeRoot.UNRESOLVED
		);
	}
}
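As for the flag-column idea from the note above: ClickHouse's CollapsingMergeTree engine implements exactly that pattern. A hypothetical table (not part of the original post) might look like:

CREATE TABLE demo.learn_record_collapsing
(
    id   String,
    sign Int8   -- 1 marks an insert, -1 a delete; an update writes -1 then 1
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY id;

Rows with the same ORDER BY key and opposite sign values are collapsed away during background merges.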



With that done, register ClickHouseDialect in the DIALECTS list of JdbcDialects; the list then reads:
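	private static final List<JdbcDialect> DIALECTS = Arrays.asList(
		new DerbyDialect(),
		new MySQLDialect(),
		new PostgresDialect(),
		new ClickHouseDialect()
	);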


Step 2: Add ClickHouseRowConverter

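The original post shows ClickHouseRowConverter only as a screenshot. Mirroring the built-in converters in flink-connector-jdbc 1.11 (e.g. MySQLRowConverter), a minimal version looks like this:

package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.table.types.logical.RowType;

/**
 * Runtime converter that converts between JDBC objects and Flink internal
 * data structures for ClickHouse.
 */
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {

	private static final long serialVersionUID = 1L;

	public ClickHouseRowConverter(RowType rowType) {
		super(rowType);
	}

	@Override
	public String converterName() {
		return "ClickHouse";
	}
}

All the actual conversion logic is inherited from AbstractJdbcRowConverter; the subclass only supplies its name.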

Step 3: Package and upload

Package the modified source and upload the jar to the lib directory of your Flink installation. It is also best to download the ClickHouse and Kafka related jars in advance to avoid errors at runtime.
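A typical sequence, assuming the flink-connector-jdbc module was modified inside a Flink 1.11 source checkout (jar names and versions are illustrative; match them to your Flink/Scala versions):

# From the Flink source root, rebuild only the JDBC connector module
cd flink-connectors/flink-connector-jdbc
mvn clean package -DskipTests

# Copy the rebuilt connector into the Flink distribution's lib directory
cp target/flink-connector-jdbc_2.11-1.11.2.jar $FLINK_HOME/lib/

# The ClickHouse JDBC driver and the Kafka connector jars belong in lib/ too,
# e.g. clickhouse-jdbc-0.2.4.jar and flink-sql-connector-kafka_2.11-1.11.2.jar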


Step 4: Test

Run a Flink shell to test it:

bin/pyflink-shell.sh local

Before running the job, create the corresponding database and table in ClickHouse.
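For example, a minimal schema matching the sink table used below (hypothetical; pick a table engine appropriate to your workload):

CREATE DATABASE IF NOT EXISTS demo;

CREATE TABLE demo.test_lin
(
    class_id   String,
    task_id    String,
    history_id String,
    id         String
)
ENGINE = MergeTree()
ORDER BY id;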

Here is the PyFlink code that writes the data into ClickHouse:

st_env.sql_update("""CREATE TABLE t(
  `class_id` string,
  `task_id` string,
  `history_id` string,
  `id` string
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'smallcourse__learn_record',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 't2',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json',
  'update-mode' = 'append'
)""")


st_env.sql_update("""CREATE TABLE test_lin (
  `class_id` string,
  `task_id` string,
  `history_id` string,
  `id` string
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:clickhouse://localhost:8123/demo',
  'connector.table' = 'test_lin',
  'connector.driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
  'connector.username' = '',
  'connector.password' = '',
  'connector.write.flush.max-rows' = '1'
)""")

d = st_env.sql_query("select class_id, task_id, history_id, id from t")
d.insert_into("test_lin")
st_env.execute("d")


Finally:

Run the following SQL in ClickHouse:

select count() from test_lin;


Result: the count should grow as records arrive from Kafka (the original post shows the query output as a screenshot).