目錄
1:Broker是什麼
2:根據不同的conf啟動不同的activemq
3:java嵌入式mq
3.1:pom.xml
3.2:啟動自己java代碼寫的内嵌式的mq
3.3:利用自己内嵌式的mq當做隊列,進行測試
1:Broker是什麼
2:根據不同的conf啟動不同的activemq
3:java嵌入式mq
3.1:pom.xml
<dependencies>
<!--activeMq需要的jar包-->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.0</version>
</dependency>
<!--跟spring整合的包-->
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.17</version>
</dependency>
<!--其他的基礎包-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!--json裝換的包-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
3.2:啟動自己java代碼寫的内嵌式的mq
package com.wkl.broker;
import org.apache.activemq.broker.BrokerService;
/**
* Description:自己java内嵌式的mq
* Date: 2020/9/4 - 上午 11:31
* author: wangkanglu
* version: V1.0
*/
public class MyBroker {
public static void main(String[] args) throws Exception {
//Activemq也支援在jvm中内嵌的broker
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
}
}
3.3:利用自己内嵌式的mq當做隊列,進行測試
生産者:
package com.wkl.queuq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Description:第一次連接配接ActvieMq的服務
* Date: 2020/9/3 - 下午 1:51
* author: wangkanglu
* version: V1.0
*/
public class Produce {
/*賬号密碼如果都是預設的admin,可以不用穿;直傳url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*這個url以tcp協定開頭,java程式通路的是61616端口*/
private static final String ACTIVE_URL = "tcp://localhost:61616";
private static final String Queue_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1:建立連接配接工廠,按照給定的賬号,密碼,url位址來連結;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通過連結工廠獲得connection并啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通過connection建立會話,參數分别是事務和簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:建立目的地(隊列還是主題)
Queue queue = session.createQueue(Queue_NAME);
//5:建立消息生産者
MessageProducer producer = session.createProducer(queue);
//6:通過消息生産者producer産生3條消息發到mq的隊列中
for (int i = 1; i <=5 ; i++) {
//7:建立消息
TextMessage textMessage = session.createTextMessage("brokermsg--" + i);
//8:通過perducer發送給mq
producer.send(textMessage);
}
//9:關閉資源;順着申請,倒着關閉
producer.close();
session.close();
connection.close();
System.out.println("---end---");
}
}
消費者:
package com.wkl.queuq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Description:消息的消費者就是處理者的helloword
* Date: 2020/9/3 - 下午 2:38
* author: wangkanglu
* version: V1.0
*/
public class Consumer {
/*賬号密碼如果都是預設的admin,可以不用穿;直傳url*/
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
/*這個url以tcp協定開頭,java程式通路的是61616端口*/
private static final String ACTIVE_URL = "tcp://localhost:61616";
private static final String Queue_NAME = "queue01";
public static void main(String[] args) throws JMSException {
System.out.println("2-----");
//1:建立連接配接工廠,按照給定的賬号,密碼,url位址來連結;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,ACTIVE_URL);
//2:通過連結工廠獲得connection并啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3:通過connection建立會話,參數分别是事務和簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4:建立目的地(隊列還是主題)
Queue queue = session.createQueue(Queue_NAME);
//5:建立消息消費者
MessageConsumer consumer = session.createConsumer(queue);
//消息的消費者循環讀取讀取消息
while(true){
//6:consumer讀取消息,當初發送消息是testMessage,接受時也是這個類型
//消費者隻等待4秒;4秒後沒消息就走了
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(null!=textMessage){
System.out.println("接收到的消息broker:"+textMessage.getText());
// textMessage.acknowledge();
}else {
break;
}
}
//7:關閉資源;順着申請,倒着關閉
consumer.close();
session.close();
connection.close();
System.out.println("----end----");
}
}