天天看點

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

http://www.cnblogs.com/4----/p/6526033.html

轉載請注明出處0.目錄

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ

RabbitMQ-從基礎到實戰(3)— 消息的交換(上)

RabbitMQ-從基礎到實戰(4)— 消息的交換(中)

RabbitMQ-從基礎到實戰(5)— 消息的交換(下)

RabbitMQ-從基礎到實戰(6)— 與Spring內建

1.簡介

RabbitMQ中,消息丢失可以簡單的分為兩種:用戶端丢失和服務端丢失。針對這兩種消息丢失,RabbitMQ都給出了相應的解決方案。

2.防止用戶端丢失消息

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

如圖,生産者P向隊列中生産消息,C1和C2消費隊列中的消息,預設情況下,RabbitMQ會平均的分發消費給C1C2(Round-robin dispatching),假設一個任務的執行時間非常長,在執行過程中,用戶端挂了(連接配接斷開),那麼,該用戶端正在處理且未完成的消息,以及配置設定給它還沒來得及執行的消息,都将丢失。因為預設情況下,RabbitMQ分發完消息後,就會從記憶體中把消息删除掉。

3.消息确認(Message acknowledgment)

為了解決上述問題,RabbitMQ引入了消息确認機制,當消息處理完成後,給Server端發送一個确認消息,來告訴服務端可以删除該消息了,如果連接配接斷開的時候,Server端沒有收到消費者發出的确認資訊,則會把消息轉發給其他保持線上的消費者。

驗證上述問題

首先,我們驗證上述問題(用戶端丢失消息)是否真的存在,對Consumer進行如下改造。

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

先生産兩條消息

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

啟動消費者,在消費者接收到消息,還沒處理完成的時候,強制關掉

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

這時,觀察控制台,發現兩條消息都沒有了,1條是在執行中丢失的,還有1條,已經配置設定給這個Consumer,還沒來得及處理,也丢失了

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

這證明了上述問題是真的存在的,如果發生在生産環境,将産生難以預料的後果

引入消息确認機制

為了友善觀察,我們用CMD來運作Consumer,要通過maven打成可執行的JAR包,需要在pom.xml中增加如下配置

RabbitMQ-從基礎到實戰(2)— 防止消息丢失
<build>
        <finalName>Consumer</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.liyang.ticktock.rabbitmq.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>      
RabbitMQ-從基礎到實戰(2)— 防止消息丢失

上述配置描述了最終打包名字、入口類路徑、帶上依賴包、使用1.8版本的JDK進行打包,配置完後,就可以通過maven的install方法,在target目錄生成可執行的jar包,如果包大小很小,應檢查配置,是不是沒有帶上依賴包

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

再次改造Consummer類

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

install成可執行jar包,通過cmd開啟兩個consumer

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

通過Sender發送一條消息,然後用Ctrl+C結束先收到消息的Consumer,發現另外一個Consumer接收到了未處理完的消息

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

問題得到了解決,現在消費者在執行過程中死掉也不會丢失消息了

看一下發送确認的方法

RabbitMQ-從基礎到實戰(2)— 防止消息丢失
 1 /** 2      * Acknowledge one or several received 3      * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} 4      * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method 5      * containing the received message being acknowledged. 6      * @see com.rabbitmq.client.AMQP.Basic.Ack 7      * @param deliveryTag the tag from the received 這個是RabbitMQ用來區分消息的,文檔在這 8      * @param multiple true to acknowledge all messages up to and 為true的話,确認所有消息,為false隻确認目前消息 9      * including the supplied delivery tag; false to acknowledge just10      * the supplied delivery tag.11      * @throws java.io.IOException if an error is encountered12      */13     void basicAck(long deliveryTag, boolean multiple) throws IOException;      
RabbitMQ-從基礎到實戰(2)— 防止消息丢失

在官方文檔中,這樣描述deliveryTag

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

簡單來說,就是RabbitMQ内部用來區分消息的一個标簽,從envelope中擷取就行了

忘記确認将引起記憶體洩漏

RabbitMQ隻有在收到消費者确認後,才會從記憶體中删除消息,如果消費者忘了确認(更多情況是因為代碼問題沒有執行到确認的代碼),将會導緻記憶體洩漏

驗證一下

注釋掉Consumer中的确認代碼

RabbitMQ-從基礎到實戰(2)— 防止消息丢失
RabbitMQ-從基礎到實戰(2)— 防止消息丢失

運作Sender和Consumer,不停的生産消費消息,發現消費者在正常的消費消息

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

檢視控制台,發現已經被吃掉了43KB的記憶體,是以,在試用過程中,一定要保證消息确認在任何情況下都可以發出,否則即使消費者處理完成,RabbitMQ也不會把消息在記憶體中清除,在該消費者斷開連接配接之後,還會把消息轉發給其他消費者重新處理,将引發難以預計的問題

RabbitMQ-從基礎到實戰(2)— 防止消息丢失
RabbitMQ-從基礎到實戰(2)— 防止消息丢失

4.消息的持久化

現在,消費者當機已經無法影響到我們的消息了,但如果RabbitMQ重新開機了,消息依然會丢失。所幸的是,RabbitMQ提供了持久化的機制,将記憶體中的消息持久化到硬碟上,即使重新開機RabbitMQ,消息也不會丢失。但是,仍然有一個非常短暫的時間視窗(RabbitMQ收到消息還沒來得及存到硬碟上)會導緻消息丢失,如果需要嚴格的控制,可以參考官方文檔

要使用RabbitMQ的消息持久化,在聲明隊列時設定一個參數即可

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

注意,RabbitMQ不允許對一個已經存在的隊列用不同的參數重新聲明,對于試圖這麼做的程式,會報錯,是以,改動之前代碼之前,要在控制台中把原來的隊列删除

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

重新聲明隊列後,發現Durable為true

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

重新開機RabbitMQ

RabbitMQ-從基礎到實戰(2)— 防止消息丢失

隊列的消息沒有丢失

5.結束語

繼續閱讀