天天看點

ActiveMQ點對點消息傳遞ActiveMQ

上篇文章中詳細介紹了ActiveMQ。本文繼續介紹ActiveMQ的具體操作

ActiveMQ

處理對象消息

1.定義消息載體對象

/**
 * Order Bean
 * 定義消息載體類型. 即要在ActiveMQ中傳遞的資料實體類型.
 * 消息載體對象必須實作接口java.io.Serializable, 因為消息需要在網絡中傳遞,要求必須可序列化
 * @author dengp
 *
 */
public class Order implements Serializable{

    private static final long serialVersionUID = 1L;
    
    private String id;
    private String nick;
    private Long price;
    private Date createTime;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getNick() {
        return nick;
    }
    public void setNick(String nick) {
        this.nick = nick;
    }
    public Long getPrice() {
        return price;
    }
    public void setPrice(Long price) {
        this.price = price;
    }
    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
    public static long getSerialversionuid() {
        return serialVersionUID;
    }
    @Override
    public String toString() {
        return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]";
    }
}      

2.定義消息生産者

/**
 * ActiveMQ中的生産者(Producer)
 * @author dengp
 *
 */
public class OrderProducer {

    public void sendhello2ActiveMq(Order messageObject) {
        ConnectionFactory factory = null;
        Connection conn = null;
        Session session = null;
        Destination destination = null;
        MessageProducer producer = null;
        Message message = null;
        try {
            
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
            // 建立連結對象
            conn = factory.createConnection();
            // 啟動連接配接對象
            conn.start();
            
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 建立目的地,目的地的命名既是隊列的指令
            destination = session.createQueue("MQ-Hello-Object");
            producer = session.createProducer(destination);
            // 建立消息對象. 此消息是對象消息, 其中儲存資料為對象.
            message = session.createObjectMessage(messageObject);
            // 發送消息
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("通路ActiveMQ服務發生錯誤!!");
        } finally {
            try {
                // 回收消息發送者資源
                if (null != producer)
                    producer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收會話資源
                if (null != session)
                    session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收連結資源
                if (null != conn)
                    conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}      

3.定義消息消費者

/**
 * ActiveMQ中的消費者(Consumer)
 * @author dengp
 *
 */
public class OrderConsumer {

    public void reciveOrderFormActiveMq() {
        ConnectionFactory factory = null;
        Connection conn = null;
        Session session = null;
        Destination destination = null;
        MessageConsumer consumer = null;
        Message message = null;
        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
            conn = factory.createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("MQ-Hello-Object");
            // 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地.
            consumer = session.createConsumer(destination);
            // 從ActiveMQ中擷取消息
            message = consumer.receive();
            Object obj = ((ObjectMessage)message).getObject();
            System.out.println("ActiveMQ擷取的消息是:"+obj);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("通路ActiveMQ服務發生錯誤!!");
        } finally {
            try {
                // 回收消息發送者資源
                if (null != consumer)
                    consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收會話資源
                if (null != session)
                    session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收連結資源
                if (null != conn)
                    conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}      

4.測試

生産者測試

public static void main(String[] args) {
    OrderProducer pro = new OrderProducer();
    Order order = new Order();
    order.setId("100");
    order.setNick("波波烤鴨");
    order.setPrice(9999l);
    order.setCreateTime(new Date());
    pro.sendhello2ActiveMq(order);
}      
ActiveMQ點對點消息傳遞ActiveMQ

消費者測試

擷取到了相關的資訊

ActiveMQ點對點消息傳遞ActiveMQ
ActiveMQ點對點消息傳遞ActiveMQ

實作隊列服務監聽

1.觀察者模式

1.1事件源

事件發生的源頭。 監聽器監聽的具體位置。

1.2事件

具體觸發的事件。 如: 單擊事件, 輕按兩下事件 等。

其中必然包含事件源資訊。

1.3監聽器

處理事件的代碼邏輯。

Java觀察者模式(Observer) 

2.定義生成者代碼

package com.dpb.observe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ中的生産者(Producer)
 * @author dengp
 *
 */
public class MyProducer {

    public void sendhello2ActiveMq(String messageText) {
        // 連接配接工廠,用于建立Connection對象
        ConnectionFactory factory = null;
        // activeMQ 連接配接對象
        Connection conn = null;
        // 一次和ActiveMQ的持久會話對象
        Session session = null;
        // 目的地
        Destination destination = null;
        // 消息發送者
        MessageProducer producer = null;
        // 封裝消息的對象
        Message message = null;
        try {
            /*
             * 建立連結工廠 ActiveMQConnectionFactory -由ActiveMQ實作的ConnectionFactory接口實作類. 
             * 構造方法: public ActiveMQConnectionFactory(String userName, String password,
             * String brokerURL) 
             * userName - 通路ActiveMQ服務的使用者名,使用者名可以通過jetty-realm.properties配置檔案配置. 
             * password - 通路ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置檔案配置. 
             * brokerURL -通路ActiveMQ服務的路徑位址. 路徑結構為 - 協定名://主機位址:端口号 此連結基于TCP/IP協定.
             */
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
            // 建立連結對象
            conn = factory.createConnection();
            // 啟動連接配接對象
            conn.start();
            /*
             * 建立會話對象 
             * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); 
             * transacted - 是否使用事務, 
             * 可選值為true|false 
             * true - 使用事務, 當設定此變量值, 則acknowledgeMode參數無效, 
             * 建議傳遞的acknowledgeMode參數值為 Session.SESSION_TRANSACTED 
             * false - 不使用事務, 設定此變量值,則acknowledgeMode參數必須設定. 
             * acknowledgeMode - 消息确認機制, 可選值為:
             * Session.AUTO_ACKNOWLEDGE - 自動确認消息機制 
             * Session.CLIENT_ACKNOWLEDGE -用戶端确認消息機制 
             * Session.DUPS_OK_ACKNOWLEDGE - 有副本的用戶端确認消息機制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 建立目的地,目的地的命名既是隊列的指令
            destination = session.createQueue("MQ-Hello-observe");
            // 建立消息生成者, 建立的消息生成者與某目的地對應, 即方法參數目的地.
            producer = session.createProducer(destination);
            // 建立消息對象,建立一個文本消息對象。此消息對象中儲存要傳遞的文本資料.
            message = session.createTextMessage(messageText);

            // 發送消息
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("通路ActiveMQ服務發生錯誤!!");
        } finally {
            try {
                // 回收消息發送者資源
                if (null != producer)
                    producer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收會話資源
                if (null != session)
                    session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收連結資源
                if (null != conn)
                    conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}      

3.定義消費者代碼

package com.dpb.observe;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ中的消費者(Consumer)
 * @author dengp
 *
 */
public class MyConsumer {

    public void reciveHelloFormActiveMq() {
        // 連接配接工廠,用于建立Connection對象
        ConnectionFactory factory = null;
        // activeMQ 連接配接對象
        Connection conn = null;
        // 一次和ActiveMQ的持久會話對象
        Session session = null;
        // 目的地
        Destination destination = null;
        // 消息消費者
        MessageConsumer consumer = null;
        // 封裝消息的對象
        Message message = null;
        try {
            /*
             * 建立連結工廠 ActiveMQConnectionFactory -由ActiveMQ實作的ConnectionFactory接口實作類. 
             * 構造方法: public ActiveMQConnectionFactory(String userName, String password,
             * String brokerURL) 
             * userName - 通路ActiveMQ服務的使用者名,使用者名可以通過jetty-realm.properties配置檔案配置. 
             * password - 通路ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置檔案配置. 
             * brokerURL -通路ActiveMQ服務的路徑位址. 路徑結構為 - 協定名://主機位址:端口号 此連結基于TCP/IP協定.
             */
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
            // 建立連結對象
            conn = factory.createConnection();
            // 啟動連接配接對象
            conn.start();
            /*
             * 建立會話對象 
             * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); 
             * transacted - 是否使用事務, 
             * 可選值為true|false 
             * true - 使用事務, 當設定此變量值, 則acknowledgeMode參數無效, 
             * 建議傳遞的acknowledgeMode參數值為 Session.SESSION_TRANSACTED 
             * false - 不使用事務, 設定此變量值,則acknowledgeMode參數必須設定. 
             * acknowledgeMode - 消息确認機制, 可選值為:
             * Session.AUTO_ACKNOWLEDGE - 自動确認消息機制 
             * Session.CLIENT_ACKNOWLEDGE -用戶端确認消息機制 
             * Session.DUPS_OK_ACKNOWLEDGE - 有副本的用戶端确認消息機制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 建立目的地,目的地的命名既是隊列的指令
            destination = session.createQueue("MQ-Hello-observe");
            // 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地.
            consumer = session.createConsumer(destination);
            // 監聽ActiveMQ服務中的消息,當發現消息的時候,自動處理
            consumer.setMessageListener(new MessageListener() {
                /**
                 * 當用消息到來的時候觸發該方法,在該方法中處理消息
                 */
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    String messageString = null;
                    try {
                        messageString = textMessage.getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                        messageString = "處理消息失敗!!";
                    }
                    System.out.println("處理的消息内容是 : " + messageString);
                }
            });
            // 阻塞程序
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("通路ActiveMQ服務發生錯誤!!");
        } finally {
            try {
                // 回收消息發送者資源
                if (null != consumer)
                    consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收會話資源
                if (null != session)
                    session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收連結資源
                if (null != conn)
                    conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}      

生産者

public static void main(String[] args) {
    MyProducer pro = new MyProducer();
    pro.sendhello2ActiveMq("你好啊...listener");
}      
ActiveMQ點對點消息傳遞ActiveMQ

消費者

public static void main(String[] args) {
    MyConsumer con = new MyConsumer();
    con.reciveHelloFormActiveMq();
}      
ActiveMQ點對點消息傳遞ActiveMQ
ActiveMQ點對點消息傳遞ActiveMQ

web頁面中可以看到還線上的消費者。