天天看点

JMS分布式消息中间件

什么是分布式消息中间件?

想要了解分布式消息中间件,首先要了解一下什么是分布式系统和什么又是中间件。

分布式系统:由一组若干个可以独立运行的程序模块,集成于一个分布式处理的系统。

消息中间件:可与OA、ERP集成的免费消息中间件Active Messenger,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

什么是JMS:

JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接口简化接收消息的企业应用的开发。

JMS主要定义了五种不同的消息正文格式,以及消息调用的消息类型。

  1.  TextMessage--一个字符串对象
  2. MapMessage--一套名称-值对
  3. ObjectMessage--一个序列化的 Java 对象
  4. BytesMessage--一个字节的数据流
  5. StreamMessage -- Java 原始值的数据流

JMS消息传递类型:

消息传递有两种类型:

一种是点对点,即生产者对应消费者一一对应。

JMS分布式消息中间件

另一种是发布/订阅模式,由一个生产者产生消息并进行发送,由多个消费者进行接收。类似于广播。

JMS分布式消息中间件

要点:

1.消息中间件主要用于不需要返回值的工程

2.消息中间件传递的实体必须要实现实例化接口,或者传递原始文本 (json格式也是对象序列化的一种方式)

解决activeMQ消息服务器,消息丢失的处理方案:

点对点queue模式消息丢失处理

点对点发布模式是默认的发布模式,如果消息接受方宕机,那么消息服务器就会将消息持久化到服务器中,等到接收方重新启动后,消息服务器会将持久化的消息重新发送给接收方,并将消息服务器中持久化的消息清除。

发布/订阅模式消息丢失处理:

发布订阅模式时,消息服务器只会给接收方发送消息,不会考虑接收方是否宕机是否接收到消息,如果想要解决消息丢失,就要在消息服务器中特别配置检测接收方是否收到消息,如果没有接受到消息就将消息持久化到服务器中。而点对点模式不用特别配置。

常见的消息中间件产品有:

1.ActiveMQ:

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

2.RabbitMQ:

MQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

3.ZeroMQ:史上最快的消息队列系统

4.Kafka:Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

下面是使用ActiveMQ消息中间件点对点的生产者

//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://本机的IP地址:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//	AUTO_ACKNOWLEDGE = 1    自动确认	
//	CLIENT_ACKNOWLEDGE = 2    客户端手动确认
//	DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
//	SESSION_TRANSACTED = 0    事务提交并确认	
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的ActiveMQ世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
           

使用ActiveMQ消息中间件点对点的消费者

//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://本机IP地址:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
	
//7.监听消息
consumer.setMessageListener(new MessageListener() {
	public void onMessage(Message message) {
		TextMessage textMessage=(TextMessage)message;
		try {
			System.out.println("接收到消息:"+textMessage.getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
});	
//8.等待键盘输入
System.in.read();	
//9.关闭资源
consumer.close();
session.close();
connection.close();
           

使用ActiveMQ消息中间件发布/订阅的生产者

1.2.3.4同上
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的ActiveMQ世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
           

使用ActiveMQ消息中间件发布/订阅的消费者

//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			TextMessage textMessage=(TextMessage)message;
			try {
				System.out.println("接收到消息:"+textMessage.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
           

Spring整合JMS:

创建一个spring配置文件applicationContext-jms-producer.xml(生产者)

<context:component-scan base-package="生产者类的包"></context:component-scan>        
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://本机IP地址:61616"/>  
	</bean>          
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  	   
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
	    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
	    <property name="connectionFactory" ref="connectionFactory"/>  
	</bean>      
    <!--这个是队列目的地,点对点的  文本信息-->  
	<!--<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
	    <constructor-arg value="queue_text"/>  
	</bean> -->
	<!--这个是订阅模式  文本信息-->  
	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
	    <constructor-arg value="topic_text"/>  
	</bean>  
           

配置文件 applicationContext-jms-consumer-queue.xml(消费者)

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://本地IP地址:61616"/>  
	</bean>	   
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  	
    <!--这个是队列目的地,点对点的  文本信息-->  
	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
	    <constructor-arg value="queue_text"/>//value的值必须与生产者中的值相同  
	</bean> 
    <!--这个是队列目的地,发布订阅的  文本信息-->  
	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
	    <constructor-arg value="topic_text"/>  
	</bean>  
   
	<!-- 我的监听类 -->
	<bean id="myMessageListener" class="类的全路径名"></bean>
	<!-- 消息监听容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="queueTextDestination/topicTextDestination" />
		<property name="messageListener" ref="myMessageListener" />
	</bean>	
           

在对应的包下面创建生产者:(点对点)

@Component
public class QueueProducer {
	
	@Autowired
	private JmsTemplate jmsTemplate;
	
	@Autowired
	private Destination queueTextDestination;
	
	/**
	 * 发送文本消息
	 * @param text
	 */
	public void sendTextMessage(final String text){
		jmsTemplate.send(queueTextDestination, new MessageCreator() {			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(text);
			}
		});		
	}
}
           

监听类:(点对点消费者)

public class MyMessageListener implements MessageListener {
	public void onMessage(Message message) {
	TextMessage textMessage=(TextMessage)message;		
		try {
			System.out.println("接收到消息:"+textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
           

SpringJMS发布订阅:

生产者类:

@Component
public class TopicProducer {
	@Autowired
	private JmsTemplate jmsTemplate;
	
	@Autowired
	private Destination topicTextDestination;
	
	/**
	 * 发送文本消息
	 * @param text
	 */
	public void sendTextMessage(final String text){
		jmsTemplate.send(topicTextDestination, new MessageCreator() {			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(text);
			}
		});		
	}
}
           

继续阅读