laitimes

Flink Keyed State的优化与实践

author:Flash Gene

1. Background

When the traffic of the left and right flows is large and the wait time for join is 1 hour, the storage size of the Flink Keyed State (Flink State is divided into Operator State and Keyed State, and all states in the following section represent the latter) can easily reach the terabyte level (RocksDBStateBackend is used internally by default).

When the storage of the State reaches the terabyte level, we will find that the RT of the scan/next/readNull request of the State will become high, and the dual-stream Join not only has a large traffic, but also joins There are also a large number of query fields, resulting in a large length of the State's Value, which causes obvious periodic glitches in the CPU during peak traffic periods, which is caused by RocksDB compactions. In the following content, we mainly follow up the read and write behavior of RocksDB from the business scenario to optimize the problem of high time consumption of RT, and use the optimization solution to alleviate the pressure of compaction.

2.Flink Keyed State诊断

We counted the internal terabyte-level large state tasks, and all the states were generated by the dual-stream join operator. There are two types of dual-stream joins in Flink: one is a regular join without time interval limit, in which the state keys of the left and right streams store a single RowData in both the inner and outer joins. The other type is Interval Join, which is limited by time interval and Latency Join, which is based on the delay ejection data implemented by Interval Join, and the key of the left and right flow is small and the value is large in the large state task, as shown in the following figure, the write KV length monitoring of the LatencyJoin of the large state task.

Flink Keyed State的优化与实践

2.1 Dual-stream join feature

The second type of dual-stream join is used in both internal and state storage sizes, and is most used in the two business lines of internal business and AI.

In this type of dual-stream Join Operator, Flink uses two States to cache data from the left and right streams respectively, and the structure is MapState<long, list>The Key stores the millisecond timestamp, and the Value stores all the RowData records in the same On expression column at the current millisecond time, and a RowData is the collection of all fields that need to be included in the Project after the current stream is joined. The length of a RowData is determined by the sum of the lengths of all Project fields.

Flink Keyed State的优化与实践

The following diagram shows the write process of RocksDB [2], where a pair of KVs write from a client to RocksDB, they write to the WAL (WAL is disabled in Flink), and then to the memory table, and when the memory table is full (threshold write_buffer_size=64MB) or flush (Flink) is triggered When the number of files in L0 reaches the threshold (level0_file_num_compaction_trigger=4), a compaction operation is triggered to merge all data in L0 and L1 and write to L1, and when the storage size of L1 reaches the threshold (max_bytes_ for_level_base=256MB), the compaction will be triggered to merge the L1 file and several files of L2 and write them to L2, and when the storage size of LN+1 is 10 (max_bytes_for_level_multiplier) times that of LN+1, the compaction will be triggered to merge data to LN+1.

Flink Keyed State的优化与实践

After you add state.backend.rocksdb.log.level=DEBUG_LEVEL to the task, you will find that if a terabyte of dual-stream Join a large State task, the SST level of RocksDB will become [2,4,41,98,0,0,0] This is also due to the two characteristics of dual-stream joins: one is that the traffic is particularly large and the number of KVs that need to be recorded is huge, and the other is that the value mentioned above is long, resulting in a large overall storage consumption, and the hierarchical distribution of RocksDB causes more data to be stored in L3, and the overall time consumption of compaction becomes higher due to the increase in more layers.

2.2 RocksDB读RT高

The following diagram shows the read process of RocksDB [2], in which the client first queries the data from the Memory Table, and if it is not found, the data is read at the L0 layer of the disk, and all files in the L0 layer are read sequentially because the data in the L0 layer is not globally ordered. If no data is queried in the L0 layer, only one SST file is queried in each L1 to LN layer until the data is queried.

Flink Keyed State的优化与实践

As mentioned above, the key of a dual-stream join is a timestamp, the value is stored in the list of data sets with the current timestamp, and the data of the left and right streams is entered into the join Over time, there will be a large number of Get requests to break through RocksDB, which we mark as ReadNull The number of files in the L3 layer is relatively large, and all ReadNull requests will be accessed from L0 to L3, and the overall read time is relatively high.

