天天看點

activeMQ建立生産者與消費者

普通web項目,未使用maven
           
配置檔案:default.properties,放到conf下就行。
           
#----  activeMQ配置參數    ----------------------
AMQ_USER = admin
AMQ_PASSWORD = admin
#URL = ActiveMQConnection.DEFAULT_BROKER_URL;
AMQ_URL = failover://tcp://192.168.42.147:61616
AMQ_SUBJECT = ActiveMQ.Demo
#----  activeMQ配置參數    ----------------------
           
消費者:jmsReceiver
package com.cn.cncc.activeMQ;

import java.util.Enumeration;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.cn.cncc.util.PropertiesUtils;

/**
 * 說明:
 * 
 * @author hjr
 * @version 建立時間:2017-3-22 下午1:22:40S
 */
public class JmsReceiver implements MessageListener {

	private Destination dest = null;
	private Connection conn = null;
	private Session session = null;
	private MessageConsumer consumer = null;

	private boolean stop = false;
	
	// 初始化
	private void initialize() throws JMSException, Exception {
		//讀取上面配置檔案中的内容
		PropertiesUtils.readProperties("default.properties");
		String USER = PropertiesUtils.getProperty("AMQ_USER");;
		String PASSWORD = PropertiesUtils.getProperty("AMQ_PASSWORD");;
		// private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
		String URL = PropertiesUtils.getProperty("AMQ_URL");// localhost
		String SUBJECT = PropertiesUtils.getProperty("AMQ_SUBJECT");
		
		// 連接配接工廠是使用者建立連接配接的對象.
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				USER, PASSWORD, URL);
		// 連接配接工廠建立一個jms connection
		conn = connectionFactory.createConnection();
		// 是生産和消費的一個單線程上下文。會話用于建立消息的生産者,消費者和消息。
		session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
		//dest是客戶用來指定他生産消息的目标還有他消費消息的來源的對象.
		dest = session.createQueue(SUBJECT);
		// dest = session.createTopic(SUBJECT);
		// 會話建立消息的生産者将消息發送到目的地
		consumer = session.createConsumer(dest);
	}
	public JmsReceiver() throws Exception {  
	    initialize();  
	  }  
	/**
	 * 消費消息
	 * 
	 * @throws JMSException
	 * @throws Exception
	 */
	public void receiveMessage() throws JMSException, Exception {
		//線程啟動
		conn.start();
		consumer.setMessageListener(this);
		// 等待接收消息
		while (!stop) {
			Thread.sleep(1000);
		}

	}

	@SuppressWarnings("rawtypes")
	@Override
	public void onMessage(Message msg) {
		try {
			
			TextMessage message = (TextMessage) msg;
			System.out.println("------Received TextMessage------");
			System.out.println(message.getText());
				
			stop = true;
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				this.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	// 關閉連接配接
	public void close() throws JMSException {
		System.out.println("Consumer:->Closing connection");
		if (consumer != null)
			consumer.close();
		if (session != null)
			session.close();
		if (conn != null)
			conn.close();
	}
}
           
生産者:
package com.cn.cncc.activeMQ;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 說明: activemq send message
 * 
 * @author hjr
 * @version 建立時間:2017-3-22 下午1:22:40
 */
public class JmsSender {
	//其實也可以從配置檔案讀取的
	private String USER = ActiveMQConnection.DEFAULT_USER;//admin
	private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//admin
	//private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;//localhost
	private String URL = "failover://tcp://192.168.42.147:61616";//localhost
	private String SUBJECT = "ActiveMQ.Demo";

	private Destination destination = null;
	private Connection conn = null;
	private Session session = null;
	private MessageProducer producer = null;

	// 初始化
	private void initialize() throws JMSException, Exception {
		// 連接配接工廠
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				USER, PASSWORD, URL);
		conn = connectionFactory.createConnection();
		// 事務性會話,自動确認消息
		session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 消息的目的地(Queue/Topic)
		destination = session.createQueue(SUBJECT);
		// destination = session.createTopic(SUBJECT);
		// 消息的提供者(生産者)
		producer = session.createProducer(destination);
		// 不持久化消息
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	}

	public void sendMessage(String msgType) throws JMSException, Exception {
		initialize();
		// 連接配接到JMS提供者(伺服器)
		conn.start();
		for (int i = 0; i < 100; i++) {
			// 發送文本消息
			if ("text".equals(msgType)) {
				String textMsg = "ActiveMQ Text Message!";
				TextMessage msg = session.createTextMessage();
				// TextMessage msg = session.createTextMessage(textMsg);
				msg.setText(textMsg);
				producer.send(msg);
			}
		}
		
	}

	// 關閉連接配接
	public void close() throws JMSException {
		if (producer != null)
			producer.close();
		if (session != null)
			session.close();
		if (conn != null)
			conn.close();
	}

}
           
PropertiesUtils工具類:
           
package com.cn.cncc.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
/**
 * @name PropertiesUtils
 * @description 讀取配置檔案工具類
 * @author hjr
 * @createDate 2016-11-24
 */
public class PropertiesUtils {

	private static Properties _prop = new Properties();

    /**
     * @name readProperties
     * @description 讀取配置檔案
     * @author hjr
     * @createDate 2016-11-24
     * @param fileName
     */
    public static void readProperties(String fileName){
        try {
            InputStream in = PropertiesUtils.class.getResourceAsStream("/"+fileName);
            BufferedReader bf = new BufferedReader(new InputStreamReader(in));
            _prop.load(bf);
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    /**
     * @name getProperty
     * @description 根據key讀取對應的value
     * @author hjr
     * @createDate 2016-11-24
     * @param key
     * @return String
     */
    public static String getProperty(String key){
        return _prop.getProperty(key);
    }

}
           
測試類:
package com.cn.cncc.activeMQ;

import javax.jms.JMSException;

import javax.jms.Connection;

/**
 * 說明:
 * 
 * @author hjr
 * @version 建立時間:2017-03-22 下午4:33:17
 */
public class Test {
	static Connection mqConnection;

	public static void main(String[] args) throws JMSException, Exception {
		JmsSender sender = new JmsSender();
		sender.sendMessage("map");
		sender.close();
		
		
		JmsReceiver receiver = new JmsReceiver();
		receiver.receiveMessage();
		receiver.close();
	}
}
           
有時會報NoClassDefFoundError 錯誤,可将下面兩個包放到lib檔案夾下,并buildpath.
activeMQ建立生産者與消費者
javax.jms.jar、javax.management.j2ee.jar,