天天看点

JMS相关

2. JMS简介

2.1. JMS的体系结构

JMS应用由以下几部分组成:

JMS provider :是一个消息系统,它实现了JMS 接口并提供管理和控制的功能。

JMS clients :是用Java语言写的一些程序和组件,它们产生和使用消息。

Messages :是在JMS clients之间传递的消息的对象。

Administered objects :是由使用JMS clients 的人生成的预选设置好的JMS 对象。有两种这样的对象:destinations和connection factories。

2.2. Message机制

JMS规范制定了两种Message机制:point-to-point和publish/subscribe。

2.2.1 point-to-point

point-to-point (PTP) 的产品和应用是建立在message queues, senders和receivers概念上的。 每一个message发送到某一个特定的queue, 然后接收客户从queue 里面提取messages。 Queues里保存所有发给接收客户的messages,直到messages被提供了或过期。

应该注意:

1.每一个message只有一个使用者。

2.一个message的sender和receiver没有时间上的依赖关系。无论sendere有没有在运行,Receiver都可提取message。

3.Receiver完成对message处理这后,发出确认。

当你所发出的每一个消息必须由一个使用者成功处理的情况下,使用 PTP messaging机制。

2.2.2 publish/subscribe

在publish/subscribe (pub/sub)产品或应用中, 客户发送messages到一个topic。Publishers和subscribers通常是匿名的并且可以动态发布或订阅。系统负责分发从多个publishers来的同一个topic的messages。当messages 分发到了所有目前的subscribers,Topics就不再保留他们。

Pub/sub messaging有如下的特点:

1.每一个message可以有多个使用者;

2.Publishers和subscribers在时间上有依赖关系。一个订阅了某一个topic的客户,只能使用在它生成订阅之后发布的message, 并且subscriber必须一直保持活动状态。

JMS API允许客户生成持久性的订阅,从而在某种程度上放宽了这种时间上的依赖关系, 提高了灵活性处可靠性。

2.2.3 Messaging的使用

Messaging本生是asynchronous的,使message的使用者之间没有时间上的依赖关系。但是,JMS规范给出了更精确的定义,使Message可以以两种方式被使用:

1.Synchronously:subscriber或receiver可以能过调用receive方法显示地从destination上提取message。Receive方法在收到一个 message后结束,或当message 在一定的时间限制内没有收到时超时结束。

2.Asynchronously:客户可以为某一个使用者注册一个message listener。message listener和event listener很相似。当一个message到达了destination, JMS provider通过调用listener的onMessage方法将message传递过去,由onMessage方法负责处理message。

3. JMS API编程模型

一个JMS应用由以下几个模块组成:

3.1. Administered Objects

JMS应用的destinations和connection factories最后是通过管理而不是编程来使用,因为不同的provider使用他们的方法不一样。

JMS 客户应该使用同一的接口得到这些objects,从而使用JMS应用可以运行在不同provider上,而不需要修改或修改很少。通常管理员在JNDI上设置administered objects, 然后JMS clients 在JNDI上look up这些对象。

3.1.1 Connection Factories:

connection factory 是client用来生成与provider的connection的对象。connection factory封装了一套由管理员定义的connection configuration参数。每个connection factory 是一个QueueConnectionFactory 或 TopicConnectionFactory接口的实例。

在JMS 客户程序中, 通常先执行connection factory 的JNDI API lookup。 如下例:

Context ctx = new InitialContext();

