Custom Flume ClickHouse sink (Flume 自定義 ClickHouse sink)
CommonConf.java
package com.tbl.flume.conf;
/**
 * Shared header/configuration key names used by both the interceptors and the sinks.
 *
 * <p>Constants-only holder: declared {@code final} with a private constructor so it
 * cannot be instantiated or subclassed.
 */
public final class CommonConf {
    /** Config key: name of the JSON field carrying the event timestamp. */
    public static final String TIME_FIELD = "time_field";
    /** Event-header key: Kafka topic the event was consumed from. */
    public static final String TOPIC = "topic";
    /** Config key: topic prefix stripped off to derive the target table name. */
    public static final String TOPIC_PREFIX = "topic_prefix";
    /** Event-header key: target table name resolved by the interceptor. */
    public static final String TABLE = "table";
    /** Event-header key: date portion (before the first space) of the time field. */
    public static final String CONSUME_DATE = "consume_date";
    /** Event-header key: full date-time value of the time field. */
    public static final String CONSUME_DATE_TIME = "consume_date_time";

    private CommonConf() {
        // utility class, no instances
    }
}
AbstractInterceptor.java
package com.tbl.flume.interceptor;
import com.tbl.flume.conf.CommonConf;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 自定義flume攔截器
*/
public abstract class AbstractInterceptor implements Interceptor {
protected static final Logger logger = LoggerFactory.getLogger(AbstractInterceptor.class);
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
doIntercept(event);
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
}
protected abstract void doIntercept(Event event);
}
TimeJsonObjectEventInterceptor.java
package com.tbl.flume.interceptor;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tbl.flume.conf.CommonConf;
import com.tbl.flume.sink.AbstractClickhouseSink;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.InterceptorBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import static com.tbl.flume.conf.CommonConf.*;
/**
 * Interceptor that parses the event body as a JSON object, extracts a time field into
 * the {@code consume_date}/{@code consume_date_time} headers, and derives the target
 * table name from the topic header by stripping a configured prefix.
 */
public class TimeJsonObjectEventInterceptor extends AbstractInterceptor {
    /** JSON field name holding the event timestamp (format "yyyy-MM-dd HH:mm:ss" assumed — TODO confirm). */
    private final String timeField;
    /** Topic prefix to strip when deriving the table name. */
    private final String prefix;

    public TimeJsonObjectEventInterceptor(Context context) {
        timeField = context.getString(TIME_FIELD);
        prefix = context.getString(TOPIC_PREFIX);
    }

    @Override
    protected void doIntercept(Event event) {
        JSONObject jsonObject = JSON.parseObject(new String(event.getBody(), StandardCharsets.UTF_8));
        String dateTime = jsonObject.getString(timeField);
        if (dateTime != null) {
            // date part is everything before the first space; whole value kept as date-time
            int space = dateTime.indexOf(' ');
            event.getHeaders().put(CONSUME_DATE, space >= 0 ? dateTime.substring(0, space) : dateTime);
            event.getHeaders().put(CONSUME_DATE_TIME, dateTime);
        } else {
            // previously this NPE'd; now skipped so a malformed record cannot kill the batch
            logger.warn("time field '{}' missing from event body, date headers not set", timeField);
        }
        String topic = event.getHeaders().get(TOPIC);
        if (topic != null) {
            // BUG FIX: the old code used topic.split(prefix), which treats the prefix as a
            // regular expression — a prefix containing '.', '-' etc. silently misbehaved.
            // Strip the prefix literally instead.
            String tableName = (prefix != null && topic.startsWith(prefix))
                    ? topic.substring(prefix.length())
                    : topic;
            event.getHeaders().put(TABLE, tableName);
        }
    }

    /** Standard Flume builder: captures the Context and hands it to the interceptor. */
    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new TimeJsonObjectEventInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}
