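/*
 * Maven dependencies required by this RabbitMQ example:
 *
 * <dependency>
 *     <groupId>com.rabbitmq</groupId>
 *     <artifactId>amqp-client</artifactId>
 *     <version>3.5.6</version>
 * </dependency>
 * <dependency>
 *     <groupId>log4j</groupId>
 *     <artifactId>log4j</artifactId>
 *     <version>1.2.17</version>
 * </dependency>
 */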
import com.rabbitmq.client.*;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Minimal RabbitMQ consumer: connects to a broker, subscribes to {@link #QUEUE_NAME} with manual
 * acknowledgements and hands every delivery to a fixed-size worker pool.
 *
 * <p>A delivery is acknowledged only after its worker has finished processing it, so a message
 * that is still in flight when the process dies is redelivered by the broker instead of being lost.
 */
public class RabbitConsumer {
    private static final Logger LOG = Logger.getLogger(RabbitConsumer.class);

    private static final String QUEUE_NAME = "QUEUE_NAME";
    private static final String IP_ADDRESS = "10.1.19.2";
    // NOTE(review): broker credentials are hard-coded; consider loading them from configuration.
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin";
    private static final int PORT = 5672;

    /** Number of worker threads; also used as the prefetch limit so the pool queue stays bounded. */
    private static final int WORKER_THREADS = 50;

    /**
     * Charset used to decode message bodies. It must match the charset used by the publisher,
     * otherwise the text is garbled. Pinned explicitly instead of relying on the platform default,
     * which differs between machines — assumes publishers send UTF-8; TODO confirm.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    /**
     * Opens a connection and channel to the broker and starts consuming {@link #QUEUE_NAME}.
     * Returns once the subscription is registered; deliveries are then handled asynchronously.
     *
     * @throws Exception if the connection, the channel or the subscription cannot be established;
     *                   in that case the connection opened here is aborted before rethrowing
     */
    public static void consume() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setVirtualHost("/");
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        final Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            // Limit unacknowledged deliveries to the pool size so the broker applies back-pressure
            // instead of pushing the whole queue into the executor's unbounded work queue.
            channel.basicQos(WORKER_THREADS);

            Consumer consumer = new DefaultConsumer(channel) {
                private final ExecutorService workers = Executors.newFixedThreadPool(WORKER_THREADS);

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    final String data = new String(body, MESSAGE_CHARSET);
                    final long deliveryTag = envelope.getDeliveryTag();
                    final boolean redelivered = envelope.isRedeliver();
                    workers.execute(() -> processAndSettle(channel, deliveryTag, redelivered, data));
                }
            };

            // Arguments: queue name, auto-ack flag (false = manual acknowledgement), consumer.
            channel.basicConsume(QUEUE_NAME, false, consumer);
        } catch (Exception e) {
            // Setup failed part-way: do not leave the connection (and its threads) behind.
            connection.abort();
            throw e;
        }
    }

    /**
     * Processes one message on a worker thread and then settles the delivery with the broker:
     * acknowledges it on success; on failure rejects it, requeueing only if it has not already
     * been redelivered once (to avoid an endless redelivery loop for a message that always fails).
     *
     * @param channel     channel the delivery arrived on
     * @param deliveryTag broker-assigned tag identifying the delivery
     * @param redelivered whether the broker flagged this delivery as a redelivery
     * @param data        decoded message body
     */
    private static void processAndSettle(Channel channel, long deliveryTag, boolean redelivered,
                                         String data) {
        boolean handled = false;
        try {
            handleMessage(data);
            handled = true;
        } catch (RuntimeException e) {
            LOG.error("Failed to process message with delivery tag " + deliveryTag, e);
        }

        try {
            if (handled) {
                // multiple=false: settle only this delivery, since workers finish out of order.
                channel.basicAck(deliveryTag, false);
            } else {
                channel.basicReject(deliveryTag, !redelivered);
            }
        } catch (IOException | ShutdownSignalException e) {
            LOG.error("Failed to settle delivery tag " + deliveryTag, e);
        }
    }

    /**
     * Handles the decoded message body. Currently only prints it.
     *
     * @param data decoded message body
     */
    private static void handleMessage(String data) {
        System.out.println(data);
        // TODO process the received message data
    }

    /**
     * Entry point: starts the consumer.
     *
     * @param args unused
     * @throws Exception if the consumer cannot be started
     */
    public static void main(String[] args) throws Exception {
        consume();
    }
}