天天看點

java rabbitmq消費者_RabbitMQ 消費者Consumer

RabbitMQ的maven 依賴

com.rabbitmq

amqp-client

3.5.6

log4j

log4j

1.2.17

import com.rabbitmq.client.*;

import org.apache.log4j.Logger;

import java.io.IOException;

import java.nio.charset.Charset;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class RabbitConsumer {

private static final Logger LOG = Logger.getLogger(RabbitConsumer.class);

private static final String QUEUE_NAME = "QUEUE_NAME";

private static final String IP_ADDRESS = "10.1.19.2";

private static final String USER_NAME = "admin";

private static final String PASSWORD = "admin";

private static final int PORT = 5672;

public static void consume() throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(IP_ADDRESS);

factory.setPort(PORT);

factory.setVirtualHost("/");

factory.setUsername(USER_NAME);

factory.setPassword(PASSWORD);

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

Consumer consumer = new DefaultConsumer(channel) {

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(50);

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {

// 注意編碼要和發送段的編碼一緻性,否則惠出現亂碼,這裡被坑了,存入資料庫亂碼

String data = new String(body, Charset.defaultCharset());

fixedThreadPool.execute(new Thread() {

@Override

public void run() {

System.out.println(data);

//TODO 處理接收到的消息 data

}

});

// 消息确認

try {

channel.basicAck(envelope.getDeliveryTag(), false);

} catch (IOException e) {

e.printStackTrace();

}

}

};

//參數:隊列名 是否自動應答 消費者

channel.basicConsume(QUEUE_NAME, false, consumer);

}

public static void main(String[] args) throws Exception {

consume();

}

}