天天看點

RabbitMQ簡單使用

作者:fly2future

這篇文章通過一個最簡單的例子,讓初學者能了解RabbitMQ如何完成生産消息和消息的。

所有的程式員在學習一門新技術的時候,都是從 Hello World 進入到Colorful World的,本節也将按照慣例,從HelloWorld開始,示範RabbitMQ的Produce和Consumer的簡單使用。本RabbitMQ系列的示範代碼預設都是使用Java語言。

設定賬号

在開始HelloWorld之前,需要注意的是,RabbitMQ預設的賬号是guest / guest,這個賬号有限制,預設隻能通過本地網絡(localhost)通路,遠端通路受限制,是以在實際發送和消費消息之前,需要設定新的賬号和設定權限。具體賬号和權限的内容敬請關注後面的更新。

添加賬号

我們為HelloWorld建立一個新的使用者為root,并設定密碼為root,後續Java用戶端代碼中使用這個root賬号發送和消費消息。

[root@hidden -]# rabbitmqct1 add user root root
Creating user "root"           

設定權限

在建立好賬号之後就要為這個賬号建立權限了。

[root@hidden - ]# rabbitmqct1 set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/"           

設定角色

最後需要為這個賬号添加角色,這裡我們添加管理者角色

[root@hidden - ]# rabbitmqct1 set user_tags root administrator
Setting tags for user "root" to [administrator]           

經過上面的步驟,root賬号就已經建立成功,也可以通過用戶端連結rabbitmq的broker,如果遇到下面的問題,說明是賬号出現的問題,參考上面的步驟設定,或者檢查賬号是否正确。

Exception in thread "main" com.rabbitmq.c1ient.AuthenticationFai1ureException :
ACCESS REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker 1ogfi1e.           

添加Maven依賴

RabbitMQ的java版用戶端的maven以來如下,可以根據自己的環境選擇具體的版本即可。

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupld>com.rabbitmq</groupld>
<artifactld>amqp-client</artifactld>
<version>${rabbitmq.version}</version>
</dependency>           

Producer案例

Producer就是用來向MQ發送消息,下面就是一個Producer的HelloWorld。

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {

  @Test
  public void helloWorld() throws IOException, TimeoutException {
    // 交換器名字
    String exchangeName = "helloworld_exchange";
    // 路由鍵
    String routingKey = "helloworld_routing_key";
    // 隊列名字
    String queueName = "helloworld_queue";

    // 建立連接配接工廠,用來建立具體的連接配接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    // 根據上面的設定資訊,建立具體的連接配接
    Connection connection = connectionFactory.newConnection();
    // 在建立的連接配接上,建立一個通道
    Channel channel = connection.createChannel();

    // 在通道上聲明交換器
    channel.exchangeDeclare(exchangeName, "direct", true, false, null);
    // 在通道上聲明隊列
    channel.queueDeclare(queueName, true, false, false, null);
    // 聲明交換器和隊列的綁定關系
    channel.queueBind(queueName, exchangeName, routingKey);

    String message = "hello world";
    // 往通道上發送消息,消息通過綁定鍵發送到指定的隊列,也就是上面申明的綁定關系的隊列
    channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    // 關閉通過和連接配接
    channel.close();
    connection.close();

  }

}           

Consumer案例

消息發送到MQ,Consumer就可以訂閱隊列,并開始消費消息,下面就是一個Consumer的HelloWorld。

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

  @Test
  public void helloWorld() throws IOException, TimeoutException, InterruptedException {
    // 訂閱的隊列
    String queueName = "helloworld_queue";
    // 訂閱隊列所在的broker的位址資訊,這裡示範另一種建立連接配接的方式
    Address[] addresses = {new Address("localhost", 5672)};

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("root");
    factory.setPassword("root");
    Connection connection = factory.newConnection(addresses);

    Channel channel = connection.createChannel();
    // consumer端一般需要設定的值,表示一次消費的消息數量的最大值
    channel.basicQos(64);

    // 建立預設的Consumer,并實作回調函數,在回調中确認消息,發送ack
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(Thread.currentThread().getName() + " -> consumer : " + consumerTag + " , receive message : " + new String(body));

        channel.basicAck(envelope.getDeliveryTag(), false);
      }
    };

    // 消費消息,異步執行
    String consumeTag = channel.basicConsume(queueName, consumer);
    System.out.println(consumeTag);

    // 等到消費消息完成,并傳回确認給broker
    TimeUnit.SECONDS.sleep(10);

    // 關閉資源
    channel.close();
    connection.close();
  }
}           

以上就是通過一個HelloWorld,了解下RabbitMQ的簡單使用,後續會不定期更新RabbitMQ的内容,感興趣的小夥伴敬請關注哦。