AbstractClickhouseSink.java
package com.tbl.flume.sink;
import cc.blynk.clickhouse.BalancedClickhouseDataSource;
import cc.blynk.clickhouse.ClickHouseConnectionImpl;
import cc.blynk.clickhouse.ClickHouseDataSource;
import cc.blynk.clickhouse.copy.CopyManager;
import cc.blynk.clickhouse.copy.CopyManagerFactory;
import cc.blynk.clickhouse.settings.ClickHouseProperties;
import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.mortbay.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import static com.tbl.flume.sink.ClickHouseSinkConstants.*;
import static com.tbl.flume.sink.ClickHouseSinkConstants.DEFAULT_BATCH_SIZE;
/**
 * Flume sink that buffers events as hutool {@link Entity} rows and batch-inserts them
 * into ClickHouse via a {@link BalancedClickhouseDataSource}.
 *
 * <p>Rows are flushed when the buffer reaches {@code batchSize} or when
 * {@code maxWaitTime} ms have elapsed since the last flush. Subclasses implement
 * {@link #eventToEntity(Event, String)} to map an event to a row.
 *
 * <p>Flume calls {@link #process()} from a single sink-runner thread, so the buffer
 * needs no extra synchronization.
 */
public abstract class AbstractClickhouseSink extends AbstractSink implements Configurable {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractClickhouseSink.class);
    private BalancedClickhouseDataSource balancedClickhouseDataSource;
    private SinkCounter sinkCounter;
    private ClickHouseProperties clickHouseProperties;
    /** Fallback table used when an event carries no TABLE header. */
    private String table;
    /** Full JDBC URL list, e.g. jdbc:clickhouse://host1:port,host2:port/db. */
    private String urls;
    /** Max milliseconds to buffer rows before forcing a flush. */
    private long maxWaitTime;
    private String table_prefix;
    private Db db;
    /** Rows buffered since the last successful flush. */
    private List<Entity> entities;
    /** Epoch millis of the last flush; 0 means "never flushed yet". */
    private long lastBatchToCkTime;
    /** Max events taken from the channel per process() call, and the flush threshold. */
    private int batchSize;

    @Override
    public synchronized void start() {
        balancedClickhouseDataSource = new BalancedClickhouseDataSource(urls, clickHouseProperties);
        db = Db.use(balancedClickhouseDataSource);
        entities = new ArrayList<>();
        sinkCounter.start();
        super.start();
    }

    /**
     * Drains up to {@code batchSize} events from the channel into the row buffer and
     * flushes to ClickHouse when the size/time thresholds are met.
     * Runs on the single sink-runner thread.
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            int count;
            // time-based flush of rows buffered on previous calls, even if the channel is idle
            saveToClickhouse();
            for (count = 0; count < batchSize; ++count) {
                Event event = ch.take();
                if (event == null) {
                    break;
                }
                String headTable = event.getHeaders().get(TABLE);
                if (StringUtils.isNotBlank(headTable)) {
                    // routed table: prefix + name resolved by the interceptor
                    entities.add(eventToEntity(event, table_prefix + headTable));
                } else {
                    Preconditions.checkArgument(StringUtils.isNotBlank(table), "ClickHouse table must be specified!");
                    entities.add(eventToEntity(event, table));
                }
            }
            if (count <= 0) {
                sinkCounter.incrementBatchEmptyCount();
                txn.commit();
                return Status.BACKOFF;
            } else if (count < batchSize) {
                sinkCounter.incrementBatchUnderflowCount();
            } else {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(count);
            saveToClickhouse();
            sinkCounter.incrementEventDrainSuccessCount();
            status = Status.READY;
            txn.commit();
            logger.debug("clickhouse sink commit ok");
        } catch (Throwable t) {
            txn.rollback();
            logger.error(t.getMessage(), t);
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        logger.debug("initializing clickhouse-sink");
        sinkCounter = new SinkCounter(getName());
        clickHouseProperties = new ClickHouseProperties();
        this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        String servers = context.getString(SERVERS);
        String database = context.getString(DATABASE);
        this.table_prefix = context.getString(TABLE_PREFIX);
        this.table = context.getString(TABLE);
        String user = context.getString(USER);
        String password = context.getString(PASSWORD);
        Preconditions.checkArgument(StringUtils.isNotBlank(servers), "ClickHouse host must be specified!");
        Preconditions.checkArgument(StringUtils.isNotBlank(database), "ClickHouse database must be specified!");
        Preconditions.checkArgument(StringUtils.isNotBlank(user), "ClickHouse user must be specified!");
        Preconditions.checkArgument(StringUtils.isNotBlank(password), "ClickHouse password must be specified!");
        this.maxWaitTime = context.getLong(MAX_WAIT_TIME, DEFAULT_MAX_WAIT_TIME);
        // jdbc:clickhouse://<first-host>:<port>,<second-host>:<port>/<database>
        this.urls = String.format("%s%s/%s", JDBC_CLICKHOUSE_PROTOCOL, servers, database);
        logger.info(urls);
        logger.info(table_prefix);
        clickHouseProperties.withCredentials(user, password);
        logger.debug("clickhouse-sink initialized");
    }

    /**
     * Maps a Flume event to a hutool Entity row bound to the given table.
     *
     * @param event     the event to convert
     * @param tableName fully-resolved target table name
     * @return the row to insert
     */
    protected abstract Entity eventToEntity(Event event, String tableName);

    /**
     * Flushes the buffer when it is full or {@code maxWaitTime} has elapsed.
     *
     * @throws SQLException if the insert fails; the buffer is kept so the rows are retried
     */
    protected void saveToClickhouse() throws SQLException {
        long now = System.currentTimeMillis();
        long waitTime = now - lastBatchToCkTime;
        if (0 == lastBatchToCkTime || entities.size() >= batchSize || waitTime >= maxWaitTime) {
            if (!entities.isEmpty()) {
                db.insert(entities);
                logger.debug("{} data save to clickhouse success", entities.size());
                // BUG FIX: the buffer was never cleared, so every flush re-inserted all
                // previously-written rows and the list grew without bound
                entities.clear();
            }
            lastBatchToCkTime = now;
        } else {
            logger.debug("wait for next data batch ......");
        }
    }

    @Override
    public synchronized void stop() {
        // flush any rows still buffered so they are not lost on shutdown
        if (db != null && entities != null && !entities.isEmpty()) {
            try {
                db.insert(entities);
                logger.info("flushed {} remaining rows to clickhouse on stop", entities.size());
                entities.clear();
            } catch (SQLException e) {
                logger.error("failed to flush remaining rows on stop", e);
            }
        }
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
    }
}
ClickHouseSinkConstants.java
package com.tbl.flume.sink;
/**
 * Configuration keys and defaults for the ClickHouse sink.
 *
 * <p>Constants-only holder: declared {@code final} with a private constructor so it
 * cannot be instantiated or subclassed.
 */
