天天看點

camel JMS replyTo

package test.requestreply;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.camel.component.ActiveMQComponent;

import org.apache.camel.CamelContext;

import org.apache.camel.Exchange;

import org.apache.camel.Processor;

import org.apache.camel.ProducerTemplate;

import org.apache.camel.builder.RouteBuilder;

import org.apache.camel.component.mock.MockEndpoint;

import org.apache.camel.impl.DefaultCamelContext;

public class Test {

publicstaticvoid main(String args[])throws Exception {

fromJava();

}

publicstaticvoid fromJava()throws Exception {

CamelContext context = new DefaultCamelContext();

ProducerTemplate template = context.createProducerTemplate();

ActiveMQComponent component = context.getComponent("activemq", ActiveMQComponent.class);

ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");

component.setConnectionFactory(cf);

context.addRoutes(new RouteBuilder() {

@Override

publicvoid configure()throws Exception {

from("activemq:queue:queue2").inOut("activemq:queue:queue3").process(new TestProcessor())

.to("mock:result").to("log:com.citi.etrading.retail.tps.batch.common.processor.ETPSBatchMessageConverter?level=INFO");

from("activemq:queue:queue3").process(new TestProcessor1())

.to("mock:result");

 

from("activemq:queue:queue1?replyTo=queue:queue2&replyToType=Exclusive")

.transform(constant("Bye Camel"));

//.to("activemq:queue:queue3");

}

});

context.start();

template.sendBody("activemq:queue:queue1","Hello World");

Thread.sleep(5000);

MockEndpoint endpoint = (MockEndpoint)context.getEndpoint("mock:result");

endpoint.expectedBodiesReceived("Hello World");

context.stop();

}

publicstaticclass TestProcessorimplements Processor{

publicvoid process(Exchange exchange)throws Exception {

//TODO Auto-generated method stub

System.out.println(exchange);

System.out.println("queue2");

}

}

publicstaticclass TestProcessor1implements Processor{

publicvoid process(Exchange exchange)throws Exception {

//TODO Auto-generated method stub

System.out.println(exchange);

System.out.println("queue3");

}

}

}

 

===============

http://stackoverflow.com/questions/10085340/camel-is-it-possible-to-implement-request-reply-competing-consumers-with-the

If you use Camel 2.9 or better, then I suggest to use replyToType=Exclusive on the activemq endpoint where you do request/reply. This tells Camel that the queue is exclusive, and it speedup, as no JMS message selectors is needed to pickup expected correlated messages.

See the section Request-reply over JMS onwards at the Camel JMS docs:http://camel.apache.org/jms

If you use temporary queues, then that is also fast as well, as no JMS message selectors is needed.

Also your route starts with a direct endpoint. That is a synchronous call, so the caller will wait/block until the Exchange is completely done.

Also the Splitter EIP can be configured to run in parallel mode which will use concurrent processing. And if you have a big message to split, then consider using streaming which will split the message on-demand, instead of loading the entire message content into memory.

Anyway there is a lot going on in the route. Can you pin-point more precisely where you have an issue? It makes it easier to help out.

=================

http://grokbase.com/t/camel/users/128n88xeva/how-to-use-request-reply-in-jms

Hi

Just configure the name of the reply queue on the from uri. You may

want to configure the reply queue as being exclusive then it runs

faster. Then when the route completes, the message will be send back

to the reply queue. eg in the example below, after the processor has

run, the message containing "Hello World" will be send back to the

OUTPUT.Q.

from("activemq://queue:INPUT.Q?jmsMessageType=Object&replyTo=OUTPUT.Q&replyToType=Exclusive")

.process(new

Processor() {

@Override

public void process(Exchange exchange) throws Exception {

exchange.getOut().setBody("Hello World");

}

});

I suggest to read the JMS page. It has sections about request/reply

etc, and some pointers how and what to look out for. And what to

configure etc.

http://camel.apache.org/jms

And mind that you can use 3 kind of queues when doing request/reply

- temporary queue

- shared queue

- exclusive queue

So mind the difference.

Make sure someone is listening on OUTPUT.Q and sending back a message

to REPLY.Q.

You get this exception because Camel timed out after 20 seconds. You

can adjust this value if you need longer timeout.

===================

http://camel.465427.n5.nabble.com/After-publish-OutOnly-Reply-received-for-unknown-correlationID-td3368679.html

After publish (OutOnly): Reply received for unknown correlationID

Use the disableReplyTo option to not expect any replies.

