什么是分布式消息中间件?
想要了解分布式消息中间件,首先要了解一下什么是分布式系统和什么又是中间件。
分布式系统:由一组若干个可以独立运行的程序模块,集成于一个分布式处理的系统。
消息中间件:可与OA、ERP集成的免费消息中间件Active Messenger,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。
什么是JMS:
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接口简化接收消息的企业应用的开发。
JMS主要定义了五种不同的消息正文格式,以及消息调用的消息类型。
- TextMessage--一个字符串对象
- MapMessage--一套名称-值对
- ObjectMessage--一个序列化的 Java 对象
- BytesMessage--一个字节的数据流
- StreamMessage -- Java 原始值的数据流
JMS消息传递类型:
消息传递有两种类型:
一种是点对点,即生产者对应消费者一一对应。
另一种是发布/订阅模式,由一个生产者产生消息并进行发送,由多个消费者进行接收。类似于广播。
要点:
1.消息中间件主要用于不需要返回值的工程
2.消息中间件传递的实体必须要实现实例化接口,或者传递原始文本 (json格式也是对象序列化的一种方式)
解决activeMQ消息服务器,消息丢失的处理方案:
点对点queue模式消息丢失处理
点对点发布模式是默认的发布模式,如果消息接受方宕机,那么消息服务器就会将消息持久化到服务器中,等到接收方重新启动后,消息服务器会将持久化的消息重新发送给接收方,并将消息服务器中持久化的消息清除。
发布/订阅模式消息丢失处理:
发布订阅模式时,消息服务器只会给接收方发送消息,不会考虑接收方是否宕机是否接收到消息,如果想要解决消息丢失,就要在消息服务器中特别配置检测接收方是否收到消息,如果没有接受到消息就将消息持久化到服务器中。而点对点模式不用特别配置。
常见的消息中间件产品有:
1.ActiveMQ:
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
2.RabbitMQ:
MQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
3.ZeroMQ:史上最快的消息队列系统
4.Kafka:Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
下面是使用ActiveMQ消息中间件点对点的生产者
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://本机的IP地址:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// AUTO_ACKNOWLEDGE = 1 自动确认
// CLIENT_ACKNOWLEDGE = 2 客户端手动确认
// DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
// SESSION_TRANSACTED = 0 事务提交并确认
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的ActiveMQ世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
使用ActiveMQ消息中间件点对点的消费者
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://本机IP地址:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
使用ActiveMQ消息中间件发布/订阅的生产者
1.2.3.4同上
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的ActiveMQ世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
使用ActiveMQ消息中间件发布/订阅的消费者
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
Spring整合JMS:
创建一个spring配置文件applicationContext-jms-producer.xml(生产者)
<context:component-scan base-package="生产者类的包"></context:component-scan>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://本机IP地址:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<!--<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_text"/>
</bean> -->
<!--这个是订阅模式 文本信息-->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text"/>
</bean>
配置文件 applicationContext-jms-consumer-queue.xml(消费者)
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://本地IP地址:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_text"/>//value的值必须与生产者中的值相同
</bean>
<!--这个是队列目的地,发布订阅的 文本信息-->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text"/>
</bean>
<!-- 我的监听类 -->
<bean id="myMessageListener" class="类的全路径名"></bean>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueTextDestination/topicTextDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
在对应的包下面创建生产者:(点对点)
@Component
public class QueueProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination queueTextDestination;
/**
* 发送文本消息
* @param text
*/
public void sendTextMessage(final String text){
jmsTemplate.send(queueTextDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
监听类:(点对点消费者)
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
SpringJMS发布订阅:
生产者类:
@Component
public class TopicProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination topicTextDestination;
/**
* 发送文本消息
* @param text
*/
public void sendTextMessage(final String text){
jmsTemplate.send(topicTextDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}