天天看點

java程式設計異步通信

作者:尚矽谷教育
java程式設計異步通信

目前的位置

一、分布式的業務場景

java程式設計異步通信

1 、如何高效完成各個分布式系統的協作

通過消息隊列來達到異步解耦的效果,減少了程式之間的阻塞等待時間,降低了因為服務之間調用的依賴風險。

2、消息的弊端?如何解決?

消息隊列的問題在于不确定性,不能絕對保證消息的準确到達,是以要引入延遲、周期性的主動輪詢,來發現未到達的消息,進而進行補償。

二、消息隊列簡介

消息隊列,也叫消息中間件。消息的傳輸過程中儲存消息的容器。

消息隊列都解決了什麼問題?

  1. 異步
java程式設計異步通信

2、并行

java程式設計異步通信

3、解耦

java程式設計異步通信

4、排隊(削峰填谷)

java程式設計異步通信

5 弊端:不确定性和延遲

java程式設計異步通信

解決方案:最終一緻

消息模式

點對點

java程式設計異步通信

訂閱

java程式設計異步通信

三 消息隊列工具 ActiveMQ

1 、簡介

java程式設計異步通信

同類産品: RabbitMQ 、 Kafka、Redis(List)

1.1 對比RabbitMQ

最接近的同類型産品,經常拿來比較,性能伯仲之間,基本上可以互相替代。最主要差別是二者的協定不同RabbitMQ的協定是AMQP(Advanced Message Queueing Protoco),而ActiveMQ使用的是JMS(Java Messaging Service )協定。顧名思義JMS是針對Java體系的傳輸協定,隊列兩端必須有JVM,是以如果開發環境都是java的話推薦使用ActiveMQ,可以用Java的一些對象進行傳遞比如Map、BLob、Stream等。而AMQP通用行較強,非java環境經常使用,傳輸内容就是标準字元串。

另外一點就是RabbitMQ用Erlang開發,安裝前要裝Erlang環境,比較麻煩。ActiveMQ解壓即可用不用任何安裝。

1.2 對比KafKa

Kafka性能超過ActiveMQ等傳統MQ工具,叢集擴充性好。

弊端是:

在傳輸過程中可能會出現消息重複的情況,

不保證發送順序

一些傳統MQ的功能沒有,比如消息的事務功能。

是以通常用Kafka處理大資料日志。

1.3 對比Redis

其實Redis本身利用List可以實作消息隊列的功能,但是功能很少,而且隊列體積較大時性能會急劇下降。對于資料量不大、業務簡單的場景可以使用。

2 安裝 ActiveMQ

拷貝apache-activemq-5.14.4-bin.tar.gz到Linux伺服器的/opt下

解壓縮 tar -zxvf apache-activemq-5.14.4-bin.tar.gz

重命名 mv apache-activemq-5.14.4 activemq

vim /opt/activemq/bin/activemq

java程式設計異步通信

增加兩行

JAVA_HOME="/opt/jdk1.8.0_152"
JAVA_CMD="/opt/jdk1.8.0_152/bin           

注冊服務

ln -s /opt/activemq/bin/activemq /etc/init.d/activemq
chkconfig --add activemq           

啟動服務

service activemq start

java程式設計異步通信

關閉服務

service activemq stop

通過netstat 檢視端口

java程式設計異步通信

activemq兩個重要的端口,一個是提供消息隊列的預設端口:61616

另一個是控制台端口8161

通過控制台測試

啟動消費端

java程式設計異步通信

進入網頁控制台

java程式設計異步通信

賬号/密碼預設: admin/admin

點選Queues

java程式設計異步通信
java程式設計異步通信
java程式設計異步通信

觀察用戶端

java程式設計異步通信

在Java中使用消息隊列

3.1 在gmall-service-util中導入依賴坐标

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>           

3.2 producer端

public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一個值表示是否使用事務,如果選擇true,第二個值相當于選擇0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue testqueue = session.createQueue("TEST1");
MessageProducer producer = session.createProducer(testqueue);
TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("今天天氣真好!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}           

3.3 consumer

public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.67.163:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一個值表示是否使用事務,如果選擇true,第二個值相當于選擇0
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("TEST1");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
//session.rollback();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();
}
}           

3.4 關于事務控制

