
A brief look at modern message queues and cloud storage


Overview

This article describes how competitive features such as software-hardware co-design, support for millions of queues, and tiered storage came about and how they perform in production as messaging systems modernize. It also discusses industry-leading techniques such as shared-log storage-compute separation, FFM and coroutines, RDMA transport, and columnar storage, which extend messaging into the streaming domain.

At the end of the 1970s, messaging systems were used to manage multi-host print jobs; this ability to decouple producers from consumers and shave traffic peaks was gradually standardized into the point-to-point model and the slightly more complex publish-subscribe model, enabling distributed collaboration in data processing. Over time, Kafka, Amazon SQS, RocketMQ, Azure Service Bus, Google Pub/Sub, RabbitMQ, and many other products have offered developers competitive solutions for different business scenarios, extending them with rich semantics and features such as ordering, transactions, scheduled messages, and exactly-once delivery, and making the messaging system a standard component of distributed systems.

With the rise of cloud-native and microservice concepts, messaging systems have witnessed the vitality of highly dynamic, asynchronous architectures such as serverless and event-driven systems, and the architecture and models of message queues themselves have changed as well. I would like to share how PaaS/MaaS-layer infrastructure services such as messaging systems benefit from advances in software and hardware during their modernization, how they meet the growing demand for data analysis in stream processing and real-time analytics scenarios, and how they balance cost against machine performance. The goal is to let messaging infrastructure truly embrace cloud native: adopting the shared-log, storage-compute-separated architectures that have become popular in recent years to reduce cost, and combining FFM, coroutines, user-space TCP, RDMA transport, columnar storage, and other technologies to extend messaging further into streaming, becoming the foundation of EDA, fully serverless deployments, and lightweight computing in the future.

1. Sending and receiving messages

1.1. Sending: Fair or Greedy?

From the perspective of the user or client, sending a message looks like nothing more than an RPC: serialize the data, transmit it to the message queue server, and handle the response. In a distributed environment, however, there are many details to consider, such as:

1. Sending latency: the server side of a messaging system is usually deployed across availability zones for disaster recovery, so cross-AZ hops add round-trip latency to every send.

2. Multi-language support: compared with consumers, which are deployed at scale in the cloud, producers come from a much wider range of sources, from small microcontrollers and front-end pages to internal communication between complex back-end services, so the demand for multi-language and multi-protocol support is stronger.

3. Multi-scenario support: the system needs to fit multiple models, such as ordered messages for binary-log synchronization and scenarios with massive numbers of IoT clients.

4. Send failure: what should the client do when a send fails because of server downtime or a network fault? Should it buffer the data and apply backpressure, or fail fast?


Figure: client requests going directly to the storage node (broker) versus going through a proxy

Looking back at the days when networks were not fast enough and "moving data" was inferior to "moving computation", we often cite the classic direct-connection architecture. Typical products include HDFS, Kafka, Redis, and Elasticsearch, where clients connect directly to multiple server nodes, which effectively reduces latency and improves real-time performance compared with an architecture in which all traffic first passes through a proxy. The price is that the client must tolerate all kinds of distributed failures on the server side, which leads to more complex service discovery and load-balancing mechanisms; the client also needs to know how to handle a single point of failure "gracefully", which makes rolling out new client versions harder.

In proxy mode, storage and compute are separated, and capabilities such as request caching, shared authentication, and service-discovery metadata can be implemented in the proxy. This division of responsibilities also greatly simplifies the configuration of higher-level features such as cross-region networking and multi-region active-active deployment. With a proxy, multi-language client access becomes simpler too: multiple protocols only need to be parsed in the stateless proxy. As for the extra network hop, the proxy can use a higher-performance private protocol to talk to the back-end storage cluster, and with low-serialization-overhead libraries such as FlatBuffers and with RDMA transport, the difference is no longer significant.


Figure: TCP Congestion Control [2]

Modern message queues keep striving for "smarter" error handling and give users the chance to choose a strategy. At the network level, a failed transmission is essentially an unreachability event and can also be treated as congestion. Broadly there are two approaches: congestion-control techniques in the spirit of TCP BBR, and redundant multi-copy packet delivery algorithms such as the one used by the "Sharp Speed" accelerator. In log-collection applications, message queues care more about aggregate throughput, so they choose a BBR-like algorithm that searches for the optimal BDP (bandwidth-delay product) and reduce the client's send rate through backpressure when the server is busy. Kafka's batching mechanism (the accumulator, controlled by linger.ms) is similar in spirit to Nagle's algorithm: both improve performance by accumulating small packets into larger ones. There are also framework-level approaches, such as sliding windows and Flink's credit-based flow control, that handle backpressure more gracefully.
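As a concrete illustration of the batching knobs mentioned above, here is a minimal sketch of a Kafka producer configured with linger.ms and batch.size; the broker address, topic name, and values are placeholders, not recommendations.

```java
// A minimal sketch of Kafka's producer-side batching knobs (linger.ms, batch.size).
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // wait up to 20 ms to accumulate a batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);   // or flush earlier once 64 KB is buffered
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // larger batches compress better

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```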

Message queues also offer a fail-fast strategy and let the user decide the error-handling behavior: retry more times, or fall back to a backup path. Configuring a short send timeout and resending the message through several quick retries is a globally unfair, greedy strategy. It is mostly used in message queues like RocketMQ that put more weight on data importance and real-time behavior (server-side asynchronous write latency is under 1 ms, and synchronous or multi-replica write latency is a few milliseconds). When the storage layer is busy, "failing fast" is preferred over waiting in line; put differently, the server wants to serve latency-sensitive online applications with higher priority.
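For the fail-fast strategy, here is a hedged sketch using the RocketMQ 4.x client: a short send timeout plus a couple of quick retries instead of queuing behind a busy broker. The name server address, group, and topic are placeholders.

```java
// A sketch of "fail fast, retry quickly" sending with the RocketMQ 4.x client.
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class FastFailProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setSendMsgTimeout(500);          // give up quickly if the broker is busy
        producer.setRetryTimesWhenSendFailed(2);  // a couple of fast retries instead of waiting in line
        producer.start();
        try {
            SendResult result = producer.send(new Message("demo_topic", "hello".getBytes()));
            System.out.println(result.getSendStatus());
        } finally {
            producer.shutdown();
        }
    }
}
```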

1.2. Consumption: Recording the State of Different Models

Consuming a message is essentially a two-phase commit performed in cooperation with the server. An engineer with some distributed-systems background would describe it like this: the server maintains some logical or physical queues; the client issues long-polling requests, which return immediately when messages are available and are parked on the server when there are none; after the application's business logic has processed a message, the consumption result is reported back to the server. The server's mission is to satisfy these waiting clients better and faster, record the current pull offset of each queue, and track the handles and counts of in-flight messages. The storage layer of the message queue must design its data structures carefully to maintain this content, and keep it highly available by replicating it inside the cluster with a consensus protocol such as Paxos or Raft; this is what is usually called the server's "state".

Consider some simple examples. Kafka uses a built-in topic to record the offset commits of each group, reusing the multi-replica replication path of ordinary messages to guarantee reliability and to make offset monitoring and rollback convenient. RocketMQ, whose number of subscription groups is two or three orders of magnitude larger than Kafka's, instead checkpoints in-memory offsets to disk periodically. For a given subscription group and queue, a single number is enough to describe the consumption progress. This queue model is simple and efficient in most scenarios, but it has some inherent flaws:

1. Load balancing consumers by queue comes with prerequisites and assumptions:

a. An unequal ratio of queues to consumers leads to uneven load; for example, with 8 queues and 3 consumers, the best possible distribution is 3, 3, 2.

b. The model assumes all clients have equal capability, but in real production old and new machine types are mixed, so the available computing power cannot be fully used.

2. A slow task blocks the whole queue. For example, suppose a queue holds 5 messages with offsets 3 through 7, and the business logic takes a long time on offset 5. In concurrent-consumption mode, offsets 6 and 7 finish quickly, but the committed offset cannot advance past the slow message, so the observed backlog stays at 3 and produces a false alarm.

3. When a consumer or server goes down, rebalancing still causes a few seconds of repeated (duplicate) consumption, which hurts the user experience in sensitive scenarios such as SMS push.

There are even more representative scenarios in which these "defects" bite, such as a rendering business where each message in the queue represents a rendering job:

1. The number of consumers is large; the same subscription group may have hundreds or thousands of machines consuming at the same time.

2. Processing a single piece of data takes a long time, from a few seconds to several hours.

3. Because consumer load is high and spot instances are heavily used, process freezes and crashes on the consumer side are far more common than in ordinary services.

Traditional message queues generally adopt a Kafka-like queue model and run into a classic "work-stealing" class of problem: the load cannot be evenly distributed across all consumers, and a single blocked message prevents the offsets of subsequent successfully processed messages from being committed. What we want here is a delivery algorithm based on an invisibility time. Roughly, it works as follows:

1. The client sets an invisibility time, say 5 minutes, and pulls a batch of messages from the server.

2. The server returns a batch of messages and starts a 5-minute countdown for each in the background; a field is attached to each message as an identifier, known as the handle.

3. If the client does not acknowledge a message within 5 minutes (ack by handle), the message becomes visible again after the 5 minutes elapse and can be obtained by other clients.


We soon notice that this model is still flawed: if a consumer pulls a message and crashes after 1 minute, the business has to tolerate a 4-minute delay before the message can be processed again, even if other consumers are idle. The fix is to renew the invisibility period, for example by calling change-invisible-time every 30 seconds to keep the remaining invisibility at 1 minute; then no matter when the client crashes, the redelivery delay is bounded by 1 minute.

In RocketMQ, this style of per-message, invisibility-interval-based consumption is called "pop consumption", and the corresponding client implementation is SimpleConsumer. It is "simple" because the client no longer needs to care about complex load balancing or offset management, which also makes multi-language support easier. Pulsar implements this management capability in a more elaborate form, described as its WorkQueue mode with range-ack offset management. The more the server does, the less the client and the user have to worry about, and the more flexibility they gain. This evolution of business models has driven changes in the cloud storage model for messages. Of course, it all comes at a cost: with a stateless consumption model such as SimpleConsumer, the average time to pull messages is higher than with the usual PullConsumer, and the number of client-server interactions increases.
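A hedged sketch of this pop-consumption flow with the RocketMQ 5.x SimpleConsumer: receive with an invisibility duration, ack by handle, and renew the invisibility for long-running work. Endpoint, group, and topic names are placeholders, and process() is a stand-in for business logic.

```java
// Sketch: RocketMQ 5.x SimpleConsumer with invisibility time, ack, and lease renewal.
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;

public class PopConsumerDemo {
    public static void main(String[] args) throws Exception {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(
                        ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build())
                .setConsumerGroup("demo_group")
                .setAwaitDuration(Duration.ofSeconds(30))   // long-polling wait
                .setSubscriptionExpressions(
                        Collections.singletonMap("demo_topic", new FilterExpression("*")))
                .build();

        // Pull up to 16 messages that stay invisible to other consumers for 5 minutes.
        List<MessageView> messages = consumer.receive(16, Duration.ofMinutes(5));
        for (MessageView message : messages) {
            boolean stillWorking = process(message);
            if (stillWorking) {
                // Renew the lease so a crash leads to at most ~1 minute of redelivery delay.
                consumer.changeInvisibleDuration(message, Duration.ofMinutes(1));
            } else {
                consumer.ack(message);                      // ack by handle
            }
        }
    }

    private static boolean process(MessageView message) { return false; }
}
```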

2. Server-side capability improvements

The richness of these client interfaces and policies relies on improvements on the server side. The growing maturity of AIO, zero copy, and direct I/O has greatly reduced the complexity of building high-performance systems, and a well-designed single-node storage engine can typically handle hundreds of thousands or even millions of writes per second. Users' attention has therefore shifted from raw read/write throughput to product features; after all, throughput can be solved by horizontal scaling, whereas every additional component increases the maintenance burden. As a user, you always want to minimize external dependencies: you will not choose a product just because it has a pretty dashboard, but you may be forced to give one up because it lacks a feature, for example financial-grade workloads that depend heavily on encryption in transit and at rest.

Unlike open-source communities, an important core competency of a cloud vendor's message queues is to build a "unified messaging kernel", adapt the access protocols of multiple products on top of it, and provide consistent underlying capabilities for all of them, maximizing the reuse of functionality. The marginal cost of adapting a new product keeps decreasing, which is also why NoSQL databases, message queues, caches, and even log services are gradually converging. I believe the enrichment of storage features in modern message queues will mainly show up in the following areas: support for massive numbers of queues, cost reduction through tiered storage, evolution toward layered replication architectures as distributed file systems mature (yielding multi-model storage), flexible adjustment of replication policies, and better, faster support for streaming workloads.

2.1. Massive queues and multi-model unification

My personal understanding is that Kafka focuses more on aggregate throughput, RocketMQ leans toward real-time applications, RabbitMQ carries part of the business logic through its messaging model, and MQTT scenarios need to support huge numbers of devices and topics. These differentiating features are extensions of the two domain models of message queues, the topic-based publish-subscribe model and the queue-based point-to-point model, and a "unified messaging kernel" must adapt well to all of these modes.

As is well known, community Kafka creates an independent set of log segments for each partition and pre-allocates disk space for these files, which causes obvious performance problems when there are massive numbers of partitions. In RocketMQ, messages are appended to a single CommitLog; to guarantee foreground write performance, data from all topics is stored in a mixed fashion, and a lot of custom optimization has to be done at the storage-engine level, such as weighting hot and cold data separately and using a dedicated thread pool for cold reads.

The index, however, is still kept in small independent files: by default each consume-queue file stores 300,000 message indexes, and each index entry occupies 20 bytes, so each index file is about 300,000 × 20 / 1024 / 1024 ≈ 5.72 MB, and millions of queues would occupy several terabytes of disk just for indexes. With very large numbers of queues it becomes necessary to store these indexes in a mixed fashion and merge them into larger files at the file-system level. We found that an LSM structure such as RocksDB, which keeps data sorted and writes SSTs in batches, can merge the small files and significantly alleviate the fragmentation caused by huge numbers of small files. Performance tests show that with RocksDB-backed indexes replacing the native file-based ConsumeQueue indexes, a single node can support millions of queues; with 40,000 queues on a single server (including retry queues), local-disk index space drops from 200 GB to 30 GB, at the cost of roughly 10% more CPU than the file-based ConsumeQueue index.
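To illustrate the idea (not RocketMQ's actual implementation), here is a minimal sketch of an LSM-backed consume-queue index on RocksDB: keys are encoded as (topic, queueId, offset) so that entries of one queue sort together, and values point into the mixed CommitLog. Key layout and class names are illustrative.

```java
// Sketch: an LSM (RocksDB) index mapping (topic, queueId, offset) -> (commitLogPos, size).
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class LsmConsumeQueueIndex implements AutoCloseable {
    static { RocksDB.loadLibrary(); }

    private final RocksDB db;

    public LsmConsumeQueueIndex(String path) throws RocksDBException {
        db = RocksDB.open(new Options().setCreateIfMissing(true), path);
    }

    // Key = topic bytes + queueId + logical offset, so one queue's entries are contiguous;
    // value = position and size of the message inside the mixed CommitLog.
    public void put(String topic, int queueId, long offset,
                    long commitLogPos, int size) throws RocksDBException {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer key = ByteBuffer.allocate(topicBytes.length + 4 + 8)
                .put(topicBytes).putInt(queueId).putLong(offset);
        ByteBuffer value = ByteBuffer.allocate(8 + 4).putLong(commitLogPos).putInt(size);
        db.put(key.array(), value.array());
    }

    @Override
    public void close() { db.close(); }
}
```

Because the engine keeps keys sorted, sequential reads of one queue turn into contiguous scans, and many small index files collapse into a few SSTs via compaction.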

So what are the real advantages of LSM-like structures? Storage engines fall into two families: in-place updates and out-of-place updates. The most typical in-place structure is the B-tree and its variants: a balanced multi-way search tree that packs many entries into each leaf to reduce tree height and improve query performance and stability. Because a B-tree keeps entries physically sorted (a newly inserted entry is placed according to its final key, next to all pre-existing entries), a page brought into the buffer for one insert is unlikely to be needed again by another insert during its short stay there, so B-tree writes cannot be batched the way LSM writes can.

Second, the amount of data persisted by a storage engine usually far exceeds memory, and any access pattern that can be merged (such as sequential access by queue) tends toward cold data, while the compaction mechanism of an LSM structure produces a local clustering effect: contiguous data retrieval cooperates much better with read-ahead, which is an extremely significant advantage. Compaction works very well in the database field, but because message payloads are large values, the extra read/write amplification is expensive for a messaging system, so key-value separation along the lines of WiscKey (from the FAST paper) can be tried to reduce it, and compaction can be further optimized with schemes similar to those proposed by TerarkDB and Titan. This, however, adds complexity to implementing business features such as per-topic TTL and periodic offset correction.

2.2. Tiered storage: turning data into assets

In recent years, controlling ballooning infrastructure cost has been a hot topic in the community, and whether a commercial product can turn cost into price competitiveness is one of the important selling points for convincing users to move from self-built deployments on VMs. Messaging systems used to be built on local disks or block storage (cloud disks). Cloud SSD disks cost about 0.8-1 yuan/GB/month, while object storage generally costs 0.1-0.12 yuan/GB/month, several times cheaper. If a single machine uses 2 TB of storage in production, keeping everything on block storage costs about 1,600 yuan per month; with tiered storage, assuming about 200 GB of hot data stays on block storage, the cost is roughly 200 × 0.8 + 1,800 × 0.1 = 340 yuan per month (a worked sketch follows the list below). Why not shrink the block storage even further to squeeze out cost? In practice, in a tiered-storage setup the marginal benefit of blindly pursuing an ever-smaller local disk diminishes.

The main reasons are as follows:

  • Fault redundancy: as a key part of the infrastructure, message queue stability comes above everything else. Although object storage itself is highly available, using it as the primary storage means that network fluctuations and similar issues can easily cause backpressure that blocks hot-data writes; since hot-data reads and writes serve online production, this would be fatal to business availability.
  • A very small local disk has no obvious price advantage. Cloud computing is priced for broad, fair access: once tiered storage is used, compute becomes a larger share of the cost, and the write traffic of hot data does not change. If you choose only 50 GB of block storage but need the read/write IOPS of 200 GB of ESSD-grade block storage, the unit cost is several times that of ordinary low-IOPS block storage.
  • When uploading, a local disk lets you "accumulate batches", reducing the request cost of object storage.
  • When reading, it provides lower latency and saves read costs for "warm" data, and cold data can also be cached on the local disk.
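To make the earlier price comparison concrete, here is a back-of-the-envelope sketch under the article's assumed unit prices; the numbers are illustrative only.

```java
// Monthly storage cost for one broker retaining ~2 TB of messages, under the assumed prices:
// cloud SSD 0.8 CNY/GB/month, object storage 0.1 CNY/GB/month, ~200 GB of hot data on block storage.
public class TieredStorageCost {
    public static void main(String[] args) {
        double totalGb = 2000;   // ~2 TB retained
        double hotGb = 200;      // hot tail kept on block storage
        double ssd = 0.8;        // CNY per GB per month
        double oss = 0.1;        // CNY per GB per month

        double allBlockStorage = totalGb * ssd;                // 1600 CNY/month
        double tiered = hotGb * ssd + (totalGb - hotGb) * oss; // 160 + 180 = 340 CNY/month
        System.out.printf("block only: %.0f, tiered: %.0f CNY/month%n", allBlockStorage, tiered);
    }
}
```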

"Whoever is closest to the data is more likely to succeed" is the core point of data-driven decision-making, and message queues, as a data channel for application development, are often limited by the capacity and cost constraints of local disks, and often can only be stored for a few days. Tiered storage provides a low-cost solution to greatly extend the life cycle of messages, and can flexibly and dynamically use multiple data formats such as FlatBuffer and Parquet when turning cold to transform message queues from channels to storage pools for users' data assets.

2.3. Reducing compute costs with distributed storage

Tiered storage effectively solves the cost of storing cold data, but it does not reduce the total cost of ownership. For hot data, multi-replica technology is still used: on the one hand to guarantee data reliability and service availability, and on the other hand because readable replicas provide extra read bandwidth, which suits the write-once, read-many pattern of messaging. In some scenarios this architecture inevitably runs into the following problems:

  • To keep clients simple, message queues often use Y-shaped writes, and the Raft or other log-replication traffic inside the server consumes a lot of bandwidth, reducing write performance and throughput. Each update must first go to the primary replica and then be replicated and quorum-counted through the consensus protocol, so the write latency is at least four network hops (client → primary, primary → replicas, replica acknowledgements, primary returns success), with a long tail on top.
  • Compute is wasted: the standby replicas generally do not serve large numbers of client reads, so their CPU usage is typically half that of the primary. When the primary can fully satisfy reads and writes, the standbys' computing power is wasted, and this is hard to optimize even with co-location or single-process multi-container techniques (such as Flink's slot mechanism or RocketMQ's broker-container mechanism).
  • Scale-out and scale-in are slow: when hotspots appear or emergency scaling is needed, data has to be copied, for example in community Kafka, and the change takes effect slowly.
  • Multiple replicas need a control-plane component for membership and leader arbitration, and many teams are reluctant to operate a service like ZooKeeper and worry about the problems it can bring. Kafka's KRaft, RocketMQ's DLedger-based CommitLog replication, and its JRaft controller all carry a certain amount of complexity and add to the O&M burden of the whole architecture.

Multi-replica architectures face many challenges, such as the difficulty of providing monotonic reads and the problem of ISRs (in-sync replicas) falling out of sync. Fortunately, we have excellent theoretical guidance: the PacificA paper published by Microsoft in 2008 describes a log-based system architecture and three approaches to consistent data replication:


Figure: from the Microsoft paper PacificA: Replication in Log-Based Distributed Storage Systems [8]

1. Log Replication: similar to the replicated state machine described in Raft, logs are replicated between the primary and the secondaries, and every node applies the same commands in the same order.

2. Log Merge: the primary maintains the in-memory data structure, while the secondaries do not; they only receive checkpoints and replicated logs. When the primary fails, a secondary recovers by loading a checkpoint and replaying the logs.

3. Layered Replication: consistent replication of the data is delegated entirely to an underlying distributed file system such as HDFS (a minimal sketch of this idea follows).
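To make the layered approach concrete, here is a minimal, hypothetical sketch; SharedLog and its methods are illustrative names, not a real API. The broker's write path shrinks to a single append, while replication and durability live entirely in the underlying distributed file system.

```java
// Hypothetical sketch of "layered replication": the broker appends to a shared-log
// abstraction; the distributed file system handles replication and durability internally.
import java.util.concurrent.CompletableFuture;

interface SharedLog {
    CompletableFuture<Long> append(byte[] record);  // returns the log sequence number (LSN)
    CompletableFuture<byte[]> read(long lsn);
    long highWatermark();                            // highest durably committed LSN
}

final class LayeredBroker {
    private final SharedLog log;

    LayeredBroker(SharedLog log) { this.log = log; }

    // No Raft/quorum logic lives in the broker; a produce request is just an append.
    CompletableFuture<Long> onProduce(byte[] message) {
        return log.append(message);
    }
}
```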


Figure: with the PolarDB team's optimization, network transfer between the primary node and read-only nodes is reduced by 98%.

Image from: Analyze the Technical Essentials of PolarDB at the Architecture Level

Message queues are data-intensive, latency-sensitive storage applications, and in a layered architecture the write path can be fully optimized. Whether it is Kafka's sendfile syscall or RocketMQ's mmap, kernel mechanisms prevent them from fully exploiting modern hardware. Distributed file systems are usually built on user-space frameworks such as SPDK, with a run-to-completion threading model and star-topology asynchronous replication writes (e.g., acknowledging 2 of 3 replicas), delivering data reliability and write performance far beyond cloud disks (whose underlying layer is itself a distributed file system) or aging local SSDs. Under a layered architecture, compute and memory resources can be used more flexibly, and computing power can be managed independently and at fine granularity.

Data reliability and read/write performance are thus handled by a "more professional team". Of course, some original techniques have to change: data reads can no longer rely on the operating system's page cache, for which the industry has many elegant solutions, such as PolarDB's shared buffer pool and WarpStream's "distributed mmap", which make good use of memory across multiple nodes.

Modern application-layer storage engines practice the idea that "log is streaming", offloading the complexity of storage to a lower-level distributed store. Because engineering resources are always limited, it pays to do "subtraction" in time: every technology you bring in adds complexity, and over-engineering drives up maintenance cost. Take the extremely feature-rich Flink as an example: the complexity of self-operating the community edition scares off many small and medium users. Keeping the storage engine's dependencies small gives a product a wider developer base and sustainable development.

2.4. Self-contained streaming capabilities

For many years, message queues such as RocketMQ have been used in core links such as transactions and supply-chain fulfillment, carrying large amounts of high-value business data. The community has proposed solutions for turning these "data assets" into value through computation, such as lightweight options like Kafka Streams and ksqlDB, alongside far more powerful platforms such as Spark and Flink. After contributing to the community's new Flink-Connector-RocketMQ based on FLIP-27/FLIP-191, I came to realize that modern message queues are growing toward a combined "message, event, stream" platform.


Stream-processing scenarios are complex because they must strike a balance between performance, correctness, and cost.

  • Repeatable reads, i.e., replayability of the data: reads must not "flap", so that after a failure the job can recover correctly from the last successfully processed state, such as a snapshot. The message consumer (usually called the source in computing frameworks) must be able to locate and replay messages from a specific offset after a crash or network jitter, and the message store must guarantee that the progress observed downstream never goes beyond the cluster's confirmed commits, that is, the high watermark under the quorum rule, to keep the consumption process idempotent (see the sketch after this list).
  • Partitioning strategy and time watermarks: a common engineering requirement is to keep the number of partitions in a data source stable to avoid load imbalance caused by partition changes. This model carries many implicit assumptions: a robust high-availability mechanism on the server side, no hotspots or data skew in any partition, equivalent capabilities among the nodes running downstream consumers, support for periodic time watermarks in the message queue, and so on. In a mode that is unaware of partition counts, services such as Google Pub/Sub have to maintain a large number of handle states on the compute side to achieve end-to-end exactly-once processing. In DSL mechanisms, the user must provide implementations for accumulations and retractions and know how to correct results; for example, when a join produces a new table, the correction propagates backward from the current sub-topology, which also causes high end-to-end latency.
  • I/O overhead of data forwarding: even with modern 100-gigabit networks, the repeated reads and writes of the message queue caused by broadcast, aggregation, and repartitioning in a stream-processing framework take a long time. Maintaining distributed transactions such as Flink's two-phase-commit sinks is also genuinely complex and brings "read/write amplification" to the message queue.
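Here is the sketch referenced in the first bullet: a hedged illustration of repeatable reads using the Kafka consumer API, where the checkpoint map stands in for a framework's state backend. On recovery the source seeks back to the last snapshotted offsets and replays from there; newly read offsets only become durable as part of the next snapshot.

```java
// Sketch: a replayable source that resumes from checkpointed offsets instead of committing eagerly.
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayableSource {
    public static void run(KafkaConsumer<byte[], byte[]> consumer,
                           Map<TopicPartition, Long> checkpointedOffsets) {
        consumer.assign(checkpointedOffsets.keySet());
        // Resume exactly where the last successful snapshot left off.
        checkpointedOffsets.forEach(consumer::seek);

        while (true) {  // the read loop of the source task
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                process(r);
                // Offsets are folded into the next snapshot rather than committed immediately,
                // so a crash replays from the last snapshot instead of losing or skipping data.
                checkpointedOffsets.put(new TopicPartition(r.topic(), r.partition()), r.offset() + 1);
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) { /* business logic */ }
}
```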

Maintaining the computing framework and the message queue storage separately gives us the flexibility of decoupling computing logic from data storage, but it also increases maintenance difficulty and complexity. In the future, the storage layer of message queues will integrate some lightweight computing on top of its core functions, such as data transformation, rolling aggregation, and support for concepts like windows. If the storage layer introduces schemas and can retrieve just the relevant part of the data on demand, it can further reduce read and write operations.

3. Frontier exploration of the industry

Beyond messaging products with large user bases such as RocketMQ and Kafka, we also watch challengers in market niches who target industry pain points or dig deep into performance to build differentiated competitiveness, such as WarpStream and Redpanda.

3.1. Remove the local disk completely

Recovering from a Kafka node failure involves data replication, which is complex and time-consuming, because Kafka depends strongly on local disks; even with the tiered-storage KIPs the community has proposed and implemented, hot data still needs to be retained on multiple local disks for at least 12-24 hours, which is costly. WarpStream is Kafka-compatible, removes the dependency on EBS entirely, and builds "Kafka" directly on object storage (S3). Architecturally it consists of proxy-like agents and a Meta metadata service that supports millions of TPS. The core flow is as follows:

1. Sending a message: the agent batches data from different topics and writes it to object storage, then asks the Meta service to sequence the batch once the write succeeds.

a. Because of batching, the desire to save per-request fees, and the high latency of object storage itself, write latency is high, around 400 ms.

b. For multi-AZ scenarios, write load balancing and traffic switching are implemented by hacking client IDs, without breaking the native Kafka protocol.

c. Idempotent producers are supported by having the Meta service arbitrate via offsets and ignore requests that did not return success. This design is consistent with Pangu's seal-and-new-chunk fast-rollover design and the various tiered-storage append models.

2. Receiving messages: officially described as a "distributed mmap". Multiple agents form a consistent-hash ring, and reads of the same partition are routed to the same agent to improve cache hit rates (a minimal routing sketch follows). Compaction runs in the background to improve replay speed and handle TTL. Because the design is completely stateless, scaling components horizontally is easy, which makes multi-tenancy and very large clusters relatively simple to implement; the drawback is heavy operational reliance on the Meta service.
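A minimal consistent-hashing sketch of that routing idea (illustrative only, not WarpStream's code): agents occupy positions on a hash ring via virtual nodes, and each topic-partition maps to the agent owning its ring position, so repeated reads of a partition hit the same agent's cache.

```java
// Sketch: route each topic-partition to a fixed agent via consistent hashing.
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AgentRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public AgentRing(List<String> agents, int virtualNodes) {
        for (String agent : agents)
            for (int v = 0; v < virtualNodes; v++)
                ring.put(hash(agent + "#" + v), agent);   // place virtual nodes on the ring
    }

    // The owning agent is the clockwise successor of the partition's hash on the ring.
    public String agentFor(String topic, int partition) {
        long h = hash(topic + "-" + partition);
        Map.Entry<Long, String> e = ring.ceilingEntry(h);
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    private static long hash(String s) {
        long h = 0xcbf29ce484222325L;                     // FNV-1a; fine for a sketch
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= b;
            h *= 0x100000001b3L;
        }
        return h;
    }
}
```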

In addition, the official site mentions some low-latency improvements, for example for single-AZ object storage classes such as S3 Express One Zone:

  • Reduce the upload buffer size and timeout for performance.
  • Write to multiple buckets to approximate 2-of-3 replication, acknowledging the send quickly once a quorum succeeds (see the sketch below).
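A hedged sketch of the multi-bucket quorum write; putObject is a hypothetical stand-in for a real object-storage SDK call. The payload is uploaded to all buckets concurrently, and the send is acknowledged as soon as a majority succeeds.

```java
// Sketch: upload to N buckets in parallel, ack once a majority of uploads succeed.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public class QuorumUpload {
    public static CompletableFuture<Void> write(List<String> buckets, String key, byte[] data,
                                                ExecutorService pool) {
        int quorum = buckets.size() / 2 + 1;                    // e.g. 2 of 3
        CompletableFuture<Void> acked = new CompletableFuture<>();
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        for (String bucket : buckets) {
            CompletableFuture.runAsync(() -> putObject(bucket, key, data), pool)
                .whenComplete((v, err) -> {
                    if (err == null && ok.incrementAndGet() >= quorum)
                        acked.complete(null);                    // quorum reached: ack the send
                    else if (err != null && failed.incrementAndGet() > buckets.size() - quorum)
                        acked.completeExceptionally(err);        // quorum can no longer be reached
                });
        }
        return acked;
    }

    private static void putObject(String bucket, String key, byte[] data) {
        // placeholder for the real object-storage SDK call
    }
}
```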

3.2. Refactoring in Native Language

Redpanda chooses to exploit modern hardware to the fullest, selling low latency and low cloud cost through a native-language implementation, especially the reduction of the send-latency long tail. Redpanda nodes rely on an improved Raft (including an optimistic Raft variant and parallel commits), and cold data relies on object storage. Among Kafka's many challengers, most vendors cut in at the LogSegment layer to reduce the compatibility problems caused by the evolution of Kafka's compute-layer protocol. Redpanda instead refactors bottom-up, letting a single fixed thread perform all operations for a partition, including network polling, asynchronous I/O, fetching events, and scheduling compute tasks. This threading model is called thread-per-core, also known as run-to-completion. Built on the actor model, it shrinks critical sections and replaces mutex-heavy multithreading under a reactor, expecting every operation to complete within 500 µs; the deterministic latency of C++ also avoids the long tails of JVM-based applications, producing predictable P99 latency.
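As an illustration of the thread-per-core idea, here is a sketch in Java rather than Redpanda's C++/Seastar implementation: each partition is pinned to exactly one single-threaded event loop, so operations on a partition never contend on locks or hop between threads.

```java
// Sketch: pin each partition to one single-threaded "shard" so its work runs lock-free and in order.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PartitionEventLoops {
    private final ExecutorService[] loops;

    public PartitionEventLoops(int cores) {
        loops = new ExecutorService[cores];
        for (int i = 0; i < cores; i++) {
            final int core = i;
            loops[i] = Executors.newSingleThreadExecutor(
                    r -> new Thread(r, "shard-" + core));   // one event loop per core
        }
    }

    // All work for a given partition is serialized onto its owning shard; no cross-thread handoff.
    public CompletableFuture<Void> submit(int partition, Runnable op) {
        ExecutorService loop = loops[partition % loops.length];
        return CompletableFuture.runAsync(op, loop);
    }
}
```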

On the second path, cold data on object storage, the leader of each partition is responsible for uploading and reuses the Raft link to replicate metadata.

  • The upload components, scheduler_service and archival_metadata_stm, maintain a PID-controller-like fair scheduling algorithm that computes the total amount of data waiting to be uploaded to object storage and dynamically updates priorities: partitions with a large backlog get higher priority, while a small backlog gets lower priority, reducing the interference of background traffic with foreground reads and writes.
  • On reads, remote_partition and cache_service download data from object storage and cache it: based on the partition and offset of the consume request they compute the relative offset within the corresponding hybrid log segment, then prefetch and cache to reduce the number of object-storage calls and the average RT; locality-aware read policies also reduce cross-AZ traffic to cut costs.
  • Some of the hard parts and performance work: for example, an HTTP client built on Seastar (using Boost.Beast) to improve object-storage access performance; upload and cache management must remain fair, and the per-partition state management is complex.

As for the pros and cons of native languages: we have always hoped the server side could "write once, run everywhere", but in practice, because infrastructure wants to squeeze everything out of modern hardware, we end up embedding JNI or doing instruction-set optimizations for hot functions. Redpanda specifically mentions Arm support; since the project is written entirely in C++, only some build switches need to be enabled to compile the dependency libraries, so cross-platform support is less complicated than one might imagine. Their conclusion is that Arm brings roughly 20% cost reduction compared with x86, which is close to the benefit we saw when porting RocketMQ from x86 to Arm.

3.3. Introduction and experimentation of new technologies

In addition to expanding into new scenarios, modern message queues are also experimenting with emerging technologies and software-hardware co-design to further improve performance and reduce costs. For example:

  • Communication layer: the compute-layer proxies and storage nodes communicate over TCP, but TCP adds latency and bandwidth constraints in high-density container deployments. The TCP protocol was designed for wide-area networks and does not serve data-center scenarios well, so we are trying to switch this link to RDMA. RDMA lets an application access remote host memory directly without involving the network software stack: data is sent from or received into buffers without being copied through the network layers, transfers happen in user space without kernel/user-mode switches, and remote memory is accessed without consuming any CPU on the remote host. Much of the protocol-stack processing is offloaded to hardware, which reduces end-to-end network latency while preserving durable storage, high throughput, and real-time behavior. In our tests it reduces CPU usage by about 8%; however, for Java applications not deployed with CPU pinning (cpuset), it can produce more long-tail requests.
  • Compute layer: message queues themselves are experimenting with the virtual threads (coroutines) delivered by Project Loom in recent JDKs to improve the maintainability of heavily asynchronous code. Many traditional optimizations are still not fully exploited, such as reference-counting tricks to reduce buffer copies, profiling hot spots for targeted JNI optimization, or refactoring in a native language. For example, SpinLock behaves very differently on x86 and Arm, and such small functions can be optimized and shipped in platform-specific dynamic libraries, achieving a large gain at low cost. For repetitive operations, the FFM and SIMD APIs in JDK 21 can be used to reduce CPU overhead (see the sketch after this list).
  • Storage layer: in messaging scenarios, payloads within the same topic are highly correlated, and compression ratios of 10:1 are typical. As messages extend into streaming, we are also trying memory-friendly storage formats such as FlatBuffers and Parquet to improve query performance.
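Here is the sketch referenced in the compute-layer bullet: a hedged example of the SIMD direction using the incubating Java Vector API (requires --add-modules jdk.incubator.vector); the XOR-fold checksum itself is illustrative. The payload is processed in vector-width strides instead of byte by byte.

```java
// Sketch: SIMD-style XOR fold over a message payload with the incubating Vector API.
import jdk.incubator.vector.ByteVector;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;

public class SimdChecksum {
    private static final VectorSpecies<Byte> SPECIES = ByteVector.SPECIES_PREFERRED;

    public static byte xorFold(byte[] payload) {
        ByteVector acc = ByteVector.zero(SPECIES);
        int i = 0;
        int upper = SPECIES.loopBound(payload.length);
        for (; i < upper; i += SPECIES.length()) {
            // Process one vector-width stride of the payload per iteration.
            acc = acc.lanewise(VectorOperators.XOR, ByteVector.fromArray(SPECIES, payload, i));
        }
        byte result = acc.reduceLanes(VectorOperators.XOR);  // fold the vector lanes to one byte
        for (; i < payload.length; i++) {                    // scalar tail
            result ^= payload[i];
        }
        return result;
    }
}
```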

References

1.John K. Ousterhout, et al. "Cloud Programming Simplified: A Berkeley View on Serverless Computing." arXiv preprint arXiv:1902.03383v1 (2019). https://arxiv.org/abs/1902.03383v1

2.TCP Slow-Start and Congestion Avoidance, https://commons.wikimedia.org/w/index.php?title=File:TCP_Slow-Start_and_Congestion_Avoidance.svg

3.Asterios Katsifodimos, et al. "Consistency and Completeness: Rethinking Distributed Stream Processing in Apache Kafka." In Proceedings of the ACM SIGMOD International Conference on Management of Data (SIGMOD '21). Association for Computing Machinery, 2021.

4. Apache Flink. "Stateful Computations over Data Streams." Accessed Apr 23, 2021.

5.State management in Apache Flink®: consistent stateful distributed stream processing. https://dl.acm.org/doi/10.14778/3137765.3137777

6.Michael Armbrust, et al. "Delta Lake: High-performance ACID Table Storage over Cloud Object Stores." Databricks White Paper, Aug. 2020. https://www.databricks.com/wp-content/uploads/2020/08/p975-armbrust.pdf

7.Wei Cao, Zhenjun Liu, Peng Wang, Sen Chen, Caifeng Zhu, Song Zheng, Yuhui Wang, and Guoqing Ma. 2018. PolarFS: an ultra-low latency and failure resilient distributed file system for shared storage cloud database. Proc. VLDB Endow. 11, 12 (August 2018), 1849–1862. https://doi.org/10.14778/3229863.3229872

8.PacificA: Replication in Log-Based Distributed Storage Systems, https://www.microsoft.com/en-us/research/publication/pacifica-replication-in-log-based-distributed-storage-systems/

9.Heidi Howard and Richard Mortier. 2020. Paxos vs Raft: have we reached consensus on distributed consensus? In Proceedings of the 7th Workshop on Principles and Practice of Consistency for Distributed Data (PaPoC '20). Association for Computing Machinery, New York, NY, USA, Article 8, 1–9. https://doi.org/10.1145/3380787.3393681

10. Abhishek Verma, et al. 2015. Large-scale cluster management at Google with Borg. In Proceedings of the Tenth European Conference on Computer Systems (EuroSys '15). Association for Computing Machinery, New York, NY, USA, Article 18, 1–17. https://doi.org/10.1145/2741948.2741964

11.Peter A. Alsberg and John D. Day. 1976. A principle for resilient sharing of distributed resources. In Proceedings of the 2nd international conference on Software engineering (ICSE '76). IEEE Computer Society Press, Washington, DC, USA, 562–570. https://dl.acm.org/doi/10.5555/800253.807732

12.Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. 2003. The Google file system. SIGOPS Oper. Syst. Rev. 37, 5 (December 2003), 29–43. https://doi.org/10.1145/1165389.945450

13.K. Mani Chandy and Leslie Lamport. 1985. Distributed snapshots: determining global states of distributed systems. ACM Trans. Comput. Syst. 3, 1 (Feb. 1985), 63–75. https://doi.org/10.1145/214451.214456

Author: Xieyang

Source-WeChat public account: Alibaba Cloud Developer

Source: https://mp.weixin.qq.com/s/M02YiF9altrsODORb9gZuw
