
Using JuiceFS to Accelerate Flink Container Startup

Author | Hu Mengyu, Zhihu big data architecture development engineer

Editor | Linda

Thanks to its reliability and ease of use, Flink has become one of the most popular stream processing frameworks and now dominates the field of stream computing. Zhihu introduced Flink as early as 2018, and Flink has since become one of the most important components inside Zhihu, accumulating more than 4,000 real-time Flink jobs that process petabytes of data every day.

Flink can be deployed in a variety of ways. Classified by resource scheduler, the deployments can be roughly divided into standalone, Flink on YARN, Flink on Kubernetes, and so on. The deployment mode we currently use internally is the native Kubernetes mode officially provided by Flink. Talking about Kubernetes inevitably means talking about container images: the dependencies of Flink jobs vary widely, so how to build images for Flink is also a headache.

Flink images and dependency handling

Flink jobs can be roughly divided into two categories. The first is Flink SQL jobs, whose dependencies are roughly the following:

1. Official connector JAR packages, such as flink-hive-connector, flink-jdbc-connector, flink-kafka-connector, etc.;
2. Unofficial or internally implemented connector JAR packages;
3. The user's UDF JAR packages; for some complex computation logic, users may implement UDFs themselves.

The second category is Flink JAR jobs, which, in addition to the three dependencies above, also depend on the Flink JAR package written by the user.

Obviously, the dependencies differ from job to job, and we cannot build a separate image for every Flink job. Our current approach is as follows:

1. Classify dependencies into stable dependencies and unstable dependencies;
2. Stable dependencies include core components (such as Flink and the JDK) and the official connector packages. These are very stable and only change when Flink is upgraded or a bug is fixed, so we bake them into the image when the image is built;
3. Unstable dependencies include third-party connectors and the user's own JAR packages. Third-party connectors are not officially maintained by Flink, so the probability that a problem needs to be fixed is relatively higher; the user's own JAR package differs for every job, and users change and resubmit it frequently. We inject such unstable dependencies dynamically: they are stored in a distributed file system and downloaded into the container by a pre command when the container starts (see the sketch after this list).
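
The article does not show the pre command itself; in practice it is a simple download step executed before the Flink process starts. Purely as an illustration, here is a minimal Python sketch of that step, assuming the unstable dependencies sit in an S3-compatible store under a per-job prefix. The endpoint, bucket, environment variables, and target directory below are all hypothetical:

```python
# Illustrative sketch only: pull a job's unstable dependencies (third-party
# connectors, user JARs) into the container before the Flink process starts.
# The endpoint, bucket, env vars, and target directory are hypothetical.
import os

import boto3

DEP_STORE_ENDPOINT = os.environ.get("DEP_STORE_ENDPOINT", "http://object-store.internal:9000")
DEP_BUCKET = os.environ.get("DEP_BUCKET", "flink-deps")
JOB_ID = os.environ["FLINK_JOB_ID"]        # which job's dependencies to pull
TARGET_DIR = "/opt/flink/usrlib"           # directory the image loads user JARs from

s3 = boto3.client("s3", endpoint_url=DEP_STORE_ENDPOINT)
os.makedirs(TARGET_DIR, exist_ok=True)

# Every object under <bucket>/<job-id>/ is one JAR this job declared as an unstable dependency.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=DEP_BUCKET, Prefix=f"{JOB_ID}/"):
    for obj in page.get("Contents", []):
        file_name = os.path.basename(obj["Key"])
        if not file_name:                  # skip "directory" placeholder keys
            continue
        local_path = os.path.join(TARGET_DIR, file_name)
        print(f"downloading {obj['Key']} -> {local_path}")
        s3.download_file(DEP_BUCKET, obj["Key"], local_path)
```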

With the above handling, the Flink image gains a certain ability to load dependencies dynamically, and the startup process of a Flink job looks roughly like this:

[Figure: Flink job startup process]

File system selection

Pain points of storing dependencies on HDFS

HDFS had always been our file system of choice for storing Flink dependencies, but we ran into the following pain points while using it:

1. The NameNode is under heavy pressure during peak hours. When containers request file metadata from the NameNode while downloading dependencies, they can get stuck; some small-batch jobs that themselves only need to run for a dozen or so seconds may spend several minutes downloading dependencies because of NameNode pressure;
2. Our Flink clusters are deployed across multiple data centers, but HDFS has only one large cluster in the offline-computing data center, so dependency files have to be pulled across data centers;
3. Some special Flink jobs do not rely on HDFS at all; in other words, they neither use checkpoints nor read or write HDFS, yet because the container dependencies are stored on HDFS, such jobs still cannot do without it.

Pain points of using object storage

Later we replaced HDFS with object storage, which solved some of HDFS's pain points, but we soon found a new problem: single-threaded downloads from object storage are slow. There are generally the following options for accelerating object storage downloads:

1. Use multi-threaded, segmented downloads. However, the container's pre command is really only suited to running relatively simple shell commands; supporting segmented downloads would require a fairly large rework of this part, which is a significant pain point (see the sketch after this list);
2. Add a proxy layer in front of the object storage to do caching, let the proxy handle the acceleration, and keep the client reading single-threaded. The downside of this approach is that an additional object storage proxy component has to be maintained, and the stability of that component also has to be guaranteed.
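
To make option 1 concrete, here is a minimal sketch of a segmented, multi-threaded download using ranged S3 reads; the endpoint, bucket, and object key are hypothetical. The point is simply that this is noticeably more machinery than the one-line download a pre command is suited for:

```python
# Illustrative sketch of option 1: download one object in parallel ranged reads.
# Endpoint, bucket, and key are hypothetical.
import concurrent.futures

import boto3

s3 = boto3.client("s3", endpoint_url="http://object-store.internal:9000")
BUCKET = "flink-deps"
KEY = "connectors/example-connector.jar"
PART_SIZE = 8 * 1024 * 1024  # 8 MiB per range request

size = s3.head_object(Bucket=BUCKET, Key=KEY)["ContentLength"]
ranges = [(start, min(start + PART_SIZE, size) - 1) for start in range(0, size, PART_SIZE)]

def fetch(byte_range):
    start, end = byte_range
    resp = s3.get_object(Bucket=BUCKET, Key=KEY, Range=f"bytes={start}-{end}")
    return start, resp["Body"].read()

with open("/tmp/example-connector.jar", "wb") as out:
    out.truncate(size)  # pre-size the file so each part can be written at its offset
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        for start, data in pool.map(fetch, ranges):
            out.seek(start)
            out.write(data)
```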

Try JuiceFS

Coincidentally, the company was running a JuiceFS POC internally, so a ready-made object storage proxy layer was available. We ran a series of tests against it and found that JuiceFS fully met the needs of our scenario. The following points surprised us the most:

1. JuiceFS comes with an S3 gateway that is fully compatible with the S3 object storage protocol, which allowed us to go live quickly without any changes; the S3 gateway itself is stateless, so scaling it out is very convenient (see the sketch after this list);
2. JuiceFS has a built-in cache acceleration feature. Our tests showed that after putting JuiceFS in front of the object storage, single-threaded file reads were 4 times faster than before;
3. JuiceFS can also be mounted as a local file system, so later we can try mounting the dependencies directly into the container's directory;
4. JuiceFS allows metadata and data storage to be deployed separately. For data storage we kept the original object storage, whose eleven-nines guarantee is provided by the cloud vendor; for metadata we chose the distributed KV store TiKV, because our colleagues in the online architecture team have rich experience in developing and operating TiKV, so the SLA can be well guaranteed. With this setup, the availability and scalability of JuiceFS are very strong.
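
Point 1 is what made the switch nearly transparent: because the JuiceFS S3 gateway speaks the S3 protocol, an existing S3 client keeps working once its endpoint (and credentials) point at the gateway. A minimal sketch, with hypothetical addresses, credentials, and object names:

```python
# Illustrative sketch of point 1: the JuiceFS S3 gateway speaks the S3 protocol,
# so existing client code keeps working once the endpoint and credentials are
# swapped. All addresses, credentials, and names here are hypothetical.
import boto3

# Before: the client pointed at the original object storage.
legacy = boto3.client(
    "s3",
    endpoint_url="http://object-store.internal:9000",
    aws_access_key_id="OLD_ACCESS_KEY",
    aws_secret_access_key="OLD_SECRET_KEY",
)

# After: the same code, with only the endpoint changed to the JuiceFS S3 gateway.
juicefs = boto3.client(
    "s3",
    endpoint_url="http://juicefs-s3-gateway.internal:9000",
    aws_access_key_id="NEW_ACCESS_KEY",
    aws_secret_access_key="NEW_SECRET_KEY",
)

# The download call is identical either way; nothing else has to change.
juicefs.download_file("flink-deps", "connectors/example-connector.jar",
                      "/tmp/example-connector.jar")
```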

JuiceFS goes live

The go-live process for JuiceFS is divided into the following phases:

1. Data migration: we needed to synchronize the data originally stored on HDFS and object storage to JuiceFS. Because JuiceFS provides a data synchronization tool, and Flink's dependencies are not particularly large, we finished this part of the work quickly (an illustration of the copy is sketched after this list);
2. Modify the address from which the Flink image pulls dependencies: because JuiceFS is compatible with the object storage protocol, we only needed to change the original object storage endpoint on the platform side to the address of the JuiceFS S3 gateway.
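
In practice the migration used JuiceFS's own synchronization tooling; purely to illustrate the shape of that copy, here is a Python sketch that lists every dependency object in the old object storage and writes it to JuiceFS through its S3 gateway. Endpoints and bucket names are hypothetical:

```python
# Illustrative sketch only. The real migration used JuiceFS's own data
# synchronization tooling; this loop just shows the shape of the copy:
# read every dependency object from the old store and write it to JuiceFS
# through its S3 gateway. Endpoints and bucket names are hypothetical.
import boto3

src = boto3.client("s3", endpoint_url="http://object-store.internal:9000")
dst = boto3.client("s3", endpoint_url="http://juicefs-s3-gateway.internal:9000")
SRC_BUCKET = DST_BUCKET = "flink-deps"

paginator = src.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=SRC_BUCKET):
    for obj in page.get("Contents", []):
        body = src.get_object(Bucket=SRC_BUCKET, Key=obj["Key"])["Body"].read()
        dst.put_object(Bucket=DST_BUCKET, Key=obj["Key"], Body=body)
        print(f"copied {obj['Key']} ({len(body)} bytes)")
```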

After JuiceFS went live, the flow of launching a Flink job looked roughly like this:

[Figure: Flink job startup flow after moving dependencies to JuiceFS]

Compared with HDFS, container startup time is now predictable: the speed of downloading dependencies is no longer affected by business peak hours, and it is about 4 times faster than downloading from the native object storage.

Summary and outlook

From the start of our investigation of JuiceFS to going live took less than half a month. This was mainly because the JuiceFS documentation is very complete and saved us many detours, and the JuiceFS community also answered our questions promptly, so the rollout went very smoothly.

The benefits from this first attempt at JuiceFS are quite obvious. Next, we will consider applying JuiceFS to data lake scenarios and algorithm model loading scenarios to make our use of data more flexible and efficient.

About the Author:

Hu Mengyu is a big data architecture development engineer at Zhihu, mainly responsible for secondary development of Zhihu's internal big data components and the construction of the data platform.