public final class ClickHouseSinkConstants {
    /** Config key: comma-separated server list, "ip1:port1,ip2:port2". */
    public static final String SERVERS = "servers";
    public static final String BATCH_SIZE = "batchSize";
    public static final String USER = "user";
    public static final String PASSWORD = "password";
    public static final String DATABASE = "database";
    public static final String TABLE = "table";
    public static final String TABLE_PREFIX = "table_prefix";
    public static final int DEFAULT_BATCH_SIZE = 10000;
    // NOTE: "waite" is a typo, but this is a live config key — renaming it would
    // silently break existing flume .properties files, so it is kept as-is.
    public static final String MAX_WAIT_TIME = "max_waite_time";
    /** Default flush interval in milliseconds. */
    public static final long DEFAULT_MAX_WAIT_TIME = 10000;
    public static final String JDBC_CLICKHOUSE_PROTOCOL = "jdbc:clickhouse://";

    private ClickHouseSinkConstants() {
        // utility class, no instances
    }
}
JsonObjectClickhouseSink.java
package com.tbl.flume.sink;
import cn.hutool.db.Entity;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Event;
import java.nio.charset.StandardCharsets;
/**
 * Concrete ClickHouse sink for events whose body is a flat JSON object:
 * every top-level JSON field becomes a column of the target row.
 */
public class JsonObjectClickhouseSink extends AbstractClickhouseSink {

    /**
     * Decodes the event body as UTF-8 JSON and copies all of its fields
     * into an {@link Entity} bound to {@code tableName}.
     */
    @Override
    protected Entity eventToEntity(Event event, String tableName) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject fields = JSON.parseObject(body);
        Entity row = new Entity();
        row.setTableName(tableName);
        row.putAll(fields);
        return row;
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yth</artifactId>
<groupId>com.tbl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flume</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>cc.blynk.clickhouse</groupId>
<artifactId>clickhouse4j</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.10</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>tbl-flume-consume.jar</finalName>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>