天天看點

RabbitMQ生産與消費代碼實作

maven

<!-- Maven dependencies used by the producer/consumer examples below. -->
<dependencies>
    <!-- RabbitMQ Java client (provides com.rabbitmq.client.AMQP / Channel). -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.5.4</version>
    </dependency>
    <!-- Spring AMQP RabbitMQ support (RabbitTemplate, listener containers). -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>
</dependencies>
           

生産者

/**
 * Entry point of the producer example: boots a Spring context from
 * {@link ProducerConfiguration}, whose scheduled bean then publishes messages.
 */
public class Producer {
	/**
	 * Starts the producer application context.
	 *
	 * @param args command-line arguments (unused)
	 */
	@SuppressWarnings("resource")
	public static void main(String[] args) {
		AnnotationConfigApplicationContext context =
				new AnnotationConfigApplicationContext(ProducerConfiguration.class);
		// The context must stay open for the scheduled task to keep running, so it is
		// deliberately not closed here; the shutdown hook closes it (and its broker
		// connections) when the JVM exits.
		context.registerShutdownHook();
	}
}
           
package product;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.beans.factory.config.BeanPostProcessor;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.scheduling.annotation.Scheduled;  
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;  
  
import com.rabbitmq.client.AMQP;
  
/**
 * Spring configuration for the producer side of the example.
 *
 * <p>Wires a confirm/return-enabled connection factory, a {@link RabbitTemplate}
 * that publishes to {@link #EXCHANGE} with {@link #ROUTEKING}, and a scheduled bean
 * that sends one message every three seconds. Published messages are tracked in an
 * in-memory map until the broker positively confirms them.
 */
@Configuration
public class ProducerConfiguration {

    // Broker coordinates used by this example. The original author's note: the routing
    // key name defaults to the queue name and the exchange is a direct exchange.
    // NOTE(review): nothing in this class declares the queue or its binding, so the
    // binding of helloWorldQueueName to EXCHANGE/ROUTEKING must already exist on the broker.
    protected final String VirtualHost = "testhost";
    protected final String EXCHANGE = "amq.direct";
    protected final String ROUTEKING = "routingKeyB";
    protected final String helloWorldQueueName = "testB";

    // Messages published but not yet acked by the broker, keyed by correlation id.
    private final Map<String, Message> map = new ConcurrentHashMap<>();

    /**
     * Records a published message as awaiting broker confirmation.
     *
     * @param id      correlation id the message was published with
     * @param message the message to remember
     */
    public void add(String id, Message message) {
        map.put(id, message);
    }

    /**
     * Forgets a message once the broker has confirmed it.
     *
     * @param id correlation id of the confirmed message
     */
    public void del(String id) {
        map.remove(id);
    }

    /**
     * Creates the connection factory with publisher confirms and returns enabled.
     *
     * @return connection factory for the local broker and {@link #VirtualHost}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setVirtualHost(VirtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Creates a fully configured message template.
     *
     * <p>The confirm and return callbacks are installed here, exactly once per template
     * instance, instead of on every send: a template accepts only one callback of each
     * kind, and building a fresh template per message leaves the old ones behind.
     *
     * @return a new template bound to {@link #EXCHANGE} and {@link #ROUTEKING}
     */
    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setExchange(EXCHANGE);
        template.setRoutingKey(ROUTEKING);
        template.setMessageConverter(new SerializerMessageConverter());
        // Mandatory publishing is what makes the return callback fire for unroutable messages.
        template.setMandatory(true);

        // Publisher confirm: reports whether the broker accepted the message
        // (e.g. a wrong exchange shows up here as a nack).
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack) {
                // Correlation data is absent for messages published without it.
                String id = (correlationData == null) ? null : correlationData.getId();
                if (!ack) {
                    // Nack: the message stays in the pending map so it can be handled as lost.
                    System.out.println("生産者發送mq消息确認失敗" + id);
                } else {
                    System.out.println("生産者發送mq消息确認成功");
                    if (id != null) {
                        del(id);
                    }
                }
            }
        });

        // Publisher return: invoked when the message reached the exchange but could not
        // be routed to any queue (e.g. a wrong routing key).
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                    String routingKey) {
                System.out.println("returnedMessage收到回調,成功發送到broker");
                System.out.println("confirm收到回調,message:" + message.toString());
                System.out.println("confirm收到回調,replyCode:" + replyCode);
                System.out.println("confirm收到回調,replyText:" + replyText);
                System.out.println("confirm收到回調,exchange:" + exchange);
                System.out.println("confirm收到回調,routingKey:" + routingKey);
            }
        });
        return template;
    }

    /**
     * Creates the broker administration helper.
     *
     * @return admin bound to {@link #connectionFactory()}
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Registers the bean whose scheduled method publishes the messages.
     *
     * @return the scheduled producer
     */
    @Bean
    public ScheduledProducer scheduledProducer() {
        return new ScheduledProducer();
    }

    /**
     * Enables processing of {@code @Scheduled} annotations in this context.
     *
     * @return the post-processor that schedules annotated methods
     */
    @Bean
    public BeanPostProcessor postProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

    /**
     * Periodically publishes a numbered text message and tracks it until confirmed.
     * Kept as an inner (non-static) class because it uses the enclosing
     * configuration's pending-message map.
     */
    class ScheduledProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Monotonically increasing sequence number embedded in each message body.
        private final AtomicInteger counter = new AtomicInteger();

        /**
         * Sends one message every 3 seconds.
         *
         * <p>Background from the original author: since Spring 3 a scheduled task only
         * needs a no-arg, void method annotated with {@code @Scheduled} (plus the
         * corresponding task configuration). See http://zywang.iteye.com/blog/949123
         */
        @Scheduled(fixedRate = 3000)
        public void sendMessage() {
            String uuid = UUID.randomUUID().toString();
            // Build the body once so the tracked copy matches what is actually sent.
            String body = "Producer  " + counter.getAndIncrement();

            // Track before publishing: the confirm may arrive on another thread before
            // this method continues, and it must find the entry to remove it.
            add(uuid, getMessage(body, "發送"));
            rabbitTemplate.correlationConvertAndSend(body, new CorrelationData(uuid));

            // Prints the running total of messages sent so far.
            System.out.println("生産者發送成功 " + counter);
            System.out.println(map.toString());
        }

        /**
         * Builds a raw AMQP message used for bookkeeping in the pending map.
         *
         * @param messageBody text payload, encoded as UTF-8
         * @param massageId   value stored as the message id property
         * @return the constructed message
         */
        public Message getMessage(String messageBody, String massageId) {
            MessageProperties messageproperties = new MessageProperties();
            messageproperties.setMessageId(massageId);
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            return new Message(messageBody.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    messageproperties);
        }
    }
}
           