disableReplyTo false If true, a producer will behave like a InOnly exchange with the exception that JMSReplyTo header is sent out and not be suppressed like in the case of InOnly. Like InOnly the producer will not wait for a reply. A consumer with this flag will behave like InOnly. This feature can be used to bridge InOut requests to another queue so that a route on the other queue will send it´s response directly back to the original JMSReplyTo.

================http://camel.apache.org/jms.html

About using Camel to send and receive messages and JMSReplyTo

The JMS component is complex and you have to pay close attention to how it works in some cases. So this is a short summary of some of the areas/pitfalls to look for.

When Camel sends a message using its

JMSProducer

, it checks the following conditions:

  • The message exchange pattern,
  • Whether a

    JMSReplyTo

    was set in the endpoint or in the message headers,
  • Whether any of the following options have been set on the JMS endpoint:

    disableReplyTo

    ,

    preserveMessageQos

    ,

    explicitQosEnabled

    .

All this can be a tad complex to understand and configure to support your use case.

JmsProducer

The

JmsProducer

behaves as follows, depending on configuration:

Exchange Pattern Other options Description
InOut - Camel will expect a reply, set a temporary

JMSReplyTo

, and after sending the message, it will start to listen for the reply message on the temporary queue.
InOut

JMSReplyTo

is set
Camel will expect a reply and, after sending the message, it will start to listen for the reply message on the specified

JMSReplyTo

queue.
InOnly - Camel will send the message and not expect a reply.
InOnly

JMSReplyTo

is set
By default, Camel discards the

JMSReplyTo

destination and clears the

JMSReplyTo

header before sending the message. Camel then sends the message and does not expect a reply. Camel logs this in the log at

WARN