In addition, the state of this kind of join is the MapState<long, list> structure, and there will be two places to call the Map's iterator. One is the RowData collection that joins the current state to stream data, and the other is the timer state that is triggered by the Timer State to clean up the data outside the time window of the current state, otherwise the state will become larger and larger. The iterator of the Map in the State calls the seek and next of RocksDB to query data, and the seek and next operations will traverse all the data that meets the currentKey, and will also read the SST file of the lowest level of RocksDB.

Through monitoring, we found that the read RT of seek, next, and readnull in the big state takes 99 lines, reaching the double-digit millisecond level, which is an undesirable result for real-time stream computing.

Flink Keyed State的优化与实践

3.State optimization

Whether it takes more time to read RT or compaction causes CPU glitches, the value in the state is too large, resulting in more layers of SST, and scanning too many SST files will reduce the read performance. A State record, the process from L0 to LN, will not only be read and written in the cross-layer, but also in the same layer, due to other data compaction to the current layer, it will also be read and written many times, in the high-level compaction, it will be found that the sum of disk read and write IO will reach 400MB/ If the value is not moved in the compaction, then the IO and CPU glitches can be greatly reduced, and the BlobDB [3] scheme of the RocksDB community was found in the research.

When a pair of KVs are flushed, if the value length is greater than the threshold, the value will be written to the blob file, and only the index of the value in the blob will be recorded in the SST file.

Flink Keyed State的优化与实践

The highest version of RocksDB in the Flink community[5] After communicating with the internal distributed storage team, we decided to use version 7.8.3 of Flink to build Flink's RocksDB to use the BlobDB feature, so we merged Flink's original CompactionFilter and other implementations into the 7.8.3 release.

After adapting and enabling KV separation in large states, the file size of SST drops sharply in RocksDB logs, and all state keys are clustered in the L0 and L1 layers. Due to the reduction of the SST level, the ReadNull request access to the L1 layer ends, while seek and next requests only need to access the corresponding blob file once when there is data, and if there is no data, the IO read operation on the blob can also be omitted. The final effect is that the time taken by ReadNull has been reduced to about 100 subtleties, and the RT 99 line of scan and next has also been reduced to about 1 millisecond.

Flink Keyed State的优化与实践

In addition, we found that the CPU glitch of the task of enabling KV separation was weakened, and the overall CPU usage was reduced by 50%. Although the level of SST has been reduced, the GC of blob files still exists, and the GC will do the migration of blob file data, integrate new files and delete old blob files, and its work will consume disk IO and CPU resources, while the GC of blob cannot be closed in reality GC causes the storage layer files to grow, not to decrease. Even though compaction still exists, we still enjoy the optimization brought by upgrading the RocksDB version, and the frequency of high glitches is much less than that of the old version.

Flink Keyed State的优化与实践

4. Summary and outlook

In addition to the KV separation of large states, some tasks of small states will have a periodic glitch phenomenon of 20 minutes, we refer to the InnerCompaction [6] patch of the distributed storage team, the KV length is very small, and the storage size of the L1 layer is close to max_bytes_for_level_ base, the four L0-layer SSTs generated by the four checkpoints will cause CPU glitches, and the optimization principle of InnerCompaction is that the small files in the L0 layer are compacted in advance on the same layer to avoid frequent merging with L1 data and prevent I/O amplification.

In the second half of last year, after the KV separation feature of Flink State was introduced internally to users, the CPU usage of all large state tasks on the line was reduced by 20-50%. In the future, the following aspects may continue to be optimized.

a) Upgrade the RocksDB instance of the earlier version of Alink.

b) The snapshots executed by Flink checkpoint to the RocksDB layer are relatively light and not strongly associated with compaction, so you can consider reducing the rate of compaction to further weaken the CPU glitch.

c) At present, the internal KV separation also needs to be enabled by a parameter control, and it is expected to be enabled globally by default in the later stage, and the threshold of the value length of KV separation is also expected to be adaptive, without user awareness.

Reference:

[1]https://mp.weixin.qq.com/s/E23JO7YvzJrocbOIGO5X-Q

[2]https://blog.csdn.net/microGP/article/details/120416193

[3]https://github.com/facebook/rocksdb/wiki/BlobDB

[4]https://www.usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf

[5]https://github.com/ververica/frocksdb

[6] https://github.com/bilibili/rocksdb

Author: Zhang Chenyi & Zhang Yang

Source: WeChat public account: Bilibili Technology

Source: https://mp.weixin.qq.com/s/H78o74GfXsON8Js7wLqoIg