参考《rabbitmq实战指南》
1、首先在项目中引入 RabbitMQ 的 Java 客户端(amqp-client)jar 包:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.4.2</version>
</dependency>
2、创建到 RabbitMQ 服务器的连接:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Factory for connections to the RabbitMQ broker used by the examples.
 *
 * <p>NOTE(review): the broker address and credentials are hard-coded for the
 * demo environment; move them to external configuration before any real use.
 */
public class RabbitConnection {

    private static final String HOST = "192.168.10.59";
    private static final String VIRTUALHOST = "/";
    private static final int PORT = 5672;
    private static final String USERNAME = "it";
    private static final String PASSWORD = "its123";

    /**
     * Opens a new connection to the configured broker.
     *
     * <p>Failures are reported by exception instead of a {@code null} return,
     * so callers fail at the point of the real error (with the original cause
     * attached) rather than with a later {@code NullPointerException}.
     *
     * @return a newly opened connection; never {@code null}
     * @throws IllegalStateException if the connection cannot be established;
     *         the underlying {@link IOException} or {@link TimeoutException}
     *         is preserved as the cause
     */
    public static Connection createConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VIRTUALHOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        try {
            return factory.newConnection();
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Failed to connect to RabbitMQ at " + HOST + ":" + PORT, e);
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                    "Timed out connecting to RabbitMQ at " + HOST + ":" + PORT, e);
        }
    }
}
3、创建生产者。需要先执行一次生产者以声明 exchange,然后再启动消费者,由消费者声明队列并绑定到这个 exchange(队列绑定之前发送的消息没有可路由的队列,会被丢弃):
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * Example producer: declares a durable direct exchange and publishes a small
 * batch of random numbers to it.
 *
 * @author hz16092620
 * @date 2018-09-14 15:20:57
 */
public class RabbitProducter {

    public static void main(String[] args) {
        createExchange();
    }

    /**
     * Declares the {@code liuhp_exchange} direct exchange and publishes ten
     * messages, each the decimal text of a random integer in {@code [0, 100)},
     * using the routing key {@code rabbit_test_routing_key}.
     *
     * <p>The channel and the connection are always released, including when
     * channel creation or publishing fails. An {@link IOException} raised
     * while declaring or publishing is printed and not rethrown.
     *
     * @throws IllegalStateException if no connection or channel can be obtained
     */
    public static void createExchange() {
        Connection conn = RabbitConnection.createConnection();
        if (conn == null) {
            throw new IllegalStateException("Could not obtain a RabbitMQ connection");
        }
        Channel channel = null;
        try {
            channel = conn.createChannel();
            if (channel == null) {
                throw new IllegalStateException("No RabbitMQ channel available on the connection");
            }
            // Third argument 'true' makes the exchange durable.
            channel.exchangeDeclare("liuhp_exchange", "direct", true);
            // The queue is declared and bound by the consumer, not here:
            // channel.queueBind("liuhp_quene", "liuhp_exchange", "rabbit_test_routing_key");

            // One generator for the whole batch instead of one per message.
            Random random = new Random();
            for (int i = 0; i < 10; i++) {
                // Explicit charset so the payload bytes do not depend on the platform default.
                byte[] payload = String.valueOf(random.nextInt(100)).getBytes("UTF-8");
                channel.basicPublish("liuhp_exchange", "rabbit_test_routing_key", null, payload);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The connection must be closed even if closing the channel throws.
            try {
                closeChannel(channel);
            } finally {
                closeConnection(conn);
            }
        }
    }

    /** Closes the channel if it was created and is still open, printing any failure. */
    private static void closeChannel(Channel channel) {
        if (channel == null || !channel.isOpen()) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if it is still open, printing any failure. */
    private static void closeConnection(Connection conn) {
        if (!conn.isOpen()) {
            return;
        }
        try {
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
4、创建消费者。RabbitMQ 服务器上需要先存在 liuhp_exchange:
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
 * Example consumer: declares a queue, binds it to {@code liuhp_exchange} and
 * prints every message delivered to it.
 *
 * @author hz16092620
 * @date 2018-09-14 15:22:20
 */
public class RabbitConsumer {

    public static void main(String[] args) {
        consumerMessage();
    }

    /**
     * Declares the {@code liuhp_quene} queue, binds it to {@code liuhp_exchange}
     * with the routing key {@code rabbit_test_routing_key}, and registers a
     * push-mode consumer that prints each message body to standard output.
     *
     * <p>On success the channel and connection are intentionally left open so
     * that deliveries keep arriving after this method returns. If setup fails,
     * the connection is closed so it is not leaked.
     *
     * @throws IllegalStateException if no connection or channel can be obtained
     */
    public static void consumerMessage() {
        Connection conn = RabbitConnection.createConnection();
        if (conn == null) {
            throw new IllegalStateException("Could not obtain a RabbitMQ connection");
        }
        boolean consuming = false;
        try {
            String queneName = "liuhp_quene";
            final Channel channel = conn.createChannel();
            if (channel == null) {
                throw new IllegalStateException("No RabbitMQ channel available on the connection");
            }
            // Arguments: durable=false, exclusive=true, autoDelete=false, no extra arguments.
            channel.queueDeclare(queneName, false, true, false, null);
            channel.queueBind(queneName, "liuhp_exchange", "rabbit_test_routing_key");

            // Push-mode consumption. The prefetch count limits unacknowledged
            // deliveries, so it does not constrain this consumer while
            // autoAck is true; it matters only with manual acknowledgements.
            channel.basicQos(100);
            channel.basicConsume(queneName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
                    // Decode as UTF-8 rather than casting each byte to char,
                    // which garbles any non-ASCII payload.
                    System.out.println(new String(body, "UTF-8"));
                    // With autoAck=false, acknowledge here instead:
                    // channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
            consuming = true;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Keep the connection open while consuming; release it only when setup failed.
            if (!consuming && conn.isOpen()) {
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
connection 和 channel 先不关闭,让消费者有时间消费消息;或者在关闭之前加一个 Thread.sleep(30000L)。