消費者

/**
 * Entry point of the consumer example: boots a Spring context from
 * {@link ConsumerConfiguration}, which starts the message listener container.
 */
public class Consumer {
    /**
     * Starts the consumer application context.
     *
     * @param args command-line arguments (unused)
     */
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
        // The context must stay open so the listener keeps consuming, so it is
        // deliberately not closed here; the shutdown hook closes it (and its broker
        // connections) when the JVM exits.
        context.registerShutdownHook();
    }
}
           
/**
 * Spring configuration for the consumer side of the example: a connection factory,
 * admin, template, and a listener container that consumes {@link #springQueueDemo}
 * with manual acknowledgements.
 */
@Configuration
public class ConsumerConfiguration {

    // Broker coordinates used by this example. The original author's note: the routing
    // key name defaults to the queue name and the exchange is a direct exchange.
    // NOTE(review): ROUTEKING is not referenced anywhere in this class, and no queue or
    // binding is declared here; the queue is assumed to exist on the broker already.
    protected static final String VirtualHost = "testhost";
    protected static final String EXCHANGE = "amq.direct";
    protected static final String ROUTEKING = "routingKey";
    protected static final String springQueueDemo = "testB";

    /**
     * Creates the connection factory for the local broker.
     *
     * @return connection factory bound to {@link #VirtualHost}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setVirtualHost(ConsumerConfiguration.VirtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Creates the broker administration helper.
     *
     * @return admin bound to {@link #connectionFactory()}
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Creates the message template, defaulting to {@link #EXCHANGE}.
     *
     * @return the template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setExchange(ConsumerConfiguration.EXCHANGE);
        return template;
    }

    /**
     * Creates the container that delivers messages from {@link #springQueueDemo} to
     * {@link ConsumerMsgHandler}.
     *
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        // Queue to listen on; uses the shared constant rather than repeating the literal.
        container.setQueueNames(springQueueDemo);
        // MANUAL: the listener itself must ack/reject each delivery on the channel
        // (the original note contrasts this with NONE, where no explicit ack is sent).
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // The handler is already a channel-aware listener, so it is registered directly
        // instead of being wrapped in a second adapter that would only pass through.
        container.setMessageListener(new ConsumerMsgHandler());
        return container;
    }

}
           
/**
 * Channel-aware listener that prints each message and manually acknowledges or
 * rejects it depending on whether an {@code "error"} header is present.
 */
public class ConsumerMsgHandler extends MessageListenerAdapter{
    /**
     * Handles one delivery.
     *
     * @param message the received message
     * @param channel the channel the message arrived on, used for ack/reject
     * @throws Exception if acknowledging or rejecting on the channel fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (message.getMessageProperties().getHeaders().get("error") == null) {
            // basicAck confirms successful processing back to the broker.
            // multiple=false acknowledges only this delivery.
            channel.basicAck(deliveryTag, false);
            System.out.println("消費者消息已經确認");
        } else {
            // About the "multiple" flag: manual acks can be batched to reduce network
            // traffic; when true, every delivery tag up to and including the given one
            // is acknowledged at once.
            //
            // Alternative: basicNack(deliveryTag, false, true) signals failure and, with
            // requeue=true, makes the broker redeliver the message; with requeue=false
            // the broker drops it without redelivery.
            //
            // basicReject with requeue=false signals failure and the message is
            // discarded without redelivery.
            channel.basicReject(deliveryTag, false);
            System.out.println("消費者消息拒絕");
        }
    }
}
           

參考連結:https://www.jianshu.com/p/2c5eebfd0e95