天天看点

RabbitMQ生产与消费代码实现

maven

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.5.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>
</dependencies>
           

生产者

/**
 * Entry point of the producer demo.
 */
public class Producer {

    /**
     * Boots the annotation-driven Spring context described by
     * {@code ProducerConfiguration}.
     *
     * <p>The context is created and not closed here, which is why the
     * "resource" warning is suppressed.
     *
     * @param args command-line arguments (unused)
     */
    @SuppressWarnings("resource")
    public static void main(String... args) {
        new AnnotationConfigApplicationContext(ProducerConfiguration.class);
    }
}
           
package product;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.beans.factory.config.BeanPostProcessor;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.Scheduled;  
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;  
  
import com.rabbitmq.client.AMQP;
  
/**
 * Spring configuration for the producer side of the demo.
 *
 * <p>It wires a confirm/return-enabled connection factory, a {@link RabbitTemplate}
 * bound to {@link #EXCHANGE}/{@link #ROUTEKING}, and a scheduled bean that publishes
 * one message every 3 seconds. Messages that have been sent but not yet acknowledged
 * by the broker are tracked in an in-memory map keyed by correlation id.
 */
@Configuration
public class ProducerConfiguration {

    // Connection and routing settings. Original author's note: the routing key
    // defaults to the queue name and the exchange is a direct exchange.
    protected final String VirtualHost = "testhost";
    protected final String EXCHANGE = "amq.direct";
    protected final String ROUTEKING = "routingKeyB";
    protected final String helloWorldQueueName = "testB";

    /** Messages handed to the template that have not been acked yet, keyed by correlation id. */
    private final Map<String, Message> map = new ConcurrentHashMap<>();

    /**
     * Remembers a message as pending confirmation.
     *
     * @param id      correlation id the message is sent with
     * @param message copy of the message to keep until it is confirmed
     */
    public void add(String id, Message message) {
        map.put(id, message);
    }

    /**
     * Forgets a pending message, typically once the broker has acked it.
     *
     * @param id correlation id of the message to drop
     */
    public void del(String id) {
        map.remove(id);
    }

    /**
     * Creates the connection factory with publisher confirms and returns enabled.
     *
     * @return a caching connection factory for the local broker
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setVirtualHost(VirtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        //connectionFactory.setConnectionListeners(listeners);
        return connectionFactory;
    }

    /**
     * Creates the message template. The bean is prototype-scoped, so every lookup
     * yields a new instance.
     *
     * @return a template publishing to {@link #EXCHANGE} with {@link #ROUTEKING}
     */
    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setExchange(EXCHANGE);
        template.setRoutingKey(ROUTEKING);
        template.setMessageConverter(new SerializerMessageConverter());
        // mandatory=true is set so that unroutable messages come back via the ReturnCallback.
        template.setMandatory(true);
        return template;
    }

    /**
     * Creates the admin helper.
     *
     * @return a {@link RabbitAdmin} on the shared connection factory
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Creates the scheduled publisher.
     *
     * @return the bean whose {@code @Scheduled} method sends the messages
     */
    @Bean
    public ScheduledProducer scheduledProducer() {
        return new ScheduledProducer();
    }

    /**
     * Enables processing of {@code @Scheduled} annotations.
     *
     * @return the scheduling post-processor
     */
    @Bean
    public BeanPostProcessor postProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

    /**
     * Publishes a numbered message at a fixed rate and tracks it until the broker
     * confirms it.
     */
    class ScheduledProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /** Sequence number embedded in each payload. */
        private final AtomicInteger counter = new AtomicInteger();

        /** Guards the one-time callback registration. */
        private boolean callbacksRegistered;

        /**
         * Sends one message every 3 seconds.
         *
         * <p>Original author's note: since Spring 3 a scheduled task only needs a
         * no-arg void method annotated with {@code @Scheduled} plus the scheduling
         * configuration; see http://zywang.iteye.com/blog/949123
         */
        @Scheduled(fixedRate = 3000)
        public void sendMessage() {
            registerCallbacksOnce();

            String uuid = UUID.randomUUID().toString();
            // Build the payload once so the tracked copy matches what is actually sent.
            String payload = "Producer  " + counter.getAndIncrement();

            // Track the message BEFORE sending: the confirm may arrive on another
            // thread before this method continues, and del() must find the entry.
            add(uuid, getMessage(payload, "发送"));
            try {
                rabbitTemplate.correlationConvertAndSend(payload, new CorrelationData(uuid));
            } catch (RuntimeException e) {
                // Nothing was handed to the broker, so there is nothing to wait for.
                del(uuid);
                throw e;
            }
            System.out.println("生产者发送成功 " + counter);
            System.out.println(map.toString());
        }

        /**
         * Registers the confirm and return callbacks exactly once.
         *
         * <p>The callbacks are stateless, so a single template can serve every send;
         * this avoids creating a new prototype template and two callback objects on
         * every tick.
         */
        private synchronized void registerCallbacksOnce() {
            if (callbacksRegistered) {
                return;
            }
            if (rabbitTemplate == null) {
                // Not injected (e.g. instance built outside the container): ask the factory method.
                rabbitTemplate = rabbitTemplate();
            }

            // Original note: exchange-level failures are reported through this callback.
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack) {
                    String id = (correlationData == null) ? null : correlationData.getId();
                    if (!ack) {
                        // nack: the entry stays in the map so the lost message can still be handled.
                        System.out.println("生产者发送mq消息确认失败" + id);
                    } else {
                        System.out.println("生产者发送mq消息确认成功");
                        if (id != null) {
                            del(id);
                        }
                    }
                }
            });

            // Original note: routing-key failures are reported through this callback.
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                        String routingKey) {
                    System.out.println("returnedMessage收到回调,成功发送到broker");
                    System.out.println("confirm收到回调,message:" + message.toString());
                    System.out.println("confirm收到回调,replyCode:" + replyCode);
                    System.out.println("confirm收到回调,replyText:" + replyText);
                    System.out.println("confirm收到回调,exchange:" + exchange);
                    System.out.println("confirm收到回调,routingKey:" + routingKey);
                }
            });

            callbacksRegistered = true;
        }

        /**
         * Builds a {@link Message} from a text body.
         *
         * @param messageBody text to use as the body (encoded as UTF-8)
         * @param massageId   value stored as the message id property
         * @return the assembled message
         */
        public Message getMessage(String messageBody, String massageId) {
            MessageProperties messageproperties = new MessageProperties();
            messageproperties.setMessageId(massageId);
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            return new Message(messageBody.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageproperties);
        }
    }
}
           

