方式一 通過JDBCOutputFormat
在flink中沒有現成的用來寫入MySQL的sink,但是flink提供了一個類,
JDBCOutputFormat,通過這個類,如果你提供了jdbc的driver,則可以當做sink使用。
JDBCOutputFormat其實是flink的batch api,但也可以用來作為stream的api使用,社群也推薦通過這種方式來進行。
JDBCOutputFormat用起來很簡單,隻需要一個prepared statement,driver和database connection,就可以開始使用了。
1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2 .setDrivername("com.mysql.jdbc.Driver")
3 .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4 .setQuery(query)
5 .finish();
如下的sql語句可以作為prepared statement:
String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";
對應的表的結構:
1 CREATE TABLE cases
2 (
3 caseid VARCHAR(255),
4 tracehash VARCHAR(255)
5 );
但有一點要明确,JDBCOutputFormat隻能處理Row,而Row是對prepared statement的參數的一個包裝類。這意味着我們需要将流中的case轉換為row,通過map就能做的。
1 DataStream<Case> cases = ...
2
3 DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
4 Row row = new Row(2); // our prepared statement has 2 parameters
5 row.setField(0, aCase.getId()); //first parameter is case ID
6 row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
7 return row;
8 });
這樣,我們就能添加sink了:
1 rows.writeUsingOutputFormat(jdbcOutput);
這樣,你就可以将資料寫入mysql了。
但是在你在流上附加了視窗之後,可能會得到下面的報錯:
1 "Unknown column type for column %s. Best effort approach to set its value: %s."
因為視窗處理的類型,沒有明确的類型定義,如下修改之前的定義,顯式的指定類型:
1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2 .setDrivername("com.mysql.jdbc.Driver")
3 .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4 .setQuery(query)
5 .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
6 .finish();
JDBCOutputFormat
has a
batchInterval
, which you can specify on the
JDBCOutputFormatBuilder
. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.
JDBCOutputFormat
還有一個很有用的參數,batchInterval,見名知意,就是多少資料送出一次,盡量高效率的向資料庫送出資料。當然還有比如timeout等其他參數,可以探索。
方式二 通過自定義sink送出
我們通過繼承RichSinkFunction<IN>來實作自定義sink:
1 public class RichCaseSink extends RichSinkFunction<Case> {
2
3 private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
4 + "VALUES (?, ?) "
5 + "ON CONFLICT (caseid) DO UPDATE SET "
6 + " tracehash=?";
7
8 private PreparedStatement statement;
9
10
11 @Override
12 public void invoke(Case aCase) throws Exception {
13
14 statement.setString(1, aCase.getId());
15 statement.setString(2, aCase.getTraceHash());
16 statement.setString(3, aCase.getTraceHash());
17 statement.addBatch();
18 statement.executeBatch();
19 }
20
21 @Override
22 public void open(Configuration parameters) throws Exception {
23 Class.forName("com.mysql.jdbc.Driver");
24 Connection connection =
25 DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio");
26
27 statement = connection.prepareStatement(UPSERT_CASE);
28 }
29
30 }
這樣,就可以在流上添加sink 了:
1 DataStream<Case> cases = ...
2 cases.addSink(new RichCaseSink());
當然,上面的實作很簡略,沒有給出批量送出或者逾時送出,這個都可以很容易的添加,比如close()中關閉連接配接。
但是上面的實作中,最大的問題還是沒有跟flink的狀态管理相結合,這個才是重頭戲。
方式二 加強版的自定義sink
在checkpoint的時候儲存資料,繼承接口
CheckpointedFunction
:
1 @Override
2 public void snapshotState(FunctionSnapshotContext context) throws Exception {
3 long checkpointId = context.getCheckpointId();
4 List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
5 if(cases == null){
6 cases = new ArrayList<>();
7 pendingCasesPerCheckpoint.put(checkpointId, cases);
8 }
9 cases.addAll(pendingCases);
10 pendingCases.clear();
11 }
在消息到達的時候不插入資料,隻是留存資料:
1 @Override
2 public void invoke(Case aCase) throws Exception {
3 pendingCases.add(aCase);
4 }
這樣,通過繼承CheckpointListener,我們就能在某個checkpoint完成的時候插入資料:
1 @Override
2 public void notifyCheckpointComplete(long checkpointId) throws Exception {
3
4 Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
5 pendingCasesPerCheckpoint.entrySet().iterator();
6
7 while (pendingCheckpointsIt.hasNext()) {
8
9 Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
10 Long pastCheckpointId = entry.getKey();
11 List<Case> pendingCases = entry.getValue();
12
13 if (pastCheckpointId <= checkpointId) {
14
15 for (Case pendingCase : pendingCases) {
16 statement.setString(1, pendingCase.getId());
17 statement.setString(2, pendingCase.getTraceHash());
18 statement.setString(3, pendingCase.getTraceHash());
19 statement.addBatch();
20 }
21 pendingCheckpointsIt.remove();
22 }
23 }
24 statement.executeBatch();
25
26 }
前提,是需要設定checkpoint,比如:
ExecutionEnvironment env = ...
env.enableCheckpointing(10000L);
這樣,每隔10s,當一個checkpoint做成功,就會插入一次資料。
當然,上面的代碼驗證可用,但不建議在生産環境使用,生産環境需要考慮更多的問題。