天天看點

消息隊列--ActiveMQ

JMS(Java Message Service,Java消息服務)是一組Java應用程式接口(Java API),它提供建立、發送、接收、讀取消息的服務。Java消息服務支援兩種消息模型:Point-to-Point消息(P2P)和釋出訂閱消息(Publish Subscribe messaging,簡稱Pub/Sub)。

消息隊列廠商隻需要實作這些接口即可,與JDBC的實作過程是類似的(JDBC也是提供一組接口,然後MySQL有MySQL的實作,Oracle有Oracle的實作...)。

首先去ActiveMQ官網下載下傳,我目前版本是 ActiveMQ 5.15.0 Release

消息隊列--ActiveMQ
然後解壓,我解壓到D盤
消息隊列--ActiveMQ

bin目錄用來啟動MQ,conf是配置檔案,那個jar包是MQ的包,一會要用。

打開eclipse,建立一個Java工程,把這個包複制到工程,然後build path。

消息隊列--ActiveMQ

然後建立兩個類,一個作為生産者,一個作為消費者。

生産者:

package mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    
    public static void main(String[] args) throws Exception {
        
        //1. 建立一個ConnectionFactory. tcp://0.0.0.0:61616
        String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        String brokerURL = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        
        //2. 通過ConnectionFactory建立一個Connection連接配接,并且調用start方法開啟
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        //3. 通過Connection建立Session,用于接收消息[第一個參數:是否啟用事務;第二個參數:設定簽收模式]
//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//手工簽收--常用
        
        //4. 通過Session建立Destination對象
        Destination destination = session.createQueue("foodQueue");
        
        //5. 通過Session建立發送或接受對象
        MessageProducer messageProducer = session.createProducer(destination);
        
        //6. 設定持久化特性或非持久化特性
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        
        //7. 使用JMS規範裡面消息類型之一 TextMessage 來建立資料,用MessageProducer來發送
        for (int i = 0; i < 5; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("大蘿蔔" + i);
            messageProducer.send(message);
            System.out.println("生産者:" + message.getText());
        }
        
        //使用事務要手動送出
        //session.commit();
        
        //8. 關閉連接配接
        connection.close();
        
    }// main
}      

 消費者:

package mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

public static void main(String[] args) throws Exception {
        
        //1. 建立一個ConnectionFactory. tcp://0.0.0.0:61616
        String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        String brokerURL = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        
        //2. 通過ConnectionFactory建立一個Connection連接配接,并且調用start方法開啟
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        //3. 通過Connection建立Session,用于接收消息[第一個參數:是否啟用事務;第二個參數:設定簽收模式]
//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        
        //4. 通過Session建立Destination對象
        Destination destination = session.createQueue("foodQueue");
        
        //5. 通過Session建立發送或接受對象
        MessageConsumer messageConsumer = session.createConsumer(destination);
        
        //7. 使用JMS規範裡面消息類型之一 TextMessage 來建立資料
        while (true) {
            TextMessage message = (TextMessage) messageConsumer.receive();
            if(message == null) break;
            message.acknowledge();    //手動簽收
            System.out.println("消費者:" + message.getText());
        }
        
        //8. 關閉連接配接
        connection.close();
        
    }// main
}      

 先啟動MQ

消息隊列--ActiveMQ

在浏覽器裡輸入http://localhost:8161/admin/

消息隊列--ActiveMQ

出現這個界面就ok。

消息隊列--ActiveMQ

然後運作Sender。

消息隊列--ActiveMQ

這裡是控制台列印的,是否真的發出去了,看MQ的管理界面。

消息隊列--ActiveMQ

ok。然後運作receiver

消息隊列--ActiveMQ

好了,五個大蘿蔔已經吃了,看管理界面的變化。

消息隊列--ActiveMQ

注意:如果沒有【手動簽收】,MQ是不會認為用戶端已經消費了的

message.acknowledge();    //手動簽收      

到此,一個hello 完成

小LUA

面對敵人的嚴刑逼供,我一個字也沒說,而是一五一十寫了下來。