QueueConnectionFactory queueConnectionFactory =

  (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

TopicConnectionFactory topicConnectionFactory =

  (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");

如果调用不带参数的InitialContext的lookup方法,就在当前classpath 找jndi.properties文件。

3.1.2 Destinations:

Destination 是一个对象用于定义用户产生的messages 的去向或用户使用的messages 的来源。

在PTP里,destinations被称作queues, 可以用下面的J2EE SDK command来生成它们:

j2eeadmin -addJmsDestination queue_name queue

在pub/sub里, destinations被称为topics, 可以用下面的J2EE SDK command 来生成它们:

j2eeadmin -addJmsDestination topic_name topic

一个JMS应用可以同时使用多个queues 和/或topics。

除了lookup connection factory, 也常要lookup destination。例如:

Topic myTopic = (Topic) ctx.lookup("MyTopic");

Queue myQueue = (Queue) ctx.lookup("MyQueue");

3.2. Connections

Connection封装了一个与JMS provider的虚拟连接。Connection表示在client和provider service daemon之间打开的TCP/IP socket。可以用connection 生成一个或多个sessions。

就象connection factories, connections有两种方式:实现QueueConnection或TopicConnection接口。例如, 当有一个QueueConnectionFactory 或TopicConnectionFactory对象, 可以用他们来创造一个connection:

QueueConnection queueConnection =

  queueConnectionFactory.createQueueConnection();

TopicConnection topicConnection =

  topicConnectionFactory.createTopicConnection();

当应用程序完成后, 必须关闭你所创建的connections。否则JMS provider 无法释放资源。关闭了connection同时也关闭了sessions和message产生者和message使用者。

queueConnection.close();

topicConnection.close();

在使用messages前, 必须调用connection的start方法。如果要暂时停止传送message而不关闭connection, 可以调用stop方法。

connection factory 是client用来生成与provider的connection的对象。connection factory封装了一套由管理员定义的connection configuration参数。每个connection factory 是一个QueueConnectionFactory 或 TopicConnectionFactory接口的实

3.3. Sessions

Session是单线程的context用于产生和使用messages。用sessions创建 message producers, message consumers, and messages。Sessions管理message listeners的执行顺序。

Ssession提供transactional context用于将一系列的sends和receives动作组合在一个工作单元里。

Sessions, 就象connections, 也有两种方式:实现QueueSession或TopicSession接口。例如:

TopicSession topicSession =

  topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

第一个参数表示sessiong不实现事务处理; 第二个参数表示当session成功收到messages后自动确认。

相同的,可以用QueueConnection对象创建QueueSession:

QueueSession queueSession =

  queueConnection.createQueueSession(true, 0);

这里, 第一个参数表示session实现事务处理; 第二个参数表示当session成功收到messages后不自动确认。

3.4. Message Producers

message producer是由session创建和一个object,用于将messages传递到destination. PTP方式的message producer实现QueueSender接口。 pub/sub方式的message producer实现TopicPublisher接口。

例如::

QueueSender queueSender = queueSession.createSender(myQueue);

TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);

用null作为createSender或createPublisher的参数,可以创建一个不确定的producer。用不确定的producer, 可以等到真正send或publish message的时候才指定destination

当创建了一个message producer, 就可以用它来发送messages. (You have to create the messages first; see Section 3.6, "Messages.") Wi。例如:

queueSender.send(message);

topicPublisher.publish(message);

3.5. Message Consumer

message consumer也中由session创建和一个object,用于接收发送到destination 的message 。message consumer允许JMS client到JMS provider注册感兴趣的destination。JMS provider管理messages从destination到注册了这个destination的consumers之间的传送。

PTP方式的message consumer实现QueueReceiver接口。pub/sub方式的message consumer实现TopicSubscriber接口。

例如:

QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);

TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);

可以用TopicSession.createDurableSubscriber方法创建一个durable topic subscriber。

当创建了一个message consumer, 它就是活动的,就可以用它接收messages。可以用QueueReceiver 或TopicSubscriber的close方法把message consumer变成不活动的。Message的传送在调用了connection的start方法后才开始。

不论是QueueReceiver或TopicSubscriber, 都可以用receive方法来同步consume message。 你可以在调用start方法后的任何时间调用它:

queueConnection.start();

Message m = queueReceiver.receive();

topicConnection.start();

Message m = topicSubscriber.receive(1000); // time out after a second

异步consume message, 可以使用message listener。

3.5.1   Message Listeners

message listener是一个对象,用作充当messages的异步事件处理器。它实现了MessageListener接口, 它只有一个方法:onMessage. 在onMessage方法内, 可以定义当收到一个message后做的事情。

用setMessageListener方法在某个QueueReceiver 或TopicSubscriber里注册message listener。例如:

TopicListener topicListener = new TopicListener();

topicSubscriber.setMessageListener(topicListener);

当注册了message listener, 调用QueueConnection或TopicConnection的方法来开始传送message。

当message开始传送, 当有message送来,message consumer自动调用message listener的 onMessage方法。onMessage方法只有一个Message类型的参数。

