package test.requestreply;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

public class Test {

    public static void main(String[] args) throws Exception {
        fromJava();
    }

    public static void fromJava() throws Exception {
        CamelContext context = new DefaultCamelContext();
        ProducerTemplate template = context.createProducerTemplate();

        // Point the activemq component at an embedded, non-persistent broker.
        ActiveMQComponent component = context.getComponent("activemq", ActiveMQComponent.class);
        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
        component.setConnectionFactory(cf);

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // queue2 -> request/reply to queue3 -> processor -> mock and log
                from("activemq:queue:queue2").inOut("activemq:queue:queue3").process(new TestProcessor())
                    .to("mock:result").to("log:com.citi.etrading.retail.tps.batch.common.processor.ETPSBatchMessageConverter?level=INFO");

                // queue3 -> processor -> mock
                from("activemq:queue:queue3").process(new TestProcessor1())
                    .to("mock:result");

                // queue1 consumer with queue2 configured as an exclusive reply queue
                from("activemq:queue:queue1?replyTo=queue:queue2&replyToType=Exclusive")
                    .transform(constant("Bye Camel"));
                //.to("activemq:queue:queue3");
            }
        });

        context.start();
        template.sendBody("activemq:queue:queue1", "Hello World");
        Thread.sleep(5000);

        // Note: this expectation is declared after the messages were sent and is never asserted.
        MockEndpoint endpoint = (MockEndpoint) context.getEndpoint("mock:result");
        endpoint.expectedBodiesReceived("Hello World");
        context.stop();
    }

    public static class TestProcessor implements Processor {
        public void process(Exchange exchange) throws Exception {
            // Print the exchange and the queue this processor is attached to.
            System.out.println(exchange);
            System.out.println("queue2");
        }
    }

    public static class TestProcessor1 implements Processor {
        public void process(Exchange exchange) throws Exception {
            // Print the exchange and the queue this processor is attached to.
            System.out.println(exchange);
            System.out.println("queue3");
        }
    }
}
===============
http://stackoverflow.com/questions/10085340/camel-is-it-possible-to-implement-request-reply-competing-consumers-with-the
If you use Camel 2.9 or later, I suggest using replyToType=Exclusive on the activemq endpoint where you do request/reply. This tells Camel that the reply queue is exclusive, which speeds things up, as no JMS message selectors are needed to pick up the expected correlated messages.
See the section Request-reply over JMS onwards at the Camel JMS docs: http://camel.apache.org/jms
If you use temporary queues, that is fast as well, as no JMS message selectors are needed.
Also, your route starts with a direct endpoint. That is a synchronous call, so the caller will wait/block until the Exchange is completely done.
Also, the Splitter EIP can be configured to run in parallel mode, which uses concurrent processing. And if you have a big message to split, consider using streaming, which splits the message on demand instead of loading the entire message content into memory.
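As a rough sketch of the Splitter advice above, assuming Camel 2.x and a newline-delimited message body: the endpoint names direct:start, activemq:queue:lines and mock:done are placeholders, not taken from the original question.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: endpoint names are hypothetical.
public class SplitRouteSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            // streaming(): split on demand instead of loading the whole body first
            // parallelProcessing(): hand the split parts to a thread pool
            .split(body().tokenize("\n")).streaming().parallelProcessing()
                .to("activemq:queue:lines")
            .end()
            .to("mock:done");
    }
}
```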
Anyway, there is a lot going on in the route. Can you pinpoint more precisely where you have an issue? That makes it easier to help out.
=================
http://grokbase.com/t/camel/users/128n88xeva/how-to-use-request-reply-in-jms
Hi
Just configure the name of the reply queue on the from URI. You may
want to configure the reply queue as exclusive so that it runs
faster. Then, when the route completes, the message will be sent back
to the reply queue. E.g. in the example below, after the processor has
run, the message containing "Hello World" will be sent back to
OUTPUT.Q.
from("activemq://queue:INPUT.Q?jmsMessageType=Object&replyTo=OUTPUT.Q&replyToType=Exclusive")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody("Hello World");
        }
    });
I suggest reading the JMS page. It has sections about request/reply,
with pointers on what to look out for and what to configure.
http://camel.apache.org/jms
And mind that you can use 3 kinds of queues when doing request/reply
- temporary queue
- shared queue
- exclusive queue
So mind the difference.
Make sure someone is listening on OUTPUT.Q and sending back a message
to REPLY.Q.
You get this exception because Camel timed out after 20 seconds. You
can adjust this value if you need a longer timeout.
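The 20 second limit mentioned above corresponds to the requestTimeout option of the Camel JMS component (milliseconds, default 20000). A minimal sketch of raising it to 30 seconds, assuming Camel 2.x; direct:request and log:reply are placeholder endpoints:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: direct:request and log:reply are hypothetical endpoints.
public class RequestTimeoutSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:request")
            // wait up to 30 seconds for the reply instead of the default 20
            .inOut("activemq:queue:INPUT.Q?requestTimeout=30000")
            .to("log:reply");
    }
}
```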
===================
http://camel.465427.n5.nabble.com/After-publish-OutOnly-Reply-received-for-unknown-correlationID-td3368679.html
After publish (OutOnly): Reply received for unknown correlationID
Use the disableReplyTo option to not expect any replies.
disableReplyTo (default: false): If true, a producer will behave like an InOnly exchange, with the exception that the JMSReplyTo header is sent out and not suppressed as in the case of InOnly. Like InOnly, the producer will not wait for a reply. A consumer with this flag will behave like InOnly. This feature can be used to bridge InOut requests to another queue so that a route on the other queue will send its response directly back to the original JMSReplyTo.
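A minimal sketch of a producer that does not wait for replies, assuming Camel 2.x; direct:publish and the queue name events are hypothetical:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: endpoint names are hypothetical.
public class DisableReplyToSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:publish")
            // the producer sends the message and does not listen for a reply
            .to("activemq:queue:events?disableReplyTo=true");
    }
}
```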
================
http://camel.apache.org/jms.html
About using Camel to send and receive messages and JMSReplyTo
The JMS component is complex and you have to pay close attention to how it works in some cases. So this is a short summary of some of the areas/pitfalls to look for.
When Camel sends a message using its JMSProducer, it checks the following conditions:
- The message exchange pattern,
- Whether a JMSReplyTo was set in the endpoint or in the message headers,
- Whether any of the following options have been set on the JMS endpoint: disableReplyTo, preserveMessageQos, explicitQosEnabled.
All this can be a tad complex to understand and configure to support your use case.
JmsProducer
The JmsProducer behaves as follows, depending on configuration:
Exchange Pattern | Other options | Description |
---|---|---|
InOut | - | Camel will expect a reply, set a temporary JMSReplyTo, and after sending the message, it will start to listen for the reply message on the temporary queue. |
InOut | JMSReplyTo is set | Camel will expect a reply and, after sending the message, it will start to listen for the reply message on the specified JMSReplyTo queue. |
InOnly | - | Camel will send the message and not expect a reply. |
InOnly | JMSReplyTo is set | By default, Camel discards the JMSReplyTo destination and clears the JMSReplyTo header before sending the message. Camel then sends the message and does not expect a reply. Camel logs this at WARN level (changed to DEBUG level from Camel 2.6 onwards). You can use preserveMessageQos=true to instruct Camel to keep the JMSReplyTo. In all situations the JmsProducer does not expect any reply and thus continues after sending the message. |
JmsConsumer
The JmsConsumer behaves as follows, depending on configuration:
Exchange Pattern | Other options | Description |
---|---|---|
InOut | - | Camel will send the reply back to the JMSReplyTo queue. |
InOnly | - | Camel will not send a reply back, as the pattern is InOnly. |
- | disableReplyTo=true | This option suppresses replies. |
So pay attention to the message exchange pattern set on your exchanges.
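The producer rules above can be restated as a small decision function. This is only a simplified model in plain Java for reading the table, not Camel's implementation; the class, enum and method names are made up for illustration, and it assumes that preserveMessageQos=true is what keeps the JMSReplyTo header on an InOnly send.

```java
// Simplified model of the JmsProducer table; all names here are hypothetical.
public class JmsProducerReplyModel {

    public enum Pattern { IN_OUT, IN_ONLY }

    /** Returns a short description of what the producer does with JMSReplyTo. */
    public static String describe(Pattern pattern, boolean replyToSet, boolean preserveMessageQos) {
        if (pattern == Pattern.IN_OUT) {
            // InOut always waits for a reply; only the reply destination differs
            return replyToSet
                ? "wait for reply on the specified JMSReplyTo queue"
                : "wait for reply on a temporary queue";
        }
        // InOnly never waits for a reply
        if (!replyToSet) {
            return "send, do not wait";
        }
        return preserveMessageQos
            ? "send with JMSReplyTo kept, do not wait"
            : "send with JMSReplyTo cleared, do not wait";
    }

    public static void main(String[] args) {
        System.out.println(describe(Pattern.IN_OUT, false, false));
        System.out.println(describe(Pattern.IN_ONLY, true, false));
        System.out.println(describe(Pattern.IN_ONLY, true, true));
    }
}
```

The one easy case to miss is InOnly with a JMSReplyTo set: the header is dropped unless it is explicitly preserved.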
If you send a message to a JMS destination in the middle of your route you can specify the exchange pattern to use, see more at Request Reply.
This is useful if you want to send an InOnly message to a JMS topic:
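The example that belonged here did not survive in these notes. A sketch of such a route, assuming Camel 2.x; the bean names validateOrder and handleOrder and the queue and topic names are placeholders:

```java
import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;

// Sketch only: bean and destination names are hypothetical.
public class InOnlyTopicSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:queue:in")
            .to("bean:validateOrder")
            // publish to the topic without expecting a reply, then continue routing
            .to(ExchangePattern.InOnly, "activemq:topic:order")
            .to("bean:handleOrder");
    }
}
```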
Request-reply over JMS
Camel supports Request Reply over JMS. In essence the MEP of the Exchange should be InOut when you send a message to a JMS queue.
Camel offers a number of options to configure request/reply over JMS that influence performance and clustered environments. The table below summarizes the options.
Option | Performance | Cluster | Description |
---|---|---|---|
Temporary | Fast | Yes | A temporary queue is used as the reply queue and is created automatically by Camel. To use this, do not specify a replyTo queue name. You can optionally configure replyToType=Temporary to make it stand out that temporary queues are in use. |
Shared | Slow | Yes | A shared persistent queue is used as the reply queue. The queue must be created beforehand, although some brokers, such as Apache ActiveMQ, can create it on the fly. To use this you must specify the replyTo queue name. You can optionally configure replyToType=Shared to make it stand out that shared queues are in use. A shared queue can be used in a clustered environment with multiple nodes running this Camel application at the same time, all using the same shared reply queue. This is possible because JMS message selectors are used to correlate expected reply messages; this impacts performance though. JMS message selectors are slower, and therefore not as fast as Temporary or Exclusive queues. See further below how to tweak this for better performance. |
Exclusive | Fast | No (*Yes) | An exclusive persistent queue is used as the reply queue. The queue must be created beforehand, although some brokers, such as Apache ActiveMQ, can create it on the fly. To use this you must specify the replyTo queue name, and you must configure replyToType=Exclusive to instruct Camel to use exclusive queues, as Shared is used by default if a queue name was configured. When using exclusive reply queues, JMS message selectors are not in use, and therefore other applications must not use this queue as well. An exclusive queue cannot be used in a clustered environment with multiple nodes running this Camel application at the same time, as we have no control over whether the reply comes back to the same node that sent the request message; that is why shared queues use JMS message selectors to make sure of this. However, if you configure each exclusive reply queue with a unique name per node, then you can run this in a clustered environment, as the reply message will then be sent back to the queue of the node that awaits it. |
concurrentConsumers | Fast | Yes | Camel 2.10.3: Allows reply messages to be processed concurrently using concurrent message listeners. You can specify a range using the concurrentConsumers and maxConcurrentConsumers options. Notice: using Shared reply queues may not work as well with concurrent listeners, so use this option with care. |
maxConcurrentConsumers | Fast | Yes | Camel 2.10.3: Allows reply messages to be processed concurrently using concurrent message listeners. You can specify a range using the concurrentConsumers and maxConcurrentConsumers options. Notice: using Shared reply queues may not work as well with concurrent listeners, so use this option with care. |
The JmsProducer detects the InOut pattern and provides a JMSReplyTo header with the reply destination to be used. By default Camel uses a temporary queue, but you can use the replyTo option on the endpoint to specify a fixed reply queue (see more below about fixed reply queues).
Camel will automatically set up a consumer that listens on the reply queue, so you do not need to do anything.
This consumer is a Spring DefaultMessageListenerContainer which listens for replies. However, it is fixed to 1 concurrent consumer.
That means replies will be processed in sequence, as there is only 1 thread to process the replies. If you want to process replies faster, you need to use concurrency, but not by using the concurrentConsumers option. Use threads from the Camel DSL instead, as shown in the route below:
Note: Instead of using threads, use the concurrentConsumers option if you are on Camel 2.10.3 or later. See further below.
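The route itself is missing from these notes. A sketch of what it might look like, assuming Camel 2.x; direct:start, bean:handleReply and mock:result stand in for the real endpoints:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: all endpoint names except the request queue are hypothetical.
public class ThreadsReplySketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            .inOut().to("activemq:queue:foo")
            // continue routing each reply on a thread pool with 5 threads
            .threads(5)
            .to("bean:handleReply")
            .to("mock:result");
    }
}
```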
In this route we instruct Camel to route replies asynchronously using a thread pool with 5 threads.
From Camel 2.10.3 onwards you can configure the listener to use concurrent threads using the concurrentConsumers and maxConcurrentConsumers options. This makes it easier to configure in Camel, as shown below:
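A sketch of the concurrentConsumers variant, assuming Camel 2.10.3 or later; the endpoints other than the request queue are placeholders:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: direct:start, bean:handleReply and mock:result are hypothetical.
public class ConcurrentReplySketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            // the reply listener uses 5 concurrent consumers instead of 1
            .inOut().to("activemq:queue:foo?concurrentConsumers=5")
            .to("bean:handleReply")
            .to("mock:result");
    }
}
```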
Request-reply over JMS and using a shared fixed reply queue
If you use a fixed reply queue when doing Request Reply over JMS as shown in the example below, then pay attention.
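The example is missing here. A sketch of a request/reply with a fixed reply queue, assuming Camel 2.x; the reply queue name bar is the one used in the text, the other endpoints are placeholders:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: direct:start and mock:result are hypothetical.
public class SharedReplyQueueSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            // fixed reply queue "bar"; Camel treats it as shared by default
            .inOut().to("activemq:queue:foo?replyTo=bar")
            .to("mock:result");
    }
}
```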
In this example the fixed reply queue named "bar" is used. By default Camel assumes the queue is shared when using fixed reply queues, and therefore it uses a JMSSelector to pick up only the expected reply messages (e.g. based on the JMSCorrelationID). That means it is not as fast as temporary queues. See the next section for exclusive fixed reply queues. You can speed up how often Camel will pull for reply messages using the receiveTimeout option. By default it is 1000 millis. So to make it faster you can set it to 250 millis to pull 4 times per second, as shown:
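A sketch of the receiveTimeout setting described above, assuming Camel 2.x; the endpoints other than the request and reply queues are placeholders:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: direct:start and mock:result are hypothetical.
public class ReceiveTimeoutSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            // receiveTimeout is in milliseconds: 250 means 4 pulls per second
            .inOut().to("activemq:queue:foo?replyTo=bar&receiveTimeout=250")
            .to("mock:result");
    }
}
```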
Notice this will cause Camel to send pull requests to the message broker more frequently, and thus require more network traffic.
It is generally recommended to use temporary queues if possible.
Request-reply over JMS and using an exclusive fixed reply queue
Available as of Camel 2.9
In the previous example, Camel would assume the fixed reply queue named "bar" was shared, and thus it uses a JMSSelector to consume only the reply messages it expects. However, there is a drawback to doing this, as JMS selectors are slower. Also, the consumer on the reply queue is slow to update with new JMS selector ids. In fact it only updates when the receiveTimeout option times out, which by default is 1 second. So in theory the reply messages could take up to about 1 second to be detected. On the other hand, if the fixed reply queue is exclusive to the Camel reply consumer, then we can avoid using the JMS selectors and thus be more performant, in fact as fast as using temporary queues. So from Camel 2.9 onwards there is a replyToType option which you can set to Exclusive to tell Camel that the reply queue is exclusive, as shown in the example below:
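A sketch of the exclusive reply queue configuration, assuming Camel 2.9 or later; the endpoints other than the request and reply queues are placeholders:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: direct:start and mock:result are hypothetical.
public class ExclusiveReplyQueueSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            // "bar" is used by this endpoint only, so no JMS selector is needed
            .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
            .to("mock:result");
    }
}
```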
Mind that the queue must be exclusive to each and every endpoint. So if you have two routes, then they each need a unique reply queue, as shown in the next example:
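A sketch of two routes that each get their own exclusive reply queue, assuming Camel 2.9 or later; the direct and mock endpoints and the order queue names are placeholders:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: endpoint and queue names other than foo and bar are hypothetical.
public class TwoExclusiveRoutesSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:first")
            .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
            .to("mock:first");

        // the second route must not reuse "bar" as its reply queue
        from("direct:second")
            .inOut().to("activemq:queue:order?replyTo=order.reply&replyToType=Exclusive")
            .to("mock:second");
    }
}
```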
The same applies if you run in a clustered environment. Each node in the cluster must then use a unique reply queue name, as otherwise a node may pick up messages that were intended as replies for another node. For clustered environments it is recommended to use shared reply queues instead.
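If exclusive queues are still wanted in a cluster, one way to get a unique reply queue per node is to derive the queue name from a node identifier. The helper below is a plain-Java sketch; the class name, the method name and the node.id system property are assumptions for illustration, not part of Camel.

```java
// Sketch only: ReplyQueueNames and the "node.id" property are hypothetical.
public class ReplyQueueNames {

    /** Builds a producer endpoint URI whose exclusive reply queue is unique to one node. */
    public static String exclusiveReplyUri(String requestQueue, String replyQueueBase, String nodeId) {
        return "activemq:queue:" + requestQueue
            + "?replyTo=" + replyQueueBase + "." + nodeId
            + "&replyToType=Exclusive";
    }

    public static void main(String[] args) {
        // the node id could come from configuration; "node1" is only a fallback
        String nodeId = System.getProperty("node.id", "node1");
        System.out.println(exclusiveReplyUri("foo", "bar", nodeId));
    }
}
```

The resulting string can be passed to inOut().to(...) in a route, so each node listens on its own reply queue.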