抽取工具類
public class untils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.231.132");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
啟動兩個工作線程
public class work03 {
public final static String QUEUE_NAME="hello3";
public static void main(String[] args) throws Exception {
System.out.println("c2應答短....");
Channel channel = untils.getChannel();
/**
* 消費者資訊
* 1.消費哪個隊列
* 2.消費成功以後是否要自動應答,true自動應答,false手動擋
* 3.消費者未成功消費的回調内容1
* 4.消費者取消的回調
*
*/
//聲明 接收消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
System.out.println("開始休眠1s...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1.消息标記
// 2.false 代表隻應答接收到的哪個傳遞的資訊,true為應答所有的消息包括傳遞過來的消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("接收到的消息"+new String(delivery.getBody()));
};
//取消 消息的回調
CancelCallback cancelCallback= consumerTag -> {
System.out.println(consumerTag+"消息消費者中斷");
};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
public class work03 {
public final static String QUEUE_NAME="hello4";
public static void main(String[] args) throws Exception {
System.out.println("c2應答長....");
Channel channel = untils.getChannel();
/**
* 消費者資訊
* 1.消費哪個隊列
* 2.消費成功以後是否要自動應答,true自動應答,false手動擋
* 3.消費者未成功消費的回調内容1
* 4.消費者取消的回調
*
*/
//聲明 接收消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
System.out.println("開始休眠10s...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1.消息标記
// 2.false 代表隻應答接收到的哪個傳遞的資訊,true為應答所有的消息包括傳遞過來的消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("接收到的消息"+new String(delivery.getBody()));
};
//取消 消息的回調
CancelCallback cancelCallback= consumerTag -> {
System.out.println(consumerTag+"消息消費者中斷");
};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
生産者
public class produce03 {
public static final String QUEUE_NAME="hello4";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = untils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//從控制太中接受消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext())
{
String message=scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));;
System.out.println("發送消息完成"+message);
}
}
}
結果