AMQP protocol learning

Author: Flash Gene
I recently read the AMQP specification. AMQP is a protocol you cannot avoid when working with message queues, and reading it teaches you a lot about both message queues and protocol design. The specification reads well: the protocol itself is not overly complicated, and the document explains its design rationale in many places.

This article is based on AMQP 0.9.1, which was published in 2008. Many of the features described below were advanced at the time but may be taken for granted now. For comparison, the first version of Kafka was released in 2011, RTMP 1.0 in 2012, and HTTP/2 in 2015.

Various concepts and components in the AMQP protocol

AMQP stands for Advanced Message Queuing Protocol.

AMQP covers both a network protocol and the server-side services:

  • A set of messaging capabilities called the Advanced Message Queuing Protocol Model (AMQ Model). The model consists of the broker components that route and store messages, plus the rules that wire these components together.
  • A network wire-level protocol, AMQP, that lets client applications talk to a server that implements the AMQ Model.

AMQP is like a language for wiring things together, not a system. The design goal is to make the server programmable through the protocol. Once you understand this goal, the design of the protocol itself becomes much easier to follow.

AMQP is a binary protocol with modern features: it is multi-channel, negotiated, asynchronous, secure, portable, language-neutral, and efficient. The protocol is split into two layers:

Functional Layer: defines a set of commands.

Transport Layer: carries these methods from the application to the server and back, and handles channel multiplexing, framing, content encoding, heartbeats, data representation, and error handling.

With this layering, the transport layer can be replaced by another transport protocol without changing the functional layer. Likewise, the same transport layer can serve as the basis for different higher-level protocols. This may be one reason RabbitMQ can support MQTT, STOMP, and other protocols relatively easily.

The design of the AMQ Model is driven by the following requirements:

  • Ensure interoperability between standards-compliant implementations
  • Provide a clear and direct way to control quality of service (QoS)
  • Keep naming consistent and explicit
  • Allow every aspect of the server configuration to be changed through the protocol
  • Use a command notation that maps easily to application-level APIs
  • Be clear: each operation does exactly one thing

The AMQP transport layer is driven by the following requirements:

  • Compact, using a binary encoding that is fast to pack and unpack
  • Able to carry messages of any size with no significant limits
  • Able to carry multiple channels over a single connection
  • Long-lived, with no significant built-in limits
  • Able to pipeline commands asynchronously
  • Easy to extend, so that new or changed requirements can be handled
  • Forward compatible with future versions
  • Repairable, thanks to a strong assertion model
  • Neutral with respect to programming languages
  • Suitable for code generation

During the design process, the goal was to support a variety of messaging architectures:

  • Store-and-forward, with multiple writers and a single reader
  • Workload distribution, with multiple writers and multiple readers
  • Publish-subscribe, with multiple writers and multiple readers
  • Content-based routing, with multiple writers and multiple readers
  • Queued file transfer, with multiple writers and multiple readers
  • Point-to-point connection between two peers
  • Market data distribution, with multiple sources and multiple readers

AMQ Model

There are three main components:

  • Exchange: receives messages from publisher applications and routes them to message queues according to some rules.
  • Message queue: stores messages until they can be safely delivered to a consumer.
  • Binding: defines the relationship between a message queue and an exchange, and provides the message routing criteria.
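
The three components can be sketched as a small in-memory model. This is only an illustration, not how any real broker is implemented: the `Exchange` class, its `bind` and `publish` methods, and the use of a Python predicate as the binding rule are all assumptions made for the example.

```python
from collections import deque


class Exchange:
    """Toy exchange: routes a message to every queue whose binding matches."""

    def __init__(self):
        self.bindings = []  # list of (queue, predicate) pairs

    def bind(self, queue, predicate):
        # A binding ties a queue to this exchange with a routing rule.
        self.bindings.append((queue, predicate))

    def publish(self, routing_key, body):
        # Each matching queue receives its own copy of the message.
        matched = [queue for queue, predicate in self.bindings if predicate(routing_key)]
        for queue in matched:
            queue.append(body)
        return len(matched)


orders = deque()  # a message queue is modelled as a plain deque
audit = deque()

exchange = Exchange()
exchange.bind(orders, lambda key: key == "orders")
exchange.bind(audit, lambda key: key.startswith("orders"))

routed = exchange.publish("orders", b"order #1")      # matches both bindings
unrouted = exchange.publish("payments", b"payment")   # matches no binding
```

The point of the sketch is that the routing rule lives in the binding, which the application controls, rather than in the exchange or the queue.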

[Figure: overall architecture of the AMQ Model]

The AMQP architecture can be compared to an email service:

  • An AMQP message is like an email message.
  • A message queue is like a mailbox.
  • A consumer is like a mail client that fetches and deletes email.
  • An exchange is like an MTA (mail transfer agent). It inspects each email and decides, based on the routing information in the email and its routing tables, how to send the email to one or more mailboxes.
  • A routing key is like the To:, Cc:, or Bcc: address of an email, without the server information.
  • Each exchange instance is like a separate MTA process that handles some email subdomain or a particular type of email traffic.
  • A binding is like an entry in the MTA's routing table.

In AMQP, producers send messages to the server, which then routes them to mailboxes. Consumers take messages directly from their mailboxes. In many middleware products that predate AMQP, by contrast, publishers send messages directly to individual mailboxes (in the case of store-and-forward queues) or to mailing lists (in the case of topic subscriptions).

The main difference is that the user controls the rules that bind message queues to exchanges, instead of relying on code built into the middleware. This makes many interesting things possible. For example, you can define a rule such as "place a copy of every message that contains such-and-such a header into this message queue".

In my view, this is the most important difference between AMQP and some other message queues.

Lifecycle

The lifecycle of a message

  • Messages are created by producers. The producer puts content into the message, sets some properties and routing information, and sends the message to the server.
  • The server receives the message, and an exchange (in most cases) routes it to a set of message queues on that server. If the message cannot be routed, it is dropped or returned to the producer, whichever the producer chose.
  • A single message can exist in many message queues. The server can handle this by copying the message, by reference counting, and so on. This does not affect interoperability. However, when a message is routed to multiple message queues, it is identical on each queue; there is no unique identifier that distinguishes the copies.
  • When a message arrives in a message queue, the queue immediately tries to pass it to a consumer via AMQP. If this is not possible, the queue stores the message (in memory or on disk, as requested by the producer) and waits for a consumer to be ready. If there are no consumers, the queue may return the message to the producer via AMQP (again, if the producer asked for this).
  • When the message queue can deliver the message to a consumer, it removes the message from its internal buffers. This happens either immediately or after the consumer has acknowledged that it processed the message successfully (ACK). The consumer chooses how and when messages are acknowledged. The consumer can also reject a message (a negative acknowledgement).
  • Producer messages and consumer acknowledgements are grouped into transactions. An application that plays both roles does a mix of work: it sends messages and acknowledgements, then commits or rolls back the transaction. Message deliveries from the server to the consumer are not transacted; it is enough to transact the acknowledgements of those messages.
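
The deliver, acknowledge, and reject steps above can be sketched as follows. This is a simplified illustration under stated assumptions: the `AckQueue` class, the integer delivery tags, and the `requeue` flag on `reject` are choices made for the example, not a description of any particular broker.

```python
from collections import deque


class AckQueue:
    """Toy queue that keeps a delivered message until the consumer acknowledges it."""

    def __init__(self):
        self.ready = deque()   # messages waiting for a consumer
        self.unacked = {}      # delivery tag -> message awaiting acknowledgement
        self._next_tag = 1

    def enqueue(self, body):
        self.ready.append(body)

    def deliver(self):
        """Hand the next ready message to a consumer, or return None if empty."""
        if not self.ready:
            return None
        tag = self._next_tag
        self._next_tag += 1
        body = self.ready.popleft()
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        # Positive acknowledgement: the message can now be forgotten.
        del self.unacked[tag]

    def reject(self, tag, requeue=True):
        # Negative acknowledgement: optionally put the message back at the front.
        body = self.unacked.pop(tag)
        if requeue:
            self.ready.appendleft(body)
```

A consumer that calls `deliver()` and then crashes before `ack()` leaves the message in `unacked`, which is what makes redelivery possible in this model.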

Throughout this process, a producer can only send messages to a single point, the exchange. It cannot send messages directly to a message queue.

The life cycle of an exchange

Each AMQP server pre-creates a number of exchanges of its own, and these cannot be destroyed. AMQP applications can also create their own exchanges. AMQP does not use a "create" method for this. Instead it uses a "declare" method, which means "create if not present, otherwise continue". An application can create exchanges for private use and destroy them when its work is finished. Although AMQP provides a method for destroying an exchange, applications generally do not need to do so.

The life cycle of a queue

There are two types of queues:

  • Durable message queues: shared by many consumers. They exist independently, which means they continue to exist and collect messages even after all consumers have exited.
  • Temporary message queues: private to one consumer and tied to it. When the consumer disconnects, the message queue is deleted.

[Figure: the lifecycle of a temporary message queue]

Bindings

A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages.

// Pseudocode for the bind command
Queue.Bind <queue> TO <exchange> WHERE <condition>

A few classic use cases: shared queues, private reply queues, publish-subscribe.

Construct a shared queue

Queue.Declare queue=app.svc01 // Declare a queue named app.svc01

// Consumer
Basic.Consume queue=app.svc01 // The consumer consumes from this queue

// Producer
Basic.Publish routing-key=app.svc01
// The producer publishes a message. The routing key is the queue name

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

Construct a private reply queue

In general, a reply queue is private, temporary, named by the server, and has only one consumer. (The example here follows the RabbitMQ tutorial rather than the one in the AMQP specification.)

Queue.Declare queue=rpc_queue // The queue that receives requests

// Server
Basic.Consume queue=rpc_queue

// Client
Queue.Declare queue=<empty> exclusive=TRUE

S:Queue.Declare-Ok queue=amq.gen-X
... // The AMQP server tells the client the queue name
Basic.Publish exchange=<empty> routing-key=rpc_queue reply-to=amq.gen-X ... // The client sends a request to the server

// Server
handleMessage()
// After handling the message, the server sends the response to the queue named in the message's reply-to field
Basic.Publish exchange=<empty> routing-key={message.reply_to}

https://www.rabbitmq.com/tutorials/tutorial-six-python.html

Construct a publish-subscribe queue

In traditional middleware, the term "subscription" is ambiguous. It refers to at least two concepts: the set of criteria that match messages, and the temporary queue that holds the matched messages. AMQP separates these two parts into bindings and message queues. There is no entity called "subscription" in AMQP.

In the AMQP publish-subscribe model, a subscription queue:

  • Holds messages for one consumer (or, in some scenarios, for multiple consumers)
  • Collects messages from multiple sources through a set of bindings that match topics, message fields, or content

The key difference between a subscription queue and a named queue or reply queue is that the name of a subscription queue is irrelevant for routing. Routing is done on abstract matching criteria rather than on a one-to-one match of the routing key field.

// Consumer
Queue.Declare queue=<empty> exclusive=TRUE
// Uses a queue name assigned by the server and makes the queue exclusive.
// An agreed queue name also works, which combines the publish-subscribe model with a shared queue

S:Queue.Declare-Ok queue=tmp.2

Queue.Bind queue=tmp.2 TO exchange=amq.topic WHERE routing-key=*.orange.*
Basic.Consume queue=tmp.2

// Producer
Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit

https://www.rabbitmq.com/tutorials/tutorial-five-python.html

AMQP command architecture

Middleware is complex, so the challenge in designing a protocol is to tame that complexity. AMQP's approach is to model a traditional API based on classes. Each class contains methods, and each method is defined to do exactly one thing.

AMQP has two kinds of dialogue:

  • Synchronous request-response. One peer sends a request and the other peer sends a reply. This is used for methods where performance is not critical. After sending a synchronous request, a peer cannot send its next request until it has received the reply.
  • Asynchronous notification. A peer sends data and does not expect a reply. This is generally used where performance matters. Asynchronous requests are sent as fast as possible without waiting for confirmation, and features such as throttling are implemented at a higher level (for example, at the consumer level) only when needed. AMQP dispenses with confirmations for these methods: a method either succeeds, or the sender receives an exception that closes the channel or connection. If an application needs to track success and failure explicitly, it should use transactions.

Classes in AMQP

Connection class

AMQP is a connected protocol. Connections are designed to be long-lived and can carry multiple channels. The lifecycle of a connection is:

  1. The client opens a TCP/IP connection to the server and sends a protocol header. This is the only data the client sends that is not formatted as a method.
  2. The server responds with its protocol version and other properties, including a list of the security mechanisms it supports (Start).
  3. The client selects a security mechanism (Start-Ok).
  4. The server starts the authentication process, which uses the SASL challenge-response model. It sends the client a challenge (Secure).
  5. The client sends an authentication response (Secure-Ok). For example, with the "plain" mechanism, the response contains a login name and password.
  6. The server repeats the challenge (Secure) or moves on to negotiation, sending a set of parameters such as the maximum frame size (Tune).
  7. The client accepts or lowers these parameters (Tune-Ok).
  8. The client formally opens the connection and selects a virtual host (Open).
  9. The server confirms that the virtual host is a valid choice (Open-Ok).
  10. The client now uses the connection as desired.
  11. One peer (client or server) ends the connection (Close).
  12. The other peer hand-shakes the connection end (Close-Ok).
  13. The server and the client close their socket connection.

There is no handshake for errors on a connection that is not fully open. If a peer detects an error before sending or receiving Open or Open-Ok, it must close the socket without sending any further data.
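
The connection lifecycle above is essentially a fixed sequence of methods, so it can be checked with a small transition table. The sketch below is illustrative only: the `NEXT` table and `is_valid_handshake` function are made up for this example, and allowing Start-Ok to be followed directly by Tune (skipping Secure) is an assumption about mechanisms that need no extra challenge. The 8-octet protocol header for AMQP 0.9.1 is the literal `AMQP` followed by the octets 0, 0, 9, 1.

```python
# Protocol header sent by the client right after the TCP connection opens.
PROTOCOL_HEADER = b"AMQP" + bytes([0, 0, 9, 1])

# For each step, the set of methods that may legally come next.
NEXT = {
    "protocol-header": {"Start"},
    "Start": {"Start-Ok"},
    "Start-Ok": {"Secure", "Tune"},   # assumption: Secure may be skipped
    "Secure": {"Secure-Ok"},
    "Secure-Ok": {"Secure", "Tune"},  # the server may repeat the challenge
    "Tune": {"Tune-Ok"},
    "Tune-Ok": {"Open"},
    "Open": {"Open-Ok"},
    "Open-Ok": {"Close"},
    "Close": {"Close-Ok"},
    "Close-Ok": set(),
}


def is_valid_handshake(methods):
    """Return True if the method sequence follows the lifecycle from start to end."""
    state = "protocol-header"
    for method in methods:
        if method not in NEXT[state]:
            return False
        state = method
    return state == "Close-Ok"
```

Normal use of the connection between Open-Ok and Close is left out of the table to keep the sketch short.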

Channel class

AMQP is a multi-channel protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several lightweight connections. This makes the protocol more firewall-friendly because port usage is predictable. It also means that traffic shaping and other QoS features are easy to support.

Channels are independent of each other and can perform different functions simultaneously. The available bandwidth is shared between the concurrent activities.

The expectation is that multithreaded client applications will often use a "channel per thread" model. However, a client may also open multiple connections to one or more AMQP servers.
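
As a rough illustration of multiplexing, frames from several channels arrive interleaved on one connection and are sorted back into per-channel streams by their channel number. The `demultiplex` function and the `(channel, payload)` tuple representation below are assumptions made for the example.

```python
from collections import defaultdict


def demultiplex(frames):
    """Group (channel, payload) pairs by channel, keeping the order within each channel."""
    per_channel = defaultdict(list)
    for channel, payload in frames:
        per_channel[channel].append(payload)
    return dict(per_channel)


# Frames from channel 1 and channel 2 interleaved on a single connection.
interleaved = [(1, "a1"), (2, "b1"), (1, "a2"), (2, "b2")]
streams = demultiplex(interleaved)
```

Each channel sees its own frames in the order they were sent, regardless of how they were interleaved with other channels.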

The lifecycle of a channel is as follows:

  1. The client opens a new channel (Open).
  2. The server confirms that the new channel is ready (Open-Ok).
  3. The client and server use the channel as desired.
  4. One peer (client or server) closes the channel (Close).
  5. The other peer hand-shakes the channel close (Close-Ok).

Exchange class

The Exchange class lets an application manage exchanges on the server. It allows the application to set up its own routing instead of relying on server configuration. Most applications do not need this level of sophistication, however, and legacy middleware is unlikely to support these semantics.

The lifecycle of an exchange is:

  1. The client asks the server to make sure the exchange exists (Declare). The client can refine this into "create the exchange if it does not exist" or "warn me but do not create it if it does not exist".
  2. The client publishes messages to the exchange.
  3. The client may choose to delete the exchange (Delete).
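
The two flavours of Declare can be sketched as below. The `ExchangeRegistry` class, the `ExchangeNotFound` exception, and the `passive` keyword are names chosen for this illustration; the sketch also ignores what should happen when an exchange is redeclared with a different type.

```python
class ExchangeNotFound(Exception):
    """Raised when a passive declare targets an exchange that does not exist."""


class ExchangeRegistry:
    def __init__(self):
        self.exchanges = {}  # exchange name -> exchange type

    def declare(self, name, type_="direct", passive=False):
        # Declare means "make sure it exists": an existing exchange is left alone.
        if name in self.exchanges:
            return self.exchanges[name]
        # Passive declare only checks for existence and never creates anything.
        if passive:
            raise ExchangeNotFound(name)
        self.exchanges[name] = type_
        return type_

    def delete(self, name):
        self.exchanges.pop(name, None)
```

Because a non-passive declare is idempotent, every producer and consumer can call it at startup without coordinating who creates the exchange first.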

Queue class

This class lets an application manage message queues on the server. It is a basic step in almost all applications that consume messages, at least to verify that the expected message queue actually exists.

The lifecycle of a durable message queue is very simple:

  1. The client asserts that the message queue exists (Declare, with the "passive" argument).
  2. The server confirms that the message queue exists (Declare-Ok).
  3. The client reads messages from the message queue.

The lifecycle of a temporary message queue is more interesting:

  1. The client creates the message queue (Declare, often with no queue name so that the server assigns one). The server confirms (Declare-Ok).
  2. The client starts a consumer on the message queue.
  3. The client cancels the consumer, either explicitly or implicitly by closing the channel or the connection.
  4. When the last consumer disappears from the message queue, and after a polite timeout, the server deletes the message queue.

AMQP implements the delivery mechanism for topic subscriptions as message queues. This makes it possible to load balance a subscription across a pool of cooperating subscriber applications. The lifecycle of a subscription involves an extra bind stage:

  1. The client creates the message queue (Declare), and the server confirms (Declare-Ok).
  2. The client binds the message queue to a topic exchange (Bind), and the server confirms (Bind-Ok).
  3. The client uses the message queue as before.

Basic class

The Basic class implements the messaging capabilities described in this specification. It supports the following semantics:

  1. Sending messages from client to server, asynchronously (Publish)
  2. Starting and stopping consumers (Consume, Cancel)
  3. Sending messages from server to client, asynchronously (Deliver, Return)
  4. Acknowledging messages (Ack, Reject)
  5. Taking messages off a message queue synchronously (Get)

Transaction class

AMQP supports two kinds of transactions:

  1. Automatic transactions: every published message and acknowledgement is processed as a stand-alone transaction.
  2. Server-local transactions: the server buffers published messages and acknowledgements and commits them on demand from the client.

The Transaction class ("tx") gives applications access to the second kind, server transactions. The semantics of this class are:

  1. The application asks for server transactions on each channel where it wants them (Select).
  2. The application does work (Publish, Ack).
  3. The application commits or rolls back the work (Commit, Roll-back).
  4. The application continues to do work, and the cycle repeats.

Transactions cover published content and acknowledgements, but not deliveries. A rollback therefore does not requeue or redeliver any messages, and the client is entitled to acknowledge those messages in a following transaction.
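
A minimal sketch of server-local transaction semantics follows, assuming a hypothetical `TxChannel` class in which a plain list (`broker_log`) stands in for the server's committed state. Publishes and acknowledgements are buffered until commit; deliveries are deliberately not modelled because they are outside the transaction.

```python
class TxChannel:
    """Toy channel in transactional mode: work is buffered until commit."""

    def __init__(self, broker_log):
        self.broker_log = broker_log  # stands in for the server's committed state
        self.pending = []             # publishes and acks not yet committed

    def publish(self, body):
        self.pending.append(("publish", body))

    def ack(self, delivery_tag):
        self.pending.append(("ack", delivery_tag))

    def commit(self):
        # Everything buffered since the last commit or rollback takes effect together.
        self.broker_log.extend(self.pending)
        self.pending.clear()

    def rollback(self):
        # Buffered work is discarded; nothing is requeued or redelivered.
        self.pending.clear()
```

After a rollback the channel is ready for the next unit of work, which matches the repeating cycle in step 4.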

Feature description

The functional description of AMQP is, to a large extent, also a functional description of RabbitMQ, although RabbitMQ adds some extensions on top of AMQP.

Messages and Content

A message carries a set of properties as well as the actual content (binary data).

Messages may be persistent. A persistent message is held securely on disk and is guaranteed to be delivered even after a serious network failure, a server crash, an overflow, and so on.

Messages may have a priority. Within the same queue, higher-priority messages are sent before lower-priority ones. When messages must be discarded, for example because the server is running out of memory, lower-priority messages are discarded first.
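
Priority ordering within a single queue can be sketched with a heap. The `PriorityMessageQueue` class is hypothetical; the sequence counter is an assumption added so that messages of equal priority keep their arrival order.

```python
import heapq
import itertools


class PriorityMessageQueue:
    """Toy queue that hands out higher-priority messages first."""

    def __init__(self):
        self._heap = []
        self._sequence = itertools.count()  # tie-breaker: preserves arrival order

    def put(self, body, priority=0):
        # heapq is a min-heap, so the priority is negated to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._sequence), body))

    def get(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)
```

Discarding low-priority messages under memory pressure is not shown here.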

The server must not modify the content of a message. It may add properties to the message header, but it must not remove or modify existing properties.

Virtual host (VHost)

A virtual host is a data partition on the server. It makes administration easier when the server is shared by multiple tenants.

Each virtual host has its own namespace, exchanges, message queues, and so on. Every connection is associated with exactly one virtual host.

Exchange

An exchange is a message routing agent within a virtual host. It inspects the routing information of a message (usually the routing key) and passes the message on to message queues or internal services. Exchanges may be durable, temporary, or auto-deleted. An exchange can route a message to several message queues in parallel, which creates multiple instances of the message.

Direct exchange

  1. A message queue binds to the exchange using a routing key, K.
  2. A producer sends the exchange a message with the routing key R.
  3. The message is passed to the message queue if K = R.

Fanout exchange

  1. A message queue binds to the exchange with no arguments.
  2. A producer sends the exchange a message.
  3. The message is passed to the message queue unconditionally.
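
The direct and fanout rules differ only in their match condition, as the sketch below tries to show. The function names and the representation of bindings as `(queue_name, binding_key)` pairs are assumptions for illustration.

```python
def route_direct(bindings, routing_key):
    """Direct exchange: a queue receives the message when its binding key K equals R."""
    return [queue for queue, binding_key in bindings if binding_key == routing_key]


def route_fanout(bindings, routing_key):
    """Fanout exchange: every bound queue receives the message; the key is ignored."""
    return [queue for queue, _ in bindings]


bindings = [("errors", "error"), ("warnings", "warning"), ("all_errors", "error")]
direct_targets = route_direct(bindings, "error")
fanout_targets = route_fanout(bindings, "error")
```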

Topic exchange

  1. A message queue binds to the exchange using a routing pattern, P.
  2. A producer sends the exchange a message with the routing key R.
  3. The message is passed to the message queue if R matches P.

The routing key must consist of words separated by dots. Each word may contain only letters and digits. In a routing pattern, * matches exactly one word and # matches zero or more words. For example, *.stock.# matches usd.stock and eur.stock.db but not stock.nasdaq.
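
The matching rule can be written as a short recursive function. This is a straightforward sketch for clarity rather than speed, and `topic_matches` is a name invented for this example; real brokers typically use more efficient structures.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches pattern ('*' = one word, '#' = zero or more)."""
    pattern_words = pattern.split(".") if pattern else []
    key_words = routing_key.split(".") if routing_key else []

    def match(i, j):
        # i indexes the pattern words, j indexes the routing key words.
        if i == len(pattern_words):
            return j == len(key_words)
        if pattern_words[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, k) for k in range(j, len(key_words) + 1))
        if j == len(key_words):
            return False
        if pattern_words[i] == "*" or pattern_words[i] == key_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

With this function, the examples from the text behave as described: `*.stock.#` matches `usd.stock` and `eur.stock.db`, but not `stock.nasdaq`.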

Headers exchange

  1. A message queue binds to the exchange with a table of arguments containing the headers to match. The routing key is not used.
  2. A producer sends the exchange a message whose headers contain the specified key-value pairs.
  3. The message is passed to the message queue if the headers match.

For example:

format=json,type=log,x-match=all
format=line,type=log,x-match=any

If x-match is all, every listed header must match. If x-match is any, a single matching header is enough.
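
A simplified version of the match can be sketched as follows. The `headers_match` function is hypothetical; it assumes that binding arguments whose names start with `x-` are control fields rather than headers to compare, and it only handles exact value equality.

```python
def headers_match(binding_args, message_headers):
    """Return True if the message headers satisfy the binding arguments."""
    mode = binding_args.get("x-match", "all")
    # Only ordinary keys are compared; 'x-' keys such as x-match are control fields.
    expected = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [k in message_headers and message_headers[k] == v for k, v in expected.items()]
    return any(hits) if mode == "any" else all(hits)


match_all = {"format": "json", "type": "log", "x-match": "all"}
match_any = {"format": "line", "type": "log", "x-match": "any"}
```

For instance, a message with headers `format=json, type=log` satisfies both bindings above: the first because both headers match, the second because `type` alone matches.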

System exchange

This type is rarely needed in everyday use, so I will skip it here. If you are interested, see section 3.1.3.5 of the AMQP 0.9.1 specification.

AMQP's transport architecture

This part explains how commands are mapped onto the transport layer. If you are designing your own protocol, its design ideas and the issues it calls out are a useful reference.

AMQP is a binary protocol. Information is organized into frames of several types. Frames carry protocol methods and other information. All frames share the same general structure: frame header, payload, and frame end. The payload format depends on the frame type.

We assume a reliable, stream-oriented network transport layer (e.g., TCP/IP). A single socket connection can carry multiple independent threads of control, called channels. Channels share the connection, and the frames on each channel are strictly ordered, so that a state machine can be used to parse the protocol.

The wire-level format is designed to be extensible and generic enough to be used for arbitrary higher-level protocols (not just AMQP). We assume that AMQP will be extended and improved over time.

The transport architecture covers the following topics: data types, protocol negotiation, framing, frame details, method frames, content frames, heartbeat frames, error handling, and closing channels and connections.

Data types

AMQP data types are used in method frames. They are:

  • Integers (1 to 8 octets), used to represent sizes, quantities, limits, and so on. Integers are always unsigned.
  • Bits, used to represent on/off values. Bits are packed into octets.
  • Short strings, used to hold short text properties. They are limited to 255 octets and can be parsed with no risk of buffer overflow.
  • Long strings, used to hold chunks of binary data.
  • Field tables, used to hold key-value pairs.
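
A few of these encodings can be sketched with Python's `struct` module. The function names are made up for this example. The sketch assumes network byte order, a 1-octet length prefix for short strings, a 4-octet length prefix for long strings, and bits packed starting from the lowest bit of each octet; field tables are omitted.

```python
import struct


def encode_short_string(text):
    """Short string: 1-octet length followed by at most 255 octets of data."""
    data = text.encode("utf-8")
    if len(data) > 255:
        raise ValueError("short strings are limited to 255 octets")
    return struct.pack("!B", len(data)) + data


def encode_long_string(data):
    """Long string: 4-octet unsigned length followed by the raw bytes."""
    return struct.pack("!I", len(data)) + data


def pack_bits(*flags):
    """Pack up to 8 on/off values into one octet, first flag in the lowest bit."""
    if len(flags) > 8:
        raise ValueError("this sketch packs at most 8 bits into one octet")
    octet = 0
    for position, flag in enumerate(flags):
        if flag:
            octet |= 1 << position
    return bytes([octet])
```

The fixed upper bound on short strings is what lets a parser allocate a 255-octet buffer once and never worry about overflow.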

Protocol negotiation

When a client connects, the client and server negotiate an acceptable configuration. Once the two peers agree, the connection can continue to be used. Negotiation lets us assert assumptions and preconditions. The following aspects are negotiated:

  • The actual protocol and version. A server may host multiple protocols on the same port.
  • Encryption arguments and the authentication of both parties.
  • Maximum frame size, number of channels, and other operational limits.

Agreed limits allow both parties to pre-allocate buffers, which avoids deadlocks. An incoming frame is considered safe if it obeys the agreed limits. If it exceeds them, the other party is faulty and must be disconnected.
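
One way to picture limit negotiation is "the lower value wins". The helper names below are invented for this sketch, and treating 0 as "no limit" is an assumption about how the limit fields are interpreted.

```python
def negotiate_limit(server_value, client_value):
    """Pick the effective limit; 0 is treated as 'no limit' in this sketch."""
    if server_value == 0:
        return client_value
    if client_value == 0:
        return server_value
    return min(server_value, client_value)


def frame_is_safe(frame_size, frame_max):
    """A frame is safe if it respects the agreed maximum frame size."""
    return frame_max == 0 or frame_size <= frame_max


frame_max = negotiate_limit(131072, 4096)  # server proposes 128 KiB, client lowers it
```

Once `frame_max` is agreed, a receiver can allocate a buffer of that size up front and treat any larger frame as a protocol violation.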

Framing

TCP/IP is a stream protocol with no built-in mechanism for delimiting frames. Existing protocols generally solve this in one of the following ways:

  • Send a single frame per connection. Simple, but slow.
  • Add a delimiter to the stream to separate frames. Simple, but slow to parse (because the reader has to keep scanning for the delimiter).
  • Count the size of each frame and send the size before the frame. Simple and fast. This is the approach AMQP chose.

Frame detail

The frame header contains the frame type (1 octet), the channel (2 octets), and the payload size (4 octets). The frame ends with a frame-end octet that is used to detect framing errors.

Steps to process a frame:

  1. Read the frame header and check the frame type and channel.
  2. Depending on the frame type, read the payload and process it.
  3. Read the frame end and check it.
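
The three steps can be sketched with `struct`. The layout used here (1-octet type, 2-octet channel, 4-octet size, payload, then a frame-end octet of 0xCE) follows AMQP 0.9.1, but the function names and the "return None when incomplete" convention are assumptions made for this example.

```python
import struct

FRAME_END = 0xCE
FRAME_METHOD, FRAME_HEADER, FRAME_BODY, FRAME_HEARTBEAT = 1, 2, 3, 8
HEADER_SIZE = 7  # type (1) + channel (2) + size (4)


def encode_frame(frame_type, channel, payload):
    """Build one frame: header, payload, frame-end octet."""
    header = struct.pack("!BHI", frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def decode_frame(buffer):
    """Return (frame_type, channel, payload, remaining), or None if incomplete."""
    if len(buffer) < HEADER_SIZE:
        return None
    # Step 1: read the header to learn the type, channel, and payload size.
    frame_type, channel, size = struct.unpack("!BHI", buffer[:HEADER_SIZE])
    end = HEADER_SIZE + size
    if len(buffer) < end + 1:
        return None
    # Step 3: the octet after the payload must be the frame-end marker.
    if buffer[end] != FRAME_END:
        raise ValueError("framing error: bad frame-end octet")
    # Step 2 (processing the payload) is left to the caller.
    return frame_type, channel, buffer[HEADER_SIZE:end], buffer[end + 1:]
```

Because the size is known after reading the first seven octets, the decoder never has to scan the payload for a delimiter.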

When performance matters, read-ahead buffering or gathering reads can be used to avoid making three separate system calls to read one frame.

Method frame

Processing:

  1. Read the payload of the method frame.
  2. Unpack it into a structure.
  3. Check that the method is allowed in the current context.
  4. Check that the arguments are valid.
  5. Execute the method.

The body of a method frame is made up of AMQP data fields. The marshalling code can be generated directly from the protocol specification and is very fast.
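
The first three processing steps can be sketched as below. A method frame payload starts with a 2-octet class id and a 2-octet method id, followed by the arguments. The `METHODS` and `ALLOWED` tables and the channel state names are hypothetical and cover only a few methods; argument decoding and execution are left out.

```python
import struct

# (class-id, method-id) -> method name, for a small subset of methods.
METHODS = {
    (20, 10): "channel.open",
    (60, 40): "basic.publish",
    (60, 80): "basic.ack",
}

# Hypothetical context check: which methods are allowed in which channel state.
ALLOWED = {
    "opening": {"channel.open"},
    "open": {"basic.publish", "basic.ack"},
}


def handle_method_frame(payload, channel_state):
    """Identify the method, check the context, and return (name, raw arguments)."""
    class_id, method_id = struct.unpack("!HH", payload[:4])
    name = METHODS.get((class_id, method_id))
    if name is None:
        raise ValueError("unknown method %d.%d" % (class_id, method_id))
    if name not in ALLOWED.get(channel_state, set()):
        raise ValueError("%s is not allowed in state %r" % (name, channel_state))
    return name, payload[4:]
```

In a generated implementation, the tables and the argument decoders would come straight from the machine-readable protocol definition.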

Content frames

Content is the application data that is carried end-to-end. It consists of a set of properties plus binary data. The set of properties forms the content header frame. The binary data can be of any size and may be split into several chunks, each sent in its own content-body frame.

Some methods (e.g., Basic.Publish, Basic.Deliver) carry content. The payload of a content header frame consists of the class id, a weight field, the total body size, the property flags, and the property list.

The content body is sent in separate frames so that zero-copy techniques can be supported and the content never needs to be marshalled or encoded. The content properties are placed in their own frame so that recipients can selectively discard content they do not want to process.
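
Splitting a body into content-body frames can be sketched as follows. The `split_body` function is hypothetical, and the assumption that the negotiated maximum frame size includes 8 octets of overhead (a 7-octet frame header plus the frame-end octet) should be checked against the specification before relying on it.

```python
FRAME_OVERHEAD = 8  # assumed: 7-octet frame header + 1-octet frame end


def split_body(body, frame_max):
    """Split content into chunks that each fit into one content-body frame."""
    chunk_size = frame_max - FRAME_OVERHEAD
    if chunk_size <= 0:
        raise ValueError("frame_max is too small to carry any payload")
    return [body[i:i + chunk_size] for i in range(0, len(body), chunk_size)]


chunks = split_body(b"x" * 25, frame_max=18)  # 10 payload octets per frame
```

The receiver knows the total body size from the content header, so it can tell when the last chunk has arrived without any end marker.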

Closure of the channel and connection

A client considers a connection or channel open once it has sent Open. A server considers it open once it has sent Open-Ok. From that point on, a peer that wants to close the channel or connection must do so with a handshake.

An abrupt or unexpected close is not always detected quickly, and after an exception the error reply codes could be lost. This is why a handshake is required before closing. When one peer sends Close, the other peer must reply with Close-Ok. Both parties can then close the channel or connection. If peers ignore Close, a deadlock can occur when both peers send Close at the same time.
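
The close handshake, including the case where both peers send Close at the same time, can be simulated with a toy model. The `Peer` class, its state names, and the `exchange_messages` helper are all invented for this sketch.

```python
class Peer:
    """Toy peer that follows the Close / Close-Ok handshake."""

    def __init__(self):
        self.state = "open"
        self.outbox = []

    def close(self):
        self.state = "closing"
        self.outbox.append("Close")

    def receive(self, method):
        if method == "Close":
            # Always answer Close, even if we already sent our own Close.
            self.outbox.append("Close-Ok")
            if self.state == "open":
                self.state = "closed"
        elif method == "Close-Ok" and self.state == "closing":
            self.state = "closed"


def exchange_messages(a, b):
    """Deliver queued methods back and forth until both outboxes are empty."""
    while a.outbox or b.outbox:
        for sender, receiver in ((a, b), (b, a)):
            pending, sender.outbox = sender.outbox, []
            for method in pending:
                receiver.receive(method)
```

If `receive` ignored an incoming Close while the peer was already closing, both peers would wait forever for a Close-Ok, which is the deadlock the text warns about.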

For more details, see the official AMQP specification, as well as the extensions that RabbitMQ has built on top of it.

Reference Links

  • AMQP 0.9.1 specification: https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
  • AMQP 0.9.1 full Chinese translation: http://www.blogjava.net/qbna350816/archive/2016/08/12/431554.html
  • RabbitMQ errata for AMQP 0.9.1: https://www.rabbitmq.com/amqp-0-9-1-errata.html
  • RabbitMQ extensions to AMQP 0.9.1: https://www.rabbitmq.com/extensions.html
  • AMQP 1.0 final release: http://www.amqp.org/specification/1.0/amqp-org-download

Author: 0neSe7en

Source-WeChat public account: Instant technical team

Source: https://mp.weixin.qq.com/s/7HIXUSq-l1DzWEUScwPYAg