producer送出時的事務 事務開啟 隻執行send并不會送出到隊列中,隻有當執行session.commit()時,消息才被真正地送出到隊列中。
事務不開啟 隻要執行send,就進入到隊列中。
consumer 接收時的事務

事務開啟,簽收必須寫

Session.SESSION_TRANSACTED

收到消息後,消息并沒有真正地被消費。消息隻是被鎖住。一旦出現該線程死掉、抛異常,或者程式執行了session.rollback()那麼消息會釋放,重新回到隊列中被别的消費端再次消費。

事務不開啟,簽收方式選擇

Session.AUTO_ACKNOWLEDGE

隻要調用comsumer.receive方法 ,自動确認。
Session.CLIENT_ACKNOWLEDGE

需要用戶端執行 message.acknowledge(),否則視為未送出狀态,線程結束後,其他線程還可以接收到。

這種方式跟事務模式很像,差別是不能手動復原,而且可以單獨确認某個消息。

Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量簽收時用的,可以提高性能。但是某些情況消息可能會被重複送出,使用這種模式的consumer要可以處理重複送出的問題。

3.5 持久化與非持久化

通過producer.setDeliveryMode(DeliveryMode.PERSISTENT) 進行設定

持久化的好處就是當activemq當機的話,消息隊列中的消息不會丢失。非持久化會丢失。但是會消耗一定的性能。

四 與springboot整合

1 配置類ActiveMQConfig

@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url:disabled}")
String brokerURL ;
@Value("${activemq.listener.enable:disabled}")
String listenerEnable;
@Bean
public ActiveMQUtil getActiveMQUtil() throws JMSException {
if(brokerURL.equals("disabled")){
return null;
}
ActiveMQUtil activeMQUtil=new ActiveMQUtil();
activeMQUtil.init(brokerURL);
return activeMQUtil;
}
//定義一個消息監聽器連接配接工廠,這裡定義的是點對點模式的監聽器連接配接工廠
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if(!listenerEnable.equals("true")){
return null;
}
factory.setConnectionFactory(activeMQConnectionFactory);
//設定并發數
factory.setConcurrency("5");
//重連間隔時間
 factory.setRecoveryInterval(5000L);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory ( ){

ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory( brokerURL);
return activeMQConnectionFactory;
}
}           

2 工具類ActiveMQUtil

public class ActiveMQUtil {
PooledConnectionFactory pooledConnectionFactory=null;
public ConnectionFactory init(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
//加入連接配接池
pooledConnectionFactory=new PooledConnectionFactory(factory);
//出現異常時重新連接配接
pooledConnectionFactory.setReconnectOnException(true);
//
pooledConnectionFactory.setMaxConnections(5);
pooledConnectionFactory.setExpiryTimeout(10000);
return pooledConnectionFactory;
}
public ConnectionFactory getConnectionFactory(){
return pooledConnectionFactory;
}
}           

五 在支付業務子產品中應用

1 支付成功通知

支付子產品利用消息隊列通知訂單系統,支付成功

在支付子產品中配置application.properties

spring.activemq.broker-url=tcp://mq.server.com:61616           

在PaymentServiceImpl中增加發送方法:

public void sendPaymentResult(String orderId,String result){
ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();
Connection connection=null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue paymentResultQueue = session.createQueue("PAYMENT_RESULT_QUEUE");
MapMessage mapMessage=new ActiveMQMapMessage();
mapMessage.setString("orderId",orderId);
mapMessage.setString("result",result);
MessageProducer producer = session.createProducer(paymentResultQueue);
producer.send(mapMessage);
session.commit();           
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}           

在PaymentController中增加一個方法用來測試

@RequestMapping("sendResult")
@ResponseBody
public String sendPaymentResult(@RequestParam("orderId") String orderId){
paymentService.sendPaymentResult(orderId,"success" );
return "has been sent";
}           

在浏覽器中通路:

java程式設計異步通信

檢視隊列内容:有一個在隊列中沒有被消費的消息。

java程式設計異步通信
java程式設計異步通信
java程式設計異步通信

2 訂單子產品消費消息

application.properties

spring.activemq.broker-url=tcp://mq.server.com:61616
activemq.listener.enable=true           

訂單消息消息後要更新訂單狀态,先準備好訂單狀态更新的方法

