這篇文章通過一個最簡單的例子,讓初學者能了解RabbitMQ如何完成生産消息和消息的。
所有的程式員在學習一門新技術的時候,都是從 Hello World 進入到Colorful World的,本節也将按照慣例,從HelloWorld開始,示範RabbitMQ的Produce和Consumer的簡單使用。本RabbitMQ系列的示範代碼預設都是使用Java語言。
設定賬号
在開始HelloWorld之前,需要注意的是,RabbitMQ預設的賬号是guest / guest,這個賬号有限制,預設隻能通過本地網絡(localhost)通路,遠端通路受限制,是以在實際發送和消費消息之前,需要設定新的賬号和設定權限。具體賬号和權限的内容敬請關注後面的更新。
添加賬号
我們為HelloWorld建立一個新的使用者為root,并設定密碼為root,後續Java用戶端代碼中使用這個root賬号發送和消費消息。
[root@hidden -]# rabbitmqct1 add user root root
Creating user "root"
設定權限
在建立好賬号之後就要為這個賬号建立權限了。
[root@hidden - ]# rabbitmqct1 set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/"
設定角色
最後需要為這個賬号添加角色,這裡我們添加管理者角色
[root@hidden - ]# rabbitmqct1 set user_tags root administrator
Setting tags for user "root" to [administrator]
經過上面的步驟,root賬号就已經建立成功,也可以通過用戶端連結rabbitmq的broker,如果遇到下面的問題,說明是賬号出現的問題,參考上面的步驟設定,或者檢查賬号是否正确。
Exception in thread "main" com.rabbitmq.c1ient.AuthenticationFai1ureException :
ACCESS REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker 1ogfi1e.
添加Maven依賴
RabbitMQ的java版用戶端的maven以來如下,可以根據自己的環境選擇具體的版本即可。
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupld>com.rabbitmq</groupld>
<artifactld>amqp-client</artifactld>
<version>${rabbitmq.version}</version>
</dependency>
Producer案例
Producer就是用來向MQ發送消息,下面就是一個Producer的HelloWorld。
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
@Test
public void helloWorld() throws IOException, TimeoutException {
// 交換器名字
String exchangeName = "helloworld_exchange";
// 路由鍵
String routingKey = "helloworld_routing_key";
// 隊列名字
String queueName = "helloworld_queue";
// 建立連接配接工廠,用來建立具體的連接配接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
// 根據上面的設定資訊,建立具體的連接配接
Connection connection = connectionFactory.newConnection();
// 在建立的連接配接上,建立一個通道
Channel channel = connection.createChannel();
// 在通道上聲明交換器
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
// 在通道上聲明隊列
channel.queueDeclare(queueName, true, false, false, null);
// 聲明交換器和隊列的綁定關系
channel.queueBind(queueName, exchangeName, routingKey);
String message = "hello world";
// 往通道上發送消息,消息通過綁定鍵發送到指定的隊列,也就是上面申明的綁定關系的隊列
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 關閉通過和連接配接
channel.close();
connection.close();
}
}
Consumer案例
消息發送到MQ,Consumer就可以訂閱隊列,并開始消費消息,下面就是一個Consumer的HelloWorld。
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
@Test
public void helloWorld() throws IOException, TimeoutException, InterruptedException {
// 訂閱的隊列
String queueName = "helloworld_queue";
// 訂閱隊列所在的broker的位址資訊,這裡示範另一種建立連接配接的方式
Address[] addresses = {new Address("localhost", 5672)};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(addresses);
Channel channel = connection.createChannel();
// consumer端一般需要設定的值,表示一次消費的消息數量的最大值
channel.basicQos(64);
// 建立預設的Consumer,并實作回調函數,在回調中确認消息,發送ack
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(Thread.currentThread().getName() + " -> consumer : " + consumerTag + " , receive message : " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 消費消息,異步執行
String consumeTag = channel.basicConsume(queueName, consumer);
System.out.println(consumeTag);
// 等到消費消息完成,并傳回确認給broker
TimeUnit.SECONDS.sleep(10);
// 關閉資源
channel.close();
connection.close();
}
}
以上就是通過一個HelloWorld,了解下RabbitMQ的簡單使用,後續會不定期更新RabbitMQ的内容,感興趣的小夥伴敬請關注哦。