天天看点

rabbitmq创建简单的生产者和消费者

参考《rabbitmq实战指南》

1、首先在项目中引入rabbitmq的Java客户端amqp-client jar包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.4.2</version>
</dependency>
           

2、创建连接rabbitmq服务器的连接

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Factory for connections to the RabbitMQ broker used by the examples.
 *
 * <p>The broker address and credentials are hard-coded for demonstration
 * purposes only; load them from external configuration in real code.
 */
public class RabbitConnection {

    private final static String HOST = "192.168.10.59";

    private final static String VIRTUALHOST = "/";

    private final static int PORT = 5672;

    private final static String USERNAME = "it";

    private final static String PASSWORD = "its123";

    /**
     * Opens a new connection to the configured broker.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if the broker cannot be reached or the
     *         connection attempt times out; the original exception is
     *         preserved as the cause
     */
    public static Connection createConnection() {
        // Configure the connection factory with the broker coordinates.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VIRTUALHOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        try {
            return factory.newConnection();
        } catch (IOException e) {
            // Fail loudly here instead of returning null, which would only
            // surface later as an unrelated NullPointerException in the caller.
            throw new IllegalStateException(
                    "Unable to connect to RabbitMQ at " + HOST + ":" + PORT, e);
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                    "Timed out connecting to RabbitMQ at " + HOST + ":" + PORT, e);
        }
    }

}
           

3、创建生产者。需要先执行一次生产者来创建exchange,然后再启动消费者,由消费者声明队列并绑定到这个exchange:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Example producer: declares a durable direct exchange and publishes a few
 * random numbers to it.
 *
 * @author hz16092620
 * @date 2018-09-14 15:20:57
 */
public class RabbitProducter {

    private static final String EXCHANGE_NAME = "liuhp_exchange";

    private static final String ROUTING_KEY = "rabbit_test_routing_key";

    private static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) {
        createExchange();
    }

    /**
     * Declares the exchange and publishes {@code MESSAGE_COUNT} messages to it,
     * each carrying a random integer in {@code [0, 100)} as UTF-8 text.
     *
     * <p>No queue is declared or bound here; binding is left to the consumer.
     * The channel and connection are always released before returning, even
     * when declaring or publishing fails.
     */
    public static void createExchange() {
        Connection conn = RabbitConnection.createConnection();
        if (conn == null) {
            System.err.println("No RabbitMQ connection available; nothing was published.");
            return;
        }

        Channel channel = null;
        try {
            channel = conn.createChannel();
            if (channel == null) {
                throw new IOException("No channel available on the RabbitMQ connection");
            }
            // The third argument (true) makes the exchange durable.
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

            // One generator for the whole batch instead of one per message.
            Random random = new Random();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String payload = String.valueOf(random.nextInt(100));
                // Explicit charset so the bytes do not depend on the platform default.
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, payload.getBytes("UTF-8"));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeChannel(channel);
            closeConnection(conn);
        }
    }

    /** Closes the channel if it was created and is still open; logs any failure. */
    private static void closeChannel(Channel channel) {
        if (channel == null || !channel.isOpen()) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if it is still open; logs any failure. */
    private static void closeConnection(Connection conn) {
        if (conn == null || !conn.isOpen()) {
            return;
        }
        try {
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
           
rabbitmq创建简单的生产和消费者

4、创建消费者(rabbitmq服务器上必须已经存在liuhp_exchange,否则队列绑定会失败):

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Example consumer: declares a queue, binds it to the example exchange and
 * prints every message it receives.
 *
 * @author hz16092620
 * @date 2018-09-14 15:22:20
 */
public class RabbitConsumer {

    public static void main(String[] args) {
        consumerMessage();
    }

    /**
     * Declares the queue {@code liuhp_quene}, binds it to {@code liuhp_exchange}
     * and registers a push-mode consumer that prints each message body as
     * UTF-8 text.
     *
     * <p>On success the channel and connection are intentionally left open so
     * that deliveries can keep arriving after this method returns. If setup
     * fails, the connection is closed before returning.
     */
    public static void consumerMessage() {
        Connection conn = RabbitConnection.createConnection();
        if (conn == null) {
            System.err.println("No RabbitMQ connection available; consumer was not started.");
            return;
        }

        try {
            String queneName = "liuhp_quene";
            final Channel channel = conn.createChannel();
            if (channel == null) {
                throw new IOException("No channel available on the RabbitMQ connection");
            }
            // Arguments: durable=false, exclusive=true, autoDelete=false, no extra arguments.
            channel.queueDeclare(queneName, false, true, false, null);
            channel.queueBind(queneName, "liuhp_exchange", "rabbit_test_routing_key");

            // Push-mode consumption. NOTE(review): the prefetch limit only applies
            // to manually acknowledged deliveries; with autoAck=true below it has
            // no effect. Switch to autoAck=false plus basicAck to make it count.
            channel.basicQos(100);
            channel.basicConsume(queneName, true, new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // Decode the whole payload at once; casting single bytes to
                    // char corrupts any multi-byte (non-ASCII) character.
                    System.out.println(new String(body, "UTF-8"));
                }

            });
        } catch (IOException e) {
            e.printStackTrace();
            // Setup failed, so no consumer is running: release the connection
            // rather than leaving it open.
            if (conn.isOpen()) {
                try {
                    conn.close();
                } catch (IOException closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }
}
           
rabbitmq创建简单的生产和消费者
rabbitmq创建简单的生产和消费者

connection和channel先不关闭,让消费者有时间消费消息;或者在关闭之前加个Thread.sleep(30000L)。