天天看點

flink寫入mysql的兩種方式

方式一 通過JDBCOutputFormat

在flink中沒有現成的用來寫入MySQL的sink,但是flink提供了一個類,

JDBCOutputFormat,通過這個類,如果你提供了jdbc的driver,則可以當做sink使用。

JDBCOutputFormat其實是flink的batch api,但也可以用來作為stream的api使用,社群也推薦通過這種方式來進行。

JDBCOutputFormat用起來很簡單,隻需要一個prepared statement,driver和database connection,就可以開始使用了。

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .finish();      

如下的sql語句可以作為prepared statement:

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";      

對應的表的結構:

1 CREATE TABLE cases
2 (
3  caseid VARCHAR(255),
4  tracehash VARCHAR(255)
5 );      

但有一點要明确,JDBCOutputFormat隻能處理Row,而Row是對prepared statement的參數的一個包裝類。這意味着我們需要将流中的case轉換為row,通過map就能做的。

1 DataStream<Case> cases = ...
2 
3   DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
4    Row row = new Row(2); // our prepared statement has 2 parameters
5    row.setField(0, aCase.getId()); //first parameter is case ID
6    row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
7    return row;
8   });      

這樣,我們就能添加sink了:

1 rows.writeUsingOutputFormat(jdbcOutput);      

這樣,你就可以将資料寫入mysql了。

但是在你在流上附加了視窗之後,可能會得到下面的報錯:

1 "Unknown column type for column %s. Best effort approach to set its value: %s."      

因為視窗處理的類型,沒有明确的類型定義,如下修改之前的定義,顯式的指定類型:

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
6  .finish();      

JDBCOutputFormat

 has a 

batchInterval

, which you can specify on the 

JDBCOutputFormatBuilder

. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.

JDBCOutputFormat

 還有一個很有用的參數,batchInterval,見名知意,就是多少資料送出一次,盡量高效率的向資料庫送出資料。當然還有比如timeout等其他參數,可以探索。

方式二 通過自定義sink送出

我們通過繼承RichSinkFunction<IN>來實作自定義sink:

1 public class RichCaseSink extends RichSinkFunction<Case> {
 2 
 3   private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
 4       + "VALUES (?, ?) "
 5       + "ON CONFLICT (caseid) DO UPDATE SET "
 6       + "  tracehash=?";
 7 
 8   private PreparedStatement statement;
 9 
10 
11   @Override
12   public void invoke(Case aCase) throws Exception {
13 
14     statement.setString(1, aCase.getId());
15     statement.setString(2, aCase.getTraceHash());
16     statement.setString(3, aCase.getTraceHash());
17     statement.addBatch();
18     statement.executeBatch();
19   }
20 
21   @Override
22   public void open(Configuration parameters) throws Exception {
23     Class.forName("com.mysql.jdbc.Driver");
24     Connection connection =
25         DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio");
26 
27     statement = connection.prepareStatement(UPSERT_CASE);
28   }
29 
30 }      

這樣,就可以在流上添加sink 了:

1 DataStream<Case> cases = ...
2 cases.addSink(new RichCaseSink());      

當然,上面的實作很簡略,沒有給出批量送出或者逾時送出,這個都可以很容易的添加,比如close()中關閉連接配接。

但是上面的實作中,最大的問題還是沒有跟flink的狀态管理相結合,這個才是重頭戲。

方式二 加強版的自定義sink

在checkpoint的時候儲存資料,繼承接口

CheckpointedFunction

 :

1 @Override
 2 public void snapshotState(FunctionSnapshotContext context) throws Exception {
 3   long checkpointId = context.getCheckpointId();
 4   List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
 5   if(cases == null){
 6     cases = new ArrayList<>();
 7     pendingCasesPerCheckpoint.put(checkpointId, cases);
 8   }
 9   cases.addAll(pendingCases);
10   pendingCases.clear();
11 }      

在消息到達的時候不插入資料,隻是留存資料:

1 @Override
2 public void invoke(Case aCase) throws Exception {
3   pendingCases.add(aCase);
4 }      

這樣,通過繼承CheckpointListener,我們就能在某個checkpoint完成的時候插入資料:

1 @Override
 2 public void notifyCheckpointComplete(long checkpointId) throws Exception {
 3 
 4  Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
 5    pendingCasesPerCheckpoint.entrySet().iterator();
 6 
 7  while (pendingCheckpointsIt.hasNext()) {
 8 
 9   Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
10   Long pastCheckpointId = entry.getKey();
11   List<Case> pendingCases = entry.getValue();
12 
13   if (pastCheckpointId <= checkpointId) {
14 
15    for (Case pendingCase : pendingCases) {
16     statement.setString(1, pendingCase.getId());
17     statement.setString(2, pendingCase.getTraceHash());
18     statement.setString(3, pendingCase.getTraceHash());
19     statement.addBatch();
20    }
21    pendingCheckpointsIt.remove();
22   }
23  }
24  statement.executeBatch();
25 
26 }      

前提,是需要設定checkpoint,比如:

ExecutionEnvironment env = ...
env.enableCheckpointing(10000L);      

這樣,每隔10s,當一個checkpoint做成功,就會插入一次資料。

當然,上面的代碼驗證可用,但不建議在生産環境使用,生産環境需要考慮更多的問題。

繼續閱讀