message listener并不对应特定的destination类型. 相同的listener可以从queue或topic上得到message, 这由listener是由QueueReceiver还是 TopicSubscriber对象设置的。然而message listener通常对应某一个message类型或格式, 如果要回应messages, message listener必须创建一个message producer。

onMessage方法应该处理所有的exceptions。

Session负责管理message listeners的执行顺序。任何时候,只有一个message listeners在运行。

在J2EE 1.3中, message-driven bean是一个特殊的message listener。

 3.5.2  Message Selectors

如果你的消息应用程序需要过滤收到的messages, 可以用JMS API中的message selector来让message consumer定义它所感兴趣的messages。Message selectors负责过滤到JMS provider的message,而不是到应用程序的。

message selector是一个含有表达式的字符串。表达式的语法是SQL92 conditional expression syntax的一个子集。当创建message consumer时, createReceiver, createSubscriber, 和createDurableSubscriber方法都可以让你定义某个message selector作为参数。

message consumer只接收headers和properties与selector匹配的messages。message selector不能根据message body的内容进行选择。

3.6   Messages

JMS application的最终目的是为了与其它应用程序分享messages。JMS messages有一个基本的格式,这种格式既简单又灵活,可以上你生成在其它平台上非JMS applications可以使用的格式。

JMS message有三个部分:

1. header

2. Properties (可选)

3. body (可选)

3.6.1   Message Headers

JMS message header包含几具预定义的fields,它的值可以被clients和providers用来识别和发送messages. (Table 3.1 例出了JMS message header和fields和如何set它们的值。) 例如, 每一个message都有唯一个的标识符, 表示在JMSMessageID里。JMSDestination表示message发送到的queue或topic。 其它fields包括timestamp和优先级。

每一个header field都有对应的setter和getter方法。某些header fields是由客户set的, 但有些是自动在send或publish方法中set的, 这有可能覆盖一一些客户set的值:

Table 3.1:    如何设置JMS Message Header Field的值

Header Field Set By

JMSDestination send or publish method

JMSDeliveryMode send or publish method

JMSExpiration send or publish method

JMSPriority send or publish method

JMSMessageID send or publish method

JMSTimestamp send or publish method

JMSCorrelationID Client

JMSReplyTo Client

JMSType Client

JMSRedelivered JMS provider

3.6.2   Message Properties

你可以创建和设置messages properties以补充header fields的不足。可以通过使用properties来达到与其它消息系统的兼容, 或者用它们来做message selectors。

3.6.3   Message Bodies

JMS API定义了5种message body格式, 让你可以用不同的格式send和receive数据,并与现有的message格式兼容。Table 3.2描述了各种消息格式。

Table 3.2:    JMS Message格式

Message Type Body Contains

TextMessage A java.lang.String object (for example, the contents of an Extensible Markup Language file).

MapMessage A set of name/value pairs, with names as String objects and values as primitive types in the Java programming language. The entries can be accessed sequentially by enumerator or randomly by name. The order of the entries is undefined.

BytesMessage A stream of uninterpreted bytes. This message type is for literally encoding a body to match an existing message format.

StreamMessage A stream of primitive values in the Java programming language, filled and read sequentially.

ObjectMessage A Serializable object in the Java programming language.

Message Nothing. Composed of header fields and properties only. This message type is useful when a message body is not required.

JMS API为每一种messages都提供了一种create方法。例如:

TextMessage message = queueSession.createTextMessage();

message.setText(msg_text);     // msg_text is a String

queueSender.send(message);

在使用者一端, 必须将收到的Message cast成适当的message类型。 例如:

Message m = queueReceiver.receive();

if (m instanceof TextMessage) {

    TextMessage message = (TextMessage) m;

    System.out.println("Reading message: " + message.getText());

} else {

    // Handle error

}

3.7   Exception Handling

JMS API的方法的Exception的根类是JMSException。JMSException类包括如下子类:

IllegalStateException

InvalidClientIDException

InvalidDestinationException

InvalidSelectorException

JMSSecurityException

MessageEOFException

MessageFormatException

MessageNotReadableException

MessageNotWriteableException

ResourceAllocationException

TransactionInProgressException

TransactionRolledBackException