消费者

/**
 * Entry point of the consumer demo.
 */
public class Consumer {

    /**
     * Boots the annotation-driven Spring context described by
     * {@code ConsumerConfiguration}.
     *
     * <p>The context is created and not closed here, which is why the
     * "resource" warning is suppressed.
     *
     * @param args command-line arguments (unused)
     */
    @SuppressWarnings("resource")
    public static void main(String... args) {
        new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
    }
}
           
/**
 * Spring configuration for the consumer side of the demo: a connection factory,
 * an admin helper, a template, and a listener container that consumes from
 * {@link #springQueueDemo} with manual acknowledgements.
 */
@Configuration
public class ConsumerConfiguration {

    // Connection and routing settings. Original author's note: the routing key
    // defaults to the queue name and the exchange is a direct exchange.
    protected static final String VirtualHost = "testhost";
    protected static final String EXCHANGE = "amq.direct";
    protected static final String ROUTEKING = "routingKey";
    protected static final String springQueueDemo = "testB";

    /**
     * Creates the connection factory for the local broker.
     *
     * @return a caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                "127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setVirtualHost(ConsumerConfiguration.VirtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Creates the admin helper.
     *
     * @return a {@link RabbitAdmin} on the shared connection factory
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Creates the message template.
     *
     * @return a template whose default exchange is {@link #EXCHANGE}
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setExchange(ConsumerConfiguration.EXCHANGE);

        return template;
    }

    /**
     * Creates the listener container that delivers queue messages to
     * {@code ConsumerMsgHandler}.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        // Use the declared constant instead of repeating the queue name literal.
        container.setQueueNames(ConsumerConfiguration.springQueueDemo);
        // MANUAL: the listener acks/rejects each delivery itself.
        // Original note: NONE would mean automatic acknowledgement.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // NOTE(review): ConsumerMsgHandler already extends MessageListenerAdapter;
        // the extra adapter wrapper is kept as in the original.
        container.setMessageListener(new MessageListenerAdapter(
                new ConsumerMsgHandler()));
        return container;
    }

}
           
/**
 * Listener that prints each message body and then acknowledges or rejects the
 * delivery by hand, depending on whether an "error" header is present.
 */
public class ConsumerMsgHandler extends MessageListenerAdapter{

    /**
     * Handles one delivery.
     *
     * @param message the received message
     * @param channel the channel the message arrived on, used for ack/reject
     * @throws Exception if acknowledging or rejecting on the channel fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode with an explicit charset; new String(byte[]) would use the platform default.
        // NOTE(review): assumes the producer encodes text as UTF-8 — confirm against its converter.
        System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (message.getMessageProperties().getHeaders().get("error") == null) {
            //consumerConfiguration.sendMq(message);
            // basicAck tells the broker the delivery was processed successfully.
            // multiple=false acknowledges only this delivery tag.
            channel.basicAck(deliveryTag, false);
            System.out.println("消费者消息已经确认");
        } else {
            /* Original notes:
             * - multiple: manual acks can be batched to cut network traffic; when true,
             *   every delivery with a tag less than or equal to the given one is acknowledged.
             * - basicNack reports a failed delivery; with requeue=true the broker redelivers
             *   the message, with requeue=false the broker discards it.
             */
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            // basicReject with requeue=false: report failure without asking for redelivery.
            channel.basicReject(deliveryTag, false);
            System.out.println("消费者消息拒绝");
        }
    }
}
           

参考链接:https://www.jianshu.com/p/2c5eebfd0e95