level (changed to

DEBUG

level from Camel 2.6 onwards. You can use

preserveMessageQuo=true

to instruct Camel to keep the

JMSReplyTo

. In all situations the

JmsProducer

does not expect any reply and thus continue after sending the message.

JmsConsumer

The

JmsConsumer

behaves as follows, depending on configuration:

Exchange Pattern Other options Description
InOut - Camel will send the reply back to the

JMSReplyTo

queue.
InOnly - Camel will not send a reply back, as the pattern is InOnly.
-

disableReplyTo=true

This option suppresses replies.

So pay attention to the message exchange pattern set on your exchanges.

If you send a message to a JMS destination in the middle of your route you can specify the exchange pattern to use, see more at Request Reply.

This is useful if you want to send an

InOnly

message to a JMS topic:

from("activemq:queue:in")

.to("bean:validateOrder")

.to(ExchangePattern.InOnly, "activemq:topic:order")

.to("bean:handleOrder");

Request-reply over JMS

Camel supports Request Reply over JMS. In essence the MEP of the Exchange should be

InOut

when you send a message to a JMS queue.

Camel offers a number of options to configure request/reply over JMS that influence performance and clustered environments. The table below summaries the options.

Option Performance Cluster Description

Temporary

Fast Yes A temporary queue is used as reply queue, and automatic created by Camel. To use this do not specify a replyTo queue name. And you can optionally configure

replyToType=Temporary

to make it stand out that temporary queues are in use.

Shared

Slow Yes A shared persistent queue is used as reply queue. The queue must be created beforehand, although some brokers can create them on the fly such as Apache ActiveMQ. To use this you must specify the replyTo queue name. And you can optionally configure

replyToType=Shared

to make it stand out that shared queues are in use. A shared queue can be used in a clustered environment with multiple nodes running this Camel application at the same time. All using the same shared reply queue. This is possible because JMS Message selectors are used to correlate expected reply messages; this impacts performance though. JMS Message selectors is slower, and therefore not as fast as

Temporary

or

Exclusive

queues. See further below how to tweak this for better performance.

Exclusive

Fast No (*Yes) An exclusive persistent queue is used as reply queue. The queue must be created beforehand, although some brokers can create them on the fly such as Apache ActiveMQ. To use this you must specify the replyTo queue name. And you must configure

replyToType=Exclusive

to instruct Camel to use exclusive queues, as

Shared

is used by default, if a

replyTo

queue name was configured. When using exclusive reply queues, then JMS Message selectors are not in use, and therefore other applications must not use this queue as well. An exclusive queue cannot be used in a clustered environment with multiple nodes running this Camel application at the same time; as we do not have control if the reply queue comes back to the same node that sent the request message; that is why shared queues use JMS Message selectors to make sure of this. Though if you configure each Exclusive reply queue with an unique name per node, then you can run this in a clustered environment. As then the reply message will be sent back to that queue for the given node, that awaits the reply message.

concurrentConsumers

Fast Yes Camel 2.10.3: Allows to process reply messages concurrently using concurrent message listeners in use. You can specify a range using the

concurrentConsumers

and

maxConcurrentConsumers

options. Notice: That using

Shared

reply queues may not work as well with concurrent listeners, so use this option with care.

maxConcurrentConsumers

Fast Yes Camel 2.10.3: Allows to process reply messages concurrently using concurrent message listeners in use. You can specify a range using the

concurrentConsumers

and

maxConcurrentConsumers

options. Notice: That using

Shared

reply queues may not work as well with concurrent listeners, so use this option with care.

The

JmsProducer

detects the

InOut

and provides a

JMSReplyTo

header with the reply destination to be used. By default Camel uses a temporary queue, but you can use the

replyTo

option on the endpoint to specify a fixed reply queue (see more below about fixed reply queue).

Camel will automatic setup a consumer which listen on the reply queue, so you should not do anything.

This consumer is a Spring

DefaultMessageListenerContainer

which listen for replies. However it's fixed to 1 concurrent consumer.

That means replies will be processed in sequence as there are only 1 thread to process the replies. If you want to process replies faster, then we need to use concurrency. But not using the

concurrentConsumer

option. We should use the

threads

from the Camel DSL instead, as shown in the route below:

Icon

Instead of using threads, then use concurrentConsumers option if using Camel 2.10.3 or better. See further below.

from(xxx)

.inOut().to("activemq:queue:foo")

.threads(

5

)

.to(yyy)

.to(zzz);

In this route we instruct Camel to route replies asynchronously using a thread pool with 5 threads.

From Camel 2.10.3 onwards you can now configure the listener to use concurrent threads using the

concurrentConsumers

and

maxConcurrentConsumers

options. This allows you to easier configure this in Camel as shown below:

from(xxx)

.inOut().to("activemq:queue:foo?concurrentConsumers=

5

")

.to(yyy)

.to(zzz);

Request-reply over JMS and using a shared fixed reply queue

If you use a fixed reply queue when doing Request Reply over JMS as shown in the example below, then pay attention.

from(xxx)

.inOut().to("activemq:queue:foo?replyTo=bar")

.to(yyy)

In this example the fixed reply queue named "bar" is used. By default Camel assumes the queue is shared when using fixed reply queues, and therefore it uses a

JMSSelector

to only pickup the expected reply messages (eg based on the

JMSCorrelationID

). See next section for exclusive fixed reply queues. That means its not as fast as temporary queues. You can speedup how often Camel will pull for reply messages using the

receiveTimeout

option. By default its 1000 millis. So to make it faster you can set it to 250 millis to pull 4 times per second as shown:

from(xxx)

.inOut().to("activemq:queue:foo?replyTo=bar&receiveTimeout=

250

")

.to(yyy)

Notice this will cause the Camel to send pull requests to the message broker more frequent, and thus require more network traffic.

It is generally recommended to use temporary queues if possible.

Request-reply over JMS and using an exclusive fixed reply queue

Available as of Camel 2.9

In the previous example, Camel would anticipate the fixed reply queue named "bar" was shared, and thus it uses a

JMSSelector

to only consume reply messages which it expects. However there is a drawback doing this as JMS selectos is slower. Also the consumer on the reply queue is slower to update with new JMS selector ids. In fact it only updates when the

receiveTimeout

option times out, which by default is 1 second. So in theory the reply messages could take up till about 1 sec to be detected. On the other hand if the fixed reply queue is exclusive to the Camel reply consumer, then we can avoid using the JMS selectors, and thus be more performant. In fact as fast as using temporary queues. So in Camel 2.9 onwards we introduced the

ReplyToType

option which you can configure to

Exclusive

to tell Camel that the reply queue is exclusive as shown in the example below:

from(xxx)

.inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")

.to(yyy)

Mind that the queue must be exclusive to each and every endpoint. So if you have two routes, then they each need an unique reply queue as shown in the next example:

from(xxx)

.inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")

.to(yyy)

from(aaa)

.inOut().to("activemq:queue:order?replyTo=order.reply&replyToType=Exclusive")

.to(bbb)

The same applies if you run in a clustered environment. Then each node in the cluster must use an unique reply queue name. As otherwise each node in the cluster may pickup messages which was intended as a reply on another node. For clustered environments its recommended to use shared reply queues instead.