public void updateProcessStatus(String orderId , ProcessStatus processStatus, Map<String,String>... paramMaps) {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setId(orderId);
orderInfo.setOrderStatus(processStatus.getOrderStatus());
orderInfo.setProcessStatus(processStatus);
//動态增加需要補充更新的屬性
if (paramMaps != null && paramMaps.length > 0) {
Map<String, String> paramMap = paramMaps[0];
for (Map.Entry<String, String> entry : paramMap.entrySet()) {
String properties = entry.getKey();
String value = entry.getValue();
try {
BeanUtils.setProperty(orderInfo, properties, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
orderInfoMapper.updateByPrimaryKeySelective(orderInfo);
}           

消息隊列的消費端

@JmsListener(destination = "PAYMENT_RESULT_QUEUE",containerFactory = "jmsQueueListener")
public void consumePaymentResult(MapMessage mapMessage) throws JMSException {
String orderId = mapMessage.getString("orderId");
String result = mapMessage.getString("result");
if(!"success".equals(result)){
orderService.updateProcessStatus( orderId , ProcessStatus.PAY_FAIL);
}else{
orderService.updateProcessStatus( orderId , ProcessStatus.PAID);
}
 orderService.sendOrderResult(orderId);
}           

3 訂單子產品發送減庫存通知

訂單子產品除了接收到請求改變單據狀态,還要發送庫存系統

檢視看《庫存管理系統接口手冊》中【減庫存的消息隊列消費端接口】中的描述,組織相應的消息資料進行傳遞。

@Transactional
public void sendOrderResult(String orderId){
OrderInfo orderInfo = getOrderInfo(orderId);
Map<String, Object> messageMap = initWareOrderMessage(orderInfo);
String wareOrderJson= JSON.toJSONString(messageMap);
Session session = null;
try {
Connection conn = activeMQUtil.getConnection();
session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("ORDER_RESULT_QUEUE");
MessageProducer producer = session.createProducer(queue);
TextMessage message =new ActiveMQTextMessage();
message.setText(wareOrderJson);
producer.send(message);
updateProcessStatus(orderInfo.getId(), ProcessStatus.NOTIFIED_WARE);
session.commit();
producer.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}           

針對接口手冊中需要的消息進行組織

public Map<String,Object> initWareOrderMessage( OrderInfo orderInfo ) {
//準備發送到倉庫系統的訂單
String wareId = orderInfo.getWareId();
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("orderId", orderInfo.getId());
hashMap.put("consignee", orderInfo.getConsignee());
hashMap.put("consigneeTel", orderInfo.getConsigneeTel());
hashMap.put("orderComment", orderInfo.getOrderComment());
hashMap.put("orderBody", orderInfo.getOrderSubject());
hashMap.put("deliveryAddress", orderInfo.getDeliveryAddress());
hashMap.put("paymentWay", "2");//1 貨到付款 2 線上支付
hashMap.put("wareId",wareId);
List<HashMap<String, String>> details = new ArrayList<>();
List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
for (OrderDetail orderDetail : orderDetailList) {
HashMap<String, String> detailMap = new HashMap<>();
detailMap.put("skuId", orderDetail.getSkuId());
detailMap.put("skuNum", "" + orderDetail.getSkuNum());
detailMap.put("skuName", orderDetail.getSkuName());
details.add(detailMap);
}
hashMap.put("details", details);
return hashMap;
}           

4 消費減庫存結果

給倉庫系統發送減庫存消息後,還要接受減庫存成功或者失敗的消息。

同樣根據《庫存管理系統接口手冊》中【商品減庫結果消息】的說明完成。消費該消息的消息隊列監聽程式。

接受到消息後主要做的工作就是更新訂單狀态。

@JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener")
public void consumeSkuDeduct(MapMessage mapMessage) throws JMSException {
String orderId = mapMessage.getString("orderId");
String status = mapMessage.getString("status");
if("DEDUCTED".equals(status)){
orderService.updateProcessStatus( orderId , ProcessStatus.WAITING_DELEVER);
return ;
}else{
orderService.updateProcessStatus( orderId , ProcessStatus.STOCK_EXCEPTION);
return ;
}
}           

最後一次支付完成後,所有業務全部走通應該可以在訂單清單中,檢視到對應的訂單是待發貨狀态。

5 驗證結果

java程式設計異步通信

相關閱讀:

Java異常處理的概述

java異常體系結構

java二維數組的使用步驟

java數組的文法和使用步驟

java技術重定向